From a94de1b32b757d5076d35dc4a3d17e5684d6185c Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 13 Apr 2026 20:54:38 +0800 Subject: [PATCH] refactor(outbox): share domain backoff policy --- supply-api/internal/domain/outbox.go | 8 ++-- supply-api/internal/domain/outbox_test.go | 6 +-- supply-api/internal/outbox/outbox.go | 16 +------ supply-api/internal/outbox/outbox_test.go | 57 ++++++++++++++++++++++- 4 files changed, 65 insertions(+), 22 deletions(-) diff --git a/supply-api/internal/domain/outbox.go b/supply-api/internal/domain/outbox.go index 38d4828f..66a888dd 100644 --- a/supply-api/internal/domain/outbox.go +++ b/supply-api/internal/domain/outbox.go @@ -148,8 +148,8 @@ func (p *OutboxProcessor) handleFailure(ctx context.Context, event *OutboxEvent, } } else { // 计算下次重试时间(指数退避) - backoffSeconds := calculateBackoff(event.RetryCount, event.MaxRetries) - nextRetry := time.Now().Add(time.Duration(backoffSeconds) * time.Second) + backoffSeconds := CalculateOutboxBackoff(event.RetryCount, event.MaxRetries) + nextRetry := time.Now().Add(time.Duration(backoffSeconds) * time.Second) // 在存储层更新重试状态(这里简化处理) if err := p.eventStore.MarkFailed(ctx, event.EventID, publishErr.Error()); err != nil { @@ -162,8 +162,8 @@ func (p *OutboxProcessor) handleFailure(ctx context.Context, event *OutboxEvent, } } -// calculateBackoff 计算指数退避时间 -func calculateBackoff(retryCount, maxRetries int) int { +// CalculateOutboxBackoff 计算指数退避时间 +func CalculateOutboxBackoff(retryCount, maxRetries int) int { backoff := DefaultInitialBackoffSeconds * int(math.Pow(2, float64(retryCount-1))) if backoff > DefaultMaxBackoffSeconds { backoff = DefaultMaxBackoffSeconds diff --git a/supply-api/internal/domain/outbox_test.go b/supply-api/internal/domain/outbox_test.go index 7c60d186..5d2dd3bf 100644 --- a/supply-api/internal/domain/outbox_test.go +++ b/supply-api/internal/domain/outbox_test.go @@ -56,7 +56,7 @@ func (m *mockOutboxEventStore) MarkFailed(ctx context.Context, eventID string, e if e, ok := m.events[eventID]; ok { e.Status = OutboxStatusFailed e.ErrorMessage = errorMsg - backoff := calculateBackoff(e.RetryCount, e.MaxRetries) + backoff := CalculateOutboxBackoff(e.RetryCount, e.MaxRetries) nextRetry := time.Now().Add(time.Duration(backoff) * time.Second) e.NextRetryAt = &nextRetry m.failed = append(m.failed, e) @@ -269,7 +269,7 @@ func TestP006_ExponentialBackoff(t *testing.T) { } for _, tt := range tests { - backoff := calculateBackoff(tt.retryCount, tt.maxRetries) + backoff := CalculateOutboxBackoff(tt.retryCount, tt.maxRetries) if backoff < tt.expectedMin || backoff > tt.expectedMax { t.Errorf("retry %d: expected backoff %d-%d, got %d", tt.retryCount, tt.expectedMin, tt.expectedMax, backoff) @@ -280,7 +280,7 @@ func TestP006_ExponentialBackoff(t *testing.T) { // TestP006_MaxBackoffCap 验证退避时间上限 func TestP006_MaxBackoffCap(t *testing.T) { // 即使重试很多次,退避时间也不应超过60秒 - backoff := calculateBackoff(100, 100) + backoff := CalculateOutboxBackoff(100, 100) if backoff > DefaultMaxBackoffSeconds { t.Errorf("backoff should be capped at %d, got %d", DefaultMaxBackoffSeconds, backoff) } diff --git a/supply-api/internal/outbox/outbox.go b/supply-api/internal/outbox/outbox.go index 6fbcaa04..6309c1cb 100644 --- a/supply-api/internal/outbox/outbox.go +++ b/supply-api/internal/outbox/outbox.go @@ -3,7 +3,6 @@ package outbox import ( "context" "fmt" - "math" "time" "lijiaoqiao/supply-api/internal/domain" @@ -147,8 +146,8 @@ func (r *OutboxProcessorRunner) handleFailure(ctx context.Context, event *domain } } else { // 计算下次重试时间(指数退避) - backoffSeconds := CalculateOutboxBackoff(event.RetryCount, event.MaxRetries) - nextRetry := time.Now().Add(time.Duration(backoffSeconds) * time.Second) + backoffSeconds := domain.CalculateOutboxBackoff(event.RetryCount, event.MaxRetries) + nextRetry := time.Now().Add(time.Duration(backoffSeconds) * time.Second) if err := r.repo.MarkFailed(ctx, event.EventID, publishErr.Error(), &nextRetry); err != nil { r.stats.RecordOutboxFailure("mark_failed_failed") @@ -157,14 +156,3 @@ func (r *OutboxProcessorRunner) handleFailure(ctx context.Context, event *domain } } } - -// CalculateOutboxBackoff 计算指数退避时间 -func CalculateOutboxBackoff(retryCount, maxRetries int) int { - initialBackoff := 1.0 - maxBackoff := 60.0 - backoff := initialBackoff * math.Pow(2, float64(retryCount-1)) - if backoff > maxBackoff { - backoff = maxBackoff - } - return int(backoff) -} diff --git a/supply-api/internal/outbox/outbox_test.go b/supply-api/internal/outbox/outbox_test.go index c7cc106a..aef5d7d1 100644 --- a/supply-api/internal/outbox/outbox_test.go +++ b/supply-api/internal/outbox/outbox_test.go @@ -3,16 +3,21 @@ package outbox import ( "context" "encoding/json" + "errors" "strings" "testing" "time" + "lijiaoqiao/supply-api/internal/domain" "lijiaoqiao/supply-api/internal/messaging" "lijiaoqiao/supply-api/internal/repository" ) type stubRunnerRepo struct { - events []*repository.OutboxEvent + events []*repository.OutboxEvent + failedEventID string + failedErrorMsg string + failedNextRetryAt *time.Time } func (r *stubRunnerRepo) FetchAndLock(ctx context.Context, limit int) ([]*repository.OutboxEvent, error) { @@ -24,6 +29,9 @@ func (r *stubRunnerRepo) MarkCompleted(ctx context.Context, eventID string) erro } func (r *stubRunnerRepo) MarkFailed(ctx context.Context, eventID string, errorMsg string, nextRetryAt *time.Time) error { + r.failedEventID = eventID + r.failedErrorMsg = errorMsg + r.failedNextRetryAt = nextRetryAt return nil } @@ -56,3 +64,50 @@ func TestOutboxProcessorRunner_ProcessRejectsNilMessageBroker(t *testing.T) { t.Fatalf("expected error to mention message broker, got %v", err) } } + +type failingBroker struct { + err error +} + +func (b *failingBroker) Publish(ctx context.Context, event *repository.OutboxEvent) error { + return b.err +} + +func TestOutboxProcessorRunner_HandleFailureUsesDomainBackoff(t *testing.T) { + payload := json.RawMessage(`{"event":"created"}`) + repo := &stubRunnerRepo{ + events: []*repository.OutboxEvent{ + { + ID: 1, + AggregateType: "account", + AggregateID: "acc-1", + EventType: "created", + EventID: "evt-1", + Payload: payload, + Status: repository.OutboxStatusProcessing, + RetryCount: 0, + MaxRetries: 5, + }, + }, + } + + runner := NewOutboxProcessorRunner(repo, &failingBroker{ + err: errors.New("publish failed"), + }, &messaging.NoOpOutboxStats{}) + + start := time.Now() + if err := runner.process(context.Background()); err != nil { + t.Fatalf("expected runner to handle publish failure, got %v", err) + } + if repo.failedEventID != "evt-1" { + t.Fatalf("expected failed event evt-1, got %s", repo.failedEventID) + } + if repo.failedNextRetryAt == nil { + t.Fatal("expected failed retry timestamp to be recorded") + } + expectedBackoff := time.Duration(domain.CalculateOutboxBackoff(1, 5)) * time.Second + actualBackoff := repo.failedNextRetryAt.Sub(start) + if actualBackoff < expectedBackoff-time.Second || actualBackoff > expectedBackoff+time.Second { + t.Fatalf("expected retry backoff around %s, got %s", expectedBackoff, actualBackoff) + } +}