Files
sub2api-cn-relay-manager/internal/app/route_resolve_api.go
2026-05-29 09:38:59 +08:00

405 lines
14 KiB
Go

package app
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"sub2api-cn-relay-manager/internal/routing"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
// ResolveRouteRequest is the JSON body decoded by handleResolveRoute.
// normalizeResolveRouteRequest trims every string field and rejects the
// request when logical_group_id, public_model, scope or subject_id is empty.
type ResolveRouteRequest struct {
// RequestID is optional; when it is empty a "resolve_<unix nanoseconds>" id is generated.
RequestID string `json:"request_id,omitempty"`
// LogicalGroupID names the logical group whose routes are considered. Required.
LogicalGroupID string `json:"logical_group_id"`
// PublicModel is the model name matched against the group's and the routes' models. Required.
PublicModel string `json:"public_model"`
// Scope is the sticky scope; it is part of the sticky key and selects the binding TTL. Required.
Scope string `json:"scope"`
// SubjectID is the subject component of the sticky key. Required.
SubjectID string `json:"subject_id"`
// UserKey is recorded with the routing decision; when empty, SubjectID is used for the user scope.
UserKey string `json:"user_key,omitempty"`
// ConversationKey is recorded with the routing decision; when empty, SubjectID
// is used for the conversation and session scopes.
ConversationKey string `json:"conversation_key,omitempty"`
// Sync makes the resolver flush the route log writer before it returns.
Sync bool `json:"sync,omitempty"`
}
// ResolveRouteInfo is the result of a route resolution. handleResolveRoute
// returns it to the client under the "resolve" key; its fields are filled in
// by resolveRouteInfoFromBinding.
type ResolveRouteInfo struct {
// RequestID is the caller-supplied request id, or the generated one when none was sent.
RequestID string `json:"request_id"`
// Backend is the backend name reported by the sticky store runtime.
Backend string `json:"backend"`
// LogicalGroupID and PublicModel are copied from the sticky binding.
LogicalGroupID string `json:"logical_group_id"`
PublicModel string `json:"public_model"`
// Scope and SubjectID echo the (trimmed) request values.
Scope string `json:"scope"`
SubjectID string `json:"subject_id"`
// StickyKey is the key under which the binding is stored.
StickyKey string `json:"sticky_key"`
// StickyHit is true when an existing binding was reused.
StickyHit bool `json:"sticky_hit"`
// StickyAction is "hit" for a reused binding and "bind" for a newly created one.
StickyAction string `json:"sticky_action"`
// RouteID, RouteName, ShadowGroupID, ShadowHostID, Priority and Weight come
// from the selected route row.
RouteID string `json:"route_id"`
RouteName string `json:"route_name,omitempty"`
ShadowGroupID string `json:"shadow_group_id"`
ShadowHostID string `json:"shadow_host_id"`
// ShadowModel comes from the route-model row matched for PublicModel.
ShadowModel string `json:"shadow_model,omitempty"`
Priority int `json:"priority"`
Weight int `json:"weight"`
// BoundAt and ExpiresAt are copied from the sticky binding.
BoundAt string `json:"bound_at,omitempty"`
ExpiresAt string `json:"expires_at,omitempty"`
}
// resolvedRouteCandidate pairs a route with the route-model row that matched
// the requested public model on that route.
type resolvedRouteCandidate struct {
// route is the selected logical-group route row.
route sqlite.LogicalGroupRoute
// routeModel is the active model row of route whose PublicModel matched the request.
routeModel sqlite.LogicalGroupRouteModel
}
// handleResolveRoute is the HTTP entry point for route resolution.
//
// It decodes a ResolveRouteRequest from the request body, invokes fn with the
// request's context and, on success, responds with HTTP 200 and a JSON object
// of the form {"resolve": <ResolveRouteInfo>}.
//
// Failure handling:
//   - fn == nil: a 500 "server_misconfigured" error is written.
//   - the body cannot be decoded: the decode error is written as-is.
//   - fn fails: the error is passed through classifyError before being written.
func handleResolveRoute(w http.ResponseWriter, r *http.Request, fn func(context.Context, ResolveRouteRequest) (ResolveRouteInfo, error)) {
	if fn == nil {
		misconfigured := &httpError{
			StatusCode: http.StatusInternalServerError,
			Code:       "server_misconfigured",
			Message:    "resolve-route action is not configured",
		}
		writeHTTPError(w, misconfigured)
		return
	}

	var payload ResolveRouteRequest
	if decodeErr := decodeJSON(r, &payload); decodeErr != nil {
		writeHTTPError(w, decodeErr)
		return
	}

	resolved, resolveErr := fn(r.Context(), payload)
	if resolveErr != nil {
		writeHTTPError(w, classifyError(resolveErr))
		return
	}

	body := map[string]any{"resolve": resolved}
	writeJSON(w, http.StatusOK, body)
}
// buildResolveRouteAction returns the resolve-route action used by
// handleResolveRoute.
//
// For every call the returned function:
//  1. trims and validates the request (normalizeResolveRouteRequest) before
//     touching any storage, so malformed requests fail with their validation
//     error and the group lookup always uses the trimmed logical_group_id;
//  2. opens the SQLite store at sqliteDSN (closed again on return) and requires
//     the logical group to exist and be active;
//  3. reuses a still-valid sticky binding when one exists ("hit");
//  4. otherwise selects a route, stores a new sticky binding with the TTL for
//     the request scope, and reads it back ("bind").
//
// Both outcomes append a sticky audit event and a decision event to the route
// log writer obtained from writerSource; when req.Sync is set the writer is
// flushed before returning. Any error aborts the call and is returned with a
// zero ResolveRouteInfo.
func buildResolveRouteAction(sqliteDSN string, stickyRuntime stickyStoreRuntime, writerSource *lazyRouteLogWriter) func(context.Context, ResolveRouteRequest) (ResolveRouteInfo, error) {
	return func(ctx context.Context, req ResolveRouteRequest) (ResolveRouteInfo, error) {
		// Validate first: cheap, needs no I/O, and guarantees that every later
		// lookup sees trimmed identifiers.
		req, stickyKey, requestID, err := normalizeResolveRouteRequest(req)
		if err != nil {
			return ResolveRouteInfo{}, err
		}

		store, err := sqlite.Open(ctx, sqliteDSN)
		if err != nil {
			return ResolveRouteInfo{}, err
		}
		defer store.Close()

		group, err := getLogicalGroupRow(ctx, store, req.LogicalGroupID)
		if err != nil {
			return ResolveRouteInfo{}, err
		}
		if !isActiveStatus(group.Status) {
			return ResolveRouteInfo{}, fmt.Errorf("logical group %q is not active", group.LogicalGroupID)
		}

		writer, err := writerSource.get(ctx)
		if err != nil {
			return ResolveRouteInfo{}, err
		}

		// record appends the sticky audit event and the decision event for the
		// chosen candidate, then flushes the writer when the caller asked for
		// a synchronous resolve. It is shared by the "hit" and "bind" paths.
		record := func(action string, stickyHit bool, candidate resolvedRouteCandidate, binding routing.StickyBinding) error {
			if err := writer.AppendStickyAudit(ctx, routing.RouteStickyAuditEvent{
				StickyKey:      stickyKey,
				StickyKeyType:  req.Scope,
				LogicalGroupID: req.LogicalGroupID,
				PublicModel:    req.PublicModel,
				RouteID:        candidate.route.RouteID,
				Action:         action,
				ExpiresAt:      binding.ExpiresAt,
			}); err != nil {
				return err
			}
			if err := writer.AppendDecision(ctx, routing.RouteDecisionEvent{
				RequestID:             requestID,
				LogicalGroupID:        req.LogicalGroupID,
				PublicModel:           req.PublicModel,
				UserKey:               resolveUserKey(req),
				ConversationKey:       resolveConversationKey(req),
				StickyKey:             stickyKey,
				StickyKeyType:         req.Scope,
				StickyHit:             stickyHit,
				SelectedRouteID:       candidate.route.RouteID,
				SelectedShadowGroupID: candidate.route.ShadowGroupID,
			}); err != nil {
				return err
			}
			if req.Sync {
				if err := writer.Flush(ctx); err != nil {
					return err
				}
			}
			return nil
		}

		binding, stickyHit, err := lookupValidStickyBinding(ctx, store, stickyRuntime, stickyKey, req)
		if err != nil {
			return ResolveRouteInfo{}, err
		}
		if stickyHit {
			candidate, err := loadResolvedRouteCandidate(ctx, store, group.LogicalGroupID, req.PublicModel, binding.RouteID)
			if err == nil {
				info := resolveRouteInfoFromBinding(stickyRuntime.backend, stickyKey, req.Scope, req.SubjectID, candidate, binding, requestID, true, "hit")
				if err := record("hit", true, candidate, binding); err != nil {
					return ResolveRouteInfo{}, err
				}
				return info, nil
			}
			// The bound route can no longer serve the request: drop the
			// binding and fall through to a fresh selection.
			if err := stickyRuntime.store.Delete(ctx, stickyKey); err != nil {
				return ResolveRouteInfo{}, err
			}
		}

		candidate, err := selectResolvedRouteCandidate(ctx, store, stickyRuntime, group, req.PublicModel)
		if err != nil {
			return ResolveRouteInfo{}, err
		}
		ttl, err := resolveStickyTTL(group, req.Scope)
		if err != nil {
			return ResolveRouteInfo{}, err
		}
		binding = routing.StickyBinding{
			LogicalGroupID: req.LogicalGroupID,
			PublicModel:    req.PublicModel,
			RouteID:        candidate.route.RouteID,
			ShadowGroupID:  candidate.route.ShadowGroupID,
		}
		if err := stickyRuntime.store.Set(ctx, stickyKey, binding, ttl); err != nil {
			return ResolveRouteInfo{}, err
		}
		// Read the binding back so the response and the audit event carry the
		// values held by the store (for example ExpiresAt) rather than the
		// partially filled value built above.
		stored, ok, err := stickyRuntime.store.Get(ctx, stickyKey)
		if err != nil {
			return ResolveRouteInfo{}, err
		}
		if !ok {
			return ResolveRouteInfo{}, fmt.Errorf("sticky binding %q not found after set", stickyKey)
		}
		if err := record("bind", false, candidate, stored); err != nil {
			return ResolveRouteInfo{}, err
		}
		return resolveRouteInfoFromBinding(stickyRuntime.backend, stickyKey, req.Scope, req.SubjectID, candidate, stored, requestID, false, "bind"), nil
	}
}
// normalizeResolveRouteRequest trims all string fields of req, checks that the
// required ones are present and derives the identifiers used by the resolver.
//
// It returns, in order: the trimmed request, the sticky key built by
// routing.BuildStickyKey from scope, logical group, public model and subject,
// and the effective request id. The effective request id is req.RequestID when
// that is non-empty, otherwise a generated "resolve_<unix nanoseconds>" value;
// the RequestID field of the returned request is left as the caller sent it.
//
// An error is returned when logical_group_id, public_model, scope or
// subject_id is empty after trimming, or when the sticky key cannot be built.
func normalizeResolveRouteRequest(req ResolveRouteRequest) (ResolveRouteRequest, string, string, error) {
	textFields := []*string{
		&req.RequestID,
		&req.LogicalGroupID,
		&req.PublicModel,
		&req.Scope,
		&req.SubjectID,
		&req.UserKey,
		&req.ConversationKey,
	}
	for _, field := range textFields {
		*field = strings.TrimSpace(*field)
	}

	// Required fields are checked in a fixed order so the first missing one
	// determines the error.
	switch {
	case req.LogicalGroupID == "":
		return ResolveRouteRequest{}, "", "", fmt.Errorf("logical_group_id is required")
	case req.PublicModel == "":
		return ResolveRouteRequest{}, "", "", fmt.Errorf("public_model is required")
	case req.Scope == "":
		return ResolveRouteRequest{}, "", "", fmt.Errorf("scope is required")
	case req.SubjectID == "":
		return ResolveRouteRequest{}, "", "", fmt.Errorf("subject_id is required")
	}

	key, keyErr := routing.BuildStickyKey(req.Scope, req.LogicalGroupID, req.PublicModel, req.SubjectID)
	if keyErr != nil {
		return ResolveRouteRequest{}, "", "", keyErr
	}

	if req.RequestID != "" {
		return req, key, req.RequestID, nil
	}
	generatedID := fmt.Sprintf("resolve_%d", time.Now().UnixNano())
	return req, key, generatedID, nil
}
// lookupValidStickyBinding fetches the sticky binding stored under stickyKey
// and reports whether it can still be used for req.
//
// The binding is returned with true only when it exists, its route can still
// be loaded for the request's logical group and public model
// (loadResolvedRouteCandidate), and the sticky store holds no unexpired
// cooldown for that route. A binding that fails either check is deleted from
// the sticky store and reported as a miss (zero binding, false, nil error).
// Errors from the sticky store are returned to the caller.
func lookupValidStickyBinding(ctx context.Context, store *sqlite.DB, runtime stickyStoreRuntime, stickyKey string, req ResolveRouteRequest) (routing.StickyBinding, bool, error) {
	// evict removes the unusable binding and yields the "miss" result.
	evict := func() (routing.StickyBinding, bool, error) {
		if deleteErr := runtime.store.Delete(ctx, stickyKey); deleteErr != nil {
			return routing.StickyBinding{}, false, deleteErr
		}
		return routing.StickyBinding{}, false, nil
	}

	binding, found, err := runtime.store.Get(ctx, stickyKey)
	if err != nil {
		return routing.StickyBinding{}, false, err
	}
	if !found {
		return routing.StickyBinding{}, false, nil
	}

	candidate, loadErr := loadResolvedRouteCandidate(ctx, store, req.LogicalGroupID, req.PublicModel, binding.RouteID)
	if loadErr != nil {
		// The load error itself is intentionally not surfaced: the binding is
		// simply treated as stale.
		return evict()
	}

	cooldown, hasCooldown, err := runtime.store.GetCooldown(ctx, candidate.route.RouteID)
	if err != nil {
		return routing.StickyBinding{}, false, err
	}
	if hasCooldown && !isExpiredRFC3339(cooldown.Until) {
		return evict()
	}

	return binding, true, nil
}
// selectResolvedRouteCandidate picks a route of group that can currently serve
// publicModel.
//
// The group must expose publicModel as an active model. Routes are then
// examined in the order returned by the store, and the first one is returned
// that is active, has no unexpired CooldownUntil on its row, has no unexpired
// cooldown in the sticky store, and lists publicModel as an active route
// model. The model name is compared after trimming surrounding whitespace, in
// line with logicalGroupHasActiveModel and loadResolvedRouteCandidate.
//
// NOTE(review): Priority and Weight are not consulted here; presumably the
// ordering comes from ListByLogicalGroupID — confirm.
//
// An error is returned when the group does not expose the model, when no route
// qualifies, or when any store lookup fails.
func selectResolvedRouteCandidate(ctx context.Context, store *sqlite.DB, runtime stickyStoreRuntime, group sqlite.LogicalGroup, publicModel string) (resolvedRouteCandidate, error) {
	// Trim once so the route-model comparison below agrees with the group
	// check, which trims internally. Error messages keep the caller's value.
	wantedModel := strings.TrimSpace(publicModel)

	models, err := store.LogicalGroupModels().ListByLogicalGroupID(ctx, group.LogicalGroupID)
	if err != nil {
		return resolvedRouteCandidate{}, err
	}
	if !logicalGroupHasActiveModel(models, publicModel) {
		return resolvedRouteCandidate{}, fmt.Errorf("logical group %q does not expose active model %q", group.LogicalGroupID, publicModel)
	}

	routes, err := store.LogicalGroupRoutes().ListByLogicalGroupID(ctx, group.LogicalGroupID)
	if err != nil {
		return resolvedRouteCandidate{}, err
	}
	for _, route := range routes {
		if !isActiveStatus(route.Status) {
			continue
		}
		// Cooldown recorded on the route row itself.
		if !isExpiredRFC3339(route.CooldownUntil) {
			continue
		}
		// Cooldown recorded in the sticky store runtime.
		cooldown, ok, err := runtime.store.GetCooldown(ctx, route.RouteID)
		if err != nil {
			return resolvedRouteCandidate{}, err
		}
		if ok && !isExpiredRFC3339(cooldown.Until) {
			continue
		}
		routeModels, err := store.LogicalGroupRouteModels().ListByRouteID(ctx, route.RouteID)
		if err != nil {
			return resolvedRouteCandidate{}, err
		}
		for _, routeModel := range routeModels {
			if strings.TrimSpace(routeModel.PublicModel) == wantedModel && isActiveStatus(routeModel.Status) {
				return resolvedRouteCandidate{route: route, routeModel: routeModel}, nil
			}
		}
	}
	return resolvedRouteCandidate{}, fmt.Errorf("no active route found for logical group %q model %q", group.LogicalGroupID, publicModel)
}
// loadResolvedRouteCandidate loads the route identified by routeID and checks
// that it can serve publicModel for logicalGroupID.
//
// The route must belong to logicalGroupID, be active, and have no unexpired
// CooldownUntil; it must also list publicModel (compared after trimming
// whitespace on both sides) as an active route model. The first matching route
// model is returned together with the route. Any failed check or store error
// yields a zero candidate and an error.
func loadResolvedRouteCandidate(ctx context.Context, store *sqlite.DB, logicalGroupID string, publicModel string, routeID string) (resolvedRouteCandidate, error) {
	var none resolvedRouteCandidate

	route, err := getLogicalGroupRouteRow(ctx, store, routeID)
	if err != nil {
		return none, err
	}

	// Guards are evaluated top to bottom; the first one that fails decides
	// the error.
	switch {
	case route.LogicalGroupID != logicalGroupID:
		return none, fmt.Errorf("logical group route %q not found under logical group %q", route.RouteID, logicalGroupID)
	case !isActiveStatus(route.Status):
		return none, fmt.Errorf("logical group route %q is not active", route.RouteID)
	case !isExpiredRFC3339(route.CooldownUntil):
		return none, fmt.Errorf("logical group route %q is cooling down", route.RouteID)
	}

	routeModels, err := store.LogicalGroupRouteModels().ListByRouteID(ctx, route.RouteID)
	if err != nil {
		return none, err
	}

	wantedModel := strings.TrimSpace(publicModel)
	for _, entry := range routeModels {
		if !isActiveStatus(entry.Status) {
			continue
		}
		if strings.TrimSpace(entry.PublicModel) != wantedModel {
			continue
		}
		return resolvedRouteCandidate{route: route, routeModel: entry}, nil
	}
	return none, fmt.Errorf("logical group route %q does not expose active model %q", route.RouteID, publicModel)
}
// logicalGroupHasActiveModel reports whether models contains an entry whose
// status is active and whose PublicModel equals publicModel once surrounding
// whitespace is removed from both. It returns false for an empty list.
func logicalGroupHasActiveModel(models []sqlite.LogicalGroupModel, publicModel string) bool {
	wanted := strings.TrimSpace(publicModel)
	for i := range models {
		entry := models[i]
		if !isActiveStatus(entry.Status) {
			continue
		}
		if strings.TrimSpace(entry.PublicModel) == wanted {
			return true
		}
	}
	return false
}
// resolveStickyTTL returns the sticky-binding lifetime that group configures
// for scope. The scope is matched after trimming and lower-casing:
// conversation and session scopes use group.ConversationTTLSeconds, the user
// scope uses group.UserModelTTLSeconds. Any other scope is an error.
func resolveStickyTTL(group sqlite.LogicalGroup, scope string) (time.Duration, error) {
	normalized := strings.ToLower(strings.TrimSpace(scope))

	if normalized == routing.StickyScopeConversation || normalized == routing.StickyScopeSession {
		return secondsToDuration(group.ConversationTTLSeconds, 0)
	}
	if normalized == routing.StickyScopeUser {
		return secondsToDuration(group.UserModelTTLSeconds, 0)
	}
	// The error reports the scope exactly as the caller passed it.
	return 0, fmt.Errorf("unsupported sticky scope %q", scope)
}
// resolveRouteInfoFromBinding assembles the ResolveRouteInfo returned to the
// caller. Group, model and timestamps are taken from binding; route details
// from candidate; scope and subjectID are trimmed; the remaining arguments are
// copied through unchanged.
func resolveRouteInfoFromBinding(backend string, stickyKey string, scope string, subjectID string, candidate resolvedRouteCandidate, binding routing.StickyBinding, requestID string, stickyHit bool, stickyAction string) ResolveRouteInfo {
	var info ResolveRouteInfo

	// Request and sticky-store context.
	info.RequestID = requestID
	info.Backend = backend
	info.Scope = strings.TrimSpace(scope)
	info.SubjectID = strings.TrimSpace(subjectID)
	info.StickyKey = stickyKey
	info.StickyHit = stickyHit
	info.StickyAction = stickyAction

	// Values held by the sticky binding.
	info.LogicalGroupID = binding.LogicalGroupID
	info.PublicModel = binding.PublicModel
	info.BoundAt = binding.BoundAt
	info.ExpiresAt = binding.ExpiresAt

	// Details of the selected route and its matching model row.
	route := candidate.route
	info.RouteID = route.RouteID
	info.RouteName = route.Name
	info.ShadowGroupID = route.ShadowGroupID
	info.ShadowHostID = route.ShadowHostID
	info.ShadowModel = candidate.routeModel.ShadowModel
	info.Priority = route.Priority
	info.Weight = route.Weight

	return info
}
// resolveUserKey returns the user key recorded with a routing decision: the
// explicit req.UserKey when set, otherwise req.SubjectID when the request scope
// is the user scope (compared case-insensitively), otherwise "".
func resolveUserKey(req ResolveRouteRequest) string {
	switch {
	case req.UserKey != "":
		return req.UserKey
	case strings.EqualFold(req.Scope, routing.StickyScopeUser):
		return req.SubjectID
	default:
		return ""
	}
}
// resolveConversationKey returns the conversation key recorded with a routing
// decision: the explicit req.ConversationKey when set, otherwise req.SubjectID
// when the request scope is the conversation or session scope (compared
// case-insensitively), otherwise "".
func resolveConversationKey(req ResolveRouteRequest) string {
	if explicit := req.ConversationKey; explicit != "" {
		return explicit
	}
	conversationScopes := []string{routing.StickyScopeConversation, routing.StickyScopeSession}
	for _, scope := range conversationScopes {
		if strings.EqualFold(req.Scope, scope) {
			return req.SubjectID
		}
	}
	return ""
}
// isActiveStatus reports whether status, ignoring surrounding whitespace and
// letter case, is the word "active".
func isActiveStatus(status string) bool {
	const activeStatus = "active"
	trimmed := strings.TrimSpace(status)
	return strings.EqualFold(trimmed, activeStatus)
}
// isExpiredRFC3339 reports whether the RFC 3339 timestamp in raw is no longer
// in the future.
//
// It returns true when raw is empty (after trimming), when it cannot be parsed
// as RFC 3339, or when the instant it names is at or before the current time.
// It returns false only for a well-formed timestamp that is strictly in the
// future. Callers use a false result to mean "still cooling down", so missing
// or malformed values never block a route.
func isExpiredRFC3339(raw string) bool {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return true
	}

	deadline, parseErr := time.Parse(time.RFC3339, trimmed)
	if parseErr != nil {
		return true
	}

	now := time.Now().UTC()
	stillPending := deadline.After(now)
	return !stillPending
}