From fb136ead7ce106be7262fb14e8890011eefaaf4c Mon Sep 17 00:00:00 2001 From: phamnazage-jpg Date: Fri, 22 May 2026 15:39:43 +0800 Subject: [PATCH] feat(api): add batch import v2 endpoints --- internal/app/http_api.go | 26 +++ internal/app/http_batch_import.go | 261 +++++++++++++++++++++++++ internal/app/http_batch_import_test.go | 129 ++++++++++++ internal/app/http_batch_runs.go | 205 +++++++++++++++++++ internal/app/http_batch_runs_test.go | 223 +++++++++++++++++++++ 5 files changed, 844 insertions(+) create mode 100644 internal/app/http_batch_import.go create mode 100644 internal/app/http_batch_import_test.go create mode 100644 internal/app/http_batch_runs.go create mode 100644 internal/app/http_batch_runs_test.go diff --git a/internal/app/http_api.go b/internal/app/http_api.go index 17bc5ea2..07a6074e 100644 --- a/internal/app/http_api.go +++ b/internal/app/http_api.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" + "sub2api-cn-relay-manager/internal/batch" "sub2api-cn-relay-manager/internal/host/sub2api" "sub2api-cn-relay-manager/internal/pack" "sub2api-cn-relay-manager/internal/provision" @@ -19,6 +20,11 @@ import ( ) type ActionSet struct { + CreateBatchImportRun func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) + ListBatchImportRuns func(context.Context, ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error) + GetBatchImportRun func(context.Context, string) (batch.RunSummaryProjection, error) + ListBatchImportRunItems func(context.Context, ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error) + GetBatchImportRunItem func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) InstallPack func(context.Context, InstallPackRequest) (provision.PackInstallResult, error) BatchDetail func(context.Context, BatchDetailRequest) (provision.BatchDetailResult, error) GetProviderStatus func(context.Context, ProviderQueryRequest) (provision.ProviderSnapshot, error) @@ -201,6 +207,21 @@ func (e *httpError) Error() string { func NewAPIHandler(adminToken string, actions ActionSet) http.Handler { mux := http.NewServeMux() mux.HandleFunc("GET /healthz", healthz) + mux.Handle("POST /api/batch-import/runs", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleCreateBatchImportRun(w, r, actions.CreateBatchImportRun) + }))) + mux.Handle("GET /api/batch-import/runs", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleListBatchImportRuns(w, r, actions.ListBatchImportRuns) + }))) + mux.Handle("GET /api/batch-import/runs/{run_id}", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleGetBatchImportRun(w, r, actions.GetBatchImportRun) + }))) + mux.Handle("GET /api/batch-import/runs/{run_id}/items", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleListBatchImportRunItems(w, r, actions.ListBatchImportRunItems) + }))) + mux.Handle("GET /api/batch-import/runs/{run_id}/items/{item_id}", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleGetBatchImportRunItem(w, r, actions.GetBatchImportRunItem) + }))) mux.Handle("GET /api/import-batches/{batchID}", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handleBatchDetail(w, r, actions.BatchDetail) }))) @@ -867,6 +888,11 @@ func classifyError(err error) *httpError { func NewActionSet(sqliteDSN string) ActionSet { return ActionSet{ + CreateBatchImportRun: buildCreateBatchImportRunAction(sqliteDSN), + ListBatchImportRuns: buildListBatchImportRunsAction(sqliteDSN), + GetBatchImportRun: buildGetBatchImportRunAction(sqliteDSN), + ListBatchImportRunItems: buildListBatchImportRunItemsAction(sqliteDSN), + GetBatchImportRunItem: buildGetBatchImportRunItemAction(sqliteDSN), InstallPack: func(ctx context.Context, req InstallPackRequest) (provision.PackInstallResult, error) { loadedPack, err := pack.LoadPath(req.PackPath) if err != nil { diff --git a/internal/app/http_batch_import.go b/internal/app/http_batch_import.go new file mode 100644 index 00000000..1ca75239 --- /dev/null +++ b/internal/app/http_batch_import.go @@ -0,0 +1,261 @@ +package app + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "sub2api-cn-relay-manager/internal/batch" + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +type BatchImportEntryRequest struct { + BaseURL string `json:"base_url"` + APIKey string `json:"api_key"` + RequestedModels []string `json:"requested_models"` +} + +type CreateBatchImportRunRequest struct { + HostID string `json:"host_id,omitempty"` + Mode string `json:"mode"` + AccessMode string `json:"access_mode"` + ConfirmWaitTimeoutSec int `json:"confirm_wait_timeout_sec,omitempty"` + SubscriptionUsers []string `json:"subscription_users,omitempty"` + SubscriptionDays int `json:"subscription_days,omitempty"` + ProbeAPIKey string `json:"probe_api_key,omitempty"` + Entries []BatchImportEntryRequest `json:"entries"` +} + +type BatchImportRunCreateResponse struct { + RunID string `json:"run_id"` + State string `json:"state"` + ResultPage string `json:"result_page"` + TotalItems int `json:"total_items"` + ActiveItems int `json:"active_items"` + DegradedItems int `json:"degraded_items"` + BrokenItems int `json:"broken_items"` + WarningItems int `json:"warning_items"` +} + +type ListBatchImportRunsRequest struct { + State string + AccessMode string + Query string + Limit int +} + +type ListBatchImportRunItemsRequest struct { + RunID string + CurrentStage string + ConfirmationStatus string + AccessStatus string + HasWarning *bool + ProviderID string + MatchedAccountState string + AccountResolution string + Query string + Limit int +} + +type GetBatchImportRunItemRequest struct { + RunID string + ItemID string +} + +func handleCreateBatchImportRun(w http.ResponseWriter, r *http.Request, fn func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "create-batch-import-run action is not configured"}) + return + } + var req CreateBatchImportRunRequest + if err := decodeJSON(r, &req); err != nil { + writeHTTPError(w, err) + return + } + if err := validateCreateBatchImportRunRequest(req); err != nil { + writeHTTPError(w, err) + return + } + result, err := fn(r.Context(), req) + if err != nil { + writeHTTPError(w, classifyError(err)) + return + } + writeJSON(w, http.StatusOK, result) +} + +func handleListBatchImportRuns(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-batch-import-runs action is not configured"}) + return + } + result, err := fn(r.Context(), ListBatchImportRunsRequest{ + State: strings.TrimSpace(r.URL.Query().Get("state")), + AccessMode: strings.TrimSpace(r.URL.Query().Get("access_mode")), + Query: strings.TrimSpace(r.URL.Query().Get("q")), + Limit: parsePositiveInt(r.URL.Query().Get("limit")), + }) + if err != nil { + writeHTTPError(w, classifyError(err)) + return + } + if result == nil { + result = []batch.RunSummaryProjection{} + } + writeJSON(w, http.StatusOK, map[string]any{"runs": result}) +} + +func buildCreateBatchImportRunAction(sqliteDSN string) func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { + return func(ctx context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return BatchImportRunCreateResponse{}, err + } + defer store.Close() + + runID := fmt.Sprintf("run_%d", time.Now().UnixNano()) + run := sqlite.ImportRun{ + RunID: runID, + Mode: strings.TrimSpace(req.Mode), + AccessMode: strings.TrimSpace(req.AccessMode), + State: string(batch.RunStateRunning), + TotalItems: len(req.Entries), + } + if err := store.ImportRuns().Create(ctx, run); err != nil { + return BatchImportRunCreateResponse{}, err + } + + for idx, entry := range req.Entries { + item := sqlite.ImportRunItem{ + ItemID: fmt.Sprintf("%s-item-%d", runID, idx+1), + RunID: runID, + BaseURL: strings.TrimSpace(entry.BaseURL), + ProviderID: batch.NormalizeProviderID(entry.BaseURL), + APIKeyFingerprint: fingerprintBatchAPIKey(entry.APIKey), + RequestedModelsJSON: mustMarshalAppJSON(entry.RequestedModels, "[]"), + CurrentStage: string(batch.ItemStageProbe), + ConfirmationStatus: string(batch.ConfirmationPending), + AccessStatus: string(batch.AccessStatusUnknown), + MatchedAccountState: string(batch.MatchedAccountStateNone), + AccountResolution: string(batch.AccountResolutionCreated), + ProvisionReused: false, + CanonicalFamiliesJSON: "[]", + RawModelsJSON: "[]", + NormalizedModelsJSON: "[]", + RecommendedModelsJSON: "[]", + } + if err := store.ImportRunItems().Upsert(ctx, item); err != nil { + return BatchImportRunCreateResponse{}, err + } + } + + return BatchImportRunCreateResponse{ + RunID: runID, + State: string(batch.RunStateRunning), + ResultPage: "/batch-import/runs/" + runID, + TotalItems: len(req.Entries), + ActiveItems: 0, + DegradedItems: 0, + BrokenItems: 0, + WarningItems: 0, + }, nil + } +} + +func buildListBatchImportRunsAction(sqliteDSN string) func(context.Context, ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error) { + return func(ctx context.Context, req ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error) { + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return nil, err + } + defer store.Close() + + runs, err := store.ImportRuns().List(ctx, defaultPositiveInt(req.Limit, 50)) + if err != nil { + return nil, err + } + + result := make([]batch.RunSummaryProjection, 0, len(runs)) + for _, run := range runs { + if req.State != "" && run.State != req.State { + continue + } + if req.AccessMode != "" && run.AccessMode != req.AccessMode { + continue + } + if req.Query != "" && !strings.Contains(strings.ToLower(run.RunID), strings.ToLower(req.Query)) { + continue + } + result = append(result, batch.ProjectRunSummary(run)) + } + return result, nil + } +} + +func validateCreateBatchImportRunRequest(req CreateBatchImportRunRequest) *httpError { + if strings.TrimSpace(req.HostID) == "" { + return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "host_id is required"} + } + if strings.TrimSpace(req.Mode) == "" { + return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "mode is required"} + } + if strings.TrimSpace(req.AccessMode) == "" { + return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "access_mode is required"} + } + if len(req.Entries) == 0 { + return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "entries is required"} + } + switch strings.TrimSpace(req.AccessMode) { + case "subscription": + if len(req.SubscriptionUsers) == 0 { + return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "subscription_users is required when access_mode=subscription"} + } + if req.SubscriptionDays <= 0 { + return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "subscription_days is required when access_mode=subscription"} + } + case "self_service": + if strings.TrimSpace(req.ProbeAPIKey) == "" { + return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "probe_api_key is required when access_mode=self_service"} + } + default: + return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "access_mode must be subscription or self_service"} + } + return nil +} + +func parsePositiveInt(raw string) int { + value, err := strconv.Atoi(strings.TrimSpace(raw)) + if err != nil || value <= 0 { + return 0 + } + return value +} + +func defaultPositiveInt(value, fallback int) int { + if value > 0 { + return value + } + return fallback +} + +func fingerprintBatchAPIKey(apiKey string) string { + trimmed := strings.TrimSpace(apiKey) + if trimmed == "" { + return "" + } + sum := sha256.Sum256([]byte(trimmed)) + return fmt.Sprintf("sha256:%x", sum[:4]) +} + +func mustMarshalAppJSON(value any, fallback string) string { + payload, err := json.Marshal(value) + if err != nil { + return fallback + } + return string(payload) +} diff --git a/internal/app/http_batch_import_test.go b/internal/app/http_batch_import_test.go new file mode 100644 index 00000000..e5c4f3bf --- /dev/null +++ b/internal/app/http_batch_import_test.go @@ -0,0 +1,129 @@ +package app + +import ( + "context" + "net/http" + "testing" +) + +func TestBatchImportHTTP(t *testing.T) { + t.Parallel() + + t.Run("POST create run returns run summary", func(t *testing.T) { + t.Parallel() + + handler := NewAPIHandler("secret-token", ActionSet{ + CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { + if req.HostID != "host-1" { + t.Fatalf("HostID = %q, want host-1", req.HostID) + } + if req.AccessMode != "subscription" { + t.Fatalf("AccessMode = %q, want subscription", req.AccessMode) + } + if len(req.SubscriptionUsers) != 1 || req.SubscriptionUsers[0] != "user-1" { + t.Fatalf("SubscriptionUsers = %#v, want [user-1]", req.SubscriptionUsers) + } + if len(req.Entries) != 1 || req.Entries[0].BaseURL != "https://kimi.example.com/v1" { + t.Fatalf("Entries = %#v, want request payload", req.Entries) + } + return BatchImportRunCreateResponse{ + RunID: "run_20260522_0001", + State: "running", + ResultPage: "/batch-import/runs/run_20260522_0001", + TotalItems: 1, + ActiveItems: 0, + DegradedItems: 0, + BrokenItems: 0, + WarningItems: 0, + }, nil + }, + }) + + req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ + "host_id": "host-1", + "mode": "strict", + "access_mode": "subscription", + "subscription_users": []string{"user-1"}, + "subscription_days": 30, + "entries": []map[string]any{ + {"base_url": "https://kimi.example.com/v1", "api_key": "sk-test", "requested_models": []string{"kimi-k2.6"}}, + }, + }, "secret-token") + res := httptestRecorder(handler, req) + assertStatusCode(t, res, http.StatusOK) + assertJSONContains(t, res.Body().Bytes(), "run_id", "run_20260522_0001") + assertJSONContains(t, res.Body().Bytes(), "result_page", "/batch-import/runs/run_20260522_0001") + }) + + t.Run("subscription request requires subscription fields", func(t *testing.T) { + t.Parallel() + + handler := NewAPIHandler("secret-token", ActionSet{ + CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { + t.Fatal("CreateBatchImportRun should not be called when request is invalid") + return BatchImportRunCreateResponse{}, nil + }, + }) + + req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ + "host_id": "host-1", + "mode": "strict", + "access_mode": "subscription", + "entries": []map[string]any{ + {"base_url": "https://kimi.example.com/v1", "api_key": "sk-test"}, + }, + }, "secret-token") + res := httptestRecorder(handler, req) + assertStatusCode(t, res, http.StatusBadRequest) + assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request") + }) + + t.Run("self service request requires probe api key", func(t *testing.T) { + t.Parallel() + + handler := NewAPIHandler("secret-token", ActionSet{ + CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { + t.Fatal("CreateBatchImportRun should not be called when request is invalid") + return BatchImportRunCreateResponse{}, nil + }, + }) + + req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ + "host_id": "host-1", + "mode": "partial", + "access_mode": "self_service", + "entries": []map[string]any{ + {"base_url": "https://deepseek.example.com/v1", "api_key": "sk-test"}, + }, + }, "secret-token") + res := httptestRecorder(handler, req) + assertStatusCode(t, res, http.StatusBadRequest) + assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request") + assertJSONContains(t, res.Body().Bytes(), "error.message", "probe_api_key is required when access_mode=self_service") + }) + + t.Run("create run requires host id", func(t *testing.T) { + t.Parallel() + + handler := NewAPIHandler("secret-token", ActionSet{ + CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) { + t.Fatal("CreateBatchImportRun should not be called when host_id is missing") + return BatchImportRunCreateResponse{}, nil + }, + }) + + req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{ + "mode": "strict", + "access_mode": "subscription", + "subscription_users": []string{"user-1"}, + "subscription_days": 30, + "entries": []map[string]any{ + {"base_url": "https://kimi.example.com/v1", "api_key": "sk-test"}, + }, + }, "secret-token") + res := httptestRecorder(handler, req) + assertStatusCode(t, res, http.StatusBadRequest) + assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request") + assertJSONContains(t, res.Body().Bytes(), "error.message", "host_id is required") + }) +} diff --git a/internal/app/http_batch_runs.go b/internal/app/http_batch_runs.go new file mode 100644 index 00000000..321dda08 --- /dev/null +++ b/internal/app/http_batch_runs.go @@ -0,0 +1,205 @@ +package app + +import ( + "context" + "database/sql" + "fmt" + "net/http" + "strings" + + "sub2api-cn-relay-manager/internal/batch" + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +func handleGetBatchImportRun(w http.ResponseWriter, r *http.Request, fn func(context.Context, string) (batch.RunSummaryProjection, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "get-batch-import-run action is not configured"}) + return + } + runID := strings.TrimSpace(r.PathValue("run_id")) + if runID == "" { + writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id is required"}) + return + } + run, err := fn(r.Context(), runID) + if err != nil { + writeHTTPError(w, classifyError(err)) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "run": run, + "recent_warnings": run.RecentWarnings, + }) +} + +func handleListBatchImportRunItems(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-batch-import-run-items action is not configured"}) + return + } + runID := strings.TrimSpace(r.PathValue("run_id")) + if runID == "" { + writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id is required"}) + return + } + req := ListBatchImportRunItemsRequest{ + RunID: runID, + CurrentStage: strings.TrimSpace(r.URL.Query().Get("current_stage")), + ConfirmationStatus: strings.TrimSpace(r.URL.Query().Get("confirmation_status")), + AccessStatus: strings.TrimSpace(r.URL.Query().Get("access_status")), + ProviderID: strings.TrimSpace(r.URL.Query().Get("provider_id")), + MatchedAccountState: strings.TrimSpace(r.URL.Query().Get("matched_account_state")), + AccountResolution: strings.TrimSpace(r.URL.Query().Get("account_resolution")), + Query: strings.TrimSpace(r.URL.Query().Get("q")), + Limit: parsePositiveInt(r.URL.Query().Get("limit")), + } + if hasWarningRaw := strings.TrimSpace(r.URL.Query().Get("has_warning")); hasWarningRaw != "" { + value := strings.EqualFold(hasWarningRaw, "true") + req.HasWarning = &value + } + items, err := fn(r.Context(), req) + if err != nil { + writeHTTPError(w, classifyError(err)) + return + } + if items == nil { + items = []batch.ItemSummaryProjection{} + } + writeJSON(w, http.StatusOK, map[string]any{"items": items}) +} + +func handleGetBatchImportRunItem(w http.ResponseWriter, r *http.Request, fn func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "get-batch-import-run-item action is not configured"}) + return + } + req := GetBatchImportRunItemRequest{ + RunID: strings.TrimSpace(r.PathValue("run_id")), + ItemID: strings.TrimSpace(r.PathValue("item_id")), + } + if req.RunID == "" || req.ItemID == "" { + writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id and item_id are required"}) + return + } + item, err := fn(r.Context(), req) + if err != nil { + writeHTTPError(w, classifyError(err)) + return + } + writeJSON(w, http.StatusOK, item) +} + +func buildGetBatchImportRunAction(sqliteDSN string) func(context.Context, string) (batch.RunSummaryProjection, error) { + return func(ctx context.Context, runID string) (batch.RunSummaryProjection, error) { + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return batch.RunSummaryProjection{}, err + } + defer store.Close() + + run, err := store.ImportRuns().GetByRunID(ctx, runID) + if err != nil { + if err == sql.ErrNoRows { + return batch.RunSummaryProjection{}, fmt.Errorf("run not found: %s", runID) + } + return batch.RunSummaryProjection{}, err + } + return batch.ProjectRunSummary(run), nil + } +} + +func buildListBatchImportRunItemsAction(sqliteDSN string) func(context.Context, ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error) { + return func(ctx context.Context, req ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error) { + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return nil, err + } + defer store.Close() + + if _, err := store.ImportRuns().GetByRunID(ctx, req.RunID); err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("run not found: %s", req.RunID) + } + return nil, err + } + items, err := store.ImportRunItems().ListByRunID(ctx, req.RunID) + if err != nil { + return nil, err + } + + result := make([]batch.ItemSummaryProjection, 0, len(items)) + for _, item := range items { + view := batch.ProjectItemSummary(item) + if !matchesItemFilters(view, req) { + continue + } + result = append(result, view) + if req.Limit > 0 && len(result) >= req.Limit { + break + } + } + return result, nil + } +} + +func buildGetBatchImportRunItemAction(sqliteDSN string) func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) { + return func(ctx context.Context, req GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) { + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return batch.ItemDetailProjection{}, err + } + defer store.Close() + + item, err := store.ImportRunItems().GetByItemID(ctx, req.ItemID) + if err != nil { + if err == sql.ErrNoRows { + return batch.ItemDetailProjection{}, fmt.Errorf("item not found: %s", req.ItemID) + } + return batch.ItemDetailProjection{}, err + } + if item.RunID != req.RunID { + return batch.ItemDetailProjection{}, fmt.Errorf("item not found in run %s", req.RunID) + } + events, err := store.ImportRunEvents().ListByItemID(ctx, req.ItemID) + if err != nil { + return batch.ItemDetailProjection{}, err + } + return batch.ProjectItemDetail(item, events) + } +} + +func matchesItemFilters(view batch.ItemSummaryProjection, req ListBatchImportRunItemsRequest) bool { + if req.CurrentStage != "" && view.CurrentStage != req.CurrentStage { + return false + } + if req.ConfirmationStatus != "" && view.ConfirmationStatus != req.ConfirmationStatus { + return false + } + if req.AccessStatus != "" && view.AccessStatus != req.AccessStatus { + return false + } + if req.ProviderID != "" && view.ProviderID != req.ProviderID { + return false + } + if req.MatchedAccountState != "" && view.MatchedAccountState != req.MatchedAccountState { + return false + } + if req.AccountResolution != "" && view.AccountResolution != req.AccountResolution { + return false + } + if req.HasWarning != nil { + hasWarning := len(view.AdvisoryMessages) > 0 + if hasWarning != *req.HasWarning { + return false + } + } + if req.Query != "" { + query := strings.ToLower(req.Query) + if !strings.Contains(strings.ToLower(view.ItemID), query) && + !strings.Contains(strings.ToLower(view.ProviderID), query) && + !strings.Contains(strings.ToLower(view.BaseURL), query) { + return false + } + } + return true +} diff --git a/internal/app/http_batch_runs_test.go b/internal/app/http_batch_runs_test.go new file mode 100644 index 00000000..9bfb8a85 --- /dev/null +++ b/internal/app/http_batch_runs_test.go @@ -0,0 +1,223 @@ +package app + +import ( + "context" + "encoding/json" + "net/http" + "testing" + + "sub2api-cn-relay-manager/internal/batch" + "sub2api-cn-relay-manager/internal/probe" +) + +func TestBatchRunsHTTP(t *testing.T) { + t.Parallel() + + t.Run("GET runs returns projected summaries", func(t *testing.T) { + t.Parallel() + + handler := NewAPIHandler("secret-token", ActionSet{ + ListBatchImportRuns: func(_ context.Context, req ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error) { + if req.State != "completed_with_warnings" { + t.Fatalf("State = %q, want completed_with_warnings", req.State) + } + if req.AccessMode != "subscription" { + t.Fatalf("AccessMode = %q, want subscription", req.AccessMode) + } + return []batch.RunSummaryProjection{{ + RunID: "run-1", + State: "completed_with_warnings", + Mode: "partial", + AccessMode: "subscription", + TotalItems: 2, + WarningItems: 1, + }}, nil + }, + }) + + req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs?state=completed_with_warnings&access_mode=subscription", nil, "secret-token") + res := httptestRecorder(handler, req) + assertStatusCode(t, res, http.StatusOK) + run := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "runs", 0) + assertJSONObjectValue(t, run, "run_id", "run-1") + assertJSONObjectValue(t, run, "state", "completed_with_warnings") + assertJSONObjectValue(t, run, "access_mode", "subscription") + assertJSONObjectValue(t, run, "warning_items", float64(1)) + }) + + t.Run("GET run detail returns wrapped projection", func(t *testing.T) { + t.Parallel() + + handler := NewAPIHandler("secret-token", ActionSet{ + GetBatchImportRun: func(_ context.Context, runID string) (batch.RunSummaryProjection, error) { + if runID != "run-1" { + t.Fatalf("runID = %q, want run-1", runID) + } + return batch.RunSummaryProjection{ + RunID: "run-1", + State: "completed_with_warnings", + Mode: "partial", + AccessMode: "subscription", + TotalItems: 2, + CompletedItems: 2, + ActiveItems: 1, + DegradedItems: 1, + WarningItems: 1, + RecentWarnings: []string{"warning"}, + }, nil + }, + }) + + req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1", nil, "secret-token") + res := httptestRecorder(handler, req) + assertStatusCode(t, res, http.StatusOK) + assertJSONContains(t, res.Body().Bytes(), "run.run_id", "run-1") + assertJSONArrayValueAt(t, res.Body().Bytes(), "recent_warnings", 0, "warning") + }) + + t.Run("GET items forwards matched account filters", func(t *testing.T) { + t.Parallel() + + handler := NewAPIHandler("secret-token", ActionSet{ + ListBatchImportRunItems: func(_ context.Context, req ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error) { + if req.RunID != "run-1" { + t.Fatalf("RunID = %q, want run-1", req.RunID) + } + if req.MatchedAccountState != "active" { + t.Fatalf("MatchedAccountState = %q, want active", req.MatchedAccountState) + } + if req.AccountResolution != "reused" { + t.Fatalf("AccountResolution = %q, want reused", req.AccountResolution) + } + return []batch.ItemSummaryProjection{{ + ItemID: "item-1", + BaseURL: "https://kimi.example.com/v1", + ProviderID: "kimi-a7m-1", + APIKeyFingerprint: "sha256:1234", + CurrentStage: "done", + ConfirmationStatus: "advisory", + AccessStatus: "active", + MatchedAccountState: "active", + AccountResolution: "reused", + ProvisionReused: true, + }}, nil + }, + }) + + req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items?matched_account_state=active&account_resolution=reused", nil, "secret-token") + res := httptestRecorder(handler, req) + assertStatusCode(t, res, http.StatusOK) + item := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "items", 0) + assertJSONObjectValue(t, item, "item_id", "item-1") + assertJSONObjectValue(t, item, "matched_account_state", "active") + assertJSONObjectValue(t, item, "account_resolution", "reused") + assertJSONObjectValue(t, item, "provision_reused", true) + }) + + t.Run("GET item detail returns capability profile and events", func(t *testing.T) { + t.Parallel() + + handler := NewAPIHandler("secret-token", ActionSet{ + GetBatchImportRunItem: func(_ context.Context, req GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) { + if req.RunID != "run-1" || req.ItemID != "item-1" { + t.Fatalf("request = %#v, want run-1/item-1", req) + } + return batch.ItemDetailProjection{ + ItemSummaryProjection: batch.ItemSummaryProjection{ + ItemID: "item-1", + BaseURL: "https://kimi.example.com/v1", + ProviderID: "kimi-a7m-1", + APIKeyFingerprint: "sha256:1234", + CanonicalModelFamilies: []string{"kimi-2.6"}, + CurrentStage: "done", + ConfirmationStatus: "advisory", + AccessStatus: "active", + MatchedAccountState: "deprecated", + AccountResolution: "reactivated", + ProvisionReused: true, + }, + ReusedFromProviderID: "provider-kimi-old", + CapabilityProfile: batchProbeProfile(), + Events: []batch.EventProjection{{ + EventID: "evt-1", + EventType: "retry_scheduled", + Stage: "confirm", + Message: "retry queued", + }}, + }, nil + }, + }) + + req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items/item-1", nil, "secret-token") + res := httptestRecorder(handler, req) + assertStatusCode(t, res, http.StatusOK) + assertJSONContains(t, res.Body().Bytes(), "item_id", "item-1") + assertJSONContains(t, res.Body().Bytes(), "reused_from_provider_id", "provider-kimi-old") + assertJSONContains(t, res.Body().Bytes(), "capability_profile.transport_profile.supports_openai_chat_completions", true) + event := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "events", 0) + assertJSONObjectValue(t, event, "event_type", "retry_scheduled") + assertJSONObjectValue(t, event, "stage", "confirm") + }) +} + +func batchProbeProfile() probe.CapabilityProfile { + return probe.CapabilityProfile{ + TransportProfile: probe.TransportProfile{ + SupportsOpenAIChatCompletions: true, + }, + } +} + +func decodeJSONArrayObjectAt(t *testing.T, payload []byte, key string, index int) map[string]any { + t.Helper() + + values, ok := decodeTopLevelArray(t, payload, key) + if !ok { + t.Fatalf("json key %q is not an array; payload=%s", key, string(payload)) + } + if index < 0 || index >= len(values) { + t.Fatalf("json key %q length = %d, want index %d present; payload=%s", key, len(values), index, string(payload)) + } + + object, ok := values[index].(map[string]any) + if !ok { + t.Fatalf("json key %q[%d] is not an object; payload=%s", key, index, string(payload)) + } + return object +} + +func assertJSONArrayValueAt(t *testing.T, payload []byte, key string, index int, want any) { + t.Helper() + + values, ok := decodeTopLevelArray(t, payload, key) + if !ok { + t.Fatalf("json key %q is not an array; payload=%s", key, string(payload)) + } + if index < 0 || index >= len(values) { + t.Fatalf("json key %q length = %d, want index %d present; payload=%s", key, len(values), index, string(payload)) + } + + if got := values[index]; got != want { + t.Fatalf("json key %q[%d] = %#v, want %#v; payload=%s", key, index, got, want, string(payload)) + } +} + +func assertJSONObjectValue(t *testing.T, object map[string]any, key string, want any) { + t.Helper() + + if got := object[key]; got != want { + t.Fatalf("json object key %q = %#v, want %#v", key, got, want) + } +} + +func decodeTopLevelArray(t *testing.T, payload []byte, key string) ([]any, bool) { + t.Helper() + + var decoded map[string]any + if err := json.Unmarshal(payload, &decoded); err != nil { + t.Fatalf("json.Unmarshal() error = %v; payload=%s", err, string(payload)) + } + + values, ok := decoded[key].([]any) + return values, ok +}