test(integration): cover batch import v2 flows
This commit is contained in:
395
tests/integration/batch_import_v2_test.go
Normal file
395
tests/integration/batch_import_v2_test.go
Normal file
@@ -0,0 +1,395 @@
|
||||
package integration_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/batch"
|
||||
"sub2api-cn-relay-manager/internal/host/sub2api"
|
||||
"sub2api-cn-relay-manager/internal/probe"
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
func TestBatchImportV2(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
harness := newBatchImportV2Harness(t)
|
||||
defer harness.Close(t)
|
||||
|
||||
provisioner := &batchImportProvisionerStub{}
|
||||
runID, err := harness.RunBatchImport(ctx, provisioner)
|
||||
if err != nil {
|
||||
t.Fatalf("RunBatchImport() error = %v", err)
|
||||
}
|
||||
|
||||
if runID == "" {
|
||||
t.Fatal("runID = empty, want persisted run")
|
||||
}
|
||||
if provisioner.provisionCalls != 3 {
|
||||
t.Fatalf("provision calls = %d, want 3 for new/advisory/retry items", provisioner.provisionCalls)
|
||||
}
|
||||
if provisioner.patchCalls != 1 {
|
||||
t.Fatalf("patch calls = %d, want 1 for alias-only reuse flow", provisioner.patchCalls)
|
||||
}
|
||||
if got := provisioner.lastPatch.Contract.ModelMapping["Kimi-K2.6"]; got != "kimi-2.6" {
|
||||
t.Fatalf("patch mapping = %#v, want raw alias mapped to canonical family", provisioner.lastPatch.Contract.ModelMapping)
|
||||
}
|
||||
|
||||
run, err := harness.store.ImportRuns().GetByRunID(ctx, runID)
|
||||
if err != nil {
|
||||
t.Fatalf("ImportRuns().GetByRunID() error = %v", err)
|
||||
}
|
||||
runView := batch.ProjectRunSummary(run)
|
||||
if runView.State != string(batch.RunStateCompleted) {
|
||||
t.Fatalf("run state = %q, want completed", runView.State)
|
||||
}
|
||||
if runView.CompletedItems != 6 || runView.ActiveItems != 6 || runView.TotalItems != 6 {
|
||||
t.Fatalf("run view = %+v, want all 6 items completed and active", runView)
|
||||
}
|
||||
|
||||
items, err := harness.store.ImportRunItems().ListByRunID(ctx, runID)
|
||||
if err != nil {
|
||||
t.Fatalf("ImportRunItems().ListByRunID() error = %v", err)
|
||||
}
|
||||
if len(items) != 6 {
|
||||
t.Fatalf("len(items) = %d, want 6", len(items))
|
||||
}
|
||||
|
||||
newItem := findItemByBaseURL(t, items, harness.baseURL+"/new")
|
||||
if got := batch.ProjectItemSummary(newItem).CanonicalModelFamilies; len(got) != 1 || got[0] != "deepseek-v4-pro" {
|
||||
t.Fatalf("new item canonical families = %#v, want [deepseek-v4-pro]", got)
|
||||
}
|
||||
|
||||
activeItem := findItemByBaseURL(t, items, harness.baseURL+"/active")
|
||||
activeView := batch.ProjectItemSummary(activeItem)
|
||||
if activeView.MatchedAccountState != string(batch.MatchedAccountStateActive) || activeView.AccountResolution != string(batch.AccountResolutionReused) || !activeView.ProvisionReused {
|
||||
t.Fatalf("active duplicate projection = %+v, want active/reused/provision_reused", activeView)
|
||||
}
|
||||
|
||||
deprecatedItem := findItemByBaseURL(t, items, harness.baseURL+"/deprecated")
|
||||
deprecatedView := batch.ProjectItemSummary(deprecatedItem)
|
||||
if deprecatedView.MatchedAccountState != string(batch.MatchedAccountStateDeprecated) || deprecatedView.AccountResolution != string(batch.AccountResolutionReactivated) || !deprecatedView.ProvisionReused {
|
||||
t.Fatalf("deprecated duplicate projection = %+v, want deprecated/reactivated/provision_reused", deprecatedView)
|
||||
}
|
||||
|
||||
advisoryItem := findItemByBaseURL(t, items, harness.baseURL+"/advisory")
|
||||
advisoryEvents, err := harness.store.ImportRunEvents().ListByItemID(ctx, advisoryItem.ItemID)
|
||||
if err != nil {
|
||||
t.Fatalf("ImportRunEvents().ListByItemID(advisory) error = %v", err)
|
||||
}
|
||||
advisoryDetail, err := batch.ProjectItemDetail(advisoryItem, advisoryEvents)
|
||||
if err != nil {
|
||||
t.Fatalf("ProjectItemDetail(advisory) error = %v", err)
|
||||
}
|
||||
if advisoryDetail.ConfirmationStatus != string(batch.ConfirmationAdvisory) || advisoryDetail.AccessStatus != string(batch.AccessStatusActive) {
|
||||
t.Fatalf("advisory detail = %+v, want advisory confirmation and active access", advisoryDetail)
|
||||
}
|
||||
if !containsString(advisoryDetail.CapabilityProfile.TransportProfile.KnownAdvisories, "initial_probe_race_expected") {
|
||||
t.Fatalf("advisory capability profile = %+v, want initial_probe_race_expected", advisoryDetail.CapabilityProfile.TransportProfile.KnownAdvisories)
|
||||
}
|
||||
if !containsSubstring(advisoryDetail.AdvisoryMessages, "异步探测尚未稳定") {
|
||||
t.Fatalf("advisory messages = %#v, want mapped probe race advisory", advisoryDetail.AdvisoryMessages)
|
||||
}
|
||||
if !containsEventType(advisoryDetail.Events, "advisory_added") {
|
||||
t.Fatalf("advisory events = %+v, want advisory_added event", advisoryDetail.Events)
|
||||
}
|
||||
|
||||
retryItem := findItemByBaseURL(t, items, harness.baseURL+"/retry")
|
||||
retryEvents, err := harness.store.ImportRunEvents().ListByItemID(ctx, retryItem.ItemID)
|
||||
if err != nil {
|
||||
t.Fatalf("ImportRunEvents().ListByItemID(retry) error = %v", err)
|
||||
}
|
||||
retryDetail, err := batch.ProjectItemDetail(retryItem, retryEvents)
|
||||
if err != nil {
|
||||
t.Fatalf("ProjectItemDetail(retry) error = %v", err)
|
||||
}
|
||||
if retryDetail.RetryCount != 1 || retryDetail.AccessStatus != string(batch.AccessStatusActive) {
|
||||
t.Fatalf("retry detail = %+v, want retry_count=1 and active access", retryDetail)
|
||||
}
|
||||
if !containsEventType(retryDetail.Events, "retry_scheduled") || !containsEventType(retryDetail.Events, "stage_transition") {
|
||||
t.Fatalf("retry events = %+v, want retry_scheduled and stage_transition", retryDetail.Events)
|
||||
}
|
||||
}
|
||||
|
||||
type batchImportV2Harness struct {
|
||||
store *sqlite.DB
|
||||
server *httptest.Server
|
||||
baseURL string
|
||||
}
|
||||
|
||||
func newBatchImportV2Harness(t *testing.T) *batchImportV2Harness {
|
||||
t.Helper()
|
||||
|
||||
store := openTestStore(t)
|
||||
server := httptest.NewServer(newBatchImportUpstreamMux())
|
||||
|
||||
return &batchImportV2Harness{
|
||||
store: store,
|
||||
server: server,
|
||||
baseURL: server.URL,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *batchImportV2Harness) Close(t *testing.T) {
|
||||
t.Helper()
|
||||
h.server.Close()
|
||||
closeTestStore(t, h.store)
|
||||
}
|
||||
|
||||
func (h *batchImportV2Harness) RunBatchImport(ctx context.Context, provisioner *batchImportProvisionerStub) (string, error) {
|
||||
service := batch.BatchImportService{
|
||||
RunStore: h.store.ImportRuns(),
|
||||
ItemStore: h.store.ImportRunItems(),
|
||||
ProbeModels: probe.ProviderModels,
|
||||
ProbeCapabilities: probe.ProbeCapabilities,
|
||||
InspectReuse: h.inspectReuse,
|
||||
Provisioner: provisioner,
|
||||
}
|
||||
|
||||
result, err := service.StartRun(ctx, batch.BatchImportRunRequest{
|
||||
RunID: "run-v2-int-001",
|
||||
HostID: "host-int-1",
|
||||
Mode: "strict",
|
||||
AccessMode: "self_service",
|
||||
Entries: []batch.BatchImportEntry{
|
||||
{BaseURL: h.baseURL + "/new", APIKey: "sk-new", RequestedModels: []string{"DeepSeek V4 Pro"}},
|
||||
{BaseURL: h.baseURL + "/active", APIKey: "sk-active", RequestedModels: []string{"kimi 2.6"}},
|
||||
{BaseURL: h.baseURL + "/deprecated", APIKey: "sk-deprecated", RequestedModels: []string{"kimi 2.6"}},
|
||||
{BaseURL: h.baseURL + "/patch", APIKey: "sk-patch", RequestedModels: []string{"kimi 2.6"}},
|
||||
{BaseURL: h.baseURL + "/advisory", APIKey: "sk-advisory", RequestedModels: []string{"kimi-k2.6"}},
|
||||
{BaseURL: h.baseURL + "/retry", APIKey: "sk-retry", RequestedModels: []string{"kimi-k2.6"}},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
worker := batch.ConfirmationWorker{
|
||||
WorkerID: "worker-int-1",
|
||||
ItemStore: batchImportConfirmationStore{store: h.store, runID: result.RunID},
|
||||
EventStore: h.store.ImportRunEvents(),
|
||||
LeaseDuration: time.Minute,
|
||||
RetryDelay: time.Second,
|
||||
Confirmer: h.confirm,
|
||||
}
|
||||
now := time.Date(2026, 5, 22, 13, 0, 0, 0, time.UTC)
|
||||
if err := worker.Tick(ctx, now); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err := worker.Tick(ctx, now.Add(2*time.Second)); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
items, err := h.store.ImportRunItems().ListByRunID(ctx, result.RunID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
validator := batch.ValidationService{
|
||||
ItemStore: h.store.ImportRunItems(),
|
||||
RunStore: h.store.ImportRuns(),
|
||||
Validator: h.validate,
|
||||
}
|
||||
for _, item := range items {
|
||||
if item.CurrentStage != string(batch.ItemStageValidate) {
|
||||
continue
|
||||
}
|
||||
if err := validator.ValidateItem(ctx, item); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
return result.RunID, nil
|
||||
}
|
||||
|
||||
func (h *batchImportV2Harness) inspectReuse(_ context.Context, input batch.ReuseLookupInput) (batch.ReuseLookupResult, error) {
|
||||
switch {
|
||||
case strings.HasSuffix(input.BaseURL, "/active"):
|
||||
return batch.ReuseLookupResult{
|
||||
ExistingProviderID: batch.NormalizeProviderID(input.BaseURL),
|
||||
ExistingAccessStatus: batch.AccessStatusActive,
|
||||
ExistingCanonicalFamilys: []string{"kimi 2.6"},
|
||||
MatchedAccountID: 201,
|
||||
MatchedAccountState: batch.MatchedAccountStateActive,
|
||||
}, nil
|
||||
case strings.HasSuffix(input.BaseURL, "/deprecated"):
|
||||
return batch.ReuseLookupResult{
|
||||
ExistingProviderID: batch.NormalizeProviderID(input.BaseURL),
|
||||
ExistingAccessStatus: batch.AccessStatusActive,
|
||||
ExistingCanonicalFamilys: []string{"kimi 2.6"},
|
||||
MatchedAccountID: 301,
|
||||
MatchedAccountState: batch.MatchedAccountStateDeprecated,
|
||||
}, nil
|
||||
case strings.HasSuffix(input.BaseURL, "/patch"):
|
||||
return batch.ReuseLookupResult{
|
||||
ExistingProviderID: batch.NormalizeProviderID(input.BaseURL),
|
||||
ExistingAccessStatus: batch.AccessStatusActive,
|
||||
ExistingCanonicalFamilys: []string{"kimi 2.6"},
|
||||
MatchedAccountID: 401,
|
||||
MatchedAccountState: batch.MatchedAccountStateActive,
|
||||
ExistingModelMapping: map[string]string{"kimi-k2.6": "kimi-2.6"},
|
||||
}, nil
|
||||
default:
|
||||
return batch.ReuseLookupResult{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (h *batchImportV2Harness) confirm(_ context.Context, item sqlite.ImportRunItem) (batch.ConfirmationResult, error) {
|
||||
switch {
|
||||
case strings.HasSuffix(item.BaseURL, "/advisory"):
|
||||
return batch.ConfirmationResult{StatusCode: http.StatusForbidden, Message: "probe race expected"}, nil
|
||||
case strings.HasSuffix(item.BaseURL, "/retry") && item.ConfirmationAttempts == 0:
|
||||
return batch.ConfirmationResult{StatusCode: http.StatusServiceUnavailable, Message: "no available accounts"}, nil
|
||||
default:
|
||||
return batch.ConfirmationResult{StatusCode: http.StatusOK, Message: "confirmation succeeded"}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (h *batchImportV2Harness) validate(_ context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) {
|
||||
return sub2api.GatewayCompletionResult{
|
||||
OK: true,
|
||||
StatusCode: http.StatusOK,
|
||||
ContentType: "application/json",
|
||||
BodyPreview: fmt.Sprintf(`{"item_id":%q,"status":"ok"}`, item.ItemID),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type batchImportProvisionerStub struct {
|
||||
provisionCalls int
|
||||
patchCalls int
|
||||
lastPatch batch.PatchProvisionRequest
|
||||
}
|
||||
|
||||
func (p *batchImportProvisionerStub) Provision(_ context.Context, req batch.ProvisionRequest) (batch.ProvisionResult, error) {
|
||||
p.provisionCalls++
|
||||
legacyBatchID := int64(800 + p.provisionCalls)
|
||||
return batch.ProvisionResult{
|
||||
LegacyBatchID: &legacyBatchID,
|
||||
LegacyProviderID: req.ProviderID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *batchImportProvisionerStub) Patch(_ context.Context, req batch.PatchProvisionRequest) error {
|
||||
p.patchCalls++
|
||||
p.lastPatch = req
|
||||
return nil
|
||||
}
|
||||
|
||||
type batchImportConfirmationStore struct {
|
||||
store *sqlite.DB
|
||||
runID string
|
||||
}
|
||||
|
||||
func (s batchImportConfirmationStore) List(ctx context.Context) ([]sqlite.ImportRunItem, error) {
|
||||
return s.store.ImportRunItems().ListByRunID(ctx, s.runID)
|
||||
}
|
||||
|
||||
func (s batchImportConfirmationStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error {
|
||||
return s.store.ImportRunItems().Upsert(ctx, item)
|
||||
}
|
||||
|
||||
func newBatchImportUpstreamMux() http.Handler {
|
||||
modelsByToken := map[string][]string{
|
||||
"sk-new": {"deepseek-ai/DeepSeek-V4-Pro"},
|
||||
"sk-active": {"kimi-k2.6"},
|
||||
"sk-deprecated": {"kimi-k2.6"},
|
||||
"sk-patch": {"Kimi-K2.6"},
|
||||
"sk-advisory": {"kimi-k2.6"},
|
||||
"sk-retry": {"kimi-k2.6"},
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/v1/models", func(w http.ResponseWriter, r *http.Request) {
|
||||
token := strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer "))
|
||||
models, ok := modelsByToken[token]
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
_, _ = w.Write([]byte(`{"error":"unauthorized"}`))
|
||||
return
|
||||
}
|
||||
|
||||
data := make([]map[string]any, 0, len(models))
|
||||
for _, model := range models {
|
||||
data = append(data, map[string]any{"id": model})
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(mustJSON(data)))
|
||||
})
|
||||
mux.HandleFunc("/v1/responses", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
_, _ = w.Write([]byte(`{"error":"responses unsupported"}`))
|
||||
})
|
||||
mux.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) {
|
||||
token := strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer "))
|
||||
if _, ok := modelsByToken[token]; !ok {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
_, _ = w.Write([]byte(`{"error":"unauthorized"}`))
|
||||
return
|
||||
}
|
||||
if token == "sk-advisory" {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
_, _ = w.Write([]byte(`{"error":"probe race expected"}`))
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(`{"id":"chatcmpl_batch_import_v2","choices":[{"index":0,"message":{"role":"assistant","content":"pong"}}]}`))
|
||||
})
|
||||
|
||||
return mux
|
||||
}
|
||||
|
||||
func findItemByBaseURL(t *testing.T, items []sqlite.ImportRunItem, baseURL string) sqlite.ImportRunItem {
|
||||
t.Helper()
|
||||
|
||||
for _, item := range items {
|
||||
if item.BaseURL == baseURL {
|
||||
return item
|
||||
}
|
||||
}
|
||||
t.Fatalf("item with base_url %q not found in %#v", baseURL, items)
|
||||
return sqlite.ImportRunItem{}
|
||||
}
|
||||
|
||||
func containsString(values []string, want string) bool {
|
||||
for _, value := range values {
|
||||
if value == want {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func containsSubstring(values []string, fragment string) bool {
|
||||
for _, value := range values {
|
||||
if strings.Contains(value, fragment) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func containsEventType(events []batch.EventProjection, want string) bool {
|
||||
for _, event := range events {
|
||||
if event.EventType == want {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func mustJSON(data []map[string]any) string {
|
||||
values := make([]string, 0, len(data))
|
||||
for _, item := range data {
|
||||
values = append(values, fmt.Sprintf(`{"id":%q}`, item["id"]))
|
||||
}
|
||||
return `{"data":[` + strings.Join(values, ",") + `]}`
|
||||
}
|
||||
Reference in New Issue
Block a user