diff --git a/internal/domain/platformevent/event.go b/internal/domain/platformevent/event.go index 446e09c..1fbb5d8 100644 --- a/internal/domain/platformevent/event.go +++ b/internal/domain/platformevent/event.go @@ -38,7 +38,6 @@ type Event struct { SessionID string `json:"session_id,omitempty"` TicketID string `json:"ticket_id,omitempty"` SourceMessageID string `json:"source_message_id,omitempty"` - CallbackTarget string `json:"callback_target"` Payload map[string]any `json:"payload"` Status Status `json:"status"` AttemptCount int `json:"attempt_count"` @@ -60,9 +59,6 @@ func (e Event) Validate() error { if strings.TrimSpace(e.EventType) == "" { return fmt.Errorf("event type is required") } - if strings.TrimSpace(e.CallbackTarget) == "" { - return fmt.Errorf("callback target is required") - } switch e.Status { case StatusPending, StatusRetrying, StatusDelivered, StatusDeadLetter: default: diff --git a/internal/domain/platformevent/event_test.go b/internal/domain/platformevent/event_test.go index 9f2fb0b..880828f 100644 --- a/internal/domain/platformevent/event_test.go +++ b/internal/domain/platformevent/event_test.go @@ -9,14 +9,13 @@ import ( func TestEvent_Validate(t *testing.T) { now := time.Now() event := Event{ - ID: "evt-1", - Platform: "sub2api", - EventType: TypeReplyGenerated, - CallbackTarget: "default", - Status: StatusPending, - AttemptCount: 0, - NextAttemptAt: now, - OccurredAt: now, + ID: "evt-1", + Platform: "sub2api", + EventType: TypeReplyGenerated, + Status: StatusPending, + AttemptCount: 0, + NextAttemptAt: now, + OccurredAt: now, } if err := event.Validate(); err != nil { @@ -26,13 +25,12 @@ func TestEvent_Validate(t *testing.T) { func TestEvent_ValidateRejectsInvalidStatus(t *testing.T) { event := Event{ - ID: "evt-1", - Platform: "sub2api", - EventType: TypeReplyGenerated, - CallbackTarget: "default", - Status: Status("invalid"), - NextAttemptAt: time.Now(), - OccurredAt: time.Now(), + ID: "evt-1", + Platform: "sub2api", + EventType: TypeReplyGenerated, + Status: Status("invalid"), + NextAttemptAt: time.Now(), + OccurredAt: time.Now(), } err := event.Validate() diff --git a/internal/platformadapter/types.go b/internal/platformadapter/types.go index bedafad..7c8a996 100644 --- a/internal/platformadapter/types.go +++ b/internal/platformadapter/types.go @@ -22,7 +22,6 @@ type PlatformInboundMeta struct { Channel string SourceMessageID string SourceUserID string - CallbackTarget string } type PlatformAdapter interface { diff --git a/internal/service/platformdelivery/worker_test.go b/internal/service/platformdelivery/worker_test.go index bb82bdb..c7ffd9b 100644 --- a/internal/service/platformdelivery/worker_test.go +++ b/internal/service/platformdelivery/worker_test.go @@ -68,12 +68,11 @@ func TestWorker_ShouldDeliverPendingEventToCallbackServer(t *testing.T) { now := time.Now().UTC().Truncate(time.Second) store := &stubEventStore{ events: []platformevent.Event{{ - ID: "evt-1", - Platform: "sub2api", - EventType: platformevent.TypeReplyGenerated, - CallbackTarget: "default", - Payload: map[string]any{"reply": "好的"}, - Status: platformevent.StatusPending, + ID: "evt-1", + Platform: "sub2api", + EventType: platformevent.TypeReplyGenerated, + Payload: map[string]any{"reply": "好的"}, + Status: platformevent.StatusPending, NextAttemptAt: now, OccurredAt: now, CreatedAt: now, @@ -113,12 +112,11 @@ func TestWorker_ShouldRetryWhenCallbackReturns5xx(t *testing.T) { now := time.Now().UTC().Truncate(time.Second) store := &stubEventStore{ events: []platformevent.Event{{ - ID: "evt-1", - Platform: "sub2api", - EventType: platformevent.TypeReplyGenerated, - CallbackTarget: "default", - Payload: map[string]any{"reply": "好的"}, - Status: platformevent.StatusPending, + ID: "evt-1", + Platform: "sub2api", + EventType: platformevent.TypeReplyGenerated, + Payload: map[string]any{"reply": "好的"}, + Status: platformevent.StatusPending, NextAttemptAt: now, OccurredAt: now, CreatedAt: now, @@ -152,12 +150,11 @@ func TestWorker_ShouldMoveEventToDeadLetterAfterMaxRetries(t *testing.T) { now := time.Now().UTC().Truncate(time.Second) store := &stubEventStore{ events: []platformevent.Event{{ - ID: "evt-1", - Platform: "sub2api", - EventType: platformevent.TypeReplyGenerated, - CallbackTarget: "default", - Payload: map[string]any{"reply": "失败"}, - Status: platformevent.StatusRetrying, + ID: "evt-1", + Platform: "sub2api", + EventType: platformevent.TypeReplyGenerated, + Payload: map[string]any{"reply": "失败"}, + Status: platformevent.StatusRetrying, AttemptCount: 1, NextAttemptAt: now, OccurredAt: now, @@ -185,12 +182,11 @@ func TestWorker_ShouldPersistDeliveryAttemptAudit(t *testing.T) { now := time.Now().UTC().Truncate(time.Second) store := &stubEventStore{ events: []platformevent.Event{{ - ID: "evt-1", - Platform: "sub2api", - EventType: platformevent.TypeReplyGenerated, - CallbackTarget: "default", - Payload: map[string]any{"reply": "失败"}, - Status: platformevent.StatusPending, + ID: "evt-1", + Platform: "sub2api", + EventType: platformevent.TypeReplyGenerated, + Payload: map[string]any{"reply": "失败"}, + Status: platformevent.StatusPending, NextAttemptAt: now, OccurredAt: now, CreatedAt: now, diff --git a/internal/service/platformevents/builder.go b/internal/service/platformevents/builder.go index 1fc269e..c399cec 100644 --- a/internal/service/platformevents/builder.go +++ b/internal/service/platformevents/builder.go @@ -12,8 +12,6 @@ import ( "github.com/bridge/ai-customer-service/internal/service/dialog" ) -const defaultCallbackTarget = "default" - func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta *platformadapter.PlatformInboundMeta, now time.Time) ([]platformevent.Event, error) { if msg == nil { return nil, fmt.Errorf("message is nil") @@ -28,10 +26,6 @@ func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta now = time.Now() } - callbackTarget := meta.CallbackTarget - if callbackTarget == "" { - callbackTarget = defaultCallbackTarget - } eventIndex := 0 baseEvent := func(eventType string, payload map[string]any) platformevent.Event { eventTime := now.Add(time.Duration(eventIndex) * time.Nanosecond) @@ -43,7 +37,6 @@ func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta SessionID: result.SessionID, TicketID: result.TicketID, SourceMessageID: meta.SourceMessageID, - CallbackTarget: callbackTarget, Payload: payload, Status: platformevent.StatusPending, AttemptCount: 0, diff --git a/internal/service/platformevents/builder_test.go b/internal/service/platformevents/builder_test.go index 595c78a..1f78b56 100644 --- a/internal/service/platformevents/builder_test.go +++ b/internal/service/platformevents/builder_test.go @@ -24,7 +24,6 @@ func TestBuildInboundEvents_ShouldBuildReplyFlowEvents(t *testing.T) { Platform: "sub2api", Channel: "sub2api", SourceMessageID: "m1", - CallbackTarget: "default", }, now, ) @@ -72,7 +71,4 @@ func TestBuildInboundEvents_ShouldIncludeHandoffAndTicketCreated(t *testing.T) { if events[4].EventType != "ticket.created" { t.Fatalf("ticket event type = %s", events[4].EventType) } - if events[0].CallbackTarget != "default" { - t.Fatalf("callback target = %s, want default", events[0].CallbackTarget) - } } diff --git a/internal/store/postgres/platform_event_store.go b/internal/store/postgres/platform_event_store.go index 4f6e8ca..1fb6a0d 100644 --- a/internal/store/postgres/platform_event_store.go +++ b/internal/store/postgres/platform_event_store.go @@ -49,13 +49,13 @@ func (s *PlatformEventStore) InsertPendingBatch(ctx context.Context, events []pl } if _, err := tx.ExecContext(ctx, ` INSERT INTO cs_platform_event_outbox( - id, platform, event_type, session_id, ticket_id, source_message_id, callback_target, + id, platform, event_type, session_id, ticket_id, source_message_id, payload, status, attempt_count, next_attempt_at, occurred_at, delivered_at, last_error, created_at, updated_at ) VALUES ( - $1, $2, $3, NULLIF($4,'')::uuid, NULLIF($5,'')::uuid, $6, $7, - $8::jsonb, $9, $10, $11, $12, $13, NULLIF($14,''), $15, $16 + $1, $2, $3, NULLIF($4,'')::uuid, NULLIF($5,'')::uuid, $6, + $7::jsonb, $8, $9, $10, $11, $12, NULLIF($13,''), $14, $15 ) - `, event.ID, event.Platform, event.EventType, event.SessionID, event.TicketID, event.SourceMessageID, event.CallbackTarget, + `, event.ID, event.Platform, event.EventType, event.SessionID, event.TicketID, event.SourceMessageID, string(payload), string(event.Status), event.AttemptCount, event.NextAttemptAt, event.OccurredAt, event.DeliveredAt, event.LastError, event.CreatedAt, event.UpdatedAt); err != nil { _ = tx.Rollback() return err @@ -77,7 +77,7 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe } rows, err := s.db.QueryContext(ctx, ` SELECT id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''), - callback_target, payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at, + payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at, delivered_at, COALESCE(last_error, '') FROM cs_platform_event_outbox WHERE platform = $1 AND status IN ('pending', 'retrying') AND next_attempt_at <= $2 @@ -103,7 +103,6 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe &event.SessionID, &event.TicketID, &event.SourceMessageID, - &event.CallbackTarget, &payloadJSON, &status, &event.AttemptCount, @@ -182,8 +181,8 @@ func (s *PlatformEventStore) MarkDeadLetter(ctx context.Context, eventID string, return err } if _, err := tx.ExecContext(ctx, ` - INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, callback_target, payload, attempt_count, final_error) - SELECT id, platform, event_type, callback_target, payload, attempt_count, last_error + INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, payload, attempt_count, final_error) + SELECT id, platform, event_type, payload, attempt_count, last_error FROM cs_platform_event_outbox WHERE id = $1 ON CONFLICT (event_id) DO UPDATE diff --git a/internal/store/postgres/platform_event_store_test.go b/internal/store/postgres/platform_event_store_test.go index bf5aab4..edc40a2 100644 --- a/internal/store/postgres/platform_event_store_test.go +++ b/internal/store/postgres/platform_event_store_test.go @@ -19,7 +19,6 @@ func TestPlatformEventStore_ShouldInsertPendingEvent(t *testing.T) { ID: uniqueID("evt"), Platform: "sub2api", EventType: platformevent.TypeMessageReceived, - CallbackTarget: "default", Payload: map[string]any{"message": "hello"}, Status: platformevent.StatusPending, AttemptCount: 0, @@ -33,24 +32,20 @@ func TestPlatformEventStore_ShouldInsertPendingEvent(t *testing.T) { if err := store.InsertPending(context.Background(), event); err != nil { t.Fatalf("InsertPending() error = %v", err) } - var ( - status string - callbackName string + status string ) + if err := db.QueryRowContext(context.Background(), ` - SELECT status, callback_target + SELECT status FROM cs_platform_event_outbox WHERE id = $1 - `, event.ID).Scan(&status, &callbackName); err != nil { + `, event.ID).Scan(&status); err != nil { t.Fatalf("query inserted event failed: %v", err) } if status != string(platformevent.StatusPending) { t.Fatalf("status = %s, want %s", status, platformevent.StatusPending) } - if callbackName != "default" { - t.Fatalf("callback target = %s, want default", callbackName) - } } func TestPlatformEventStore_ShouldListPendingEventsInOrder(t *testing.T) { @@ -62,24 +57,22 @@ func TestPlatformEventStore_ShouldListPendingEventsInOrder(t *testing.T) { platformName := "s2a-" + uniqueID("plt")[:8] first := &platformevent.Event{ - ID: uniqueID("evt"), - Platform: platformName, - EventType: platformevent.TypeMessageReceived, - CallbackTarget: "default", - Payload: map[string]any{"step": 1}, - Status: platformevent.StatusPending, + ID: uniqueID("evt"), + Platform: platformName, + EventType: platformevent.TypeMessageReceived, + Payload: map[string]any{"step": 1}, + Status: platformevent.StatusPending, NextAttemptAt: now, OccurredAt: now.Add(-5 * time.Nanosecond), CreatedAt: now.Add(-5 * time.Nanosecond), UpdatedAt: now.Add(-5 * time.Nanosecond), } second := &platformevent.Event{ - ID: uniqueID("evt"), - Platform: platformName, - EventType: platformevent.TypeMessageProcessing, - CallbackTarget: "default", - Payload: map[string]any{"step": 2}, - Status: platformevent.StatusPending, + ID: uniqueID("evt"), + Platform: platformName, + EventType: platformevent.TypeMessageProcessing, + Payload: map[string]any{"step": 2}, + Status: platformevent.StatusPending, NextAttemptAt: now, OccurredAt: now.Add(-4 * time.Nanosecond), CreatedAt: now.Add(-4 * time.Nanosecond), @@ -119,12 +112,11 @@ func TestPlatformEventStore_ShouldPersistDeliveryAttemptAudit(t *testing.T) { store := NewPlatformEventStore(db) now := time.Now().UTC().Truncate(time.Second) event := &platformevent.Event{ - ID: uniqueID("evt"), - Platform: "s2a-" + uniqueID("plt")[:8], - EventType: platformevent.TypeReplyGenerated, - CallbackTarget: "default", - Payload: map[string]any{"reply": "好的"}, - Status: platformevent.StatusPending, + ID: uniqueID("evt"), + Platform: "s2a-" + uniqueID("plt")[:8], + EventType: platformevent.TypeReplyGenerated, + Payload: map[string]any{"reply": "好的"}, + Status: platformevent.StatusPending, NextAttemptAt: now, OccurredAt: now, CreatedAt: now, @@ -166,12 +158,11 @@ func TestPlatformEventStore_ShouldMoveToDeadLetter(t *testing.T) { store := NewPlatformEventStore(db) now := time.Now().UTC().Truncate(time.Second) event := &platformevent.Event{ - ID: uniqueID("evt"), - Platform: "s2a-" + uniqueID("plt")[:8], - EventType: platformevent.TypeReplyGenerated, - CallbackTarget: "default", - Payload: map[string]any{"reply": "失败"}, - Status: platformevent.StatusPending, + ID: uniqueID("evt"), + Platform: "s2a-" + uniqueID("plt")[:8], + EventType: platformevent.TypeReplyGenerated, + Payload: map[string]any{"reply": "失败"}, + Status: platformevent.StatusPending, NextAttemptAt: now, OccurredAt: now, CreatedAt: now,