# Sub2API 模块分析报告:调度与负载均衡模块 ## 1. 模块概述 ### 1.1 模块定位 调度与负载均衡模块是Sub2API系统的请求分配核心,负责在多个上游账号之间智能分配请求,实现负载均衡、故障转移和会话保持。该模块确保系统在高并发场景下的稳定性和资源利用率。 ### 1.2 核心职责 - **账号选择**:根据负载、优先级选择最合适的账号 - **负载均衡**:将请求均匀分布到多个账号 - **故障转移**:账号故障时自动切换到备用账号 - **会话保持**:同一会话路由到同一账号 - **连接管理**:管理上游连接池 ## 2. 代码结构分析 ### 2.1 核心文件 | 文件路径 | 职责 | 代码行数 | |---------|------|----------| | `service/gateway_service.go` | 账号选择核心逻辑 | **8516行** | | `service/openai_account_scheduler.go` | OpenAI 账号调度器(加权评分) | ~2000行 | | `service/account_load_factor.go` | 负载因子计算 | ~400行 | | `service/account_pool_mode.go` | 账号池模式(备用) | ~300行 | | `service/concurrency_service.go` | 并发槽位管理 | ~1000行 | | `handler/gateway_handler.go` | 网关请求处理 | ~1800行 | > ⚠️ **修正**:`SelectAccountWithLoadAwareness` 位于 `gateway_service.go:1190`,负载评分逻辑在 `openai_account_scheduler.go`。 ### 2.2 核心数据结构 ```go // 账号选择结果 type AccountSelection struct { Account *Account // 选中的账号 SessionKey string // 粘性会话键 IsSticky bool // 是否命中粘性会话 LoadFactor float64 // 当前负载因子 } // 负载因子 type LoadFactor struct { ActiveConnections int // 活跃连接数 MaxConcurrency int // 最大并发 RPM int // 每分钟请求数 ErrorRate float64 // 错误率 Calculated float64 // 计算后的负载因子 } // 故障转移结果 type FailoverResult struct { Success bool Account *Account RetryCount int DelayMs int Error error } ``` ## 3. 功能详细分析 ### 3.1 账号选择算法 #### 3.1.1 负载感知选择 ```go // service/gateway_service.go - SelectAccountWithLoadAwareness func (s *GatewayService) SelectAccountWithLoadAwareness( ctx context.Context, groupID int64, sessionKey string, model string, failedAccountIDs []int64, ) (*Account, error) { // 1. 获取分组下所有可用账号 accounts := s.getAvailableAccounts(groupID, model) // 2. 过滤掉已失败的账号 accounts = s.filterFailedAccounts(accounts, failedAccountIDs) // 3. 计算每个账号的负载因子 for _, account := range accounts { account.LoadFactor = s.calculateLoadFactor(account) } // 4. 按负载因子排序(低的优先) sort.Slice(accounts, func(i, j int) bool { return accounts[i].LoadFactor < accounts[j].LoadFactor }) // 5. 检查粘性会话 if sessionKey != "" { stickyAccount := s.getStickyAccount(sessionKey, accounts) if stickyAccount != nil { return stickyAccount, nil } } // 6. 返回负载最低的账号 return accounts[0], nil } ``` #### 3.1.2 负载因子计算(加权评分系统) > ⚠️ **修正**:负载评分使用**可配置的加权评分系统**,不是简单的公式。 ```go // service/openai_account_scheduler.go - Weighted Scoring type AccountScore struct { AccountID int64 TotalScore float64 PriorityScore float64 // 优先级得分 LoadScore float64 // 负载得分 QueueScore float64 // 队列得分 ErrorScore float64 // 错误率得分 TTFTScore float64 // 首字节时间得分 } // 评分权重(可配置) type SchedulerScoreWeights struct { PriorityWeight float64 // 默认 1.0 LoadWeight float64 // 默认 1.0 QueueWeight float64 // 默认 0.7 ErrorWeight float64 // 默认 0.8 TTFTWeight float64 // 默认 0.5 } // 计算公式 func (s *AccountScheduler) calculateScore(account *Account) *AccountScore { score := &AccountScore{AccountID: account.ID} weights := s.getScoreWeights() // 1. 优先级得分(账号配置中的优先级,越高越优先) score.PriorityScore = float64(account.Priority) // 2. 负载得分(当前连接数 / 最大并发) activeConns := s.getActiveConnections(account.ID) score.LoadScore = 1.0 - (float64(activeConns) / float64(account.MaxConcurrency)) // 3. 队列得分(等待中的请求数) queueLen := s.getQueueLength(account.ID) score.QueueScore = 1.0 - (float64(queueLen) / float64(account.MaxConcurrency*2)) // 4. 错误率得分(最近错误率越低越好) errorRate := s.getErrorRate(account.ID) score.ErrorScore = 1.0 - errorRate // 5. TTFT 得分(首字节响应时间,归一化) avgTTFT := s.getAvgTTFT(account.ID) score.TTFTScore = calculateTTFTScore(avgTTFT) // 6. 加权总分 score.TotalScore = score.PriorityScore * weights.PriorityWeight + score.LoadScore * weights.LoadWeight + score.QueueScore * weights.QueueWeight + score.ErrorScore * weights.ErrorWeight + score.TTFTScore * weights.TTFTWeight return score } // 选择策略:top-K + 加权随机 func (s *AccountScheduler) selectByWeightedRandom(candidates []*Account) *Account { topK := s.selectTopK(candidates, 5) // 选前5名 // 加权随机,避免单一账号被过度使用 totalWeight := 0.0 for _, a := range topK { totalWeight += a.Score.TotalScore } rand := mathrand.Float64() * totalWeight cumulative := 0.0 for _, a := range topK { cumulative += a.Score.TotalScore if rand <= cumulative { return a.Account } } return topK[0] } ``` **评分维度说明**: | 维度 | 计算方式 | 权重范围 | 说明 | |------|---------|----------|------| | PriorityScore | 账号配置中的 Priority 字段 | 0-100 | 越高越优先 | | LoadScore | `1 - (活跃连接/最大并发)` | 0-1 | 空闲度 | | QueueScore | `1 - (队列长度/(最大并发×2))` | 0-1 | 等待情况 | | ErrorScore | `1 - 最近错误率` | 0-1 | 健康度 | | TTFTScore | 归一化的首字节时间 | 0-1 | 响应速度 | ### 3.2 粘性会话 #### 3.2.1 会话保持机制 ```go // service/gateway_service.go - BindStickySession func (s *GatewayService) BindStickySession(ctx context.Context, groupID int64, sessionKey string, accountID int64) error { // 1. 生成会话键 // 格式:group:{groupID}:session:{sessionHash} redisKey := fmt.Sprintf("sticky:%d:%s", groupID, sessionKey) // 2. 存储映射关系 // TTL: 1小时(可配置) err := s.redis.Set(ctx, redisKey, accountID, 1*time.Hour) if err != nil { return err } // 3. 记录会话窗口 return s.updateSessionWindow(ctx, accountID, sessionKey) } ``` #### 3.2.2 会话验证 ```go // 检查会话是否有效 func (s *GatewayService) getStickyAccount(sessionKey string, accounts []*Account) *Account { for _, account := range accounts { // 检查账号是否在会话窗口内 if s.isInSessionWindow(account, sessionKey) { // 验证会话映射 if mappedID := s.getSessionMapping(groupID, sessionKey); mappedID == account.ID { return account } } } return nil } ``` ### 3.3 故障转移 #### 3.3.1 重试策略 ```go // service/gateway_service.go - handleUpstreamError func (s *GatewayService) handleUpstreamError( ctx context.Context, account *Account, err error, attempt int, ) *FailoverResult { // 1. 检查是否可重试 if !isRetryableError(err) { return &FailoverResult{Success: false, Error: err} } // 2. 判断重试级别 switch getRetryLevel(err) { case RetryLevelSameAccount: // 同账号重试(短延迟) delay := calculateDelay(attempt, DelayTypeShort) return &FailoverResult{ RetryCount: 1, DelayMs: delay, Account: account, // 继续使用同一账号 } case RetryLevelSwitchAccount: // 切换账号(稍长延迟) nextAccount := s.selectNextAccount(account.GroupID, account.ID) delay := calculateDelay(attempt, DelayTypeMedium) return &FailoverResult{ RetryCount: 1, DelayMs: delay, Account: nextAccount, } case RetryLevelGiveUp: // 放弃重试 return &FailoverResult{Success: false, Error: err} } return &FailoverResult{Success: false, Error: err} } ``` #### 3.3.2 延迟计算 ```go // 延迟计算策略 func calculateDelay(attempt int, delayType DelayType) int { switch delayType { case DelayTypeShort: // 短延迟:100ms - 400ms return 100 + attempt*100 case DelayTypeMedium: // 中延迟:200ms - 800ms return 200 + attempt*200 case DelayTypeLong: // 长延迟:500ms - 2000ms return 500 + attempt*500 default: return 100 } } ``` #### 3.3.3 账号池模式(备用方案) > ⚠️ **修正**:`account_pool_mode.go` 是**备用/扩展方案**,主要账号选择使用加权评分系统。 ```go // service/account_pool_mode.go - 备用选择模式 type PoolMode struct { Type string // least_load/round_robin/random/priority MaxRetries int // 最大切换次数 Backoff string // 退避策略:linear/exponential } const ( PoolModeLeastLoad = "least_load" // 最小负载 PoolModeRoundRobin = "round_robin" // 轮询 PoolModeRandom = "random" // 随机 PoolModePriority = "priority" // 优先级 ) // 实际主要选择流程使用 SelectAccountWithLoadAwareness // account_pool_mode 仅在特定场景下使用 ``` **主要选择流程 vs 备用模式**: | 选择方式 | 使用位置 | 说明 | |---------|---------|------| | `SelectAccountWithLoadAwareness` | gateway_service.go:1190 | **主要流程**:加权评分 + 粘性会话 + 并发控制 | | `selectByLoadBalance` | openai_account_scheduler.go | **OpenAI 专用**:WebSocket 场景 | | `selectAccount` (PoolMode) | account_pool_mode.go | **备用**:特定配置下使用 | ### 3.4 连接管理 #### 3.4.1 连接池配置 ```go // 上游HTTP客户端配置 type UpstreamClientConfig struct { MaxIdleConns int // 最大空闲连接 MaxIdleConnsPerHost int // 每个主机最大空闲连接 IdleConnTimeout time.Duration // 空闲连接超时 DialTimeout time.Duration // 建立连接超时 ReadTimeout time.Duration // 读取超时 WriteTimeout time.Duration // 写超时 } // 默认配置 var DefaultUpstreamConfig = UpstreamClientConfig{ MaxIdleConns: 100, MaxIdleConnsPerHost: 30, IdleConnTimeout: 90 * time.Second, DialTimeout: 10 * time.Second, ReadTimeout: 120 * time.Second, WriteTimeout: 30 * time.Second, } ``` #### 3.4.2 连接跟踪 ```go // 连接跟踪服务 type ConnectionTracker struct { mutex sync.RWMutex // 本地内存:实时连接数 connections map[int64]int64 // Redis:历史统计数据 redisStats map[int64]*HistoricalStats } func (t *ConnectionTracker) Increment(accountID int64) { t.mutex.Lock() defer t.mutex.Unlock() t.connections[accountID]++ } func (t *ConnectionTracker) Decrement(accountID int64) { t.mutex.Lock() defer t.mutex.Unlock() t.connections[accountID]-- } func (t *ConnectionTracker) GetActiveCount(accountID int64) int64 { t.mutex.RLock() defer t.mutex.RUnlock() return t.connections[accountID] } ``` ## 4. 高级功能 ### 4.1 预热机制 ```go // 新账号预热 func (s *AccountWarmupService) WarmupAccount(account *Account) error { // 1. 发送轻量级请求建立连接 req := &Request{ Model: "claude-3-haiku-20240307", MaxTokens: 1, } // 2. 预热连接池 for i := 0; i < 5; i++ { _, err := s.sendRequest(account, req) if err != nil { return err } } // 3. 标记预热完成 account.WarmedUp = true return nil } ``` ### 4.2 健康检查 ```go // 账号健康检查 func (s *HealthCheckService) CheckAccountHealth(accountID int64) HealthStatus { // 1. 检查最后活跃时间 if time.Since(account.LastUsedAt) > 24*time.Hour { return HealthStatusStale } // 2. 检查错误率 errorRate := s.getErrorRate(accountID) if errorRate > 0.5 { return HealthStatusUnhealthy } // 3. 检查响应时间 avgLatency := s.getAvgLatency(accountID) if avgLatency > 10*time.Second { return HealthStatusSlow } return HealthStatusHealthy } ``` ### 4.3 自适应负载 ```go // 自适应负载调整 func (s *AdaptiveLoadService) AdjustConcurrency(account *Account) { // 1. 基于错误率调整 errorRate := s.getErrorRate(account.ID) if errorRate > 0.1 { // 错误率高,减少并发 account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.8) } // 2. 基于响应时间调整 avgLatency := s.getAvgLatency(account.ID) if avgLatency > 5*time.Second { account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.9) } // 3. 基于成功率恢复 successRate := 1 - errorRate if successRate > 0.99 && account.MaxConcurrency < account.OriginalMaxConcurrency { account.MaxConcurrency = int(float64(account.MaxConcurrency) * 1.1) } } ``` ## 5. 配置参数 ### 5.1 调度配置(config.yaml) ```yaml scheduler: # 账号选择策略 selection: strategy: "least_load" # least_load/round_robin/random/priority sticky_session: true sticky_ttl: 3600 # 故障转移配置 failover: max_retries: 3 retry_delay_ms: 100 max_retry_delay_ms: 8000 enable_same_account_retry: true # 连接池配置 connection_pool: max_idle_conns: 100 max_idle_per_host: 30 idle_timeout: 90s dial_timeout: 10s read_timeout: 120s # 健康检查 health_check: enabled: true interval: 5m timeout: 30s ``` ### 5.2 环境变量 | 变量 | 说明 | 默认值 | |------|------|--------| | `SCHEDULER_SELECTION_STRATEGY` | 选择策略 | least_load | | `SCHEDULER_MAX_RETRIES` | 最大重试次数 | 3 | | `SCHEDULER_STICKY_SESSION` | 启用粘性会话 | true | ## 6. 修改和扩展指南 ### 6.1 常见修改场景 **场景1:调整选择策略** ```go // 修改为轮询模式 func (s *GatewayService) SelectAccount(ctx context.Context, groupID int64) (*Account, error) { accounts := s.getAvailableAccounts(groupID) // 使用轮询选择 index := s.getAndIncrementIndex(groupID) return accounts[index % len(accounts)], nil } ``` **场景2:添加新的负载指标** ```go // 添加延迟作为负载指标 func (s *LoadFactorService) CalculateWithLatency(account *Account) float64 { activeConns := float64(s.getActiveConnections(account.ID)) / float64(account.MaxConcurrency) latency := s.getAvgLatency(account.ID).Seconds() / 10.0 // 归一化 return activeConns*0.7 + latency*0.3 } ``` **场景3:自定义重试策略** ```go // 实现指数退避 func calculateExponentialBackoff(attempt int) int { base := 100 maxDelay := 5000 delay := base * (1 << attempt) // 2^attempt if delay > maxDelay { delay = maxDelay } return delay } ``` ### 6.2 注意事项 1. **线程安全**:负载因子更新需要原子操作 2. **数据一致性**:Redis和本地缓存需要同步 3. **超时设置**:重试延迟要考虑用户体验 ## 7. 测试覆盖 ### 7.1 单元测试 | 测试文件 | 覆盖范围 | |----------|----------| | `account_load_factor_test.go` | 负载因子计算 | | `failover_service_test.go` | 故障转移逻辑 | | `account_pool_mode_test.go` | 账号池模式 | ### 7.2 集成测试 | 测试文件 | 场景 | |----------|------| | `e2e_gateway_test.go` | 完整调度流程 | ## 8. 监控与运维 ### 8.1 关键指标 | 指标 | 告警阈值 | 说明 | |------|----------|------| | `scheduler_failover_count` | > 100/min | 故障转移次数 | | `scheduler_avg_latency` | > 5s | 平均延迟 | | `scheduler_account_errors` | > 10% | 账号错误率 | | `scheduler_sticky_hit_rate` | < 80% | 粘性会话命中率 | ### 8.2 运维任务 | 任务 | 频率 | 说明 | |------|------|------| | 负载分析 | 每小时 | 分析账号负载分布 | | 故障统计 | 每天 | 统计故障转移情况 | | 连接池清理 | 每天 | 清理空闲连接 | ## 9. 总结 调度与负载均衡模块特点: - **智能选择**:负载感知算法确保请求分发到最合适的账号 - **高可用**:完善的故障转移机制保障服务连续性 - **会话保持**:粘性会话提升用户体验 - **可配置**:多种调度模式满足不同场景 **潜在改进点:** 1. 可增加更多自适应能力 2. 可支持更复杂的调度规则 **修改建议:** - 选择策略调整相对简单 - 重试策略需要充分测试 --- *文档版本:1.1* *最后更新:2026-03-23* *分析基于:Sub2API v0.1.104* *修正内容:负载因子计算(加权评分系统)、账号池模式(备用方案)、实际选择流程*