Files
2026-05-29 14:43:34 +08:00

636 lines
15 KiB
Go

package integration_test
import (
"context"
"database/sql"
"errors"
"fmt"
"io/fs"
"path/filepath"
"testing"
_ "modernc.org/sqlite"
"sub2api-cn-relay-manager/internal/store/migrations"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
// TestStoreInitCreatesRequiredTables opens a fresh store and checks that the
// hosts, packs and providers tables are present afterwards.
func TestStoreInitCreatesRequiredTables(t *testing.T) {
	store := openTestStore(t)
	defer closeTestStore(t, store)

	required := []string{"hosts", "packs", "providers"}
	for i := range required {
		name := required[i]
		if tableExists(t, store.SQLDB(), name) {
			continue
		}
		t.Fatalf("table %q does not exist after store initialization", name)
	}
}
// TestStoreInitEnforcesUniqueConstraints creates a pack, inserts one provider
// under it, and expects a second insert of the identical provider to be
// rejected.
func TestStoreInitEnforcesUniqueConstraints(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t)
	defer closeTestStore(t, store)

	pack := sqlite.Pack{
		PackID:   "openai-cn-pack",
		Version:  "1.0.0",
		Checksum: "checksum-1",
	}
	packID, err := store.Packs().Create(ctx, pack)
	if err != nil {
		t.Fatalf("Packs().Create() error = %v", err)
	}

	// The same value is inserted twice; only the first insert may succeed.
	duplicated := sqlite.Provider{
		PackID:      packID,
		ProviderID:  "deepseek",
		DisplayName: "DeepSeek",
		BaseURL:     "https://api.deepseek.com",
		Platform:    "openai",
	}

	_, firstErr := store.Providers().Create(ctx, duplicated)
	if firstErr != nil {
		t.Fatalf("Providers().Create() first call error = %v", firstErr)
	}

	_, secondErr := store.Providers().Create(ctx, duplicated)
	if secondErr == nil {
		t.Fatal("Providers().Create() second call error = nil, want unique constraint failure")
	}
}
// TestStoreInitEnforcesProviderForeignKey inserts a provider with PackID 9999
// into a freshly opened store, where this test has created no packs, and
// expects the insert to be rejected.
func TestStoreInitEnforcesProviderForeignKey(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t)
	defer closeTestStore(t, store)

	orphan := sqlite.Provider{
		PackID:      9999,
		ProviderID:  "ghost",
		DisplayName: "Ghost",
		BaseURL:     "https://ghost.example.com",
		Platform:    "openai",
	}
	if _, err := store.Providers().Create(ctx, orphan); err != nil {
		// Rejection is the expected outcome.
		return
	}
	t.Fatal("Providers().Create() error = nil, want foreign key failure")
}
// TestStoreInitRollsBackTransaction runs a WithTx callback that inserts a host
// and then returns a sentinel error. It expects WithTx to surface that error
// and the hosts table to contain no rows afterwards.
func TestStoreInitRollsBackTransaction(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t)
	defer closeTestStore(t, store)

	wantErr := errors.New("force rollback")

	// insertThenAbort writes one host row and then deliberately fails.
	insertThenAbort := func(queries *sqlite.Queries) error {
		host := sqlite.Host{
			HostID:              "host-1",
			BaseURL:             "https://host.example.com",
			HostVersion:         "0.1.126",
			CapabilityProbeJSON: `{"supports_batch_accounts":true}`,
		}
		if _, createErr := queries.Hosts.Create(ctx, host); createErr != nil {
			return createErr
		}
		return wantErr
	}

	if err := store.WithTx(ctx, insertThenAbort); !errors.Is(err, wantErr) {
		t.Fatalf("WithTx() error = %v, want %v", err, wantErr)
	}

	remaining := countRows(t, store.SQLDB(), "hosts")
	if remaining != 0 {
		t.Fatalf("hosts row count after rollback = %d, want 0", remaining)
	}
}
// TestStoreInitRecordsMigrationLedgerOnce opens the same database file twice
// and expects the schema_migrations row count to equal the number of migration
// files after each open.
func TestStoreInitRecordsMigrationLedgerOnce(t *testing.T) {
	statePath := filepath.Join(t.TempDir(), "state.db")
	dsn := fmt.Sprintf("file:%s?_busy_timeout=5000", filepath.ToSlash(statePath))
	wantMigrations := migrationCount(t)

	// First open: the ledger is populated.
	first, err := sqlite.Open(context.Background(), dsn)
	if err != nil {
		t.Fatalf("first sqlite.Open() error = %v", err)
	}
	firstCount := countRows(t, first.SQLDB(), "schema_migrations")
	if firstCount != wantMigrations {
		t.Fatalf("schema_migrations row count after first open = %d, want %d", firstCount, wantMigrations)
	}
	if closeErr := first.Close(); closeErr != nil {
		t.Fatalf("first store.Close() error = %v", closeErr)
	}

	// Second open of the same file: the row count must not change.
	second, err := sqlite.Open(context.Background(), dsn)
	if err != nil {
		t.Fatalf("second sqlite.Open() error = %v", err)
	}
	defer closeTestStore(t, second)

	secondCount := countRows(t, second.SQLDB(), "schema_migrations")
	if secondCount != wantMigrations {
		t.Fatalf("schema_migrations row count after second open = %d, want %d", secondCount, wantMigrations)
	}
}
// TestStoreAppliesLatestMigration opens a fresh store and verifies that the
// tables and columns listed below are present in the resulting schema.
//
// All table-existence checks run first, followed by the per-table column
// checks in declaration order; the first missing table or column aborts the
// test with t.Fatalf.
func TestStoreAppliesLatestMigration(t *testing.T) {
	store := openTestStore(t)
	defer closeTestStore(t, store)

	wantTables := []string{
		"import_runs",
		"import_run_items",
		"import_run_item_events",
		"logical_groups",
		"logical_group_models",
		"logical_group_routes",
		"logical_group_route_models",
		"route_decision_logs",
		"route_failover_events",
		"route_sticky_audit",
		"provider_accounts",
	}
	for _, table := range wantTables {
		if !tableExists(t, store.SQLDB(), table) {
			t.Fatalf("table %q does not exist after latest migration", table)
		}
	}

	// Expected columns per table. Adding a new expectation is a one-line
	// change here instead of another copy of the checking loop.
	wantColumns := []struct {
		table   string
		columns []string
	}{
		{
			table: "import_runs",
			columns: []string{
				"host_id",
				"subscription_users_json",
				"subscription_days",
				"probe_api_key",
			},
		},
		{
			table: "import_run_items",
			columns: []string{
				"api_key_fingerprint",
				"canonical_model_families_json",
				"matched_account_state",
				"account_resolution",
				"provision_reused",
				"reused_from_provider_id",
				"reused_from_account_id",
				"lease_owner",
				"lease_until",
			},
		},
		{
			table: "logical_groups",
			columns: []string{
				"logical_group_id",
				"display_name",
				"route_policy",
				"sticky_mode",
				"conversation_ttl_seconds",
				"user_model_ttl_seconds",
				"failover_threshold",
				"cooldown_seconds",
			},
		},
		{
			table: "logical_group_models",
			columns: []string{
				"logical_group_id",
				"public_model",
				"status",
			},
		},
		{
			table: "logical_group_routes",
			columns: []string{
				"route_id",
				"logical_group_id",
				"priority",
				"weight",
				"shadow_group_id",
				"shadow_host_id",
				"upstream_base_url_hint",
				"cooldown_until",
			},
		},
		{
			table: "logical_group_route_models",
			columns: []string{
				"route_id",
				"public_model",
				"shadow_model",
				"status",
			},
		},
		{
			table: "route_decision_logs",
			columns: []string{
				"request_id",
				"logical_group_id",
				"public_model",
				"sticky_key",
				"sticky_hit",
				"selected_route_id",
				"selected_shadow_group_id",
				"fallback_used",
				"upstream_status",
				"latency_ms",
			},
		},
		{
			table: "route_failover_events",
			columns: []string{
				"request_id",
				"logical_group_id",
				"public_model",
				"from_route_id",
				"to_route_id",
				"reason",
				"failure_count",
			},
		},
		{
			table: "route_sticky_audit",
			columns: []string{
				"sticky_key",
				"sticky_key_type",
				"logical_group_id",
				"public_model",
				"route_id",
				"action",
				"expires_at",
			},
		},
		{
			table: "provider_accounts",
			columns: []string{
				"host_id",
				"provider_id",
				"route_id",
				"shadow_group_id",
				"host_account_id",
				"key_fingerprint",
				"account_status",
				"last_probe_status",
				"last_probe_at",
				"disabled_reason",
			},
		},
	}
	for _, want := range wantColumns {
		for _, column := range want.columns {
			if !tableColumnExists(t, store.SQLDB(), want.table, column) {
				t.Fatalf("column %q missing from %s", column, want.table)
			}
		}
	}
}
// TestStoreInitEnforcesLogicalRoutingConstraints inserts rows into the
// logical_groups, logical_group_models, logical_group_routes and
// logical_group_route_models tables and expects duplicate rows, and rows that
// reference a group or route that was never inserted, to be rejected.
func TestStoreInitEnforcesLogicalRoutingConstraints(t *testing.T) {
	// Each statement is used for both the accepted and the rejected insert
	// into its table.
	const (
		insertGroup = `
INSERT INTO logical_groups (
logical_group_id,
display_name,
status
) VALUES (?, ?, ?)`
		insertGroupModel = `
INSERT INTO logical_group_models (
logical_group_id,
public_model
) VALUES (?, ?)`
		insertRoute = `
INSERT INTO logical_group_routes (
route_id,
logical_group_id,
name,
status,
priority,
shadow_group_id,
shadow_host_id
) VALUES (?, ?, ?, ?, ?, ?, ?)`
		insertRouteModel = `
INSERT INTO logical_group_route_models (
route_id,
public_model,
shadow_model
) VALUES (?, ?, ?)`
	)

	ctx := context.Background()
	store := openTestStore(t)
	defer closeTestStore(t, store)
	db := store.SQLDB()

	// expectRejected fails the test with failure when the statement succeeds.
	expectRejected := func(failure, statement string, args ...any) {
		t.Helper()
		if _, err := db.ExecContext(ctx, statement, args...); err == nil {
			t.Fatal(failure)
		}
	}

	// logical_groups: a second row with the same logical_group_id is refused.
	mustExecSQL(t, db, insertGroup, "gpt-shared", "GPT Shared", "active")
	expectRejected(
		"duplicate logical_group_id insert error = nil, want unique constraint failure",
		insertGroup, "gpt-shared", "GPT Shared Duplicate", "active",
	)

	// logical_group_models: the same (group, model) pair cannot be added twice.
	mustExecSQL(t, db, insertGroupModel, "gpt-shared", "gpt-5.4")
	expectRejected(
		"duplicate logical_group_models insert error = nil, want unique constraint failure",
		insertGroupModel, "gpt-shared", "gpt-5.4",
	)

	// logical_group_routes: a route naming a group that was never inserted is
	// refused, while a route under the existing group is accepted.
	expectRejected(
		"logical_group_routes missing group error = nil, want foreign key failure",
		insertRoute,
		"route-missing-group", "missing-group", "Missing Group Route", "active", 10, "shadow-group-a", "shadow-host-a",
	)
	mustExecSQL(t, db, insertRoute,
		"route-asxs", "gpt-shared", "ASXS", "active", 10, "gpt-shared__asxs", "remote43",
	)

	// logical_group_route_models: an unknown route is refused, a valid row is
	// accepted, and a second row for the same (route, public model) is refused.
	expectRejected(
		"logical_group_route_models missing route error = nil, want foreign key failure",
		insertRouteModel, "missing-route", "gpt-5.4", "gpt-5.4",
	)
	mustExecSQL(t, db, insertRouteModel, "route-asxs", "gpt-5.4", "gpt-5.4")
	expectRejected(
		"duplicate logical_group_route_models insert error = nil, want unique constraint failure",
		insertRouteModel, "route-asxs", "gpt-5.4", "gpt-5.4-alt",
	)
}
// TestStoreInitBackfillsLedgerForCompletePreLedgerSchema pre-creates the hosts,
// packs and providers tables through a raw connection (no schema_migrations
// table), then opens the store on that file and expects the schema_migrations
// row count to equal the number of migration files.
func TestStoreInitBackfillsLedgerForCompletePreLedgerSchema(t *testing.T) {
	statePath := filepath.Join(t.TempDir(), "state.db")
	dsn := fmt.Sprintf("file:%s?_busy_timeout=5000", filepath.ToSlash(statePath))
	wantMigrations := migrationCount(t)

	// Seed the file with the legacy schema and release the raw handle before
	// the store opens the same file.
	legacy := openRawSQLiteDB(t, dsn)
	createLegacy0001Schema(t, legacy)
	closeRawSQLiteDB(t, legacy)

	store, err := sqlite.Open(context.Background(), dsn)
	if err != nil {
		t.Fatalf("sqlite.Open() on complete pre-ledger schema error = %v", err)
	}
	defer closeTestStore(t, store)

	ledgerRows := countRows(t, store.SQLDB(), "schema_migrations")
	if ledgerRows != wantMigrations {
		t.Fatalf("schema_migrations row count after backfill = %d, want %d", ledgerRows, wantMigrations)
	}
}
// TestStoreInitFailsWhenPreLedgerSchemaIsPartial pre-creates only the hosts
// table through a raw connection and expects sqlite.Open on that file to
// return an error.
func TestStoreInitFailsWhenPreLedgerSchemaIsPartial(t *testing.T) {
	const hostsOnlyDDL = `
CREATE TABLE hosts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
host_id TEXT NOT NULL UNIQUE,
base_url TEXT NOT NULL,
host_version TEXT NOT NULL,
capability_probe_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)`

	statePath := filepath.Join(t.TempDir(), "state.db")
	dsn := fmt.Sprintf("file:%s?_busy_timeout=5000", filepath.ToSlash(statePath))

	seed := openRawSQLiteDB(t, dsn)
	mustExec(t, seed, hostsOnlyDDL)
	closeRawSQLiteDB(t, seed)

	store, err := sqlite.Open(context.Background(), dsn)
	if err != nil {
		// Open refused the partial schema, which is what this test wants.
		return
	}
	// Open unexpectedly succeeded: release the store before failing.
	closeTestStore(t, store)
	t.Fatal("sqlite.Open() error = nil, want partial pre-ledger schema failure")
}
// openTestStore opens a store backed by a fresh state.db file inside
// t.TempDir() and fails the test immediately if sqlite.Open returns an error.
//
// The DSN switches SQLite foreign-key enforcement on via
// _pragma=foreign_keys(1). It previously passed foreign_keys(0), which
// disables enforcement and undermines the tests in this file that expect
// inserts with dangling references to be rejected.
//
// The caller owns the returned store and must close it, typically with
// defer closeTestStore(t, store).
func openTestStore(t *testing.T) *sqlite.DB {
	t.Helper()

	dbPath := filepath.Join(t.TempDir(), "state.db")
	dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_pragma=foreign_keys(1)", filepath.ToSlash(dbPath))

	store, err := sqlite.Open(context.Background(), dsn)
	if err != nil {
		t.Fatalf("sqlite.Open() error = %v", err)
	}
	return store
}
// openRawSQLiteDB opens dsn with the "sqlite" database/sql driver, bypassing
// the store package, and pings it once. Any failure aborts the test.
func openRawSQLiteDB(t *testing.T, dsn string) *sql.DB {
	t.Helper()

	handle, openErr := sql.Open("sqlite", dsn)
	if openErr != nil {
		t.Fatalf("sql.Open() error = %v", openErr)
	}

	// sql.Open may not connect yet; the ping forces a real connection.
	pingErr := handle.PingContext(context.Background())
	if pingErr != nil {
		t.Fatalf("raw db PingContext() error = %v", pingErr)
	}
	return handle
}
// closeRawSQLiteDB closes db and aborts the test if Close reports an error.
func closeRawSQLiteDB(t *testing.T, db *sql.DB) {
	t.Helper()

	closeErr := db.Close()
	if closeErr == nil {
		return
	}
	t.Fatalf("raw db Close() error = %v", closeErr)
}
// migrationCount returns how many entries in migrations.Files match "*.sql".
// A glob error aborts the test.
func migrationCount(t *testing.T) int {
	t.Helper()

	matches, globErr := fs.Glob(migrations.Files, "*.sql")
	if globErr != nil {
		t.Fatalf("fs.Glob(migrations.Files) error = %v", globErr)
	}

	total := len(matches)
	return total
}
// createLegacy0001Schema creates the hosts, packs and providers tables on db,
// in that order, using raw DDL. It does not create a schema_migrations table.
// The first failing statement aborts the test (via mustExec).
func createLegacy0001Schema(t *testing.T, db *sql.DB) {
	t.Helper()

	// Order matters: providers declares a foreign key that references packs.
	statements := [...]string{
		`
CREATE TABLE hosts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
host_id TEXT NOT NULL UNIQUE,
base_url TEXT NOT NULL,
host_version TEXT NOT NULL,
capability_probe_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
		`
CREATE TABLE packs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pack_id TEXT NOT NULL UNIQUE,
version TEXT NOT NULL,
checksum TEXT NOT NULL,
installed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
		`
CREATE TABLE providers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pack_id INTEGER NOT NULL,
provider_id TEXT NOT NULL,
display_name TEXT NOT NULL,
base_url TEXT NOT NULL,
platform TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT fk_providers_pack
FOREIGN KEY (pack_id)
REFERENCES packs(id)
ON DELETE CASCADE,
CONSTRAINT uq_providers_pack_provider
UNIQUE (pack_id, provider_id)
)`,
	}
	for _, ddl := range statements {
		mustExec(t, db, ddl)
	}
}
// mustExec runs statement on db without bind arguments and aborts the test if
// execution fails.
func mustExec(t *testing.T, db *sql.DB, statement string) {
	t.Helper()

	_, execErr := db.ExecContext(context.Background(), statement)
	if execErr == nil {
		return
	}
	t.Fatalf("ExecContext() error = %v", execErr)
}
// mustExecSQL runs statement on db with the given bind arguments and aborts
// the test if execution fails.
func mustExecSQL(t *testing.T, db *sql.DB, statement string, args ...any) {
	t.Helper()

	_, execErr := db.ExecContext(context.Background(), statement, args...)
	if execErr == nil {
		return
	}
	t.Fatalf("ExecContext() error = %v", execErr)
}
// closeTestStore closes store and aborts the test if Close reports an error.
func closeTestStore(t *testing.T, store *sqlite.DB) {
	t.Helper()

	closeErr := store.Close()
	if closeErr == nil {
		return
	}
	t.Fatalf("store.Close() error = %v", closeErr)
}
// tableExists reports whether sqlite_master on db lists a table named table.
// A query error other than sql.ErrNoRows aborts the test.
func tableExists(t *testing.T, db *sql.DB, table string) bool {
	t.Helper()

	const lookup = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?"

	var found string
	row := db.QueryRowContext(context.Background(), lookup, table)
	switch err := row.Scan(&found); {
	case errors.Is(err, sql.ErrNoRows):
		// No matching row: the table is absent.
		return false
	case err != nil:
		t.Fatalf("tableExists(%q) query error = %v", table, err)
	}
	return found == table
}
// countRows returns the result of SELECT COUNT(*) on table. The table name is
// interpolated into the query text, so it must be a trusted identifier. A
// query or scan error aborts the test.
func countRows(t *testing.T, db *sql.DB, table string) int {
	t.Helper()

	query := fmt.Sprintf("SELECT COUNT(*) FROM %s", table)
	row := db.QueryRowContext(context.Background(), query)

	var total int
	scanErr := row.Scan(&total)
	if scanErr != nil {
		t.Fatalf("countRows(%q) query error = %v", table, scanErr)
	}
	return total
}
// tableColumnExists reports whether PRAGMA table_info(table) on db returns a
// row whose name field equals column. The table name is concatenated into the
// PRAGMA text, so it must be a trusted identifier. Query, scan and iteration
// errors abort the test.
func tableColumnExists(t *testing.T, db *sql.DB, table, column string) bool {
	t.Helper()

	rows, err := db.QueryContext(context.Background(), "PRAGMA table_info("+table+")")
	if err != nil {
		t.Fatalf("table_info(%q) query error = %v", table, err)
	}
	defer rows.Close()

	// Scan targets for one table_info row; only name is inspected, the other
	// fields exist so that Scan receives six destinations.
	var info struct {
		cid, notNull, pk int
		name, declType   string
		defaultVal       sql.NullString
	}

	matched := false
	for !matched && rows.Next() {
		scanErr := rows.Scan(&info.cid, &info.name, &info.declType, &info.notNull, &info.defaultVal, &info.pk)
		if scanErr != nil {
			t.Fatalf("table_info(%q) scan error = %v", table, scanErr)
		}
		matched = info.name == column
	}
	if matched {
		return true
	}

	if iterErr := rows.Err(); iterErr != nil {
		t.Fatalf("table_info(%q) rows error = %v", table, iterErr)
	}
	return false
}