diff --git a/supply-api/README.md b/supply-api/README.md index fb88321a..3e81b7dc 100644 --- a/supply-api/README.md +++ b/supply-api/README.md @@ -49,10 +49,12 @@ supply-api/ │ └── config/ # 配置管理 ├── sql/ │ └── postgresql/ # 数据库 DDL 脚本 +│ ├── audit_alerts_v1.sql │ ├── audit_events_migration_v1_to_v2.sql │ ├── outbox_pattern_v1.sql │ ├── partition_strategy_v1.sql │ ├── settlement_withdraw_constraint_v1.sql +│ ├── supply_core_schema_v2.sql │ ├── supply_idempotency_record_v1.sql │ └── token_status_registry_v1.sql └── scripts/ @@ -158,8 +160,9 @@ go run ./cmd/supply-api -env=dev -config ./config/config.local.yaml - 入口是 [main.go](/home/long/project/立交桥/supply-api/cmd/supply-api/main.go)。 - 当 PostgreSQL 可用时,会装配 DB-backed 的账户、套餐、结算、收益、审计、token 状态、outbox 与补偿链路。 -- 开发模式下如果 PostgreSQL 或 Redis 不可用,部分能力仍会回退到内存实现。 -- 告警 API 当前仍使用内存告警存储,不是 PostgreSQL-backed 实现。 +- 开发模式下如果 PostgreSQL 不可用,部分链路仍会回退到内存实现。 +- 告警 API 在 PostgreSQL 可用时会装配 DB-backed 仓储;数据库不可用时才显式回退内存实现。 +- 依赖幂等仓储的写接口在中间件缺失时会返回 `503 SUP_HTTP_5031`,不再静默切换到内联逻辑。 - Outbox processor 与补偿 worker 仅在数据库可用时启动。 ## 构建和运行 @@ -203,8 +206,10 @@ bash scripts/ci/repo_integrity_check.sh ./scripts/migrate.sh -env=dev ``` -当前仓库中实际存在的 PostgreSQL DDL / 约束文件包括: +当前仓库中实际存在且应优先参考的 PostgreSQL DDL / 约束文件包括: +- `sql/postgresql/supply_core_schema_v2.sql` +- `sql/postgresql/audit_alerts_v1.sql` - `sql/postgresql/outbox_pattern_v1.sql` - `sql/postgresql/partition_strategy_v1.sql` - `sql/postgresql/settlement_withdraw_constraint_v1.sql` diff --git a/supply-api/internal/repository/account_integration_test.go b/supply-api/internal/repository/account_integration_test.go index 1486db4e..a0fbbedd 100644 --- a/supply-api/internal/repository/account_integration_test.go +++ b/supply-api/internal/repository/account_integration_test.go @@ -11,7 +11,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -// getTestDB 获取测试数据库连接 func getTestDB(t *testing.T) *pgxpool.Pool { t.Helper() @@ -33,7 +32,6 @@ func getTestDB(t *testing.T) *pgxpool.Pool { dbName = "supply_test" } - // 构建 DSN - 如果 host 是路径(Unix socket),使用 host= 参数 var dsn string if host[0] == '/' { dsn = "postgres://" + user + ":" + password + "@/" + dbName + "?host=" + host + "&sslmode=disable" @@ -46,7 +44,6 @@ func getTestDB(t *testing.T) *pgxpool.Pool { t.Skipf("跳过集成测试:无法连接数据库: %v", err) return nil } - if err := pool.Ping(context.Background()); err != nil { pool.Close() t.Skipf("跳过集成测试:无法 ping 数据库: %v", err) @@ -56,11 +53,81 @@ func getTestDB(t *testing.T) *pgxpool.Pool { t.Cleanup(func() { pool.Close() }) - return pool } -// TestAccountRepository_Create_Integration 集成测试:创建账号 +func requireTable(t *testing.T, pool *pgxpool.Pool, table string) { + t.Helper() + + var exists bool + err := pool.QueryRow(context.Background(), ` + SELECT EXISTS( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = 'public' AND table_name = $1 + ) + `, table).Scan(&exists) + if err != nil { + t.Fatalf("检查表 %s 失败: %v", table, err) + } + if !exists { + t.Fatalf("缺少表 %s", table) + } +} + +func requireColumn(t *testing.T, pool *pgxpool.Pool, table, column string) { + t.Helper() + + var exists bool + err := pool.QueryRow(context.Background(), ` + SELECT EXISTS( + SELECT 1 + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = $1 AND column_name = $2 + ) + `, table, column).Scan(&exists) + if err != nil { + t.Fatalf("检查列 %s.%s 失败: %v", table, column, err) + } + if !exists { + t.Fatalf("缺少列 %s.%s", table, column) + } +} + +func requireColumns(t *testing.T, pool *pgxpool.Pool, table string, columns []string) { + t.Helper() + requireTable(t, pool, table) + for _, column := range columns { + requireColumn(t, pool, table, column) + } +} + +func TestAccountRepositorySchemaContract(t *testing.T) { + if testing.Short() { + t.Skip("integration only") + } + + pool := getTestDB(t) + if pool == nil { + return + } + + requireColumns(t, pool, "supply_accounts", []string{ + "id", "user_id", "platform", "account_type", "account_name", + "encrypted_credentials", "key_id", + "status", "risk_level", "total_quota", "available_quota", "frozen_quota", + "is_verified", "verified_at", "last_check_at", + "tos_compliant", "tos_check_result", + "total_requests", "total_tokens", "total_cost", "success_rate", + "risk_score", "risk_reason", "is_frozen", "frozen_reason", + "credential_cipher_algo", "credential_kms_key_alias", "credential_key_version", + "quota_unit", "currency_code", "version", + "created_ip", "updated_ip", "audit_trace_id", + "request_id", "idempotency_key", + "created_at", "updated_at", + }) +} + func TestAccountRepository_Create_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") @@ -71,7 +138,6 @@ func TestAccountRepository_Create_Integration(t *testing.T) { return } - // 验证连接成功 var result int err := pool.QueryRow(context.Background(), "SELECT 1").Scan(&result) if err != nil { @@ -80,11 +146,8 @@ func TestAccountRepository_Create_Integration(t *testing.T) { if result != 1 { t.Fatalf("预期结果 1,实际: %d", result) } - - t.Log("集成测试:数据库连接成功") } -// TestAccountRepository_GetByID_Integration 集成测试:获取账号 func TestAccountRepository_GetByID_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") @@ -95,17 +158,9 @@ func TestAccountRepository_GetByID_Integration(t *testing.T) { return } - // 验证 supply_accounts 表存在 - var tableName string - err := pool.QueryRow(context.Background(), "SELECT table_name FROM information_schema.tables WHERE table_name = 'supply_accounts'").Scan(&tableName) - if err != nil { - t.Skipf("跳过:supply_accounts 表不存在: %v", err) - } - - t.Log("集成测试:supply_accounts 表存在") + requireTable(t, pool, "supply_accounts") } -// TestAccountRepository_Update_Integration 集成测试:更新账号(乐观锁) func TestAccountRepository_Update_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") @@ -116,22 +171,9 @@ func TestAccountRepository_Update_Integration(t *testing.T) { return } - // 验证表结构包含 version 字段(乐观锁) - var columnExists bool - err := pool.QueryRow(context.Background(), ` - SELECT EXISTS( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'supply_accounts' AND column_name = 'version' - ) - `).Scan(&columnExists) - if err != nil || !columnExists { - t.Skip("跳过:supply_accounts 表缺少 version 字段(乐观锁)") - } - - t.Log("集成测试:supply_accounts 表包含 version 字段(乐观锁)") + requireColumn(t, pool, "supply_accounts", "version") } -// TestAccountRepository_List_Integration 集成测试:列出账号 func TestAccountRepository_List_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") @@ -142,7 +184,6 @@ func TestAccountRepository_List_Integration(t *testing.T) { return } - // 列出所有表 rows, err := pool.Query(context.Background(), ` SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' @@ -155,18 +196,16 @@ func TestAccountRepository_List_Integration(t *testing.T) { count := 0 for rows.Next() { var name string - rows.Scan(&name) + if scanErr := rows.Scan(&name); scanErr != nil { + t.Fatalf("扫描表名失败: %v", scanErr) + } count++ } - if count == 0 { t.Fatal("预期至少有一些表") } - - t.Logf("集成测试:数据库包含 %d 个表", count) } -// TestAccountRepository_GetWithdrawableBalance_Integration 集成测试:获取可提现余额 func TestAccountRepository_GetWithdrawableBalance_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") @@ -177,17 +216,15 @@ func TestAccountRepository_GetWithdrawableBalance_Integration(t *testing.T) { return } - // 验证 supply_accounts 表存在并且有相关字段 - var accountID int64 - err := pool.QueryRow(context.Background(), "SELECT COALESCE(MAX(id), 0) FROM supply_accounts").Scan(&accountID) + requireColumn(t, pool, "supply_accounts", "available_quota") + + var total float64 + err := pool.QueryRow(context.Background(), "SELECT COALESCE(SUM(available_quota), 0) FROM supply_accounts").Scan(&total) if err != nil { - t.Logf("集成测试:supply_accounts 表为空或不存在: %v", err) - } else { - t.Logf("集成测试:supply_accounts 最大 ID = %d", accountID) + t.Fatalf("查询可用额度失败: %v", err) } } -// TestAccountRepository_OptimisticLock_Integration 集成测试:乐观锁冲突 func TestAccountRepository_OptimisticLock_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") @@ -198,20 +235,9 @@ func TestAccountRepository_OptimisticLock_Integration(t *testing.T) { return } - // 验证 version 字段存在 - var versionCol int - err := pool.QueryRow(context.Background(), ` - SELECT COUNT(*) FROM information_schema.columns - WHERE table_name = 'supply_accounts' AND column_name = 'version' - `).Scan(&versionCol) - if err != nil || versionCol == 0 { - t.Skip("跳过:supply_accounts 表缺少 version 字段") - } - - t.Log("集成测试:乐观锁字段验证通过") + requireColumn(t, pool, "supply_accounts", "version") } -// TestAccountRepository_Transaction_Integration 集成测试:事务操作 func TestAccountRepository_Transaction_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") @@ -222,7 +248,6 @@ func TestAccountRepository_Transaction_Integration(t *testing.T) { return } - // 测试事务 tx, err := pool.Begin(context.Background()) if err != nil { t.Fatalf("开始事务失败: %v", err) @@ -234,11 +259,7 @@ func TestAccountRepository_Transaction_Integration(t *testing.T) { if err != nil { t.Fatalf("事务内查询失败: %v", err) } - - err = tx.Commit(context.Background()) - if err != nil { + if err := tx.Commit(context.Background()); err != nil { t.Fatalf("提交事务失败: %v", err) } - - t.Log("集成测试:事务操作成功") } diff --git a/supply-api/internal/repository/package_integration_test.go b/supply-api/internal/repository/package_integration_test.go index c2f6be5c..a0a8ed0d 100644 --- a/supply-api/internal/repository/package_integration_test.go +++ b/supply-api/internal/repository/package_integration_test.go @@ -5,201 +5,126 @@ package repository import ( "context" - "os" "testing" - - "github.com/jackc/pgx/v5/pgxpool" ) -// getPackageTestDB 获取测试数据库连接 -func getPackageTestDB(t *testing.T) *pgxpool.Pool { - t.Helper() - - host := os.Getenv("SUPPLY_API_DB_HOST") - if host == "" { - host = "/var/run/postgresql" - } - port := os.Getenv("SUPPLY_API_DB_PORT") - if port == "" { - port = "5432" - } - user := os.Getenv("SUPPLY_API_DB_USER") - if user == "" { - user = "long" - } - password := os.Getenv("SUPPLY_API_DB_PASSWORD") - dbName := os.Getenv("SUPPLY_API_DB_NAME") - if dbName == "" { - dbName = "supply_test" +func TestPackageRepositorySchemaContract(t *testing.T) { + if testing.Short() { + t.Skip("integration only") } - // 构建 DSN - 如果 host 是路径(Unix socket),使用 host= 参数 - var dsn string - if host[0] == '/' { - dsn = "postgres://" + user + ":" + password + "@/" + dbName + "?host=" + host + "&sslmode=disable" - } else { - dsn = "postgres://" + user + ":" + password + "@" + host + ":" + port + "/" + dbName + "?sslmode=disable" + pool := getTestDB(t) + if pool == nil { + return } - pool, err := pgxpool.New(context.Background(), dsn) - if err != nil { - t.Skipf("跳过集成测试:无法连接数据库: %v", err) - return nil - } - - if err := pool.Ping(context.Background()); err != nil { - pool.Close() - t.Skipf("跳过集成测试:无法 ping 数据库: %v", err) - return nil - } - - t.Cleanup(func() { - pool.Close() + requireColumns(t, pool, "supply_packages", []string{ + "id", "supply_account_id", "user_id", "platform", "model", + "total_quota", "available_quota", "sold_quota", "reserved_quota", + "price_per_1m_input", "price_per_1m_output", "min_purchase", + "start_at", "end_at", "valid_days", + "status", "max_concurrent", "rate_limit_rpm", + "total_orders", "total_revenue", "rating", "rating_count", + "quota_unit", "price_unit", "currency_code", "version", + "created_ip", "updated_ip", "audit_trace_id", + "request_id", "created_at", "updated_at", }) - - return pool } -// TestPackageRepository_Create_Integration 集成测试:创建套餐 func TestPackageRepository_Create_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getPackageTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 验证 supply_packages 表存在 - var tableName string - err := pool.QueryRow(context.Background(), "SELECT table_name FROM information_schema.tables WHERE table_name = 'supply_packages'").Scan(&tableName) - if err != nil { - t.Skipf("跳过:supply_packages 表不存在: %v", err) - } - - t.Log("集成测试:supply_packages 表存在") + requireTable(t, pool, "supply_packages") } -// TestPackageRepository_GetByID_Integration 集成测试:获取套餐 func TestPackageRepository_GetByID_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getPackageTestDB(t) + pool := getTestDB(t) if pool == nil { return } var count int - err := pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM supply_packages").Scan(&count) - if err != nil { - t.Logf("集成测试:supply_packages 查询结果: %v", err) - } else { - t.Logf("集成测试:supply_packages 共有 %d 条记录", count) + if err := pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM supply_packages").Scan(&count); err != nil { + t.Fatalf("查询 supply_packages 失败: %v", err) } } -// TestPackageRepository_Update_Integration 集成测试:更新套餐(乐观锁) func TestPackageRepository_Update_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getPackageTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 验证 version 字段存在(乐观锁) - var columnExists bool - err := pool.QueryRow(context.Background(), ` - SELECT EXISTS( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'supply_packages' AND column_name = 'version' - ) - `).Scan(&columnExists) - if err != nil || !columnExists { - t.Skip("跳过:supply_packages 表缺少 version 字段") - } - - t.Log("集成测试:supply_packages 包含 version 字段(乐观锁)") + requireColumn(t, pool, "supply_packages", "version") } -// TestPackageRepository_List_Integration 集成测试:列出套餐 func TestPackageRepository_List_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getPackageTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 列出供应商标套餐 rows, err := pool.Query(context.Background(), ` - SELECT id, supplier_id, available_quota + SELECT id, user_id, available_quota FROM supply_packages LIMIT 10 `) if err != nil { - t.Logf("集成测试:列出套餐: %v", err) - return + t.Fatalf("列出套餐失败: %v", err) } defer rows.Close() - count := 0 for rows.Next() { - var id, supplierID int64 - var availableQuota int - rows.Scan(&id, &supplierID, &availableQuota) - count++ - t.Logf("集成测试:套餐 ID=%d, SupplierID=%d, AvailableQuota=%d", id, supplierID, availableQuota) + var id, userID int64 + var availableQuota float64 + if scanErr := rows.Scan(&id, &userID, &availableQuota); scanErr != nil { + t.Fatalf("扫描套餐失败: %v", scanErr) + } } - - t.Logf("集成测试:列出 %d 个套餐", count) } -// TestPackageRepository_UpdateQuota_Integration 集成测试:扣减配额 func TestPackageRepository_UpdateQuota_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getPackageTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 验证 available_quota 字段存在 - var columnExists bool - err := pool.QueryRow(context.Background(), ` - SELECT EXISTS( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'supply_packages' AND column_name = 'available_quota' - ) - `).Scan(&columnExists) - if err != nil || !columnExists { - t.Skip("跳过:supply_packages 表缺少 available_quota 字段") - } - - t.Log("集成测试:supply_packages 包含 available_quota 字段") + requireColumn(t, pool, "supply_packages", "available_quota") } -// TestPackageRepository_GetForUpdate_Integration 集成测试:悲观锁获取 func TestPackageRepository_GetForUpdate_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getPackageTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 测试 FOR UPDATE 锁 tx, err := pool.Begin(context.Background()) if err != nil { t.Fatalf("开始事务失败: %v", err) @@ -208,35 +133,20 @@ func TestPackageRepository_GetForUpdate_Integration(t *testing.T) { rows, err := tx.Query(context.Background(), "SELECT id FROM supply_packages LIMIT 1 FOR UPDATE") if err != nil { - t.Logf("集成测试:FOR UPDATE 查询: %v", err) - } else { - rows.Close() - t.Log("集成测试:FOR UPDATE 锁获取成功") + t.Fatalf("FOR UPDATE 查询失败: %v", err) } - - tx.Rollback(context.Background()) + rows.Close() } -// TestPackageRepository_OptimisticLock_Integration 集成测试:乐观锁冲突 func TestPackageRepository_OptimisticLock_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getPackageTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 验证 version 字段存在 - var versionCol int - err := pool.QueryRow(context.Background(), ` - SELECT COUNT(*) FROM information_schema.columns - WHERE table_name = 'supply_packages' AND column_name = 'version' - `).Scan(&versionCol) - if err != nil || versionCol == 0 { - t.Skip("跳过:supply_packages 表缺少 version 字段") - } - - t.Log("集成测试:乐观锁字段验证通过") + requireColumn(t, pool, "supply_packages", "version") } diff --git a/supply-api/internal/repository/settlement_integration_test.go b/supply-api/internal/repository/settlement_integration_test.go index fa06f0dc..33a6184f 100644 --- a/supply-api/internal/repository/settlement_integration_test.go +++ b/supply-api/internal/repository/settlement_integration_test.go @@ -5,176 +5,113 @@ package repository import ( "context" - "os" "testing" - - "github.com/jackc/pgx/v5/pgxpool" ) -// getSettlementTestDB 获取测试数据库连接 -func getSettlementTestDB(t *testing.T) *pgxpool.Pool { - t.Helper() - - host := os.Getenv("SUPPLY_API_DB_HOST") - if host == "" { - host = "/var/run/postgresql" - } - port := os.Getenv("SUPPLY_API_DB_PORT") - if port == "" { - port = "5432" - } - user := os.Getenv("SUPPLY_API_DB_USER") - if user == "" { - user = "long" - } - password := os.Getenv("SUPPLY_API_DB_PASSWORD") - dbName := os.Getenv("SUPPLY_API_DB_NAME") - if dbName == "" { - dbName = "supply_test" +func TestSettlementRepositorySchemaContract(t *testing.T) { + if testing.Short() { + t.Skip("integration only") } - // 构建 DSN - 如果 host 是路径(Unix socket),使用 host= 参数 - var dsn string - if host[0] == '/' { - dsn = "postgres://" + user + ":" + password + "@/" + dbName + "?host=" + host + "&sslmode=disable" - } else { - dsn = "postgres://" + user + ":" + password + "@" + host + ":" + port + "/" + dbName + "?sslmode=disable" + pool := getTestDB(t) + if pool == nil { + return } - pool, err := pgxpool.New(context.Background(), dsn) - if err != nil { - t.Skipf("跳过集成测试:无法连接数据库: %v", err) - return nil - } - - if err := pool.Ping(context.Background()); err != nil { - pool.Close() - t.Skipf("跳过集成测试:无法 ping 数据库: %v", err) - return nil - } - - t.Cleanup(func() { - pool.Close() + requireColumns(t, pool, "supply_settlements", []string{ + "id", "settlement_no", "user_id", + "total_amount", "fee_amount", "net_amount", + "status", "payment_method", "payment_account", + "period_start", "period_end", "total_orders", "total_usage_records", + "payment_transaction_id", "paid_at", + "currency_code", "amount_unit", "version", + "request_id", "idempotency_key", "audit_trace_id", + "created_at", "updated_at", }) - - return pool } -// TestSettlementRepository_Create_Integration 集成测试:创建结算单 func TestSettlementRepository_Create_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 验证 supply_settlements 表存在 - var tableName string - err := pool.QueryRow(context.Background(), "SELECT table_name FROM information_schema.tables WHERE table_name = 'supply_settlements'").Scan(&tableName) - if err != nil { - t.Skipf("跳过:supply_settlements 表不存在: %v", err) - } - - t.Log("集成测试:supply_settlements 表存在") + requireTable(t, pool, "supply_settlements") } -// TestSettlementRepository_GetByID_Integration 集成测试:获取结算单 func TestSettlementRepository_GetByID_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } var count int - err := pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM supply_settlements").Scan(&count) - if err != nil { - t.Logf("集成测试:supply_settlements 查询结果: %v", err) - } else { - t.Logf("集成测试:supply_settlements 共有 %d 条记录", count) + if err := pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM supply_settlements").Scan(&count); err != nil { + t.Fatalf("查询 supply_settlements 失败: %v", err) } } -// TestSettlementRepository_Update_Integration 集成测试:更新结算单(乐观锁) func TestSettlementRepository_Update_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 验证 version 字段存在(乐观锁) - var columnExists bool - err := pool.QueryRow(context.Background(), ` - SELECT EXISTS( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'supply_settlements' AND column_name = 'version' - ) - `).Scan(&columnExists) - if err != nil || !columnExists { - t.Skip("跳过:supply_settlements 表缺少 version 字段") - } - - t.Log("集成测试:supply_settlements 包含 version 字段(乐观锁)") + requireColumn(t, pool, "supply_settlements", "version") } -// TestSettlementRepository_List_Integration 集成测试:列出结算单 func TestSettlementRepository_List_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 列出结算单 rows, err := pool.Query(context.Background(), ` - SELECT id, supplier_id, status, total_amount + SELECT id, user_id, status, total_amount FROM supply_settlements LIMIT 10 `) if err != nil { - t.Logf("集成测试:列出结算单: %v", err) - return + t.Fatalf("列出结算单失败: %v", err) } defer rows.Close() - count := 0 for rows.Next() { - var id, supplierID int64 + var id, userID int64 var status string var totalAmount float64 - rows.Scan(&id, &supplierID, &status, &totalAmount) - count++ - t.Logf("集成测试:结算单 ID=%d, SupplierID=%d, Status=%s, Amount=%.2f", id, supplierID, status, totalAmount) + if scanErr := rows.Scan(&id, &userID, &status, &totalAmount); scanErr != nil { + t.Fatalf("扫描结算单失败: %v", scanErr) + } } - - t.Logf("集成测试:列出 %d 个结算单", count) } -// TestSettlementRepository_GetForUpdate_Integration 集成测试:悲观锁获取 func TestSettlementRepository_GetForUpdate_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 测试 FOR UPDATE SKIP LOCKED tx, err := pool.Begin(context.Background()) if err != nil { t.Fatalf("开始事务失败: %v", err) @@ -183,27 +120,21 @@ func TestSettlementRepository_GetForUpdate_Integration(t *testing.T) { rows, err := tx.Query(context.Background(), "SELECT id FROM supply_settlements WHERE status = 'pending' LIMIT 1 FOR UPDATE SKIP LOCKED") if err != nil { - t.Logf("集成测试:FOR UPDATE SKIP LOCKED 查询: %v", err) - } else { - rows.Close() - t.Log("集成测试:FOR UPDATE SKIP LOCKED 获取成功") + t.Fatalf("FOR UPDATE SKIP LOCKED 查询失败: %v", err) } - - tx.Rollback(context.Background()) + rows.Close() } -// TestSettlementRepository_GetForUpdateNoWait_Integration 集成测试:NOWAIT悲观锁 func TestSettlementRepository_GetForUpdateNoWait_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 测试 FOR UPDATE NOWAIT tx, err := pool.Begin(context.Background()) if err != nil { t.Fatalf("开始事务失败: %v", err) @@ -212,117 +143,76 @@ func TestSettlementRepository_GetForUpdateNoWait_Integration(t *testing.T) { rows, err := tx.Query(context.Background(), "SELECT id FROM supply_settlements LIMIT 1 FOR UPDATE NOWAIT") if err != nil { - t.Logf("集成测试:FOR UPDATE NOWAIT 查询: %v", err) - } else { - rows.Close() - t.Log("集成测试:FOR UPDATE NOWAIT 获取成功") + t.Fatalf("FOR UPDATE NOWAIT 查询失败: %v", err) } - - tx.Rollback(context.Background()) + rows.Close() } -// TestSettlementRepository_GetProcessing_Integration 集成测试:获取处理中的结算单 func TestSettlementRepository_GetProcessing_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 查找处理中的结算单 var count int - err := pool.QueryRow(context.Background(), ` + if err := pool.QueryRow(context.Background(), ` SELECT COUNT(*) FROM supply_settlements WHERE status = 'processing' - `).Scan(&count) - if err != nil { - t.Logf("集成测试:查找处理中结算单: %v", err) - } else { - t.Logf("集成测试:处理中的结算单数量 = %d", count) + `).Scan(&count); err != nil { + t.Fatalf("查询处理中结算单失败: %v", err) } } -// TestSettlementRepository_CreateInTx_Integration 集成测试:事务中创建结算单 func TestSettlementRepository_CreateInTx_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 测试在事务中创建记录 tx, err := pool.Begin(context.Background()) if err != nil { t.Fatalf("开始事务失败: %v", err) } defer tx.Rollback(context.Background()) - // 验证可以插入测试数据 var result int - err = tx.QueryRow(context.Background(), "SELECT 1").Scan(&result) - if err != nil { + if err := tx.QueryRow(context.Background(), "SELECT 1").Scan(&result); err != nil { t.Fatalf("事务内查询失败: %v", err) } - - err = tx.Commit(context.Background()) - if err != nil { + if err := tx.Commit(context.Background()); err != nil { t.Fatalf("提交事务失败: %v", err) } - - t.Log("集成测试:事务操作成功") } -// TestSettlementRepository_OptimisticLock_Integration 集成测试:乐观锁冲突 func TestSettlementRepository_OptimisticLock_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 验证 version 字段存在 - var versionCol int - err := pool.QueryRow(context.Background(), ` - SELECT COUNT(*) FROM information_schema.columns - WHERE table_name = 'supply_settlements' AND column_name = 'version' - `).Scan(&versionCol) - if err != nil || versionCol == 0 { - t.Skip("跳过:supply_settlements 表缺少 version 字段") - } - - t.Log("集成测试:乐观锁字段验证通过") + requireColumn(t, pool, "supply_settlements", "version") } -// TestSettlementRepository_Idempotency_Integration 集成测试:幂等性验证 func TestSettlementRepository_Idempotency_Integration(t *testing.T) { if testing.Short() { t.Skip("跳过集成测试(short mode)") } - pool := getSettlementTestDB(t) + pool := getTestDB(t) if pool == nil { return } - // 验证 idempotency_key 字段存在 - var columnExists bool - err := pool.QueryRow(context.Background(), ` - SELECT EXISTS( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'supply_settlements' AND column_name = 'idempotency_key' - ) - `).Scan(&columnExists) - if err != nil || !columnExists { - t.Skip("跳过:supply_settlements 表缺少 idempotency_key 字段") - } - - t.Log("集成测试:幂等性字段验证通过") + requireColumn(t, pool, "supply_settlements", "idempotency_key") } diff --git a/supply-api/scripts/run_integration_tests.sh b/supply-api/scripts/run_integration_tests.sh old mode 100755 new mode 100644 index f7340588..5b865c28 --- a/supply-api/scripts/run_integration_tests.sh +++ b/supply-api/scripts/run_integration_tests.sh @@ -2,81 +2,215 @@ # Supply API Integration Test Runner # Usage: ./scripts/run_integration_tests.sh [package] -set -e +set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" PROJECT_DIR="$(dirname "$SCRIPT_DIR")" DEPLOY_DIR="$PROJECT_DIR/deploy" +PACKAGE="${1:-./internal/repository}" -# Default package to test -PACKAGE="${1:-./internal/middleware/...}" +MODE="" +COMPOSE_CMD=() +LOCAL_ROOT="" +LOCAL_PG_BIN="" +LOCAL_PG_DATA="" +LOCAL_PG_SOCKET="" +LOCAL_PG_LOG="" +LOCAL_PG_PORT="15432" +LOCAL_PG_HOST="127.0.0.1" + +SCHEMA_FILES=( + "sql/postgresql/supply_core_schema_v2.sql" + "sql/postgresql/partition_strategy_v1.sql" + "sql/postgresql/outbox_pattern_v1.sql" + "sql/postgresql/token_status_registry_v1.sql" + "sql/postgresql/audit_alerts_v1.sql" +) echo "=== Supply API Integration Tests ===" echo "" -# Resolve compose command -if command -v docker-compose &> /dev/null; then - COMPOSE_CMD=(docker-compose) -elif command -v docker &> /dev/null && docker compose version &> /dev/null; then - COMPOSE_CMD=(docker compose) -else - echo "ERROR: neither docker-compose nor docker compose is available" - exit 1 +cleanup() { + echo "" + echo "Stopping test infrastructure..." + case "$MODE" in + docker) + "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" down -v >/dev/null 2>&1 || true + ;; + local-postgres) + if [ -n "$LOCAL_PG_BIN" ] && [ -n "$LOCAL_PG_DATA" ] && [ -d "$LOCAL_PG_DATA" ]; then + "$LOCAL_PG_BIN/pg_ctl" -D "$LOCAL_PG_DATA" stop -m fast >/dev/null 2>&1 || true + fi + if [ -n "$LOCAL_ROOT" ] && [ -d "$LOCAL_ROOT" ]; then + rm -rf "$LOCAL_ROOT" + fi + ;; + esac +} + +trap cleanup EXIT + +resolve_compose() { + if command -v docker-compose >/dev/null 2>&1; then + COMPOSE_CMD=(docker-compose) + return 0 + fi + if command -v docker >/dev/null 2>&1 && docker compose version >/dev/null 2>&1; then + COMPOSE_CMD=(docker compose) + return 0 + fi + return 1 +} + +find_pg_bin() { + local dir + for dir in /usr/lib/postgresql/*/bin; do + if [ -x "$dir/initdb" ] && [ -x "$dir/pg_ctl" ] && [ -x "$dir/psql" ]; then + echo "$dir" + return 0 + fi + done + return 1 +} + +start_docker_infra() { + if ! resolve_compose; then + return 1 + fi + + echo "Starting test infrastructure with docker..." + if ! "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" up -d >/dev/null 2>&1; then + return 1 + fi + + MODE="docker" + + echo "Waiting for PostgreSQL to be ready..." + for i in {1..30}; do + if "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" exec -T postgres pg_isready -U supply_test >/dev/null 2>&1; then + echo "PostgreSQL is ready" + break + fi + if [ "$i" -eq 30 ]; then + echo "ERROR: PostgreSQL failed to start" + "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" logs postgres || true + return 1 + fi + sleep 1 + done + + echo "Waiting for Redis to be ready..." + for i in {1..30}; do + if "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" exec -T redis redis-cli ping >/dev/null 2>&1; then + echo "Redis is ready" + break + fi + if [ "$i" -eq 30 ]; then + echo "ERROR: Redis failed to start" + "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" logs redis || true + return 1 + fi + sleep 1 + done + + return 0 +} + +start_local_postgres() { + LOCAL_PG_BIN="$(find_pg_bin)" || { + echo "ERROR: docker 不可用,且未找到本机 PostgreSQL 二进制" + return 1 + } + + MODE="local-postgres" + LOCAL_ROOT="$(mktemp -d /tmp/lijiaoqiao-supply-integration.XXXXXX)" + LOCAL_PG_DATA="$LOCAL_ROOT/postgres-data" + LOCAL_PG_SOCKET="$LOCAL_ROOT/postgres-socket" + LOCAL_PG_LOG="$LOCAL_ROOT/postgres.log" + + mkdir -p "$LOCAL_PG_SOCKET" + + echo "Starting local PostgreSQL test infrastructure..." + "$LOCAL_PG_BIN/initdb" -D "$LOCAL_PG_DATA" -U supply_test --auth=trust >/dev/null + "$LOCAL_PG_BIN/pg_ctl" -D "$LOCAL_PG_DATA" -l "$LOCAL_PG_LOG" -o "-k $LOCAL_PG_SOCKET -h $LOCAL_PG_HOST -p $LOCAL_PG_PORT" start >/dev/null + + for i in {1..30}; do + if "$LOCAL_PG_BIN/pg_isready" -h "$LOCAL_PG_HOST" -p "$LOCAL_PG_PORT" -U supply_test >/dev/null 2>&1; then + echo "PostgreSQL is ready" + break + fi + if [ "$i" -eq 30 ]; then + echo "ERROR: local PostgreSQL failed to start" + cat "$LOCAL_PG_LOG" || true + return 1 + fi + sleep 1 + done + + "$LOCAL_PG_BIN/createdb" -h "$LOCAL_PG_HOST" -p "$LOCAL_PG_PORT" -U supply_test supply_test + echo "Redis is not required for package $PACKAGE" + return 0 +} + +apply_schema() { + echo "" + echo "Applying schema baseline..." + local schema + for schema in "${SCHEMA_FILES[@]}"; do + echo " - $(basename "$schema")" + case "$MODE" in + docker) + "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" exec -T postgres \ + psql -v ON_ERROR_STOP=1 -U supply_test -d supply_test \ + -f "/docker-entrypoint-initdb.d/$(basename "$schema")" >/dev/null + ;; + local-postgres) + "$LOCAL_PG_BIN/psql" -v ON_ERROR_STOP=1 -h "$LOCAL_PG_HOST" -p "$LOCAL_PG_PORT" -U supply_test -d supply_test \ + -f "$PROJECT_DIR/$schema" >/dev/null + ;; + *) + echo "ERROR: unknown infrastructure mode" + return 1 + ;; + esac + done +} + +export_test_env() { + case "$MODE" in + docker) + export SUPPLY_API_DB_HOST="localhost" + export SUPPLY_API_DB_PORT="5432" + export SUPPLY_TEST_REDIS="localhost:6379" + ;; + local-postgres) + export SUPPLY_API_DB_HOST="$LOCAL_PG_HOST" + export SUPPLY_API_DB_PORT="$LOCAL_PG_PORT" + export SUPPLY_TEST_REDIS="" + ;; + *) + echo "ERROR: unknown infrastructure mode" + return 1 + ;; + esac + + export SUPPLY_API_DB_USER="supply_test" + export SUPPLY_API_DB_PASSWORD="" + export SUPPLY_API_DB_NAME="supply_test" + export GOCACHE="${GOCACHE:-/tmp/lijiaoqiao-go-cache-supply-integration}" +} + +if ! start_docker_infra; then + echo "Docker infrastructure unavailable, falling back to local PostgreSQL binaries..." + start_local_postgres fi -# Start infrastructure -echo "Starting test infrastructure..." -"${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" up -d - -# Wait for services to be healthy -echo "Waiting for PostgreSQL to be ready..." -for i in {1..30}; do - if "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" exec -T postgres pg_isready -U supply_test &> /dev/null; then - echo "PostgreSQL is ready" - break - fi - if [ $i -eq 30 ]; then - echo "ERROR: PostgreSQL failed to start" - "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" logs postgres - exit 1 - fi - sleep 1 -done - -echo "Waiting for Redis to be ready..." -for i in {1..30}; do - if "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" exec -T redis redis-cli ping &> /dev/null; then - echo "Redis is ready" - break - fi - if [ $i -eq 30 ]; then - echo "ERROR: Redis failed to start" - "${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" logs redis - exit 1 - fi - sleep 1 -done +apply_schema echo "" echo "Running integration tests for: $PACKAGE" echo "" -# Run tests with integration tag cd "$PROJECT_DIR" -export SUPPLY_API_DB_HOST="localhost" -export SUPPLY_API_DB_PORT="5432" -export SUPPLY_API_DB_USER="supply_test" -export SUPPLY_API_DB_PASSWORD="supply_test_pass" -export SUPPLY_API_DB_NAME="supply_test" -export SUPPLY_TEST_POSTGRES="postgres://supply_test:supply_test_pass@localhost:5432/supply_test?sslmode=disable" -export SUPPLY_TEST_REDIS="localhost:6379" - +export_test_env go test -tags=integration -v "$PACKAGE" - -TEST_RESULT=$? - -echo "" -echo "Stopping test infrastructure..." -"${COMPOSE_CMD[@]}" -f "$DEPLOY_DIR/docker-compose.yml" down -v - -exit $TEST_RESULT diff --git a/supply-api/sql/postgresql/supply_core_schema_v2.sql b/supply-api/sql/postgresql/supply_core_schema_v2.sql new file mode 100644 index 00000000..95336df5 --- /dev/null +++ b/supply-api/sql/postgresql/supply_core_schema_v2.sql @@ -0,0 +1,144 @@ +-- Supply Core Schema v2 +-- 统一维护 supply-api 核心业务表的当前事实源。 +-- 范围: +-- 1. supply_accounts +-- 2. supply_packages +-- 3. supply_settlements +-- +-- 说明: +-- - 审计事件、分区策略、outbox、token 状态、告警表仍由各自独立 DDL 维护。 +-- - 当前仓库仍使用应用层外键校验,因此这里先不引入数据库级外键,避免把未收口的关系语义写死。 + +CREATE TABLE IF NOT EXISTS supply_accounts ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NOT NULL, + platform VARCHAR(64) NOT NULL, + account_type VARCHAR(32) NOT NULL, + account_name VARCHAR(255) NOT NULL DEFAULT '', + encrypted_credentials TEXT NOT NULL DEFAULT '', + key_id VARCHAR(255) NOT NULL DEFAULT '', + status VARCHAR(32) NOT NULL, + risk_level VARCHAR(32) NOT NULL DEFAULT '', + total_quota NUMERIC(20, 6) NOT NULL DEFAULT 0, + available_quota NUMERIC(20, 6) NOT NULL DEFAULT 0, + frozen_quota NUMERIC(20, 6) NOT NULL DEFAULT 0, + is_verified BOOLEAN NOT NULL DEFAULT FALSE, + verified_at TIMESTAMPTZ, + last_check_at TIMESTAMPTZ, + tos_compliant BOOLEAN NOT NULL DEFAULT FALSE, + tos_check_result TEXT NOT NULL DEFAULT '', + total_requests BIGINT NOT NULL DEFAULT 0, + total_tokens BIGINT NOT NULL DEFAULT 0, + total_cost NUMERIC(20, 6) NOT NULL DEFAULT 0, + success_rate NUMERIC(10, 4) NOT NULL DEFAULT 0, + risk_score INTEGER NOT NULL DEFAULT 0, + risk_reason TEXT NOT NULL DEFAULT '', + is_frozen BOOLEAN NOT NULL DEFAULT FALSE, + frozen_reason TEXT NOT NULL DEFAULT '', + credential_cipher_algo VARCHAR(64) NOT NULL DEFAULT '', + credential_kms_key_alias VARCHAR(255) NOT NULL DEFAULT '', + credential_key_version INTEGER NOT NULL DEFAULT 1, + quota_unit VARCHAR(32) NOT NULL DEFAULT 'token', + currency_code VARCHAR(16) NOT NULL DEFAULT 'USD', + version INTEGER NOT NULL DEFAULT 0, + created_ip INET, + updated_ip INET, + audit_trace_id VARCHAR(128) NOT NULL DEFAULT '', + request_id VARCHAR(128) NOT NULL DEFAULT '', + idempotency_key VARCHAR(128) NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_supply_accounts_user_status_created_at + ON supply_accounts (user_id, status, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_supply_accounts_request_id + ON supply_accounts (request_id) + WHERE request_id <> ''; + +CREATE INDEX IF NOT EXISTS idx_supply_accounts_idempotency_key + ON supply_accounts (user_id, idempotency_key) + WHERE idempotency_key <> ''; + +CREATE TABLE IF NOT EXISTS supply_packages ( + id BIGSERIAL PRIMARY KEY, + supply_account_id BIGINT NOT NULL, + user_id BIGINT NOT NULL, + platform VARCHAR(64) NOT NULL DEFAULT '', + model VARCHAR(128) NOT NULL, + total_quota NUMERIC(20, 6) NOT NULL DEFAULT 0, + available_quota NUMERIC(20, 6) NOT NULL DEFAULT 0, + sold_quota NUMERIC(20, 6) NOT NULL DEFAULT 0, + reserved_quota NUMERIC(20, 6) NOT NULL DEFAULT 0, + price_per_1m_input NUMERIC(20, 6) NOT NULL DEFAULT 0, + price_per_1m_output NUMERIC(20, 6) NOT NULL DEFAULT 0, + min_purchase NUMERIC(20, 6) NOT NULL DEFAULT 0, + start_at TIMESTAMPTZ, + end_at TIMESTAMPTZ, + valid_days INTEGER NOT NULL DEFAULT 0, + status VARCHAR(32) NOT NULL, + max_concurrent INTEGER NOT NULL DEFAULT 0, + rate_limit_rpm INTEGER NOT NULL DEFAULT 0, + total_orders INTEGER NOT NULL DEFAULT 0, + total_revenue NUMERIC(20, 6) NOT NULL DEFAULT 0, + rating NUMERIC(10, 4) NOT NULL DEFAULT 0, + rating_count INTEGER NOT NULL DEFAULT 0, + quota_unit VARCHAR(32) NOT NULL DEFAULT 'token', + price_unit VARCHAR(32) NOT NULL DEFAULT 'per_1m_tokens', + currency_code VARCHAR(16) NOT NULL DEFAULT 'USD', + version INTEGER NOT NULL DEFAULT 0, + created_ip INET, + updated_ip INET, + audit_trace_id VARCHAR(128) NOT NULL DEFAULT '', + request_id VARCHAR(128) NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_supply_packages_user_status_created_at + ON supply_packages (user_id, status, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_supply_packages_account_id + ON supply_packages (supply_account_id); + +CREATE INDEX IF NOT EXISTS idx_supply_packages_request_id + ON supply_packages (request_id) + WHERE request_id <> ''; + +CREATE TABLE IF NOT EXISTS supply_settlements ( + id BIGSERIAL PRIMARY KEY, + settlement_no VARCHAR(128) NOT NULL UNIQUE, + user_id BIGINT NOT NULL, + total_amount NUMERIC(20, 6) NOT NULL DEFAULT 0, + fee_amount NUMERIC(20, 6) NOT NULL DEFAULT 0, + net_amount NUMERIC(20, 6) NOT NULL DEFAULT 0, + status VARCHAR(32) NOT NULL, + payment_method VARCHAR(32) NOT NULL DEFAULT '', + payment_account VARCHAR(255) NOT NULL DEFAULT '', + period_start TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + period_end TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + total_orders INTEGER NOT NULL DEFAULT 0, + total_usage_records INTEGER NOT NULL DEFAULT 0, + payment_transaction_id VARCHAR(128) NOT NULL DEFAULT '', + paid_at TIMESTAMPTZ, + currency_code VARCHAR(16) NOT NULL DEFAULT 'USD', + amount_unit VARCHAR(16) NOT NULL DEFAULT 'minor', + version INTEGER NOT NULL DEFAULT 0, + request_id VARCHAR(128) NOT NULL DEFAULT '', + idempotency_key VARCHAR(128) NOT NULL DEFAULT '', + audit_trace_id VARCHAR(128) NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_supply_settlements_user_status_created_at + ON supply_settlements (user_id, status, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_supply_settlements_idempotency_key + ON supply_settlements (user_id, idempotency_key) + WHERE idempotency_key <> ''; + +CREATE INDEX IF NOT EXISTS idx_supply_settlements_request_id + ON supply_settlements (request_id) + WHERE request_id <> '';