From 631ba80899c41835e46433b3769fe5e4a7eefc2a Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 6 May 2026 12:36:19 +0800 Subject: [PATCH] fix(test): stabilize sub2api callback flow in ci --- .gitea/workflows/ci.yml | 4 +- test/e2e/sub2api_callback_flow_test.go | 101 +++++++++++++++++-------- 2 files changed, 71 insertions(+), 34 deletions(-) diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index b3aa930..2491fc3 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -39,12 +39,12 @@ jobs: - name: Run unit and integration tests run: | cd "$WORKSPACE_DIR" - go test ./... -count=1 + go test ./... -count=1 -p 1 - name: Run race tests run: | cd "$WORKSPACE_DIR" - go test -race ./... + go test -race ./... -p 1 - name: Run vet run: | diff --git a/test/e2e/sub2api_callback_flow_test.go b/test/e2e/sub2api_callback_flow_test.go index 1460af8..ea52e5d 100644 --- a/test/e2e/sub2api_callback_flow_test.go +++ b/test/e2e/sub2api_callback_flow_test.go @@ -5,6 +5,7 @@ import ( "context" "database/sql" "encoding/json" + "fmt" "net/http" "net/http/httptest" "sync" @@ -41,6 +42,27 @@ func openE2EPlatformDB(t *testing.T) *sql.DB { return db } +func resetE2EPlatformDB(t *testing.T, db *sql.DB) { + t.Helper() + if db == nil { + t.Fatal("db is nil") + } + if _, err := db.ExecContext(context.Background(), ` + TRUNCATE TABLE + cs_platform_event_dead_letters, + cs_platform_event_delivery_attempts, + cs_platform_event_outbox, + cs_message_dedup, + cs_messages, + cs_tickets, + cs_audit_logs, + cs_sessions + RESTART IDENTITY CASCADE + `); err != nil { + t.Fatalf("reset e2e postgres state failed: %v", err) + } +} + func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, maxRetries int) *app.App { t.Helper() cfg := &config.Config{} @@ -69,6 +91,9 @@ func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, m cfg.PlatformAdapters.Sub2API.CallbackSecret = callbackSecret cfg.PlatformAdapters.Sub2API.CallbackTimeoutMS = 2000 cfg.PlatformAdapters.Sub2API.CallbackMaxRetries = maxRetries + cfg.PlatformAdapters.Sub2API.CallbackPollIntervalMS = 200 + cfg.PlatformAdapters.Sub2API.CallbackBatchSize = 8 + cfg.PlatformAdapters.Sub2API.CallbackRetrySchedule = []int{1, 2, 5} application, err := app.New(cfg, logging.New()) if err != nil { @@ -92,9 +117,38 @@ func eventually(t *testing.T, timeout time.Duration, fn func() bool) { t.Fatal("condition not satisfied before timeout") } +func waitForSessionEvents(t *testing.T, timeout time.Duration, mu *sync.Mutex, received *[]platformevent.Event, sessionID string, want int) []platformevent.Event { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + mu.Lock() + filtered := make([]platformevent.Event, 0, len(*received)) + for _, event := range *received { + if event.SessionID == sessionID { + filtered = append(filtered, event) + } + } + mu.Unlock() + if len(filtered) == want { + return filtered + } + time.Sleep(200 * time.Millisecond) + } + + mu.Lock() + defer mu.Unlock() + snapshot := make([]string, 0, len(*received)) + for _, event := range *received { + snapshot = append(snapshot, fmt.Sprintf("%s/%s", event.SessionID, event.EventType)) + } + t.Fatalf("session %s received %d events, want %d; snapshot=%v", sessionID, len(snapshot), want, snapshot) + return nil +} + func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *testing.T) { db := openE2EPlatformDB(t) defer db.Close() + resetE2EPlatformDB(t, db) var ( mu sync.Mutex @@ -152,26 +206,7 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes t.Fatalf("ack session_id = %v, want non-empty", ack["session_id"]) } - eventually(t, 8*time.Second, func() bool { - mu.Lock() - defer mu.Unlock() - count := 0 - for _, event := range received { - if event.SessionID == sessionID { - count++ - } - } - return count == 6 - }) - - mu.Lock() - defer mu.Unlock() - filtered := make([]platformevent.Event, 0, 6) - for _, event := range received { - if event.SessionID == sessionID { - filtered = append(filtered, event) - } - } + filtered := waitForSessionEvents(t, 8*time.Second, &mu, &received, sessionID, 6) wantTypes := []string{ platformevent.TypeMessageReceived, platformevent.TypeMessageProcessing, @@ -195,23 +230,25 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes } var deliveredCount int - if err := db.QueryRowContext(context.Background(), ` - SELECT COUNT(1) - FROM cs_platform_event_outbox - WHERE platform = 'sub2api' AND status = 'delivered' AND session_id IN ( - SELECT id FROM cs_sessions WHERE channel = 'sub2api' AND open_id = $1 - ) - `, openID).Scan(&deliveredCount); err != nil { - t.Fatalf("query delivered count failed: %v", err) - } - if deliveredCount != 6 { - t.Fatalf("delivered count = %d, want 6", deliveredCount) - } + eventually(t, 8*time.Second, func() bool { + err := db.QueryRowContext(context.Background(), ` + SELECT COUNT(1) + FROM cs_platform_event_outbox + WHERE platform = 'sub2api' AND status = 'delivered' AND session_id IN ( + SELECT id FROM cs_sessions WHERE channel = 'sub2api' AND open_id = $1 + ) + `, openID).Scan(&deliveredCount) + if err != nil { + t.Fatalf("query delivered count failed: %v", err) + } + return deliveredCount == 6 + }) } func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) { db := openE2EPlatformDB(t) defer db.Close() + resetE2EPlatformDB(t, db) callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusBadGateway)