sub2api-cn-relay-manager/docs/2026-05-21-BATCH_AUTO_IMPORT_TDD_PLAN.md
2026-05-22 14:15:41 +08:00

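TDD Implementation Plan v2: Batch Auto-Import

Date: 2026-05-21. Technical architecture: docs/2026-05-22-BATCH_AUTO_IMPORT_V2_ARCHITECTURE.md

1. Goals

This plan serves one purpose: turning the V2 design into an implementation path that is testable, recoverable, and observable.

Target outcomes:

  1. Automatic model discovery from URL + key
  2. Model-name normalization and recommendation-based correction
  3. Fast matching and reuse of the same model across relays
  4. Provider/model compatibility profiling
  5. Host resource evolution and provider binding
  6. Background asynchronous confirmation with bounded retries
  7. Final gateway completion validation
  8. Persistent run/item state and a readable results page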

2. Canonical Contract

Lock the canonical contract before implementation. Tests, interfaces, and state tables all follow this one contract.

2.1 Core IDs

  • run_id string
  • item_id string
  • provider_id string = {normalized_host}-{url_hash_last8}

2.2 Run states

type RunState string

const (
    RunStateRunning               RunState = "running"
    RunStateCompleted             RunState = "completed"
    RunStateCompletedWithWarnings RunState = "completed_with_warnings"
    RunStateFailed                RunState = "failed"
    RunStateCancelled             RunState = "cancelled"
)

2.3 Item states

type ItemStage string

const (
    ItemStageProbe     ItemStage = "probe"
    ItemStageProvision ItemStage = "provision"
    ItemStageConfirm   ItemStage = "confirm"
    ItemStageValidate  ItemStage = "validate"
    ItemStageDone      ItemStage = "done"
)

type ConfirmationStatus string

const (
    ConfirmationPending   ConfirmationStatus = "pending"
    ConfirmationConfirmed ConfirmationStatus = "confirmed"
    ConfirmationAdvisory  ConfirmationStatus = "advisory"
    ConfirmationFailed    ConfirmationStatus = "failed"
)

type AccessStatus string

const (
    AccessStatusUnknown  AccessStatus = "unknown"
    AccessStatusActive   AccessStatus = "active"
    AccessStatusDegraded AccessStatus = "degraded"
    AccessStatusBroken   AccessStatus = "broken"
)

2.4 Access Mode inputs

type BatchImportRunRequest struct {
    HostID                string
    Mode                  string
    AccessMode            string
    ConfirmWaitTimeoutSec int
    SubscriptionUsers     []string
    SubscriptionDays      int
    ProbeAPIKey           string
    Entries               []BatchImportEntry
}

type BatchImportEntry struct {
    BaseURL         string
    APIKey          string
    RequestedModels []string
}

Validation rules:

  • subscription requires both SubscriptionUsers and SubscriptionDays
  • self_service requires ProbeAPIKey
  • RequestedModels is only a hint, not a source of truth
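
The two access-mode rules above can be sketched as a small validator. This is a minimal sketch, not the plan's implementation: ValidateAccessMode is a hypothetical helper name, the struct keeps only the fields the rules mention, and treating SubscriptionDays <= 0 as "missing" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// BatchImportRunRequest keeps only the section 2.4 fields that the
// validation rules refer to.
type BatchImportRunRequest struct {
	AccessMode        string
	SubscriptionUsers []string
	SubscriptionDays  int
	ProbeAPIKey       string
}

// ValidateAccessMode is a hypothetical helper (not named in the plan).
// It rejects requests that lack the inputs their access mode requires.
func ValidateAccessMode(req BatchImportRunRequest) error {
	switch req.AccessMode {
	case "subscription":
		if len(req.SubscriptionUsers) == 0 {
			return errors.New("subscription mode requires SubscriptionUsers")
		}
		// Assumption: a non-positive day count counts as "not provided".
		if req.SubscriptionDays <= 0 {
			return errors.New("subscription mode requires SubscriptionDays > 0")
		}
	case "self_service":
		if req.ProbeAPIKey == "" {
			return errors.New("self_service mode requires ProbeAPIKey")
		}
	default:
		return fmt.Errorf("unknown access mode %q", req.AccessMode)
	}
	return nil
}
```

RequestedModels is deliberately not validated here, since the plan treats it as a hint only.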

3. Implementation order

The work must proceed in the following order:

probe/models + probe/aliases
          ↓
probe/capability + probe/completion
          ↓
batch/provider_id + batch/capability_profile
          ↓
host/channel_patch_contract
          ↓
batch/run_state + batch/run_events
          ↓
batch/service
          ↓
batch/confirmation_worker
          ↓
batch/validation
          ↓
app/http_batch_import + app/http_batch_runs
          ↓
cmd/cli/batch_import
          ↓
tests/integration/batch_import

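Principles:

  • Lock down the state contract before writing the worker
  • Make the state store persist everything before building the results page
  • Make the Validation Engine the sole writer of access_status before building the projection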

4. Stage 1: Probe

4.1 internal/probe/models.go

Responsibility: fetch /v1/models.

type ModelsResult struct {
    RawModels  []string
    HTTPStatus int
    LatencyMs  int64
    Error      string
}

func ProviderModels(ctx context.Context, baseURL, apiKey string) (*ModelsResult, error)

Unit tests:

func TestProviderModels_OpenAIFormat_ReturnsModelList(t *testing.T)
func TestProviderModels_EmptyData_ReturnsEmptySlice(t *testing.T)
func TestProviderModels_AuthFailed_ReturnsErrAuthFailed(t *testing.T)
func TestProviderModels_Timeout_ReturnsErrUpstreamUnreachable(t *testing.T)
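
The decoding half of ProviderModels might look like the sketch below. It assumes the upstream answers in the OpenAI-style list shape ({"data":[{"id":"..."}]}); parseModelsBody is a hypothetical helper name, and the HTTP call, status mapping, and latency measurement are left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// modelsEnvelope is the assumed OpenAI-style response shape.
type modelsEnvelope struct {
	Data []struct {
		ID string `json:"id"`
	} `json:"data"`
}

// parseModelsBody is a hypothetical helper: it extracts the raw model IDs
// from a /v1/models response body. An empty "data" array yields an empty,
// non-nil slice, matching the EmptyData test case listed above.
func parseModelsBody(body []byte) ([]string, error) {
	var env modelsEnvelope
	if err := json.Unmarshal(body, &env); err != nil {
		return nil, fmt.Errorf("decode models response: %w", err)
	}
	models := make([]string, 0, len(env.Data))
	for _, m := range env.Data {
		if m.ID != "" { // skip entries without an ID
			models = append(models, m.ID)
		}
	}
	return models, nil
}
```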

4.2 internal/probe/aliases.go

Responsibility: model normalization, aliasing, and recommendation-based correction.

type AliasResult struct {
    Raw        string
    Normalized string
    Canonical  string
}

func NormalizeModelID(raw string) string
func CanonicalModelID(raw string) string
func CanonicalModelFamily(raw string) string
func BuildAliasTable(rawModels []string) map[string]AliasResult
func ResolveRequestedModel(requested string, rawModels []string) (resolved string, ok bool)
func RecommendModels(requested []string, rawModels []string) []string

Unit tests:

func TestNormalizeModelID_MinimaxCanonical(t *testing.T)
func TestNormalizeModelID_DeepSeekVendorPrefix(t *testing.T)
func TestCanonicalModelFamily_KimiVariantsCollapseToSameFamily(t *testing.T)
func TestResolveRequestedModel_UsesNormalizedAlias(t *testing.T)
func TestRecommendModels_ReturnsCanonicalCandidates(t *testing.T)
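
The plan does not spell out the normalization rules, so the following is only one plausible sketch: it assumes normalization means trimming, lowercasing, and dropping a vendor prefix before the last "/". ResolveRequestedModel then matches on the normalized form and returns the raw upstream ID.

```go
package main

import "strings"

// NormalizeModelID applies an assumed normalization: trim whitespace,
// lowercase, and drop any vendor prefix such as "deepseek-ai/".
func NormalizeModelID(raw string) string {
	id := strings.ToLower(strings.TrimSpace(raw))
	if i := strings.LastIndex(id, "/"); i >= 0 {
		id = id[i+1:]
	}
	return id
}

// ResolveRequestedModel returns the first raw upstream model whose
// normalized ID equals the normalized requested name.
func ResolveRequestedModel(requested string, rawModels []string) (resolved string, ok bool) {
	want := NormalizeModelID(requested)
	if want == "" {
		return "", false
	}
	for _, raw := range rawModels {
		if NormalizeModelID(raw) == want {
			return raw, true
		}
	}
	return "", false
}
```

Returning the raw ID rather than the normalized one keeps the upstream spelling available for the actual API call.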

4.3 internal/probe/capability.go

Responsibility: generate the transport profile and the model profiles.

type TransportProfile struct {
    SupportsOpenAIModels          bool
    SupportsOpenAIChatCompletions bool
    SupportsOpenAIResponses       bool
    SupportsAnthropicMessages     bool
    AuthStyle                     string
    ModelIDStyle                  string
    KnownAdvisories               []string
}

type ModelCapabilityProfile struct {
    RawModelID              string
    NormalizedModelID       string
    SupportsStream          string
    SupportsTools           string
    SupportsReasoningFields string
    SmokeChatOK             bool
}

type CapabilityProfile struct {
    Transport    TransportProfile
    ModelProfile []ModelCapabilityProfile
}

func ProbeCapabilities(ctx context.Context, baseURL, apiKey string, rawModels []string) (*CapabilityProfile, error)

Unit tests:

func TestProbeCapabilities_Responses403Chat200_MarksResponsesUnsupported(t *testing.T)
func TestProbeCapabilities_ModelProfilesCapturedPerModel(t *testing.T)
func TestProbeCapabilities_RecordsKnownAdvisories(t *testing.T)

4.4 internal/probe/completion.go

Responsibility: choose the smoke model and run a minimal completion.

type CompletionResult struct {
    Model          string
    HTTPStatus     int
    LatencyMs      int64
    Classification string
    Error          string
}

func ResolveSmokeModel(requested []string, rawModels []string, profile *CapabilityProfile) (string, []string, error)
func SmokeCompletion(ctx context.Context, baseURL, apiKey, model string, profile *CapabilityProfile) (*CompletionResult, error)

Unit tests:

func TestResolveSmokeModel_UsesRequestedAliasWhenMatched(t *testing.T)
func TestResolveSmokeModel_FallsBackToDiscoveredModel(t *testing.T)
func TestSmokeCompletion_ResponsesUnsupported_UsesChatCompletions(t *testing.T)

5. Stage 2: Provision & Channel Evolution

5.1 internal/batch/provider_id.go

func NormalizeProviderID(baseURL string) string

Rules:

  • Normalize the host
  • Hash the full URL
  • The same host with different paths must yield different IDs

Unit tests:

func TestNormalizeProviderID_Basic(t *testing.T)
func TestNormalizeProviderID_WithPath_IncludesPathHash(t *testing.T)
func TestNormalizeProviderID_DifferentPaths_DifferentIDs(t *testing.T)

5.2 internal/batch/capability_profile.go

Responsibility: translate the capability profile into an import/confirmation strategy.

type ImportRoutingStrategy struct {
    UseRawChatCompletions bool
    SkipResponsesChecks   bool
    RetryInitial503       bool
    TreatProbe403Advisory bool
}

func BuildImportRoutingStrategy(profile *probe.CapabilityProfile) ImportRoutingStrategy

Unit tests:

func TestBuildImportRoutingStrategy_ResponsesUnsupported_UsesRawChat(t *testing.T)
func TestBuildImportRoutingStrategy_ProbeRaceAdvisory_EnablesProbe403Advisory(t *testing.T)
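
One way the mapping from profile to strategy could work is sketched below. The types are trimmed copies of the ones in sections 4.3 and 5.2, and the advisory strings "probe_race" and "initial_503_warmup" are hypothetical placeholders, since the plan does not list the actual advisory values.

```go
package main

// Trimmed copies of the plan's types; only the fields used here are kept.
type TransportProfile struct {
	SupportsOpenAIChatCompletions bool
	SupportsOpenAIResponses       bool
	KnownAdvisories               []string
}

type CapabilityProfile struct {
	Transport TransportProfile
}

type ImportRoutingStrategy struct {
	UseRawChatCompletions bool
	SkipResponsesChecks   bool
	RetryInitial503       bool
	TreatProbe403Advisory bool
}

// Hypothetical advisory identifiers; the real values are not in the plan.
const (
	advisoryProbeRace = "probe_race"
	advisoryWarmup503 = "initial_503_warmup"
)

// BuildImportRoutingStrategy derives routing flags from the profile.
func BuildImportRoutingStrategy(profile *CapabilityProfile) ImportRoutingStrategy {
	var s ImportRoutingStrategy
	if profile == nil {
		return s
	}
	t := profile.Transport
	if !t.SupportsOpenAIResponses {
		// No /responses support: skip those checks and, if chat works,
		// route through raw chat completions instead.
		s.SkipResponsesChecks = true
		s.UseRawChatCompletions = t.SupportsOpenAIChatCompletions
	}
	for _, a := range t.KnownAdvisories {
		switch a {
		case advisoryProbeRace:
			s.TreatProbe403Advisory = true
		case advisoryWarmup503:
			s.RetryInitial503 = true
		}
	}
	return s
}
```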

5.3 internal/batch/channel_evolution.go

Responsibility: build the complete channel patch contract.

type ChannelPatchContract struct {
    ModelMapping       map[string]string
    ModelPricing       map[string]any
    RestrictModels     bool
    BillingModelSource string
}

func ModelMappingDelta(existing map[string]string, discoveredAliases map[string]probe.AliasResult) ChannelPatchContract

Unit tests:

func TestModelMappingDelta_PreservesExistingEntries(t *testing.T)
func TestModelMappingDelta_AddsRawToCanonicalMappings(t *testing.T)
func TestModelMappingDelta_SetsRestrictModelsAndBillingSource(t *testing.T)
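
The merge behavior implied by the test names could be sketched as follows. The mapping direction (raw ID as key, canonical ID as value) is inferred from the test name AddsRawToCanonicalMappings, and the billing source value is a placeholder; neither is confirmed by the plan. AliasResult is copied locally so the example stands alone.

```go
package main

// AliasResult is a local copy of the type from section 4.2.
type AliasResult struct {
	Raw        string
	Normalized string
	Canonical  string
}

type ChannelPatchContract struct {
	ModelMapping       map[string]string
	ModelPricing       map[string]any
	RestrictModels     bool
	BillingModelSource string
}

// Placeholder value; the plan does not say which source is expected.
const assumedBillingSource = "requested"

// ModelMappingDelta merges discovered aliases into the existing mapping.
// Existing entries always win, and the input map is never mutated.
func ModelMappingDelta(existing map[string]string, discoveredAliases map[string]AliasResult) ChannelPatchContract {
	merged := make(map[string]string, len(existing)+len(discoveredAliases))
	for k, v := range existing {
		merged[k] = v
	}
	for _, alias := range discoveredAliases {
		if alias.Raw == "" || alias.Canonical == "" {
			continue
		}
		if _, taken := merged[alias.Raw]; !taken {
			merged[alias.Raw] = alias.Canonical // assumed direction: raw -> canonical
		}
	}
	return ChannelPatchContract{
		ModelMapping:       merged,
		ModelPricing:       map[string]any{}, // pricing is out of scope for this sketch
		RestrictModels:     true,
		BillingModelSource: assumedBillingSource,
	}
}
```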

5.4 internal/batch/reuse_policy.go

Responsibility: decide whether an existing provider/account can be reused directly.

type ReuseDecision struct {
    ReuseProvision       bool
    PatchOnly            bool
    ReplaceAccount       bool
    ReactivateAccount    bool
    MatchedAccountState  string
    AccountResolution    string
    ReusedFromProviderID string
    ReusedFromAccountID  *int64
}

func DecideReuse(existing ExistingProviderSnapshot, incoming IncomingProviderSnapshot) ReuseDecision

Decision inputs:

  • host_id + provider_id
  • base_url + api_key_fingerprint
  • canonical_model_families
  • existing access_status
  • health of the existing key/account

Unit tests:

func TestDecideReuse_FullyCoveredAndActive_ReusesProvision(t *testing.T)
func TestDecideReuse_MissingFamilies_PatchOnly(t *testing.T)
func TestDecideReuse_BrokenProvider_RequestsReplacement(t *testing.T)
func TestDecideReuse_SameFamilyDifferentAlias_TreatedAsCovered(t *testing.T)
func TestDecideReuse_ExistingActiveAccount_MarksDuplicateReused(t *testing.T)
func TestDecideReuse_DisabledAccount_RequestsReactivation(t *testing.T)
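
The decision order suggested by these test names can be sketched as below. The snapshot types are hypothetical (the plan does not define their fields), ReuseDecision is trimmed to its boolean flags, and the fallback for a covered but non-active provider (no reuse at all) is an assumption.

```go
package main

// Hypothetical snapshot shapes; the plan does not define these fields.
type ExistingProviderSnapshot struct {
	AccessStatus    string   // "active", "degraded", "broken", or "unknown"
	AccountDisabled bool     // existing account is present but disabled
	Families        []string // canonical model families already covered
}

type IncomingProviderSnapshot struct {
	Families []string // canonical model families discovered by the probe
}

// ReuseDecision is trimmed to the boolean flags from section 5.4.
type ReuseDecision struct {
	ReuseProvision    bool
	PatchOnly         bool
	ReplaceAccount    bool
	ReactivateAccount bool
}

// DecideReuse checks health first, then family coverage.
func DecideReuse(existing ExistingProviderSnapshot, incoming IncomingProviderSnapshot) ReuseDecision {
	if existing.AccessStatus == "broken" {
		return ReuseDecision{ReplaceAccount: true}
	}
	if existing.AccountDisabled {
		return ReuseDecision{ReactivateAccount: true}
	}
	covered := make(map[string]bool, len(existing.Families))
	for _, f := range existing.Families {
		covered[f] = true
	}
	for _, f := range incoming.Families {
		if !covered[f] {
			return ReuseDecision{PatchOnly: true} // some family is missing
		}
	}
	if existing.AccessStatus == "active" {
		return ReuseDecision{ReuseProvision: true}
	}
	// Assumption: covered but not active means no reuse, so the caller
	// falls through to the normal provision path.
	return ReuseDecision{}
}
```

Comparing canonical families rather than raw IDs is what lets two aliases of the same model count as covered.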

6. Stage 3: State Store

6.1 internal/batch/run_state.go

V2 canonical runtime store:

  • import_runs
  • import_run_items
  • import_run_item_events

type ImportRunState struct {
    RunID          string
    Mode           string
    AccessMode     string
    State          RunState
    TotalItems     int
    CompletedItems int
    ActiveItems    int
    DegradedItems  int
    BrokenItems    int
    WarningItems   int
    StartedAt      time.Time
    UpdatedAt      time.Time
    FinishedAt     *time.Time
}

type ImportRunItemState struct {
    RunID                  string
    ItemID                 string
    BaseURL                string
    ProviderID             string
    APIKeyFingerprint      string
    CurrentStage           ItemStage
    ConfirmationStatus     ConfirmationStatus
    AccessStatus           AccessStatus
    MatchedAccountState    string
    AccountResolution      string
    RequestedModels        []string
    RawModels              []string
    NormalizedModels       []string
    CanonicalModelFamilies []string
    ResolvedSmokeModel     *string
    RecommendedModels      []string
    CapabilityProfileJSON  string
    ChannelID              *int64
    AccountID              *int64
    RetryCount             int
    ConfirmationAttempts   int
    LastRetryAt            *time.Time
    NextRetryAt            *time.Time
    LeaseOwner             *string
    LeaseUntil             *time.Time
    AdvisoryMessages       []string
    LastErrorStage         *string
    LastError              *string
    LegacyBatchID          *int64
    LegacyProviderID       *string
    ProvisionReused        bool
    ReusedFromProviderID   *string
    ReusedFromAccountID    *int64
    CreatedAt              time.Time
    UpdatedAt              time.Time
}

type ImportRunItemEvent struct {
    EventID     string
    RunID       string
    ItemID      string
    EventType   string
    Stage       string
    Attempt     int
    Message     string
    PayloadJSON string
    CreatedAt   time.Time
}

Unit tests:

func TestRunStateStore_CreateAndUpdateRun(t *testing.T)
func TestRunStateStore_UpsertItemStoresProjectionFields(t *testing.T)
func TestRunStateStore_EventTrailCanBeQueried(t *testing.T)
func TestRunStateStore_LeaseFieldsPersist(t *testing.T)
func TestRunStateStore_AccountReuseFieldsPersist(t *testing.T)

7. Stage 4: Batch Service

7.1 internal/batch/service.go

type BatchImportService struct {
    Host       hostadapter.HostAdapter
    Probe      *probe.Client
    Provision  *provision.ImportService
    StateStore RunStateStore
    Queue      ConfirmationQueue
}

func (s *BatchImportService) StartRun(ctx context.Context, req BatchImportRunRequest) (*BatchImportRunResult, error)

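Responsibilities:

  • Create the run and its items
  • Run the reuse preflight first to decide between reuse, patch, and replace
  • Persist probe/provision results first
  • Enqueue the confirm step; the request thread does not carry the full confirmation responsibility
  • CLI/HTTP is only responsible for "starting the run" and an "optional wait window"

Unit tests: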

func TestBatchImport_StartRun_PersistsInitialState(t *testing.T)
func TestBatchImport_RequestedModelMiss_UsesDiscoveredModel(t *testing.T)
func TestBatchImport_ProvisionWritesLegacyLinks(t *testing.T)
func TestBatchImport_ExistingActiveProviderAndCoveredFamilies_ReusesProvision(t *testing.T)

8. Stage 5: Confirmation Worker

8.1 internal/batch/confirmation.go

type ConfirmationWorker struct {
    Host       hostadapter.HostAdapter
    StateStore RunStateStore
    Validate   ValidationService
    Clock      Clock
    WorkerID   string
}

func (w *ConfirmationWorker) Tick(ctx context.Context, now time.Time) error
func (w *ConfirmationWorker) ConfirmItem(ctx context.Context, item ImportRunItemState) (*ImportRunItemState, error)

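Behavior:

  • Poll for items with current_stage=confirm and next_retry_at<=now
  • Acquire the lease
  • Run account models / account test / transient 503 absorb
  • Set confirmation_status = confirmed | advisory | failed
  • Advance to validate once confirm finishes

Constraints:

  • If the first attempt hits a 403 probe race while the models are already correct and the profile indicates responses is unsupported, mark the item advisory
  • confirmation_status is not the final availability verdict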

Unit tests:

func TestConfirmationWorker_Probe403Race_ReturnsAdvisory(t *testing.T)
func TestConfirmationWorker_UsesLeaseAndNextRetryAt(t *testing.T)
func TestConfirmationWorker_RestartCanResumeUnlockedItem(t *testing.T)

9. Stage 6: Validation Engine

9.1 internal/batch/validation.go

type ValidationService struct {
    Host hostadapter.HostAdapter
}

func (s *ValidationService) ValidateAccess(ctx context.Context, item ImportRunItemState, req BatchImportRunRequest) (AccessStatus, []string, error)

Rules:

  • Only this component may write the final access_status
  • confirmed + gateway chat 200 → active
  • advisory + gateway chat 200 → active
  • gateway chat transient but exhausted → degraded
  • gateway chat definitively failed → broken
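
These rules amount to a small decision table, sketched below. GatewayOutcome and decideAccess are hypothetical names; the status types repeat the constants from section 2.3. Combinations the rules do not mention (for example a pending or failed confirmation followed by a gateway 200) fall back to unknown here, which is an assumption.

```go
package main

type ConfirmationStatus string
type AccessStatus string

const (
	ConfirmationPending   ConfirmationStatus = "pending"
	ConfirmationConfirmed ConfirmationStatus = "confirmed"
	ConfirmationAdvisory  ConfirmationStatus = "advisory"
	ConfirmationFailed    ConfirmationStatus = "failed"

	AccessStatusUnknown  AccessStatus = "unknown"
	AccessStatusActive   AccessStatus = "active"
	AccessStatusDegraded AccessStatus = "degraded"
	AccessStatusBroken   AccessStatus = "broken"
)

// GatewayOutcome is a hypothetical classification of the final gateway
// chat completion attempt.
type GatewayOutcome int

const (
	GatewayOK                 GatewayOutcome = iota // chat returned 200
	GatewayTransientExhausted                       // transient errors, retries used up
	GatewayFailed                                   // definitive failure
)

// decideAccess maps confirmation status and gateway outcome to the
// access status, following the rule list above.
func decideAccess(conf ConfirmationStatus, out GatewayOutcome) AccessStatus {
	switch out {
	case GatewayOK:
		if conf == ConfirmationConfirmed || conf == ConfirmationAdvisory {
			return AccessStatusActive
		}
		return AccessStatusUnknown // assumption: combination not covered by the rules
	case GatewayTransientExhausted:
		return AccessStatusDegraded
	case GatewayFailed:
		return AccessStatusBroken
	}
	return AccessStatusUnknown
}
```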

Unit tests:

func TestValidationService_GatewayChat200_ReturnsActive(t *testing.T)
func TestValidationService_Transient503Exhausted_ReturnsDegraded(t *testing.T)
func TestValidationService_FinalFailure_ReturnsBroken(t *testing.T)

10. Stage 7: HTTP API & Result Pages

10.1 internal/app/http_batch_import.go

func (a *App) createBatchImportRun(w http.ResponseWriter, r *http.Request)
func (a *App) listBatchImportRuns(w http.ResponseWriter, r *http.Request)
func (a *App) getBatchImportRun(w http.ResponseWriter, r *http.Request)
func (a *App) listBatchImportRunItems(w http.ResponseWriter, r *http.Request)
func (a *App) getBatchImportRunItem(w http.ResponseWriter, r *http.Request)

Requirements:

  • Return the projection directly; pages must not assemble state themselves
  • List-page filtering uses run.state
  • Item detail must return the event trail

Unit tests:

func TestCreateBatchImportRun_ValidatesAccessModeInputs(t *testing.T)
func TestListBatchImportRuns_ReturnsCanonicalState(t *testing.T)
func TestGetBatchImportRunItem_ReturnsEventTrailAndRecommendedModels(t *testing.T)

10.2 internal/app/http_batch_runs.go

Pages:

  • /batch-import/runs
  • /batch-import/runs/{run_id}

Unit tests:

func TestBatchImportRunsPage_RendersCanonicalBadges(t *testing.T)
func TestBatchImportRunDetailPage_RendersCapabilitySummary(t *testing.T)

11. Stage 8: CLI

11.1 cmd/cli/batch_import.go

go run ./cmd/cli batch-import \
  --host-id string \
  --entry "url,key" \
  --batch-file string \
  --mode "strict|partial" \
  --access-mode "subscription|self_service" \
  --subscription-users "u1,u2" \
  --subscription-days 30 \
  --probe-api-key string \
  --confirm-wait-timeout 15s
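
Parsing one --entry "url,key" value might look like this sketch. parseEntry is a hypothetical helper, and splitting on the first comma is an assumption that base URLs contain no commas. The error text deliberately omits the input so that an API key never ends up in logs.

```go
package main

import (
	"errors"
	"strings"
)

// BatchImportEntry is trimmed to the two fields the flag carries.
type BatchImportEntry struct {
	BaseURL string
	APIKey  string
}

// parseEntry splits a "url,key" flag value at the first comma.
func parseEntry(value string) (BatchImportEntry, error) {
	baseURL, apiKey, found := strings.Cut(value, ",")
	baseURL, apiKey = strings.TrimSpace(baseURL), strings.TrimSpace(apiKey)
	if !found || baseURL == "" || apiKey == "" {
		// The raw value is not echoed back because it may contain a key.
		return BatchImportEntry{}, errors.New(`entry must have the form "url,key"`)
	}
	return BatchImportEntry{BaseURL: baseURL, APIKey: apiKey}, nil
}
```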

CLI integration tests:

func TestBatchImportCLI_ReportsRunIDAndResultPage(t *testing.T)
func TestBatchImportCLI_ReportsResolvedAndRecommendedModels(t *testing.T)
func TestBatchImportCLI_ReportsConfirmationAndAccessStatus(t *testing.T)

12. Integration Tests

tests/integration/batch_import_test.go

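Scenarios covered:

  1. A standard OpenAI-compatible upstream imports successfully
  2. A mistyped, manually entered model name is auto-corrected through aliases
  3. /responses=403, /chat/completions=200
  4. First /accounts/:id/test=403, later converted to advisory
  5. First /v1/chat/completions=503 no available accounts, 200 after retry
  6. Capability profile is emitted per model
  7. Run/item status is queryable while the import is in progress
  8. After a control-plane restart, the worker resumes picking up unfinished items

func TestBatchImport_FullPipeline(t *testing.T)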
func TestBatchImport_RequestedModelTypo_IsAutoCorrected(t *testing.T)
func TestBatchImport_ThirdPartyResponsesUnsupported_StillSucceeds(t *testing.T)
func TestBatchImport_ProbeRace_BecomesAdvisory(t *testing.T)
func TestBatchImport_Initial503Warmup_RetrySucceeds(t *testing.T)
func TestBatchImport_RunStatusIsQueryableDuringExecution(t *testing.T)
func TestBatchImport_RunResultSurvivesRestartAndResumes(t *testing.T)

13. Required OpenAPI sync

The following must be kept in sync during implementation:

  • POST /api/batch-import/runs
  • GET /api/batch-import/runs
  • GET /api/batch-import/runs/{run_id}
  • GET /api/batch-import/runs/{run_id}/items
  • GET /api/batch-import/runs/{run_id}/items/{item_id}

Also mark /api/import-batches/* as v1/legacy.

14. Acceptance commands

go test ./internal/probe/... -v -count=1
go test ./internal/batch/... -v -count=1
go test ./internal/app/... -v -count=1
go test ./internal/host/sub2api/... -v -count=1
go test ./tests/integration/... -count=1
go test -cover ./internal/... -count=1
go vet ./...
gofmt -l .

15. Task checklist

  • internal/probe/models.go
  • internal/probe/aliases.go
  • internal/probe/capability.go
  • internal/probe/completion.go
  • internal/batch/provider_id.go
  • internal/batch/capability_profile.go
  • internal/batch/channel_evolution.go
  • internal/batch/run_state.go
  • internal/batch/service.go
  • internal/batch/confirmation.go
  • internal/batch/validation.go
  • internal/host/sub2api/channel.go
  • internal/host/sub2api/accounts.go
  • internal/app/http_batch_import.go
  • internal/app/http_batch_runs.go
  • cmd/cli/batch_import.go
  • tests/integration/batch_import_test.go
  • docs/openapi.yaml