From 135718836e8b00e74981c19f48d1158178d5f0dc Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 11 May 2026 14:08:19 +0800 Subject: [PATCH] fix(e2e,store,delivery): fix E2E stability - lifecycle ordering, event ordering, callback_target NOT NULL, worker ticker loop --- internal/service/platformdelivery/worker.go | 18 ++++++---------- internal/service/platformevents/builder.go | 2 +- .../store/postgres/platform_event_store.go | 21 +++++++++++++++---- test/e2e/sub2api_callback_flow_test.go | 9 ++++++-- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/internal/service/platformdelivery/worker.go b/internal/service/platformdelivery/worker.go index 53af6f8..74659d7 100644 --- a/internal/service/platformdelivery/worker.go +++ b/internal/service/platformdelivery/worker.go @@ -64,27 +64,21 @@ func (w *Worker) Start(ctx context.Context) { if ctx == nil { return } + if err := w.RunOnce(ctx); err != nil && w.Logger != nil { + w.Logger.Error("platform callback delivery run failed", "platform", w.Platform, "error", err.Error()) + } ticker := time.NewTicker(w.pollInterval()) defer ticker.Stop() claimTicker := time.NewTicker(30 * time.Second) defer claimTicker.Stop() for { - select { - case <-ctx.Done(): - return - default: - } - if err := w.RunOnce(ctx); err != nil && w.Logger != nil { - w.Logger.Error("platform callback delivery run failed", "platform", w.Platform, "error", err.Error()) - } select { case <-ctx.Done(): return case <-ticker.C: - } - select { - case <-ctx.Done(): - return + if err := w.RunOnce(ctx); err != nil && w.Logger != nil { + w.Logger.Error("platform callback delivery run failed", "platform", w.Platform, "error", err.Error()) + } case <-claimTicker.C: if w.Store != nil { if _, err := w.Store.ReleaseStaleClaims(ctx, w.claimTimeout()); err != nil && w.Logger != nil { diff --git a/internal/service/platformevents/builder.go b/internal/service/platformevents/builder.go index c399cec..b924981 100644 --- a/internal/service/platformevents/builder.go +++ b/internal/service/platformevents/builder.go @@ -28,7 +28,7 @@ func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta eventIndex := 0 baseEvent := func(eventType string, payload map[string]any) platformevent.Event { - eventTime := now.Add(time.Duration(eventIndex) * time.Nanosecond) + eventTime := now.Add(time.Duration(eventIndex) * time.Millisecond) eventIndex++ return platformevent.Event{ ID: uuid.New().String(), diff --git a/internal/store/postgres/platform_event_store.go b/internal/store/postgres/platform_event_store.go index 66ff719..01e86a2 100644 --- a/internal/store/postgres/platform_event_store.go +++ b/internal/store/postgres/platform_event_store.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "sort" "strings" "time" @@ -50,10 +51,10 @@ func (s *PlatformEventStore) InsertPendingBatch(ctx context.Context, events []pl if _, err := tx.ExecContext(ctx, ` INSERT INTO cs_platform_event_outbox( id, platform, event_type, session_id, ticket_id, source_message_id, - payload, status, attempt_count, next_attempt_at, occurred_at, delivered_at, last_error, created_at, updated_at + callback_target, payload, status, attempt_count, next_attempt_at, occurred_at, delivered_at, last_error, created_at, updated_at ) VALUES ( $1, $2, $3, NULLIF($4,'')::uuid, NULLIF($5,'')::uuid, $6, - $7::jsonb, $8, $9, $10, $11, $12, NULLIF($13,''), $14, $15 + '', $7::jsonb, $8, $9, $10, $11, $12, NULLIF($13,''), $14, $15 ) `, event.ID, event.Platform, event.EventType, event.SessionID, event.TicketID, event.SourceMessageID, string(payload), string(event.Status), event.AttemptCount, event.NextAttemptAt, event.OccurredAt, event.DeliveredAt, event.LastError, event.CreatedAt, event.UpdatedAt); err != nil { @@ -146,6 +147,18 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe if err := tx.Commit(); err != nil { return nil, err } + sort.Slice(events, func(i, j int) bool { + if !events[i].NextAttemptAt.Equal(events[j].NextAttemptAt) { + return events[i].NextAttemptAt.Before(events[j].NextAttemptAt) + } + if !events[i].OccurredAt.Equal(events[j].OccurredAt) { + return events[i].OccurredAt.Before(events[j].OccurredAt) + } + if !events[i].CreatedAt.Equal(events[j].CreatedAt) { + return events[i].CreatedAt.Before(events[j].CreatedAt) + } + return events[i].ID < events[j].ID + }) return events, nil } @@ -220,8 +233,8 @@ func (s *PlatformEventStore) MarkDeadLetter(ctx context.Context, eventID string, return err } if _, err := tx.ExecContext(ctx, ` - INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, payload, attempt_count, final_error) - SELECT id, platform, event_type, payload, attempt_count, last_error + INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, callback_target, payload, attempt_count, final_error) + SELECT id, platform, event_type, callback_target, payload, attempt_count, last_error FROM cs_platform_event_outbox WHERE id = $1 ON CONFLICT (event_id) DO UPDATE diff --git a/test/e2e/sub2api_callback_flow_test.go b/test/e2e/sub2api_callback_flow_test.go index d0b4576..8357572 100644 --- a/test/e2e/sub2api_callback_flow_test.go +++ b/test/e2e/sub2api_callback_flow_test.go @@ -132,7 +132,7 @@ func waitForSessionEvents(t *testing.T, timeout time.Duration, eventsCh <-chan p if len(filtered) == want { return filtered } - case <-time.After(200 * time.Millisecond): + case <-time.After(50 * time.Millisecond): } } @@ -159,7 +159,11 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes if err := json.NewDecoder(r.Body).Decode(&event); err != nil { t.Fatalf("decode callback body failed: %v", err) } - eventsCh <- event + select { + case eventsCh <- event: + case <-time.After(5 * time.Second): + t.Fatalf("eventsCh send timeout") + } w.WriteHeader(http.StatusOK) })) defer callbackServer.Close() @@ -205,6 +209,7 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes } filtered := waitForSessionEvents(t, 8*time.Second, eventsCh, sessionID, 6) + wantTypes := []string{ platformevent.TypeMessageReceived, platformevent.TypeMessageProcessing,