diff --git a/internal/batch/confirmation.go b/internal/batch/confirmation.go new file mode 100644 index 00000000..b0d37b68 --- /dev/null +++ b/internal/batch/confirmation.go @@ -0,0 +1,255 @@ +package batch + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +type ConfirmationResult struct { + StatusCode int + Message string +} + +type ConfirmationItemStore interface { + List(ctx context.Context) ([]sqlite.ImportRunItem, error) + Upsert(ctx context.Context, item sqlite.ImportRunItem) error +} + +type ConfirmationEventStore interface { + Append(ctx context.Context, event sqlite.ImportRunItemEvent) error +} + +type ConfirmationWorker struct { + WorkerID string + ItemStore ConfirmationItemStore + EventStore ConfirmationEventStore + LeaseDuration time.Duration + RetryDelay time.Duration + Confirmer func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) +} + +func (w ConfirmationWorker) Tick(ctx context.Context, now time.Time) error { + if w.ItemStore == nil { + return fmt.Errorf("item store is required") + } + if w.EventStore == nil { + return fmt.Errorf("event store is required") + } + if w.Confirmer == nil { + return fmt.Errorf("confirmer is required") + } + + items, err := w.ItemStore.List(ctx) + if err != nil { + return err + } + + for _, item := range items { + if !isConfirmationCandidate(item, now) { + continue + } + if err := w.ConfirmItem(ctx, item, now); err != nil { + return err + } + } + return nil +} + +func (w ConfirmationWorker) ConfirmItem(ctx context.Context, item sqlite.ImportRunItem, now time.Time) error { + result, err := w.Confirmer(ctx, item) + if err != nil { + return err + } + + item.ConfirmationAttempts++ + item.LeaseOwner = strings.TrimSpace(w.WorkerID) + item.LeaseUntil = now.Add(defaultDuration(w.LeaseDuration, time.Minute)).Format(time.RFC3339) + + switch { + case result.StatusCode >= 200 && result.StatusCode < 300: + item.ConfirmationStatus = string(ConfirmationConfirmed) + item.CurrentStage = string(ItemStageValidate) + item.NextRetryAt = "" + item.LastError = "" + item.LastErrorStage = "" + item.LeaseOwner = "" + item.LeaseUntil = "" + if err := w.ItemStore.Upsert(ctx, item); err != nil { + return err + } + return w.EventStore.Append(ctx, sqlite.ImportRunItemEvent{ + EventID: confirmationEventID(item.ItemID, "stage_transition", now), + RunID: item.RunID, + ItemID: item.ItemID, + EventType: "stage_transition", + Stage: string(ItemStageValidate), + Attempt: item.ConfirmationAttempts, + Message: "confirmation succeeded", + PayloadJSON: `{"confirmation_status":"confirmed"}`, + }) + case result.StatusCode == 403 && supportsProbe403Advisory(item.CapabilityProfileJSON): + item.ConfirmationStatus = string(ConfirmationAdvisory) + item.CurrentStage = string(ItemStageValidate) + item.AdvisoryMessagesJSON = appendAdvisoryJSON(item.AdvisoryMessagesJSON, "initial_probe_race_expected") + item.LastError = strings.TrimSpace(result.Message) + item.LastErrorStage = string(ItemStageConfirm) + item.NextRetryAt = "" + item.LeaseOwner = "" + item.LeaseUntil = "" + if err := w.ItemStore.Upsert(ctx, item); err != nil { + return err + } + return w.EventStore.Append(ctx, sqlite.ImportRunItemEvent{ + EventID: confirmationEventID(item.ItemID, "advisory_added", now), + RunID: item.RunID, + ItemID: item.ItemID, + EventType: "advisory_added", + Stage: string(ItemStageConfirm), + Attempt: item.ConfirmationAttempts, + Message: "initial probe race handled as advisory", + PayloadJSON: `{"advisory":"initial_probe_race_expected"}`, + }) + case isWarmupRetryCandidate(result): + item.RetryCount++ + item.LastRetryAt = now.Format(time.RFC3339) + item.NextRetryAt = now.Add(defaultDuration(w.RetryDelay, time.Second)).Format(time.RFC3339) + item.LastError = strings.TrimSpace(result.Message) + item.LastErrorStage = string(ItemStageConfirm) + item.LeaseOwner = "" + item.LeaseUntil = "" + if err := w.ItemStore.Upsert(ctx, item); err != nil { + return err + } + return w.EventStore.Append(ctx, sqlite.ImportRunItemEvent{ + EventID: confirmationEventID(item.ItemID, "retry_scheduled", now), + RunID: item.RunID, + ItemID: item.ItemID, + EventType: "retry_scheduled", + Stage: string(ItemStageConfirm), + Attempt: item.ConfirmationAttempts, + Message: "initial 503 no available accounts, retry scheduled", + PayloadJSON: fmt.Sprintf(`{"next_retry_at":%q}`, item.NextRetryAt), + }) + default: + item.ConfirmationStatus = string(ConfirmationFailed) + item.CurrentStage = string(ItemStageDone) + item.LastError = strings.TrimSpace(result.Message) + item.LastErrorStage = string(ItemStageConfirm) + item.NextRetryAt = "" + item.LeaseOwner = "" + item.LeaseUntil = "" + if err := w.ItemStore.Upsert(ctx, item); err != nil { + return err + } + return nil + } +} + +func isConfirmationCandidate(item sqlite.ImportRunItem, now time.Time) bool { + if item.CurrentStage != string(ItemStageConfirm) { + return false + } + if item.ConfirmationStatus != string(ConfirmationPending) { + return false + } + if !isRetryDue(item.NextRetryAt, now) { + return false + } + if !leaseExpired(item.LeaseUntil, now) { + return false + } + return true +} + +func isRetryDue(nextRetryAt string, now time.Time) bool { + nextRetryAt = strings.TrimSpace(nextRetryAt) + if nextRetryAt == "" { + return true + } + parsed, err := time.Parse(time.RFC3339, nextRetryAt) + if err != nil { + return true + } + return !parsed.After(now) +} + +func leaseExpired(leaseUntil string, now time.Time) bool { + leaseUntil = strings.TrimSpace(leaseUntil) + if leaseUntil == "" { + return true + } + parsed, err := time.Parse(time.RFC3339, leaseUntil) + if err != nil { + return true + } + return parsed.Before(now) +} + +func supportsProbe403Advisory(capabilityProfileJSON string) bool { + var payload struct { + TransportProfile struct { + KnownAdvisories []string `json:"known_advisories"` + } `json:"transport_profile"` + } + if err := json.Unmarshal([]byte(strings.TrimSpace(capabilityProfileJSON)), &payload); err != nil { + return false + } + for _, advisory := range payload.TransportProfile.KnownAdvisories { + if advisory == "initial_probe_race_expected" { + return true + } + } + return false +} + +func isWarmupRetryCandidate(result ConfirmationResult) bool { + message := strings.ToLower(strings.TrimSpace(result.Message)) + return result.StatusCode == 503 && strings.Contains(message, "no available accounts") +} + +func appendAdvisoryJSON(rawJSON, advisory string) string { + advisory = strings.TrimSpace(advisory) + if advisory == "" { + return defaultJSONString(rawJSON, "[]") + } + + values := []string{} + if err := json.Unmarshal([]byte(defaultJSONString(rawJSON, "[]")), &values); err != nil { + values = []string{} + } + for _, existing := range values { + if existing == advisory { + payload, _ := json.Marshal(values) + return string(payload) + } + } + values = append(values, advisory) + payload, err := json.Marshal(values) + if err != nil { + return defaultJSONString(rawJSON, "[]") + } + return string(payload) +} + +func confirmationEventID(itemID, eventType string, now time.Time) string { + return fmt.Sprintf("%s-%s-%d", itemID, eventType, now.UnixNano()) +} + +func defaultDuration(value, fallback time.Duration) time.Duration { + if value > 0 { + return value + } + return fallback +} + +func defaultJSONString(value, fallback string) string { + if strings.TrimSpace(value) == "" { + return fallback + } + return value +} diff --git a/internal/batch/confirmation_test.go b/internal/batch/confirmation_test.go new file mode 100644 index 00000000..3e0be126 --- /dev/null +++ b/internal/batch/confirmation_test.go @@ -0,0 +1,246 @@ +package batch + +import ( + "context" + "strings" + "testing" + "time" + + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +func TestConfirmationWorker(t *testing.T) { + t.Parallel() + + t.Run("only processes pending confirm items that are due and leaseable", func(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 22, 13, 0, 0, 0, time.UTC) + store := newFakeConfirmationStore([]sqlite.ImportRunItem{ + {ItemID: "eligible", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"}, + {ItemID: "wrong-stage", RunID: "run-1", CurrentStage: "probe", ConfirmationStatus: "pending"}, + {ItemID: "not-pending", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "confirmed"}, + {ItemID: "future-retry", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", NextRetryAt: now.Add(time.Minute).Format(time.RFC3339)}, + {ItemID: "leased", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", LeaseUntil: now.Add(time.Minute).Format(time.RFC3339)}, + }) + + worker := ConfirmationWorker{ + WorkerID: "worker-a", + ItemStore: store, + EventStore: store, + LeaseDuration: time.Minute, + RetryDelay: time.Second, + Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { + return ConfirmationResult{StatusCode: 200}, nil + }, + } + + if err := worker.Tick(context.Background(), now); err != nil { + t.Fatalf("Tick() error = %v", err) + } + if len(store.processed) != 1 || store.processed[0] != "eligible" { + t.Fatalf("processed = %#v, want only eligible item", store.processed) + } + }) + + t.Run("403 probe race becomes advisory and advances to validate", func(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 22, 13, 1, 0, 0, time.UTC) + store := newFakeConfirmationStore([]sqlite.ImportRunItem{ + { + ItemID: "advisory", + RunID: "run-1", + CurrentStage: "confirm", + ConfirmationStatus: "pending", + CapabilityProfileJSON: `{"transport_profile":{"known_advisories":["initial_probe_race_expected"]}}`, + }, + }) + + worker := ConfirmationWorker{ + WorkerID: "worker-a", + ItemStore: store, + EventStore: store, + LeaseDuration: time.Minute, + RetryDelay: time.Second, + Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { + return ConfirmationResult{StatusCode: 403, Message: "forbidden"}, nil + }, + } + + if err := worker.Tick(context.Background(), now); err != nil { + t.Fatalf("Tick() error = %v", err) + } + + got := store.mustItem(t, "advisory") + if got.ConfirmationStatus != string(ConfirmationAdvisory) { + t.Fatalf("ConfirmationStatus = %q, want advisory", got.ConfirmationStatus) + } + if got.CurrentStage != string(ItemStageValidate) { + t.Fatalf("CurrentStage = %q, want validate", got.CurrentStage) + } + if !strings.Contains(got.AdvisoryMessagesJSON, "initial_probe_race_expected") { + t.Fatalf("AdvisoryMessagesJSON = %q, want initial_probe_race_expected", got.AdvisoryMessagesJSON) + } + }) + + t.Run("initial 503 schedules retry then succeeds", func(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 22, 13, 2, 0, 0, time.UTC) + store := newFakeConfirmationStore([]sqlite.ImportRunItem{ + {ItemID: "retry", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"}, + }) + + callCount := 0 + worker := ConfirmationWorker{ + WorkerID: "worker-a", + ItemStore: store, + EventStore: store, + LeaseDuration: time.Minute, + RetryDelay: time.Second, + Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { + callCount++ + if callCount == 1 { + return ConfirmationResult{StatusCode: 503, Message: "no available accounts"}, nil + } + return ConfirmationResult{StatusCode: 200}, nil + }, + } + + if err := worker.Tick(context.Background(), now); err != nil { + t.Fatalf("first Tick() error = %v", err) + } + first := store.mustItem(t, "retry") + if first.RetryCount != 1 { + t.Fatalf("RetryCount = %d, want 1", first.RetryCount) + } + if first.ConfirmationStatus != string(ConfirmationPending) { + t.Fatalf("ConfirmationStatus = %q, want pending", first.ConfirmationStatus) + } + if !strings.Contains(first.LastError, "no available accounts") { + t.Fatalf("LastError = %q, want transient message", first.LastError) + } + + if err := worker.Tick(context.Background(), now.Add(2*time.Second)); err != nil { + t.Fatalf("second Tick() error = %v", err) + } + second := store.mustItem(t, "retry") + if second.ConfirmationStatus != string(ConfirmationConfirmed) { + t.Fatalf("ConfirmationStatus = %q, want confirmed", second.ConfirmationStatus) + } + if second.CurrentStage != string(ItemStageValidate) { + t.Fatalf("CurrentStage = %q, want validate", second.CurrentStage) + } + }) + + t.Run("lease prevents duplicate processing across workers", func(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 22, 13, 3, 0, 0, time.UTC) + store := newFakeConfirmationStore([]sqlite.ImportRunItem{ + {ItemID: "shared", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"}, + }) + + confirmer := func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { + return ConfirmationResult{StatusCode: 200}, nil + } + + workerA := ConfirmationWorker{WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer} + workerB := ConfirmationWorker{WorkerID: "worker-b", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer} + + if err := workerA.Tick(context.Background(), now); err != nil { + t.Fatalf("workerA.Tick() error = %v", err) + } + if err := workerB.Tick(context.Background(), now); err != nil { + t.Fatalf("workerB.Tick() error = %v", err) + } + if len(store.processed) != 1 { + t.Fatalf("processed = %#v, want single processing", store.processed) + } + }) + + t.Run("reactivated account metadata is preserved", func(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 22, 13, 4, 0, 0, time.UTC) + store := newFakeConfirmationStore([]sqlite.ImportRunItem{ + { + ItemID: "reactivated", + RunID: "run-1", + CurrentStage: "confirm", + ConfirmationStatus: "pending", + MatchedAccountState: string(MatchedAccountStateDeprecated), + AccountResolution: string(AccountResolutionReactivated), + }, + }) + + worker := ConfirmationWorker{ + WorkerID: "worker-a", + ItemStore: store, + EventStore: store, + LeaseDuration: time.Minute, + RetryDelay: time.Second, + Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) { + return ConfirmationResult{StatusCode: 200}, nil + }, + } + + if err := worker.Tick(context.Background(), now); err != nil { + t.Fatalf("Tick() error = %v", err) + } + got := store.mustItem(t, "reactivated") + if got.MatchedAccountState != string(MatchedAccountStateDeprecated) { + t.Fatalf("MatchedAccountState = %q, want deprecated", got.MatchedAccountState) + } + if got.AccountResolution != string(AccountResolutionReactivated) { + t.Fatalf("AccountResolution = %q, want reactivated", got.AccountResolution) + } + }) +} + +type fakeConfirmationStore struct { + items map[string]sqlite.ImportRunItem + processed []string + events []sqlite.ImportRunItemEvent +} + +func newFakeConfirmationStore(items []sqlite.ImportRunItem) *fakeConfirmationStore { + store := &fakeConfirmationStore{ + items: make(map[string]sqlite.ImportRunItem, len(items)), + events: []sqlite.ImportRunItemEvent{}, + } + for _, item := range items { + store.items[item.ItemID] = item + } + return store +} + +func (f *fakeConfirmationStore) List(ctx context.Context) ([]sqlite.ImportRunItem, error) { + items := make([]sqlite.ImportRunItem, 0, len(f.items)) + for _, item := range f.items { + items = append(items, item) + } + return items, nil +} + +func (f *fakeConfirmationStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error { + f.items[item.ItemID] = item + f.processed = append(f.processed, item.ItemID) + return nil +} + +func (f *fakeConfirmationStore) Append(ctx context.Context, event sqlite.ImportRunItemEvent) error { + f.events = append(f.events, event) + return nil +} + +func (f *fakeConfirmationStore) mustItem(t *testing.T, itemID string) sqlite.ImportRunItem { + t.Helper() + + item, ok := f.items[itemID] + if !ok { + t.Fatalf("item %q not found", itemID) + } + return item +}