fix(test): stabilize sub2api callback flow in ci
All checks were successful
CI / verify (push) Successful in 45s

This commit is contained in:
Your Name
2026-05-06 12:36:19 +08:00
parent 06eeb5776b
commit 631ba80899
2 changed files with 71 additions and 34 deletions

View File

@@ -39,12 +39,12 @@ jobs:
- name: Run unit and integration tests - name: Run unit and integration tests
run: | run: |
cd "$WORKSPACE_DIR" cd "$WORKSPACE_DIR"
go test ./... -count=1 go test ./... -count=1 -p 1
- name: Run race tests - name: Run race tests
run: | run: |
cd "$WORKSPACE_DIR" cd "$WORKSPACE_DIR"
go test -race ./... go test -race ./... -p 1
- name: Run vet - name: Run vet
run: | run: |

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"sync" "sync"
@@ -41,6 +42,27 @@ func openE2EPlatformDB(t *testing.T) *sql.DB {
return db return db
} }
func resetE2EPlatformDB(t *testing.T, db *sql.DB) {
t.Helper()
if db == nil {
t.Fatal("db is nil")
}
if _, err := db.ExecContext(context.Background(), `
TRUNCATE TABLE
cs_platform_event_dead_letters,
cs_platform_event_delivery_attempts,
cs_platform_event_outbox,
cs_message_dedup,
cs_messages,
cs_tickets,
cs_audit_logs,
cs_sessions
RESTART IDENTITY CASCADE
`); err != nil {
t.Fatalf("reset e2e postgres state failed: %v", err)
}
}
func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, maxRetries int) *app.App { func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, maxRetries int) *app.App {
t.Helper() t.Helper()
cfg := &config.Config{} cfg := &config.Config{}
@@ -69,6 +91,9 @@ func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, m
cfg.PlatformAdapters.Sub2API.CallbackSecret = callbackSecret cfg.PlatformAdapters.Sub2API.CallbackSecret = callbackSecret
cfg.PlatformAdapters.Sub2API.CallbackTimeoutMS = 2000 cfg.PlatformAdapters.Sub2API.CallbackTimeoutMS = 2000
cfg.PlatformAdapters.Sub2API.CallbackMaxRetries = maxRetries cfg.PlatformAdapters.Sub2API.CallbackMaxRetries = maxRetries
cfg.PlatformAdapters.Sub2API.CallbackPollIntervalMS = 200
cfg.PlatformAdapters.Sub2API.CallbackBatchSize = 8
cfg.PlatformAdapters.Sub2API.CallbackRetrySchedule = []int{1, 2, 5}
application, err := app.New(cfg, logging.New()) application, err := app.New(cfg, logging.New())
if err != nil { if err != nil {
@@ -92,9 +117,38 @@ func eventually(t *testing.T, timeout time.Duration, fn func() bool) {
t.Fatal("condition not satisfied before timeout") t.Fatal("condition not satisfied before timeout")
} }
func waitForSessionEvents(t *testing.T, timeout time.Duration, mu *sync.Mutex, received *[]platformevent.Event, sessionID string, want int) []platformevent.Event {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
mu.Lock()
filtered := make([]platformevent.Event, 0, len(*received))
for _, event := range *received {
if event.SessionID == sessionID {
filtered = append(filtered, event)
}
}
mu.Unlock()
if len(filtered) == want {
return filtered
}
time.Sleep(200 * time.Millisecond)
}
mu.Lock()
defer mu.Unlock()
snapshot := make([]string, 0, len(*received))
for _, event := range *received {
snapshot = append(snapshot, fmt.Sprintf("%s/%s", event.SessionID, event.EventType))
}
t.Fatalf("session %s received %d events, want %d; snapshot=%v", sessionID, len(snapshot), want, snapshot)
return nil
}
func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *testing.T) { func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *testing.T) {
db := openE2EPlatformDB(t) db := openE2EPlatformDB(t)
defer db.Close() defer db.Close()
resetE2EPlatformDB(t, db)
var ( var (
mu sync.Mutex mu sync.Mutex
@@ -152,26 +206,7 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes
t.Fatalf("ack session_id = %v, want non-empty", ack["session_id"]) t.Fatalf("ack session_id = %v, want non-empty", ack["session_id"])
} }
eventually(t, 8*time.Second, func() bool { filtered := waitForSessionEvents(t, 8*time.Second, &mu, &received, sessionID, 6)
mu.Lock()
defer mu.Unlock()
count := 0
for _, event := range received {
if event.SessionID == sessionID {
count++
}
}
return count == 6
})
mu.Lock()
defer mu.Unlock()
filtered := make([]platformevent.Event, 0, 6)
for _, event := range received {
if event.SessionID == sessionID {
filtered = append(filtered, event)
}
}
wantTypes := []string{ wantTypes := []string{
platformevent.TypeMessageReceived, platformevent.TypeMessageReceived,
platformevent.TypeMessageProcessing, platformevent.TypeMessageProcessing,
@@ -195,23 +230,25 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes
} }
var deliveredCount int var deliveredCount int
if err := db.QueryRowContext(context.Background(), ` eventually(t, 8*time.Second, func() bool {
SELECT COUNT(1) err := db.QueryRowContext(context.Background(), `
FROM cs_platform_event_outbox SELECT COUNT(1)
WHERE platform = 'sub2api' AND status = 'delivered' AND session_id IN ( FROM cs_platform_event_outbox
SELECT id FROM cs_sessions WHERE channel = 'sub2api' AND open_id = $1 WHERE platform = 'sub2api' AND status = 'delivered' AND session_id IN (
) SELECT id FROM cs_sessions WHERE channel = 'sub2api' AND open_id = $1
`, openID).Scan(&deliveredCount); err != nil { )
t.Fatalf("query delivered count failed: %v", err) `, openID).Scan(&deliveredCount)
} if err != nil {
if deliveredCount != 6 { t.Fatalf("query delivered count failed: %v", err)
t.Fatalf("delivered count = %d, want 6", deliveredCount) }
} return deliveredCount == 6
})
} }
func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) { func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) {
db := openE2EPlatformDB(t) db := openE2EPlatformDB(t)
defer db.Close() defer db.Close()
resetE2EPlatformDB(t, db)
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusBadGateway) w.WriteHeader(http.StatusBadGateway)