refactor(supply-api): layer runtime startup flow

This commit is contained in:
Your Name
2026-04-15 18:42:06 +08:00
parent bdacc4452c
commit 6940ff52b6
5 changed files with 848 additions and 289 deletions

View File

@@ -0,0 +1,144 @@
# Supply API Runtime Layering Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:**`supply-api/cmd/supply-api/main.go` 剩余的基础设施初始化与后台 worker 启动职责下沉到 `internal/app`,让 `main` 只保留进程入口与生命周期控制。
**Architecture:** 在现有 `internal/app/bootstrap.go` 之上新增运行时构建层 `BuildRuntime` 和后台任务启动层 `Runtime.StartBackgroundWorkers`。保持当前 dev/prod 语义、DB/Redis 降级策略和门禁错误口径不变,只把装配逻辑拆分成可测试的 app 层。
**Tech Stack:** Go, net/http, context, Go test
---
### Task 1: 抽取可复用的运行时构建层
**Files:**
- Create: `supply-api/internal/app/runtime.go`
- Create: `supply-api/internal/app/runtime_test.go`
- Modify: `supply-api/cmd/supply-api/main.go`
**Step 1: Write the failing test**
```go
func TestBuildRuntime_ProdRequiresDatabase(t *testing.T) {
_, err := buildRuntimeWithFactory(RuntimeOptions{Env: "prod", Config: cfg, Logger: testLogger{}}, runtimeFactory{
newDB: func(context.Context, config.DatabaseConfig) (*repository.DB, error) {
return nil, errors.New("db down")
},
})
if err == nil {
t.Fatal("expected prod runtime build to reject database outage")
}
}
```
**Step 2: Run test to verify it fails**
Run: `cd "supply-api" && go test ./internal/app -run 'TestBuildRuntime_(ProdRequiresDatabase|DevFallsBackToInMemoryDependencies)' -v`
Expected: FAIL因为 `BuildRuntime` 尚不存在
**Step 3: Write minimal implementation**
```go
type Runtime struct {
...
}
func BuildRuntime(opts RuntimeOptions) (*Runtime, error) {
return buildRuntimeWithFactory(opts, defaultRuntimeFactory())
}
```
**Step 4: Run test to verify it passes**
Run: `cd "supply-api" && go test ./internal/app -run 'TestBuildRuntime_(ProdRequiresDatabase|DevFallsBackToInMemoryDependencies)' -v`
Expected: PASS
**Step 5: Commit**
```bash
git add supply-api/internal/app/runtime.go supply-api/internal/app/runtime_test.go supply-api/cmd/supply-api/main.go
git commit -m "refactor(supply-api): extract runtime bootstrap"
```
### Task 2: 抽取后台 worker 启动层并收紧 main 入口
**Files:**
- Create: `supply-api/internal/app/background.go`
- Modify: `supply-api/internal/app/runtime.go`
- Modify: `supply-api/internal/app/runtime_test.go`
- Modify: `supply-api/cmd/supply-api/main.go`
**Step 1: Write the failing test**
```go
func TestRuntime_StartBackgroundWorkers_ProdRequiresOutboxBroker(t *testing.T) {
runtime := &Runtime{env: "prod", logger: testLogger{}, db: &repository.DB{}}
err := startBackgroundWorkersWithFactory(context.Background(), context.Background(), runtime, backgroundFactory{
newOutboxRepository: func(*repository.DB) outboxRepository { return stubOutboxRepository{} },
newMessageBroker: func(*cache.RedisCache) messaging.MessageBroker { return nil },
})
if err == nil {
t.Fatal("expected missing outbox broker to fail in prod")
}
}
```
**Step 2: Run test to verify it fails**
Run: `cd "supply-api" && go test ./internal/app -run 'TestRuntime_StartBackgroundWorkers_(WithoutDatabaseIsNoop|ProdRequiresOutboxBroker)' -v`
Expected: FAIL因为后台启动层尚未抽出
**Step 3: Write minimal implementation**
```go
func (r *Runtime) StartBackgroundWorkers(rootCtx, initCtx context.Context) error {
return startBackgroundWorkersWithFactory(rootCtx, initCtx, r, defaultBackgroundFactory())
}
```
**Step 4: Run test to verify it passes**
Run: `cd "supply-api" && go test ./internal/app -run 'TestRuntime_StartBackgroundWorkers_(WithoutDatabaseIsNoop|ProdRequiresOutboxBroker)' -v`
Expected: PASS
**Step 5: Commit**
```bash
git add supply-api/internal/app/background.go supply-api/internal/app/runtime.go supply-api/internal/app/runtime_test.go supply-api/cmd/supply-api/main.go
git commit -m "refactor(supply-api): layer background startup"
```
### Task 3: 全量验证与收尾
**Files:**
- Verify: `supply-api/internal/app/runtime.go`
- Verify: `supply-api/internal/app/background.go`
- Verify: `supply-api/cmd/supply-api/main.go`
**Step 1: Run focused tests**
Run: `cd "supply-api" && go test ./internal/app ./cmd/supply-api ./internal/httpapi`
Expected: PASS
**Step 2: Run e2e build-tag tests**
Run: `cd "supply-api" && go test -tags=e2e ./e2e`
Expected: PASS
**Step 3: Run repo exit verification**
Run: `bash "scripts/ci/repo_integrity_check.sh"`
Expected: PASS
**Step 4: Check formatting**
Run: `git diff --check`
Expected: no output
**Step 5: Commit**
```bash
git add docs/plans/2026-04-15-supply-api-runtime-layering-plan.md supply-api/internal/app/runtime.go supply-api/internal/app/runtime_test.go supply-api/internal/app/background.go supply-api/cmd/supply-api/main.go
git commit -m "refactor(supply-api): layer runtime startup flow"
```

View File

@@ -9,21 +9,9 @@ import (
"syscall"
"time"
"lijiaoqiao/supply-api/internal/adapter"
"lijiaoqiao/supply-api/internal/app"
"lijiaoqiao/supply-api/internal/audit"
auditrepo "lijiaoqiao/supply-api/internal/audit/repository"
auditservice "lijiaoqiao/supply-api/internal/audit/service"
"lijiaoqiao/supply-api/internal/cache"
"lijiaoqiao/supply-api/internal/compensation"
"lijiaoqiao/supply-api/internal/config"
"lijiaoqiao/supply-api/internal/domain"
"lijiaoqiao/supply-api/internal/httpapi"
"lijiaoqiao/supply-api/internal/messaging"
"lijiaoqiao/supply-api/internal/middleware"
"lijiaoqiao/supply-api/internal/outbox"
"lijiaoqiao/supply-api/internal/pkg/logging"
"lijiaoqiao/supply-api/internal/repository"
)
func main() {
@@ -47,301 +35,45 @@ func main() {
}
jsonLogger.Infof("starting supply-api in %s mode", *env)
isProd := *env == "prod"
rootCtx, stop := context.WithCancel(context.Background())
defer stop()
initCtx, initCancel := context.WithTimeout(rootCtx, 30*time.Second)
defer initCancel()
// 初始化数据库连接
db, err := repository.NewDB(initCtx, cfg.Database)
if err != nil {
if isProd {
jsonLogger.Fatalf("production startup requirement failed: database unavailable: %v", err)
}
jsonLogger.Infof("warning: failed to connect to database: %v (using in-memory store)", err)
db = nil
} else {
jsonLogger.Infof("connected to database at %s:%d", cfg.Database.Host, cfg.Database.Port)
defer db.Close()
}
// 初始化Redis缓存
redisCache, err := cache.NewRedisCache(cfg.Redis)
if err != nil {
if isProd {
jsonLogger.Infof("warning: redis unavailable at startup: %v", err)
} else {
jsonLogger.Infof("warning: failed to connect to redis: %v (caching disabled)", err)
}
redisCache = nil
} else {
jsonLogger.Infof("connected to redis at %s:%d", cfg.Redis.Host, cfg.Redis.Port)
defer redisCache.Close()
}
// 初始化存储层
var accountStore domain.AccountStore
var packageStore domain.PackageStore
var settlementStore domain.SettlementStore
var earningStore domain.EarningStore
var auditRepo *auditrepo.PostgresAuditRepository
var tokenStatusRepo *repository.TokenStatusRepository
if db != nil {
// 使用PostgreSQL存储
accountRepo := repository.NewAccountRepository(db.Pool)
packageRepo := repository.NewPackageRepository(db.Pool)
settlementRepo := repository.NewSettlementRepository(db.Pool)
usageRepo := repository.NewUsageRepository(db.Pool)
idempotencyRepo := repository.NewIdempotencyRepository(db.Pool)
auditRepo = auditrepo.NewPostgresAuditRepository(db.Pool)
tokenStatusRepo = repository.NewTokenStatusRepository(db.Pool)
// 创建DB-backed存储使用adapter包中的类型
accountStore = adapter.NewDBAccountStore(accountRepo)
packageStore = adapter.NewDBPackageStore(packageRepo)
settlementStore = adapter.NewDBSettlementStore(settlementRepo, accountRepo, db.Pool)
earningStore = adapter.NewDBEarningStore(usageRepo)
_ = idempotencyRepo // 用于幂等中间件
} else {
// 回退到内存存储(开发模式)
accountStore = adapter.NewInMemoryAccountStoreAdapter()
packageStore = adapter.NewInMemoryPackageStoreAdapter()
settlementStore = adapter.NewInMemorySettlementStoreAdapter()
earningStore = adapter.NewInMemoryEarningStoreAdapter()
}
// P0-R08修复: 初始化审计存储 - 使用DB-backed实现
var auditStore audit.AuditStore
if auditRepo != nil {
auditStore = audit.NewPostgresAuditStore(auditRepo)
jsonLogger.Info("审计存储: 使用PostgreSQL (DB-backed)")
} else {
auditStore = audit.NewMemoryAuditStore()
jsonLogger.Info("警告: 审计存储使用内存实现 (生产环境不应使用)")
}
var alertStore auditservice.AlertStoreInterface
if db != nil {
alertStore = auditrepo.NewPostgresAlertRepository(db.Pool)
jsonLogger.Info("告警存储: 使用PostgreSQL (DB-backed)")
} else {
alertStore = auditservice.NewInMemoryAlertStore()
jsonLogger.Info("警告: 告警存储使用内存实现 (仅开发环境允许)")
}
alertService := auditservice.NewAlertService(alertStore)
// P0-09修复: 初始化外键校验器
var fkValidator *repository.ForeignKeyValidator
if db != nil {
fkValidator = repository.NewForeignKeyValidator(db.Pool)
jsonLogger.Info("外键校验器: 已初始化 (PostgreSQL-backed)")
} else {
jsonLogger.Info("警告: 外键校验器未启用 (db不可用)")
}
// 初始化不变量检查器
invariantChecker := domain.NewInvariantChecker(accountStore, packageStore, settlementStore)
_ = invariantChecker // 用于业务逻辑校验
// 初始化领域服务
accountService := domain.NewAccountService(accountStore, auditStore)
packageService := domain.NewPackageService(packageStore, accountStore, auditStore)
settlementService := domain.NewSettlementService(settlementStore, earningStore, auditStore)
earningService := domain.NewEarningService(earningStore)
// 初始化幂等仓储
var idempotencyRepo *repository.IdempotencyRepository
if db != nil {
idempotencyRepo = repository.NewIdempotencyRepository(db.Pool)
_ = idempotencyRepo // idempotencyRepo 会在下面初始化幂等中间件时使用
}
// 初始化Token缓存
tokenCache := middleware.NewTokenCache()
if redisCache != nil {
// 可以使用Redis缓存
}
// 初始化token状态后端P0-03修复: 使用DB-backed实现
var tokenBackend middleware.TokenStatusBackend
if tokenStatusRepo != nil {
tokenBackend = middleware.NewDBTokenStatusBackend(tokenStatusRepo, redisCache, cfg.Token.RevocationCacheTTL)
jsonLogger.Info("Token状态后端: 使用PostgreSQL (DB-backed)")
// 启动主动吊销订阅机制仅在Redis可用时
if redisCache != nil {
if dbTokenBackend, ok := tokenBackend.(*middleware.DBTokenStatusBackend); ok {
if err := dbTokenBackend.StartRevocationSubscriber(rootCtx); err != nil {
jsonLogger.Infof("警告: 启动主动吊销订阅失败: %v", err)
} else {
jsonLogger.Info("主动吊销机制: 已启动 (Redis Pub/Sub)")
}
}
}
} else {
tokenBackend = adapter.NewMemoryTokenBackend()
jsonLogger.Info("警告: Token状态后端使用内存实现 (生产环境不应使用)")
}
// 初始化审计事件适配器NEW-P1-03修复
auditEmitter := adapter.NewAuditEmitterAdapter(auditStore)
// 初始化鉴权中间件
authConfig := middleware.AuthConfig{
SecretKey: cfg.Token.SecretKey,
PublicKey: cfg.Token.PublicKey,
Algorithm: cfg.Token.Algorithm,
Issuer: cfg.Token.Issuer,
CacheTTL: cfg.Token.RevocationCacheTTL,
Enabled: *env != "dev", // 开发模式禁用鉴权
}
authMiddleware := middleware.NewAuthMiddleware(authConfig, tokenCache, tokenBackend, auditEmitter)
// 初始化幂等中间件。
// 当仓储不可用时,相关写接口会通过统一错误码返回 503而不是切回其他实现路径。
var idempotencyMiddleware *middleware.IdempotencyMiddleware
if db != nil && idempotencyRepo != nil {
idempotencyMiddleware = middleware.NewIdempotencyMiddleware(idempotencyRepo, middleware.IdempotencyConfig{
TTL: 24 * time.Hour,
Enabled: *env != "dev",
})
jsonLogger.Info("幂等中间件已启用DB-backed")
} else {
if isProd {
jsonLogger.Fatalf("production startup requirement failed: idempotency repository unavailable")
}
jsonLogger.Info("警告幂等中间件未启用db或repo不可用- 需要幂等的写接口将返回 503")
}
// P0-05修复: 初始化限流中间件
rateLimitConfig := middleware.DefaultRateLimitConfig()
rateLimitConfig.Enabled = *env != "dev" // 生产环境启用
jsonLogger.Info("限流中间件已初始化")
// 初始化HTTP API处理器
// P0-P4修复: 使用DB-backed幂等中间件替代内联幂等存储
api, err := httpapi.NewSupplyAPI(
accountService,
packageService,
settlementService,
earningService,
idempotencyMiddleware, // 使用幂等中间件DB-backed
auditStore,
fkValidator, // P0-09修复: 外键校验器
cfg.Server.DefaultSupplierID,
cfg.Server.StatementBaseURL,
time.Now,
)
if err != nil {
jsonLogger.Fatalf("failed to initialize supply api: %v", err)
}
api.SetWithdrawEnabled(cfg.Settlement.WithdrawEnabled)
var dbHealthCheck func(ctx context.Context) error
var redisHealthCheck func(ctx context.Context) error
if db != nil {
dbHealthCheck = db.HealthCheck
}
if redisCache != nil {
redisHealthCheck = redisCache.HealthCheck
}
// 注册告警API路由
alertAPI, err := httpapi.NewAlertAPI(alertService)
if err != nil {
jsonLogger.Fatalf("failed to initialize alert api: %v", err)
}
// 创建HTTP服务器
srv, err := app.BuildServer(app.BuildServerOptions{
Env: *env,
ServerConfig: cfg.Server,
Logger: jsonLogger,
SupplyAPI: api,
AlertAPI: alertAPI,
AuthMiddleware: authMiddleware,
RateLimitConfig: rateLimitConfig,
DBHealthCheck: dbHealthCheck,
RedisHealthCheck: redisHealthCheck,
runtime, err := app.BuildRuntime(app.RuntimeOptions{
Env: *env,
Config: cfg,
Logger: jsonLogger,
InitContext: initCtx,
Now: time.Now,
})
if err != nil {
if *env == "prod" {
jsonLogger.Fatalf("production startup requirement failed: %v", err)
}
jsonLogger.Fatalf("failed to build runtime: %v", err)
}
defer runtime.Close()
srv, err := runtime.BuildServer()
if err != nil {
jsonLogger.Fatalf("failed to build http server: %v", err)
}
serverErrCh := make(chan error, 1)
go func() {
jsonLogger.Infof("starting HTTP server on %s", cfg.Server.Addr)
jsonLogger.Infof("starting HTTP server on %s", srv.Addr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
serverErrCh <- err
}
}()
// P0-06修复: 启动OutboxProcessor仅在DB可用时
var outboxProcessor *outbox.OutboxProcessorRunner
if db != nil {
outboxRepo := repository.NewOutboxRepository(db.Pool)
var msgBroker messaging.MessageBroker
if redisCache != nil {
// 使用Redis Streams作为消息代理
redisClient := redisCache.GetClient()
msgBroker = messaging.NewOutboxMessageBroker(redisClient, "supply:outbox:stream", "outbox-processor")
if err := runtime.StartBackgroundWorkers(rootCtx, initCtx); err != nil {
if *env == "prod" {
jsonLogger.Fatalf("production startup requirement failed: %v", err)
}
if msgBroker == nil {
if isProd {
jsonLogger.Fatalf("production startup requirement failed: outbox message broker unavailable")
}
jsonLogger.Info("警告: OutboxProcessor未启动 (message broker不可用)")
} else {
stats := &messaging.NoOpOutboxStats{}
outboxProcessor = outbox.NewOutboxProcessorRunner(outboxRepo, msgBroker, stats)
go outboxProcessor.Start(rootCtx)
jsonLogger.Info("OutboxProcessor已启动")
}
// 分区维护:确保未来分区已创建
partitionManager := repository.NewPartitionManager(db.Pool)
if err := partitionManager.EnsureFuturePartitions(initCtx); err != nil {
jsonLogger.Infof("警告: 预创建未来分区失败: %v", err)
} else {
jsonLogger.Info("分区管理: 未来分区已确保存在")
}
// 启动后台分区维护goroutine每小时检查一次
go func() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-rootCtx.Done():
return
case <-ticker.C:
if err := partitionManager.EnsureFuturePartitions(context.Background()); err != nil {
jsonLogger.Infof("分区维护: 预创建未来分区失败: %v", err)
}
// 清理过期分区(仅在需要时)
for _, tableName := range []string{"audit_events", "supply_usage_records", "supply_idempotency_records"} {
if _, err := partitionManager.DropOldPartitions(context.Background(), tableName); err != nil {
jsonLogger.Infof("分区维护: 清理过期分区失败 (%s): %v", tableName, err)
}
}
}
}
}()
// P0-07修复: 初始化批量补偿处理器
compensationStore := domain.NewSQLCompensationStore(db.Pool)
compensationStats := &domain.NoOpCompensationStats{}
compensationExecutor := compensation.NewDefaultCompensationExecutor()
compensationProcessor := domain.NewCompensationProcessor(compensationStore, compensationExecutor, compensationStats)
jsonLogger.Info("批量补偿处理器: 已初始化")
// 启动后台补偿处理goroutine
compensationProcessor.StartBackgroundWorker(rootCtx, 5*time.Minute)
jsonLogger.Info("批量补偿处理器: 后台worker已启动 (每5分钟检查一次)")
jsonLogger.Fatalf("failed to start background workers: %v", err)
}
// 优雅关闭
@@ -358,7 +90,7 @@ func main() {
jsonLogger.Info("shutting down...")
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), cfg.Server.ShutdownTimeout)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), runtime.ShutdownTimeout())
defer shutdownCancel()
if err := srv.Shutdown(shutdownCtx); err != nil {

View File

@@ -0,0 +1,190 @@
package app
import (
"context"
"errors"
"time"
"lijiaoqiao/supply-api/internal/cache"
"lijiaoqiao/supply-api/internal/compensation"
"lijiaoqiao/supply-api/internal/domain"
"lijiaoqiao/supply-api/internal/messaging"
"lijiaoqiao/supply-api/internal/outbox"
"lijiaoqiao/supply-api/internal/pkg/logging"
"lijiaoqiao/supply-api/internal/repository"
)
type outboxRepository interface {
FetchAndLock(ctx context.Context, limit int) ([]*repository.OutboxEvent, error)
MarkCompleted(ctx context.Context, eventID string) error
MarkFailed(ctx context.Context, eventID string, errorMsg string, nextRetryAt *time.Time) error
MoveToDeadLetter(ctx context.Context, event *repository.OutboxEvent, errorMsg string) error
}
type outboxRunner interface {
Start(ctx context.Context)
}
type partitionManager interface {
EnsureFuturePartitions(ctx context.Context) error
DropOldPartitions(ctx context.Context, tableName string) (int, error)
}
type compensationWorker interface {
StartBackgroundWorker(ctx context.Context, interval time.Duration) context.Context
}
type backgroundFactory struct {
newOutboxRepository func(db *repository.DB) outboxRepository
newMessageBroker func(redisCache *cache.RedisCache) messaging.MessageBroker
newOutboxRunner func(repo outboxRepository, broker messaging.MessageBroker, stats messaging.OutboxStats) outboxRunner
newPartitionManager func(db *repository.DB) partitionManager
newCompensationStore func(db *repository.DB) domain.CompensationStore
newCompensationExecutor func() domain.OperationExecutor
newCompensationProcessor func(
store domain.CompensationStore,
executor domain.OperationExecutor,
stats domain.CompensationStats,
) compensationWorker
}
// StartBackgroundWorkers 启动后台依赖服务。
func (r *Runtime) StartBackgroundWorkers(rootCtx, initCtx context.Context) error {
return startBackgroundWorkersWithFactory(rootCtx, initCtx, r, backgroundFactory{})
}
func startBackgroundWorkersWithFactory(
rootCtx context.Context,
initCtx context.Context,
runtime *Runtime,
factory backgroundFactory,
) error {
if runtime == nil {
return errors.New("runtime is required")
}
if runtime.logger == nil {
return errors.New("runtime logger is required")
}
if rootCtx == nil {
rootCtx = context.Background()
}
if initCtx == nil {
initCtx = rootCtx
}
factory = withDefaultBackgroundFactory(factory)
if runtime.revocationSubscriber != nil && runtime.redisCache != nil {
if err := runtime.revocationSubscriber.StartRevocationSubscriber(rootCtx); err != nil {
infof(runtime.logger, "警告: 启动主动吊销订阅失败: %v", err)
} else {
runtime.logger.Info("主动吊销机制: 已启动 (Redis Pub/Sub)", nil)
}
}
if runtime.db == nil {
return nil
}
outboxRepo := factory.newOutboxRepository(runtime.db)
msgBroker := factory.newMessageBroker(runtime.redisCache)
if msgBroker == nil {
if runtime.env == "prod" {
return errors.New("outbox message broker unavailable")
}
runtime.logger.Info("警告: OutboxProcessor未启动 (message broker不可用)", nil)
} else {
stats := &messaging.NoOpOutboxStats{}
runner := factory.newOutboxRunner(outboxRepo, msgBroker, stats)
go runner.Start(rootCtx)
runtime.logger.Info("OutboxProcessor已启动", nil)
}
partitionManager := factory.newPartitionManager(runtime.db)
if err := partitionManager.EnsureFuturePartitions(initCtx); err != nil {
infof(runtime.logger, "警告: 预创建未来分区失败: %v", err)
} else {
runtime.logger.Info("分区管理: 未来分区已确保存在", nil)
}
go startPartitionMaintenance(rootCtx, runtime.logger, partitionManager)
compensationStore := factory.newCompensationStore(runtime.db)
compensationStats := &domain.NoOpCompensationStats{}
compensationExecutor := factory.newCompensationExecutor()
compensationProcessor := factory.newCompensationProcessor(compensationStore, compensationExecutor, compensationStats)
runtime.logger.Info("批量补偿处理器: 已初始化", nil)
compensationProcessor.StartBackgroundWorker(rootCtx, 5*time.Minute)
runtime.logger.Info("批量补偿处理器: 后台worker已启动 (每5分钟检查一次)", nil)
return nil
}
func withDefaultBackgroundFactory(factory backgroundFactory) backgroundFactory {
if factory.newOutboxRepository == nil {
factory.newOutboxRepository = func(db *repository.DB) outboxRepository {
return repository.NewOutboxRepository(db.Pool)
}
}
if factory.newMessageBroker == nil {
factory.newMessageBroker = func(redisCache *cache.RedisCache) messaging.MessageBroker {
if redisCache == nil {
return nil
}
return messaging.NewOutboxMessageBroker(redisCache.GetClient(), "supply:outbox:stream", "outbox-processor")
}
}
if factory.newOutboxRunner == nil {
factory.newOutboxRunner = func(repo outboxRepository, broker messaging.MessageBroker, stats messaging.OutboxStats) outboxRunner {
return outbox.NewOutboxProcessorRunner(repo, broker, stats)
}
}
if factory.newPartitionManager == nil {
factory.newPartitionManager = func(db *repository.DB) partitionManager {
return repository.NewPartitionManager(db.Pool)
}
}
if factory.newCompensationStore == nil {
factory.newCompensationStore = func(db *repository.DB) domain.CompensationStore {
return domain.NewSQLCompensationStore(db.Pool)
}
}
if factory.newCompensationExecutor == nil {
factory.newCompensationExecutor = compensationNewDefaultExecutor
}
if factory.newCompensationProcessor == nil {
factory.newCompensationProcessor = func(
store domain.CompensationStore,
executor domain.OperationExecutor,
stats domain.CompensationStats,
) compensationWorker {
return domain.NewCompensationProcessor(store, executor, stats)
}
}
return factory
}
var compensationNewDefaultExecutor = func() domain.OperationExecutor {
return compensation.NewDefaultCompensationExecutor()
}
func startPartitionMaintenance(ctx context.Context, logger logging.Logger, manager partitionManager) {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := manager.EnsureFuturePartitions(context.Background()); err != nil {
infof(logger, "分区维护: 预创建未来分区失败: %v", err)
}
for _, tableName := range []string{"audit_events", "supply_usage_records", "supply_idempotency_records"} {
if _, err := manager.DropOldPartitions(context.Background(), tableName); err != nil {
infof(logger, "分区维护: 清理过期分区失败 (%s): %v", tableName, err)
}
}
}
}
}

View File

@@ -0,0 +1,314 @@
package app
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"
"lijiaoqiao/supply-api/internal/adapter"
"lijiaoqiao/supply-api/internal/audit"
auditrepo "lijiaoqiao/supply-api/internal/audit/repository"
auditservice "lijiaoqiao/supply-api/internal/audit/service"
"lijiaoqiao/supply-api/internal/cache"
"lijiaoqiao/supply-api/internal/config"
"lijiaoqiao/supply-api/internal/domain"
"lijiaoqiao/supply-api/internal/httpapi"
"lijiaoqiao/supply-api/internal/middleware"
"lijiaoqiao/supply-api/internal/pkg/logging"
"lijiaoqiao/supply-api/internal/repository"
)
// RuntimeOptions 定义构建 supply-api 运行时所需的输入。
type RuntimeOptions struct {
Env string
Config *config.Config
Logger logging.Logger
InitContext context.Context
Now func() time.Time
}
// Runtime 聚合 HTTP 启动和后台任务启动所需的运行时依赖。
type Runtime struct {
env string
logger logging.Logger
now func() time.Time
serverConfig config.ServerConfig
db *repository.DB
redisCache *cache.RedisCache
supplyAPI *httpapi.SupplyAPI
alertAPI *httpapi.AlertAPI
authMiddleware *middleware.AuthMiddleware
rateLimitConfig *middleware.RateLimitConfig
revocationSubscriber revocationSubscriber
}
type revocationSubscriber interface {
StartRevocationSubscriber(ctx context.Context) error
}
type runtimeFactory struct {
newDB func(ctx context.Context, cfg config.DatabaseConfig) (*repository.DB, error)
newRedisCache func(cfg config.RedisConfig) (*cache.RedisCache, error)
}
// BuildRuntime 构建 supply-api 运行时依赖。
func BuildRuntime(opts RuntimeOptions) (*Runtime, error) {
return buildRuntimeWithFactory(opts, runtimeFactory{
newDB: repository.NewDB,
newRedisCache: cache.NewRedisCache,
})
}
func buildRuntimeWithFactory(opts RuntimeOptions, factory runtimeFactory) (*Runtime, error) {
if opts.Config == nil {
return nil, errors.New("config is required")
}
if opts.Logger == nil {
return nil, errors.New("logger is required")
}
if factory.newDB == nil {
factory.newDB = repository.NewDB
}
if factory.newRedisCache == nil {
factory.newRedisCache = cache.NewRedisCache
}
env := normalizeEnv(opts.Env)
now := opts.Now
if now == nil {
now = time.Now
}
initCtx := opts.InitContext
if initCtx == nil {
initCtx = context.Background()
}
isProd := env == "prod"
db, err := factory.newDB(initCtx, opts.Config.Database)
if err != nil {
if isProd {
return nil, fmt.Errorf("database unavailable: %w", err)
}
infof(opts.Logger, "warning: failed to connect to database: %v (using in-memory store)", err)
db = nil
} else if db != nil {
infof(opts.Logger, "connected to database at %s:%d", opts.Config.Database.Host, opts.Config.Database.Port)
}
redisCache, err := factory.newRedisCache(opts.Config.Redis)
if err != nil {
if isProd {
infof(opts.Logger, "warning: redis unavailable at startup: %v", err)
} else {
infof(opts.Logger, "warning: failed to connect to redis: %v (caching disabled)", err)
}
redisCache = nil
} else if redisCache != nil {
infof(opts.Logger, "connected to redis at %s:%d", opts.Config.Redis.Host, opts.Config.Redis.Port)
}
var accountStore domain.AccountStore
var packageStore domain.PackageStore
var settlementStore domain.SettlementStore
var earningStore domain.EarningStore
var auditRepository *auditrepo.PostgresAuditRepository
var tokenStatusRepo *repository.TokenStatusRepository
var idempotencyRepo *repository.IdempotencyRepository
if db != nil {
accountRepo := repository.NewAccountRepository(db.Pool)
packageRepo := repository.NewPackageRepository(db.Pool)
settlementRepo := repository.NewSettlementRepository(db.Pool)
usageRepo := repository.NewUsageRepository(db.Pool)
idempotencyRepo = repository.NewIdempotencyRepository(db.Pool)
auditRepository = auditrepo.NewPostgresAuditRepository(db.Pool)
tokenStatusRepo = repository.NewTokenStatusRepository(db.Pool)
accountStore = adapter.NewDBAccountStore(accountRepo)
packageStore = adapter.NewDBPackageStore(packageRepo)
settlementStore = adapter.NewDBSettlementStore(settlementRepo, accountRepo, db.Pool)
earningStore = adapter.NewDBEarningStore(usageRepo)
} else {
accountStore = adapter.NewInMemoryAccountStoreAdapter()
packageStore = adapter.NewInMemoryPackageStoreAdapter()
settlementStore = adapter.NewInMemorySettlementStoreAdapter()
earningStore = adapter.NewInMemoryEarningStoreAdapter()
}
var auditStore audit.AuditStore
if auditRepository != nil {
auditStore = audit.NewPostgresAuditStore(auditRepository)
opts.Logger.Info("审计存储: 使用PostgreSQL (DB-backed)", nil)
} else {
auditStore = audit.NewMemoryAuditStore()
opts.Logger.Info("警告: 审计存储使用内存实现 (生产环境不应使用)", nil)
}
var alertStore auditservice.AlertStoreInterface
if db != nil {
alertStore = auditrepo.NewPostgresAlertRepository(db.Pool)
opts.Logger.Info("告警存储: 使用PostgreSQL (DB-backed)", nil)
} else {
alertStore = auditservice.NewInMemoryAlertStore()
opts.Logger.Info("警告: 告警存储使用内存实现 (仅开发环境允许)", nil)
}
alertService := auditservice.NewAlertService(alertStore)
var fkValidator *repository.ForeignKeyValidator
if db != nil {
fkValidator = repository.NewForeignKeyValidator(db.Pool)
opts.Logger.Info("外键校验器: 已初始化 (PostgreSQL-backed)", nil)
} else {
opts.Logger.Info("警告: 外键校验器未启用 (db不可用)", nil)
}
_ = domain.NewInvariantChecker(accountStore, packageStore, settlementStore)
accountService := domain.NewAccountService(accountStore, auditStore)
packageService := domain.NewPackageService(packageStore, accountStore, auditStore)
settlementService := domain.NewSettlementService(settlementStore, earningStore, auditStore)
earningService := domain.NewEarningService(earningStore)
tokenCache := middleware.NewTokenCache()
var tokenBackend middleware.TokenStatusBackend
var revocationSubscriber revocationSubscriber
if tokenStatusRepo != nil {
dbTokenBackend := middleware.NewDBTokenStatusBackend(tokenStatusRepo, redisCache, opts.Config.Token.RevocationCacheTTL)
tokenBackend = dbTokenBackend
revocationSubscriber = dbTokenBackend
opts.Logger.Info("Token状态后端: 使用PostgreSQL (DB-backed)", nil)
} else {
tokenBackend = adapter.NewMemoryTokenBackend()
opts.Logger.Info("警告: Token状态后端使用内存实现 (生产环境不应使用)", nil)
}
auditEmitter := adapter.NewAuditEmitterAdapter(auditStore)
authMiddleware := middleware.NewAuthMiddleware(middleware.AuthConfig{
SecretKey: opts.Config.Token.SecretKey,
PublicKey: opts.Config.Token.PublicKey,
Algorithm: opts.Config.Token.Algorithm,
Issuer: opts.Config.Token.Issuer,
CacheTTL: opts.Config.Token.RevocationCacheTTL,
Enabled: env != "dev",
}, tokenCache, tokenBackend, auditEmitter)
var idempotencyMiddleware *middleware.IdempotencyMiddleware
if db != nil && idempotencyRepo != nil {
idempotencyMiddleware = middleware.NewIdempotencyMiddleware(idempotencyRepo, middleware.IdempotencyConfig{
TTL: 24 * time.Hour,
Enabled: env != "dev",
})
opts.Logger.Info("幂等中间件已启用DB-backed", nil)
} else {
if isProd {
return nil, errors.New("idempotency repository unavailable")
}
opts.Logger.Info("警告幂等中间件未启用db或repo不可用- 需要幂等的写接口将返回 503", nil)
}
rateLimitConfig := middleware.DefaultRateLimitConfig()
rateLimitConfig.Enabled = env != "dev"
opts.Logger.Info("限流中间件已初始化", nil)
supplyAPI, err := httpapi.NewSupplyAPI(
accountService,
packageService,
settlementService,
earningService,
idempotencyMiddleware,
auditStore,
fkValidator,
opts.Config.Server.DefaultSupplierID,
opts.Config.Server.StatementBaseURL,
now,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize supply api: %w", err)
}
supplyAPI.SetWithdrawEnabled(opts.Config.Settlement.WithdrawEnabled)
alertAPI, err := httpapi.NewAlertAPI(alertService)
if err != nil {
return nil, fmt.Errorf("failed to initialize alert api: %w", err)
}
return &Runtime{
env: env,
logger: opts.Logger,
now: now,
serverConfig: opts.Config.Server,
db: db,
redisCache: redisCache,
supplyAPI: supplyAPI,
alertAPI: alertAPI,
authMiddleware: authMiddleware,
rateLimitConfig: rateLimitConfig,
revocationSubscriber: revocationSubscriber,
}, nil
}
// BuildServer 使用运行时依赖构建 HTTP server。
func (r *Runtime) BuildServer() (*http.Server, error) {
if r == nil {
return nil, errors.New("runtime is required")
}
var dbHealthCheck func(context.Context) error
var redisHealthCheck func(context.Context) error
if r.db != nil {
dbHealthCheck = r.db.HealthCheck
}
if r.redisCache != nil {
redisHealthCheck = r.redisCache.HealthCheck
}
return BuildServer(BuildServerOptions{
Env: r.env,
ServerConfig: r.serverConfig,
Logger: r.logger,
SupplyAPI: r.supplyAPI,
AlertAPI: r.alertAPI,
AuthMiddleware: r.authMiddleware,
RateLimitConfig: r.rateLimitConfig,
DBHealthCheck: dbHealthCheck,
RedisHealthCheck: redisHealthCheck,
})
}
// Close 关闭运行时持有的外部资源。
func (r *Runtime) Close() {
if r == nil {
return
}
if r.redisCache != nil {
_ = r.redisCache.Close()
}
if r.db != nil {
r.db.Close()
}
}
// ShutdownTimeout 返回服务优雅关闭超时时间。
func (r *Runtime) ShutdownTimeout() time.Duration {
if r == nil {
return 0
}
return r.serverConfig.ShutdownTimeout
}
func normalizeEnv(env string) string {
normalized := strings.ToLower(strings.TrimSpace(env))
if normalized == "" {
return "dev"
}
return normalized
}
func infof(logger logging.Logger, format string, args ...any) {
logger.Info(fmt.Sprintf(format, args...), nil)
}

View File

@@ -0,0 +1,179 @@
package app
import (
"context"
"errors"
"strings"
"testing"
"time"
"lijiaoqiao/supply-api/internal/cache"
"lijiaoqiao/supply-api/internal/config"
"lijiaoqiao/supply-api/internal/messaging"
"lijiaoqiao/supply-api/internal/repository"
)
func TestBuildRuntime_ProdRequiresDatabase(t *testing.T) {
_, err := buildRuntimeWithFactory(RuntimeOptions{
Env: "prod",
Config: testRuntimeConfig(),
Logger: testLogger{},
InitContext: context.Background(),
Now: func() time.Time {
return time.Unix(1712800000, 0).UTC()
},
}, runtimeFactory{
newDB: func(context.Context, config.DatabaseConfig) (*repository.DB, error) {
return nil, errors.New("db down")
},
newRedisCache: func(config.RedisConfig) (*cache.RedisCache, error) {
return nil, nil
},
})
if err == nil {
t.Fatal("expected prod runtime build to reject database outage")
}
if !strings.Contains(err.Error(), "database unavailable") {
t.Fatalf("unexpected error: %v", err)
}
}
func TestBuildRuntime_DevFallsBackToInMemoryDependencies(t *testing.T) {
runtime, err := buildRuntimeWithFactory(RuntimeOptions{
Env: "dev",
Config: testRuntimeConfig(),
Logger: testLogger{},
InitContext: context.Background(),
Now: func() time.Time {
return time.Unix(1712800000, 0).UTC()
},
}, runtimeFactory{
newDB: func(context.Context, config.DatabaseConfig) (*repository.DB, error) {
return nil, errors.New("db down")
},
newRedisCache: func(config.RedisConfig) (*cache.RedisCache, error) {
return nil, errors.New("redis down")
},
})
if err != nil {
t.Fatalf("expected dev runtime to fall back to in-memory dependencies, got %v", err)
}
if runtime == nil {
t.Fatal("expected runtime")
}
if runtime.db != nil {
t.Fatal("expected nil db after dev fallback")
}
if runtime.redisCache != nil {
t.Fatal("expected nil redis cache after dev fallback")
}
if runtime.supplyAPI == nil || runtime.alertAPI == nil {
t.Fatal("expected apis to be initialized")
}
if runtime.authMiddleware == nil {
t.Fatal("expected auth middleware to be initialized")
}
if runtime.rateLimitConfig == nil {
t.Fatal("expected rate limit config to be initialized")
}
}
func TestRuntime_StartBackgroundWorkers_WithoutDatabaseIsNoop(t *testing.T) {
var outboxRepoCalled bool
err := startBackgroundWorkersWithFactory(context.Background(), context.Background(), &Runtime{
env: "dev",
logger: testLogger{},
}, backgroundFactory{
newOutboxRepository: func(*repository.DB) outboxRepository {
outboxRepoCalled = true
return stubOutboxRepository{}
},
})
if err != nil {
t.Fatalf("expected nil error when database is unavailable, got %v", err)
}
if outboxRepoCalled {
t.Fatal("expected background workers to skip db-backed startup when db is nil")
}
}
func TestRuntime_StartBackgroundWorkers_ProdRequiresOutboxBroker(t *testing.T) {
err := startBackgroundWorkersWithFactory(context.Background(), context.Background(), &Runtime{
env: "prod",
logger: testLogger{},
db: &repository.DB{},
}, backgroundFactory{
newOutboxRepository: func(*repository.DB) outboxRepository {
return stubOutboxRepository{}
},
newMessageBroker: func(*cache.RedisCache) messaging.MessageBroker {
return nil
},
})
if err == nil {
t.Fatal("expected missing outbox broker to fail in prod")
}
if !strings.Contains(err.Error(), "outbox message broker unavailable") {
t.Fatalf("unexpected error: %v", err)
}
}
func testRuntimeConfig() *config.Config {
return &config.Config{
Server: config.ServerConfig{
Addr: ":18082",
ReadTimeout: 10 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 30 * time.Second,
ShutdownTimeout: 5 * time.Second,
DefaultSupplierID: 1,
StatementBaseURL: "https://statements.example.com",
},
Database: config.DatabaseConfig{
Host: "127.0.0.1",
Port: 5432,
User: "test",
Password: "test",
Database: "supply",
MaxOpenConns: 4,
MaxIdleConns: 2,
ConnMaxLifetime: time.Minute,
ConnMaxIdleTime: time.Minute,
},
Redis: config.RedisConfig{
Host: "127.0.0.1",
Port: 6379,
Password: "",
DB: 0,
PoolSize: 2,
},
Token: config.TokenConfig{
SecretKey: "runtime-test-secret",
Algorithm: "HS256",
Issuer: "runtime-test",
RevocationCacheTTL: 10 * time.Second,
},
Settlement: config.SettlementConfig{
WithdrawEnabled: true,
},
}
}
type stubOutboxRepository struct{}
func (stubOutboxRepository) FetchAndLock(context.Context, int) ([]*repository.OutboxEvent, error) {
return nil, nil
}
func (stubOutboxRepository) MarkCompleted(context.Context, string) error {
return nil
}
func (stubOutboxRepository) MarkFailed(context.Context, string, string, *time.Time) error {
return nil
}
func (stubOutboxRepository) MoveToDeadLetter(context.Context, *repository.OutboxEvent, string) error {
return nil
}