From eb2242ca6f6ca68d30abe5e1960326b3a490e364 Mon Sep 17 00:00:00 2001 From: phamnazage-jpg Date: Fri, 29 May 2026 10:00:27 +0800 Subject: [PATCH] feat(routing): add resolver failover fallback --- internal/app/route_resolve_api.go | 97 +++++++++++-- internal/app/route_resolve_api_test.go | 193 ++++++++++++++++++++++++- 2 files changed, 272 insertions(+), 18 deletions(-) diff --git a/internal/app/route_resolve_api.go b/internal/app/route_resolve_api.go index 83ab638a..37f8b7a7 100644 --- a/internal/app/route_resolve_api.go +++ b/internal/app/route_resolve_api.go @@ -48,6 +48,18 @@ type resolvedRouteCandidate struct { routeModel sqlite.LogicalGroupRouteModel } +type skippedResolvedRoute struct { + route sqlite.LogicalGroupRoute + failureCount int + reason string +} + +type routeSelectionResult struct { + candidate resolvedRouteCandidate + fallbackUsed bool + failoverFrom *skippedResolvedRoute +} + func handleResolveRoute(w http.ResponseWriter, r *http.Request, fn func(context.Context, ResolveRouteRequest) (ResolveRouteInfo, error)) { if fn == nil { writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "resolve-route action is not configured"}) @@ -139,10 +151,11 @@ func buildResolveRouteAction(sqliteDSN string, stickyRuntime stickyStoreRuntime, } } - candidate, err := selectResolvedRouteCandidate(ctx, store, stickyRuntime, group, req.PublicModel) + selection, err := selectResolvedRouteCandidate(ctx, store, stickyRuntime, group, req.PublicModel) if err != nil { return ResolveRouteInfo{}, err } + candidate := selection.candidate ttl, err := resolveStickyTTL(group, req.Scope) if err != nil { @@ -187,9 +200,23 @@ func buildResolveRouteAction(sqliteDSN string, stickyRuntime stickyStoreRuntime, StickyHit: false, SelectedRouteID: candidate.route.RouteID, SelectedShadowGroupID: candidate.route.ShadowGroupID, + FallbackUsed: selection.fallbackUsed, }); err != nil { return ResolveRouteInfo{}, err } + if selection.failoverFrom != nil { + if err := writer.AppendFailover(ctx, routing.RouteFailoverEvent{ + RequestID: requestID, + LogicalGroupID: req.LogicalGroupID, + PublicModel: req.PublicModel, + FromRouteID: selection.failoverFrom.route.RouteID, + ToRouteID: candidate.route.RouteID, + Reason: selection.failoverFrom.reason, + FailureCount: selection.failoverFrom.failureCount, + }); err != nil { + return ResolveRouteInfo{}, err + } + } if req.Sync { if err := writer.Flush(ctx); err != nil { return ResolveRouteInfo{}, err @@ -257,44 +284,72 @@ func lookupValidStickyBinding(ctx context.Context, store *sqlite.DB, runtime sti return binding, true, nil } -func selectResolvedRouteCandidate(ctx context.Context, store *sqlite.DB, runtime stickyStoreRuntime, group sqlite.LogicalGroup, publicModel string) (resolvedRouteCandidate, error) { +func selectResolvedRouteCandidate(ctx context.Context, store *sqlite.DB, runtime stickyStoreRuntime, group sqlite.LogicalGroup, publicModel string) (routeSelectionResult, error) { models, err := store.LogicalGroupModels().ListByLogicalGroupID(ctx, group.LogicalGroupID) if err != nil { - return resolvedRouteCandidate{}, err + return routeSelectionResult{}, err } if !logicalGroupHasActiveModel(models, publicModel) { - return resolvedRouteCandidate{}, fmt.Errorf("logical group %q does not expose active model %q", group.LogicalGroupID, publicModel) + return routeSelectionResult{}, fmt.Errorf("logical group %q does not expose active model %q", group.LogicalGroupID, publicModel) } routes, err := store.LogicalGroupRoutes().ListByLogicalGroupID(ctx, group.LogicalGroupID) if err != nil { - return resolvedRouteCandidate{}, err + return routeSelectionResult{}, err } + var skipped []skippedResolvedRoute for _, route := range routes { if !isActiveStatus(route.Status) { continue } - if !isExpiredRFC3339(route.CooldownUntil) { + if !routeExitsCooldown(route.CooldownUntil) { + skipped = append(skipped, skippedResolvedRoute{ + route: route, + reason: "route_cooldown_until_active", + }) + continue + } + failureState, ok, err := runtime.store.GetRouteFailure(ctx, route.RouteID) + if err != nil { + return routeSelectionResult{}, err + } + if ok && failureState.FailureCount >= group.FailoverThreshold { + skipped = append(skipped, skippedResolvedRoute{ + route: route, + reason: failureSkipReason(failureState), + failureCount: failureState.FailureCount, + }) continue } cooldown, ok, err := runtime.store.GetCooldown(ctx, route.RouteID) if err != nil { - return resolvedRouteCandidate{}, err + return routeSelectionResult{}, err } - if ok && !isExpiredRFC3339(cooldown.Until) { + if ok && !routeExitsCooldown(cooldown.Until) { + skipped = append(skipped, skippedResolvedRoute{ + route: route, + reason: cooldownSkipReason(cooldown), + }) continue } routeModels, err := store.LogicalGroupRouteModels().ListByRouteID(ctx, route.RouteID) if err != nil { - return resolvedRouteCandidate{}, err + return routeSelectionResult{}, err } for _, routeModel := range routeModels { if strings.TrimSpace(routeModel.PublicModel) == publicModel && isActiveStatus(routeModel.Status) { - return resolvedRouteCandidate{route: route, routeModel: routeModel}, nil + result := routeSelectionResult{ + candidate: resolvedRouteCandidate{route: route, routeModel: routeModel}, + } + if len(skipped) > 0 { + result.fallbackUsed = true + result.failoverFrom = &skipped[0] + } + return result, nil } } } - return resolvedRouteCandidate{}, fmt.Errorf("no active route found for logical group %q model %q", group.LogicalGroupID, publicModel) + return routeSelectionResult{}, fmt.Errorf("no active route found for logical group %q model %q", group.LogicalGroupID, publicModel) } func loadResolvedRouteCandidate(ctx context.Context, store *sqlite.DB, logicalGroupID string, publicModel string, routeID string) (resolvedRouteCandidate, error) { @@ -308,7 +363,7 @@ func loadResolvedRouteCandidate(ctx context.Context, store *sqlite.DB, logicalGr if !isActiveStatus(route.Status) { return resolvedRouteCandidate{}, fmt.Errorf("logical group route %q is not active", route.RouteID) } - if !isExpiredRFC3339(route.CooldownUntil) { + if !routeExitsCooldown(route.CooldownUntil) { return resolvedRouteCandidate{}, fmt.Errorf("logical group route %q is cooling down", route.RouteID) } routeModels, err := store.LogicalGroupRouteModels().ListByRouteID(ctx, route.RouteID) @@ -402,3 +457,21 @@ func isExpiredRFC3339(raw string) bool { } return !until.After(time.Now().UTC()) } + +func routeExitsCooldown(raw string) bool { + return isExpiredRFC3339(raw) +} + +func failureSkipReason(state routing.RouteFailureState) string { + if reason := strings.TrimSpace(state.LastErrorClass); reason != "" { + return "failure_threshold_exceeded:" + reason + } + return "failure_threshold_exceeded" +} + +func cooldownSkipReason(state routing.RouteCooldownState) string { + if reason := strings.TrimSpace(state.Reason); reason != "" { + return "active_cooldown:" + reason + } + return "active_cooldown" +} diff --git a/internal/app/route_resolve_api_test.go b/internal/app/route_resolve_api_test.go index 7e25f7ac..d757b7bf 100644 --- a/internal/app/route_resolve_api_test.go +++ b/internal/app/route_resolve_api_test.go @@ -3,8 +3,12 @@ package app import ( "context" "net/http" + "net/url" "path/filepath" "testing" + "time" + + "sub2api-cn-relay-manager/internal/routing" ) func TestAPIResolveRouteReturnsSelectedRoute(t *testing.T) { @@ -110,6 +114,14 @@ func TestNewActionSetResolveRouteFlow(t *testing.T) { }); err != nil { t.Fatalf("CreateLogicalGroupRouteModel(codex2api) error = %v", err) } + if _, err := actions.SetRouteFailure(ctx, SetRouteFailureRequest{ + RouteID: "codex2api", + FailureCount: 2, + LastErrorClass: "timeout", + TTLSeconds: 600, + }); err != nil { + t.Fatalf("SetRouteFailure(codex2api) error = %v", err) + } first, err := actions.ResolveRoute(ctx, ResolveRouteRequest{ RequestID: "req-resolve-1", @@ -122,8 +134,8 @@ func TestNewActionSetResolveRouteFlow(t *testing.T) { if err != nil { t.Fatalf("ResolveRoute(first) error = %v", err) } - if first.RouteID != "codex2api" || first.StickyHit || first.StickyAction != "bind" { - t.Fatalf("ResolveRoute(first) = %+v, want codex2api bind miss", first) + if first.RouteID != "asxs" || first.StickyHit || first.StickyAction != "bind" { + t.Fatalf("ResolveRoute(first) = %+v, want asxs bind miss", first) } second, err := actions.ResolveRoute(ctx, ResolveRouteRequest{ @@ -137,8 +149,8 @@ func TestNewActionSetResolveRouteFlow(t *testing.T) { if err != nil { t.Fatalf("ResolveRoute(second) error = %v", err) } - if second.RouteID != "codex2api" || !second.StickyHit || second.StickyAction != "hit" { - t.Fatalf("ResolveRoute(second) = %+v, want codex2api sticky hit", second) + if second.RouteID != "asxs" || !second.StickyHit || second.StickyAction != "hit" { + t.Fatalf("ResolveRoute(second) = %+v, want asxs sticky hit", second) } sticky, err := actions.GetStickyBinding(ctx, GetStickyBindingRequest{ @@ -150,8 +162,8 @@ func TestNewActionSetResolveRouteFlow(t *testing.T) { if err != nil { t.Fatalf("GetStickyBinding() error = %v", err) } - if sticky.RouteID != "codex2api" { - t.Fatalf("GetStickyBinding() = %+v, want route codex2api", sticky) + if sticky.RouteID != "asxs" { + t.Fatalf("GetStickyBinding() = %+v, want route asxs", sticky) } decisions, err := actions.ListRouteDecisionLogs(ctx, ListRouteDecisionLogsRequest{ @@ -168,6 +180,9 @@ func TestNewActionSetResolveRouteFlow(t *testing.T) { if !decisions[0].StickyHit || decisions[1].StickyHit { t.Fatalf("ListRouteDecisionLogs() = %+v, want latest hit then miss", decisions) } + if !decisions[1].FallbackUsed { + t.Fatalf("ListRouteDecisionLogs()[1] = %+v, want fallback_used true", decisions[1]) + } stickyAudits, err := actions.ListRouteStickyAudit(ctx, ListRouteStickyAuditRequest{ StickyKey: first.StickyKey, @@ -182,4 +197,170 @@ func TestNewActionSetResolveRouteFlow(t *testing.T) { if stickyAudits[0].Action != "hit" || stickyAudits[1].Action != "bind" { t.Fatalf("ListRouteStickyAudit() = %+v, want latest hit then bind", stickyAudits) } + + failovers, err := actions.ListRouteFailoverEvents(ctx, ListRouteFailoverEventsRequest{ + RequestID: "req-resolve-1", + Limit: 10, + }) + if err != nil { + t.Fatalf("ListRouteFailoverEvents() error = %v", err) + } + if len(failovers) != 1 { + t.Fatalf("ListRouteFailoverEvents() len = %d, want 1", len(failovers)) + } + if failovers[0].FromRouteID != "codex2api" || failovers[0].ToRouteID != "asxs" || failovers[0].FailureCount != 2 { + t.Fatalf("ListRouteFailoverEvents()[0] = %+v, want codex2api -> asxs failure_count 2", failovers[0]) + } +} + +func TestResolveRouteHelpers(t *testing.T) { + t.Parallel() + + req, stickyKey, requestID, err := normalizeResolveRouteRequest(ResolveRouteRequest{ + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + Scope: "conversation", + SubjectID: "conv-1", + }) + if err != nil { + t.Fatalf("normalizeResolveRouteRequest() error = %v", err) + } + if req.Scope != "conversation" || stickyKey == "" || requestID == "" { + t.Fatalf("normalizeResolveRouteRequest() = (%+v, %q, %q), want normalized values", req, stickyKey, requestID) + } + if _, _, _, err := normalizeResolveRouteRequest(ResolveRouteRequest{}); err == nil { + t.Fatal("normalizeResolveRouteRequest(empty) error = nil, want error") + } + + if got := resolveUserKey(ResolveRouteRequest{Scope: "user", SubjectID: "user-1"}); got != "user-1" { + t.Fatalf("resolveUserKey(user) = %q, want user-1", got) + } + if got := resolveConversationKey(ResolveRouteRequest{Scope: "conversation", SubjectID: "conv-1"}); got != "conv-1" { + t.Fatalf("resolveConversationKey(conversation) = %q, want conv-1", got) + } + + if got := failureSkipReason(routing.RouteFailureState{LastErrorClass: "timeout"}); got != "failure_threshold_exceeded:timeout" { + t.Fatalf("failureSkipReason() = %q, want timeout reason", got) + } + if got := cooldownSkipReason(routing.RouteCooldownState{Reason: "degraded"}); got != "active_cooldown:degraded" { + t.Fatalf("cooldownSkipReason() = %q, want degraded reason", got) + } + if !routeExitsCooldown("") { + t.Fatal("routeExitsCooldown(empty) = false, want true") + } + future := time.Now().UTC().Add(time.Minute).Format(time.RFC3339) + if routeExitsCooldown(future) { + t.Fatalf("routeExitsCooldown(%q) = true, want false", future) + } + if !isActiveStatus(" active ") { + t.Fatal("isActiveStatus(active) = false, want true") + } +} + +func TestResolveRouteWithCooldownFallback(t *testing.T) { + dsn := "file:" + filepath.ToSlash(filepath.Join(t.TempDir(), "route-resolve-cooldown.db")) + "?_busy_timeout=5000" + actions := NewActionSet(dsn) + ctx := context.Background() + + _, err := actions.CreateLogicalGroup(ctx, CreateLogicalGroupRequest{ + LogicalGroupID: "cooldown-group", + DisplayName: "Cooldown Group", + Status: "active", + RoutePolicy: "priority", + StickyMode: "conversation_preferred", + ConversationTTLSeconds: 1200, + UserModelTTLSeconds: 600, + FailoverThreshold: 2, + CooldownSeconds: 300, + }) + if err != nil { + t.Fatalf("CreateLogicalGroup() error = %v", err) + } + if _, err := actions.CreateLogicalGroupModel(ctx, CreateLogicalGroupModelRequest{ + LogicalGroupID: "cooldown-group", + PublicModel: "gpt-5.4", + Status: "active", + }); err != nil { + t.Fatalf("CreateLogicalGroupModel() error = %v", err) + } + if _, err := actions.CreateLogicalGroupRoute(ctx, CreateLogicalGroupRouteRequest{ + LogicalGroupID: "cooldown-group", + RouteID: "route-a", + Name: "Route A", + Status: "active", + Priority: 10, + ShadowGroupID: "cooldown-group__a", + ShadowHostID: "remote43", + }); err != nil { + t.Fatalf("CreateLogicalGroupRoute(route-a) error = %v", err) + } + if _, err := actions.CreateLogicalGroupRouteModel(ctx, CreateLogicalGroupRouteModelRequest{ + LogicalGroupID: "cooldown-group", + RouteID: "route-a", + PublicModel: "gpt-5.4", + Status: "active", + }); err != nil { + t.Fatalf("CreateLogicalGroupRouteModel(route-a) error = %v", err) + } + if _, err := actions.CreateLogicalGroupRoute(ctx, CreateLogicalGroupRouteRequest{ + LogicalGroupID: "cooldown-group", + RouteID: "route-b", + Name: "Route B", + Status: "active", + Priority: 20, + ShadowGroupID: "cooldown-group__b", + ShadowHostID: "remote43", + }); err != nil { + t.Fatalf("CreateLogicalGroupRoute(route-b) error = %v", err) + } + if _, err := actions.CreateLogicalGroupRouteModel(ctx, CreateLogicalGroupRouteModelRequest{ + LogicalGroupID: "cooldown-group", + RouteID: "route-b", + PublicModel: "gpt-5.4", + Status: "active", + }); err != nil { + t.Fatalf("CreateLogicalGroupRouteModel(route-b) error = %v", err) + } + if _, err := actions.SetRouteCooldown(ctx, SetRouteCooldownRequest{ + RouteID: "route-a", + Reason: "degraded", + TTLSeconds: 600, + }); err != nil { + t.Fatalf("SetRouteCooldown(route-a) error = %v", err) + } + + resolved, err := actions.ResolveRoute(ctx, ResolveRouteRequest{ + RequestID: "req-cooldown-1", + LogicalGroupID: "cooldown-group", + PublicModel: "gpt-5.4", + Scope: "conversation", + SubjectID: "conv-cooldown-1", + Sync: true, + }) + if err != nil { + t.Fatalf("ResolveRoute() error = %v", err) + } + if resolved.RouteID != "route-b" || resolved.StickyHit || resolved.StickyAction != "bind" { + t.Fatalf("ResolveRoute() = %+v, want route-b bind miss", resolved) + } + + failovers, err := actions.ListRouteFailoverEvents(ctx, ListRouteFailoverEventsRequest{ + RequestID: "req-cooldown-1", + Limit: 10, + }) + if err != nil { + t.Fatalf("ListRouteFailoverEvents() error = %v", err) + } + if len(failovers) != 1 || failovers[0].FromRouteID != "route-a" || failovers[0].ToRouteID != "route-b" { + t.Fatalf("ListRouteFailoverEvents() = %+v, want route-a -> route-b", failovers) + } + if failovers[0].Reason != "active_cooldown:degraded" { + t.Fatalf("ListRouteFailoverEvents()[0].Reason = %q, want active_cooldown:degraded", failovers[0].Reason) + } + + reqURL := &http.Request{URL: &url.URL{RawQuery: "route_id=route-a"}} + cooldownReq, cooldownErr := decodeGetRouteCooldownRequest(reqURL) + if cooldownErr != nil || cooldownReq.RouteID != "route-a" { + t.Fatalf("decodeGetRouteCooldownRequest() = (%+v, %v), want route-a nil", cooldownReq, cooldownErr) + } }