fix: close p0 auth and release gate gaps
This commit is contained in:
@@ -37,22 +37,29 @@ func main() {
|
||||
}
|
||||
|
||||
// 加载配置
|
||||
cfg, err := config.Load(*env)
|
||||
cfg, err := config.LoadFromPath(*env, *configPath)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to load config: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("starting supply-api in %s mode", *env)
|
||||
isProd := *env == "prod"
|
||||
|
||||
// P1-010修复: 初始化结构化日志
|
||||
jsonLogger := logging.NewLogger("supply-api", logging.LogLevelInfo)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
rootCtx, stop := context.WithCancel(context.Background())
|
||||
defer stop()
|
||||
|
||||
initCtx, initCancel := context.WithTimeout(rootCtx, 30*time.Second)
|
||||
defer initCancel()
|
||||
|
||||
// 初始化数据库连接
|
||||
db, err := repository.NewDB(ctx, cfg.Database)
|
||||
db, err := repository.NewDB(initCtx, cfg.Database)
|
||||
if err != nil {
|
||||
if isProd {
|
||||
log.Fatalf("production startup requirement failed: database unavailable: %v", err)
|
||||
}
|
||||
log.Printf("warning: failed to connect to database: %v (using in-memory store)", err)
|
||||
db = nil
|
||||
} else {
|
||||
@@ -63,7 +70,11 @@ func main() {
|
||||
// 初始化Redis缓存
|
||||
redisCache, err := cache.NewRedisCache(cfg.Redis)
|
||||
if err != nil {
|
||||
log.Printf("warning: failed to connect to redis: %v (caching disabled)", err)
|
||||
if isProd {
|
||||
log.Printf("warning: redis unavailable at startup: %v", err)
|
||||
} else {
|
||||
log.Printf("warning: failed to connect to redis: %v (caching disabled)", err)
|
||||
}
|
||||
redisCache = nil
|
||||
} else {
|
||||
log.Printf("connected to redis at %s:%d", cfg.Redis.Host, cfg.Redis.Port)
|
||||
@@ -154,7 +165,7 @@ func main() {
|
||||
// 启动主动吊销订阅机制(仅在Redis可用时)
|
||||
if redisCache != nil {
|
||||
if dbTokenBackend, ok := tokenBackend.(*middleware.DBTokenStatusBackend); ok {
|
||||
if err := dbTokenBackend.StartRevocationSubscriber(ctx); err != nil {
|
||||
if err := dbTokenBackend.StartRevocationSubscriber(rootCtx); err != nil {
|
||||
log.Printf("警告: 启动主动吊销订阅失败: %v", err)
|
||||
} else {
|
||||
log.Println("主动吊销机制: 已启动 (Redis Pub/Sub)")
|
||||
@@ -172,6 +183,8 @@ func main() {
|
||||
// 初始化鉴权中间件
|
||||
authConfig := middleware.AuthConfig{
|
||||
SecretKey: cfg.Token.SecretKey,
|
||||
PublicKey: cfg.Token.PublicKey,
|
||||
Algorithm: cfg.Token.Algorithm,
|
||||
Issuer: cfg.Token.Issuer,
|
||||
CacheTTL: cfg.Token.RevocationCacheTTL,
|
||||
Enabled: *env != "dev", // 开发模式禁用鉴权
|
||||
@@ -210,6 +223,7 @@ func main() {
|
||||
cfg.Server.StatementBaseURL,
|
||||
time.Now,
|
||||
)
|
||||
api.SetWithdrawEnabled(cfg.Settlement.WithdrawEnabled)
|
||||
|
||||
// 创建路由器
|
||||
mux := http.NewServeMux()
|
||||
@@ -254,14 +268,12 @@ func main() {
|
||||
|
||||
// 生产环境启用安全中间件
|
||||
if *env != "dev" {
|
||||
// 5. QueryKeyReject - 拒绝外部query key
|
||||
handler = authMiddleware.QueryKeyRejectMiddleware(handler)
|
||||
// 6. BearerExtract
|
||||
handler = authMiddleware.BearerExtractMiddleware(handler)
|
||||
// 7. TokenVerify
|
||||
handler = authMiddleware.TokenVerifyMiddleware(handler)
|
||||
// 8. RateLimit - 限流 (使用中间件包装器)
|
||||
// 包装顺序与请求执行顺序相反,这里从内到外构建,保证实际执行顺序为:
|
||||
// QueryKeyReject -> BearerExtract -> TokenVerify -> RateLimit
|
||||
handler = middleware.NewRateLimitHandler(rateLimitConfig, handler)
|
||||
handler = authMiddleware.TokenVerifyMiddleware(handler)
|
||||
handler = authMiddleware.BearerExtractMiddleware(handler)
|
||||
handler = authMiddleware.QueryKeyRejectMiddleware(handler)
|
||||
}
|
||||
|
||||
// 创建HTTP服务器
|
||||
@@ -274,6 +286,14 @@ func main() {
|
||||
IdleTimeout: cfg.Server.IdleTimeout,
|
||||
}
|
||||
|
||||
serverErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
log.Printf("starting HTTP server on %s", cfg.Server.Addr)
|
||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
serverErrCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
// P0-06修复: 启动OutboxProcessor(仅在DB可用时)
|
||||
var outboxProcessor *outbox.OutboxProcessorRunner
|
||||
if db != nil {
|
||||
@@ -284,14 +304,21 @@ func main() {
|
||||
redisClient := redisCache.GetClient()
|
||||
msgBroker = messaging.NewOutboxMessageBroker(redisClient, "supply:outbox:stream", "outbox-processor")
|
||||
}
|
||||
stats := &messaging.NoOpOutboxStats{}
|
||||
outboxProcessor = outbox.NewOutboxProcessorRunner(outboxRepo, msgBroker, stats)
|
||||
go outboxProcessor.Start(ctx)
|
||||
log.Println("OutboxProcessor已启动")
|
||||
if msgBroker == nil {
|
||||
if isProd {
|
||||
log.Fatalf("production startup requirement failed: outbox message broker unavailable")
|
||||
}
|
||||
log.Println("警告: OutboxProcessor未启动 (message broker不可用)")
|
||||
} else {
|
||||
stats := &messaging.NoOpOutboxStats{}
|
||||
outboxProcessor = outbox.NewOutboxProcessorRunner(outboxRepo, msgBroker, stats)
|
||||
go outboxProcessor.Start(rootCtx)
|
||||
log.Println("OutboxProcessor已启动")
|
||||
}
|
||||
|
||||
// 分区维护:确保未来分区已创建
|
||||
partitionManager := repository.NewPartitionManager(db.Pool)
|
||||
if err := partitionManager.EnsureFuturePartitions(ctx); err != nil {
|
||||
if err := partitionManager.EnsureFuturePartitions(initCtx); err != nil {
|
||||
log.Printf("警告: 预创建未来分区失败: %v", err)
|
||||
} else {
|
||||
log.Println("分区管理: 未来分区已确保存在")
|
||||
@@ -303,7 +330,7 @@ func main() {
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-rootCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := partitionManager.EnsureFuturePartitions(context.Background()); err != nil {
|
||||
@@ -327,14 +354,21 @@ func main() {
|
||||
log.Println("批量补偿处理器: 已初始化")
|
||||
|
||||
// 启动后台补偿处理goroutine
|
||||
compensationProcessor.StartBackgroundWorker(ctx, 5*time.Minute)
|
||||
compensationProcessor.StartBackgroundWorker(rootCtx, 5*time.Minute)
|
||||
log.Println("批量补偿处理器: 后台worker已启动 (每5分钟检查一次)")
|
||||
}
|
||||
|
||||
// 优雅关闭
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigCh
|
||||
select {
|
||||
case sig := <-sigCh:
|
||||
log.Printf("received signal %s", sig)
|
||||
case err := <-serverErrCh:
|
||||
log.Fatalf("server failed: %v", err)
|
||||
}
|
||||
|
||||
stop()
|
||||
|
||||
log.Println("shutting down...")
|
||||
|
||||
|
||||
71
supply-api/cmd/supply-api/main_test.go
Normal file
71
supply-api/cmd/supply-api/main_test.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMain_ProdStartupFailsWhenDatabaseUnavailable(t *testing.T) {
|
||||
configPath := filepath.Join(t.TempDir(), "config.prod.yaml")
|
||||
content := []byte(`
|
||||
server:
|
||||
addr: "127.0.0.1:0"
|
||||
shutdown_timeout: 1s
|
||||
default_supplier_id: 0
|
||||
database:
|
||||
host: "127.0.0.1"
|
||||
port: 1
|
||||
user: "postgres"
|
||||
password: "secret"
|
||||
database: "supply_db"
|
||||
redis:
|
||||
host: "127.0.0.1"
|
||||
port: 1
|
||||
token:
|
||||
issuer: "prod-issuer"
|
||||
secret_key: "prod-secret"
|
||||
algorithm: "HS256"
|
||||
`)
|
||||
if err := os.WriteFile(configPath, content, 0o600); err != nil {
|
||||
t.Fatalf("failed to write config file: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestMainHelperProcess", "--", "-env", "prod", "-config", configPath)
|
||||
cmd.Env = append(os.Environ(), "GO_WANT_HELPER_PROCESS=1")
|
||||
output, err := cmd.CombinedOutput()
|
||||
|
||||
if ctx.Err() == context.DeadlineExceeded {
|
||||
t.Fatalf("expected prod startup to fail fast, but process timed out. output=%s", string(output))
|
||||
}
|
||||
if err == nil {
|
||||
t.Fatalf("expected prod startup to fail, but process exited successfully. output=%s", string(output))
|
||||
}
|
||||
if !strings.Contains(string(output), "production startup requirement failed") {
|
||||
t.Fatalf("expected startup failure output to mention production startup requirement failed, got: %s", string(output))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMainHelperProcess(t *testing.T) {
|
||||
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
|
||||
return
|
||||
}
|
||||
|
||||
for i, arg := range os.Args {
|
||||
if arg != "--" {
|
||||
continue
|
||||
}
|
||||
os.Args = append([]string{os.Args[0]}, os.Args[i+1:]...)
|
||||
break
|
||||
}
|
||||
|
||||
main()
|
||||
os.Exit(0)
|
||||
}
|
||||
Reference in New Issue
Block a user