package app

import (
	"context"
	"net/http"
	"strconv"
	"strings"

	"sub2api-cn-relay-manager/internal/batch"
	"sub2api-cn-relay-manager/internal/store/sqlite"
)

// BatchImportEntryRequest is a single element of the "entries" array in a
// create-batch-import-run request body.
type BatchImportEntryRequest struct {
	BaseURL         string   `json:"base_url"`
	APIKey          string   `json:"api_key"`
	RequestedModels []string `json:"requested_models"`
}

// CreateBatchImportRunRequest is the JSON body accepted by
// handleCreateBatchImportRun. See validateCreateBatchImportRunRequest for the
// fields that are required and how they depend on AccessMode.
type CreateBatchImportRunRequest struct {
	HostID                string                    `json:"host_id,omitempty"`
	Mode                  string                    `json:"mode"`
	AccessMode            string                    `json:"access_mode"`
	ConfirmWaitTimeoutSec int                       `json:"confirm_wait_timeout_sec,omitempty"`
	SubscriptionUsers     []string                  `json:"subscription_users,omitempty"`
	SubscriptionDays      int                       `json:"subscription_days,omitempty"`
	ProbeAPIKey           string                    `json:"probe_api_key,omitempty"`
	Entries               []BatchImportEntryRequest `json:"entries"`
}

// BatchImportRunCreateResponse is the JSON payload written after a batch
// import run has been created.
type BatchImportRunCreateResponse struct {
	RunID         string `json:"run_id"`
	State         string `json:"state"`
	ResultPage    string `json:"result_page"`
	TotalItems    int    `json:"total_items"`
	ActiveItems   int    `json:"active_items"`
	DegradedItems int    `json:"degraded_items"`
	BrokenItems   int    `json:"broken_items"`
	WarningItems  int    `json:"warning_items"`
}

// ListBatchImportRunsRequest carries the filters and paging parameters that
// handleListBatchImportRuns reads from the URL query string.
type ListBatchImportRunsRequest struct {
	State      string
	AccessMode string
	Query      string
	Cursor     string
	Limit      int
}

// ListBatchImportRunsResponse is one page of run summaries. NextCursor is nil
// when no further matching run was found.
type ListBatchImportRunsResponse struct {
	Runs       []batch.RunSummaryProjection `json:"runs"`
	NextCursor *string                      `json:"next_cursor"`
}

// ListBatchImportRunItemsRequest carries the filters and paging parameters
// for listing the items of a single run.
type ListBatchImportRunItemsRequest struct {
	RunID               string
	CurrentStage        string
	ConfirmationStatus  string
	AccessStatus        string
	HasWarning          *bool
	ProviderID          string
	MatchedAccountState string
	AccountResolution   string
	Query               string
	Cursor              string
	Limit               int
}

// ListBatchImportRunItemsResponse is one page of item summaries together with
// an optional cursor for the next page.
type ListBatchImportRunItemsResponse struct {
	Items      []batch.ItemSummaryProjection `json:"items"`
	NextCursor *string                       `json:"next_cursor"`
}

// GetBatchImportRunItemRequest identifies a single item within a run.
type GetBatchImportRunItemRequest struct {
	RunID  string
	ItemID string
}

// handleCreateBatchImportRun decodes a CreateBatchImportRunRequest from the
// request body, validates it, invokes fn and writes the result as JSON with
// status 200. A nil fn is reported as a 500 "server_misconfigured" error;
// decode and validation failures are written as returned; errors from fn are
// passed through classifyError before being written.
func handleCreateBatchImportRun(w http.ResponseWriter, r *http.Request, fn func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error)) {
	if fn == nil {
		writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "create-batch-import-run action is not configured"})
		return
	}
	var req CreateBatchImportRunRequest
	if err := decodeJSON(r, &req); err != nil {
		writeHTTPError(w, err)
		return
	}
	if err := validateCreateBatchImportRunRequest(req); err != nil {
		writeHTTPError(w, err)
		return
	}
	result, err := fn(r.Context(), req)
	if err != nil {
		writeHTTPError(w, classifyError(err))
		return
	}
	writeJSON(w, http.StatusOK, result)
}

// handleListBatchImportRuns builds a ListBatchImportRunsRequest from the
// "state", "access_mode", "q", "cursor" and "limit" query parameters, invokes
// fn and writes the result as JSON with status 200. A nil fn is reported as a
// 500 "server_misconfigured" error; errors from fn are passed through
// classifyError before being written.
func handleListBatchImportRuns(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListBatchImportRunsRequest) (ListBatchImportRunsResponse, error)) {
	if fn == nil {
		writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-batch-import-runs action is not configured"})
		return
	}
	// URL.Query re-parses the raw query on every call, so parse it once.
	params := r.URL.Query()
	result, err := fn(r.Context(), ListBatchImportRunsRequest{
		State:      strings.TrimSpace(params.Get("state")),
		AccessMode: strings.TrimSpace(params.Get("access_mode")),
		Query:      strings.TrimSpace(params.Get("q")),
		Cursor:     strings.TrimSpace(params.Get("cursor")),
		Limit:      parsePositiveInt(params.Get("limit")),
	})
	if err != nil {
		writeHTTPError(w, classifyError(err))
		return
	}
	// A nil slice would be encoded as JSON null; always emit an array.
	if result.Runs == nil {
		result.Runs = []batch.RunSummaryProjection{}
	}
	writeJSON(w, http.StatusOK, result)
}

// buildCreateBatchImportRunAction returns the action used by
// handleCreateBatchImportRun. Each invocation opens the SQLite store at
// sqliteDSN, resolves the managed host named by req.HostID, and executes a
// batchImportRuntimeRunner for the request. The store is closed when the
// invocation returns.
func buildCreateBatchImportRunAction(sqliteDSN string) func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
	return func(ctx context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
		store, err := sqlite.Open(ctx, sqliteDSN)
		if err != nil {
			return BatchImportRunCreateResponse{}, err
		}
		defer store.Close()

		hostRow, client, err := resolveManagedHost(ctx, store, req.HostID, "", CreateHostAuth{})
		if err != nil {
			return BatchImportRunCreateResponse{}, err
		}
		runner := batchImportRuntimeRunner{
			store:      store,
			hostRow:    hostRow,
			hostClient: client,
			request:    req,
		}
		return runner.execute(ctx)
	}
}

// buildListBatchImportRunsAction returns the action used by
// handleListBatchImportRuns. Each invocation opens the SQLite store at
// sqliteDSN, loads up to 1000 runs and filters and pages them in memory.
//
// Filters: State and AccessMode must match exactly when non-empty. Query is a
// case-insensitive substring match against the run ID, or against the
// provider ID or base URL of any item of the run.
//
// Paging: Limit defaults to 50 when not positive. NextCursor is the ID of the
// first matching run that did not fit on the page, and a request carrying
// that cursor starts at (and includes) that run. If the cursor is not found
// among the loaded runs, the page is empty.
func buildListBatchImportRunsAction(sqliteDSN string) func(context.Context, ListBatchImportRunsRequest) (ListBatchImportRunsResponse, error) {
	return func(ctx context.Context, req ListBatchImportRunsRequest) (ListBatchImportRunsResponse, error) {
		// Upper bound on the number of runs loaded from the store per request.
		const maxRuns = 1000

		store, err := sqlite.Open(ctx, sqliteDSN)
		if err != nil {
			return ListBatchImportRunsResponse{}, err
		}
		defer store.Close()

		runs, err := store.ImportRuns().List(ctx, maxRuns)
		if err != nil {
			return ListBatchImportRunsResponse{}, err
		}

		limit := defaultPositiveInt(req.Limit, 50)
		// limit comes from the caller; never pre-allocate more slots than
		// there are runs to return.
		capacity := limit
		if capacity > len(runs) {
			capacity = len(runs)
		}
		result := make([]batch.RunSummaryProjection, 0, capacity)
		var nextCursor *string

		// Loop-invariant values, computed once.
		cursor := strings.TrimSpace(req.Cursor)
		query := strings.ToLower(req.Query)

		started := cursor == ""
		for _, run := range runs {
			if !started {
				if run.RunID != cursor {
					continue
				}
				// The cursor names the first run that did not fit on the
				// previous page, so it must be part of this page; skipping
				// it here would drop it from every page.
				started = true
			}
			if req.State != "" && run.State != req.State {
				continue
			}
			if req.AccessMode != "" && run.AccessMode != req.AccessMode {
				continue
			}
			if query != "" && !strings.Contains(strings.ToLower(run.RunID), query) {
				// The run ID did not match; fall back to the run's items.
				items, err := store.ImportRunItems().ListByRunID(ctx, run.RunID)
				if err != nil {
					return ListBatchImportRunsResponse{}, err
				}
				matched := false
				for _, item := range items {
					if strings.Contains(strings.ToLower(item.ProviderID), query) || strings.Contains(strings.ToLower(item.BaseURL), query) {
						matched = true
						break
					}
				}
				if !matched {
					continue
				}
			}
			if len(result) >= limit {
				// Page is full and another matching run exists: hand its ID
				// back as the start of the next page.
				next := run.RunID
				nextCursor = &next
				break
			}
			result = append(result, batch.ProjectRunSummary(run))
		}
		return ListBatchImportRunsResponse{Runs: result, NextCursor: nextCursor}, nil
	}
}

// validateCreateBatchImportRunRequest checks the required fields of req and
// returns a 400 "invalid_request" error describing the first problem found,
// or nil when the request is acceptable.
//
// host_id, mode, access_mode and entries are always required. access_mode
// must be "subscription" (which also requires subscription_users and a
// positive subscription_days) or "self_service" (which also requires
// probe_api_key). String fields are compared after trimming whitespace.
func validateCreateBatchImportRunRequest(req CreateBatchImportRunRequest) *httpError {
	if strings.TrimSpace(req.HostID) == "" {
		return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "host_id is required"}
	}
	if strings.TrimSpace(req.Mode) == "" {
		return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "mode is required"}
	}
	accessMode := strings.TrimSpace(req.AccessMode)
	if accessMode == "" {
		return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "access_mode is required"}
	}
	if len(req.Entries) == 0 {
		return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "entries is required"}
	}
	switch accessMode {
	case "subscription":
		if len(req.SubscriptionUsers) == 0 {
			return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "subscription_users is required when access_mode=subscription"}
		}
		if req.SubscriptionDays <= 0 {
			return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "subscription_days is required when access_mode=subscription"}
		}
	case "self_service":
		if strings.TrimSpace(req.ProbeAPIKey) == "" {
			return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "probe_api_key is required when access_mode=self_service"}
		}
	default:
		return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "access_mode must be subscription or self_service"}
	}
	return nil
}

// parsePositiveInt parses raw (ignoring surrounding whitespace) as a decimal
// integer. It returns 0 when raw is not a valid integer or is not positive.
func parsePositiveInt(raw string) int {
	value, err := strconv.Atoi(strings.TrimSpace(raw))
	if err != nil || value <= 0 {
		return 0
	}
	return value
}

// defaultPositiveInt returns value when it is positive and fallback
// otherwise.
func defaultPositiveInt(value, fallback int) int {
	if value > 0 {
		return value
	}
	return fallback
}