feat(batch): add result projection for v2 runs and items
This commit is contained in:
254
internal/batch/status_projection.go
Normal file
254
internal/batch/status_projection.go
Normal file
@@ -0,0 +1,254 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/probe"
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
type ProjectionBadge struct {
|
||||
Kind string `json:"kind"`
|
||||
Tone string `json:"tone"`
|
||||
Label string `json:"label"`
|
||||
}
|
||||
|
||||
type RunSummaryProjection struct {
|
||||
RunID string `json:"run_id"`
|
||||
State string `json:"state"`
|
||||
StateBadge ProjectionBadge `json:"state_badge"`
|
||||
Mode string `json:"mode"`
|
||||
AccessMode string `json:"access_mode"`
|
||||
TotalItems int `json:"total_items"`
|
||||
CompletedItems int `json:"completed_items"`
|
||||
ActiveItems int `json:"active_items"`
|
||||
DegradedItems int `json:"degraded_items"`
|
||||
BrokenItems int `json:"broken_items"`
|
||||
WarningItems int `json:"warning_items"`
|
||||
StartedAt string `json:"started_at"`
|
||||
FinishedAt string `json:"finished_at"`
|
||||
RecentWarnings []string `json:"recent_warnings"`
|
||||
}
|
||||
|
||||
type ItemSummaryProjection struct {
|
||||
ItemID string `json:"item_id"`
|
||||
BaseURL string `json:"base_url"`
|
||||
ProviderID string `json:"provider_id"`
|
||||
APIKeyFingerprint string `json:"api_key_fingerprint"`
|
||||
RequestedModels []string `json:"requested_models"`
|
||||
CanonicalModelFamilies []string `json:"canonical_model_families"`
|
||||
ResolvedSmokeModel string `json:"resolved_smoke_model"`
|
||||
CurrentStage string `json:"current_stage"`
|
||||
ConfirmationStatus string `json:"confirmation_status"`
|
||||
AccessStatus string `json:"access_status"`
|
||||
MatchedAccountState string `json:"matched_account_state"`
|
||||
AccountResolution string `json:"account_resolution"`
|
||||
ProvisionReused bool `json:"provision_reused"`
|
||||
RetryCount int `json:"retry_count"`
|
||||
LastRetryAt string `json:"last_retry_at"`
|
||||
AdvisoryMessages []string `json:"advisory_messages"`
|
||||
LastErrorStage string `json:"last_error_stage"`
|
||||
LastError string `json:"last_error"`
|
||||
Badges []ProjectionBadge `json:"badges"`
|
||||
}
|
||||
|
||||
type EventProjection struct {
|
||||
EventID string `json:"event_id"`
|
||||
EventType string `json:"event_type"`
|
||||
Stage string `json:"stage"`
|
||||
Attempt int `json:"attempt"`
|
||||
Message string `json:"message"`
|
||||
PayloadJSON string `json:"payload_json"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
type ItemDetailProjection struct {
|
||||
ItemSummaryProjection
|
||||
RawModels []string `json:"raw_models"`
|
||||
NormalizedModels []string `json:"normalized_models"`
|
||||
RecommendedModels []string `json:"recommended_models"`
|
||||
ReusedFromProviderID string `json:"reused_from_provider_id"`
|
||||
ReusedFromAccountID *int64 `json:"reused_from_account_id"`
|
||||
ChannelID *int64 `json:"channel_id"`
|
||||
AccountID *int64 `json:"account_id"`
|
||||
CapabilityProfile probe.CapabilityProfile `json:"capability_profile"`
|
||||
Events []EventProjection `json:"events"`
|
||||
}
|
||||
|
||||
func ProjectRunSummary(run sqlite.ImportRun) RunSummaryProjection {
|
||||
view := RunSummaryProjection{
|
||||
RunID: run.RunID,
|
||||
State: run.State,
|
||||
StateBadge: runStateBadge(run.State),
|
||||
Mode: run.Mode,
|
||||
AccessMode: run.AccessMode,
|
||||
TotalItems: run.TotalItems,
|
||||
CompletedItems: run.CompletedItems,
|
||||
ActiveItems: run.ActiveItems,
|
||||
DegradedItems: run.DegradedItems,
|
||||
BrokenItems: run.BrokenItems,
|
||||
WarningItems: run.WarningItems,
|
||||
StartedAt: run.StartedAt,
|
||||
FinishedAt: run.FinishedAt,
|
||||
RecentWarnings: []string{},
|
||||
}
|
||||
if run.WarningItems > 0 {
|
||||
view.RecentWarnings = append(view.RecentWarnings, fmt.Sprintf("该批次包含 %d 条 advisory item,建议检查 capability profile 与 retry 轨迹", run.WarningItems))
|
||||
}
|
||||
return view
|
||||
}
|
||||
|
||||
func ProjectItemSummary(item sqlite.ImportRunItem) ItemSummaryProjection {
|
||||
view := ItemSummaryProjection{
|
||||
ItemID: item.ItemID,
|
||||
BaseURL: item.BaseURL,
|
||||
ProviderID: item.ProviderID,
|
||||
APIKeyFingerprint: item.APIKeyFingerprint,
|
||||
RequestedModels: parseJSONStringArray(item.RequestedModelsJSON),
|
||||
CanonicalModelFamilies: parseJSONStringArray(item.CanonicalFamiliesJSON),
|
||||
ResolvedSmokeModel: item.ResolvedSmokeModel,
|
||||
CurrentStage: item.CurrentStage,
|
||||
ConfirmationStatus: item.ConfirmationStatus,
|
||||
AccessStatus: item.AccessStatus,
|
||||
MatchedAccountState: item.MatchedAccountState,
|
||||
AccountResolution: item.AccountResolution,
|
||||
ProvisionReused: item.ProvisionReused,
|
||||
RetryCount: item.RetryCount,
|
||||
LastRetryAt: item.LastRetryAt,
|
||||
AdvisoryMessages: mapAdvisoryMessages(parseJSONStringArray(item.AdvisoryMessagesJSON)),
|
||||
LastErrorStage: item.LastErrorStage,
|
||||
LastError: item.LastError,
|
||||
Badges: []ProjectionBadge{},
|
||||
}
|
||||
|
||||
if item.ProvisionReused {
|
||||
view.Badges = append(view.Badges, ProjectionBadge{Kind: "reused", Tone: "cyan", Label: "reused"})
|
||||
}
|
||||
if badge, ok := matchedAccountStateBadge(item.MatchedAccountState); ok {
|
||||
view.Badges = append(view.Badges, badge)
|
||||
}
|
||||
if badge, ok := accountResolutionBadge(item.AccountResolution); ok {
|
||||
view.Badges = append(view.Badges, badge)
|
||||
}
|
||||
|
||||
return view
|
||||
}
|
||||
|
||||
func ProjectItemDetail(item sqlite.ImportRunItem, events []sqlite.ImportRunItemEvent) (ItemDetailProjection, error) {
|
||||
view := ItemDetailProjection{
|
||||
ItemSummaryProjection: ProjectItemSummary(item),
|
||||
RawModels: parseJSONStringArray(item.RawModelsJSON),
|
||||
NormalizedModels: parseJSONStringArray(item.NormalizedModelsJSON),
|
||||
RecommendedModels: parseJSONStringArray(item.RecommendedModelsJSON),
|
||||
ReusedFromProviderID: item.ReusedFromProviderID,
|
||||
ReusedFromAccountID: item.ReusedFromAccountID,
|
||||
ChannelID: item.ChannelID,
|
||||
AccountID: item.AccountID,
|
||||
CapabilityProfile: probe.CapabilityProfile{},
|
||||
Events: make([]EventProjection, 0, len(events)),
|
||||
}
|
||||
|
||||
if payload := strings.TrimSpace(item.CapabilityProfileJSON); payload != "" {
|
||||
if err := json.Unmarshal([]byte(payload), &view.CapabilityProfile); err != nil {
|
||||
return ItemDetailProjection{}, fmt.Errorf("decode capability profile: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
view.Events = append(view.Events, EventProjection{
|
||||
EventID: event.EventID,
|
||||
EventType: event.EventType,
|
||||
Stage: event.Stage,
|
||||
Attempt: event.Attempt,
|
||||
Message: event.Message,
|
||||
PayloadJSON: event.PayloadJSON,
|
||||
CreatedAt: event.CreatedAt,
|
||||
})
|
||||
}
|
||||
|
||||
return view, nil
|
||||
}
|
||||
|
||||
func parseJSONStringArray(raw string) []string {
|
||||
values := []string{}
|
||||
if err := json.Unmarshal([]byte(defaultJSONString(raw, "[]")), &values); err != nil {
|
||||
return []string{}
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
func mapAdvisoryMessages(messages []string) []string {
|
||||
mapped := make([]string, 0, len(messages))
|
||||
for _, message := range messages {
|
||||
switch strings.TrimSpace(message) {
|
||||
case "responses_unsupported_but_chat_ok":
|
||||
mapped = append(mapped, "该上游不支持 /v1/responses,系统已自动回退到 /v1/chat/completions")
|
||||
case "initial_probe_race_expected":
|
||||
mapped = append(mapped, "账号创建后宿主异步探测尚未稳定,首次 /test 已按 advisory 处理")
|
||||
case "gateway_warmup_retry_succeeded":
|
||||
mapped = append(mapped, "初次调度出现 no available accounts,短暂重试后已恢复")
|
||||
case "provision_reused":
|
||||
mapped = append(mapped, "已检测到同 URL + 同模型家族 + 健康账号,系统直接复用已有 provider")
|
||||
case "patch_only_new_aliases":
|
||||
mapped = append(mapped, "模型属于已覆盖家族,仅补充别名映射与定价,不重复创建资源")
|
||||
case "duplicate_active_account":
|
||||
mapped = append(mapped, "该账号已存在且处于启用状态,本次未重复创建,直接复用")
|
||||
case "deprecated_account_reactivated":
|
||||
mapped = append(mapped, "该账号此前处于弃用/停用状态,本次已快速启用并重新确认")
|
||||
default:
|
||||
if trimmed := strings.TrimSpace(message); trimmed != "" {
|
||||
mapped = append(mapped, trimmed)
|
||||
}
|
||||
}
|
||||
}
|
||||
return mapped
|
||||
}
|
||||
|
||||
func runStateBadge(state string) ProjectionBadge {
|
||||
switch strings.TrimSpace(state) {
|
||||
case string(RunStateRunning):
|
||||
return ProjectionBadge{Kind: "state", Tone: "blue", Label: "running"}
|
||||
case string(RunStateCompleted):
|
||||
return ProjectionBadge{Kind: "state", Tone: "green", Label: "completed"}
|
||||
case string(RunStateCompletedWithWarnings):
|
||||
return ProjectionBadge{Kind: "state", Tone: "yellow", Label: "warning"}
|
||||
case string(RunStateFailed):
|
||||
return ProjectionBadge{Kind: "state", Tone: "red", Label: "failed"}
|
||||
case string(RunStateCancelled):
|
||||
return ProjectionBadge{Kind: "state", Tone: "gray", Label: "cancelled"}
|
||||
default:
|
||||
return ProjectionBadge{Kind: "state", Tone: "gray", Label: strings.TrimSpace(state)}
|
||||
}
|
||||
}
|
||||
|
||||
func matchedAccountStateBadge(state string) (ProjectionBadge, bool) {
|
||||
switch strings.TrimSpace(state) {
|
||||
case string(MatchedAccountStateActive):
|
||||
return ProjectionBadge{Kind: "matched_account_state", Tone: "green", Label: "已启用"}, true
|
||||
case string(MatchedAccountStateDisabled):
|
||||
return ProjectionBadge{Kind: "matched_account_state", Tone: "gray", Label: "已停用"}, true
|
||||
case string(MatchedAccountStateDeprecated):
|
||||
return ProjectionBadge{Kind: "matched_account_state", Tone: "yellow", Label: "已弃用"}, true
|
||||
case string(MatchedAccountStateBroken):
|
||||
return ProjectionBadge{Kind: "matched_account_state", Tone: "red", Label: "已损坏"}, true
|
||||
default:
|
||||
return ProjectionBadge{}, false
|
||||
}
|
||||
}
|
||||
|
||||
func accountResolutionBadge(resolution string) (ProjectionBadge, bool) {
|
||||
switch strings.TrimSpace(resolution) {
|
||||
case string(AccountResolutionCreated):
|
||||
return ProjectionBadge{Kind: "account_resolution", Tone: "blue", Label: "新建"}, true
|
||||
case string(AccountResolutionReused):
|
||||
return ProjectionBadge{Kind: "account_resolution", Tone: "cyan", Label: "复用"}, true
|
||||
case string(AccountResolutionReactivated):
|
||||
return ProjectionBadge{Kind: "account_resolution", Tone: "green", Label: "已快速启用"}, true
|
||||
case string(AccountResolutionReplaced):
|
||||
return ProjectionBadge{Kind: "account_resolution", Tone: "red", Label: "已替换"}, true
|
||||
default:
|
||||
return ProjectionBadge{}, false
|
||||
}
|
||||
}
|
||||
172
internal/batch/status_projection_test.go
Normal file
172
internal/batch/status_projection_test.go
Normal file
@@ -0,0 +1,172 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
func TestStatusProjection(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("run summary exposes recent warnings and warning badge label", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
run := sqlite.ImportRun{
|
||||
RunID: "run-1",
|
||||
State: string(RunStateCompletedWithWarnings),
|
||||
Mode: "partial",
|
||||
AccessMode: "subscription",
|
||||
TotalItems: 2,
|
||||
CompletedItems: 2,
|
||||
ActiveItems: 1,
|
||||
DegradedItems: 1,
|
||||
BrokenItems: 0,
|
||||
WarningItems: 1,
|
||||
StartedAt: "2026-05-22T12:20:00+08:00",
|
||||
FinishedAt: "2026-05-22T12:20:07+08:00",
|
||||
}
|
||||
|
||||
view := ProjectRunSummary(run)
|
||||
if view.StateBadge.Label != "warning" {
|
||||
t.Fatalf("StateBadge.Label = %q, want warning", view.StateBadge.Label)
|
||||
}
|
||||
if len(view.RecentWarnings) != 1 {
|
||||
t.Fatalf("len(RecentWarnings) = %d, want 1", len(view.RecentWarnings))
|
||||
}
|
||||
if view.RecentWarnings[0] != "该批次包含 1 条 advisory item,建议检查 capability profile 与 retry 轨迹" {
|
||||
t.Fatalf("RecentWarnings[0] = %q, want canonical warning copy", view.RecentWarnings[0])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("item summary projection maps warning copy and reuse badges", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
item := sqlite.ImportRunItem{
|
||||
ItemID: "item-1",
|
||||
RunID: "run-1",
|
||||
BaseURL: "https://kimi.example.com/v1",
|
||||
ProviderID: "kimi-a7m-7d7ac291",
|
||||
APIKeyFingerprint: "sha256:8d8c4b5f",
|
||||
RequestedModelsJSON: `["kimi-k2.6"]`,
|
||||
CanonicalFamiliesJSON: `["kimi-k2.6"]`,
|
||||
ResolvedSmokeModel: "kimi-k2.6",
|
||||
CurrentStage: string(ItemStageDone),
|
||||
ConfirmationStatus: string(ConfirmationAdvisory),
|
||||
AccessStatus: string(AccessStatusActive),
|
||||
MatchedAccountState: string(MatchedAccountStateActive),
|
||||
AccountResolution: string(AccountResolutionReused),
|
||||
ProvisionReused: true,
|
||||
RetryCount: 2,
|
||||
LastRetryAt: "2026-05-22T12:20:05+08:00",
|
||||
AdvisoryMessagesJSON: `["responses_unsupported_but_chat_ok","gateway_warmup_retry_succeeded"]`,
|
||||
LastErrorStage: string(ItemStageConfirm),
|
||||
LastError: "API returned 403: Forbidden",
|
||||
}
|
||||
|
||||
view := ProjectItemSummary(item)
|
||||
if len(view.Badges) < 3 {
|
||||
t.Fatalf("len(Badges) = %d, want at least 3", len(view.Badges))
|
||||
}
|
||||
if !hasBadge(view.Badges, "reused", "reused") {
|
||||
t.Fatalf("Badges = %#v, want reused badge", view.Badges)
|
||||
}
|
||||
if !hasBadge(view.Badges, "matched_account_state", "已启用") {
|
||||
t.Fatalf("Badges = %#v, want matched account state badge", view.Badges)
|
||||
}
|
||||
if !hasBadge(view.Badges, "account_resolution", "复用") {
|
||||
t.Fatalf("Badges = %#v, want account resolution badge", view.Badges)
|
||||
}
|
||||
if len(view.AdvisoryMessages) != 2 {
|
||||
t.Fatalf("len(AdvisoryMessages) = %d, want 2", len(view.AdvisoryMessages))
|
||||
}
|
||||
if view.AdvisoryMessages[0] != "该上游不支持 /v1/responses,系统已自动回退到 /v1/chat/completions" {
|
||||
t.Fatalf("AdvisoryMessages[0] = %q, want responses fallback copy", view.AdvisoryMessages[0])
|
||||
}
|
||||
if view.AdvisoryMessages[1] != "初次调度出现 no available accounts,短暂重试后已恢复" {
|
||||
t.Fatalf("AdvisoryMessages[1] = %q, want warmup retry copy", view.AdvisoryMessages[1])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("item detail projection exposes capability profile and event trail", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
item := sqlite.ImportRunItem{
|
||||
ItemID: "item-2",
|
||||
RunID: "run-1",
|
||||
BaseURL: "https://kimi.example.com/v1",
|
||||
ProviderID: "kimi-a7m-7d7ac291",
|
||||
APIKeyFingerprint: "sha256:8d8c4b5f",
|
||||
RequestedModelsJSON: `["kimi-k2.6"]`,
|
||||
RawModelsJSON: `["kimi-k2.6"]`,
|
||||
NormalizedModelsJSON: `["kimi-k2.6"]`,
|
||||
CanonicalFamiliesJSON: `["kimi-k2.6"]`,
|
||||
RecommendedModelsJSON: `[]`,
|
||||
ResolvedSmokeModel: "kimi-k2.6",
|
||||
CurrentStage: string(ItemStageDone),
|
||||
ConfirmationStatus: string(ConfirmationAdvisory),
|
||||
AccessStatus: string(AccessStatusActive),
|
||||
MatchedAccountState: string(MatchedAccountStateDeprecated),
|
||||
AccountResolution: string(AccountResolutionReactivated),
|
||||
ProvisionReused: true,
|
||||
ReusedFromProviderID: "kimi-a7m-7d7ac291",
|
||||
ReusedFromAccountID: int64Ptr(4),
|
||||
RetryCount: 2,
|
||||
LastRetryAt: "2026-05-22T12:20:05+08:00",
|
||||
ChannelID: int64Ptr(12),
|
||||
AccountID: int64Ptr(4),
|
||||
AdvisoryMessagesJSON: `["responses_unsupported_but_chat_ok","initial_probe_race_expected"]`,
|
||||
LastErrorStage: string(ItemStageConfirm),
|
||||
LastError: "API returned 403: Forbidden",
|
||||
CapabilityProfileJSON: `{"transport_profile":{"supports_openai_models":true,"supports_openai_chat_completions":true,"supports_openai_responses":false,"supports_anthropic_messages":false,"auth_style":"bearer","model_id_style":"canonical","known_advisories":["responses_unsupported_but_chat_ok","initial_probe_race_expected"]},"model_profiles":[{"raw_model_id":"kimi-k2.6","normalized_model_id":"kimi-k2.6","canonical_model_family":"kimi-k2.6","supports_stream":"true","supports_tools":"unknown","supports_reasoning_fields":"unknown","smoke_chat_ok":true}]}`,
|
||||
}
|
||||
events := []sqlite.ImportRunItemEvent{
|
||||
{
|
||||
EventID: "evt-01",
|
||||
RunID: "run-1",
|
||||
ItemID: "item-2",
|
||||
EventType: "retry_scheduled",
|
||||
Stage: string(ItemStageConfirm),
|
||||
Attempt: 1,
|
||||
Message: "initial 503 no available accounts, retry scheduled",
|
||||
PayloadJSON: `{"delay_ms":500}`,
|
||||
CreatedAt: "2026-05-22T12:20:04+08:00",
|
||||
},
|
||||
}
|
||||
|
||||
view, err := ProjectItemDetail(item, events)
|
||||
if err != nil {
|
||||
t.Fatalf("ProjectItemDetail() error = %v", err)
|
||||
}
|
||||
if view.ReusedFromProviderID != "kimi-a7m-7d7ac291" {
|
||||
t.Fatalf("ReusedFromProviderID = %q, want kimi-a7m-7d7ac291", view.ReusedFromProviderID)
|
||||
}
|
||||
if view.ReusedFromAccountID == nil || *view.ReusedFromAccountID != 4 {
|
||||
t.Fatalf("ReusedFromAccountID = %#v, want 4", view.ReusedFromAccountID)
|
||||
}
|
||||
if len(view.Events) != 1 || view.Events[0].EventType != "retry_scheduled" {
|
||||
t.Fatalf("Events = %#v, want retry event trail", view.Events)
|
||||
}
|
||||
if !view.CapabilityProfile.TransportProfile.SupportsOpenAIChatCompletions {
|
||||
t.Fatal("CapabilityProfile.TransportProfile.SupportsOpenAIChatCompletions = false, want true")
|
||||
}
|
||||
if len(view.CapabilityProfile.ModelProfiles) != 1 || view.CapabilityProfile.ModelProfiles[0].CanonicalModelFamily != "kimi-k2.6" {
|
||||
t.Fatalf("CapabilityProfile.ModelProfiles = %#v, want canonical model family", view.CapabilityProfile.ModelProfiles)
|
||||
}
|
||||
if !hasBadge(view.Badges, "account_resolution", "已快速启用") {
|
||||
t.Fatalf("Badges = %#v, want reactivated badge", view.Badges)
|
||||
}
|
||||
if !hasBadge(view.Badges, "matched_account_state", "已弃用") {
|
||||
t.Fatalf("Badges = %#v, want deprecated badge", view.Badges)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func hasBadge(badges []ProjectionBadge, kind, label string) bool {
|
||||
for _, badge := range badges {
|
||||
if badge.Kind == kind && badge.Label == label {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user