package app import ( "context" "fmt" "net/http" "net/http/httptest" "strings" "testing" "sub2api-cn-relay-manager/internal/store/sqlite" "sub2api-cn-relay-manager/internal/testutil" ) func TestBatchImportHTTP(t *testing.T) { t.Parallel() t.Run("POST create run returns run summary", func(t *testing.T) { t.Parallel() handler := NewAPIHandler("secret-token", ActionSet{ CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { if req.HostID != "host-1" { t.Fatalf("HostID = %q, want host-1", req.HostID) } if req.AccessMode != "subscription" { t.Fatalf("AccessMode = %q, want subscription", req.AccessMode) } if len(req.SubscriptionUsers) != 1 || req.SubscriptionUsers[0] != "user-1" { t.Fatalf("SubscriptionUsers = %#v, want [user-1]", req.SubscriptionUsers) } if len(req.Entries) != 1 || req.Entries[0].BaseURL != "https://kimi.example.com/v1" { t.Fatalf("Entries = %#v, want request payload", req.Entries) } return BatchImportRunCreateResponse{ RunID: "run_20260522_0001", State: "running", ResultPage: "/batch-import/runs/run_20260522_0001", TotalItems: 1, ActiveItems: 0, DegradedItems: 0, BrokenItems: 0, WarningItems: 0, }, nil }, }) req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ "host_id": "host-1", "mode": "strict", "access_mode": "subscription", "subscription_users": []string{"user-1"}, "subscription_days": 30, "entries": []map[string]any{ {"base_url": "https://kimi.example.com/v1", "api_key": "sk-test", "requested_models": []string{"kimi-k2.6"}}, }, }, "secret-token") res := httptestRecorder(handler, req) assertStatusCode(t, res, http.StatusOK) assertJSONContains(t, res.Body().Bytes(), "run_id", "run_20260522_0001") assertJSONContains(t, res.Body().Bytes(), "result_page", "/batch-import/runs/run_20260522_0001") }) t.Run("subscription request requires subscription fields", func(t *testing.T) { t.Parallel() handler := NewAPIHandler("secret-token", ActionSet{ CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { t.Fatal("CreateBatchImportRun should not be called when request is invalid") return BatchImportRunCreateResponse{}, nil }, }) req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ "host_id": "host-1", "mode": "strict", "access_mode": "subscription", "entries": []map[string]any{ {"base_url": "https://kimi.example.com/v1", "api_key": "sk-test"}, }, }, "secret-token") res := httptestRecorder(handler, req) assertStatusCode(t, res, http.StatusBadRequest) assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request") }) t.Run("self service request requires probe api key", func(t *testing.T) { t.Parallel() handler := NewAPIHandler("secret-token", ActionSet{ CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { t.Fatal("CreateBatchImportRun should not be called when request is invalid") return BatchImportRunCreateResponse{}, nil }, }) req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ "host_id": "host-1", "mode": "partial", "access_mode": "self_service", "entries": []map[string]any{ {"base_url": "https://deepseek.example.com/v1", "api_key": "sk-test"}, }, }, "secret-token") res := httptestRecorder(handler, req) assertStatusCode(t, res, http.StatusBadRequest) assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request") assertJSONContains(t, res.Body().Bytes(), "error.message", "probe_api_key is required when access_mode=self_service") }) t.Run("create run requires host id", func(t *testing.T) { t.Parallel() handler := NewAPIHandler("secret-token", ActionSet{ CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { t.Fatal("CreateBatchImportRun should not be called when host_id is missing") return BatchImportRunCreateResponse{}, nil }, }) req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ "mode": "strict", "access_mode": "subscription", "subscription_users": []string{"user-1"}, "subscription_days": 30, "entries": []map[string]any{ {"base_url": "https://kimi.example.com/v1", "api_key": "sk-test"}, }, }, "secret-token") res := httptestRecorder(handler, req) assertStatusCode(t, res, http.StatusBadRequest) assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request") assertJSONContains(t, res.Body().Bytes(), "error.message", "host_id is required") }) t.Run("create run action wires real batch pipeline", func(t *testing.T) { t.Parallel() server := httptest.NewServer(newBatchImportActionStubServer(t)) defer server.Close() dsn := testutil.SQLiteTestDSN(t, "state.db", true) store := testutil.OpenSQLiteStore(t, dsn) defer closeAppTestStore(t, store) if _, err := store.Hosts().Create(context.Background(), sqlite.Host{ HostID: "host-1", BaseURL: server.URL, HostVersion: "0.1.126", CapabilityProbeJSON: "{}", AuthType: "apikey", AuthToken: "host-token", }); err != nil { t.Fatalf("Hosts().Create() error = %v", err) } action := buildCreateBatchImportRunAction(dsn) result, err := action(context.Background(), CreateBatchImportRunRequest{ HostID: "host-1", Mode: "strict", AccessMode: "self_service", ConfirmWaitTimeoutSec: 1, ProbeAPIKey: "gateway-key", Entries: []BatchImportEntryRequest{ { BaseURL: server.URL, APIKey: "entry-key", RequestedModels: []string{"kimi-k2.6"}, }, }, }) if err != nil { t.Fatalf("buildCreateBatchImportRunAction() error = %v", err) } if result.State != string("completed") { t.Fatalf("result.State = %q, want completed", result.State) } if result.ActiveItems != 1 { t.Fatalf("result.ActiveItems = %d, want 1", result.ActiveItems) } run, err := store.ImportRuns().GetByRunID(context.Background(), result.RunID) if err != nil { t.Fatalf("ImportRuns().GetByRunID() error = %v", err) } if run.State != "completed" { t.Fatalf("run.State = %q, want completed", run.State) } items, err := store.ImportRunItems().ListByRunID(context.Background(), result.RunID) if err != nil { t.Fatalf("ImportRunItems().ListByRunID() error = %v", err) } if len(items) != 1 { t.Fatalf("len(items) = %d, want 1", len(items)) } if items[0].CurrentStage != "done" || items[0].AccessStatus != "active" { t.Fatalf("item = %+v, want current_stage=done and access_status=active", items[0]) } }) } func TestBatchImportWrapperFunctions(t *testing.T) { t.Parallel() t.Run("handleCreateBatchImportRun requires action", func(t *testing.T) { t.Parallel() req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{}, "") rec := &responseRecorder{header: map[string][]string{}} handleCreateBatchImportRun(rec, req, nil) assertStatusCode(t, rec, http.StatusInternalServerError) assertJSONContains(t, rec.Body().Bytes(), "error.code", "server_misconfigured") }) t.Run("handleCreateBatchImportRun classifies action error", func(t *testing.T) { t.Parallel() req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ "host_id": "host-1", "mode": "strict", "access_mode": "self_service", "probe_api_key": "probe-key", "entries": []map[string]any{ {"base_url": "https://kimi.example.com/v1", "api_key": "sk-test"}, }, }, "") rec := &responseRecorder{header: map[string][]string{}} handleCreateBatchImportRun(rec, req, func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { return BatchImportRunCreateResponse{}, fmt.Errorf("host x not found") }) assertStatusCode(t, rec, http.StatusNotFound) assertJSONContains(t, rec.Body().Bytes(), "error.code", "not_found") }) t.Run("handleListBatchImportRuns requires action", func(t *testing.T) { t.Parallel() req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs", nil, "") rec := &responseRecorder{header: map[string][]string{}} handleListBatchImportRuns(rec, req, nil) assertStatusCode(t, rec, http.StatusInternalServerError) assertJSONContains(t, rec.Body().Bytes(), "error.code", "server_misconfigured") }) t.Run("handleListBatchImportRuns returns empty array", func(t *testing.T) { t.Parallel() req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs?limit=5", nil, "") rec := &responseRecorder{header: map[string][]string{}} handleListBatchImportRuns(rec, req, func(_ context.Context, got ListBatchImportRunsRequest) (ListBatchImportRunsResponse, error) { if got.Limit != 5 { t.Fatalf("ListBatchImportRunsRequest.Limit = %d, want 5", got.Limit) } return ListBatchImportRunsResponse{}, nil }) assertStatusCode(t, rec, http.StatusOK) runs, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "runs") if !ok || len(runs) != 0 { t.Fatalf("runs = %#v, want empty array", runs) } }) } func TestBatchImportRejectsOversizedJSONBody(t *testing.T) { t.Parallel() handler := NewAPIHandler("secret-token", ActionSet{ CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { t.Fatal("CreateBatchImportRun should not be called for oversized body") return BatchImportRunCreateResponse{}, nil }, }) payload := `{"host_id":"host-1","mode":"strict","access_mode":"self_service","probe_api_key":"probe-key","entries":[{"base_url":"https://kimi.example.com/v1","api_key":"` + strings.Repeat("x", int(maxJSONBodyBytes)) + `"}]}` req := httptest.NewRequest(http.MethodPost, "/api/batch-import/runs", strings.NewReader(payload)) req.Header.Set("Authorization", "Bearer secret-token") res := httptestRecorder(handler, req) assertStatusCode(t, res, http.StatusRequestEntityTooLarge) assertJSONContains(t, res.Body().Bytes(), "error.code", "request_too_large") } func newBatchImportActionStubServer(t *testing.T) http.Handler { t.Helper() mux := http.NewServeMux() mux.HandleFunc("/api/v1/admin/system/version", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"version": "0.1.126"}}) }) mux.HandleFunc("/api/v1/admin/groups", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } switch r.Method { case http.MethodGet: writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{}}) case http.MethodPost: writeJSON(w, http.StatusCreated, map[string]any{"data": map[string]any{"id": "group_1", "name": "batch-import-group"}}) default: http.NotFound(w, r) } }) mux.HandleFunc("/api/v1/admin/channels", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } switch r.Method { case http.MethodGet: writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{}}) case http.MethodPost: writeJSON(w, http.StatusCreated, map[string]any{"data": map[string]any{"id": "channel_1", "name": "batch-import-channel"}}) default: http.NotFound(w, r) } }) mux.HandleFunc("/api/v1/admin/payment/plans", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } switch r.Method { case http.MethodGet: writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{}}) case http.MethodPost: writeJSON(w, http.StatusCreated, map[string]any{"data": map[string]any{"id": "plan_1", "name": "batch-import-plan"}}) default: http.NotFound(w, r) } }) mux.HandleFunc("/api/v1/admin/accounts", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{}, "pages": 1}}) }) mux.HandleFunc("/api/v1/admin/accounts/batch", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "account_1", "name": "batch-import-01"}}}) }) mux.HandleFunc("/api/v1/admin/accounts/__probe__/test", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } writeJSON(w, http.StatusBadRequest, map[string]any{"error": "probe only"}) }) mux.HandleFunc("/api/v1/admin/accounts/__probe__/models", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } writeJSON(w, http.StatusBadRequest, map[string]any{"error": "probe only"}) }) mux.HandleFunc("/api/v1/admin/accounts/account_1/test", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("event: result\n")) _, _ = w.Write([]byte("data: {\"status\":\"passed\",\"message\":\"smoke passed\",\"ok\":true}\n\n")) }) mux.HandleFunc("/api/v1/admin/accounts/account_1/models", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{{"id": "kimi-k2.6", "display_name": "Kimi K2.6", "type": "chat"}}}}) }) mux.HandleFunc("/api/v1/admin/subscriptions/assign", func(w http.ResponseWriter, r *http.Request) { if !requireBatchImportActionAdminToken(t, w, r) { return } switch r.Method { case http.MethodGet: writeJSON(w, http.StatusBadRequest, map[string]any{"error": "probe only"}) case http.MethodPost: writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": "subscription_1"}}) default: http.NotFound(w, r) } }) mux.HandleFunc("/v1/models", func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.Header.Get("Authorization")) { case "Bearer entry-key", "Bearer gateway-key": writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "kimi-k2.6"}}}) default: w.WriteHeader(http.StatusUnauthorized) _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) } }) mux.HandleFunc("/v1/responses", func(w http.ResponseWriter, r *http.Request) { if strings.TrimSpace(r.Header.Get("Authorization")) != "Bearer entry-key" { w.WriteHeader(http.StatusUnauthorized) _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) return } w.WriteHeader(http.StatusForbidden) _, _ = w.Write([]byte(`{"error":"responses unsupported"}`)) }) mux.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.Header.Get("Authorization")) { case "Bearer entry-key", "Bearer gateway-key": writeJSON(w, http.StatusOK, map[string]any{ "id": "chatcmpl_batch_import", "choices": []map[string]any{{ "index": 0, "message": map[string]any{ "role": "assistant", "content": "pong", }, }}, }) default: w.WriteHeader(http.StatusUnauthorized) _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) } }) return mux } func requireBatchImportActionAdminToken(t *testing.T, w http.ResponseWriter, r *http.Request) bool { t.Helper() if strings.TrimSpace(r.Header.Get("x-api-key")) != "host-token" { w.WriteHeader(http.StatusUnauthorized) _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) return false } return true }