docs(v2): reconcile batch import design contracts

phamnazage-jpg
2026-05-22 13:38:56 +08:00
parent afce3da3df
commit 485b6e7a6b
5 changed files with 1519 additions and 1326 deletions


@@ -5,554 +5,461 @@
## 1. Objective
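Let an administrator supply only a batch of `(base_url, api_key)` pairs and have the system automatically complete:
The goal of V2 is not "yet another import command" but to turn this into a **stable, recoverable, traceable** control-plane capability:
1. **Upstream discovery**: call `GET {base_url}/v1/models` plus a minimal smoke request to dynamically obtain the list of models the key actually supports
2. **Name correction**: compare manually mistyped model names against what the upstream actually returns, then normalize and correct them
3. **Capability profile**: record, per upstream and per model, differences in OpenAI/Anthropic compatibility, Responses support, and stream/tool calling
4. **Host evolution**: compare discovery results with the host channel / account configuration and automatically extend `model_mapping`
5. **Async confirmation**: when the account was created but the host's async probe or scheduling has not yet stabilized, confirm later instead of recording a transient failure as final
6. **Relay closed-loop validation**: run a real `/v1/chat/completions` request with the managed key to settle the final `active/degraded/broken` status
7. **Observable state**: persist stage results for every run, item, model, account, and provider, and provide pages for viewing import status
1. **Upstream discovery**: discover models automatically from `(base_url, api_key)` instead of trusting manual input by default
2. **Model correction**: automatic normalization, alias matching, and recommendation of the correct model name
3. **Compatibility profile**: record the compatibility of each upstream and each model so the same pitfalls are not hit twice
4. **Host evolution**: automatically create/update the channel, account, and provider binding
5. **Async confirmation**: absorb the host's async probe and the warm-up window of initial `403/503` responses
6. **Closed-loop validation**: use the real `/v1/chat/completions` result from the host gateway as the final availability verdict
7. **Visible results**: provide a run list, run detail, and item detail rather than relying only on logs and artifacts
The goal is not "absolutely zero manual work" but to shrink manual input to a minimum and hand the error-prone, easily misjudged parts to the system for automatic confirmation.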
## 2. Scope
## 2. Why this is needed now
V2 adds a **URL + key auto-import** path alongside the existing v1 pack-based path.
The current v1 depends on predefined provider manifests (`packs/openai-cn-pack/providers/*.json`); each provider must have `base_url / default_models / smoke_test_model / channel_template` written by hand. This causes three problems:
### In Scope
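- **New keys are not plug-and-play**: every unfamiliar provider URL requires reading its documentation and then writing a manifest
- **Model lists are maintained by hand**: when a provider upgrades its upstream models, the pack does not sync automatically
- **Long debugging loop**: write a manifest on assumptions → import → discover the channel is missing models → patch by hand → re-import
- **Model names are easy to mistype**: for example `minimax-m27-highspeed` versus `MiniMax-M2.7-highspeed`; manual input is highly error-prone
- **Domestic models vary widely in compatibility**: many "OpenAI-compatible" endpoints support only `/chat/completions` and not `/responses`, `tools`, or `stream_options`
- **The host has async windows**: account creation, the Responses probe, scheduler warm-up, and account-selectable state updates do not complete atomically, so a single immediate check easily yields a false negative
- **Long-running jobs lack stability**: a batch import spans several stages; without state persistence, retry boundaries, and result projection, it is hard to tell where a failure got stuck
- **Results are not visible**: review currently relies on the CLI, logs, and artifacts; there is no dedicated page for import status and account/model details
- Batch input of `(base_url, api_key, requested_models?)`
- Support for both `subscription` and `self_service`
- Runtime state persistence
- Background async confirmation with bounded retries
- Minimal result API and result page
- Model correction and capability profile persistence
v2 needs to compress "probe → configure → register → async confirm → validate" into a **one-click closed loop**.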
### Out of Scope
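## 3. Core user story
- Automatic load balancing across multiple keys
- Direct connection to the host database
- Automatic price discovery / automatic price adjustment
- Real-time WebSocket push
- A complex workbench front end
> As an administrator, I have a new batch of relay keys (URL + token) and want to quickly enable these models on a host that is already running. Ideally I list the keys and the system automatically probes which models each key supports, corrects model names, identifies compatibility, configures the host channel, registers a managed provider, asynchronously confirms account and closed-loop status, and tells me directly on a control-plane page which ones are truly usable, which are only temporarily unstable, and which need a specific compatibility strategy.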
## 3. Canonical Contract
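## 4. Technical approach
From here on, V2 recognizes a single canonical contract; all later documents, APIs, pages, and state stores must follow its naming.
### 4.1 Four-stage pipeline + runtime state persistence
### 3.1 ID rules
```
Input: [(base_url, api_key), ...]
- `run_id`: ID of one batch import job, string
- `item_id`: ID of a single import record within a run, string
- `provider_id`: `{normalized_host}-{url_hash_last8}`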
Stage 0: Run Setup ──────────────────────────────────────────────
create import_run
→ persist operator input / retry policy / timestamps
→ assign run_id and item_ids
`provider_id` generation rules:
Stage 1: Probe ─────────────────────────────────────────────────
for each (url, key):
upstream_models = GET {url}/v1/models
→ extract model list
upstream_capabilities = probe endpoint compatibility
→ /models | /chat/completions | /responses | /messages
upstream_completion = POST {url}/v1/chat/completions (smoke)
→ HTTP status, latency, error_type, usable_model
classify: models_ok | models_fail | completion_fail | unreachable
normalize model ids and select smoke model automatically
Stage 2: Provision ──────────────────────────────────────────────
for each (url, key) where upstream_models != models_fail:
host_channel = find_or_create_channel(provider_id, url, capability_profile)
missing_models = normalized_models - host_channel.model_mapping.keys
if missing_models:
patch_channel(host_channel, add model_mapping entries)
managed_account = create_or_update_account(url, key, normalized_models)
register_provider_binding(provider_id, url, key, normalized_models, capability_profile)
Stage 3: Async Confirm ──────────────────────────────────────────
for each registered account:
async account confirm:
re-check account models
re-check account test (after host async probe settles)
re-check temporary 503/no available accounts windows
→ write confirmation_status: pending | confirmed | warning | failed
Stage 4: Validate ───────────────────────────────────────────────
for each confirmed account:
final_completion = POST host_gw/v1/chat/completions
via managed_account key
→ write access_status: active | broken | degraded
persist final run summary and UI-facing status projections
output: per-url status + summary
Output: BatchImportResult {
run_id: string
total: int
active: int
broken: int
degraded: int
details: [{url, normalized_models, capability_profile, confirmation_status, access_status, error}]
}
```
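### 4.1.1 Why async confirmation is required
Real acceptance testing has shown that "account created" does not mean "immediately verifiable":
1. The host persists its `/responses` capability probe for third-party OpenAI-compatible upstreams asynchronously
2. Right after an account is created, the first `/accounts/:id/test` may still take the old path and return a temporary `403 Forbidden`
3. Even after the channel / group / subscription are written, the first `/v1/chat/completions` may briefly hit `503 no available accounts`
4. A few hundred milliseconds to a few seconds later, the same path recovers to `200`
Therefore v2 can no longer decide an item's fate with a single synchronous test run immediately after creation. It must distinguish:
- **Submitted successfully**
- **Async confirmation in progress**
- **Finally confirmed as success/failure**
### 4.1.2 State machine
Each import item should have at least the following state machine: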
```
discovered
→ provisioned
→ confirming
→ confirmed_active
→ confirmed_warning
→ confirmed_broken
```
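Where:
- `provisioned`: host resources have been created, but the item must not be advertised as ready
- `confirming`: waiting for the host async probe / account warm-up / gateway scheduling to stabilize
- `confirmed_warning`: the path works but carries advisory risk, such as a probe 403 race or limited compatibility
- `confirmed_broken`: still unusable after retries and delayed confirmation
Every state transition must be persisted, not kept only in memory. The control plane must at least be able to recover:
- which stage the current run has reached
- which items are finished
- which items are still confirming
- which items have entered the next retry because of a transient error
### 4.2 Key design decisions
#### Q1: How is the model list extracted from `/v1/models` and corrected?
OpenAI-compatible upstreams return the following format: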
```json
{
"data": [{"id": "gpt-4", "object": "model", ...}, ...]
}
```
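Extraction strategy:
- Take `data[].id` as the upstream's raw model name
- Keep `raw_model_id`
- Also generate `normalized_model_id`
- By default, do not filter for names that "look like GPT"; record the raw value in full, then use the provider host / capability profile to decide whether it is a target model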
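The extraction step can be sketched as follows. This is a minimal illustration, assuming the upstream returns the OpenAI-compatible list shape shown above; the function name `ExtractRawModelIDs` and the decision to drop empty and duplicate IDs are assumptions, not part of the spec.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// modelsResponse mirrors the OpenAI-compatible list shape: {"data":[{"id":...}]}.
type modelsResponse struct {
	Data []struct {
		ID string `json:"id"`
	} `json:"data"`
}

// ExtractRawModelIDs (hypothetical name) returns data[].id unchanged,
// in upstream order, skipping empty IDs and duplicates.
func ExtractRawModelIDs(body []byte) ([]string, error) {
	var resp modelsResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, fmt.Errorf("decode /v1/models response: %w", err)
	}
	seen := make(map[string]bool, len(resp.Data))
	ids := make([]string, 0, len(resp.Data))
	for _, m := range resp.Data {
		if m.ID == "" || seen[m.ID] {
			continue
		}
		seen[m.ID] = true
		ids = append(ids, m.ID)
	}
	return ids, nil
}

func main() {
	body := []byte(`{"data":[{"id":"gpt-4","object":"model"},{"id":"deepseek-ai/DeepSeek-V4-Pro","object":"model"}]}`)
	ids, err := ExtractRawModelIDs(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(ids)
}
```

The raw IDs are kept verbatim on purpose: normalization happens in a later step so that `raw_model_id` stays available for routing to the upstream.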
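Normalization rules cover at least:
- Case folding
- Hyphen / dot differences
- Stripping the `vendor/model` prefix
- Common alias mapping
1. Compute from the full normalized `base_url`, not just the host
2. `normalized_host` is for readability
3. `url_hash_last8` distinguishes different paths on the same host
Examples: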
| raw | normalized |
|---|---|
| `MiniMax-M2.7-highspeed` | `minimax-m2.7-highspeed` |
| `minimax-m27-highspeed` | `minimax-m27-highspeed` |
| `deepseek-ai/DeepSeek-V4-Pro` | `deepseek-v4-pro` |
| `Kimi-K2.6` | `kimi-k2.6` |
- `https://api.deepseek.com/v1` → `api-deepseek-9f31c2ab`
- `https://api.deepseek.com/proxy/v1` → `api-deepseek-4a2d88f1`
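A possible shape for the `{normalized_host}-{url_hash_last8}` rule is sketched below. The spec does not name the hash function, so SHA-256 is an assumption here, as are the exact canonicalization steps (lowercase host, trailing slash removed) and the error return; the hash suffixes in the examples above are illustrative and this sketch will not reproduce them.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net/url"
	"strings"
)

// NormalizeProviderID derives "{normalized_host}-{url_hash_last8}".
// Assumptions: SHA-256 as the hash, and ".com"/".cn" as the only stripped suffixes.
func NormalizeProviderID(baseURL string) (string, error) {
	u, err := url.Parse(strings.TrimSpace(baseURL))
	if err != nil {
		return "", fmt.Errorf("parse base_url: %w", err)
	}
	host := strings.ToLower(u.Hostname())
	if host == "" {
		return "", fmt.Errorf("base_url %q has no host", baseURL)
	}

	// Canonical form of the full URL: the hash covers host AND path.
	canonical := u.Scheme + "://" + host
	if p := u.Port(); p != "" {
		canonical += ":" + p
	}
	canonical += strings.TrimRight(u.Path, "/")

	// Readable part: drop one common suffix, then join labels with hyphens.
	readable := host
	for _, suffix := range []string{".com", ".cn"} {
		if strings.HasSuffix(readable, suffix) {
			readable = strings.TrimSuffix(readable, suffix)
			break
		}
	}
	readable = strings.ReplaceAll(readable, ".", "-")

	sum := sha256.Sum256([]byte(canonical))
	hexSum := hex.EncodeToString(sum[:])
	return readable + "-" + hexSum[len(hexSum)-8:], nil
}

func main() {
	for _, raw := range []string{
		"https://api.deepseek.com/v1",
		"https://api.deepseek.com/proxy/v1",
	} {
		id, err := NormalizeProviderID(raw)
		if err != nil {
			panic(err)
		}
		fmt.Println(raw, "->", id)
	}
}
```

Hashing the canonical URL rather than the raw input keeps the ID stable across cosmetic differences such as a trailing slash or host casing, which is what makes repeated imports of the same URL idempotent.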
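The system should no longer trust manually entered model names by default; it should prefer what probing with the key actually returns.
### 3.2 Run-level state
#### Q2: How are upstream models written into the host channel?
`run.state` is fixed to:
The host channel has two relevant fields:
- `model_mapping: map[string]string`: `{upstream_model: gateway_model}`
- `restrict_models: bool`: when true, the gateway routes only models present in the mapping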
- `running`
- `completed`
- `completed_with_warnings`
- `failed`
- `cancelled`
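Strategy:
- The channel keeps all of:
  - `raw_model_id`
  - `normalized_model_id`
  - the final externally visible gateway model name
- Default behavior:
  - `gateway_model = normalized_model_id`
  - `upstream_model = raw_model_id`
- If the host must route by the original name, at least persist the alias relationship in the profile; later imports and reconciliation compare from the normalized view
Notes:
- `model_pricing` is filled with defaults (`price_per_1m=0`, `max_batch=0`) and does not block import
- If the channel does not exist, create a new one with `name = host_registered_{provider_id}`
- `completed_with_warnings` is the run-level overall state
- Pages may render it as a yellow `warning` badge
- The API and state store always write the full enum value `completed_with_warnings`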
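#### Q3: How is the provider ID generated?
### 3.3 Item-level state
Automatic generation rule:
- Take the host part of `base_url` and normalize it (drop `https://`, strip the trailing `/`)
- Strip common suffixes (`.com`, `.cn`)
- Lowercase and join with hyphens
- Example: `https://api.deepseek.com` → `api-deepseek`
`item.current_stage` is fixed to:
Repeated imports of the same URL therefore hit the same provider_id, which enables incremental updates.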
- `probe`
- `provision`
- `confirm`
- `validate`
- `done`
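#### Q4: How is a duplicate key kept from overwriting existing configuration?
`item.confirmation_status` is fixed to:
Run a reconcile before importing:
- If the account for `base_url + key` already exists and `upstream_models` matches the existing account's `credentials.model_mapping` → skip
- If the account exists but the model list has grown → patch the channel to extend model_mapping
- If the account exists but the key is no longer valid → mark it `broken` and create a new account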
- `pending`
- `confirmed`
- `advisory`
- `failed`
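#### Q5: How is an invalid key distinguished from an upstream outage?
`item.access_status` is fixed to:
The Stage 1 smoke test must distinguish error types:
- `401/403 unauthorized` → the key is invalid
- `429 rate_limit` → the key has quota but is rate limited → record it, do not block
- `502/503/connection_error` → the upstream is unreachable → degrade
- `200 + valid response` → the key is usable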
- `unknown`
- `active`
- `degraded`
- `broken`
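Only the Stage 3 host relay smoke test result decides the final `access_status`.
Constraints:
#### Q6: How is compatibility recorded so the same pitfalls are not hit on every import?
- `confirmation_status` describes only whether the host async window has been confirmed stable
- `access_status` describes only the final real availability through the gateway
- `Validation Engine` is the sole writer of `access_status`
v2 must introduce the concept of a `capability_profile`. It records at least:
### 3.4 Legacy compatibility rules
V1's `import_batches` / `import_batch_items` / `managed_resources` are retained, but in V2:
- they serve only as legacy execution evidence or as a source of resource links
- they are no longer the primary data source for the result page
- the V2 result page/API reads only `import_runs` / `import_run_items` / `import_run_item_events`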
## 4. Request / Result Contract
### 4.1 Batch Import Request
```text
BatchImportRunRequest
- host_id: string
- mode: "strict" | "partial"
- access_mode: "subscription" | "self_service"
- confirm_wait_timeout_sec: int # optional wait time for CLI/HTTP
- entries: []BatchImportEntry
- subscription_users: []string # required when access_mode=subscription
- subscription_days: int # required when access_mode=subscription
- probe_api_key: string # required when access_mode=self_service
BatchImportEntry
- base_url: string
- api_key: string
- requested_models: []string # optional, used only as a hint
```
### 4.2 Access Mode required fields
`subscription`
- Required: `subscription_users`
- Required: `subscription_days`
- Setting only `access_mode=subscription` without a subscription target is rejected
`self_service`
- Required: `probe_api_key`
- `probe_api_key` is used for the final gateway access validation
### 4.3 Batch Import Result
```text
BatchImportRunResult
- run_id: string
- state: string
- total_items: int
- active_items: int
- degraded_items: int
- broken_items: int
- warning_items: int
- result_page: string
```
### 4.4 Item Projection
```text
BatchImportRunItemView
- item_id: string
- base_url: string
- provider_id: string
- requested_models: []string
- raw_models: []string
- normalized_models: []string
- resolved_smoke_model: string | null
- recommended_models: []string
- current_stage: string
- confirmation_status: string
- access_status: string
- retry_count: int
- last_retry_at: string | null
- advisory_messages: []string
- last_error_stage: string | null
- last_error: string | null
- channel_id: int64 | null
- account_id: int64 | null
- capability_profile: object
```
## 5. Core Pipeline
### 5.1 Five-stage pipeline
```text
Stage 0: Run Setup
create import_run + import_run_items
persist operator input
Stage 1: Probe
/v1/models
capability probe
completion smoke
normalize aliases
Stage 2: Provision
find/create channel
patch model_mapping + model_pricing + restrict_models + billing_model_source
create/update account
persist managed resource link
Stage 3: Confirm
background confirmer absorbs async probe race / warmup window
writes confirmation_status
Stage 4: Validate
host gateway real /v1/chat/completions
writes final access_status
Stage 5: Project
update run summary
serve result API / pages
```
### 5.2 Ownership boundaries
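- `Probe Layer` handles discovery and classification and does not decide the final `access_status`
- `Provision Adapter` creates/updates host resources
- `Confirmation Engine` absorbs transient `403/503` into `pending/advisory/failed`
- `Validation Engine` owns the final `access_status`
- `Result Projection` converts the state store into page/API views
## 6. Capability Profile
### 6.1 Why two layers
In practice, compatibility cannot be expressed as a single overall profile per key. It must be split into:
1. **transport profile**: whether this upstream supports `/models`, `/chat/completions`, `/responses`, and `/messages`
2. **model profiles**: whether specific models under this upstream work with stream/tools/reasoning fields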
### 6.2 Canonical schema
```json
{
"supports_openai_models": true,
"supports_openai_chat_completions": true,
"supports_openai_responses": false,
"supports_anthropic_messages": false,
"supports_stream": true,
"supports_tools": "unknown",
"supports_reasoning_fields": "unknown",
"auth_style": "bearer",
"model_id_style": "vendor_prefixed | canonical | mixed",
"known_advisories": [
"responses_403_third_party",
"initial_account_probe_race",
"gateway_no_available_accounts_warmup"
"transport_profile": {
"supports_openai_models": true,
"supports_openai_chat_completions": true,
"supports_openai_responses": false,
"supports_anthropic_messages": false,
"auth_style": "bearer",
"model_id_style": "vendor_prefixed",
"known_advisories": [
"responses_unsupported_but_chat_ok",
"initial_probe_race_expected"
]
},
"model_profiles": [
{
"raw_model_id": "deepseek-ai/DeepSeek-V4-Pro",
"normalized_model_id": "deepseek-v4-pro",
"supports_stream": true,
"supports_tools": "unknown",
"supports_reasoning_fields": "unknown",
"smoke_chat_ok": true
}
]
}
```
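The point of this profile is not to "look good" but to enable fast strategy matching later:
### 6.3 Purpose
- which providers need to skip `/responses`
- which providers should prefer raw `/chat/completions`
- which providers need completion retry enabled
- which providers need model names normalized before comparison
- which providers need the Anthropic-compatible entry point
- decides whether to skip `/responses`
- decides whether to go straight to raw `/chat/completions`
- decides the warning text
- decides the recommended smoke model
- supports later fast matching of "which model is reliable under which compatibility layer"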
### 4.3 Data flow
## 7. Channel / Account Evolution Contract
```
BatchImportRequest
├── base_url: string
├── api_key: string
├── access_mode: "subscription" | "self_service" (optional, defaults to subscription)
└── requested_models: []string (optional, a hint rather than a source of truth)
BatchImportResult
├── batch_id: string
├── total: int
├── active: int
├── broken: int
├── degraded: int
└── results: []ImportItemResult
ImportItemResult
├── base_url: string
├── provider_id: string (auto-generated)
├── upstream_models: []string (raw models discovered in Stage 1)
├── normalized_models: []string (normalized models)
├── resolved_smoke_model: string
├── capability_profile: object
├── channel_id: int64 (created/updated in Stage 2)
├── account_id: int64 (created/updated in Stage 2)
├── probe_ok: bool (final result of the Stage 3 account test)
├── confirmation_status: string
├── access_status: string (final, Stage 3)
├── stage_status: string (discovered | provisioned | confirming | confirmed_*)
├── advisory_messages: []string
├── retry_count: int
├── last_error_stage: string | null
└── error: string | null
```
New runtime persistence objects:
V2 no longer expresses channel updates through a "thin patch interface". A host patch must be expressed as the full contract:
```text
ImportRun
- run_id
- mode
- access_mode
- total_items
- completed_items
- active_items
- degraded_items
- broken_items
- state (running | completed | completed_with_warnings | failed | cancelled)
- started_at
- updated_at
- finished_at
ImportRunItem
- run_id
- item_id
- base_url
- provider_id
- current_stage
- stage_status
- requested_models
- normalized_models
- resolved_smoke_model
- channel_id
- account_id
- confirmation_status
- access_status
- retry_count
- advisory_messages
- last_error_stage
- last_error
ChannelPatchContract
- model_mapping: map[string]string
- model_pricing: map[string]PriceSpec
- restrict_models: true
- billing_model_source: "channel_mapped"
```
### 4.4 CLI interface
Constraints:
```bash
# Single entry
go run ./cmd/cli batch-import \
--host-base-url http://localhost:18097 \
--host-api-key <admin-key> \
--entry "https://api.deepseek.com,<deepseek-key>" \
--access-mode subscription
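- `model_mapping` also records raw → canonical
- `model_pricing` may default to zero values, but the field must be fully present
- A patch must not break existing models
- Interfaces like `PatchChannel(addModels []string)` are no longer part of the V2 canonical contract
# Batch (file, one url,key per line)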
go run ./cmd/cli batch-import \
--host-base-url http://localhost:18097 \
--host-api-key <admin-key> \
--batch-file ./keys.csv \
--access-mode subscription
## 8. Async Confirmation Mechanism
# Batch (stdin): feed the whole file on stdin in a single invocation
go run ./cmd/cli batch-import \
--host-base-url http://localhost:18097 \
--host-api-key <admin-key> \
--batch-stdin < keys.txt
```
### 8.1 Why V2 must have a background confirmer
`keys.csv` format:
```csv
https://api.deepseek.com,sk-xxx
https://api.completion.com,sk-yyy
```
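V2's stability goal cannot rest on "sequential sleep + retry inside the request thread". An independent background mechanism must drive:
The CLI output must reference `run_id` and print the result page entry point directly:
- items in `confirming`
- items that are temporarily advisory because of a probe race
- items waiting for warm-up after `503 no available accounts`
### 8.2 Canonical executor
V2 must implement `ConfirmationWorker`: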
```text
run_id: batch-20260522-001
result_page: /batch-import/runs/batch-20260522-001
ConfirmationWorker
- poll import_run_items where current_stage='confirm'
- condition: next_retry_at <= now
- acquire lease
- run confirm logic
- update item state
- release lease
```
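The lease step of the worker loop above could look roughly like this. It is an in-memory sketch only: the `RunItem` struct and both function names are hypothetical, and a real implementation would have to perform the same check as one atomic conditional update against `import_run_items` so that two workers cannot claim the same item.

```go
package main

import (
	"fmt"
	"time"
)

// RunItem holds only the fields the lease check needs (hypothetical subset).
type RunItem struct {
	ItemID       string
	CurrentStage string
	NextRetryAt  time.Time
	LeaseOwner   string
	LeaseUntil   time.Time
}

// TryAcquireLease claims the item for worker when it is in the confirm stage,
// due for retry (next_retry_at <= now), and not held by another live lease.
func TryAcquireLease(it *RunItem, worker string, now time.Time, ttl time.Duration) bool {
	if it.CurrentStage != "confirm" || it.NextRetryAt.After(now) {
		return false
	}
	if it.LeaseOwner != "" && it.LeaseOwner != worker && it.LeaseUntil.After(now) {
		return false
	}
	it.LeaseOwner = worker
	it.LeaseUntil = now.Add(ttl)
	return true
}

// ReleaseLease clears the lease, but only if worker still owns it.
func ReleaseLease(it *RunItem, worker string) {
	if it.LeaseOwner == worker {
		it.LeaseOwner = ""
		it.LeaseUntil = time.Time{}
	}
}

func main() {
	now := time.Now()
	item := &RunItem{ItemID: "item-1", CurrentStage: "confirm", NextRetryAt: now}
	fmt.Println("worker-a acquired:", TryAcquireLease(item, "worker-a", now, 30*time.Second))
	fmt.Println("worker-b acquired:", TryAcquireLease(item, "worker-b", now, 30*time.Second))
	ReleaseLease(item, "worker-a")
	fmt.Println("worker-b acquired after release:", TryAcquireLease(item, "worker-b", now, 30*time.Second))
}
```

Because an expired lease can be reclaimed, an item whose worker crashed mid-confirmation is picked up again after `lease_until` passes, which is the property restart safety relies on.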
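### 4.5 Result viewing API and pages
### 8.3 Required fields
v2 no longer offers only CLI output; it must provide a minimal usable control-plane result view.
`import_run_items` must have at least:
#### HTTP API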
- `confirmation_attempts`
- `retry_count`
- `last_retry_at`
- `next_retry_at`
- `lease_owner`
- `lease_until`
### 8.4 Restart safety
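The first V2 release already requires:
- after a process restart, unfinished confirm items are picked up again by the worker
- pages show which stage an item is stuck in
- the CLI `--confirm-wait-timeout` is only a "wait window", not the confirmation mechanism itself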
## 9. Single Source of Truth
### 9.1 Canonical runtime tables
The V2 runtime recognizes only three kinds of tables:
- `import_runs`
- `import_run_items`
- `import_run_item_events`
### 9.2 Legacy linkage
If a V2 item invoked the existing v1 provision flow, the item may keep:
- `legacy_batch_id`
- `legacy_provider_id`
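These fields serve only as trace links and cannot replace the V2 state source.
### 9.3 Result page data source
The result page/API reads only the V2 canonical tables and does not directly join: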
- `import_batches`
- `probe_results`
- `access_closure_records`
- the host database
## 10. Result API and Pages
### 10.1 API
V2 standard API:
```text
GET /api/batch-import/runs
GET /api/batch-import/runs/{run_id}
GET /api/batch-import/runs/{run_id}/items
GET /api/batch-import/runs/{run_id}/items/{item_id}
POST /api/batch-import/runs
GET /api/batch-import/runs
GET /api/batch-import/runs/{run_id}
GET /api/batch-import/runs/{run_id}/items
GET /api/batch-import/runs/{run_id}/items/{item_id}
```
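Purpose:
The legacy API `/api/import-batches/*` is retained but marked v1/legacy.
- list recent batches
- view overall statistics for a batch
- view stage results for each URL / provider / account
- view model correction, capability profile, advisory, and retry history
#### Pages
Provide at least one simple result page:
### 10.2 Pages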
```text
/batch-import/runs
/batch-import/runs/{run_id}
```
Minimum page requirements:
The result page must directly answer:
- Batch list page:
- run_id
- started_at / finished_at
- total / active / degraded / broken
- overall state
- Batch detail page:
- each item's base_url / provider_id
- requested_models / normalized_models / resolved_smoke_model
- capability_profile summary
- channel_id / account_id
- confirmation_status / access_status
- advisory_messages
- last_error_stage / last_error
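- which URLs imported successfully
- which are stuck in `probe/provision/confirm/validate`
- which had a model correction
- which are advisory rather than broken
- how many retries occurred
- what the reason for the current warning is
The page is not meant to be a complex front end; it should let operations and developers quickly answer: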
## 11. CLI Contract
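- which import is stuck
- at which stage
- whether the cause is a wrong model name, unsupported compatibility, a probe race, or a completion failure
- whether the warning is transient or ultimately needs manual handling
## 5. Host hard constraints (inherited from v1)
- Do not modify host source code
- Do not write to the host database directly
- Work only through the host HTTP Admin API and Gateway API
- The complete channel closure fields must all be present: `model_mapping` + `model_pricing` + `restrict_models=true` + `billing_model_source=channel_mapped`
- `/v1/models` and `/v1/chat/completions` are two independent acceptance layers
- The result page and runtime state read only the control plane's own state store, never the host database
## 6. Access closure
The Stage 3 `access_status` decides real availability: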
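| access_status | Meaning | Usable by users |
|---|---|---|
| `active` | Stage1 probe OK + Stage2 account OK + Stage3 completion OK | ✅ |
| `degraded` | Stage1/2 OK, but Stage3 completion abnormal | ⚠️ rate limited/unstable |
| `broken` | Stage1 probe failed or Stage2 account test failed | ❌ |
Additional constraints:
- `requested_models` is only a hint, not an acceptance criterion
- Only a `resolved_smoke_model` that passed a real upstream probe may serve as the final smoke model
- For a third-party upstream's first `403 Forbidden` account probe, if `/models` already succeeded and the capability profile is identified as `responses_unsupported`, the item should first enter `warning/confirming` rather than immediately `broken`
- For a transient `503 no available accounts` after import, the item should first enter a short retry window rather than fail finally at once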
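## 7. Error recovery strategy
- Stage 1 failure: record `upstream_unreachable` and skip Stage 2/3
- Stage 2 partial failure: resources already created are kept (no automatic rollback)
- Stage 3 first failure: enter `confirming`; decide whether to retry based on the capability profile and transient classification
- Stage 4 final failure: access_status is downgraded, but created resources are not deleted
- Whole-batch interruption: handled according to `--mode strict | partial`
  - `strict`: if any item fails, stop the batch and report what completed
  - `partial` (default): failed items are recorded individually and successful ones continue
The following recovery strategies need to be added:
1. **Model name correction recovery**
   - If the requester explicitly specified a model name but the upstream `/models` did not return it
   - the system should try normalized comparison and alias matching
   - if there is still no match, return a "recommended model name" rather than blindly creating a wrong configuration
2. **Compatibility recovery**
   - If `/responses` fails but `/chat/completions` succeeds
   - the profile should explicitly mark `supports_openai_responses=false`
   - later providers of the same kind skip the responses probe by default
3. **Runtime stability recovery**
   - an item's stage results, retry_count, and last_error_stage must be persisted
   - after a control-plane restart, historical run results must still be viewable
   - if resume is supported in the future, a resumed run must be explicitly distinguished from the original run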
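## 8. Relationship to v1
v2 **does not replace** v1; it adds a parallel entry point:
| | v1 (Pack-Based) | v2 (Auto-Import) |
|---|---|---|
| Input | provider manifest | URL + API key |
| Model source | built into the pack | dynamic upstream probing |
| Use case | known providers, standardized batch import | new providers, plug-and-play |
| Channel configuration | predefined in the manifest | auto-discovered + extended |
v2's provider binding reuses v1's existing `managed_resources` and `import_batches` tables; only the entry point differs.
## 9. Project structure changes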
```
internal/
probe/ # new: upstream probe module
models.go # GET /v1/models parsing
aliases.go # model name normalization / alias matching
completion.go # smoke test POST /v1/chat/completions
capability.go # /responses / /messages / stream / tools capability probing
classifier.go # error classification (auth/rate_limit/upstream/unreachable)
batch/ # new: batch import orchestration
service.go # BatchImportService: pipeline orchestration
provider_id.go # URL → provider_id normalization
channel_evolution.go # model_mapping extension logic
confirmation.go # async confirmation state machine / retry policy
capability_profile.go # provider/model capability profile persistence and decisions
run_state.go # import run / item persistence model
status_projection.go # list page / detail page statistics projection
host/sub2api/
channel.go # new: PatchChannel(channel_id, add_model_mapping)
app/
http_batch_import.go # batch import API
http_batch_runs.go # run list / detail API and pages
cmd/
cli/
batch_import.go # new: batch-import command
tests/integration/
batch_import_test.go # new: batch import integration tests
```bash
go run ./cmd/cli batch-import \
--host-id "<host_id>" \
--entry "https://example.com/v1,sk-xxx" \
--batch-file "./keys.csv" \
--mode "strict|partial" \
--access-mode "subscription|self_service" \
--subscription-users "u1,u2" \
--subscription-days 30 \
--probe-api-key "<user_gateway_key>" \
--confirm-wait-timeout 15s
```
## 10. Test strategy
The CLI output must include at least:
### Unit tests
- `probe/models_test.go`: model list parsing, covering OpenAI format variants
- `probe/aliases_test.go`: model name normalization, prefix stripping, hints for common typos
- `probe/capability_test.go`: OpenAI/Anthropic/Responses compatibility probing
- `probe/classifier_test.go`: error type classification
- `batch/provider_id_test.go`: URL → provider_id normalization
- `batch/channel_evolution_test.go`: model_mapping extension delta computation
- `batch/confirmation_test.go`: async confirmation window, transient 503 retry, advisory downgrade
- `batch/capability_profile_test.go`: compatibility → routing strategy decisions
- `batch/run_state_test.go`: run/item state persistence and state projection
- `batch/service_test.go`: pipeline orchestration mock tests
- `app/http_batch_import_test.go`: result API / page output
- `run_id`
- `result_page`
- each entry's `resolved_smoke_model`
- capability summary
- `confirmation_status`
- `access_status`
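- recommended model name (if a correction occurred)
### Integration tests
- `tests/integration/batch_import_test.go`
- two (url, key) pairs, full probe + provision + validate flow
- strict mode stops the whole batch on any failure
- partial mode isolates failed items
- first account test returns `403 Forbidden`, then turns warning/active after async confirmation
- first completion returns `503 no available accounts`, then turns active after retry
- when `requested_models` is wrong, `normalized_models/recommended_model` is returned
- querying run detail during import shows stage progress and retry_count changes
- after import completes, the page/API shows the run summary and item details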
## 12. Error Policy
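## 11. Not doing for now (out of v2 scope)
### Blocking
- Automatically generated pricing strategy (record defaults and unconfirmed status first)
- Automatic discovery of provider channel pricing (model pricing is left empty for the user to configure)
- Load-balancing strategy across multiple keys
- Reconciliation scheduler (reconcile is provided by v1)
- `401/403 unauthorized` with evidence that the key is invalid
- `/v1/models` entirely unavailable with no alternative path
- provision explicitly failed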
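## 12. Success criteria
### Advisory
1. The CLI `batch-import` accepts both single-entry and file batch input
2. The Stage 1 probe returns the upstream model list within 10s (timeout control)
3. Re-importing the same URL + key does not create a duplicate channel/account (idempotent)
4. When the Stage 3 completion test passes, `access_status=active`
5. When Stage 3 fails, access_status is downgraded correctly (broken/degraded)
6. In `strict` mode, any item failure stops the whole batch and is reported
7. In `partial` mode, successful items are not interrupted by failed items
8. The result page shows each run / item's state, advisory, retry history, and final access status
9. After a control-plane restart, historical run results remain viewable
10. The whole flow neither modifies host source code nor writes to the host database
- third-party upstream `/responses=403` while `/chat/completions=200`
- first `/accounts/:id/test=403`, but the probe race has been identified
- first `/v1/chat/completions=503 no available accounts`, recovering after retry
- `429 rate_limit`
## 13. Open questions (decided)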
### Access status ownership
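1. **provider_id strategy**: option B (host + hash), `{normalized_host}-{url_hash_last8}`
2. **Empty model_pricing**: option B, automatically fill empty pricing with defaults without blocking import
3. **Smoke test model**: option C, iterate over data and pick the first model that completes a chat completion
- `confirmation_status=advisory` does not automatically mean `access_status=degraded`
- Only the Validation Engine may mark an item `active/degraded/broken`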
## 13. Success Criteria
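1. The `access_mode` input contract is complete; `subscription` and `self_service` can each be delivered independently
2. Run / item state, retries, warnings, and error stages are persisted and remain visible after a restart
3. The result page and API read only the V2 canonical tables
4. Model correction results, the capability profile, and recommended model names are traceable
5. `/responses` misjudgments on third-party compatible upstreams and host async windows do not turn a usable path directly into a final failure
6. The page clearly distinguishes `confirmed/advisory/failed` from `active/degraded/broken`
7. OpenAPI, SPEC, TDD, and Architecture stay consistent on every field and every state enum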
## 14. Non-goals for first implementation
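- Automatic scheduling across multiple keys
- Real-time push
- Automatic pricing strategy
- Automatic load balancing
## 15. Final decisions
1. `provider_id` uses `normalized_host + url_hash_last8`
2. `requested_models` is only a hint, not a source of truth
3. `Validation Engine` is the sole writer of `access_status`
4. The V2 runtime canonical tables are `import_runs/import_run_items/import_run_item_events`
5. `ConfirmationWorker` is a required V2 component, not an optional enhancement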


@@ -3,24 +3,106 @@
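Date: 2026-05-21
Technical architecture: `docs/2026-05-22-BATCH_AUTO_IMPORT_V2_ARCHITECTURE.md`
## Goal
## 1. Goal
Let an administrator supply only `(base_url, api_key)` and have the system automatically complete:
This plan serves one purpose: turning the V2 design into a **testable, recoverable, observable** implementation path.
1. Upstream model discovery
2. Model name normalization and correction
3. Compatibility profile generation
4. Host resource creation and channel evolution
5. Async confirmation of account and host stability
6. Final `/v1/chat/completions` closed-loop validation
7. Runtime state persistence and result recovery
8. Result viewing API and pages
Corresponding goals:
This plan stays consistent with [2026-05-21-BATCH_AUTO_IMPORT_SPEC.md](./2026-05-21-BATCH_AUTO_IMPORT_SPEC.md) and focuses on turning the lessons from repeated real acceptance runs into a testable implementation order.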
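1. Automatic model discovery from URL + key
2. Model name normalization and recommended corrections
3. Provider/model compatibility profile modeling
4. Host resource evolution and provider binding
5. Background async confirmation with bounded retries
6. Final gateway completion validation
7. Run/item state persistence, readable from the result page
## Dependency order
## 2. Canonical Contract
Implement strictly in the following order; do not start a step before the previous one is finished:
Lock the canonical contract before implementing; tests, interfaces, and state tables all follow it.
### 2.1 Core IDs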
- `run_id string`
- `item_id string`
- `provider_id string = {normalized_host}-{url_hash_last8}`
### 2.2 Run state
```go
type RunState string
const (
RunStateRunning RunState = "running"
RunStateCompleted RunState = "completed"
RunStateCompletedWithWarnings RunState = "completed_with_warnings"
RunStateFailed RunState = "failed"
RunStateCancelled RunState = "cancelled"
)
```
### 2.3 Item state
```go
type ItemStage string
const (
ItemStageProbe ItemStage = "probe"
ItemStageProvision ItemStage = "provision"
ItemStageConfirm ItemStage = "confirm"
ItemStageValidate ItemStage = "validate"
ItemStageDone ItemStage = "done"
)
type ConfirmationStatus string
const (
ConfirmationPending ConfirmationStatus = "pending"
ConfirmationConfirmed ConfirmationStatus = "confirmed"
ConfirmationAdvisory ConfirmationStatus = "advisory"
ConfirmationFailed ConfirmationStatus = "failed"
)
type AccessStatus string
const (
AccessStatusUnknown AccessStatus = "unknown"
AccessStatusActive AccessStatus = "active"
AccessStatusDegraded AccessStatus = "degraded"
AccessStatusBroken AccessStatus = "broken"
)
```
### 2.4 Access Mode input
```go
type BatchImportRunRequest struct {
HostID string
Mode string
AccessMode string
ConfirmWaitTimeoutSec int
SubscriptionUsers []string
SubscriptionDays int
ProbeAPIKey string
Entries []BatchImportEntry
}
type BatchImportEntry struct {
BaseURL string
APIKey string
RequestedModels []string
}
```
Validation rules:
- `subscription` requires `SubscriptionUsers` + `SubscriptionDays`
- `self_service` requires `ProbeAPIKey`
- `RequestedModels` is only a hint, not a source of truth
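These rules translate into a small validator. The sketch below redeclares the request types so that it stands alone; the function name `ValidateRunRequest`, the error messages, and treating `SubscriptionDays <= 0` as missing are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

type BatchImportEntry struct {
	BaseURL         string
	APIKey          string
	RequestedModels []string // hint only, never validated against here
}

type BatchImportRunRequest struct {
	HostID                string
	Mode                  string
	AccessMode            string
	ConfirmWaitTimeoutSec int
	SubscriptionUsers     []string
	SubscriptionDays      int
	ProbeAPIKey           string
	Entries               []BatchImportEntry
}

// ValidateRunRequest (hypothetical name) enforces the per-access-mode
// required fields before a run is created.
func ValidateRunRequest(req BatchImportRunRequest) error {
	switch req.AccessMode {
	case "subscription":
		if len(req.SubscriptionUsers) == 0 {
			return errors.New("access_mode=subscription requires subscription_users")
		}
		if req.SubscriptionDays <= 0 {
			return errors.New("access_mode=subscription requires subscription_days > 0")
		}
	case "self_service":
		if req.ProbeAPIKey == "" {
			return errors.New("access_mode=self_service requires probe_api_key")
		}
	default:
		return fmt.Errorf("unknown access_mode %q", req.AccessMode)
	}
	return nil
}

func main() {
	req := BatchImportRunRequest{AccessMode: "subscription"}
	fmt.Println(ValidateRunRequest(req))
	req.SubscriptionUsers = []string{"u1"}
	req.SubscriptionDays = 30
	fmt.Println(ValidateRunRequest(req))
}
```

Rejecting the request up front keeps an incomplete `subscription` run from reaching Stage 2, where a missing subscription target would otherwise surface only as a late validation failure.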
## 3. Implementation order
Work must proceed in the following order:
```text
probe/models + probe/aliases
@@ -29,11 +111,15 @@ probe/capability + probe/completion
batch/provider_id + batch/capability_profile
host/channel_patch + batch/run_state
host/channel_patch_contract
batch/run_state + batch/run_events
batch/service
batch/confirmation
batch/confirmation_worker
batch/validation
app/http_batch_import + app/http_batch_runs
@@ -42,50 +128,41 @@ cmd/cli/batch_import
tests/integration/batch_import
```
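Key principles:
Principles:
- First establish what the upstream actually returns, then decide what to write into the host
- First model compatibility explicitly, then decide how `/responses`, `/chat/completions`, and the Anthropic-compatible entry point are routed
- First model the async confirmation window, then discuss the final `active/degraded/broken`
- First model how state is persisted and projected, then build the result page; the page reads only the runtime state store and never joins live host responses
- First lock down the state contract, then write the worker
- First make the state store complete, then build the result page
- First make the Validation Engine the sole writer of `access_status`, then build the projection
## Stage 1: probe module (upstream discovery)
## 4. Stage 1: Probe
### 1.1 `internal/probe/models.go`
### 4.1 `internal/probe/models.go`
**Responsibility**: call `GET {base_url}/v1/models`, parse the OpenAI-compatible response, and return the raw model ID list.
Responsibility: fetch `/v1/models`.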
```go
type ModelsResult struct {
RawModels []string
HTTPStatus int
LatencyMs int64
Error string
RawModels []string
HTTPStatus int
LatencyMs int64
Error string
}
func ProviderModels(ctx context.Context, baseURL, apiKey string) (*ModelsResult, error)
```
Error classification:
- `ErrAuthFailed`: 401/403
- `ErrRateLimited`: 429
- `ErrUpstreamUnreachable`: 502/503/timeout/connection
- `ErrUnexpected`: other HTTP / decode errors
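The classification above maps onto sentinel errors fairly directly. In this sketch the function name `ClassifyModelsError` and its two-argument shape are assumptions; it treats any transport-level error (timeout, connection failure) as unreachable and leaves decode errors for the caller to map to `ErrUnexpected`.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors named after the classification list above.
var (
	ErrAuthFailed          = errors.New("auth failed")
	ErrRateLimited         = errors.New("rate limited")
	ErrUpstreamUnreachable = errors.New("upstream unreachable")
	ErrUnexpected          = errors.New("unexpected upstream response")
)

// ClassifyModelsError (hypothetical name) maps an HTTP status or a
// transport error to one of the sentinel errors; nil means success.
func ClassifyModelsError(status int, transportErr error) error {
	if transportErr != nil {
		// Timeouts and connection failures never produce a status code.
		return ErrUpstreamUnreachable
	}
	switch {
	case status >= 200 && status < 300:
		return nil
	case status == 401 || status == 403:
		return ErrAuthFailed
	case status == 429:
		return ErrRateLimited
	case status == 502 || status == 503:
		return ErrUpstreamUnreachable
	default:
		return ErrUnexpected
	}
}

func main() {
	for _, status := range []int{200, 401, 429, 503, 500} {
		fmt.Println(status, "->", ClassifyModelsError(status, nil))
	}
}
```

Note that this classifies the upstream `/v1/models` call only. A `403` from the host's account test is a different case: per the spec it may be a probe race and should go through async confirmation rather than be treated as an invalid key.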
**Unit tests**:
Unit tests:
```go
func TestProviderModels_OpenAIFormat_ReturnsModelList(t *testing.T)
func TestProviderModels_EmptyData_ReturnsEmptySlice(t *testing.T)
func TestProviderModels_AuthFailed_ReturnsErrAuthFailed(t *testing.T)
func TestProviderModels_Timeout_ReturnsErrUpstreamUnreachable(t *testing.T)
func TestProviderModels_RecordsLatency(t *testing.T)
```
### 1.2 `internal/probe/aliases.go`
### 4.2 `internal/probe/aliases.go`
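**Responsibility**: normalize model names, removing differences in case, vendor prefix, and common formatting, and produce recommended models.
Responsibility: model normalization, aliases, and recommended corrections.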
```go
type AliasResult struct {
@@ -98,72 +175,61 @@ func NormalizeModelID(raw string) string
func CanonicalModelID(raw string) string
func BuildAliasTable(rawModels []string) map[string]AliasResult
func ResolveRequestedModel(requested string, rawModels []string) (resolved string, ok bool)
func RecommendModels(requested []string, rawModels []string) []string
```
Normalization covers at least:
- Case folding
- Stripping the `vendor/model` prefix
- Dot / hyphen differences
- Typical manual typos, such as `m27` vs `m2.7`
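One way to cover these cases is a strict normalizer plus a looser comparison key used only for matching. `NormalizeModelID` and `ResolveRequestedModel` follow the signatures listed above; the `looseKey` helper and the three-pass matching order (exact, normalized, loose) are assumptions made for this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// NormalizeModelID lowercases the ID and strips any "vendor/" prefix.
func NormalizeModelID(raw string) string {
	id := strings.ToLower(strings.TrimSpace(raw))
	if i := strings.LastIndex(id, "/"); i >= 0 {
		id = id[i+1:]
	}
	return id
}

// looseKey (hypothetical helper) additionally drops ".", "-" and "_",
// so that "m27" and "m2.7" compare equal. Used for matching only.
func looseKey(raw string) string {
	return strings.NewReplacer(".", "", "-", "", "_", "").Replace(NormalizeModelID(raw))
}

// ResolveRequestedModel returns the upstream's raw model ID that best matches
// the requested name: exact hit first, then normalized, then loose.
func ResolveRequestedModel(requested string, rawModels []string) (string, bool) {
	for _, m := range rawModels {
		if m == requested {
			return m, true
		}
	}
	for _, m := range rawModels {
		if NormalizeModelID(m) == NormalizeModelID(requested) {
			return m, true
		}
	}
	for _, m := range rawModels {
		if looseKey(m) == looseKey(requested) {
			return m, true
		}
	}
	return "", false
}

func main() {
	raw := []string{"MiniMax-M2.7-highspeed", "deepseek-ai/DeepSeek-V4-Pro", "Kimi-K2.6"}
	for _, req := range []string{"minimax-m27-highspeed", "deepseek-v4-pro", "gpt-4"} {
		resolved, ok := ResolveRequestedModel(req, raw)
		fmt.Printf("%q -> %q (ok=%v)\n", req, resolved, ok)
	}
}
```

Returning the raw upstream ID rather than the normalized one keeps the result usable as the `upstream_model` side of `model_mapping`. Loose matching can in principle collide on distinct models, so a production version would likely report ambiguous matches instead of taking the first.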
**Unit tests**:
Unit tests:
```go
func TestNormalizeModelID_MinimaxCanonical(t *testing.T)
func TestNormalizeModelID_DeepSeekVendorPrefix(t *testing.T)
func TestCanonicalModelID_KimiCaseInsensitive(t *testing.T)
func TestResolveRequestedModel_ExactHit(t *testing.T)
func TestResolveRequestedModel_UsesNormalizedAlias(t *testing.T)
func TestResolveRequestedModel_MissReturnsFalse(t *testing.T)
func TestRecommendModels_ReturnsCanonicalCandidates(t *testing.T)
```
### 1.3 `internal/probe/capability.go`
### 4.3 `internal/probe/capability.go`
**Responsibility**: run a minimal compatibility probe with the same key and produce a capability profile.
The probe covers at least:
- `GET /v1/models`
- `POST /v1/chat/completions`
- `POST /v1/responses`
- `POST /v1/messages` (Anthropic compatible)
Responsibility: produce the transport profile + model profiles.
```go
type CapabilityProfile struct {
type TransportProfile struct {
SupportsOpenAIModels bool
SupportsOpenAIChatCompletions bool
SupportsOpenAIResponses bool
SupportsAnthropicMessages bool
SupportsStream string
SupportsTools string
SupportsReasoningFields string
AuthStyle string
ModelIDStyle string
KnownAdvisories []string
}
type ModelCapabilityProfile struct {
RawModelID string
NormalizedModelID string
SupportsStream string
SupportsTools string
SupportsReasoningFields string
SmokeChatOK bool
}
type CapabilityProfile struct {
Transport TransportProfile
ModelProfile []ModelCapabilityProfile
}
func ProbeCapabilities(ctx context.Context, baseURL, apiKey string, rawModels []string) (*CapabilityProfile, error)
```
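Constraints:
- For third-party OpenAI-compatible upstreams, `/responses=403` must not be mechanically classified as `supported`
- It must be possible to record `responses_unsupported_but_chat_ok`
- It must be possible to record `initial_probe_race_expected`
**Unit tests**:
Unit tests: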
```go
func TestProbeCapabilities_Responses403Chat200_MarksResponsesUnsupported(t *testing.T)
func TestProbeCapabilities_AnthropicMessages200_MarksSupported(t *testing.T)
func TestProbeCapabilities_ModelsOnly_MarksPartialProfile(t *testing.T)
func TestProbeCapabilities_ModelProfilesCapturedPerModel(t *testing.T)
func TestProbeCapabilities_RecordsKnownAdvisories(t *testing.T)
```
### 1.4 `internal/probe/completion.go`
### 4.4 `internal/probe/completion.go`
**Responsibility**: choose a smoke model from the discovered models and run a minimal completion test.
Responsibility: decide the smoke model and run a minimal completion.
```go
type CompletionResult struct {
@@ -174,383 +240,365 @@ type CompletionResult struct {
Error string
}
func ResolveSmokeModel(requested []string, rawModels []string, profile *CapabilityProfile) (string, error)
func ResolveSmokeModel(requested []string, rawModels []string, profile *CapabilityProfile) (string, []string, error)
func SmokeCompletion(ctx context.Context, baseURL, apiKey, model string, profile *CapabilityProfile) (*CompletionResult, error)
```
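Rules:
- Prefer `ResolveRequestedModel`
- If the manually specified model is invalid, fall back automatically to a model the upstream really serves
- If the profile is known not to support `/responses`, go straight to raw `/chat/completions`
**Unit tests**:
Unit tests: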
```go
func TestResolveSmokeModel_UsesRequestedAliasWhenMatched(t *testing.T)
func TestResolveSmokeModel_FallsBackToDiscoveredModel(t *testing.T)
func TestSmokeCompletion_ResponsesUnsupported_UsesChatCompletions(t *testing.T)
func TestSmokeCompletion_AllCandidatesFail_ReturnsErrNoUsableModel(t *testing.T)
```
## Stage 2: batch module (batch import orchestration)
## 5. Stage 2: Provision & Channel Evolution
### 2.1 `internal/batch/provider_id.go`
**Responsibility**: normalize a URL into a stable `provider_id`.
### 5.1 `internal/batch/provider_id.go`
```go
func NormalizeProviderID(baseURL string) string
```
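Strategy:
Rules:
- The host is the main part
- A hash of the full URL prevents collisions
- Different paths on the same host produce different IDs
- Normalize the host
- Hash the full URL
- Different paths on the same host must produce different IDs
**Unit tests**:
Unit tests: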
```go
func TestNormalizeProviderID_Basic(t *testing.T)
func TestNormalizeProviderID_WithPath_IncludesPathHash(t *testing.T)
func TestNormalizeProviderID_Idempotent(t *testing.T)
func TestNormalizeProviderID_DifferentPaths_DifferentIDs(t *testing.T)
```
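One way to satisfy the rules above is sketched below. The ID layout (host slug plus the first 8 hex characters of a SHA-256 over a canonicalized URL) and the canonicalization details (lowercased host, trailing slash trimmed) are assumptions for illustration; the plan only requires a normalized host, a full-URL hash, and distinct IDs for distinct paths.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net/url"
	"strings"
)

// NormalizeProviderID derives a stable ID from a base URL.
// Assumed layout: "<host-slug>-<8 hex chars of sha256(canonical URL)>".
func NormalizeProviderID(baseURL string) string {
	trimmed := strings.TrimSpace(baseURL)
	host := "unknown"
	canonical := trimmed
	if u, err := url.Parse(trimmed); err == nil && u.Host != "" {
		host = strings.ToLower(u.Hostname())
		// Canonical form: scheme + lowercased host (with port) + path without trailing slash.
		canonical = strings.ToLower(u.Scheme) + "://" + strings.ToLower(u.Host) + strings.TrimRight(u.Path, "/")
	}
	sum := sha256.Sum256([]byte(canonical))
	slug := strings.NewReplacer(".", "-", ":", "-").Replace(host)
	return slug + "-" + hex.EncodeToString(sum[:4])
}

func main() {
	fmt.Println(NormalizeProviderID("https://api.example.com/v1"))
	fmt.Println(NormalizeProviderID("https://api.example.com/compat/v1"))
}
```

Hashing the canonical URL rather than the raw input keeps the ID stable when an operator pastes the same endpoint with different casing or a trailing slash.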
### 2.2 `internal/batch/capability_profile.go`
### 5.2 `internal/batch/capability_profile.go`
**Responsibility**: map the Stage 1 capability profile to an import strategy.
Responsibility: turn the capability profile into an import/confirmation strategy.
```go
type ImportRoutingStrategy struct {
UseRawChatCompletions bool
SkipResponsesChecks bool
RetryInitial503 bool
TreatProbe403AsAdvisory bool
TreatProbe403Advisory bool
}
func BuildImportRoutingStrategy(profile *probe.CapabilityProfile) ImportRoutingStrategy
```
**Unit tests**
Unit tests:
```go
func TestBuildImportRoutingStrategy_ResponsesUnsupported_UsesRawChat(t *testing.T)
func TestBuildImportRoutingStrategy_ProbeRaceAdvisory_EnablesProbe403Advisory(t *testing.T)
func TestBuildImportRoutingStrategy_WarmupExpected_Enables503Retry(t *testing.T)
```
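The three tests above suggest a direct mapping from profile signals to strategy flags. The sketch below assumes a simplified `CapabilityProfile` with a `SupportsResponses` flag and a `KnownAdvisories` list; the advisory name `initial_503_warmup_expected` is a hypothetical placeholder, while the other two advisory names appear in the plan.

```go
package main

import "fmt"

// CapabilityProfile is a reduced, hypothetical stand-in for probe.CapabilityProfile.
type CapabilityProfile struct {
	SupportsResponses bool
	KnownAdvisories   []string
}

// ImportRoutingStrategy uses the field names from the newer side of the diff.
type ImportRoutingStrategy struct {
	UseRawChatCompletions bool
	SkipResponsesChecks   bool
	RetryInitial503       bool
	TreatProbe403Advisory bool
}

// hasAdvisory reports whether the profile carries the named advisory.
func hasAdvisory(p CapabilityProfile, name string) bool {
	for _, a := range p.KnownAdvisories {
		if a == name {
			return true
		}
	}
	return false
}

// BuildImportRoutingStrategy turns profile signals into routing flags.
func BuildImportRoutingStrategy(p CapabilityProfile) ImportRoutingStrategy {
	var s ImportRoutingStrategy
	if !p.SupportsResponses || hasAdvisory(p, "responses_unsupported_but_chat_ok") {
		s.UseRawChatCompletions = true
		s.SkipResponsesChecks = true
	}
	if hasAdvisory(p, "initial_probe_race_expected") {
		s.TreatProbe403Advisory = true
	}
	// Hypothetical advisory name; the plan does not name the warm-up signal.
	if hasAdvisory(p, "initial_503_warmup_expected") {
		s.RetryInitial503 = true
	}
	return s
}

func main() {
	p := CapabilityProfile{KnownAdvisories: []string{"responses_unsupported_but_chat_ok", "initial_probe_race_expected"}}
	fmt.Printf("%+v\n", BuildImportRoutingStrategy(p))
}
```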
### 2.3 `internal/batch/channel_evolution.go`
### 5.3 `internal/batch/channel_evolution.go`
**Responsibility**: compare the channel's existing models with the newly probed models and compute a patch
Responsibility: build the complete channel patch contract
```go
func ModelMappingDelta(existing []string, discovered []string) (add []string)
func BuildPatchModelMapping(existing map[string]string, aliases map[string]probe.AliasResult) map[string]string
func BuildPatchModelPricing(models []string) map[string]any
```
Requirements:
- The mapping between the upstream raw model and the gateway canonical model must be traceable on both sides
- The patch must not break existing models
**Unit tests**
```go
func TestModelMappingDelta_NoOverlap_AddsAll(t *testing.T)
func TestModelMappingDelta_PartialOverlap_AddsMissingOnly(t *testing.T)
func TestBuildPatchModelMapping_PreservesExistingEntries(t *testing.T)
func TestBuildPatchModelMapping_AddsCanonicalAliases(t *testing.T)
```
### 2.4 `internal/batch/service.go`
**Responsibility**: orchestrate the four stages Probe → Provision → Async Confirm → Validate.
```go
type BatchImportRequest struct {
BaseURL string
APIKey string
RequestedModels []string
AccessMode string
type ChannelPatchContract struct {
ModelMapping map[string]string
ModelPricing map[string]any
RestrictModels bool
BillingModelSource string
}
func ModelMappingDelta(existing map[string]string, discoveredAliases map[string]probe.AliasResult) ChannelPatchContract
```
Unit tests:
```go
func TestModelMappingDelta_PreservesExistingEntries(t *testing.T)
func TestModelMappingDelta_AddsRawToCanonicalMappings(t *testing.T)
func TestModelMappingDelta_SetsRestrictModelsAndBillingSource(t *testing.T)
```
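The "preserve existing entries" requirement can be illustrated with a merge that never overwrites a key. This is only a sketch of the mapping part of the patch: `AliasResult` here is a hypothetical two-field struct, `mergeModelMapping` is a hypothetical helper, and the direction of the map (gateway canonical name as key, upstream raw ID as value) is an assumption, since the plan does not spell it out.

```go
package main

import (
	"fmt"
	"sort"
)

// AliasResult is a hypothetical reduced form of probe.AliasResult.
type AliasResult struct {
	RawModelID       string // model ID as returned by the upstream
	CanonicalModelID string // model name exposed by the gateway
}

// mergeModelMapping returns a new mapping containing every existing entry
// unchanged, plus canonical -> raw entries for newly discovered models.
// The input map is not mutated.
func mergeModelMapping(existing map[string]string, discovered map[string]AliasResult) map[string]string {
	merged := make(map[string]string, len(existing)+len(discovered))
	for k, v := range existing {
		merged[k] = v
	}
	// Iterate in sorted key order so the result is deterministic when two
	// discovered entries share a canonical name.
	keys := make([]string, 0, len(discovered))
	for k := range discovered {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		a := discovered[k]
		if _, taken := merged[a.CanonicalModelID]; !taken {
			merged[a.CanonicalModelID] = a.RawModelID
		}
	}
	return merged
}

func main() {
	existing := map[string]string{"gpt-4o": "gpt-4o"}
	discovered := map[string]AliasResult{
		"MiniMax-M2.7-highspeed": {RawModelID: "MiniMax-M2.7-highspeed", CanonicalModelID: "minimax-m2.7-highspeed"},
	}
	fmt.Println(mergeModelMapping(existing, discovered))
}
```

The other `ChannelPatchContract` fields (`ModelPricing`, `RestrictModels`, `BillingModelSource`) are left out because the plan does not state which values they should take.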
## 6. Stage 3: State Store
### 6.1 `internal/batch/run_state.go`
V2 canonical runtime store:
- `import_runs`
- `import_run_items`
- `import_run_item_events`
```go
type ImportRunState struct {
RunID string
Mode string
AccessMode string
State RunState
TotalItems int
CompletedItems int
ActiveItems int
DegradedItems int
BrokenItems int
WarningItems int
StartedAt time.Time
UpdatedAt time.Time
FinishedAt *time.Time
}
type ImportRunItemState struct {
RunID string
ItemID string
BaseURL string
ProviderID string
CurrentStage ItemStage
ConfirmationStatus ConfirmationStatus
AccessStatus AccessStatus
RequestedModels []string
RawModels []string
NormalizedModels []string
ResolvedSmokeModel *string
RecommendedModels []string
CapabilityProfileJSON string
ChannelID *int64
AccountID *int64
RetryCount int
ConfirmationAttempts int
LastRetryAt *time.Time
NextRetryAt *time.Time
LeaseOwner *string
LeaseUntil *time.Time
AdvisoryMessages []string
LastErrorStage *string
LastError *string
LegacyBatchID *int64
LegacyProviderID *string
CreatedAt time.Time
UpdatedAt time.Time
}
type ImportRunItemEvent struct {
EventID string
RunID string
ItemID string
EventType string
Stage string
Attempt int
Message string
PayloadJSON string
CreatedAt time.Time
}
```
Unit tests:
```go
func TestRunStateStore_CreateAndUpdateRun(t *testing.T)
func TestRunStateStore_UpsertItemStoresProjectionFields(t *testing.T)
func TestRunStateStore_EventTrailCanBeQueried(t *testing.T)
func TestRunStateStore_LeaseFieldsPersist(t *testing.T)
```
## 7. Stage 4: Batch Service
### 7.1 `internal/batch/service.go`
```go
type BatchImportService struct {
Host hostadapter.HostAdapter
Probe *probe.Client
Provision *provision.ImportService
Confirm *ConfirmationService
StateStore RunStateStore
Queue ConfirmationQueue
}
func (s *BatchImportService) ImportBatch(ctx context.Context, req BatchImportRequest) (*BatchImportResult, error)
func (s *BatchImportService) StartRun(ctx context.Context, req BatchImportRunRequest) (*BatchImportRunResult, error)
```
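Responsibility details:
Responsibilities:
- No longer trust `requested_models` as the final truth
- Must write `raw_models` / `normalized_models` / `resolved_smoke_model` into the result
- A `403 Forbidden` on the first account test may be treated as an advisory
- A brief `503 no available accounts` on the first gateway completion may be retried for a short time
- Create the run and its items
- Persist probe/provision results first
- Enqueue confirm; the request thread does not carry the full confirmation responsibility
- CLI/HTTP are only responsible for "starting" the run and for an "optional wait window"
**Unit tests**
Unit tests: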
```go
func TestBatchImport_AllProbeOk_ProvisionsAndValidates(t *testing.T)
func TestBatchImport_StartRun_PersistsInitialState(t *testing.T)
func TestBatchImport_RequestedModelMiss_UsesDiscoveredModel(t *testing.T)
func TestBatchImport_Probe403Race_DowngradesToWarning(t *testing.T)
func TestBatchImport_Initial503Warmup_RetriesBeforeBroken(t *testing.T)
func TestBatchImport_PartialMode_ContinuesOnFailure(t *testing.T)
func TestBatchImport_Idempotent_SkipsExistingAccount(t *testing.T)
func TestBatchImport_PersistsRunAndItemState(t *testing.T)
func TestBatchImport_ProvisionWritesLegacyLinks(t *testing.T)
```
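The "retry a brief initial 503" behavior exercised by `TestBatchImport_Initial503Warmup_RetriesBeforeBroken` could look roughly like the bounded retry loop below. All names here (`absorbTransient503`, `fakeGateway`, `runWarmupDemo`) are hypothetical, and the attempt budget and delay are arbitrary demo values rather than figures from the plan.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// absorbTransient503 calls do until it returns a status other than 503, the
// attempt budget is spent, or ctx is cancelled. It returns the last status
// seen and the number of attempts made.
func absorbTransient503(ctx context.Context, maxAttempts int, delay time.Duration, do func() int) (int, int) {
	status := 0
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		status = do()
		if status != 503 {
			return status, attempt
		}
		if attempt == maxAttempts {
			break
		}
		select {
		case <-ctx.Done():
			return status, attempt
		case <-time.After(delay):
		}
	}
	return status, maxAttempts
}

// fakeGateway returns a func that yields the given statuses in order and then
// keeps repeating the last one. It stands in for the real gateway call.
func fakeGateway(statuses ...int) func() int {
	i := 0
	return func() int {
		s := statuses[i]
		if i < len(statuses)-1 {
			i++
		}
		return s
	}
}

// runWarmupDemo runs the retry loop with a budget of 3 attempts and a short delay.
func runWarmupDemo(statuses ...int) (int, int) {
	return absorbTransient503(context.Background(), 3, 5*time.Millisecond, fakeGateway(statuses...))
}

func main() {
	status, tries := runWarmupDemo(503, 200)
	fmt.Println(status, tries)
}
```

Returning the attempt count alongside the status makes it straightforward to persist `retry_count` on the item regardless of the outcome.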
### 2.5 `internal/batch/run_state.go`
## 8. Stage 5: Confirmation Worker
**Responsibility**: persist the per-stage results of import runs / items and produce the statistics projection that the pages read.
### 8.1 `internal/batch/confirmation.go`
```go
type ImportRunState struct {
RunID string
State string
TotalItems int
ActiveItems int
DegradedItems int
BrokenItems int
StartedAt time.Time
UpdatedAt time.Time
FinishedAt *time.Time
type ConfirmationWorker struct {
Host hostadapter.HostAdapter
StateStore RunStateStore
Validate ValidationService
Clock Clock
WorkerID string
}
type ImportRunItemState struct {
RunID string
ItemID string
BaseURL string
ProviderID string
CurrentStage string
StageStatus string
RetryCount int
AdvisoryMessages []string
LastErrorStage string
LastError string
}
type RunStateStore interface {
CreateRun(ctx context.Context, run ImportRunState) error
UpdateRun(ctx context.Context, run ImportRunState) error
UpsertItem(ctx context.Context, item ImportRunItemState) error
ListRuns(ctx context.Context, limit int) ([]ImportRunState, error)
ListRunItems(ctx context.Context, runID string) ([]ImportRunItemState, error)
}
```
**Unit tests**
```go
func TestRunStateStore_CreateAndUpdateRun(t *testing.T)
func TestRunStateStore_UpsertItemPreservesLatestStage(t *testing.T)
func TestRunStateStore_ListRunsReturnsSummary(t *testing.T)
func TestRunStateStore_ListRunItemsReturnsRetryAndAdvisory(t *testing.T)
```
## Stage 3: host adapter extensions
### 3.1 `internal/host/sub2api/channel.go`
New:
```go
func (h *HostAdapter) PatchChannel(ctx context.Context, channelID int64, addModels []string) error
func (h *HostAdapter) PatchChannelPricing(ctx context.Context, channelID int64, pricing map[string]any) error
```
**Unit tests**
```go
func TestPatchChannel_AddsModelMappingEntries(t *testing.T)
func TestPatchChannel_PreservesExistingEntries(t *testing.T)
func TestPatchChannelPricing_AddsNewModels(t *testing.T)
```
### 3.2 `internal/host/sub2api/accounts.go`
If the current host adapter's account test / models read logic cannot expose advisory information, a minimal enhancement is needed:
```go
type AccountProbeSnapshot struct {
Models []string
ProbeStatus string
ProbeAdvisory bool
ProbeMessage string
}
func (h *HostAdapter) GetAccountProbeSnapshot(ctx context.Context, accountID int64) (*AccountProbeSnapshot, error)
```
**Unit tests**
```go
func TestGetAccountProbeSnapshot_403RaceCapturedAsAdvisory(t *testing.T)
func TestGetAccountProbeSnapshot_ReturnsModelsAndMessage(t *testing.T)
```
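## Stage 4: async confirm module
### 4.1 `internal/batch/confirmation.go`
**Responsibility**: explicitly model the window in which the account has just been created but the host's asynchronous behavior has not yet stabilized.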
```go
type ConfirmationStatus string
const (
ConfirmationPending ConfirmationStatus = "pending"
ConfirmationActive ConfirmationStatus = "confirmed_active"
ConfirmationWarning ConfirmationStatus = "confirmed_warning"
ConfirmationBroken ConfirmationStatus = "confirmed_broken"
)
type ConfirmationService struct {
Host hostadapter.HostAdapter
}
func (s *ConfirmationService) ConfirmAccount(ctx context.Context, req ConfirmationRequest) (*ConfirmationResult, error)
func (w *ConfirmationWorker) Tick(ctx context.Context, now time.Time) error
func (w *ConfirmationWorker) ConfirmItem(ctx context.Context, item ImportRunItemState) (*ImportRunItemState, error)
```
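Behavior:
- Check `/accounts/:id/models` first
- If the models are already correct but `/test` returns a first-time `403 Forbidden` and the profile indicates third-party responses are unsupported, classify it as an advisory
- If completion first returns `503 no available accounts`, retry briefly within the budget
- Finally classify the result as one of:
  - `confirmed_active`
  - `confirmed_warning`
  - `confirmed_broken`
- Poll items with `current_stage=confirm` and `next_retry_at<=now`
- Acquire the lease
- Run account models / account test / transient 503 absorb
- `confirmation_status = confirmed | advisory | failed`
- Once confirm completes, advance to `validate`
**Unit tests**
Constraints:
- On a first-time `403` probe race, if `/models` is already correct and the profile indicates `responses` is unsupported, mark the item `advisory`
- `confirmation_status` is not the final availability
Unit tests: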
```go
func TestConfirmAccount_ModelsOkProbe403Race_ReturnsWarning(t *testing.T)
func TestConfirmAccount_Initial503Then200_ReturnsActive(t *testing.T)
func TestConfirmAccount_AllRetriesExhausted_ReturnsBroken(t *testing.T)
func TestConfirmAccount_RecordsRetryAttempts(t *testing.T)
func TestConfirmationWorker_Probe403Race_ReturnsAdvisory(t *testing.T)
func TestConfirmationWorker_UsesLeaseAndNextRetryAt(t *testing.T)
func TestConfirmationWorker_RestartCanResumeUnlockedItem(t *testing.T)
```
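The lease and `next_retry_at` checks used by the worker can be sketched in memory as below. This is an illustration under assumptions: `confirmItem` keeps only the relevant fields of `ImportRunItemState`, and `claimable`, `tryLease`, and `leaseScenario` are hypothetical helpers. A real store would likely perform the claim as one atomic conditional update rather than a read followed by a write.

```go
package main

import (
	"fmt"
	"time"
)

// confirmItem holds the subset of item fields that matter for claiming work.
type confirmItem struct {
	ItemID       string
	CurrentStage string
	NextRetryAt  *time.Time
	LeaseOwner   *string
	LeaseUntil   *time.Time
}

// claimable reports whether a worker may pick up the item at time now:
// it must be in the confirm stage, due for retry, and not held by a live lease.
func claimable(it confirmItem, now time.Time) bool {
	if it.CurrentStage != "confirm" {
		return false
	}
	if it.NextRetryAt != nil && it.NextRetryAt.After(now) {
		return false
	}
	if it.LeaseUntil != nil && it.LeaseUntil.After(now) {
		return false
	}
	return true
}

// tryLease claims the item for workerID until now+ttl if it is claimable.
func tryLease(it *confirmItem, workerID string, now time.Time, ttl time.Duration) bool {
	if !claimable(*it, now) {
		return false
	}
	until := now.Add(ttl)
	it.LeaseOwner = &workerID
	it.LeaseUntil = &until
	return true
}

// leaseScenario shows worker-a claiming an item, worker-b being refused while
// the lease is live, and worker-b succeeding once the lease has expired.
func leaseScenario() (first, second, afterExpiry bool) {
	now := time.Date(2026, 5, 22, 12, 0, 0, 0, time.UTC)
	it := &confirmItem{ItemID: "item-1", CurrentStage: "confirm"}
	first = tryLease(it, "worker-a", now, 30*time.Second)
	second = tryLease(it, "worker-b", now.Add(10*time.Second), 30*time.Second)
	afterExpiry = tryLease(it, "worker-b", now.Add(31*time.Second), 30*time.Second)
	return first, second, afterExpiry
}

func main() {
	fmt.Println(leaseScenario())
}
```

An expiring lease is what allows a restarted control plane to resume an item whose previous worker disappeared, which is the case `TestConfirmationWorker_RestartCanResumeUnlockedItem` targets.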
## Stage 5: Results API and pages
## 9. Stage 6: Validation Engine
### 5.1 `internal/app/http_batch_import.go`
**Responsibility**: expose the run result query API.
### 9.1 `internal/batch/validation.go`
```go
type ValidationService struct {
Host hostadapter.HostAdapter
}
func (s *ValidationService) ValidateAccess(ctx context.Context, item ImportRunItemState, req BatchImportRunRequest) (AccessStatus, []string, error)
```
Rules:
- Only this component may write the final `access_status`
- `confirmed + gateway chat 200` → `active`
- `advisory + gateway chat 200` → `active`
- `gateway chat transient but exhausted` → `degraded`
- `gateway chat definitively failed` → `broken`
Unit tests:
```go
func TestValidationService_GatewayChat200_ReturnsActive(t *testing.T)
func TestValidationService_Transient503Exhausted_ReturnsDegraded(t *testing.T)
func TestValidationService_FinalFailure_ReturnsBroken(t *testing.T)
```
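The decision rules above reduce to a small pure function. The sketch below uses hypothetical names (`GatewayOutcome`, `DecideAccessStatus`); the one case the rules do not cover, a `failed` confirmation followed by a successful gateway chat, is mapped to `unknown` purely as a placeholder assumption.

```go
package main

import "fmt"

// GatewayOutcome summarizes the result of the gateway /v1/chat/completions check.
type GatewayOutcome int

const (
	GatewayOK                 GatewayOutcome = iota // chat returned 200
	GatewayTransientExhausted                       // only transient errors, retry budget spent
	GatewayFailed                                   // definitive failure
)

// DecideAccessStatus maps a confirmation status and gateway outcome to an
// access status, following the listed rules.
func DecideAccessStatus(confirmation string, g GatewayOutcome) string {
	switch g {
	case GatewayOK:
		if confirmation == "confirmed" || confirmation == "advisory" {
			return "active"
		}
		// Not covered by the rules; placeholder assumption.
		return "unknown"
	case GatewayTransientExhausted:
		return "degraded"
	default:
		return "broken"
	}
}

func main() {
	fmt.Println(DecideAccessStatus("advisory", GatewayOK))
	fmt.Println(DecideAccessStatus("confirmed", GatewayTransientExhausted))
}
```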
## 10. Stage 7: HTTP API & Result Pages
### 10.1 `internal/app/http_batch_import.go`
```go
func (a *App) createBatchImportRun(w http.ResponseWriter, r *http.Request)
func (a *App) listBatchImportRuns(w http.ResponseWriter, r *http.Request)
func (a *App) getBatchImportRun(w http.ResponseWriter, r *http.Request)
func (a *App) listBatchImportRunItems(w http.ResponseWriter, r *http.Request)
func (a *App) getBatchImportRunItem(w http.ResponseWriter, r *http.Request)
```
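The response must contain at least:
Requirements:
- run summary
- item current stage
- normalized/resolved model information
- confirmation / access status
- advisory / retry / last_error
- Return the projection directly; the page must not assemble state on its own
- List-page filtering uses `run.state`
- Item detail must return the event trail
**Unit tests**
Unit tests: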
```go
func TestListBatchImportRuns_ReturnsSummary(t *testing.T)
func TestGetBatchImportRun_ReturnsRunDetail(t *testing.T)
func TestListBatchImportRunItems_ReturnsProjectedItems(t *testing.T)
func TestGetBatchImportRunItem_ReturnsAdvisoryAndRetryTrail(t *testing.T)
func TestCreateBatchImportRun_ValidatesAccessModeInputs(t *testing.T)
func TestListBatchImportRuns_ReturnsCanonicalState(t *testing.T)
func TestGetBatchImportRunItem_ReturnsEventTrailAndRecommendedModels(t *testing.T)
```
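`TestCreateBatchImportRun_ValidatesAccessModeInputs` implies a per-mode input check that matches the OpenAPI `oneOf` for the create request: `subscription` requires `subscription_users` and `subscription_days`, and `self_service` requires `probe_api_key`. A minimal sketch follows, with the hypothetical names `CreateRunRequest` and `validateAccessMode`, and with the assumption that an empty list, a non-positive day count, or an empty key counts as missing.

```go
package main

import (
	"errors"
	"fmt"
)

// CreateRunRequest is a reduced, hypothetical form of the create-run payload.
type CreateRunRequest struct {
	AccessMode        string
	SubscriptionUsers []string
	SubscriptionDays  int
	ProbeAPIKey       string
}

// validateAccessMode checks that the fields required by the chosen access mode are present.
func validateAccessMode(req CreateRunRequest) error {
	switch req.AccessMode {
	case "subscription":
		if len(req.SubscriptionUsers) == 0 {
			return errors.New("subscription_users is required for access_mode=subscription")
		}
		if req.SubscriptionDays <= 0 {
			return errors.New("subscription_days is required for access_mode=subscription")
		}
		return nil
	case "self_service":
		if req.ProbeAPIKey == "" {
			return errors.New("probe_api_key is required for access_mode=self_service")
		}
		return nil
	default:
		return errors.New("access_mode must be subscription or self_service")
	}
}

func main() {
	fmt.Println(validateAccessMode(CreateRunRequest{AccessMode: "self_service"}))
}
```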
### 5.2 `internal/app/http_batch_runs.go`
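**Responsibility**: provide minimal page output; a complex frontend is not required, but a person must be able to understand the result directly.
### 10.2 `internal/app/http_batch_runs.go`
Pages: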
- `/batch-import/runs`
- `/batch-import/runs/{run_id}`
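**Page requirements**
- The list page shows the overall run state
- The detail page shows item-level state, model correction results, a capability profile summary, and warning / broken reasons
**Unit tests**
Unit tests: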
```go
func TestBatchImportRunsPage_RendersRunSummary(t *testing.T)
func TestBatchImportRunDetailPage_RendersItemStates(t *testing.T)
func TestBatchImportRunsPage_RendersCanonicalBadges(t *testing.T)
func TestBatchImportRunDetailPage_RendersCapabilitySummary(t *testing.T)
```
## Stage 6: CLI
## 11. Stage 8: CLI
### 6.1 `cmd/cli/batch_import.go`
### 11.1 `cmd/cli/batch_import.go`
```bash
go run ./cmd/cli batch-import \
--host-base-url string \
--host-api-key string \
--host-id string \
--entry "url,key" \
--batch-file string \
--mode "strict|partial" \
--access-mode "subscription|self_service" \
--requested-model string \
--confirm-timeout duration
--subscription-users "u1,u2" \
--subscription-days 30 \
--probe-api-key string \
--confirm-wait-timeout 15s
```
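Additional requirements:
- Output must include:
  - `run_id`
  - `result_page`
  - `raw_models`
  - `normalized_models`
  - `resolved_smoke_model`
  - `capability_profile`
  - `confirmation_status`
- If the manually entered model name does not match, the CLI must explicitly print the "recommended model name"
**CLI integration tests**
CLI integration tests: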
```go
func TestBatchImportCLI_ReportsResolvedModel(t *testing.T)
func TestBatchImportCLI_ReportsCapabilityProfile(t *testing.T)
func TestBatchImportCLI_ReportsConfirmationStatus(t *testing.T)
func TestBatchImportCLI_ReportsRunResultPage(t *testing.T)
func TestBatchImportCLI_ReportsRunIDAndResultPage(t *testing.T)
func TestBatchImportCLI_ReportsResolvedAndRecommendedModels(t *testing.T)
func TestBatchImportCLI_ReportsConfirmationAndAccessStatus(t *testing.T)
```
## Stage 7: Integration tests
## 12. Integration Tests
### `tests/integration/batch_import_test.go`
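Uses real SQLite plus a fake/httptest upstream, covering:
Covered scenarios:
1. A standard OpenAI-compatible upstream imports successfully
2. The manually entered model name is wrong, but alias resolution succeeds
3. The third-party compatibility case with `/responses=403` and `/chat/completions=200`
4. First `/accounts/:id/test=403`, later corrected to advisory
5. First `/v1/chat/completions=503 no available accounts`, then `200` after retry
6. The capability profile drives routing
7. Run / item state can be queried while the import is in progress
8. After a control-plane restart, historical run results remain viewable
2. The manually entered model name is wrong; the alias corrects it automatically
3. `/responses=403`, `/chat/completions=200`
4. First `/accounts/:id/test=403`, advisory later
5. First `/v1/chat/completions=503 no available accounts`, then 200 after retry
6. The capability profile is output at per-model granularity
7. Run/item state can be queried while the import is in progress
8. After a control-plane restart, the worker can pick up unfinished items again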
```go
func TestBatchImport_FullPipeline(t *testing.T)
func TestBatchImport_RequestedModelTypo_IsAutoCorrected(t *testing.T)
func TestBatchImport_ThirdPartyResponsesUnsupported_StillSucceeds(t *testing.T)
func TestBatchImport_ProbeRace_BecomesWarningNotBroken(t *testing.T)
func TestBatchImport_ProbeRace_BecomesAdvisory(t *testing.T)
func TestBatchImport_Initial503Warmup_RetrySucceeds(t *testing.T)
func TestBatchImport_RunStatusIsQueryableDuringExecution(t *testing.T)
func TestBatchImport_RunResultSurvivesRestart(t *testing.T)
func TestBatchImport_RunResultSurvivesRestartAndResumes(t *testing.T)
```
## Acceptance commands
## 13. Required OpenAPI sync
The following must be kept in sync during implementation:
- `POST /api/batch-import/runs`
- `GET /api/batch-import/runs`
- `GET /api/batch-import/runs/{run_id}`
- `GET /api/batch-import/runs/{run_id}/items`
- `GET /api/batch-import/runs/{run_id}/items/{item_id}`
Also mark `/api/import-batches/*` as v1/legacy.
## 14. Acceptance commands
```bash
go test ./internal/probe/... -v -count=1
@@ -563,13 +611,7 @@ go vet ./...
gofmt -l .
```
Coverage targets:
- `internal/probe`: >= 80%
- `internal/batch`: >= 75%
- `internal/provision`: >= 75%
## Task checklist
## 15. Task checklist
- [ ] `internal/probe/models.go`
- [ ] `internal/probe/aliases.go`
@@ -578,13 +620,14 @@ gofmt -l .
- [ ] `internal/batch/provider_id.go`
- [ ] `internal/batch/capability_profile.go`
- [ ] `internal/batch/channel_evolution.go`
- [ ] `internal/batch/run_state.go`
- [ ] `internal/batch/service.go`
- [ ] `internal/batch/confirmation.go`
- [ ] `internal/batch/run_state.go`
- [ ] `internal/batch/validation.go`
- [ ] `internal/host/sub2api/channel.go`
- [ ] `internal/host/sub2api/accounts.go`
- [ ] `internal/app/http_batch_import.go`
- [ ] `internal/app/http_batch_runs.go`
- [ ] `cmd/cli/batch_import.go`
- [ ] `tests/integration/batch_import_test.go`
- [ ] Update `EXECUTION_BOARD.md` to track V2 implementation status
- [ ] `docs/openapi.yaml`

File diff suppressed because it is too large

View File

@@ -143,7 +143,7 @@
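## v2 Plan: Batch Auto-Import (URL + Key)
**Current stage**: 🔨 In design (pending review and refinement)
**Current stage**: 🔨 In design (core contracts reconciled; implementation to follow the reconciled version)
**Document**: `docs/2026-05-21-BATCH_AUTO_IMPORT_SPEC.md` (requirements spec)
**TDD plan**: `docs/2026-05-21-BATCH_AUTO_IMPORT_TDD_PLAN.md` (implementation path, open questions confirmed)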
@@ -158,13 +158,22 @@
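- run / item state persistence, retry trail, and viewing historical results after a control-plane restart
- Batch list page / batch detail page for viewing model correction results, account status, provider warnings, and the final access status
- The v2 goal has been upgraded from "synchronous import succeeds" to "import + async confirmation + final closed-loop verification"
- The 4 most critical conflicts raised in review have been reconciled:
  - Unified the canonical contract: `run_id/item_id/provider_id/run.state/confirmation_status/access_status`
  - Completed the input contracts for `subscription` / `self_service`
  - Established `import_runs/import_run_items/import_run_item_events` as the sole V2 state source
  - Defined the async confirmation execution mechanism as `ConfirmationWorker + lease + next_retry_at`
- The remaining review issues have been reconciled as well:
  - capability upgraded from a single upstream-wide profile to transport + model profiles
  - Result page fields, state store fields, and the retry/event trail are now unified
  - OpenAPI now covers `/api/batch-import/runs*`; legacy `/api/import-batches/*` is demoted to v1/legacy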
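**Design work remaining**
- [ ] **Technical design**: API interfaces (CLI + HTTP), data model, DB schema changes, error handling
- [ ] **UI design**: CLI output format / HTTP API docs / Web console (delivery form to be confirmed)
- [ ] **Review**: review of the design documents by the relevant specialists
**Remaining items**
- [ ] Produce a database migration draft following the reconciled canonical contract
- [ ] Start implementation following the reconciled OpenAPI and projection fields
- [ ] Do one more pre-implementation review before starting, to confirm no new divergence has appeared
**Implementation paused**: code will not be written until the design review passes
**Pre-implementation gate**: document-level review issues are resolved; work can now enter the implementation-preparation stage of "writing migrations / interfaces / worker per the documents"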
---

View File

@@ -151,19 +151,104 @@ paths:
$ref: '#/components/responses/Unauthorized'
'404':
description: pack not found
/api/batch-import/runs:
post:
security:
- bearerAuth: []
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateBatchImportRunRequest'
responses:
'200':
description: batch import run created
content:
application/json:
schema:
$ref: '#/components/schemas/BatchImportRunCreateResponse'
'401':
$ref: '#/components/responses/Unauthorized'
get:
security:
- bearerAuth: []
responses:
'200':
description: list batch import runs
content:
application/json:
schema:
$ref: '#/components/schemas/ListBatchImportRunsResponse'
'401':
$ref: '#/components/responses/Unauthorized'
/api/batch-import/runs/{run_id}:
get:
security:
- bearerAuth: []
parameters:
- $ref: '#/components/parameters/RunID'
responses:
'200':
description: batch import run detail
content:
application/json:
schema:
$ref: '#/components/schemas/BatchImportRunDetail'
'401':
$ref: '#/components/responses/Unauthorized'
'404':
description: run not found
/api/batch-import/runs/{run_id}/items:
get:
security:
- bearerAuth: []
parameters:
- $ref: '#/components/parameters/RunID'
responses:
'200':
description: batch import run items
content:
application/json:
schema:
$ref: '#/components/schemas/ListBatchImportRunItemsResponse'
'401':
$ref: '#/components/responses/Unauthorized'
'404':
description: run not found
/api/batch-import/runs/{run_id}/items/{item_id}:
get:
security:
- bearerAuth: []
parameters:
- $ref: '#/components/parameters/RunID'
- $ref: '#/components/parameters/ItemID'
responses:
'200':
description: batch import run item detail
content:
application/json:
schema:
$ref: '#/components/schemas/BatchImportRunItemDetail'
'401':
$ref: '#/components/responses/Unauthorized'
'404':
description: run or item not found
/api/import-batches/{batchID}:
get:
deprecated: true
security:
- bearerAuth: []
parameters:
- $ref: '#/components/parameters/BatchID'
responses:
'200':
description: batch detail
description: v1 legacy batch detail
'401':
$ref: '#/components/responses/Unauthorized'
/api/import-batches/{batchID}/rollback:
post:
deprecated: true
security:
- bearerAuth: []
parameters:
@@ -362,6 +447,18 @@ components:
schema:
type: integer
format: int64
RunID:
name: run_id
in: path
required: true
schema:
type: string
ItemID:
name: item_id
in: path
required: true
schema:
type: string
ProviderID:
name: providerID
in: path
@@ -494,6 +591,274 @@ components:
type: array
items:
$ref: '#/components/schemas/PackProviderInfo'
BatchImportEntry:
type: object
required: [base_url, api_key]
properties:
base_url:
type: string
api_key:
type: string
requested_models:
type: array
items:
type: string
CreateBatchImportRunRequestBase:
type: object
required: [host_id, mode, access_mode, entries]
properties:
host_id:
type: string
mode:
type: string
enum: [strict, partial]
access_mode:
type: string
enum: [subscription, self_service]
confirm_wait_timeout_sec:
type: integer
subscription_users:
type: array
items:
type: string
subscription_days:
type: integer
probe_api_key:
type: string
entries:
type: array
items:
$ref: '#/components/schemas/BatchImportEntry'
CreateBatchImportRunSubscriptionRequest:
allOf:
- $ref: '#/components/schemas/CreateBatchImportRunRequestBase'
- type: object
required: [subscription_users, subscription_days]
properties:
access_mode:
type: string
enum: [subscription]
CreateBatchImportRunSelfServiceRequest:
allOf:
- $ref: '#/components/schemas/CreateBatchImportRunRequestBase'
- type: object
required: [probe_api_key]
properties:
access_mode:
type: string
enum: [self_service]
CreateBatchImportRunRequest:
oneOf:
- $ref: '#/components/schemas/CreateBatchImportRunSubscriptionRequest'
- $ref: '#/components/schemas/CreateBatchImportRunSelfServiceRequest'
BatchImportRunSummary:
type: object
properties:
run_id:
type: string
state:
type: string
enum: [running, completed, completed_with_warnings, failed, cancelled]
mode:
type: string
enum: [strict, partial]
access_mode:
type: string
enum: [subscription, self_service]
total_items:
type: integer
completed_items:
type: integer
active_items:
type: integer
degraded_items:
type: integer
broken_items:
type: integer
warning_items:
type: integer
started_at:
type: string
format: date-time
finished_at:
type: string
format: date-time
nullable: true
BatchImportRunCreateResponse:
type: object
properties:
run_id:
type: string
state:
type: string
result_page:
type: string
total_items:
type: integer
active_items:
type: integer
degraded_items:
type: integer
broken_items:
type: integer
warning_items:
type: integer
ListBatchImportRunsResponse:
type: object
properties:
runs:
type: array
items:
$ref: '#/components/schemas/BatchImportRunSummary'
BatchImportCapabilityTransportProfile:
type: object
properties:
supports_openai_models:
type: boolean
supports_openai_chat_completions:
type: boolean
supports_openai_responses:
type: boolean
supports_anthropic_messages:
type: boolean
auth_style:
type: string
model_id_style:
type: string
known_advisories:
type: array
items:
type: string
BatchImportCapabilityModelProfile:
type: object
properties:
raw_model_id:
type: string
normalized_model_id:
type: string
supports_stream:
type: string
supports_tools:
type: string
supports_reasoning_fields:
type: string
smoke_chat_ok:
type: boolean
BatchImportCapabilityProfile:
type: object
properties:
transport_profile:
$ref: '#/components/schemas/BatchImportCapabilityTransportProfile'
model_profiles:
type: array
items:
$ref: '#/components/schemas/BatchImportCapabilityModelProfile'
BatchImportRunItemSummary:
type: object
properties:
item_id:
type: string
base_url:
type: string
provider_id:
type: string
requested_models:
type: array
items:
type: string
resolved_smoke_model:
type: string
nullable: true
current_stage:
type: string
enum: [probe, provision, confirm, validate, done]
confirmation_status:
type: string
enum: [pending, confirmed, advisory, failed]
access_status:
type: string
enum: [unknown, active, degraded, broken]
retry_count:
type: integer
last_retry_at:
type: string
format: date-time
nullable: true
advisory_messages:
type: array
items:
type: string
last_error_stage:
type: string
nullable: true
last_error:
type: string
nullable: true
BatchImportRunItemEvent:
type: object
properties:
event_id:
type: string
event_type:
type: string
stage:
type: string
attempt:
type: integer
message:
type: string
payload_json:
type: string
created_at:
type: string
format: date-time
BatchImportRunItemDetail:
allOf:
- $ref: '#/components/schemas/BatchImportRunItemSummary'
- type: object
properties:
raw_models:
type: array
items:
type: string
normalized_models:
type: array
items:
type: string
recommended_models:
type: array
items:
type: string
channel_id:
type: integer
format: int64
nullable: true
account_id:
type: integer
format: int64
nullable: true
capability_profile:
$ref: '#/components/schemas/BatchImportCapabilityProfile'
events:
type: array
items:
$ref: '#/components/schemas/BatchImportRunItemEvent'
BatchImportRunDetail:
type: object
properties:
run:
$ref: '#/components/schemas/BatchImportRunSummary'
recent_warnings:
type: array
items:
type: string
ListBatchImportRunItemsResponse:
type: object
properties:
items:
type: array
items:
$ref: '#/components/schemas/BatchImportRunItemSummary'
ImportBatchInfo:
type: object
properties: