From 7ae8caf2164eec892ac566e52053731337b76398 Mon Sep 17 00:00:00 2001 From: phamnazage-jpg Date: Sat, 23 May 2026 09:39:02 +0800 Subject: [PATCH] Strengthen runtime and scheduler coverage --- .../access/openai_responses_repair_test.go | 210 ++++++ internal/app/reconcile_background_test.go | 361 ++++++++++ .../sub2api/account_capability_repair_test.go | 67 ++ internal/reconcile/service_runtime_test.go | 636 ++++++++++++++++++ internal/reconcile/service_test.go | 78 +++ 5 files changed, 1352 insertions(+) create mode 100644 internal/access/openai_responses_repair_test.go create mode 100644 internal/host/sub2api/account_capability_repair_test.go create mode 100644 internal/reconcile/service_runtime_test.go diff --git a/internal/access/openai_responses_repair_test.go b/internal/access/openai_responses_repair_test.go new file mode 100644 index 00000000..af18a02d --- /dev/null +++ b/internal/access/openai_responses_repair_test.go @@ -0,0 +1,210 @@ +package access + +import ( + "context" + "errors" + "testing" + + "sub2api-cn-relay-manager/internal/host/sub2api" +) + +func TestSuspectsOpenAIResponsesCapabilityMismatch(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + probe sub2api.ProbeResult + want bool + }{ + { + name: "ok result is not suspect", + probe: sub2api.ProbeResult{OK: true, Message: "API returned 403: Forbidden"}, + want: false, + }, + { + name: "blank message is not suspect", + probe: sub2api.ProbeResult{OK: false}, + want: false, + }, + { + name: "403 forbidden is suspect", + probe: sub2api.ProbeResult{OK: false, Message: " API returned 403: Forbidden "}, + want: true, + }, + { + name: "responses advisory chinese is suspect", + probe: sub2api.ProbeResult{OK: false, Message: "账号本身可正常使用,但当前测试接口仅支持 Responses API 路径。请直接通过实际 API 调用验证。"}, + want: true, + }, + { + name: "responses advisory english is suspect", + probe: sub2api.ProbeResult{OK: false, Message: "Responses API endpoint exists, please directly verify with actual API calls."}, + want: true, + }, + { + name: "unrelated failure is not suspect", + probe: sub2api.ProbeResult{OK: false, Message: "API returned 401: invalid token"}, + want: false, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + if got := SuspectsOpenAIResponsesCapabilityMismatch(tc.probe); got != tc.want { + t.Fatalf("SuspectsOpenAIResponsesCapabilityMismatch(%+v) = %v, want %v", tc.probe, got, tc.want) + } + }) + } +} + +func TestShouldAttemptOpenAIResponsesCapabilityRepair(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + suspect bool + completion sub2api.GatewayCompletionResult + want bool + }{ + { + name: "suspect 502 temporarily unavailable", + suspect: true, + completion: sub2api.GatewayCompletionResult{StatusCode: 502, BodyPreview: `{"error":{"message":"Upstream service temporarily unavailable"}}`}, + want: true, + }, + { + name: "suspect 503 no available accounts", + suspect: true, + completion: sub2api.GatewayCompletionResult{StatusCode: 503, BodyPreview: `{"error":{"message":"No available accounts"}}`}, + want: true, + }, + { + name: "non suspect does not repair", + suspect: false, + completion: sub2api.GatewayCompletionResult{StatusCode: 502, BodyPreview: `{"error":{"message":"Upstream service temporarily unavailable"}}`}, + want: false, + }, + { + name: "successful completion does not repair", + suspect: true, + completion: sub2api.GatewayCompletionResult{OK: true, StatusCode: 200}, + want: false, + }, + { + name: "wrong status does not repair", + suspect: true, + completion: sub2api.GatewayCompletionResult{StatusCode: 500, BodyPreview: `{"error":{"message":"Upstream service temporarily unavailable"}}`}, + want: false, + }, + { + name: "wrong body does not repair", + suspect: true, + completion: sub2api.GatewayCompletionResult{StatusCode: 502, BodyPreview: `{"error":{"message":"bad gateway"}}`}, + want: false, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + if got := ShouldAttemptOpenAIResponsesCapabilityRepair(tc.suspect, tc.completion); got != tc.want { + t.Fatalf("ShouldAttemptOpenAIResponsesCapabilityRepair(%v, %+v) = %v, want %v", tc.suspect, tc.completion, got, tc.want) + } + }) + } +} + +func TestNormalizedAccountIDs(t *testing.T) { + t.Parallel() + + got := normalizedAccountIDs([]string{" account-1 ", "", "account-2", "account-1", " "}) + want := []string{"account-1", "account-2"} + if len(got) != len(want) { + t.Fatalf("normalizedAccountIDs() len = %d, want %d; values = %v", len(got), len(want), got) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("normalizedAccountIDs()[%d] = %q, want %q; values = %v", i, got[i], want[i], got) + } + } +} + +func TestMaybeRepairOpenAIResponsesCapabilitySkipsWithoutAccountIDs(t *testing.T) { + t.Parallel() + + service := NewService(&fakeRepairHost{}) + original := sub2api.GatewayCompletionResult{ + StatusCode: 502, + BodyPreview: `{"error":{"message":"No available accounts"}}`, + } + got, err := service.maybeRepairOpenAIResponsesCapability(context.Background(), ClosureRequest{ + AccountIDs: []string{" ", ""}, + ResponsesCapabilitySuspect: true, + }, sub2api.GatewayCompletionCheckRequest{APIKey: "user-key", Model: "kimi-k2.6"}, original) + if err != nil { + t.Fatalf("maybeRepairOpenAIResponsesCapability() error = %v", err) + } + if got != original { + t.Fatalf("maybeRepairOpenAIResponsesCapability() = %+v, want original %+v", got, original) + } +} + +func TestMaybeRepairOpenAIResponsesCapabilitySwallowsDisableError(t *testing.T) { + t.Parallel() + + host := &fakeRepairHost{disableErr: errors.New("update failed")} + service := NewService(host) + original := sub2api.GatewayCompletionResult{ + StatusCode: 502, + BodyPreview: `{"error":{"message":"Upstream service temporarily unavailable"}}`, + } + got, err := service.maybeRepairOpenAIResponsesCapability(context.Background(), ClosureRequest{ + AccountIDs: []string{"account-1"}, + ResponsesCapabilitySuspect: true, + }, sub2api.GatewayCompletionCheckRequest{APIKey: "user-key", Model: "kimi-k2.6"}, original) + if err != nil { + t.Fatalf("maybeRepairOpenAIResponsesCapability() error = %v", err) + } + if got != original { + t.Fatalf("maybeRepairOpenAIResponsesCapability() = %+v, want original %+v", got, original) + } + if host.disableCalls != 1 { + t.Fatalf("disableCalls = %d, want 1", host.disableCalls) + } + if host.completionCalls != 0 { + t.Fatalf("completionCalls = %d, want 0", host.completionCalls) + } +} + +type fakeRepairHost struct { + disableErr error + disableCalls int + completionCalls int +} + +func (f *fakeRepairHost) EnsureSubscriptionAccess(_ context.Context, _ sub2api.EnsureSubscriptionAccessRequest) (sub2api.SubscriptionAccessRef, error) { + return sub2api.SubscriptionAccessRef{}, nil +} + +func (f *fakeRepairHost) AssignSubscription(_ context.Context, _ sub2api.AssignSubscriptionRequest) (sub2api.SubscriptionRef, error) { + return sub2api.SubscriptionRef{}, nil +} + +func (f *fakeRepairHost) CheckGatewayAccess(_ context.Context, _ sub2api.GatewayAccessCheckRequest) (sub2api.GatewayAccessResult, error) { + return sub2api.GatewayAccessResult{}, nil +} + +func (f *fakeRepairHost) CheckGatewayCompletion(_ context.Context, _ sub2api.GatewayCompletionCheckRequest) (sub2api.GatewayCompletionResult, error) { + f.completionCalls++ + return sub2api.GatewayCompletionResult{OK: true, StatusCode: 200}, nil +} + +func (f *fakeRepairHost) DisableOpenAIResponsesAPI(_ context.Context, _ []string) error { + f.disableCalls++ + return f.disableErr +} diff --git a/internal/app/reconcile_background_test.go b/internal/app/reconcile_background_test.go index 7fc3cce0..b4bfbc52 100644 --- a/internal/app/reconcile_background_test.go +++ b/internal/app/reconcile_background_test.go @@ -2,9 +2,11 @@ package app import ( "context" + "errors" "fmt" "net/http/httptest" "path/filepath" + "strings" "testing" "time" @@ -169,3 +171,362 @@ func seedReconcileBackgroundRuntimeImport(t *testing.T, store *sqlite.DB, baseUR return result.BatchID, hostPK, providerRow.ID } + +func TestRunReconcileBackgroundSweepRequiresStore(t *testing.T) { + t.Parallel() + + err := runReconcileBackgroundSweep(context.Background(), nil, time.Minute, time.Now()) + if err == nil || err.Error() != "store is required" { + t.Fatalf("runReconcileBackgroundSweep() error = %v, want store is required", err) + } +} + +func TestRunReconcileBackgroundSweepReturnsContextErrorBeforeCandidateRun(t *testing.T) { + t.Parallel() + + store := openReconcileBackgroundTestStore(t) + defer closeAppTestStore(t, store) + + seedReconcileBackgroundBatch(t, store) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := runReconcileBackgroundSweep(ctx, store, time.Minute, time.Now()) + if !errors.Is(err, context.Canceled) { + t.Fatalf("runReconcileBackgroundSweep() error = %v, want wrapped %v", err, context.Canceled) + } +} + +func TestRunReconcileBackgroundSweepReturnsJoinedCandidateErrors(t *testing.T) { + t.Parallel() + + store := openReconcileBackgroundTestStore(t) + defer closeAppTestStore(t, store) + + batchID, _, _ := seedReconcileBackgroundBatch(t, store) + + err := runReconcileBackgroundSweep(context.Background(), store, time.Minute, time.Now()) + if err == nil { + t.Fatal("runReconcileBackgroundSweep() error = nil, want candidate failure") + } + want := fmt.Sprintf("run reconcile for batch %d: access closure not found for batch %d", batchID, batchID) + if !strings.Contains(err.Error(), want) { + t.Fatalf("runReconcileBackgroundSweep() error = %v, want contains %q", err, want) + } +} + +func TestReconcileRunDue(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 23, 10, 0, 0, 0, time.UTC) + tests := []struct { + name string + run *sqlite.ReconcileRun + interval time.Duration + want bool + }{ + {name: "nil run", run: nil, interval: time.Minute, want: true}, + {name: "non positive interval", run: &sqlite.ReconcileRun{CreatedAt: "2026-05-23 09:59:59"}, interval: 0, want: true}, + {name: "invalid timestamp", run: &sqlite.ReconcileRun{CreatedAt: "bad-time"}, interval: time.Minute, want: true}, + {name: "recent run not due", run: &sqlite.ReconcileRun{CreatedAt: "2026-05-23 09:59:30"}, interval: time.Minute, want: false}, + {name: "old run due", run: &sqlite.ReconcileRun{CreatedAt: "2026-05-23 09:58:00"}, interval: time.Minute, want: true}, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + if got := reconcileRunDue(now, tc.run, tc.interval); got != tc.want { + t.Fatalf("reconcileRunDue() = %v, want %v", got, tc.want) + } + }) + } +} + +func TestParseAccessClosureDetailsReturnsEmptyMapForInvalidJSON(t *testing.T) { + t.Parallel() + + got := parseAccessClosureDetails("{") + if len(got) != 0 { + t.Fatalf("parseAccessClosureDetails() = %#v, want empty map", got) + } +} + +func TestParseJSONStringArrayAndParseJSONInt(t *testing.T) { + t.Parallel() + + values := parseJSONStringArray([]any{" user-1 ", 42, "", "user-2"}) + if len(values) != 2 || values[0] != "user-1" || values[1] != "user-2" { + t.Fatalf("parseJSONStringArray() = %v, want [user-1 user-2]", values) + } + if got := parseJSONStringArray("wrong-type"); got != nil { + t.Fatalf("parseJSONStringArray(wrong-type) = %v, want nil", got) + } + if got := parseJSONInt(float64(30)); got != 30 { + t.Fatalf("parseJSONInt(float64) = %d, want 30", got) + } + if got := parseJSONInt(15); got != 15 { + t.Fatalf("parseJSONInt(int) = %d, want 15", got) + } + if got := parseJSONInt("30"); got != 0 { + t.Fatalf("parseJSONInt(string) = %d, want 0", got) + } +} + +func TestStoredLoadedPackFallsBackToColumns(t *testing.T) { + t.Parallel() + + loaded, err := storedLoadedPack(sqlite.Pack{ + PackID: "openai-cn-pack", + Version: "1.0.0", + Checksum: "checksum-1", + Vendor: "OpenAI CN", + TargetHost: "sub2api", + MinHostVersion: "0.1.126", + MaxHostVersion: "0.2.x", + ManifestJSON: "{}", + }) + if err != nil { + t.Fatalf("storedLoadedPack() error = %v", err) + } + if loaded.Manifest.PackID != "openai-cn-pack" || loaded.Manifest.TargetHost != "sub2api" || loaded.Checksum != "checksum-1" { + t.Fatalf("storedLoadedPack() = %+v, want fallback fields populated", loaded) + } +} + +func TestStoredLoadedPackRejectsInvalidManifestJSON(t *testing.T) { + t.Parallel() + + _, err := storedLoadedPack(sqlite.Pack{ManifestJSON: "{"}) + if err == nil || !strings.Contains(err.Error(), "decode stored pack manifest") { + t.Fatalf("storedLoadedPack() error = %v, want decode stored pack manifest", err) + } +} + +func TestStoredProviderManifestFallsBackToColumns(t *testing.T) { + t.Parallel() + + provider, err := storedProviderManifest(sqlite.Provider{ + ProviderID: "deepseek", + DisplayName: "DeepSeek", + BaseURL: "https://api.example.com", + Platform: "openai", + AccountType: "openai", + SmokeTestModel: "deepseek-chat", + ManifestJSON: "{}", + }) + if err != nil { + t.Fatalf("storedProviderManifest() error = %v", err) + } + if provider.ProviderID != "deepseek" || provider.AccountType != "openai" || provider.SmokeTestModel != "deepseek-chat" { + t.Fatalf("storedProviderManifest() = %+v, want fallback fields populated", provider) + } +} + +func TestStoredProviderManifestRejectsInvalidManifestJSON(t *testing.T) { + t.Parallel() + + _, err := storedProviderManifest(sqlite.Provider{ManifestJSON: "{"}) + if err == nil || !strings.Contains(err.Error(), "decode stored provider manifest") { + t.Fatalf("storedProviderManifest() error = %v, want decode stored provider manifest", err) + } +} + +func TestResolveManagedResourceHostIDByBatch(t *testing.T) { + t.Parallel() + + store := openReconcileBackgroundTestStore(t) + defer closeAppTestStore(t, store) + + batchID, hostPK, _ := seedReconcileBackgroundBatch(t, store) + if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{ + BatchID: batchID, + HostID: hostPK, + ResourceType: "group", + HostResourceID: "group_1", + ResourceName: "group one", + }); err != nil { + t.Fatalf("ManagedResources().Create() error = %v", err) + } + + groupID, err := resolveManagedResourceHostIDByBatch(context.Background(), store, batchID, "group") + if err != nil { + t.Fatalf("resolveManagedResourceHostIDByBatch() error = %v", err) + } + if groupID != "group_1" { + t.Fatalf("groupID = %q, want group_1", groupID) + } + if _, err := resolveManagedResourceHostIDByBatch(context.Background(), store, batchID, "plan"); err == nil || err.Error() != fmt.Sprintf("managed resource %q not found for batch %d", "plan", batchID) { + t.Fatalf("resolveManagedResourceHostIDByBatch(plan) error = %v, want missing resource error", err) + } +} + +func TestReconcileProbeAPIKeySelfService(t *testing.T) { + t.Parallel() + + store := openReconcileBackgroundTestStore(t) + defer closeAppTestStore(t, store) + + batchID, _, _ := seedReconcileBackgroundBatch(t, store) + hostRow := mustGetBackgroundHost(t, store) + tests := []struct { + name string + record sqlite.AccessClosureRecord + wantAPIKey string + wantErr string + }{ + { + name: "prefers access api key", + record: sqlite.AccessClosureRecord{ + BatchID: batchID, + ClosureType: provision.AccessModeSelfService, + Status: "self_service_ready", + DetailsJSON: `{"access_api_key":" access-key ","probe_api_key":"probe-key"}`, + }, + wantAPIKey: "access-key", + }, + { + name: "falls back to probe api key", + record: sqlite.AccessClosureRecord{ + BatchID: batchID, + ClosureType: provision.AccessModeSelfService, + Status: "self_service_ready", + DetailsJSON: `{"probe_api_key":" probe-key "}`, + }, + wantAPIKey: "probe-key", + }, + { + name: "requires api key", + record: sqlite.AccessClosureRecord{ + BatchID: batchID, + ClosureType: provision.AccessModeSelfService, + Status: "self_service_ready", + DetailsJSON: `{}`, + }, + wantErr: "self_service access closure missing probe api key", + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got, err := reconcileProbeAPIKey(context.Background(), store, hostRow, sqlite.ImportBatch{ID: batchID}, []sqlite.AccessClosureRecord{tc.record}) + if tc.wantErr != "" { + if err == nil || err.Error() != tc.wantErr { + t.Fatalf("reconcileProbeAPIKey() error = %v, want %q", err, tc.wantErr) + } + return + } + if err != nil { + t.Fatalf("reconcileProbeAPIKey() error = %v", err) + } + if got != tc.wantAPIKey { + t.Fatalf("reconcileProbeAPIKey() = %q, want %q", got, tc.wantAPIKey) + } + }) + } +} + +func TestReconcileProbeAPIKeyRejectsMissingSubscriptionUsers(t *testing.T) { + t.Parallel() + + store := openReconcileBackgroundTestStore(t) + defer closeAppTestStore(t, store) + + batchID, _, _ := seedReconcileBackgroundBatch(t, store) + hostRow := mustGetBackgroundHost(t, store) + + _, err := reconcileProbeAPIKey(context.Background(), store, hostRow, sqlite.ImportBatch{ID: batchID}, []sqlite.AccessClosureRecord{{ + BatchID: batchID, + ClosureType: provision.AccessModeSubscription, + Status: "subscription_ready", + DetailsJSON: `{}`, + }}) + if err == nil || err.Error() != "subscription access closure missing subscription_users" { + t.Fatalf("reconcileProbeAPIKey() error = %v, want missing subscription_users", err) + } +} + +func TestReconcileProbeAPIKeyRejectsUnsupportedClosureType(t *testing.T) { + t.Parallel() + + store := openReconcileBackgroundTestStore(t) + defer closeAppTestStore(t, store) + + batchID, _, _ := seedReconcileBackgroundBatch(t, store) + hostRow := mustGetBackgroundHost(t, store) + + _, err := reconcileProbeAPIKey(context.Background(), store, hostRow, sqlite.ImportBatch{ID: batchID}, []sqlite.AccessClosureRecord{{ + BatchID: batchID, + ClosureType: "other", + Status: "unknown", + }}) + if err == nil || err.Error() != `unsupported access closure type "other"` { + t.Fatalf("reconcileProbeAPIKey() error = %v, want unsupported type", err) + } +} + +func mustGetBackgroundHost(t *testing.T, store *sqlite.DB) sqlite.Host { + t.Helper() + + host, err := store.Hosts().GetByHostID(context.Background(), "host-1") + if err != nil { + t.Fatalf("Hosts().GetByHostID() error = %v", err) + } + return host +} + +func seedReconcileBackgroundBatch(t *testing.T, store *sqlite.DB) (int64, int64, int64) { + t.Helper() + + hostPK, err := store.Hosts().Create(context.Background(), sqlite.Host{ + HostID: "host-1", + BaseURL: "https://sub2api.example.com", + HostVersion: "0.1.126", + CapabilityProbeJSON: "{}", + AuthType: "apikey", + AuthToken: "host-token", + }) + if err != nil { + t.Fatalf("Hosts().Create() error = %v", err) + } + packPK, err := store.Packs().Create(context.Background(), sqlite.Pack{ + PackID: "openai-cn-pack", + Version: "1.0.0", + Checksum: "checksum-1", + Vendor: "OpenAI CN", + TargetHost: "sub2api", + MinHostVersion: "0.1.126", + MaxHostVersion: "0.2.x", + ManifestJSON: `{"pack_id":"openai-cn-pack","version":"1.0.0","target_host":"sub2api"}`, + }) + if err != nil { + t.Fatalf("Packs().Create() error = %v", err) + } + providerPK, err := store.Providers().Create(context.Background(), sqlite.Provider{ + PackID: packPK, + ProviderID: "deepseek", + DisplayName: "DeepSeek", + BaseURL: "https://api.example.com", + Platform: "openai", + AccountType: "openai", + SmokeTestModel: "deepseek-chat", + ManifestJSON: `{"provider_id":"deepseek","base_url":"https://api.example.com","platform":"openai","account_type":"openai","smoke_test_model":"deepseek-chat"}`, + }) + if err != nil { + t.Fatalf("Providers().Create() error = %v", err) + } + batchID, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{ + HostID: hostPK, + PackID: packPK, + ProviderID: providerPK, + Mode: provision.ImportModePartial, + BatchStatus: "partially_succeeded", + AccessStatus: "self_service_ready", + }) + if err != nil { + t.Fatalf("ImportBatches().Create() error = %v", err) + } + return batchID, hostPK, providerPK +} diff --git a/internal/host/sub2api/account_capability_repair_test.go b/internal/host/sub2api/account_capability_repair_test.go new file mode 100644 index 00000000..648f811e --- /dev/null +++ b/internal/host/sub2api/account_capability_repair_test.go @@ -0,0 +1,67 @@ +package sub2api + +import ( + "context" + "io" + "net/http" + "strings" + "testing" +) + +func TestDisableOpenAIResponsesAPISkipsEmptyAccountIDs(t *testing.T) { + t.Parallel() + + client, err := NewClient("https://sub2api.example.com", WithHTTPClient(&http.Client{ + Transport: roundTripperFunc(func(*http.Request) (*http.Response, error) { + t.Fatal("unexpected HTTP request for empty account ids") + return nil, nil + }), + })) + if err != nil { + t.Fatalf("NewClient() error = %v", err) + } + + if err := client.DisableOpenAIResponsesAPI(context.Background(), []string{" ", ""}); err != nil { + t.Fatalf("DisableOpenAIResponsesAPI() error = %v", err) + } +} + +func TestDisableOpenAIResponsesAPIReturnsHTTPError(t *testing.T) { + t.Parallel() + + var gotPath string + client, err := NewClient("https://sub2api.example.com", WithHTTPClient(&http.Client{ + Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) { + gotPath = req.URL.Path + return &http.Response{ + StatusCode: http.StatusForbidden, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader(`{"error":"forbidden"}`)), + }, nil + }), + })) + if err != nil { + t.Fatalf("NewClient() error = %v", err) + } + + err = client.DisableOpenAIResponsesAPI(context.Background(), []string{"account-1"}) + if err == nil { + t.Fatal("DisableOpenAIResponsesAPI() error = nil, want HTTP error") + } + httpErr, ok := err.(*HTTPError) + if !ok { + t.Fatalf("DisableOpenAIResponsesAPI() error type = %T, want *HTTPError", err) + } + if gotPath != "/api/v1/admin/accounts/account-1" { + t.Fatalf("request path = %q, want /api/v1/admin/accounts/account-1", gotPath) + } + if httpErr.StatusCode != http.StatusForbidden { + t.Fatalf("HTTPError.StatusCode = %d, want %d", httpErr.StatusCode, http.StatusForbidden) + } +} + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (fn roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} diff --git a/internal/reconcile/service_runtime_test.go b/internal/reconcile/service_runtime_test.go new file mode 100644 index 00000000..ad3a5c58 --- /dev/null +++ b/internal/reconcile/service_runtime_test.go @@ -0,0 +1,636 @@ +package reconcile + +import ( + "context" + "errors" + "fmt" + "path/filepath" + "testing" + + "sub2api-cn-relay-manager/internal/host/sub2api" + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +func TestRerunAccountProbesReturnsErrorForInvalidProbeSummary(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + itemID := mustCreateImportBatchItem(t, store, fixture.batchID, "fp-1", "{") + + svc := NewService(store, &reconcileHostStub{}) + _, err := svc.rerunAccountProbes(context.Background(), []sqlite.ImportBatchItem{{ + ID: itemID, + BatchID: fixture.batchID, + KeyFingerprint: "fp-1", + AccountStatus: "pending", + ProbeSummaryJSON: "{", + }}, "deepseek-chat") + if err == nil || err.Error() != fmt.Sprintf("decode import batch item %d probe summary: unexpected end of JSON input", itemID) { + t.Fatalf("rerunAccountProbes() error = %v, want decode error", err) + } +} + +func TestRerunAccountProbesReturnsErrorForMissingAccountID(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + itemID := mustCreateImportBatchItem(t, store, fixture.batchID, "fp-1", `{}`) + + svc := NewService(store, &reconcileHostStub{}) + _, err := svc.rerunAccountProbes(context.Background(), []sqlite.ImportBatchItem{{ + ID: itemID, + BatchID: fixture.batchID, + KeyFingerprint: "fp-1", + AccountStatus: "pending", + ProbeSummaryJSON: `{}`, + }}, "deepseek-chat") + if err == nil || err.Error() != fmt.Sprintf("import batch item %d missing account_id in probe summary", itemID) { + t.Fatalf("rerunAccountProbes() error = %v, want missing account_id", err) + } +} + +func TestRerunAccountProbesReturnsErrorWhenRetestFails(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + itemID := mustCreateImportBatchItem(t, store, fixture.batchID, "fp-1", `{"account_id":"account-1"}`) + + host := &reconcileHostStub{ + testErrors: map[string]error{"account-1": errors.New("probe failed")}, + } + svc := NewService(store, host) + _, err := svc.rerunAccountProbes(context.Background(), []sqlite.ImportBatchItem{{ + ID: itemID, + BatchID: fixture.batchID, + KeyFingerprint: "fp-1", + AccountStatus: "pending", + ProbeSummaryJSON: `{"account_id":"account-1"}`, + }}, "deepseek-chat") + if err == nil || err.Error() != "re-test account account-1: probe failed" { + t.Fatalf("rerunAccountProbes() error = %v, want retest failure", err) + } +} + +func TestRerunAccountProbesReturnsErrorWhenReloadModelsFails(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + itemID := mustCreateImportBatchItem(t, store, fixture.batchID, "fp-1", `{"account_id":"account-1"}`) + + host := &reconcileHostStub{ + testResults: map[string]sub2api.ProbeResult{ + "account-1": {OK: true, Status: "passed"}, + }, + modelErrors: map[string]error{"account-1": errors.New("models unavailable")}, + } + svc := NewService(store, host) + _, err := svc.rerunAccountProbes(context.Background(), []sqlite.ImportBatchItem{{ + ID: itemID, + BatchID: fixture.batchID, + KeyFingerprint: "fp-1", + AccountStatus: "pending", + ProbeSummaryJSON: `{"account_id":"account-1"}`, + }}, "deepseek-chat") + if err == nil || err.Error() != "reload account models account-1: models unavailable" { + t.Fatalf("rerunAccountProbes() error = %v, want model reload failure", err) + } +} + +func TestRerunAccountProbesPersistsWarningsAndDeduplicatesSuspectAccounts(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + itemID1 := mustCreateImportBatchItem(t, store, fixture.batchID, "fp-1", `{"account_id":"account-1"}`) + itemID2 := mustCreateImportBatchItem(t, store, fixture.batchID, "fp-2", `{"account_id":" account-1 "}`) + + host := &reconcileHostStub{ + testResults: map[string]sub2api.ProbeResult{ + "account-1": { + OK: false, + Status: "failed", + Message: "API returned 403: Forbidden", + }, + }, + models: map[string][]sub2api.AccountModel{ + "account-1": {{ID: "deepseek-chat"}}, + }, + } + svc := NewService(store, host) + summary, err := svc.rerunAccountProbes(context.Background(), []sqlite.ImportBatchItem{ + { + ID: itemID1, + BatchID: fixture.batchID, + KeyFingerprint: "fp-1", + AccountStatus: "pending", + ProbeSummaryJSON: `{"account_id":"account-1"}`, + }, + { + ID: itemID2, + BatchID: fixture.batchID, + KeyFingerprint: "fp-2", + AccountStatus: "pending", + ProbeSummaryJSON: `{"account_id":" account-1 "}`, + }, + }, "deepseek-chat") + if err != nil { + t.Fatalf("rerunAccountProbes() error = %v", err) + } + if summary.Failures != 0 { + t.Fatalf("summary.Failures = %d, want 0 for advisory warning", summary.Failures) + } + if !summary.ResponsesCapabilitySuspect { + t.Fatal("summary.ResponsesCapabilitySuspect = false, want true") + } + if len(summary.AccountIDs) != 1 || summary.AccountIDs[0] != "account-1" { + t.Fatalf("summary.AccountIDs = %v, want [account-1]", summary.AccountIDs) + } + for _, itemID := range []int64{itemID1, itemID2} { + items, err := store.ImportBatchItems().GetByBatchID(context.Background(), fixture.batchID) + if err != nil { + t.Fatalf("ImportBatchItems().GetByBatchID() error = %v", err) + } + var got sqlite.ImportBatchItem + for _, item := range items { + if item.ID == itemID { + got = item + break + } + } + if got.AccountStatus != accountStatusWarning { + t.Fatalf("item %d AccountStatus = %q, want %q", itemID, got.AccountStatus, accountStatusWarning) + } + probes, err := store.ProbeResults().GetByBatchItemID(context.Background(), itemID) + if err != nil { + t.Fatalf("ProbeResults().GetByBatchItemID(%d) error = %v", itemID, err) + } + if len(probes) != 1 || probes[0].Status != accountStatusWarning { + t.Fatalf("probe results for item %d = %+v, want single warning result", itemID, probes) + } + } +} + +func TestRerunAccessClosureReturnsPreviousStatusWithoutProbeAPIKey(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + closures := []sqlite.AccessClosureRecord{{ + BatchID: fixture.batchID, + ClosureType: accessModeSelfService, + Status: accessStatusSelfServiceReady, + }} + + status, checked, err := NewService(store, &reconcileHostStub{}).rerunAccessClosure( + context.Background(), + fixture.batchID, + closures, + "", + "deepseek-chat", + nil, + false, + ) + if err != nil { + t.Fatalf("rerunAccessClosure() error = %v", err) + } + if checked { + t.Fatal("checked = true, want false without probe api key") + } + if status != accessStatusSelfServiceReady { + t.Fatalf("status = %q, want %q", status, accessStatusSelfServiceReady) + } +} + +func TestRerunAccessClosureReturnsErrorWhenGatewayCheckFails(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + closures := mustCreateAndLoadAccessClosures(t, store, fixture.batchID, sqlite.AccessClosureRecord{ + BatchID: fixture.batchID, + ClosureType: accessModeSelfService, + Status: accessStatusSelfServiceReady, + }) + + host := &reconcileHostStub{gatewayErr: errors.New("gateway down")} + _, _, err := NewService(store, host).rerunAccessClosure(context.Background(), fixture.batchID, closures, "user-key", "deepseek-chat", nil, false) + if err == nil || err.Error() != "re-check gateway access: gateway down" { + t.Fatalf("rerunAccessClosure() error = %v, want gateway failure", err) + } +} + +func TestRerunAccessClosureReturnsErrorWhenCompletionCheckFails(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + closures := mustCreateAndLoadAccessClosures(t, store, fixture.batchID, sqlite.AccessClosureRecord{ + BatchID: fixture.batchID, + ClosureType: accessModeSelfService, + Status: accessStatusSelfServiceReady, + }) + + host := &reconcileHostStub{ + gatewayResult: sub2api.GatewayAccessResult{OK: true, HasExpectedModel: true}, + completionErrs: []error{ + errors.New("completion failed"), + }, + } + _, _, err := NewService(store, host).rerunAccessClosure(context.Background(), fixture.batchID, closures, "user-key", "deepseek-chat", nil, false) + if err == nil || err.Error() != "re-check gateway completion: completion failed" { + t.Fatalf("rerunAccessClosure() error = %v, want completion failure", err) + } +} + +func TestRerunAccessClosureReturnsErrorWhenCompletionAfterRepairFails(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + closures := mustCreateAndLoadAccessClosures(t, store, fixture.batchID, sqlite.AccessClosureRecord{ + BatchID: fixture.batchID, + ClosureType: accessModeSelfService, + Status: accessStatusSelfServiceReady, + }) + + host := &reconcileHostStub{ + gatewayResult: sub2api.GatewayAccessResult{OK: true, HasExpectedModel: true}, + completionResults: []sub2api.GatewayCompletionResult{ + {OK: false, StatusCode: 502, BodyPreview: `{"error":{"message":"No available accounts"}}`}, + }, + completionErrs: []error{ + nil, + errors.New("still failing"), + }, + } + _, _, err := NewService(store, host).rerunAccessClosure(context.Background(), fixture.batchID, closures, "user-key", "deepseek-chat", []string{"account-1"}, true) + if err == nil || err.Error() != "re-check gateway completion after capability repair: still failing" { + t.Fatalf("rerunAccessClosure() error = %v, want post-repair completion failure", err) + } +} + +func TestRerunAccessClosurePersistsBrokenRecordWhenRepairCannotRun(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + closures := mustCreateAndLoadAccessClosures(t, store, fixture.batchID, sqlite.AccessClosureRecord{ + BatchID: fixture.batchID, + ClosureType: accessModeSelfService, + Status: accessStatusSelfServiceReady, + }) + + host := &reconcileHostStub{ + gatewayResult: sub2api.GatewayAccessResult{OK: true, StatusCode: 200, HasExpectedModel: true, Models: []string{"deepseek-chat"}}, + completionResults: []sub2api.GatewayCompletionResult{ + {OK: false, StatusCode: 502, ContentType: "application/json", BodyPreview: `{"error":{"message":"Upstream service temporarily unavailable"}}`}, + }, + disableResponsesErr: errors.New("host rejected update"), + } + status, checked, err := NewService(store, host).rerunAccessClosure(context.Background(), fixture.batchID, closures, "user-key", "deepseek-chat", []string{"account-1"}, true) + if err != nil { + t.Fatalf("rerunAccessClosure() error = %v", err) + } + if !checked { + t.Fatal("checked = false, want true") + } + if status != accessStatusBroken { + t.Fatalf("status = %q, want %q", status, accessStatusBroken) + } + if host.disableResponsesCalls != 1 { + t.Fatalf("disableResponsesCalls = %d, want 1", host.disableResponsesCalls) + } + if host.completionCalls != 1 { + t.Fatalf("completionCalls = %d, want 1 without retry after disable failure", host.completionCalls) + } + records, err := store.AccessClosures().GetByBatchID(context.Background(), fixture.batchID) + if err != nil { + t.Fatalf("AccessClosures().GetByBatchID() error = %v", err) + } + if len(records) != 2 { + t.Fatalf("access closure records = %d, want 2", len(records)) + } + if records[1].Status != accessStatusBroken { + t.Fatalf("persisted access closure status = %q, want %q", records[1].Status, accessStatusBroken) + } +} + +func TestStoredResourcesForReconcileMergesCurrentBatchAndSharedScaffoldingOnly(t *testing.T) { + t.Parallel() + + store := openReconcileTestStore(t) + defer closeReconcileTestStore(t, store) + + fixture := seedReconcileFixture(t, store) + otherBatchID := mustCreateImportBatch(t, store, fixture.hostPK, fixture.packPK, fixture.providerPK, "partial", "succeeded", "self_service_ready") + mustCreateManagedResource(t, store, fixture.batchID, fixture.hostPK, "group", "group-1", "group one") + mustCreateManagedResource(t, store, fixture.batchID, fixture.hostPK, "account", "account-1", "account one") + mustCreateManagedResource(t, store, otherBatchID, fixture.hostPK, "group", "group-2", "shared group") + mustCreateManagedResource(t, store, otherBatchID, fixture.hostPK, "channel", "channel-2", "shared channel") + mustCreateManagedResource(t, store, otherBatchID, fixture.hostPK, "plan", "plan-2", "shared plan") + mustCreateManagedResource(t, store, otherBatchID, fixture.hostPK, "account", "account-2", "shared account should not merge") + + got, err := NewService(store, &reconcileHostStub{}).storedResourcesForReconcile(context.Background(), fixture.providerPK, fixture.hostPK, fixture.batchID) + if err != nil { + t.Fatalf("storedResourcesForReconcile() error = %v", err) + } + if len(got) != 5 { + t.Fatalf("storedResourcesForReconcile() len = %d, want 5; values = %+v", len(got), got) + } + want := map[string]bool{ + "group:group-1": false, + "group:group-2": false, + "account:account-1": false, + "channel:channel-2": false, + "plan:plan-2": false, + } + for _, resource := range got { + key := resource.ResourceType + ":" + resource.HostResourceID + if _, ok := want[key]; !ok { + t.Fatalf("unexpected merged resource %q in %+v", key, got) + } + want[key] = true + } + for key, seen := range want { + if !seen { + t.Fatalf("missing merged resource %q in %+v", key, got) + } + } +} + +type reconcileFixture struct { + hostPK int64 + packPK int64 + providerPK int64 + batchID int64 +} + +func openReconcileTestStore(t *testing.T) *sqlite.DB { + t.Helper() + + dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_pragma=foreign_keys(0)", filepath.ToSlash(filepath.Join(t.TempDir(), "state.db"))) + store, err := sqlite.Open(context.Background(), dsn) + if err != nil { + t.Fatalf("sqlite.Open() error = %v", err) + } + return store +} + +func closeReconcileTestStore(t *testing.T, store *sqlite.DB) { + t.Helper() + if err := store.Close(); err != nil { + t.Fatalf("store.Close() error = %v", err) + } +} + +func seedReconcileFixture(t *testing.T, store *sqlite.DB) reconcileFixture { + t.Helper() + + hostPK, err := store.Hosts().Create(context.Background(), sqlite.Host{ + HostID: "host-1", + BaseURL: "https://sub2api.example.com", + HostVersion: "0.1.126", + AuthType: "apikey", + AuthToken: "test-token", + }) + if err != nil { + t.Fatalf("Hosts().Create() error = %v", err) + } + packPK, err := store.Packs().Create(context.Background(), sqlite.Pack{ + PackID: "openai-cn-pack", + Version: "1.0.0", + Checksum: "checksum-1", + }) + if err != nil { + t.Fatalf("Packs().Create() error = %v", err) + } + providerPK, err := store.Providers().Create(context.Background(), sqlite.Provider{ + PackID: packPK, + ProviderID: "deepseek", + DisplayName: "DeepSeek", + BaseURL: "https://api.example.com", + Platform: "openai", + AccountType: "openai", + SmokeTestModel: "deepseek-chat", + }) + if err != nil { + t.Fatalf("Providers().Create() error = %v", err) + } + batchID := mustCreateImportBatch(t, store, hostPK, packPK, providerPK, "partial", "partially_succeeded", "self_service_ready") + return reconcileFixture{ + hostPK: hostPK, + packPK: packPK, + providerPK: providerPK, + batchID: batchID, + } +} + +func mustCreateImportBatch(t *testing.T, store *sqlite.DB, hostPK, packPK, providerPK int64, mode, batchStatus, accessStatus string) int64 { + t.Helper() + + id, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{ + HostID: hostPK, + PackID: packPK, + ProviderID: providerPK, + Mode: mode, + BatchStatus: batchStatus, + AccessStatus: accessStatus, + }) + if err != nil { + t.Fatalf("ImportBatches().Create() error = %v", err) + } + return id +} + +func mustCreateImportBatchItem(t *testing.T, store *sqlite.DB, batchID int64, fingerprint, probeSummary string) int64 { + t.Helper() + + id, err := store.ImportBatchItems().Create(context.Background(), sqlite.ImportBatchItem{ + BatchID: batchID, + KeyFingerprint: fingerprint, + AccountStatus: "pending", + ProbeSummaryJSON: probeSummary, + }) + if err != nil { + t.Fatalf("ImportBatchItems().Create() error = %v", err) + } + return id +} + +func mustCreateManagedResource(t *testing.T, store *sqlite.DB, batchID, hostPK int64, resourceType, hostResourceID, resourceName string) int64 { + t.Helper() + + id, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{ + BatchID: batchID, + HostID: hostPK, + ResourceType: resourceType, + HostResourceID: hostResourceID, + ResourceName: resourceName, + }) + if err != nil { + t.Fatalf("ManagedResources().Create() error = %v", err) + } + return id +} + +func mustCreateAndLoadAccessClosures(t *testing.T, store *sqlite.DB, batchID int64, records ...sqlite.AccessClosureRecord) []sqlite.AccessClosureRecord { + t.Helper() + + for _, record := range records { + if _, err := store.AccessClosures().Create(context.Background(), record); err != nil { + t.Fatalf("AccessClosures().Create() error = %v", err) + } + } + loaded, err := store.AccessClosures().GetByBatchID(context.Background(), batchID) + if err != nil { + t.Fatalf("AccessClosures().GetByBatchID() error = %v", err) + } + return loaded +} + +type reconcileHostStub struct { + disableResponsesErr error + gatewayErr error + gatewayResult sub2api.GatewayAccessResult + testResults map[string]sub2api.ProbeResult + testErrors map[string]error + models map[string][]sub2api.AccountModel + modelErrors map[string]error + completionResults []sub2api.GatewayCompletionResult + completionErrs []error + completionCalls int + disableResponsesCalls int + disabledResponsesAccounts []string +} + +func (h *reconcileHostStub) GetHostVersion(context.Context) (string, error) { + return "0.1.126", nil +} + +func (h *reconcileHostStub) ProbeCapabilities(context.Context) (sub2api.HostCapabilities, error) { + return sub2api.HostCapabilities{}, nil +} + +func (h *reconcileHostStub) CreateGroup(context.Context, sub2api.CreateGroupRequest) (sub2api.GroupRef, error) { + return sub2api.GroupRef{}, nil +} + +func (h *reconcileHostStub) DeleteGroup(context.Context, string) error { + return nil +} + +func (h *reconcileHostStub) CreateChannel(context.Context, sub2api.CreateChannelRequest) (sub2api.ChannelRef, error) { + return sub2api.ChannelRef{}, nil +} + +func (h *reconcileHostStub) UpdateChannel(context.Context, string, sub2api.CreateChannelRequest) error { + return nil +} + +func (h *reconcileHostStub) DeleteChannel(context.Context, string) error { + return nil +} + +func (h *reconcileHostStub) CreatePlan(context.Context, sub2api.CreatePlanRequest) (sub2api.PlanRef, error) { + return sub2api.PlanRef{}, nil +} + +func (h *reconcileHostStub) DeletePlan(context.Context, string) error { + return nil +} + +func (h *reconcileHostStub) CreateAccount(context.Context, sub2api.CreateAccountRequest) (sub2api.AccountRef, error) { + return sub2api.AccountRef{}, nil +} + +func (h *reconcileHostStub) BatchCreateAccounts(context.Context, sub2api.BatchCreateAccountsRequest) ([]sub2api.AccountRef, error) { + return nil, nil +} + +func (h *reconcileHostStub) DeleteAccount(context.Context, string) error { + return nil +} + +func (h *reconcileHostStub) TestAccount(_ context.Context, accountID, _ string) (sub2api.ProbeResult, error) { + if err, ok := h.testErrors[accountID]; ok { + return sub2api.ProbeResult{}, err + } + if result, ok := h.testResults[accountID]; ok { + return result, nil + } + return sub2api.ProbeResult{}, fmt.Errorf("missing test result for %s", accountID) +} + +func (h *reconcileHostStub) GetAccountModels(_ context.Context, accountID string) ([]sub2api.AccountModel, error) { + if err, ok := h.modelErrors[accountID]; ok { + return nil, err + } + if models, ok := h.models[accountID]; ok { + return models, nil + } + return nil, fmt.Errorf("missing models for %s", accountID) +} + +func (h *reconcileHostStub) EnsureSubscriptionAccess(context.Context, sub2api.EnsureSubscriptionAccessRequest) (sub2api.SubscriptionAccessRef, error) { + return sub2api.SubscriptionAccessRef{}, nil +} + +func (h *reconcileHostStub) AssignSubscription(context.Context, sub2api.AssignSubscriptionRequest) (sub2api.SubscriptionRef, error) { + return sub2api.SubscriptionRef{}, nil +} + +func (h *reconcileHostStub) CheckGatewayAccess(_ context.Context, _ sub2api.GatewayAccessCheckRequest) (sub2api.GatewayAccessResult, error) { + if h.gatewayErr != nil { + return sub2api.GatewayAccessResult{}, h.gatewayErr + } + return h.gatewayResult, nil +} + +func (h *reconcileHostStub) CheckGatewayCompletion(_ context.Context, _ sub2api.GatewayCompletionCheckRequest) (sub2api.GatewayCompletionResult, error) { + idx := h.completionCalls + h.completionCalls++ + if idx < len(h.completionErrs) && h.completionErrs[idx] != nil { + return sub2api.GatewayCompletionResult{}, h.completionErrs[idx] + } + if len(h.completionResults) == 0 { + return sub2api.GatewayCompletionResult{}, nil + } + if idx >= len(h.completionResults) { + idx = len(h.completionResults) - 1 + } + return h.completionResults[idx], nil +} + +func (h *reconcileHostStub) DisableOpenAIResponsesAPI(_ context.Context, accountIDs []string) error { + h.disableResponsesCalls++ + h.disabledResponsesAccounts = append([]string(nil), accountIDs...) + return h.disableResponsesErr +} + +func (h *reconcileHostStub) ListManagedResources(context.Context, sub2api.ListManagedResourcesRequest) (sub2api.ManagedResourceSnapshot, error) { + return sub2api.ManagedResourceSnapshot{}, nil +} diff --git a/internal/reconcile/service_test.go b/internal/reconcile/service_test.go index c6193000..4a8f4000 100644 --- a/internal/reconcile/service_test.go +++ b/internal/reconcile/service_test.go @@ -102,3 +102,81 @@ func TestAccountValidationStatusTreatsResponsesRaceAsWarning(t *testing.T) { t.Fatalf("accountValidationStatus() = %q, want %q", status, accountStatusWarning) } } + +func TestAccountValidationStatusTreatsTransientProbeFailureAsWarning(t *testing.T) { + t.Parallel() + + status := accountValidationStatus(sub2api.ProbeResult{ + OK: false, + Status: "failed", + Message: "API returned 503: upstream service unavailable, retry later", + }, true) + if status != accountStatusWarning { + t.Fatalf("accountValidationStatus() = %q, want %q", status, accountStatusWarning) + } +} + +func TestNormalizedUniqueAccountIDs(t *testing.T) { + t.Parallel() + + got := normalizedUniqueAccountIDs([]string{" account_1 ", "", "account_2", "account_1", " "}) + want := []string{"account_1", "account_2"} + if len(got) != len(want) { + t.Fatalf("normalizedUniqueAccountIDs() len = %d, want %d; values = %v", len(got), len(want), got) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("normalizedUniqueAccountIDs()[%d] = %q, want %q; values = %v", i, got[i], want[i], got) + } + } +} + +func TestAccessClosureTypeReturnsLatestTrimmedType(t *testing.T) { + t.Parallel() + + if got := accessClosureType(nil); got != "" { + t.Fatalf("accessClosureType(nil) = %q, want empty", got) + } + got := accessClosureType([]sqlite.AccessClosureRecord{ + {ClosureType: " subscription "}, + {ClosureType: " self_service "}, + }) + if got != accessModeSelfService { + t.Fatalf("accessClosureType() = %q, want %q", got, accessModeSelfService) + } +} + +func TestClassifyHistoricalAccountNoiseOnlyKeepsPrefixedOutOfBatchAccounts(t *testing.T) { + t.Parallel() + + got := classifyHistoricalAccountNoise( + []sqlite.ManagedResource{ + {ResourceType: "account", HostResourceID: "account_1"}, + {ResourceType: "group", HostResourceID: "group_1"}, + }, + []sub2api.NamedResource{ + {ID: "account_1", Name: "deepseek-01"}, + {ID: "account_2", Name: "deepseek-02"}, + {ID: "account_3", Name: "other-03"}, + {ID: " ", Name: "deepseek-04"}, + }, + "deepseek-", + ) + if len(got) != 1 { + t.Fatalf("classifyHistoricalAccountNoise() len = %d, want 1; values = %+v", len(got), got) + } + if got[0].ID != "account_2" || got[0].Name != "deepseek-02" { + t.Fatalf("classifyHistoricalAccountNoise()[0] = %+v, want account_2/deepseek-02", got[0]) + } +} + +func TestGatewayAccessReadyRequiresCompletionSuccess(t *testing.T) { + t.Parallel() + + if gatewayAccessReady(sub2api.GatewayAccessResult{OK: true, HasExpectedModel: true}) { + t.Fatal("gatewayAccessReady() = true, want false without completion success") + } + if !gatewayAccessReady(sub2api.GatewayAccessResult{OK: true, HasExpectedModel: true, CompletionOK: true}) { + t.Fatal("gatewayAccessReady() = false, want true when gateway and completion are ready") + } +}