package batch import ( "context" "strings" "sync" "sync/atomic" "testing" "time" "sub2api-cn-relay-manager/internal/store/sqlite" ) func TestConfirmationWorker(t *testing.T) { t.Parallel() t.Run("only processes pending confirm items that are due and leaseable", func(t *testing.T) { t.Parallel() now := time.Date(2026, 5, 22, 13, 0, 0, 0, time.UTC) store := newFakeConfirmationStore([]sqlite.ImportRunItem{ {ItemID: "eligible", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"}, {ItemID: "wrong-stage", RunID: "run-1", CurrentStage: "probe", ConfirmationStatus: "pending"}, {ItemID: "not-pending", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "confirmed"}, {ItemID: "future-retry", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", NextRetryAt: now.Add(time.Minute).Format(time.RFC3339)}, {ItemID: "leased", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", LeaseUntil: now.Add(time.Minute).Format(time.RFC3339)}, }) worker := ConfirmationWorker{ WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { return ConfirmationResult{StatusCode: 200}, nil }, } if err := worker.Tick(context.Background(), now); err != nil { t.Fatalf("Tick() error = %v", err) } if len(store.processed) != 1 || store.processed[0] != "eligible" { t.Fatalf("processed = %#v, want only eligible item", store.processed) } }) t.Run("403 probe race becomes advisory and advances to validate", func(t *testing.T) { t.Parallel() now := time.Date(2026, 5, 22, 13, 1, 0, 0, time.UTC) store := newFakeConfirmationStore([]sqlite.ImportRunItem{ { ItemID: "advisory", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", CapabilityProfileJSON: `{"transport_profile":{"known_advisories":["initial_probe_race_expected"]}}`, }, }) worker := ConfirmationWorker{ WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { return ConfirmationResult{StatusCode: 403, Message: "forbidden"}, nil }, } if err := worker.Tick(context.Background(), now); err != nil { t.Fatalf("Tick() error = %v", err) } got := store.mustItem(t, "advisory") if got.ConfirmationStatus != string(ConfirmationAdvisory) { t.Fatalf("ConfirmationStatus = %q, want advisory", got.ConfirmationStatus) } if got.CurrentStage != string(ItemStageValidate) { t.Fatalf("CurrentStage = %q, want validate", got.CurrentStage) } if !strings.Contains(got.AdvisoryMessagesJSON, "initial_probe_race_expected") { t.Fatalf("AdvisoryMessagesJSON = %q, want initial_probe_race_expected", got.AdvisoryMessagesJSON) } }) t.Run("initial 503 schedules retry then succeeds", func(t *testing.T) { t.Parallel() now := time.Date(2026, 5, 22, 13, 2, 0, 0, time.UTC) store := newFakeConfirmationStore([]sqlite.ImportRunItem{ {ItemID: "retry", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"}, }) callCount := 0 worker := ConfirmationWorker{ WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { callCount++ if callCount == 1 { return ConfirmationResult{StatusCode: 503, Message: "no available accounts"}, nil } return ConfirmationResult{StatusCode: 200}, nil }, } if err := worker.Tick(context.Background(), now); err != nil { t.Fatalf("first Tick() error = %v", err) } first := store.mustItem(t, "retry") if first.RetryCount != 1 { t.Fatalf("RetryCount = %d, want 1", first.RetryCount) } if first.ConfirmationStatus != string(ConfirmationPending) { t.Fatalf("ConfirmationStatus = %q, want pending", first.ConfirmationStatus) } if !strings.Contains(first.LastError, "no available accounts") { t.Fatalf("LastError = %q, want transient message", first.LastError) } if err := worker.Tick(context.Background(), now.Add(2*time.Second)); err != nil { t.Fatalf("second Tick() error = %v", err) } second := store.mustItem(t, "retry") if second.ConfirmationStatus != string(ConfirmationConfirmed) { t.Fatalf("ConfirmationStatus = %q, want confirmed", second.ConfirmationStatus) } if second.CurrentStage != string(ItemStageValidate) { t.Fatalf("CurrentStage = %q, want validate", second.CurrentStage) } }) t.Run("lease prevents duplicate processing across workers", func(t *testing.T) { t.Parallel() now := time.Date(2026, 5, 22, 13, 3, 0, 0, time.UTC) store := newFakeConfirmationStore([]sqlite.ImportRunItem{ {ItemID: "shared", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"}, }) confirmer := func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { return ConfirmationResult{StatusCode: 200}, nil } workerA := ConfirmationWorker{WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer} workerB := ConfirmationWorker{WorkerID: "worker-b", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer} if err := workerA.Tick(context.Background(), now); err != nil { t.Fatalf("workerA.Tick() error = %v", err) } if err := workerB.Tick(context.Background(), now); err != nil { t.Fatalf("workerB.Tick() error = %v", err) } if len(store.processed) != 1 { t.Fatalf("processed = %#v, want single processing", store.processed) } }) t.Run("concurrent workers do not both call confirmer before lease is persisted", func(t *testing.T) { t.Parallel() now := time.Date(2026, 5, 22, 13, 3, 30, 0, time.UTC) store := newFakeConfirmationStore([]sqlite.ImportRunItem{ {ItemID: "shared", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"}, }) started := make(chan struct{}, 2) release := make(chan struct{}) var calls atomic.Int32 confirmer := func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { calls.Add(1) started <- struct{}{} <-release return ConfirmationResult{StatusCode: 200}, nil } workerA := ConfirmationWorker{WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer} workerB := ConfirmationWorker{WorkerID: "worker-b", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer} errCh := make(chan error, 2) go func() { errCh <- workerA.Tick(context.Background(), now) }() go func() { errCh <- workerB.Tick(context.Background(), now) }() <-started select { case <-started: t.Fatal("second worker reached confirmer before lease was acquired") case <-time.After(50 * time.Millisecond): } close(release) for range 2 { if err := <-errCh; err != nil { t.Fatalf("Tick() error = %v", err) } } if got := calls.Load(); got != 1 { t.Fatalf("confirmer calls = %d, want 1", got) } }) t.Run("reactivated account metadata is preserved", func(t *testing.T) { t.Parallel() now := time.Date(2026, 5, 22, 13, 4, 0, 0, time.UTC) store := newFakeConfirmationStore([]sqlite.ImportRunItem{ { ItemID: "reactivated", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", MatchedAccountState: string(MatchedAccountStateDeprecated), AccountResolution: string(AccountResolutionReactivated), }, }) worker := ConfirmationWorker{ WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { return ConfirmationResult{StatusCode: 200}, nil }, } if err := worker.Tick(context.Background(), now); err != nil { t.Fatalf("Tick() error = %v", err) } got := store.mustItem(t, "reactivated") if got.MatchedAccountState != string(MatchedAccountStateDeprecated) { t.Fatalf("MatchedAccountState = %q, want deprecated", got.MatchedAccountState) } if got.AccountResolution != string(AccountResolutionReactivated) { t.Fatalf("AccountResolution = %q, want reactivated", got.AccountResolution) } }) } type fakeConfirmationStore struct { mu sync.Mutex items map[string]sqlite.ImportRunItem processed []string events []sqlite.ImportRunItemEvent } func newFakeConfirmationStore(items []sqlite.ImportRunItem) *fakeConfirmationStore { store := &fakeConfirmationStore{ items: make(map[string]sqlite.ImportRunItem, len(items)), events: []sqlite.ImportRunItemEvent{}, } for _, item := range items { store.items[item.ItemID] = item } return store } func (f *fakeConfirmationStore) List(ctx context.Context) ([]sqlite.ImportRunItem, error) { f.mu.Lock() defer f.mu.Unlock() items := make([]sqlite.ImportRunItem, 0, len(f.items)) for _, item := range f.items { items = append(items, item) } return items, nil } func (f *fakeConfirmationStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error { f.mu.Lock() defer f.mu.Unlock() f.items[item.ItemID] = item f.processed = append(f.processed, item.ItemID) return nil } func (f *fakeConfirmationStore) TryAcquireLease(ctx context.Context, itemID, workerID string, now time.Time, leaseDuration time.Duration) (sqlite.ImportRunItem, bool, error) { f.mu.Lock() defer f.mu.Unlock() item, ok := f.items[itemID] if !ok { return sqlite.ImportRunItem{}, false, nil } if !isConfirmationCandidate(item, now) { return sqlite.ImportRunItem{}, false, nil } item.ConfirmationAttempts++ item.LeaseOwner = workerID item.LeaseUntil = now.Add(leaseDuration).Format(time.RFC3339) f.items[itemID] = item return item, true, nil } func (f *fakeConfirmationStore) Append(ctx context.Context, event sqlite.ImportRunItemEvent) error { f.mu.Lock() defer f.mu.Unlock() f.events = append(f.events, event) return nil } func (f *fakeConfirmationStore) mustItem(t *testing.T, itemID string) sqlite.ImportRunItem { t.Helper() f.mu.Lock() defer f.mu.Unlock() item, ok := f.items[itemID] if !ok { t.Fatalf("item %q not found", itemID) } return item }