feat(outbox): implement concurrent claim mechanism with UPDATE RETURNING + SKIP LOCKED
- Add migration 0004 to introduce 'claiming' status and timeout index - Add StatusClaiming to platformevent domain and allow it in Validate() - Rewrite ListDue as transactional UPDATE ... RETURNING with FOR UPDATE SKIP LOCKED - Add ReleaseStaleClaims to reset expired claiming events back to retrying - Worker Start() now runs a 30s ticker for stale claim recovery (5m timeout) - Update stubEventStore in tests to satisfy new EventStore interface Refs: D-02
This commit is contained in:
11
db/migration/0004_outbox_claiming_status.up.sql
Normal file
11
db/migration/0004_outbox_claiming_status.up.sql
Normal file
@@ -0,0 +1,11 @@
|
||||
-- Add 'claiming' status to outbox CHECK constraint and add claim timeout index
|
||||
|
||||
ALTER TABLE cs_platform_event_outbox
|
||||
DROP CONSTRAINT IF EXISTS chk_cs_platform_event_outbox_status;
|
||||
|
||||
ALTER TABLE cs_platform_event_outbox
|
||||
ADD CONSTRAINT chk_cs_platform_event_outbox_status
|
||||
CHECK (status IN ('pending','retrying','delivered','dead_letter','claiming'));
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_cs_platform_event_outbox_claiming_timeout
|
||||
ON cs_platform_event_outbox(status, updated_at);
|
||||
@@ -13,6 +13,7 @@ const (
|
||||
StatusRetrying Status = "retrying"
|
||||
StatusDelivered Status = "delivered"
|
||||
StatusDeadLetter Status = "dead_letter"
|
||||
StatusClaiming Status = "claiming"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -60,7 +61,7 @@ func (e Event) Validate() error {
|
||||
return fmt.Errorf("event type is required")
|
||||
}
|
||||
switch e.Status {
|
||||
case StatusPending, StatusRetrying, StatusDelivered, StatusDeadLetter:
|
||||
case StatusPending, StatusRetrying, StatusDelivered, StatusDeadLetter, StatusClaiming:
|
||||
default:
|
||||
return fmt.Errorf("invalid status: %s", e.Status)
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ type EventStore interface {
|
||||
RecordDeliveryAttempt(ctx context.Context, eventID string, attemptNo int, responseStatus int, responseBody string, errorMessage string) error
|
||||
MarkRetry(ctx context.Context, eventID string, attemptCount int, nextAttemptAt time.Time, lastError string) error
|
||||
MarkDeadLetter(ctx context.Context, eventID string, attemptCount int, finalError string) error
|
||||
ReleaseStaleClaims(ctx context.Context, timeout time.Duration) (int, error)
|
||||
}
|
||||
|
||||
type Worker struct {
|
||||
@@ -31,6 +32,7 @@ type Worker struct {
|
||||
MaxRetries int
|
||||
BatchSize int
|
||||
PollInterval time.Duration
|
||||
ClaimTimeout time.Duration
|
||||
RetrySchedule []time.Duration
|
||||
Now func() time.Time
|
||||
Logger *slog.Logger
|
||||
@@ -52,6 +54,7 @@ func NewWorker(platform, callbackURL string, store EventStore, client *http.Clie
|
||||
MaxRetries: maxRetries,
|
||||
BatchSize: 20,
|
||||
PollInterval: 5 * time.Second,
|
||||
ClaimTimeout: 5 * time.Minute,
|
||||
RetrySchedule: []time.Duration{10 * time.Second, 30 * time.Second, 60 * time.Second, 5 * time.Minute, 15 * time.Minute},
|
||||
Now: time.Now,
|
||||
}
|
||||
@@ -63,6 +66,8 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
}
|
||||
ticker := time.NewTicker(w.pollInterval())
|
||||
defer ticker.Stop()
|
||||
claimTicker := time.NewTicker(30 * time.Second)
|
||||
defer claimTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -77,6 +82,16 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-claimTicker.C:
|
||||
if w.Store != nil {
|
||||
if _, err := w.Store.ReleaseStaleClaims(ctx, w.claimTimeout()); err != nil && w.Logger != nil {
|
||||
w.Logger.Error("release stale claims failed", "platform", w.Platform, "error", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,6 +184,13 @@ func (w *Worker) pollInterval() time.Duration {
|
||||
return w.PollInterval
|
||||
}
|
||||
|
||||
func (w *Worker) claimTimeout() time.Duration {
|
||||
if w.ClaimTimeout <= 0 {
|
||||
return 5 * time.Minute
|
||||
}
|
||||
return w.ClaimTimeout
|
||||
}
|
||||
|
||||
func (w *Worker) now() time.Time {
|
||||
if w.Now == nil {
|
||||
return time.Now()
|
||||
|
||||
@@ -64,6 +64,10 @@ func (s *stubEventStore) MarkDeadLetter(_ context.Context, eventID string, attem
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stubEventStore) ReleaseStaleClaims(_ context.Context, _ time.Duration) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func TestWorker_ShouldDeliverPendingEventToCallbackServer(t *testing.T) {
|
||||
now := time.Now().UTC().Truncate(time.Second)
|
||||
store := &stubEventStore{
|
||||
|
||||
@@ -75,14 +75,30 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe
|
||||
if platform == "" {
|
||||
return nil, fmt.Errorf("platform is required")
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
SELECT id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''),
|
||||
payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at,
|
||||
delivered_at, COALESCE(last_error, '')
|
||||
FROM cs_platform_event_outbox
|
||||
WHERE platform = $1 AND status IN ('pending', 'retrying') AND next_attempt_at <= $2
|
||||
ORDER BY next_attempt_at ASC, occurred_at ASC, created_at ASC, id ASC
|
||||
LIMIT $3
|
||||
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
rows, err := tx.QueryContext(ctx, `
|
||||
UPDATE cs_platform_event_outbox
|
||||
SET status = 'claiming', updated_at = NOW()
|
||||
WHERE id IN (
|
||||
SELECT id FROM cs_platform_event_outbox
|
||||
WHERE platform = $1 AND status IN ('pending','retrying') AND next_attempt_at <= $2
|
||||
ORDER BY next_attempt_at ASC, occurred_at ASC, created_at ASC, id ASC
|
||||
LIMIT $3
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''),
|
||||
payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at,
|
||||
delivered_at, COALESCE(last_error, '')
|
||||
`, platform, dueBefore, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -126,9 +142,32 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (s *PlatformEventStore) ReleaseStaleClaims(ctx context.Context, timeout time.Duration) (int, error) {
|
||||
if s.db == nil {
|
||||
return 0, fmt.Errorf("db is nil")
|
||||
}
|
||||
res, err := s.db.ExecContext(ctx, `
|
||||
UPDATE cs_platform_event_outbox
|
||||
SET status = 'retrying', updated_at = NOW()
|
||||
WHERE status = 'claiming' AND updated_at < NOW() - $1::interval
|
||||
`, timeout.Seconds())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return int(n), nil
|
||||
}
|
||||
|
||||
func (s *PlatformEventStore) MarkDelivered(ctx context.Context, eventID string, deliveredAt time.Time) error {
|
||||
if s.db == nil {
|
||||
return fmt.Errorf("db is nil")
|
||||
|
||||
Reference in New Issue
Block a user