diff --git a/test/e2e/sub2api_callback_flow_test.go b/test/e2e/sub2api_callback_flow_test.go index 99345b0..d0b4576 100644 --- a/test/e2e/sub2api_callback_flow_test.go +++ b/test/e2e/sub2api_callback_flow_test.go @@ -8,7 +8,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "sync" "testing" "time" @@ -63,7 +62,7 @@ func resetE2EPlatformDB(t *testing.T, db *sql.DB) { } } -func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, maxRetries int) *app.App { +func newSub2APIE2EApp(t *testing.T, db *sql.DB, callbackURL string, callbackSecret string, maxRetries int) *app.App { t.Helper() cfg := &config.Config{} cfg.HTTP.Addr = ":0" @@ -101,6 +100,9 @@ func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, m } t.Cleanup(func() { _ = application.Shutdown(context.Background()) + if db != nil { + _ = db.Close() + } }) return application } @@ -117,56 +119,52 @@ func eventually(t *testing.T, timeout time.Duration, fn func() bool) { t.Fatal("condition not satisfied before timeout") } -func waitForSessionEvents(t *testing.T, timeout time.Duration, mu *sync.Mutex, received *[]platformevent.Event, sessionID string, want int) []platformevent.Event { +func waitForSessionEvents(t *testing.T, timeout time.Duration, eventsCh <-chan platformevent.Event, sessionID string, want int) []platformevent.Event { t.Helper() deadline := time.Now().Add(timeout) + var filtered []platformevent.Event for time.Now().Before(deadline) { - mu.Lock() - filtered := make([]platformevent.Event, 0, len(*received)) - for _, event := range *received { + select { + case event := <-eventsCh: if event.SessionID == sessionID { filtered = append(filtered, event) } + if len(filtered) == want { + return filtered + } + case <-time.After(200 * time.Millisecond): } - mu.Unlock() - if len(filtered) == want { - return filtered - } - time.Sleep(200 * time.Millisecond) } - mu.Lock() - defer mu.Unlock() - snapshot := make([]string, 0, len(*received)) - for _, event := range *received { - snapshot = append(snapshot, fmt.Sprintf("%s/%s", event.SessionID, event.EventType)) + snapshot := make([]string, 0) + for { + select { + case event := <-eventsCh: + snapshot = append(snapshot, fmt.Sprintf("%s/%s", event.SessionID, event.EventType)) + default: + } + break } - t.Fatalf("session %s received %d events, want %d; snapshot=%v", sessionID, len(snapshot), want, snapshot) + t.Fatalf("session %s received %d events, want %d; snapshot=%v", sessionID, len(filtered), want, snapshot) return nil } func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *testing.T) { db := openE2EPlatformDB(t) - defer db.Close() resetE2EPlatformDB(t, db) - var ( - mu sync.Mutex - received []platformevent.Event - ) + eventsCh := make(chan platformevent.Event) callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - mu.Lock() - defer mu.Unlock() var event platformevent.Event if err := json.NewDecoder(r.Body).Decode(&event); err != nil { t.Fatalf("decode callback body failed: %v", err) } - received = append(received, event) + eventsCh <- event w.WriteHeader(http.StatusOK) })) defer callbackServer.Close() - application := newSub2APIE2EApp(t, callbackServer.URL, "sub2api-callback-secret", 3) + application := newSub2APIE2EApp(t, db, callbackServer.URL, "sub2api-callback-secret", 3) server := httptest.NewServer(application.Server.Handler) defer server.Close() @@ -206,7 +204,7 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes t.Fatalf("ack session_id = %v, want non-empty", ack["session_id"]) } - filtered := waitForSessionEvents(t, 8*time.Second, &mu, &received, sessionID, 6) + filtered := waitForSessionEvents(t, 8*time.Second, eventsCh, sessionID, 6) wantTypes := []string{ platformevent.TypeMessageReceived, platformevent.TypeMessageProcessing, @@ -247,7 +245,6 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) { db := openE2EPlatformDB(t) - defer db.Close() resetE2EPlatformDB(t, db) callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { @@ -256,7 +253,7 @@ func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) { })) defer callbackServer.Close() - application := newSub2APIE2EApp(t, callbackServer.URL, "sub2api-callback-secret", 1) + application := newSub2APIE2EApp(t, db, callbackServer.URL, "sub2api-callback-secret", 1) server := httptest.NewServer(application.Server.Handler) defer server.Close()