478 lines
16 KiB
Go
478 lines
16 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"sub2api-cn-relay-manager/internal/routing"
|
|
"sub2api-cn-relay-manager/internal/store/sqlite"
|
|
)
|
|
|
|
// ResolveRouteRequest is the JSON payload accepted by the resolve-route action.
// String fields are whitespace-trimmed by normalizeResolveRouteRequest before use.
type ResolveRouteRequest struct {
	// RequestID is optional; when blank after trimming, normalization
	// generates a "resolve_<unix-nanos>" identifier instead.
	RequestID string `json:"request_id,omitempty"`

	// LogicalGroupID names the logical group to resolve within. Required.
	LogicalGroupID string `json:"logical_group_id"`

	// PublicModel is the model name requested by the caller. Required.
	PublicModel string `json:"public_model"`

	// Scope is the sticky scope; it feeds the sticky key and selects the TTL. Required.
	Scope string `json:"scope"`

	// SubjectID identifies the subject within Scope. Required.
	SubjectID string `json:"subject_id"`

	// UserKey, when set, is the user key recorded in the decision log.
	UserKey string `json:"user_key,omitempty"`

	// ConversationKey, when set, is the conversation key recorded in the decision log.
	ConversationKey string `json:"conversation_key,omitempty"`

	// Sync asks the action to flush the route log writer before returning.
	Sync bool `json:"sync,omitempty"`
}
|
|
|
|
// ResolveRouteInfo is the JSON result of a resolve-route call. It is assembled
// by resolveRouteInfoFromBinding from the request, the chosen route candidate
// and the sticky binding.
type ResolveRouteInfo struct {
	// Request identity and the sticky-store backend that served it.
	RequestID      string `json:"request_id"`
	Backend        string `json:"backend"`
	LogicalGroupID string `json:"logical_group_id"`
	PublicModel    string `json:"public_model"`
	Scope          string `json:"scope"`
	SubjectID      string `json:"subject_id"`

	// Sticky outcome: StickyAction is "hit" when an existing binding was
	// reused and "bind" when a new one was stored.
	StickyKey    string `json:"sticky_key"`
	StickyHit    bool   `json:"sticky_hit"`
	StickyAction string `json:"sticky_action"`

	// Selected route and the route-model mapping that matched the public model.
	RouteID       string `json:"route_id"`
	RouteName     string `json:"route_name,omitempty"`
	ShadowGroupID string `json:"shadow_group_id"`
	ShadowHostID  string `json:"shadow_host_id"`
	ShadowModel   string `json:"shadow_model,omitempty"`
	Priority      int    `json:"priority"`
	Weight        int    `json:"weight"`

	// Binding timestamps, copied verbatim from the sticky binding.
	BoundAt   string `json:"bound_at,omitempty"`
	ExpiresAt string `json:"expires_at,omitempty"`
}
|
|
|
|
type resolvedRouteCandidate struct {
|
|
route sqlite.LogicalGroupRoute
|
|
routeModel sqlite.LogicalGroupRouteModel
|
|
}
|
|
|
|
type skippedResolvedRoute struct {
|
|
route sqlite.LogicalGroupRoute
|
|
failureCount int
|
|
reason string
|
|
}
|
|
|
|
type routeSelectionResult struct {
|
|
candidate resolvedRouteCandidate
|
|
fallbackUsed bool
|
|
failoverFrom *skippedResolvedRoute
|
|
}
|
|
|
|
func handleResolveRoute(w http.ResponseWriter, r *http.Request, fn func(context.Context, ResolveRouteRequest) (ResolveRouteInfo, error)) {
|
|
if fn == nil {
|
|
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "resolve-route action is not configured"})
|
|
return
|
|
}
|
|
var req ResolveRouteRequest
|
|
if err := decodeJSON(r, &req); err != nil {
|
|
writeHTTPError(w, err)
|
|
return
|
|
}
|
|
info, err := fn(r.Context(), req)
|
|
if err != nil {
|
|
writeHTTPError(w, classifyError(err))
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"resolve": info})
|
|
}
|
|
|
|
func buildResolveRouteAction(sqliteDSN string, stickyRuntime stickyStoreRuntime, writerSource *lazyRouteLogWriter) func(context.Context, ResolveRouteRequest) (ResolveRouteInfo, error) {
|
|
return func(ctx context.Context, req ResolveRouteRequest) (ResolveRouteInfo, error) {
|
|
store, err := sqlite.Open(ctx, sqliteDSN)
|
|
if err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
defer store.Close()
|
|
|
|
group, err := getLogicalGroupRow(ctx, store, req.LogicalGroupID)
|
|
if err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
if !isActiveStatus(group.Status) {
|
|
return ResolveRouteInfo{}, fmt.Errorf("logical group %q is not active", group.LogicalGroupID)
|
|
}
|
|
|
|
req, stickyKey, requestID, err := normalizeResolveRouteRequest(req)
|
|
if err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
|
|
writer, err := writerSource.get(ctx)
|
|
if err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
|
|
binding, stickyHit, err := lookupValidStickyBinding(ctx, store, stickyRuntime, stickyKey, req)
|
|
if err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
|
|
if stickyHit {
|
|
candidate, err := loadResolvedRouteCandidate(ctx, store, group.LogicalGroupID, req.PublicModel, binding.RouteID)
|
|
if err != nil {
|
|
if err := stickyRuntime.store.Delete(ctx, stickyKey); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
} else {
|
|
info := resolveRouteInfoFromBinding(stickyRuntime.backend, stickyKey, req.Scope, req.SubjectID, candidate, binding, requestID, true, "hit")
|
|
if err := writer.AppendStickyAudit(ctx, routing.RouteStickyAuditEvent{
|
|
StickyKey: stickyKey,
|
|
StickyKeyType: req.Scope,
|
|
LogicalGroupID: req.LogicalGroupID,
|
|
PublicModel: req.PublicModel,
|
|
RouteID: candidate.route.RouteID,
|
|
Action: "hit",
|
|
ExpiresAt: binding.ExpiresAt,
|
|
}); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
if err := writer.AppendDecision(ctx, routing.RouteDecisionEvent{
|
|
RequestID: requestID,
|
|
LogicalGroupID: req.LogicalGroupID,
|
|
PublicModel: req.PublicModel,
|
|
UserKey: resolveUserKey(req),
|
|
ConversationKey: resolveConversationKey(req),
|
|
StickyKey: stickyKey,
|
|
StickyKeyType: req.Scope,
|
|
StickyHit: true,
|
|
SelectedRouteID: candidate.route.RouteID,
|
|
SelectedShadowGroupID: candidate.route.ShadowGroupID,
|
|
}); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
if req.Sync {
|
|
if err := writer.Flush(ctx); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
}
|
|
return info, nil
|
|
}
|
|
}
|
|
|
|
selection, err := selectResolvedRouteCandidate(ctx, store, stickyRuntime, group, req.PublicModel)
|
|
if err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
candidate := selection.candidate
|
|
|
|
ttl, err := resolveStickyTTL(group, req.Scope)
|
|
if err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
binding = routing.StickyBinding{
|
|
LogicalGroupID: req.LogicalGroupID,
|
|
PublicModel: req.PublicModel,
|
|
RouteID: candidate.route.RouteID,
|
|
ShadowGroupID: candidate.route.ShadowGroupID,
|
|
}
|
|
if err := stickyRuntime.store.Set(ctx, stickyKey, binding, ttl); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
stored, ok, err := stickyRuntime.store.Get(ctx, stickyKey)
|
|
if err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
if !ok {
|
|
return ResolveRouteInfo{}, fmt.Errorf("sticky binding %q not found after set", stickyKey)
|
|
}
|
|
|
|
if err := writer.AppendStickyAudit(ctx, routing.RouteStickyAuditEvent{
|
|
StickyKey: stickyKey,
|
|
StickyKeyType: req.Scope,
|
|
LogicalGroupID: req.LogicalGroupID,
|
|
PublicModel: req.PublicModel,
|
|
RouteID: candidate.route.RouteID,
|
|
Action: "bind",
|
|
ExpiresAt: stored.ExpiresAt,
|
|
}); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
if err := writer.AppendDecision(ctx, routing.RouteDecisionEvent{
|
|
RequestID: requestID,
|
|
LogicalGroupID: req.LogicalGroupID,
|
|
PublicModel: req.PublicModel,
|
|
UserKey: resolveUserKey(req),
|
|
ConversationKey: resolveConversationKey(req),
|
|
StickyKey: stickyKey,
|
|
StickyKeyType: req.Scope,
|
|
StickyHit: false,
|
|
SelectedRouteID: candidate.route.RouteID,
|
|
SelectedShadowGroupID: candidate.route.ShadowGroupID,
|
|
FallbackUsed: selection.fallbackUsed,
|
|
}); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
if selection.failoverFrom != nil {
|
|
if err := writer.AppendFailover(ctx, routing.RouteFailoverEvent{
|
|
RequestID: requestID,
|
|
LogicalGroupID: req.LogicalGroupID,
|
|
PublicModel: req.PublicModel,
|
|
FromRouteID: selection.failoverFrom.route.RouteID,
|
|
ToRouteID: candidate.route.RouteID,
|
|
Reason: selection.failoverFrom.reason,
|
|
FailureCount: selection.failoverFrom.failureCount,
|
|
}); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
}
|
|
if req.Sync {
|
|
if err := writer.Flush(ctx); err != nil {
|
|
return ResolveRouteInfo{}, err
|
|
}
|
|
}
|
|
return resolveRouteInfoFromBinding(stickyRuntime.backend, stickyKey, req.Scope, req.SubjectID, candidate, stored, requestID, false, "bind"), nil
|
|
}
|
|
}
|
|
|
|
func normalizeResolveRouteRequest(req ResolveRouteRequest) (ResolveRouteRequest, string, string, error) {
|
|
req.RequestID = strings.TrimSpace(req.RequestID)
|
|
req.LogicalGroupID = strings.TrimSpace(req.LogicalGroupID)
|
|
req.PublicModel = strings.TrimSpace(req.PublicModel)
|
|
req.Scope = strings.TrimSpace(req.Scope)
|
|
req.SubjectID = strings.TrimSpace(req.SubjectID)
|
|
req.UserKey = strings.TrimSpace(req.UserKey)
|
|
req.ConversationKey = strings.TrimSpace(req.ConversationKey)
|
|
|
|
if req.LogicalGroupID == "" {
|
|
return ResolveRouteRequest{}, "", "", fmt.Errorf("logical_group_id is required")
|
|
}
|
|
if req.PublicModel == "" {
|
|
return ResolveRouteRequest{}, "", "", fmt.Errorf("public_model is required")
|
|
}
|
|
if req.Scope == "" {
|
|
return ResolveRouteRequest{}, "", "", fmt.Errorf("scope is required")
|
|
}
|
|
if req.SubjectID == "" {
|
|
return ResolveRouteRequest{}, "", "", fmt.Errorf("subject_id is required")
|
|
}
|
|
|
|
stickyKey, err := routing.BuildStickyKey(req.Scope, req.LogicalGroupID, req.PublicModel, req.SubjectID)
|
|
if err != nil {
|
|
return ResolveRouteRequest{}, "", "", err
|
|
}
|
|
requestID := req.RequestID
|
|
if requestID == "" {
|
|
requestID = fmt.Sprintf("resolve_%d", time.Now().UnixNano())
|
|
}
|
|
return req, stickyKey, requestID, nil
|
|
}
|
|
|
|
func lookupValidStickyBinding(ctx context.Context, store *sqlite.DB, runtime stickyStoreRuntime, stickyKey string, req ResolveRouteRequest) (routing.StickyBinding, bool, error) {
|
|
binding, ok, err := runtime.store.Get(ctx, stickyKey)
|
|
if err != nil || !ok {
|
|
return routing.StickyBinding{}, false, err
|
|
}
|
|
candidate, err := loadResolvedRouteCandidate(ctx, store, req.LogicalGroupID, req.PublicModel, binding.RouteID)
|
|
if err != nil {
|
|
if deleteErr := runtime.store.Delete(ctx, stickyKey); deleteErr != nil {
|
|
return routing.StickyBinding{}, false, deleteErr
|
|
}
|
|
return routing.StickyBinding{}, false, nil
|
|
}
|
|
cooldown, ok, err := runtime.store.GetCooldown(ctx, candidate.route.RouteID)
|
|
if err != nil {
|
|
return routing.StickyBinding{}, false, err
|
|
}
|
|
if ok && !isExpiredRFC3339(cooldown.Until) {
|
|
if deleteErr := runtime.store.Delete(ctx, stickyKey); deleteErr != nil {
|
|
return routing.StickyBinding{}, false, deleteErr
|
|
}
|
|
return routing.StickyBinding{}, false, nil
|
|
}
|
|
return binding, true, nil
|
|
}
|
|
|
|
func selectResolvedRouteCandidate(ctx context.Context, store *sqlite.DB, runtime stickyStoreRuntime, group sqlite.LogicalGroup, publicModel string) (routeSelectionResult, error) {
|
|
models, err := store.LogicalGroupModels().ListByLogicalGroupID(ctx, group.LogicalGroupID)
|
|
if err != nil {
|
|
return routeSelectionResult{}, err
|
|
}
|
|
if !logicalGroupHasActiveModel(models, publicModel) {
|
|
return routeSelectionResult{}, fmt.Errorf("logical group %q does not expose active model %q", group.LogicalGroupID, publicModel)
|
|
}
|
|
|
|
routes, err := store.LogicalGroupRoutes().ListByLogicalGroupID(ctx, group.LogicalGroupID)
|
|
if err != nil {
|
|
return routeSelectionResult{}, err
|
|
}
|
|
var skipped []skippedResolvedRoute
|
|
for _, route := range routes {
|
|
if !isActiveStatus(route.Status) {
|
|
continue
|
|
}
|
|
if !routeExitsCooldown(route.CooldownUntil) {
|
|
skipped = append(skipped, skippedResolvedRoute{
|
|
route: route,
|
|
reason: "route_cooldown_until_active",
|
|
})
|
|
continue
|
|
}
|
|
failureState, ok, err := runtime.store.GetRouteFailure(ctx, route.RouteID)
|
|
if err != nil {
|
|
return routeSelectionResult{}, err
|
|
}
|
|
if ok && failureState.FailureCount >= group.FailoverThreshold {
|
|
skipped = append(skipped, skippedResolvedRoute{
|
|
route: route,
|
|
reason: failureSkipReason(failureState),
|
|
failureCount: failureState.FailureCount,
|
|
})
|
|
continue
|
|
}
|
|
cooldown, ok, err := runtime.store.GetCooldown(ctx, route.RouteID)
|
|
if err != nil {
|
|
return routeSelectionResult{}, err
|
|
}
|
|
if ok && !routeExitsCooldown(cooldown.Until) {
|
|
skipped = append(skipped, skippedResolvedRoute{
|
|
route: route,
|
|
reason: cooldownSkipReason(cooldown),
|
|
})
|
|
continue
|
|
}
|
|
routeModels, err := store.LogicalGroupRouteModels().ListByRouteID(ctx, route.RouteID)
|
|
if err != nil {
|
|
return routeSelectionResult{}, err
|
|
}
|
|
for _, routeModel := range routeModels {
|
|
if strings.TrimSpace(routeModel.PublicModel) == publicModel && isActiveStatus(routeModel.Status) {
|
|
result := routeSelectionResult{
|
|
candidate: resolvedRouteCandidate{route: route, routeModel: routeModel},
|
|
}
|
|
if len(skipped) > 0 {
|
|
result.fallbackUsed = true
|
|
result.failoverFrom = &skipped[0]
|
|
}
|
|
return result, nil
|
|
}
|
|
}
|
|
}
|
|
return routeSelectionResult{}, fmt.Errorf("no active route found for logical group %q model %q", group.LogicalGroupID, publicModel)
|
|
}
|
|
|
|
func loadResolvedRouteCandidate(ctx context.Context, store *sqlite.DB, logicalGroupID string, publicModel string, routeID string) (resolvedRouteCandidate, error) {
|
|
route, err := getLogicalGroupRouteRow(ctx, store, routeID)
|
|
if err != nil {
|
|
return resolvedRouteCandidate{}, err
|
|
}
|
|
if route.LogicalGroupID != logicalGroupID {
|
|
return resolvedRouteCandidate{}, fmt.Errorf("logical group route %q not found under logical group %q", route.RouteID, logicalGroupID)
|
|
}
|
|
if !isActiveStatus(route.Status) {
|
|
return resolvedRouteCandidate{}, fmt.Errorf("logical group route %q is not active", route.RouteID)
|
|
}
|
|
if !routeExitsCooldown(route.CooldownUntil) {
|
|
return resolvedRouteCandidate{}, fmt.Errorf("logical group route %q is cooling down", route.RouteID)
|
|
}
|
|
routeModels, err := store.LogicalGroupRouteModels().ListByRouteID(ctx, route.RouteID)
|
|
if err != nil {
|
|
return resolvedRouteCandidate{}, err
|
|
}
|
|
for _, routeModel := range routeModels {
|
|
if strings.TrimSpace(routeModel.PublicModel) == strings.TrimSpace(publicModel) && isActiveStatus(routeModel.Status) {
|
|
return resolvedRouteCandidate{route: route, routeModel: routeModel}, nil
|
|
}
|
|
}
|
|
return resolvedRouteCandidate{}, fmt.Errorf("logical group route %q does not expose active model %q", route.RouteID, publicModel)
|
|
}
|
|
|
|
func logicalGroupHasActiveModel(models []sqlite.LogicalGroupModel, publicModel string) bool {
|
|
publicModel = strings.TrimSpace(publicModel)
|
|
for _, model := range models {
|
|
if strings.TrimSpace(model.PublicModel) == publicModel && isActiveStatus(model.Status) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func resolveStickyTTL(group sqlite.LogicalGroup, scope string) (time.Duration, error) {
|
|
switch strings.ToLower(strings.TrimSpace(scope)) {
|
|
case routing.StickyScopeConversation, routing.StickyScopeSession:
|
|
return secondsToDuration(group.ConversationTTLSeconds, 0)
|
|
case routing.StickyScopeUser:
|
|
return secondsToDuration(group.UserModelTTLSeconds, 0)
|
|
default:
|
|
return 0, fmt.Errorf("unsupported sticky scope %q", scope)
|
|
}
|
|
}
|
|
|
|
func resolveRouteInfoFromBinding(backend string, stickyKey string, scope string, subjectID string, candidate resolvedRouteCandidate, binding routing.StickyBinding, requestID string, stickyHit bool, stickyAction string) ResolveRouteInfo {
|
|
return ResolveRouteInfo{
|
|
RequestID: requestID,
|
|
Backend: backend,
|
|
LogicalGroupID: binding.LogicalGroupID,
|
|
PublicModel: binding.PublicModel,
|
|
Scope: strings.TrimSpace(scope),
|
|
SubjectID: strings.TrimSpace(subjectID),
|
|
StickyKey: stickyKey,
|
|
StickyHit: stickyHit,
|
|
StickyAction: stickyAction,
|
|
RouteID: candidate.route.RouteID,
|
|
RouteName: candidate.route.Name,
|
|
ShadowGroupID: candidate.route.ShadowGroupID,
|
|
ShadowHostID: candidate.route.ShadowHostID,
|
|
ShadowModel: candidate.routeModel.ShadowModel,
|
|
Priority: candidate.route.Priority,
|
|
Weight: candidate.route.Weight,
|
|
BoundAt: binding.BoundAt,
|
|
ExpiresAt: binding.ExpiresAt,
|
|
}
|
|
}
|
|
|
|
func resolveUserKey(req ResolveRouteRequest) string {
|
|
if req.UserKey != "" {
|
|
return req.UserKey
|
|
}
|
|
if strings.EqualFold(req.Scope, routing.StickyScopeUser) {
|
|
return req.SubjectID
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func resolveConversationKey(req ResolveRouteRequest) string {
|
|
if req.ConversationKey != "" {
|
|
return req.ConversationKey
|
|
}
|
|
if strings.EqualFold(req.Scope, routing.StickyScopeConversation) || strings.EqualFold(req.Scope, routing.StickyScopeSession) {
|
|
return req.SubjectID
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// isActiveStatus reports whether status, ignoring surrounding whitespace and
// letter case, is "active".
func isActiveStatus(status string) bool {
	normalized := strings.TrimSpace(status)
	return strings.EqualFold("active", normalized)
}
|
|
|
|
// isExpiredRFC3339 reports whether the RFC 3339 timestamp in raw is at or
// before the current time. Blank or unparsable input counts as expired.
func isExpiredRFC3339(raw string) bool {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return true
	}

	if deadline, err := time.Parse(time.RFC3339, trimmed); err == nil {
		// Expired once "now" is no longer strictly before the deadline.
		return !time.Now().UTC().Before(deadline)
	}
	return true
}
|
|
|
|
func routeExitsCooldown(raw string) bool {
|
|
return isExpiredRFC3339(raw)
|
|
}
|
|
|
|
func failureSkipReason(state routing.RouteFailureState) string {
|
|
if reason := strings.TrimSpace(state.LastErrorClass); reason != "" {
|
|
return "failure_threshold_exceeded:" + reason
|
|
}
|
|
return "failure_threshold_exceeded"
|
|
}
|
|
|
|
func cooldownSkipReason(state routing.RouteCooldownState) string {
|
|
if reason := strings.TrimSpace(state.Reason); reason != "" {
|
|
return "active_cooldown:" + reason
|
|
}
|
|
return "active_cooldown"
|
|
}
|