Files
sub2api-cn-relay-manager/internal/app/reconcile_background_test.go
2026-05-23 09:39:02 +08:00

533 lines
17 KiB
Go

package app
import (
"context"
"errors"
"fmt"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"time"
"sub2api-cn-relay-manager/internal/pack"
"sub2api-cn-relay-manager/internal/provision"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
func TestRunReconcileBackgroundSweepCreatesReconcileRunForLatestSuccessfulBatch(t *testing.T) {
t.Parallel()
server := httptest.NewServer(newBatchImportActionStubServer(t))
defer server.Close()
store := openReconcileBackgroundTestStore(t)
defer closeAppTestStore(t, store)
batchID, hostPK, _ := seedReconcileBackgroundRuntimeImport(t, store, server.URL)
if err := runReconcileBackgroundSweep(context.Background(), store, 10*time.Minute, time.Now()); err != nil {
t.Fatalf("runReconcileBackgroundSweep() error = %v", err)
}
providers, err := store.Providers().ListByProviderID(context.Background(), "deepseek")
if err != nil {
t.Fatalf("Providers().ListByProviderID() error = %v", err)
}
runs, err := store.ReconcileRuns().GetByProviderIDAndHostID(context.Background(), providers[0].ID, hostPK)
if err != nil {
t.Fatalf("ReconcileRuns().GetByProviderIDAndHostID() error = %v", err)
}
if len(runs) != 1 {
t.Fatalf("reconcile runs = %d, want 1", len(runs))
}
if runs[0].BatchID != batchID {
t.Fatalf("reconcile batch_id = %d, want %d", runs[0].BatchID, batchID)
}
if runs[0].Status == "" {
t.Fatal("reconcile status = empty, want persisted result")
}
}
func TestRunReconcileBackgroundSweepSkipsRecentReconcileRun(t *testing.T) {
t.Parallel()
server := httptest.NewServer(newBatchImportActionStubServer(t))
defer server.Close()
store := openReconcileBackgroundTestStore(t)
defer closeAppTestStore(t, store)
batchID, hostPK, providerPK := seedReconcileBackgroundRuntimeImport(t, store, server.URL)
if _, err := store.ReconcileRuns().Create(context.Background(), sqlite.ReconcileRun{
BatchID: batchID,
HostID: hostPK,
ProviderID: providerPK,
Status: "active",
SummaryJSON: `{"seed":true}`,
}); err != nil {
t.Fatalf("ReconcileRuns().Create() error = %v", err)
}
if err := runReconcileBackgroundSweep(context.Background(), store, 10*time.Minute, time.Now()); err != nil {
t.Fatalf("runReconcileBackgroundSweep() error = %v", err)
}
runs, err := store.ReconcileRuns().GetByProviderIDAndHostID(context.Background(), providerPK, hostPK)
if err != nil {
t.Fatalf("ReconcileRuns().GetByProviderIDAndHostID() error = %v", err)
}
if len(runs) != 1 {
t.Fatalf("reconcile runs = %d, want 1 recent run only", len(runs))
}
}
func openReconcileBackgroundTestStore(t *testing.T) *sqlite.DB {
t.Helper()
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_pragma=foreign_keys(0)", filepath.ToSlash(filepath.Join(t.TempDir(), "state.db")))
store, err := sqlite.Open(context.Background(), dsn)
if err != nil {
t.Fatalf("sqlite.Open() error = %v", err)
}
if _, err := store.SQLDB().Exec("PRAGMA foreign_keys = OFF"); err != nil {
t.Fatalf("disable foreign keys pragma error = %v", err)
}
return store
}
func seedReconcileBackgroundRuntimeImport(t *testing.T, store *sqlite.DB, baseURL string) (int64, int64, int64) {
t.Helper()
hostPK, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-1",
BaseURL: baseURL,
HostVersion: "0.1.126",
CapabilityProbeJSON: "{}",
AuthType: "apikey",
AuthToken: "host-token",
})
if err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
client, err := newSub2APIClient(baseURL, CreateHostAuth{Type: "apikey", Token: "host-token"})
if err != nil {
t.Fatalf("newSub2APIClient() error = %v", err)
}
loadedPack := pack.LoadedPack{
Manifest: pack.Manifest{
PackID: "openai-cn-pack",
Version: "1.0.0",
Vendor: "OpenAI CN",
TargetHost: "sub2api",
MinHostVersion: "0.1.126",
MaxHostVersion: "0.2.x",
},
Providers: []pack.ProviderManifest{{
ProviderID: "deepseek",
DisplayName: "DeepSeek",
BaseURL: "https://api.deepseek.example",
Platform: "openai",
AccountType: "openai",
DefaultModels: []string{"kimi-k2.6"},
SmokeTestModel: "kimi-k2.6",
GroupTemplate: pack.GroupTemplate{Name: "DeepSeek 默认分组", RateMultiplier: 1},
ChannelTemplate: pack.ChannelTemplate{
Name: "DeepSeek 默认渠道",
ModelMapping: map[string]string{"kimi-k2.6": "kimi-k2.6"},
},
PlanTemplate: pack.PlanTemplate{Name: "DeepSeek 默认套餐", Price: 0, ValidityDays: 30, ValidityUnit: "day"},
Import: pack.ImportOptions{SupportsMultiKey: true, SupportsStrict: true, SupportsPartial: true},
}},
Checksum: "checksum-1",
}
result, err := provision.NewRuntimeImportService(store, client).Import(context.Background(), provision.RuntimeImportRequest{
HostID: "host-1",
HostBaseURL: baseURL,
Pack: loadedPack,
Provider: loadedPack.Providers[0],
Mode: provision.ImportModePartial,
Keys: []string{"entry-key"},
Access: provision.AccessRequest{
Mode: provision.AccessModeSelfService,
ProbeAPIKey: "gateway-key",
},
})
if err != nil {
t.Fatalf("RuntimeImportService.Import() error = %v", err)
}
packRow, err := store.Packs().GetByPackID(context.Background(), loadedPack.Manifest.PackID)
if err != nil {
t.Fatalf("Packs().GetByPackID() error = %v", err)
}
providerRow, err := store.Providers().GetByPackIDAndProviderID(context.Background(), packRow.ID, loadedPack.Providers[0].ProviderID)
if err != nil {
t.Fatalf("Providers().GetByPackIDAndProviderID() error = %v", err)
}
return result.BatchID, hostPK, providerRow.ID
}
func TestRunReconcileBackgroundSweepRequiresStore(t *testing.T) {
t.Parallel()
err := runReconcileBackgroundSweep(context.Background(), nil, time.Minute, time.Now())
if err == nil || err.Error() != "store is required" {
t.Fatalf("runReconcileBackgroundSweep() error = %v, want store is required", err)
}
}
func TestRunReconcileBackgroundSweepReturnsContextErrorBeforeCandidateRun(t *testing.T) {
t.Parallel()
store := openReconcileBackgroundTestStore(t)
defer closeAppTestStore(t, store)
seedReconcileBackgroundBatch(t, store)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := runReconcileBackgroundSweep(ctx, store, time.Minute, time.Now())
if !errors.Is(err, context.Canceled) {
t.Fatalf("runReconcileBackgroundSweep() error = %v, want wrapped %v", err, context.Canceled)
}
}
func TestRunReconcileBackgroundSweepReturnsJoinedCandidateErrors(t *testing.T) {
t.Parallel()
store := openReconcileBackgroundTestStore(t)
defer closeAppTestStore(t, store)
batchID, _, _ := seedReconcileBackgroundBatch(t, store)
err := runReconcileBackgroundSweep(context.Background(), store, time.Minute, time.Now())
if err == nil {
t.Fatal("runReconcileBackgroundSweep() error = nil, want candidate failure")
}
want := fmt.Sprintf("run reconcile for batch %d: access closure not found for batch %d", batchID, batchID)
if !strings.Contains(err.Error(), want) {
t.Fatalf("runReconcileBackgroundSweep() error = %v, want contains %q", err, want)
}
}
func TestReconcileRunDue(t *testing.T) {
t.Parallel()
now := time.Date(2026, 5, 23, 10, 0, 0, 0, time.UTC)
tests := []struct {
name string
run *sqlite.ReconcileRun
interval time.Duration
want bool
}{
{name: "nil run", run: nil, interval: time.Minute, want: true},
{name: "non positive interval", run: &sqlite.ReconcileRun{CreatedAt: "2026-05-23 09:59:59"}, interval: 0, want: true},
{name: "invalid timestamp", run: &sqlite.ReconcileRun{CreatedAt: "bad-time"}, interval: time.Minute, want: true},
{name: "recent run not due", run: &sqlite.ReconcileRun{CreatedAt: "2026-05-23 09:59:30"}, interval: time.Minute, want: false},
{name: "old run due", run: &sqlite.ReconcileRun{CreatedAt: "2026-05-23 09:58:00"}, interval: time.Minute, want: true},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
if got := reconcileRunDue(now, tc.run, tc.interval); got != tc.want {
t.Fatalf("reconcileRunDue() = %v, want %v", got, tc.want)
}
})
}
}
func TestParseAccessClosureDetailsReturnsEmptyMapForInvalidJSON(t *testing.T) {
t.Parallel()
got := parseAccessClosureDetails("{")
if len(got) != 0 {
t.Fatalf("parseAccessClosureDetails() = %#v, want empty map", got)
}
}
func TestParseJSONStringArrayAndParseJSONInt(t *testing.T) {
t.Parallel()
values := parseJSONStringArray([]any{" user-1 ", 42, "", "user-2"})
if len(values) != 2 || values[0] != "user-1" || values[1] != "user-2" {
t.Fatalf("parseJSONStringArray() = %v, want [user-1 user-2]", values)
}
if got := parseJSONStringArray("wrong-type"); got != nil {
t.Fatalf("parseJSONStringArray(wrong-type) = %v, want nil", got)
}
if got := parseJSONInt(float64(30)); got != 30 {
t.Fatalf("parseJSONInt(float64) = %d, want 30", got)
}
if got := parseJSONInt(15); got != 15 {
t.Fatalf("parseJSONInt(int) = %d, want 15", got)
}
if got := parseJSONInt("30"); got != 0 {
t.Fatalf("parseJSONInt(string) = %d, want 0", got)
}
}
func TestStoredLoadedPackFallsBackToColumns(t *testing.T) {
t.Parallel()
loaded, err := storedLoadedPack(sqlite.Pack{
PackID: "openai-cn-pack",
Version: "1.0.0",
Checksum: "checksum-1",
Vendor: "OpenAI CN",
TargetHost: "sub2api",
MinHostVersion: "0.1.126",
MaxHostVersion: "0.2.x",
ManifestJSON: "{}",
})
if err != nil {
t.Fatalf("storedLoadedPack() error = %v", err)
}
if loaded.Manifest.PackID != "openai-cn-pack" || loaded.Manifest.TargetHost != "sub2api" || loaded.Checksum != "checksum-1" {
t.Fatalf("storedLoadedPack() = %+v, want fallback fields populated", loaded)
}
}
func TestStoredLoadedPackRejectsInvalidManifestJSON(t *testing.T) {
t.Parallel()
_, err := storedLoadedPack(sqlite.Pack{ManifestJSON: "{"})
if err == nil || !strings.Contains(err.Error(), "decode stored pack manifest") {
t.Fatalf("storedLoadedPack() error = %v, want decode stored pack manifest", err)
}
}
func TestStoredProviderManifestFallsBackToColumns(t *testing.T) {
t.Parallel()
provider, err := storedProviderManifest(sqlite.Provider{
ProviderID: "deepseek",
DisplayName: "DeepSeek",
BaseURL: "https://api.example.com",
Platform: "openai",
AccountType: "openai",
SmokeTestModel: "deepseek-chat",
ManifestJSON: "{}",
})
if err != nil {
t.Fatalf("storedProviderManifest() error = %v", err)
}
if provider.ProviderID != "deepseek" || provider.AccountType != "openai" || provider.SmokeTestModel != "deepseek-chat" {
t.Fatalf("storedProviderManifest() = %+v, want fallback fields populated", provider)
}
}
func TestStoredProviderManifestRejectsInvalidManifestJSON(t *testing.T) {
t.Parallel()
_, err := storedProviderManifest(sqlite.Provider{ManifestJSON: "{"})
if err == nil || !strings.Contains(err.Error(), "decode stored provider manifest") {
t.Fatalf("storedProviderManifest() error = %v, want decode stored provider manifest", err)
}
}
func TestResolveManagedResourceHostIDByBatch(t *testing.T) {
t.Parallel()
store := openReconcileBackgroundTestStore(t)
defer closeAppTestStore(t, store)
batchID, hostPK, _ := seedReconcileBackgroundBatch(t, store)
if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{
BatchID: batchID,
HostID: hostPK,
ResourceType: "group",
HostResourceID: "group_1",
ResourceName: "group one",
}); err != nil {
t.Fatalf("ManagedResources().Create() error = %v", err)
}
groupID, err := resolveManagedResourceHostIDByBatch(context.Background(), store, batchID, "group")
if err != nil {
t.Fatalf("resolveManagedResourceHostIDByBatch() error = %v", err)
}
if groupID != "group_1" {
t.Fatalf("groupID = %q, want group_1", groupID)
}
if _, err := resolveManagedResourceHostIDByBatch(context.Background(), store, batchID, "plan"); err == nil || err.Error() != fmt.Sprintf("managed resource %q not found for batch %d", "plan", batchID) {
t.Fatalf("resolveManagedResourceHostIDByBatch(plan) error = %v, want missing resource error", err)
}
}
func TestReconcileProbeAPIKeySelfService(t *testing.T) {
t.Parallel()
store := openReconcileBackgroundTestStore(t)
defer closeAppTestStore(t, store)
batchID, _, _ := seedReconcileBackgroundBatch(t, store)
hostRow := mustGetBackgroundHost(t, store)
tests := []struct {
name string
record sqlite.AccessClosureRecord
wantAPIKey string
wantErr string
}{
{
name: "prefers access api key",
record: sqlite.AccessClosureRecord{
BatchID: batchID,
ClosureType: provision.AccessModeSelfService,
Status: "self_service_ready",
DetailsJSON: `{"access_api_key":" access-key ","probe_api_key":"probe-key"}`,
},
wantAPIKey: "access-key",
},
{
name: "falls back to probe api key",
record: sqlite.AccessClosureRecord{
BatchID: batchID,
ClosureType: provision.AccessModeSelfService,
Status: "self_service_ready",
DetailsJSON: `{"probe_api_key":" probe-key "}`,
},
wantAPIKey: "probe-key",
},
{
name: "requires api key",
record: sqlite.AccessClosureRecord{
BatchID: batchID,
ClosureType: provision.AccessModeSelfService,
Status: "self_service_ready",
DetailsJSON: `{}`,
},
wantErr: "self_service access closure missing probe api key",
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
got, err := reconcileProbeAPIKey(context.Background(), store, hostRow, sqlite.ImportBatch{ID: batchID}, []sqlite.AccessClosureRecord{tc.record})
if tc.wantErr != "" {
if err == nil || err.Error() != tc.wantErr {
t.Fatalf("reconcileProbeAPIKey() error = %v, want %q", err, tc.wantErr)
}
return
}
if err != nil {
t.Fatalf("reconcileProbeAPIKey() error = %v", err)
}
if got != tc.wantAPIKey {
t.Fatalf("reconcileProbeAPIKey() = %q, want %q", got, tc.wantAPIKey)
}
})
}
}
func TestReconcileProbeAPIKeyRejectsMissingSubscriptionUsers(t *testing.T) {
t.Parallel()
store := openReconcileBackgroundTestStore(t)
defer closeAppTestStore(t, store)
batchID, _, _ := seedReconcileBackgroundBatch(t, store)
hostRow := mustGetBackgroundHost(t, store)
_, err := reconcileProbeAPIKey(context.Background(), store, hostRow, sqlite.ImportBatch{ID: batchID}, []sqlite.AccessClosureRecord{{
BatchID: batchID,
ClosureType: provision.AccessModeSubscription,
Status: "subscription_ready",
DetailsJSON: `{}`,
}})
if err == nil || err.Error() != "subscription access closure missing subscription_users" {
t.Fatalf("reconcileProbeAPIKey() error = %v, want missing subscription_users", err)
}
}
func TestReconcileProbeAPIKeyRejectsUnsupportedClosureType(t *testing.T) {
t.Parallel()
store := openReconcileBackgroundTestStore(t)
defer closeAppTestStore(t, store)
batchID, _, _ := seedReconcileBackgroundBatch(t, store)
hostRow := mustGetBackgroundHost(t, store)
_, err := reconcileProbeAPIKey(context.Background(), store, hostRow, sqlite.ImportBatch{ID: batchID}, []sqlite.AccessClosureRecord{{
BatchID: batchID,
ClosureType: "other",
Status: "unknown",
}})
if err == nil || err.Error() != `unsupported access closure type "other"` {
t.Fatalf("reconcileProbeAPIKey() error = %v, want unsupported type", err)
}
}
func mustGetBackgroundHost(t *testing.T, store *sqlite.DB) sqlite.Host {
t.Helper()
host, err := store.Hosts().GetByHostID(context.Background(), "host-1")
if err != nil {
t.Fatalf("Hosts().GetByHostID() error = %v", err)
}
return host
}
func seedReconcileBackgroundBatch(t *testing.T, store *sqlite.DB) (int64, int64, int64) {
t.Helper()
hostPK, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-1",
BaseURL: "https://sub2api.example.com",
HostVersion: "0.1.126",
CapabilityProbeJSON: "{}",
AuthType: "apikey",
AuthToken: "host-token",
})
if err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
packPK, err := store.Packs().Create(context.Background(), sqlite.Pack{
PackID: "openai-cn-pack",
Version: "1.0.0",
Checksum: "checksum-1",
Vendor: "OpenAI CN",
TargetHost: "sub2api",
MinHostVersion: "0.1.126",
MaxHostVersion: "0.2.x",
ManifestJSON: `{"pack_id":"openai-cn-pack","version":"1.0.0","target_host":"sub2api"}`,
})
if err != nil {
t.Fatalf("Packs().Create() error = %v", err)
}
providerPK, err := store.Providers().Create(context.Background(), sqlite.Provider{
PackID: packPK,
ProviderID: "deepseek",
DisplayName: "DeepSeek",
BaseURL: "https://api.example.com",
Platform: "openai",
AccountType: "openai",
SmokeTestModel: "deepseek-chat",
ManifestJSON: `{"provider_id":"deepseek","base_url":"https://api.example.com","platform":"openai","account_type":"openai","smoke_test_model":"deepseek-chat"}`,
})
if err != nil {
t.Fatalf("Providers().Create() error = %v", err)
}
batchID, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: hostPK,
PackID: packPK,
ProviderID: providerPK,
Mode: provision.ImportModePartial,
BatchStatus: "partially_succeeded",
AccessStatus: "self_service_ready",
})
if err != nil {
t.Fatalf("ImportBatches().Create() error = %v", err)
}
return batchID, hostPK, providerPK
}