feat(api): add batch import v2 endpoints

This commit is contained in:
phamnazage-jpg
2026-05-22 15:39:43 +08:00
parent c9b6589320
commit fb136ead7c
5 changed files with 844 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sub2api-cn-relay-manager/internal/batch"
"sub2api-cn-relay-manager/internal/host/sub2api"
"sub2api-cn-relay-manager/internal/pack"
"sub2api-cn-relay-manager/internal/provision"
@@ -19,6 +20,11 @@ import (
)
type ActionSet struct {
CreateBatchImportRun func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error)
ListBatchImportRuns func(context.Context, ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error)
GetBatchImportRun func(context.Context, string) (batch.RunSummaryProjection, error)
ListBatchImportRunItems func(context.Context, ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error)
GetBatchImportRunItem func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error)
InstallPack func(context.Context, InstallPackRequest) (provision.PackInstallResult, error)
BatchDetail func(context.Context, BatchDetailRequest) (provision.BatchDetailResult, error)
GetProviderStatus func(context.Context, ProviderQueryRequest) (provision.ProviderSnapshot, error)
@@ -201,6 +207,21 @@ func (e *httpError) Error() string {
func NewAPIHandler(adminToken string, actions ActionSet) http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("GET /healthz", healthz)
mux.Handle("POST /api/batch-import/runs", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handleCreateBatchImportRun(w, r, actions.CreateBatchImportRun)
})))
mux.Handle("GET /api/batch-import/runs", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handleListBatchImportRuns(w, r, actions.ListBatchImportRuns)
})))
mux.Handle("GET /api/batch-import/runs/{run_id}", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handleGetBatchImportRun(w, r, actions.GetBatchImportRun)
})))
mux.Handle("GET /api/batch-import/runs/{run_id}/items", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handleListBatchImportRunItems(w, r, actions.ListBatchImportRunItems)
})))
mux.Handle("GET /api/batch-import/runs/{run_id}/items/{item_id}", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handleGetBatchImportRunItem(w, r, actions.GetBatchImportRunItem)
})))
mux.Handle("GET /api/import-batches/{batchID}", requireAdminToken(adminToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handleBatchDetail(w, r, actions.BatchDetail)
})))
@@ -867,6 +888,11 @@ func classifyError(err error) *httpError {
func NewActionSet(sqliteDSN string) ActionSet {
return ActionSet{
CreateBatchImportRun: buildCreateBatchImportRunAction(sqliteDSN),
ListBatchImportRuns: buildListBatchImportRunsAction(sqliteDSN),
GetBatchImportRun: buildGetBatchImportRunAction(sqliteDSN),
ListBatchImportRunItems: buildListBatchImportRunItemsAction(sqliteDSN),
GetBatchImportRunItem: buildGetBatchImportRunItemAction(sqliteDSN),
InstallPack: func(ctx context.Context, req InstallPackRequest) (provision.PackInstallResult, error) {
loadedPack, err := pack.LoadPath(req.PackPath)
if err != nil {

View File

@@ -0,0 +1,261 @@
package app
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"sub2api-cn-relay-manager/internal/batch"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
type BatchImportEntryRequest struct {
BaseURL string `json:"base_url"`
APIKey string `json:"api_key"`
RequestedModels []string `json:"requested_models"`
}
type CreateBatchImportRunRequest struct {
HostID string `json:"host_id,omitempty"`
Mode string `json:"mode"`
AccessMode string `json:"access_mode"`
ConfirmWaitTimeoutSec int `json:"confirm_wait_timeout_sec,omitempty"`
SubscriptionUsers []string `json:"subscription_users,omitempty"`
SubscriptionDays int `json:"subscription_days,omitempty"`
ProbeAPIKey string `json:"probe_api_key,omitempty"`
Entries []BatchImportEntryRequest `json:"entries"`
}
type BatchImportRunCreateResponse struct {
RunID string `json:"run_id"`
State string `json:"state"`
ResultPage string `json:"result_page"`
TotalItems int `json:"total_items"`
ActiveItems int `json:"active_items"`
DegradedItems int `json:"degraded_items"`
BrokenItems int `json:"broken_items"`
WarningItems int `json:"warning_items"`
}
type ListBatchImportRunsRequest struct {
State string
AccessMode string
Query string
Limit int
}
type ListBatchImportRunItemsRequest struct {
RunID string
CurrentStage string
ConfirmationStatus string
AccessStatus string
HasWarning *bool
ProviderID string
MatchedAccountState string
AccountResolution string
Query string
Limit int
}
type GetBatchImportRunItemRequest struct {
RunID string
ItemID string
}
func handleCreateBatchImportRun(w http.ResponseWriter, r *http.Request, fn func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "create-batch-import-run action is not configured"})
return
}
var req CreateBatchImportRunRequest
if err := decodeJSON(r, &req); err != nil {
writeHTTPError(w, err)
return
}
if err := validateCreateBatchImportRunRequest(req); err != nil {
writeHTTPError(w, err)
return
}
result, err := fn(r.Context(), req)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
writeJSON(w, http.StatusOK, result)
}
func handleListBatchImportRuns(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-batch-import-runs action is not configured"})
return
}
result, err := fn(r.Context(), ListBatchImportRunsRequest{
State: strings.TrimSpace(r.URL.Query().Get("state")),
AccessMode: strings.TrimSpace(r.URL.Query().Get("access_mode")),
Query: strings.TrimSpace(r.URL.Query().Get("q")),
Limit: parsePositiveInt(r.URL.Query().Get("limit")),
})
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
if result == nil {
result = []batch.RunSummaryProjection{}
}
writeJSON(w, http.StatusOK, map[string]any{"runs": result})
}
func buildCreateBatchImportRunAction(sqliteDSN string) func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
return func(ctx context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return BatchImportRunCreateResponse{}, err
}
defer store.Close()
runID := fmt.Sprintf("run_%d", time.Now().UnixNano())
run := sqlite.ImportRun{
RunID: runID,
Mode: strings.TrimSpace(req.Mode),
AccessMode: strings.TrimSpace(req.AccessMode),
State: string(batch.RunStateRunning),
TotalItems: len(req.Entries),
}
if err := store.ImportRuns().Create(ctx, run); err != nil {
return BatchImportRunCreateResponse{}, err
}
for idx, entry := range req.Entries {
item := sqlite.ImportRunItem{
ItemID: fmt.Sprintf("%s-item-%d", runID, idx+1),
RunID: runID,
BaseURL: strings.TrimSpace(entry.BaseURL),
ProviderID: batch.NormalizeProviderID(entry.BaseURL),
APIKeyFingerprint: fingerprintBatchAPIKey(entry.APIKey),
RequestedModelsJSON: mustMarshalAppJSON(entry.RequestedModels, "[]"),
CurrentStage: string(batch.ItemStageProbe),
ConfirmationStatus: string(batch.ConfirmationPending),
AccessStatus: string(batch.AccessStatusUnknown),
MatchedAccountState: string(batch.MatchedAccountStateNone),
AccountResolution: string(batch.AccountResolutionCreated),
ProvisionReused: false,
CanonicalFamiliesJSON: "[]",
RawModelsJSON: "[]",
NormalizedModelsJSON: "[]",
RecommendedModelsJSON: "[]",
}
if err := store.ImportRunItems().Upsert(ctx, item); err != nil {
return BatchImportRunCreateResponse{}, err
}
}
return BatchImportRunCreateResponse{
RunID: runID,
State: string(batch.RunStateRunning),
ResultPage: "/batch-import/runs/" + runID,
TotalItems: len(req.Entries),
ActiveItems: 0,
DegradedItems: 0,
BrokenItems: 0,
WarningItems: 0,
}, nil
}
}
func buildListBatchImportRunsAction(sqliteDSN string) func(context.Context, ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error) {
return func(ctx context.Context, req ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return nil, err
}
defer store.Close()
runs, err := store.ImportRuns().List(ctx, defaultPositiveInt(req.Limit, 50))
if err != nil {
return nil, err
}
result := make([]batch.RunSummaryProjection, 0, len(runs))
for _, run := range runs {
if req.State != "" && run.State != req.State {
continue
}
if req.AccessMode != "" && run.AccessMode != req.AccessMode {
continue
}
if req.Query != "" && !strings.Contains(strings.ToLower(run.RunID), strings.ToLower(req.Query)) {
continue
}
result = append(result, batch.ProjectRunSummary(run))
}
return result, nil
}
}
func validateCreateBatchImportRunRequest(req CreateBatchImportRunRequest) *httpError {
if strings.TrimSpace(req.HostID) == "" {
return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "host_id is required"}
}
if strings.TrimSpace(req.Mode) == "" {
return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "mode is required"}
}
if strings.TrimSpace(req.AccessMode) == "" {
return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "access_mode is required"}
}
if len(req.Entries) == 0 {
return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "entries is required"}
}
switch strings.TrimSpace(req.AccessMode) {
case "subscription":
if len(req.SubscriptionUsers) == 0 {
return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "subscription_users is required when access_mode=subscription"}
}
if req.SubscriptionDays <= 0 {
return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "subscription_days is required when access_mode=subscription"}
}
case "self_service":
if strings.TrimSpace(req.ProbeAPIKey) == "" {
return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "probe_api_key is required when access_mode=self_service"}
}
default:
return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "access_mode must be subscription or self_service"}
}
return nil
}
func parsePositiveInt(raw string) int {
value, err := strconv.Atoi(strings.TrimSpace(raw))
if err != nil || value <= 0 {
return 0
}
return value
}
func defaultPositiveInt(value, fallback int) int {
if value > 0 {
return value
}
return fallback
}
func fingerprintBatchAPIKey(apiKey string) string {
trimmed := strings.TrimSpace(apiKey)
if trimmed == "" {
return ""
}
sum := sha256.Sum256([]byte(trimmed))
return fmt.Sprintf("sha256:%x", sum[:4])
}
func mustMarshalAppJSON(value any, fallback string) string {
payload, err := json.Marshal(value)
if err != nil {
return fallback
}
return string(payload)
}

View File

@@ -0,0 +1,129 @@
package app
import (
"context"
"net/http"
"testing"
)
func TestBatchImportHTTP(t *testing.T) {
t.Parallel()
t.Run("POST create run returns run summary", func(t *testing.T) {
t.Parallel()
handler := NewAPIHandler("secret-token", ActionSet{
CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
if req.HostID != "host-1" {
t.Fatalf("HostID = %q, want host-1", req.HostID)
}
if req.AccessMode != "subscription" {
t.Fatalf("AccessMode = %q, want subscription", req.AccessMode)
}
if len(req.SubscriptionUsers) != 1 || req.SubscriptionUsers[0] != "user-1" {
t.Fatalf("SubscriptionUsers = %#v, want [user-1]", req.SubscriptionUsers)
}
if len(req.Entries) != 1 || req.Entries[0].BaseURL != "https://kimi.example.com/v1" {
t.Fatalf("Entries = %#v, want request payload", req.Entries)
}
return BatchImportRunCreateResponse{
RunID: "run_20260522_0001",
State: "running",
ResultPage: "/batch-import/runs/run_20260522_0001",
TotalItems: 1,
ActiveItems: 0,
DegradedItems: 0,
BrokenItems: 0,
WarningItems: 0,
}, nil
},
})
req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{
"host_id": "host-1",
"mode": "strict",
"access_mode": "subscription",
"subscription_users": []string{"user-1"},
"subscription_days": 30,
"entries": []map[string]any{
{"base_url": "https://kimi.example.com/v1", "api_key": "sk-test", "requested_models": []string{"kimi-k2.6"}},
},
}, "secret-token")
res := httptestRecorder(handler, req)
assertStatusCode(t, res, http.StatusOK)
assertJSONContains(t, res.Body().Bytes(), "run_id", "run_20260522_0001")
assertJSONContains(t, res.Body().Bytes(), "result_page", "/batch-import/runs/run_20260522_0001")
})
t.Run("subscription request requires subscription fields", func(t *testing.T) {
t.Parallel()
handler := NewAPIHandler("secret-token", ActionSet{
CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
t.Fatal("CreateBatchImportRun should not be called when request is invalid")
return BatchImportRunCreateResponse{}, nil
},
})
req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{
"host_id": "host-1",
"mode": "strict",
"access_mode": "subscription",
"entries": []map[string]any{
{"base_url": "https://kimi.example.com/v1", "api_key": "sk-test"},
},
}, "secret-token")
res := httptestRecorder(handler, req)
assertStatusCode(t, res, http.StatusBadRequest)
assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request")
})
t.Run("self service request requires probe api key", func(t *testing.T) {
t.Parallel()
handler := NewAPIHandler("secret-token", ActionSet{
CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
t.Fatal("CreateBatchImportRun should not be called when request is invalid")
return BatchImportRunCreateResponse{}, nil
},
})
req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{
"host_id": "host-1",
"mode": "partial",
"access_mode": "self_service",
"entries": []map[string]any{
{"base_url": "https://deepseek.example.com/v1", "api_key": "sk-test"},
},
}, "secret-token")
res := httptestRecorder(handler, req)
assertStatusCode(t, res, http.StatusBadRequest)
assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request")
assertJSONContains(t, res.Body().Bytes(), "error.message", "probe_api_key is required when access_mode=self_service")
})
t.Run("create run requires host id", func(t *testing.T) {
t.Parallel()
handler := NewAPIHandler("secret-token", ActionSet{
CreateBatchImportRun: func(_ context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
t.Fatal("CreateBatchImportRun should not be called when host_id is missing")
return BatchImportRunCreateResponse{}, nil
},
})
req := httptestRequest(t, http.MethodPost, "/api/batch-import/runs", map[string]any{
"mode": "strict",
"access_mode": "subscription",
"subscription_users": []string{"user-1"},
"subscription_days": 30,
"entries": []map[string]any{
{"base_url": "https://kimi.example.com/v1", "api_key": "sk-test"},
},
}, "secret-token")
res := httptestRecorder(handler, req)
assertStatusCode(t, res, http.StatusBadRequest)
assertJSONContains(t, res.Body().Bytes(), "error.code", "invalid_request")
assertJSONContains(t, res.Body().Bytes(), "error.message", "host_id is required")
})
}

View File

@@ -0,0 +1,205 @@
package app
import (
"context"
"database/sql"
"fmt"
"net/http"
"strings"
"sub2api-cn-relay-manager/internal/batch"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
func handleGetBatchImportRun(w http.ResponseWriter, r *http.Request, fn func(context.Context, string) (batch.RunSummaryProjection, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "get-batch-import-run action is not configured"})
return
}
runID := strings.TrimSpace(r.PathValue("run_id"))
if runID == "" {
writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id is required"})
return
}
run, err := fn(r.Context(), runID)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
writeJSON(w, http.StatusOK, map[string]any{
"run": run,
"recent_warnings": run.RecentWarnings,
})
}
func handleListBatchImportRunItems(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-batch-import-run-items action is not configured"})
return
}
runID := strings.TrimSpace(r.PathValue("run_id"))
if runID == "" {
writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id is required"})
return
}
req := ListBatchImportRunItemsRequest{
RunID: runID,
CurrentStage: strings.TrimSpace(r.URL.Query().Get("current_stage")),
ConfirmationStatus: strings.TrimSpace(r.URL.Query().Get("confirmation_status")),
AccessStatus: strings.TrimSpace(r.URL.Query().Get("access_status")),
ProviderID: strings.TrimSpace(r.URL.Query().Get("provider_id")),
MatchedAccountState: strings.TrimSpace(r.URL.Query().Get("matched_account_state")),
AccountResolution: strings.TrimSpace(r.URL.Query().Get("account_resolution")),
Query: strings.TrimSpace(r.URL.Query().Get("q")),
Limit: parsePositiveInt(r.URL.Query().Get("limit")),
}
if hasWarningRaw := strings.TrimSpace(r.URL.Query().Get("has_warning")); hasWarningRaw != "" {
value := strings.EqualFold(hasWarningRaw, "true")
req.HasWarning = &value
}
items, err := fn(r.Context(), req)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
if items == nil {
items = []batch.ItemSummaryProjection{}
}
writeJSON(w, http.StatusOK, map[string]any{"items": items})
}
func handleGetBatchImportRunItem(w http.ResponseWriter, r *http.Request, fn func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "get-batch-import-run-item action is not configured"})
return
}
req := GetBatchImportRunItemRequest{
RunID: strings.TrimSpace(r.PathValue("run_id")),
ItemID: strings.TrimSpace(r.PathValue("item_id")),
}
if req.RunID == "" || req.ItemID == "" {
writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id and item_id are required"})
return
}
item, err := fn(r.Context(), req)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
writeJSON(w, http.StatusOK, item)
}
func buildGetBatchImportRunAction(sqliteDSN string) func(context.Context, string) (batch.RunSummaryProjection, error) {
return func(ctx context.Context, runID string) (batch.RunSummaryProjection, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return batch.RunSummaryProjection{}, err
}
defer store.Close()
run, err := store.ImportRuns().GetByRunID(ctx, runID)
if err != nil {
if err == sql.ErrNoRows {
return batch.RunSummaryProjection{}, fmt.Errorf("run not found: %s", runID)
}
return batch.RunSummaryProjection{}, err
}
return batch.ProjectRunSummary(run), nil
}
}
func buildListBatchImportRunItemsAction(sqliteDSN string) func(context.Context, ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error) {
return func(ctx context.Context, req ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return nil, err
}
defer store.Close()
if _, err := store.ImportRuns().GetByRunID(ctx, req.RunID); err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("run not found: %s", req.RunID)
}
return nil, err
}
items, err := store.ImportRunItems().ListByRunID(ctx, req.RunID)
if err != nil {
return nil, err
}
result := make([]batch.ItemSummaryProjection, 0, len(items))
for _, item := range items {
view := batch.ProjectItemSummary(item)
if !matchesItemFilters(view, req) {
continue
}
result = append(result, view)
if req.Limit > 0 && len(result) >= req.Limit {
break
}
}
return result, nil
}
}
func buildGetBatchImportRunItemAction(sqliteDSN string) func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) {
return func(ctx context.Context, req GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return batch.ItemDetailProjection{}, err
}
defer store.Close()
item, err := store.ImportRunItems().GetByItemID(ctx, req.ItemID)
if err != nil {
if err == sql.ErrNoRows {
return batch.ItemDetailProjection{}, fmt.Errorf("item not found: %s", req.ItemID)
}
return batch.ItemDetailProjection{}, err
}
if item.RunID != req.RunID {
return batch.ItemDetailProjection{}, fmt.Errorf("item not found in run %s", req.RunID)
}
events, err := store.ImportRunEvents().ListByItemID(ctx, req.ItemID)
if err != nil {
return batch.ItemDetailProjection{}, err
}
return batch.ProjectItemDetail(item, events)
}
}
func matchesItemFilters(view batch.ItemSummaryProjection, req ListBatchImportRunItemsRequest) bool {
if req.CurrentStage != "" && view.CurrentStage != req.CurrentStage {
return false
}
if req.ConfirmationStatus != "" && view.ConfirmationStatus != req.ConfirmationStatus {
return false
}
if req.AccessStatus != "" && view.AccessStatus != req.AccessStatus {
return false
}
if req.ProviderID != "" && view.ProviderID != req.ProviderID {
return false
}
if req.MatchedAccountState != "" && view.MatchedAccountState != req.MatchedAccountState {
return false
}
if req.AccountResolution != "" && view.AccountResolution != req.AccountResolution {
return false
}
if req.HasWarning != nil {
hasWarning := len(view.AdvisoryMessages) > 0
if hasWarning != *req.HasWarning {
return false
}
}
if req.Query != "" {
query := strings.ToLower(req.Query)
if !strings.Contains(strings.ToLower(view.ItemID), query) &&
!strings.Contains(strings.ToLower(view.ProviderID), query) &&
!strings.Contains(strings.ToLower(view.BaseURL), query) {
return false
}
}
return true
}

View File

@@ -0,0 +1,223 @@
package app
import (
"context"
"encoding/json"
"net/http"
"testing"
"sub2api-cn-relay-manager/internal/batch"
"sub2api-cn-relay-manager/internal/probe"
)
func TestBatchRunsHTTP(t *testing.T) {
t.Parallel()
t.Run("GET runs returns projected summaries", func(t *testing.T) {
t.Parallel()
handler := NewAPIHandler("secret-token", ActionSet{
ListBatchImportRuns: func(_ context.Context, req ListBatchImportRunsRequest) ([]batch.RunSummaryProjection, error) {
if req.State != "completed_with_warnings" {
t.Fatalf("State = %q, want completed_with_warnings", req.State)
}
if req.AccessMode != "subscription" {
t.Fatalf("AccessMode = %q, want subscription", req.AccessMode)
}
return []batch.RunSummaryProjection{{
RunID: "run-1",
State: "completed_with_warnings",
Mode: "partial",
AccessMode: "subscription",
TotalItems: 2,
WarningItems: 1,
}}, nil
},
})
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs?state=completed_with_warnings&access_mode=subscription", nil, "secret-token")
res := httptestRecorder(handler, req)
assertStatusCode(t, res, http.StatusOK)
run := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "runs", 0)
assertJSONObjectValue(t, run, "run_id", "run-1")
assertJSONObjectValue(t, run, "state", "completed_with_warnings")
assertJSONObjectValue(t, run, "access_mode", "subscription")
assertJSONObjectValue(t, run, "warning_items", float64(1))
})
t.Run("GET run detail returns wrapped projection", func(t *testing.T) {
t.Parallel()
handler := NewAPIHandler("secret-token", ActionSet{
GetBatchImportRun: func(_ context.Context, runID string) (batch.RunSummaryProjection, error) {
if runID != "run-1" {
t.Fatalf("runID = %q, want run-1", runID)
}
return batch.RunSummaryProjection{
RunID: "run-1",
State: "completed_with_warnings",
Mode: "partial",
AccessMode: "subscription",
TotalItems: 2,
CompletedItems: 2,
ActiveItems: 1,
DegradedItems: 1,
WarningItems: 1,
RecentWarnings: []string{"warning"},
}, nil
},
})
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1", nil, "secret-token")
res := httptestRecorder(handler, req)
assertStatusCode(t, res, http.StatusOK)
assertJSONContains(t, res.Body().Bytes(), "run.run_id", "run-1")
assertJSONArrayValueAt(t, res.Body().Bytes(), "recent_warnings", 0, "warning")
})
t.Run("GET items forwards matched account filters", func(t *testing.T) {
t.Parallel()
handler := NewAPIHandler("secret-token", ActionSet{
ListBatchImportRunItems: func(_ context.Context, req ListBatchImportRunItemsRequest) ([]batch.ItemSummaryProjection, error) {
if req.RunID != "run-1" {
t.Fatalf("RunID = %q, want run-1", req.RunID)
}
if req.MatchedAccountState != "active" {
t.Fatalf("MatchedAccountState = %q, want active", req.MatchedAccountState)
}
if req.AccountResolution != "reused" {
t.Fatalf("AccountResolution = %q, want reused", req.AccountResolution)
}
return []batch.ItemSummaryProjection{{
ItemID: "item-1",
BaseURL: "https://kimi.example.com/v1",
ProviderID: "kimi-a7m-1",
APIKeyFingerprint: "sha256:1234",
CurrentStage: "done",
ConfirmationStatus: "advisory",
AccessStatus: "active",
MatchedAccountState: "active",
AccountResolution: "reused",
ProvisionReused: true,
}}, nil
},
})
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items?matched_account_state=active&account_resolution=reused", nil, "secret-token")
res := httptestRecorder(handler, req)
assertStatusCode(t, res, http.StatusOK)
item := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "items", 0)
assertJSONObjectValue(t, item, "item_id", "item-1")
assertJSONObjectValue(t, item, "matched_account_state", "active")
assertJSONObjectValue(t, item, "account_resolution", "reused")
assertJSONObjectValue(t, item, "provision_reused", true)
})
t.Run("GET item detail returns capability profile and events", func(t *testing.T) {
t.Parallel()
handler := NewAPIHandler("secret-token", ActionSet{
GetBatchImportRunItem: func(_ context.Context, req GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) {
if req.RunID != "run-1" || req.ItemID != "item-1" {
t.Fatalf("request = %#v, want run-1/item-1", req)
}
return batch.ItemDetailProjection{
ItemSummaryProjection: batch.ItemSummaryProjection{
ItemID: "item-1",
BaseURL: "https://kimi.example.com/v1",
ProviderID: "kimi-a7m-1",
APIKeyFingerprint: "sha256:1234",
CanonicalModelFamilies: []string{"kimi-2.6"},
CurrentStage: "done",
ConfirmationStatus: "advisory",
AccessStatus: "active",
MatchedAccountState: "deprecated",
AccountResolution: "reactivated",
ProvisionReused: true,
},
ReusedFromProviderID: "provider-kimi-old",
CapabilityProfile: batchProbeProfile(),
Events: []batch.EventProjection{{
EventID: "evt-1",
EventType: "retry_scheduled",
Stage: "confirm",
Message: "retry queued",
}},
}, nil
},
})
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items/item-1", nil, "secret-token")
res := httptestRecorder(handler, req)
assertStatusCode(t, res, http.StatusOK)
assertJSONContains(t, res.Body().Bytes(), "item_id", "item-1")
assertJSONContains(t, res.Body().Bytes(), "reused_from_provider_id", "provider-kimi-old")
assertJSONContains(t, res.Body().Bytes(), "capability_profile.transport_profile.supports_openai_chat_completions", true)
event := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "events", 0)
assertJSONObjectValue(t, event, "event_type", "retry_scheduled")
assertJSONObjectValue(t, event, "stage", "confirm")
})
}
func batchProbeProfile() probe.CapabilityProfile {
return probe.CapabilityProfile{
TransportProfile: probe.TransportProfile{
SupportsOpenAIChatCompletions: true,
},
}
}
func decodeJSONArrayObjectAt(t *testing.T, payload []byte, key string, index int) map[string]any {
t.Helper()
values, ok := decodeTopLevelArray(t, payload, key)
if !ok {
t.Fatalf("json key %q is not an array; payload=%s", key, string(payload))
}
if index < 0 || index >= len(values) {
t.Fatalf("json key %q length = %d, want index %d present; payload=%s", key, len(values), index, string(payload))
}
object, ok := values[index].(map[string]any)
if !ok {
t.Fatalf("json key %q[%d] is not an object; payload=%s", key, index, string(payload))
}
return object
}
func assertJSONArrayValueAt(t *testing.T, payload []byte, key string, index int, want any) {
t.Helper()
values, ok := decodeTopLevelArray(t, payload, key)
if !ok {
t.Fatalf("json key %q is not an array; payload=%s", key, string(payload))
}
if index < 0 || index >= len(values) {
t.Fatalf("json key %q length = %d, want index %d present; payload=%s", key, len(values), index, string(payload))
}
if got := values[index]; got != want {
t.Fatalf("json key %q[%d] = %#v, want %#v; payload=%s", key, index, got, want, string(payload))
}
}
func assertJSONObjectValue(t *testing.T, object map[string]any, key string, want any) {
t.Helper()
if got := object[key]; got != want {
t.Fatalf("json object key %q = %#v, want %#v", key, got, want)
}
}
func decodeTopLevelArray(t *testing.T, payload []byte, key string) ([]any, bool) {
t.Helper()
var decoded map[string]any
if err := json.Unmarshal(payload, &decoded); err != nil {
t.Fatalf("json.Unmarshal() error = %v; payload=%s", err, string(payload))
}
values, ok := decoded[key].([]any)
return values, ok
}