diff --git a/internal/batch/validation.go b/internal/batch/validation.go new file mode 100644 index 00000000..34e38136 --- /dev/null +++ b/internal/batch/validation.go @@ -0,0 +1,133 @@ +package batch + +import ( + "context" + "fmt" + "strings" + + "sub2api-cn-relay-manager/internal/host/sub2api" + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +type ValidationItemStore interface { + Upsert(ctx context.Context, item sqlite.ImportRunItem) error +} + +type ValidationRunStore interface { + GetByRunID(ctx context.Context, runID string) (sqlite.ImportRun, error) + Update(ctx context.Context, run sqlite.ImportRun) error +} + +type ValidationService struct { + ItemStore ValidationItemStore + RunStore ValidationRunStore + Validator func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) +} + +func (s ValidationService) ValidateItem(ctx context.Context, item sqlite.ImportRunItem) error { + if s.ItemStore == nil { + return fmt.Errorf("item store is required") + } + if s.RunStore == nil { + return fmt.Errorf("run store is required") + } + if s.Validator == nil { + return fmt.Errorf("validator is required") + } + if strings.TrimSpace(item.CurrentStage) != string(ItemStageValidate) { + return fmt.Errorf("item %s is not ready for validation", strings.TrimSpace(item.ItemID)) + } + + completion, err := s.Validator(ctx, item) + if err != nil { + return err + } + + item.CurrentStage = string(ItemStageDone) + item.AccessStatus = string(resolveValidationAccessStatus(item.ConfirmationStatus, completion)) + if item.AccessStatus == string(AccessStatusDegraded) { + item.AdvisoryMessagesJSON = appendAdvisoryJSON(item.AdvisoryMessagesJSON, "gateway_warmup_retry_succeeded") + } + if !completion.OK { + item.LastErrorStage = string(ItemStageValidate) + item.LastError = strings.TrimSpace(completion.BodyPreview) + } + + if err := s.ItemStore.Upsert(ctx, item); err != nil { + return err + } + + run, err := s.RunStore.GetByRunID(ctx, item.RunID) + if err != nil { + return err + } + + run.CompletedItems++ + switch item.AccessStatus { + case string(AccessStatusActive): + run.ActiveItems++ + case string(AccessStatusDegraded): + run.DegradedItems++ + run.WarningItems++ + case string(AccessStatusBroken): + run.BrokenItems++ + } + run.State = deriveRunState(run) + + return s.RunStore.Update(ctx, run) +} + +func resolveValidationAccessStatus(confirmationStatus string, completion sub2api.GatewayCompletionResult) AccessStatus { + switch strings.TrimSpace(confirmationStatus) { + case string(ConfirmationFailed): + return AccessStatusBroken + case string(ConfirmationConfirmed), string(ConfirmationAdvisory): + if completion.OK && completion.StatusCode >= 200 && completion.StatusCode < 300 { + return AccessStatusActive + } + if isTransientValidationFailure(completion) { + return AccessStatusDegraded + } + return AccessStatusBroken + default: + return AccessStatusBroken + } +} + +func isTransientValidationFailure(result sub2api.GatewayCompletionResult) bool { + if result.OK { + return false + } + if result.StatusCode != 0 && result.StatusCode != 429 && result.StatusCode != 502 && result.StatusCode != 503 && result.StatusCode != 504 { + return false + } + + body := strings.ToLower(strings.TrimSpace(result.BodyPreview)) + return strings.Contains(body, "service temporarily unavailable") || + strings.Contains(body, "no available accounts") || + strings.Contains(body, "temporar") || + strings.Contains(body, "try again") +} + +func deriveRunState(run sqlite.ImportRun) string { + if run.TotalItems > 0 && run.CompletedItems >= run.TotalItems { + switch { + case run.BrokenItems > 0: + return string(RunStateFailed) + case run.WarningItems > 0 || run.DegradedItems > 0: + return string(RunStateCompletedWithWarnings) + default: + return string(RunStateCompleted) + } + } + return firstNonEmptyRunState(run.State, string(RunStateRunning)) +} + +func firstNonEmptyRunState(values ...string) string { + for _, value := range values { + if trimmed := strings.TrimSpace(value); trimmed != "" { + return trimmed + } + } + return string(RunStateRunning) +} diff --git a/internal/batch/validation_test.go b/internal/batch/validation_test.go new file mode 100644 index 00000000..278abb43 --- /dev/null +++ b/internal/batch/validation_test.go @@ -0,0 +1,212 @@ +package batch + +import ( + "context" + "strings" + "testing" + + "sub2api-cn-relay-manager/internal/host/sub2api" + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +func TestValidationService(t *testing.T) { + t.Parallel() + + t.Run("confirmed or advisory with chat 200 becomes active", func(t *testing.T) { + t.Parallel() + + for _, confirmationStatus := range []string{string(ConfirmationConfirmed), string(ConfirmationAdvisory)} { + confirmationStatus := confirmationStatus + t.Run(confirmationStatus, func(t *testing.T) { + t.Parallel() + + itemStore := &fakeValidationItemStore{} + runStore := &fakeValidationRunStore{ + run: sqlite.ImportRun{RunID: "run-1", TotalItems: 1}, + } + service := ValidationService{ + ItemStore: itemStore, + RunStore: runStore, + Validator: func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) { + return sub2api.GatewayCompletionResult{OK: true, StatusCode: 200, ContentType: "application/json"}, nil + }, + } + + item := sqlite.ImportRunItem{ + ItemID: "item-1", + RunID: "run-1", + CurrentStage: string(ItemStageValidate), + ConfirmationStatus: confirmationStatus, + AccessStatus: string(AccessStatusUnknown), + ResolvedSmokeModel: "kimi-k2.6", + APIKeyFingerprint: "sha256:abc", + MatchedAccountState: string(MatchedAccountStateActive), + AccountResolution: string(AccountResolutionReused), + } + + if err := service.ValidateItem(context.Background(), item); err != nil { + t.Fatalf("ValidateItem() error = %v", err) + } + got := itemStore.last + if got.AccessStatus != string(AccessStatusActive) { + t.Fatalf("AccessStatus = %q, want active", got.AccessStatus) + } + if got.CurrentStage != string(ItemStageDone) { + t.Fatalf("CurrentStage = %q, want done", got.CurrentStage) + } + if runStore.updated.ActiveItems != 1 || runStore.updated.CompletedItems != 1 { + t.Fatalf("run summary = %+v, want active/completed increment", runStore.updated) + } + }) + } + }) + + t.Run("exhausted transient completion becomes degraded", func(t *testing.T) { + t.Parallel() + + itemStore := &fakeValidationItemStore{} + runStore := &fakeValidationRunStore{ + run: sqlite.ImportRun{RunID: "run-1", TotalItems: 1}, + } + service := ValidationService{ + ItemStore: itemStore, + RunStore: runStore, + Validator: func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) { + return sub2api.GatewayCompletionResult{ + OK: false, + StatusCode: 503, + BodyPreview: `{"error":{"message":"no available accounts"}}`, + }, nil + }, + } + + item := sqlite.ImportRunItem{ + ItemID: "item-1", + RunID: "run-1", + CurrentStage: string(ItemStageValidate), + ConfirmationStatus: string(ConfirmationConfirmed), + AccessStatus: string(AccessStatusUnknown), + ResolvedSmokeModel: "kimi-k2.6", + APIKeyFingerprint: "sha256:abc", + MatchedAccountState: string(MatchedAccountStateActive), + AccountResolution: string(AccountResolutionReused), + } + + if err := service.ValidateItem(context.Background(), item); err != nil { + t.Fatalf("ValidateItem() error = %v", err) + } + if itemStore.last.AccessStatus != string(AccessStatusDegraded) { + t.Fatalf("AccessStatus = %q, want degraded", itemStore.last.AccessStatus) + } + if !strings.Contains(itemStore.last.AdvisoryMessagesJSON, "gateway_warmup_retry_succeeded") { + t.Fatalf("AdvisoryMessagesJSON = %q, want warmup advisory", itemStore.last.AdvisoryMessagesJSON) + } + if runStore.updated.DegradedItems != 1 || runStore.updated.WarningItems != 1 { + t.Fatalf("run summary = %+v, want degraded/warning increment", runStore.updated) + } + }) + + t.Run("definitive invalid path becomes broken", func(t *testing.T) { + t.Parallel() + + itemStore := &fakeValidationItemStore{} + runStore := &fakeValidationRunStore{ + run: sqlite.ImportRun{RunID: "run-1", TotalItems: 1}, + } + service := ValidationService{ + ItemStore: itemStore, + RunStore: runStore, + Validator: func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) { + return sub2api.GatewayCompletionResult{ + OK: false, + StatusCode: 404, + BodyPreview: `{"error":"route missing"}`, + }, nil + }, + } + + item := sqlite.ImportRunItem{ + ItemID: "item-1", + RunID: "run-1", + CurrentStage: string(ItemStageValidate), + ConfirmationStatus: string(ConfirmationConfirmed), + AccessStatus: string(AccessStatusUnknown), + ResolvedSmokeModel: "kimi-k2.6", + APIKeyFingerprint: "sha256:abc", + MatchedAccountState: string(MatchedAccountStateActive), + AccountResolution: string(AccountResolutionReused), + } + + if err := service.ValidateItem(context.Background(), item); err != nil { + t.Fatalf("ValidateItem() error = %v", err) + } + if itemStore.last.AccessStatus != string(AccessStatusBroken) { + t.Fatalf("AccessStatus = %q, want broken", itemStore.last.AccessStatus) + } + if runStore.updated.BrokenItems != 1 || runStore.updated.CompletedItems != 1 { + t.Fatalf("run summary = %+v, want broken/completed increment", runStore.updated) + } + }) + + t.Run("only validation stage may write access status", func(t *testing.T) { + t.Parallel() + + itemStore := &fakeValidationItemStore{} + runStore := &fakeValidationRunStore{ + run: sqlite.ImportRun{RunID: "run-1", TotalItems: 1}, + } + service := ValidationService{ + ItemStore: itemStore, + RunStore: runStore, + Validator: func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) { + return sub2api.GatewayCompletionResult{OK: true, StatusCode: 200}, nil + }, + } + + item := sqlite.ImportRunItem{ + ItemID: "item-1", + RunID: "run-1", + CurrentStage: string(ItemStageConfirm), + ConfirmationStatus: string(ConfirmationConfirmed), + AccessStatus: string(AccessStatusUnknown), + ResolvedSmokeModel: "kimi-k2.6", + APIKeyFingerprint: "sha256:abc", + MatchedAccountState: string(MatchedAccountStateActive), + AccountResolution: string(AccountResolutionReused), + } + + err := service.ValidateItem(context.Background(), item) + if err == nil { + t.Fatal("ValidateItem() error = nil, want validation stage guard") + } + if itemStore.calls != 0 { + t.Fatalf("item upsert calls = %d, want 0", itemStore.calls) + } + }) +} + +type fakeValidationItemStore struct { + last sqlite.ImportRunItem + calls int +} + +func (f *fakeValidationItemStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error { + f.last = item + f.calls++ + return nil +} + +type fakeValidationRunStore struct { + run sqlite.ImportRun + updated sqlite.ImportRun +} + +func (f *fakeValidationRunStore) GetByRunID(ctx context.Context, runID string) (sqlite.ImportRun, error) { + return f.run, nil +} + +func (f *fakeValidationRunStore) Update(ctx context.Context, run sqlite.ImportRun) error { + f.updated = run + f.run = run + return nil +}