diff --git a/tests/integration/batch_import_v2_test.go b/tests/integration/batch_import_v2_test.go new file mode 100644 index 00000000..8a76e468 --- /dev/null +++ b/tests/integration/batch_import_v2_test.go @@ -0,0 +1,395 @@ +package integration_test + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "sub2api-cn-relay-manager/internal/batch" + "sub2api-cn-relay-manager/internal/host/sub2api" + "sub2api-cn-relay-manager/internal/probe" + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +func TestBatchImportV2(t *testing.T) { + t.Parallel() + + ctx := context.Background() + harness := newBatchImportV2Harness(t) + defer harness.Close(t) + + provisioner := &batchImportProvisionerStub{} + runID, err := harness.RunBatchImport(ctx, provisioner) + if err != nil { + t.Fatalf("RunBatchImport() error = %v", err) + } + + if runID == "" { + t.Fatal("runID = empty, want persisted run") + } + if provisioner.provisionCalls != 3 { + t.Fatalf("provision calls = %d, want 3 for new/advisory/retry items", provisioner.provisionCalls) + } + if provisioner.patchCalls != 1 { + t.Fatalf("patch calls = %d, want 1 for alias-only reuse flow", provisioner.patchCalls) + } + if got := provisioner.lastPatch.Contract.ModelMapping["Kimi-K2.6"]; got != "kimi-2.6" { + t.Fatalf("patch mapping = %#v, want raw alias mapped to canonical family", provisioner.lastPatch.Contract.ModelMapping) + } + + run, err := harness.store.ImportRuns().GetByRunID(ctx, runID) + if err != nil { + t.Fatalf("ImportRuns().GetByRunID() error = %v", err) + } + runView := batch.ProjectRunSummary(run) + if runView.State != string(batch.RunStateCompleted) { + t.Fatalf("run state = %q, want completed", runView.State) + } + if runView.CompletedItems != 6 || runView.ActiveItems != 6 || runView.TotalItems != 6 { + t.Fatalf("run view = %+v, want all 6 items completed and active", runView) + } + + items, err := harness.store.ImportRunItems().ListByRunID(ctx, runID) + if err != nil { + t.Fatalf("ImportRunItems().ListByRunID() error = %v", err) + } + if len(items) != 6 { + t.Fatalf("len(items) = %d, want 6", len(items)) + } + + newItem := findItemByBaseURL(t, items, harness.baseURL+"/new") + if got := batch.ProjectItemSummary(newItem).CanonicalModelFamilies; len(got) != 1 || got[0] != "deepseek-v4-pro" { + t.Fatalf("new item canonical families = %#v, want [deepseek-v4-pro]", got) + } + + activeItem := findItemByBaseURL(t, items, harness.baseURL+"/active") + activeView := batch.ProjectItemSummary(activeItem) + if activeView.MatchedAccountState != string(batch.MatchedAccountStateActive) || activeView.AccountResolution != string(batch.AccountResolutionReused) || !activeView.ProvisionReused { + t.Fatalf("active duplicate projection = %+v, want active/reused/provision_reused", activeView) + } + + deprecatedItem := findItemByBaseURL(t, items, harness.baseURL+"/deprecated") + deprecatedView := batch.ProjectItemSummary(deprecatedItem) + if deprecatedView.MatchedAccountState != string(batch.MatchedAccountStateDeprecated) || deprecatedView.AccountResolution != string(batch.AccountResolutionReactivated) || !deprecatedView.ProvisionReused { + t.Fatalf("deprecated duplicate projection = %+v, want deprecated/reactivated/provision_reused", deprecatedView) + } + + advisoryItem := findItemByBaseURL(t, items, harness.baseURL+"/advisory") + advisoryEvents, err := harness.store.ImportRunEvents().ListByItemID(ctx, advisoryItem.ItemID) + if err != nil { + t.Fatalf("ImportRunEvents().ListByItemID(advisory) error = %v", err) + } + advisoryDetail, err := batch.ProjectItemDetail(advisoryItem, advisoryEvents) + if err != nil { + t.Fatalf("ProjectItemDetail(advisory) error = %v", err) + } + if advisoryDetail.ConfirmationStatus != string(batch.ConfirmationAdvisory) || advisoryDetail.AccessStatus != string(batch.AccessStatusActive) { + t.Fatalf("advisory detail = %+v, want advisory confirmation and active access", advisoryDetail) + } + if !containsString(advisoryDetail.CapabilityProfile.TransportProfile.KnownAdvisories, "initial_probe_race_expected") { + t.Fatalf("advisory capability profile = %+v, want initial_probe_race_expected", advisoryDetail.CapabilityProfile.TransportProfile.KnownAdvisories) + } + if !containsSubstring(advisoryDetail.AdvisoryMessages, "异步探测尚未稳定") { + t.Fatalf("advisory messages = %#v, want mapped probe race advisory", advisoryDetail.AdvisoryMessages) + } + if !containsEventType(advisoryDetail.Events, "advisory_added") { + t.Fatalf("advisory events = %+v, want advisory_added event", advisoryDetail.Events) + } + + retryItem := findItemByBaseURL(t, items, harness.baseURL+"/retry") + retryEvents, err := harness.store.ImportRunEvents().ListByItemID(ctx, retryItem.ItemID) + if err != nil { + t.Fatalf("ImportRunEvents().ListByItemID(retry) error = %v", err) + } + retryDetail, err := batch.ProjectItemDetail(retryItem, retryEvents) + if err != nil { + t.Fatalf("ProjectItemDetail(retry) error = %v", err) + } + if retryDetail.RetryCount != 1 || retryDetail.AccessStatus != string(batch.AccessStatusActive) { + t.Fatalf("retry detail = %+v, want retry_count=1 and active access", retryDetail) + } + if !containsEventType(retryDetail.Events, "retry_scheduled") || !containsEventType(retryDetail.Events, "stage_transition") { + t.Fatalf("retry events = %+v, want retry_scheduled and stage_transition", retryDetail.Events) + } +} + +type batchImportV2Harness struct { + store *sqlite.DB + server *httptest.Server + baseURL string +} + +func newBatchImportV2Harness(t *testing.T) *batchImportV2Harness { + t.Helper() + + store := openTestStore(t) + server := httptest.NewServer(newBatchImportUpstreamMux()) + + return &batchImportV2Harness{ + store: store, + server: server, + baseURL: server.URL, + } +} + +func (h *batchImportV2Harness) Close(t *testing.T) { + t.Helper() + h.server.Close() + closeTestStore(t, h.store) +} + +func (h *batchImportV2Harness) RunBatchImport(ctx context.Context, provisioner *batchImportProvisionerStub) (string, error) { + service := batch.BatchImportService{ + RunStore: h.store.ImportRuns(), + ItemStore: h.store.ImportRunItems(), + ProbeModels: probe.ProviderModels, + ProbeCapabilities: probe.ProbeCapabilities, + InspectReuse: h.inspectReuse, + Provisioner: provisioner, + } + + result, err := service.StartRun(ctx, batch.BatchImportRunRequest{ + RunID: "run-v2-int-001", + HostID: "host-int-1", + Mode: "strict", + AccessMode: "self_service", + Entries: []batch.BatchImportEntry{ + {BaseURL: h.baseURL + "/new", APIKey: "sk-new", RequestedModels: []string{"DeepSeek V4 Pro"}}, + {BaseURL: h.baseURL + "/active", APIKey: "sk-active", RequestedModels: []string{"kimi 2.6"}}, + {BaseURL: h.baseURL + "/deprecated", APIKey: "sk-deprecated", RequestedModels: []string{"kimi 2.6"}}, + {BaseURL: h.baseURL + "/patch", APIKey: "sk-patch", RequestedModels: []string{"kimi 2.6"}}, + {BaseURL: h.baseURL + "/advisory", APIKey: "sk-advisory", RequestedModels: []string{"kimi-k2.6"}}, + {BaseURL: h.baseURL + "/retry", APIKey: "sk-retry", RequestedModels: []string{"kimi-k2.6"}}, + }, + }) + if err != nil { + return "", err + } + + worker := batch.ConfirmationWorker{ + WorkerID: "worker-int-1", + ItemStore: batchImportConfirmationStore{store: h.store, runID: result.RunID}, + EventStore: h.store.ImportRunEvents(), + LeaseDuration: time.Minute, + RetryDelay: time.Second, + Confirmer: h.confirm, + } + now := time.Date(2026, 5, 22, 13, 0, 0, 0, time.UTC) + if err := worker.Tick(ctx, now); err != nil { + return "", err + } + if err := worker.Tick(ctx, now.Add(2*time.Second)); err != nil { + return "", err + } + + items, err := h.store.ImportRunItems().ListByRunID(ctx, result.RunID) + if err != nil { + return "", err + } + validator := batch.ValidationService{ + ItemStore: h.store.ImportRunItems(), + RunStore: h.store.ImportRuns(), + Validator: h.validate, + } + for _, item := range items { + if item.CurrentStage != string(batch.ItemStageValidate) { + continue + } + if err := validator.ValidateItem(ctx, item); err != nil { + return "", err + } + } + + return result.RunID, nil +} + +func (h *batchImportV2Harness) inspectReuse(_ context.Context, input batch.ReuseLookupInput) (batch.ReuseLookupResult, error) { + switch { + case strings.HasSuffix(input.BaseURL, "/active"): + return batch.ReuseLookupResult{ + ExistingProviderID: batch.NormalizeProviderID(input.BaseURL), + ExistingAccessStatus: batch.AccessStatusActive, + ExistingCanonicalFamilys: []string{"kimi 2.6"}, + MatchedAccountID: 201, + MatchedAccountState: batch.MatchedAccountStateActive, + }, nil + case strings.HasSuffix(input.BaseURL, "/deprecated"): + return batch.ReuseLookupResult{ + ExistingProviderID: batch.NormalizeProviderID(input.BaseURL), + ExistingAccessStatus: batch.AccessStatusActive, + ExistingCanonicalFamilys: []string{"kimi 2.6"}, + MatchedAccountID: 301, + MatchedAccountState: batch.MatchedAccountStateDeprecated, + }, nil + case strings.HasSuffix(input.BaseURL, "/patch"): + return batch.ReuseLookupResult{ + ExistingProviderID: batch.NormalizeProviderID(input.BaseURL), + ExistingAccessStatus: batch.AccessStatusActive, + ExistingCanonicalFamilys: []string{"kimi 2.6"}, + MatchedAccountID: 401, + MatchedAccountState: batch.MatchedAccountStateActive, + ExistingModelMapping: map[string]string{"kimi-k2.6": "kimi-2.6"}, + }, nil + default: + return batch.ReuseLookupResult{}, nil + } +} + +func (h *batchImportV2Harness) confirm(_ context.Context, item sqlite.ImportRunItem) (batch.ConfirmationResult, error) { + switch { + case strings.HasSuffix(item.BaseURL, "/advisory"): + return batch.ConfirmationResult{StatusCode: http.StatusForbidden, Message: "probe race expected"}, nil + case strings.HasSuffix(item.BaseURL, "/retry") && item.ConfirmationAttempts == 0: + return batch.ConfirmationResult{StatusCode: http.StatusServiceUnavailable, Message: "no available accounts"}, nil + default: + return batch.ConfirmationResult{StatusCode: http.StatusOK, Message: "confirmation succeeded"}, nil + } +} + +func (h *batchImportV2Harness) validate(_ context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) { + return sub2api.GatewayCompletionResult{ + OK: true, + StatusCode: http.StatusOK, + ContentType: "application/json", + BodyPreview: fmt.Sprintf(`{"item_id":%q,"status":"ok"}`, item.ItemID), + }, nil +} + +type batchImportProvisionerStub struct { + provisionCalls int + patchCalls int + lastPatch batch.PatchProvisionRequest +} + +func (p *batchImportProvisionerStub) Provision(_ context.Context, req batch.ProvisionRequest) (batch.ProvisionResult, error) { + p.provisionCalls++ + legacyBatchID := int64(800 + p.provisionCalls) + return batch.ProvisionResult{ + LegacyBatchID: &legacyBatchID, + LegacyProviderID: req.ProviderID, + }, nil +} + +func (p *batchImportProvisionerStub) Patch(_ context.Context, req batch.PatchProvisionRequest) error { + p.patchCalls++ + p.lastPatch = req + return nil +} + +type batchImportConfirmationStore struct { + store *sqlite.DB + runID string +} + +func (s batchImportConfirmationStore) List(ctx context.Context) ([]sqlite.ImportRunItem, error) { + return s.store.ImportRunItems().ListByRunID(ctx, s.runID) +} + +func (s batchImportConfirmationStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error { + return s.store.ImportRunItems().Upsert(ctx, item) +} + +func newBatchImportUpstreamMux() http.Handler { + modelsByToken := map[string][]string{ + "sk-new": {"deepseek-ai/DeepSeek-V4-Pro"}, + "sk-active": {"kimi-k2.6"}, + "sk-deprecated": {"kimi-k2.6"}, + "sk-patch": {"Kimi-K2.6"}, + "sk-advisory": {"kimi-k2.6"}, + "sk-retry": {"kimi-k2.6"}, + } + + mux := http.NewServeMux() + mux.HandleFunc("/v1/models", func(w http.ResponseWriter, r *http.Request) { + token := strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")) + models, ok := modelsByToken[token] + if !ok { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) + return + } + + data := make([]map[string]any, 0, len(models)) + for _, model := range models { + data = append(data, map[string]any{"id": model}) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(mustJSON(data))) + }) + mux.HandleFunc("/v1/responses", func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusForbidden) + _, _ = w.Write([]byte(`{"error":"responses unsupported"}`)) + }) + mux.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) { + token := strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")) + if _, ok := modelsByToken[token]; !ok { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) + return + } + if token == "sk-advisory" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusForbidden) + _, _ = w.Write([]byte(`{"error":"probe race expected"}`)) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"id":"chatcmpl_batch_import_v2","choices":[{"index":0,"message":{"role":"assistant","content":"pong"}}]}`)) + }) + + return mux +} + +func findItemByBaseURL(t *testing.T, items []sqlite.ImportRunItem, baseURL string) sqlite.ImportRunItem { + t.Helper() + + for _, item := range items { + if item.BaseURL == baseURL { + return item + } + } + t.Fatalf("item with base_url %q not found in %#v", baseURL, items) + return sqlite.ImportRunItem{} +} + +func containsString(values []string, want string) bool { + for _, value := range values { + if value == want { + return true + } + } + return false +} + +func containsSubstring(values []string, fragment string) bool { + for _, value := range values { + if strings.Contains(value, fragment) { + return true + } + } + return false +} + +func containsEventType(events []batch.EventProjection, want string) bool { + for _, event := range events { + if event.EventType == want { + return true + } + } + return false +} + +func mustJSON(data []map[string]any) string { + values := make([]string, 0, len(data)) + for _, item := range data { + values = append(values, fmt.Sprintf(`{"id":%q}`, item["id"])) + } + return `{"data":[` + strings.Join(values, ",") + `]}` +}