From 319d9e19892a8cdc198eb1d754ecb174322c47a4 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 20 Apr 2026 11:50:20 +0800 Subject: [PATCH] fix(supply-api): realign audit event persistence contract --- .../internal/audit/postgres_audit_store.go | 16 +- .../postgres_audit_store_integration_test.go | 113 +++++++++++++ .../audit/repository/audit_repository.go | 7 +- .../audit_repository_integration_test.go | 160 ++++++++++++++++++ .../postgresql/audit_events_contract_v2.sql | 99 +++++++++++ .../sql/postgresql/partition_strategy_v1.sql | 62 +++++-- 6 files changed, 438 insertions(+), 19 deletions(-) create mode 100644 supply-api/internal/audit/postgres_audit_store_integration_test.go create mode 100644 supply-api/internal/audit/repository/audit_repository_integration_test.go create mode 100644 supply-api/sql/postgresql/audit_events_contract_v2.sql diff --git a/supply-api/internal/audit/postgres_audit_store.go b/supply-api/internal/audit/postgres_audit_store.go index 918d04c1..c3dc812a 100644 --- a/supply-api/internal/audit/postgres_audit_store.go +++ b/supply-api/internal/audit/postgres_audit_store.go @@ -24,14 +24,19 @@ var _ AuditStore = (*PostgresAuditStore)(nil) // Emit 发送审计事件 func (s *PostgresAuditStore) Emit(ctx context.Context, event Event) error { + timestamp := event.CreatedAt + if timestamp.IsZero() { + timestamp = time.Now() + } + // 转换 audit.Event -> model.AuditEvent modelEvent := &model.AuditEvent{ EventID: event.EventID, EventName: event.Action, EventCategory: "", EventSubCategory: "", - Timestamp: event.CreatedAt, - TimestampMs: event.CreatedAt.UnixMilli(), + Timestamp: timestamp, + TimestampMs: timestamp.UnixMilli(), RequestID: event.RequestID, IdempotencyKey: "", TenantID: event.TenantID, @@ -39,7 +44,14 @@ func (s *PostgresAuditStore) Emit(ctx context.Context, event Event) error { ObjectID: event.ObjectID, Action: event.Action, ResultCode: event.ResultCode, + Success: event.ResultCode == "OK", + BeforeState: event.BeforeState, + AfterState: event.AfterState, SourceIP: event.SourceIP, + SecurityFlags: *model.NewSecurityFlags(), + ComplianceTags: []string{}, + Version: 1, + CreatedAt: timestamp, } return s.repo.Emit(ctx, modelEvent) } diff --git a/supply-api/internal/audit/postgres_audit_store_integration_test.go b/supply-api/internal/audit/postgres_audit_store_integration_test.go new file mode 100644 index 00000000..959800d9 --- /dev/null +++ b/supply-api/internal/audit/postgres_audit_store_integration_test.go @@ -0,0 +1,113 @@ +//go:build integration +// +build integration + +package audit + +import ( + "context" + "os" + "testing" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + auditrepo "lijiaoqiao/supply-api/internal/audit/repository" +) + +func getAuditStoreTestDB(t *testing.T) *pgxpool.Pool { + t.Helper() + + host := os.Getenv("SUPPLY_API_DB_HOST") + if host == "" { + host = "/var/run/postgresql" + } + port := os.Getenv("SUPPLY_API_DB_PORT") + if port == "" { + port = "5432" + } + user := os.Getenv("SUPPLY_API_DB_USER") + if user == "" { + user = "long" + } + password := os.Getenv("SUPPLY_API_DB_PASSWORD") + dbName := os.Getenv("SUPPLY_API_DB_NAME") + if dbName == "" { + dbName = "supply_test" + } + + var dsn string + if host[0] == '/' { + dsn = "postgres://" + user + ":" + password + "@/" + dbName + "?host=" + host + "&sslmode=disable" + } else { + dsn = "postgres://" + user + ":" + password + "@" + host + ":" + port + "/" + dbName + "?sslmode=disable" + } + + pool, err := pgxpool.New(context.Background(), dsn) + if err != nil { + t.Skipf("跳过集成测试:无法连接数据库: %v", err) + return nil + } + if err := pool.Ping(context.Background()); err != nil { + pool.Close() + t.Skipf("跳过集成测试:无法 ping 数据库: %v", err) + return nil + } + + t.Cleanup(func() { + pool.Close() + }) + return pool +} + +func TestPostgresAuditStore_QueryWithTotalAndGetByID_Integration(t *testing.T) { + if testing.Short() { + t.Skip("跳过集成测试(short mode)") + } + + pool := getAuditStoreTestDB(t) + if pool == nil { + return + } + + store := NewPostgresAuditStore(auditrepo.NewPostgresAuditRepository(pool)) + now := time.Now().UTC().Truncate(time.Millisecond) + if err := store.Emit(context.Background(), Event{ + TenantID: 3001, + ObjectType: "supply_settlement", + ObjectID: 88, + Action: "cancel", + BeforeState: map[string]any{"status": "pending"}, + AfterState: map[string]any{"status": "cancelled"}, + RequestID: "req-audit-store-int", + ResultCode: "OK", + SourceIP: "127.0.0.1", + CreatedAt: now, + }); err != nil { + t.Fatalf("emit audit store event failed: %v", err) + } + + events, total, err := store.QueryWithTotal(context.Background(), EventFilter{ + TenantID: 3001, + Action: "cancel", + Limit: 10, + }) + if err != nil { + t.Fatalf("query with total failed: %v", err) + } + if total == 0 || len(events) == 0 { + t.Fatalf("expected queried audit events, total=%d len=%d", total, len(events)) + } + + found, err := store.GetByID(context.Background(), events[0].EventID) + if err != nil { + t.Fatalf("get audit event by id failed: %v", err) + } + if found.Action != "cancel" { + t.Fatalf("expected action %q, got %q", "cancel", found.Action) + } + if found.BeforeState["status"] != "pending" { + t.Fatalf("expected before status pending, got %v", found.BeforeState["status"]) + } + if found.AfterState["status"] != "cancelled" { + t.Fatalf("expected after status cancelled, got %v", found.AfterState["status"]) + } +} diff --git a/supply-api/internal/audit/repository/audit_repository.go b/supply-api/internal/audit/repository/audit_repository.go index 06a11079..4fe91dab 100644 --- a/supply-api/internal/audit/repository/audit_repository.go +++ b/supply-api/internal/audit/repository/audit_repository.go @@ -87,6 +87,11 @@ func (r *PostgresAuditRepository) Emit(ctx context.Context, event *model.AuditEv return fmt.Errorf("failed to marshal security flags: %w", err) } + complianceTags := event.ComplianceTags + if complianceTags == nil { + complianceTags = []string{} + } + // 序列化状态变更 var beforeStateJSON, afterStateJSON []byte if event.BeforeState != nil { @@ -144,7 +149,7 @@ func (r *PostgresAuditRepository) Emit(ctx context.Context, event *model.AuditEv event.ResultCode, event.ResultMessage, event.Success, beforeStateJSON, afterStateJSON, securityFlagsJSON, event.RiskScore, - event.ComplianceTags, event.InvariantRule, + complianceTags, event.InvariantRule, extensionsJSON, 1, time.Now(), ) diff --git a/supply-api/internal/audit/repository/audit_repository_integration_test.go b/supply-api/internal/audit/repository/audit_repository_integration_test.go new file mode 100644 index 00000000..2b7e4682 --- /dev/null +++ b/supply-api/internal/audit/repository/audit_repository_integration_test.go @@ -0,0 +1,160 @@ +//go:build integration +// +build integration + +package repository + +import ( + "context" + "os" + "testing" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + "lijiaoqiao/supply-api/internal/audit/model" +) + +func getAuditRepositoryTestDB(t *testing.T) *pgxpool.Pool { + t.Helper() + + host := os.Getenv("SUPPLY_API_DB_HOST") + if host == "" { + host = "/var/run/postgresql" + } + port := os.Getenv("SUPPLY_API_DB_PORT") + if port == "" { + port = "5432" + } + user := os.Getenv("SUPPLY_API_DB_USER") + if user == "" { + user = "long" + } + password := os.Getenv("SUPPLY_API_DB_PASSWORD") + dbName := os.Getenv("SUPPLY_API_DB_NAME") + if dbName == "" { + dbName = "supply_test" + } + + var dsn string + if host[0] == '/' { + dsn = "postgres://" + user + ":" + password + "@/" + dbName + "?host=" + host + "&sslmode=disable" + } else { + dsn = "postgres://" + user + ":" + password + "@" + host + ":" + port + "/" + dbName + "?sslmode=disable" + } + + pool, err := pgxpool.New(context.Background(), dsn) + if err != nil { + t.Skipf("跳过集成测试:无法连接数据库: %v", err) + return nil + } + if err := pool.Ping(context.Background()); err != nil { + pool.Close() + t.Skipf("跳过集成测试:无法 ping 数据库: %v", err) + return nil + } + + t.Cleanup(func() { + pool.Close() + }) + return pool +} + +func TestPostgresAuditRepository_EmitQueryAndGetByEventID_Integration(t *testing.T) { + if testing.Short() { + t.Skip("跳过集成测试(short mode)") + } + + pool := getAuditRepositoryTestDB(t) + if pool == nil { + return + } + + repo := NewPostgresAuditRepository(pool) + now := time.Now().UTC().Truncate(time.Millisecond) + event := &model.AuditEvent{ + EventName: "AUD-REPO-INTEGRATION", + EventCategory: model.CategorySECURITY, + EventSubCategory: model.SubCategoryCredIngress, + Timestamp: now, + RequestID: "req-audit-repo-int", + TraceID: "trace-audit-repo-int", + SpanID: "span-audit-repo-int", + IdempotencyKey: "idem-audit-repo-int", + OperatorID: 42, + OperatorType: model.OperatorTypeUser, + OperatorRole: "supplier_admin", + TenantID: 1001, + TenantType: model.TenantTypeSupplier, + ObjectType: "supply_account", + ObjectID: 2002, + Action: "activate", + ActionDetail: "activate account audit trail", + CredentialType: model.CredentialTypePlatformToken, + CredentialID: "ptok-001", + CredentialFingerprint: "fp-001", + SourceType: "api", + SourceIP: "127.0.0.1", + SourceRegion: "cn-hz", + UserAgent: "audit-repo-integration", + TargetType: "http", + TargetEndpoint: "/api/v1/audit/events", + TargetDirect: false, + ResultCode: "OK", + ResultMessage: "emitted", + Success: true, + BeforeState: map[string]any{"status": "pending"}, + AfterState: map[string]any{"status": "active"}, + SecurityFlags: model.SecurityFlags{ + HasCredential: true, + CredentialExposed: false, + Desensitized: true, + Scanned: true, + ScanPassed: true, + ViolationTypes: []string{}, + }, + RiskScore: 12, + ComplianceTags: []string{"SOC2", "GDPR"}, + InvariantRule: "AUD-001", + Extensions: map[string]any{"source": "integration"}, + Version: 1, + CreatedAt: now, + } + + if err := repo.Emit(context.Background(), event); err != nil { + t.Fatalf("emit audit event failed: %v", err) + } + if event.EventID == "" { + t.Fatal("expected event id after emit") + } + + stored, err := repo.GetByEventID(context.Background(), event.EventID) + if err != nil { + t.Fatalf("get by event id failed: %v", err) + } + if stored.TraceID != event.TraceID { + t.Fatalf("expected trace id %q, got %q", event.TraceID, stored.TraceID) + } + if stored.SpanID != event.SpanID { + t.Fatalf("expected span id %q, got %q", event.SpanID, stored.SpanID) + } + if stored.BeforeState["status"] != event.BeforeState["status"] { + t.Fatalf("expected before status %v, got %v", event.BeforeState["status"], stored.BeforeState["status"]) + } + if stored.AfterState["status"] != event.AfterState["status"] { + t.Fatalf("expected after status %v, got %v", event.AfterState["status"], stored.AfterState["status"]) + } + if len(stored.ComplianceTags) != len(event.ComplianceTags) { + t.Fatalf("expected compliance tags %v, got %v", event.ComplianceTags, stored.ComplianceTags) + } + + events, total, err := repo.Query(context.Background(), &EventFilter{ + TenantID: 1001, + EventName: event.EventName, + Limit: 10, + }) + if err != nil { + t.Fatalf("query audit events failed: %v", err) + } + if total == 0 || len(events) == 0 { + t.Fatalf("expected queried events, total=%d len=%d", total, len(events)) + } +} diff --git a/supply-api/sql/postgresql/audit_events_contract_v2.sql b/supply-api/sql/postgresql/audit_events_contract_v2.sql new file mode 100644 index 00000000..6a424ac3 --- /dev/null +++ b/supply-api/sql/postgresql/audit_events_contract_v2.sql @@ -0,0 +1,99 @@ +-- Forward-only migration: align supply-api audit_events with the repository contract. +-- This migration is intended for databases that were initialized from the older +-- compact audit_events schema in supply-api/sql/postgresql/partition_strategy_v1.sql. + +ALTER TABLE IF EXISTS audit_events + ADD COLUMN IF NOT EXISTS trace_id VARCHAR(64) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS span_id VARCHAR(64) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS operator_id BIGINT NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS operator_type VARCHAR(32) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS operator_role VARCHAR(64) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS tenant_type VARCHAR(32) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS action_detail TEXT NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS credential_type VARCHAR(64) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS credential_id VARCHAR(255) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS credential_fingerprint VARCHAR(255) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS source_type VARCHAR(32) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS source_region VARCHAR(100) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS user_agent TEXT NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS target_type VARCHAR(32) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS target_endpoint TEXT NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS target_direct BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN IF NOT EXISTS result_message TEXT NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS success BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN IF NOT EXISTS before_state JSONB, + ADD COLUMN IF NOT EXISTS after_state JSONB, + ADD COLUMN IF NOT EXISTS security_flags JSONB NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS risk_score INTEGER NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS compliance_tags TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[], + ADD COLUMN IF NOT EXISTS invariant_rule VARCHAR(255) NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS extensions JSONB, + ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 1; + +UPDATE audit_events +SET + event_category = COALESCE(event_category, ''), + event_sub_category = COALESCE(event_sub_category, ''), + timestamp_ms = COALESCE(timestamp_ms, EXTRACT(EPOCH FROM timestamp) * 1000), + request_id = COALESCE(request_id, ''), + idempotency_key = COALESCE(idempotency_key, ''), + tenant_id = COALESCE(tenant_id, 0), + object_type = COALESCE(object_type, ''), + action = COALESCE(action, ''), + result_code = COALESCE(result_code, ''), + source_ip = COALESCE(source_ip, ''); + +ALTER TABLE IF EXISTS audit_events + ALTER COLUMN event_category SET DEFAULT '', + ALTER COLUMN event_category SET NOT NULL, + ALTER COLUMN event_sub_category SET DEFAULT '', + ALTER COLUMN event_sub_category SET NOT NULL, + ALTER COLUMN timestamp_ms SET DEFAULT 0, + ALTER COLUMN timestamp_ms SET NOT NULL, + ALTER COLUMN request_id SET DEFAULT '', + ALTER COLUMN request_id SET NOT NULL, + ALTER COLUMN idempotency_key SET DEFAULT '', + ALTER COLUMN idempotency_key SET NOT NULL, + ALTER COLUMN tenant_id SET DEFAULT 0, + ALTER COLUMN tenant_id SET NOT NULL, + ALTER COLUMN object_type SET DEFAULT '', + ALTER COLUMN object_type SET NOT NULL, + ALTER COLUMN result_code SET DEFAULT '', + ALTER COLUMN result_code SET NOT NULL, + ALTER COLUMN source_ip SET DEFAULT '', + ALTER COLUMN source_ip SET NOT NULL; + +DO $$ +DECLARE + current_type TEXT; +BEGIN + SELECT data_type + INTO current_type + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'audit_events' + AND column_name = 'object_id'; + + IF current_type IS NULL THEN + ALTER TABLE audit_events + ADD COLUMN object_id BIGINT NOT NULL DEFAULT 0; + ELSIF current_type <> 'bigint' THEN + UPDATE audit_events + SET object_id = '0' + WHERE object_id IS NULL + OR object_id = '' + OR object_id !~ '^[0-9]+$'; + + ALTER TABLE audit_events + ALTER COLUMN object_id TYPE BIGINT USING object_id::BIGINT; + END IF; + + ALTER TABLE audit_events + ALTER COLUMN object_id SET DEFAULT 0, + ALTER COLUMN object_id SET NOT NULL; +END $$; + +CREATE INDEX IF NOT EXISTS idx_audit_events_event_id ON audit_events(event_id); +CREATE INDEX IF NOT EXISTS idx_audit_events_event_name ON audit_events(event_name); +CREATE INDEX IF NOT EXISTS idx_audit_events_trace_id ON audit_events(trace_id); +CREATE INDEX IF NOT EXISTS idx_audit_events_idempotency_key ON audit_events(idempotency_key); diff --git a/supply-api/sql/postgresql/partition_strategy_v1.sql b/supply-api/sql/postgresql/partition_strategy_v1.sql index 5ca84c5c..3290e7f6 100644 --- a/supply-api/sql/postgresql/partition_strategy_v1.sql +++ b/supply-api/sql/postgresql/partition_strategy_v1.sql @@ -5,22 +5,48 @@ -- 创建父表 CREATE TABLE IF NOT EXISTS audit_events ( - id BIGSERIAL, - event_id VARCHAR(100) NOT NULL, - event_name VARCHAR(100) NOT NULL, - event_category VARCHAR(50), - event_sub_category VARCHAR(50), - timestamp TIMESTAMPTZ NOT NULL, - timestamp_ms BIGINT NOT NULL, - request_id VARCHAR(100), - idempotency_key VARCHAR(128), - tenant_id BIGINT, - object_type VARCHAR(100), - object_id VARCHAR(100), - action VARCHAR(100) NOT NULL, - result_code VARCHAR(50), - source_ip VARCHAR(50), - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + id BIGSERIAL, + event_id VARCHAR(100) NOT NULL, + event_name VARCHAR(100) NOT NULL, + event_category VARCHAR(50) NOT NULL DEFAULT '', + event_sub_category VARCHAR(50) NOT NULL DEFAULT '', + timestamp TIMESTAMPTZ NOT NULL, + timestamp_ms BIGINT NOT NULL DEFAULT 0, + request_id VARCHAR(100) NOT NULL DEFAULT '', + trace_id VARCHAR(64) NOT NULL DEFAULT '', + span_id VARCHAR(64) NOT NULL DEFAULT '', + idempotency_key VARCHAR(128) NOT NULL DEFAULT '', + operator_id BIGINT NOT NULL DEFAULT 0, + operator_type VARCHAR(32) NOT NULL DEFAULT '', + operator_role VARCHAR(64) NOT NULL DEFAULT '', + tenant_id BIGINT NOT NULL DEFAULT 0, + tenant_type VARCHAR(32) NOT NULL DEFAULT '', + object_type VARCHAR(100) NOT NULL DEFAULT '', + object_id BIGINT NOT NULL DEFAULT 0, + action VARCHAR(100) NOT NULL, + action_detail TEXT NOT NULL DEFAULT '', + credential_type VARCHAR(64) NOT NULL DEFAULT '', + credential_id VARCHAR(255) NOT NULL DEFAULT '', + credential_fingerprint VARCHAR(255) NOT NULL DEFAULT '', + source_type VARCHAR(32) NOT NULL DEFAULT '', + source_ip VARCHAR(50) NOT NULL DEFAULT '', + source_region VARCHAR(100) NOT NULL DEFAULT '', + user_agent TEXT NOT NULL DEFAULT '', + target_type VARCHAR(32) NOT NULL DEFAULT '', + target_endpoint TEXT NOT NULL DEFAULT '', + target_direct BOOLEAN NOT NULL DEFAULT FALSE, + result_code VARCHAR(50) NOT NULL DEFAULT '', + result_message TEXT NOT NULL DEFAULT '', + success BOOLEAN NOT NULL DEFAULT FALSE, + before_state JSONB, + after_state JSONB, + security_flags JSONB NOT NULL DEFAULT '{}'::jsonb, + risk_score INTEGER NOT NULL DEFAULT 0, + compliance_tags TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[], + invariant_rule VARCHAR(255) NOT NULL DEFAULT '', + extensions JSONB, + version INTEGER NOT NULL DEFAULT 1, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, timestamp) ) PARTITION BY RANGE (timestamp); @@ -211,7 +237,11 @@ END $$; -- 在父表上创建索引(会自动继承到分区) CREATE INDEX IF NOT EXISTS idx_audit_events_tenant_id ON audit_events(tenant_id); +CREATE INDEX IF NOT EXISTS idx_audit_events_event_id ON audit_events(event_id); +CREATE INDEX IF NOT EXISTS idx_audit_events_event_name ON audit_events(event_name); CREATE INDEX IF NOT EXISTS idx_audit_events_request_id ON audit_events(request_id); +CREATE INDEX IF NOT EXISTS idx_audit_events_trace_id ON audit_events(trace_id); +CREATE INDEX IF NOT EXISTS idx_audit_events_idempotency_key ON audit_events(idempotency_key); CREATE INDEX IF NOT EXISTS idx_audit_events_created_at ON audit_events(created_at); CREATE INDEX IF NOT EXISTS idx_audit_events_object ON audit_events(object_type, object_id);