- Remove old review reports (keep latest only) - Move docs/ to deploy/docs-backup/ - Move performance-testing/ to deploy/ - Clean up test output files - Organize root directory
17 KiB
17 KiB
Sub2API 模块分析报告:调度与负载均衡模块
1. 模块概述
1.1 模块定位
调度与负载均衡模块是Sub2API系统的请求分配核心,负责在多个上游账号之间智能分配请求,实现负载均衡、故障转移和会话保持。该模块确保系统在高并发场景下的稳定性和资源利用率。
1.2 核心职责
- 账号选择:根据负载、优先级选择最合适的账号
- 负载均衡:将请求均匀分布到多个账号
- 故障转移:账号故障时自动切换到备用账号
- 会话保持:同一会话路由到同一账号
- 连接管理:管理上游连接池
2. 代码结构分析
2.1 核心文件
| 文件路径 | 职责 | 代码行数 |
|---|---|---|
service/gateway_service.go |
账号选择核心逻辑 | 8516行 |
service/openai_account_scheduler.go |
OpenAI 账号调度器(加权评分) | ~2000行 |
service/account_load_factor.go |
负载因子计算 | ~400行 |
service/account_pool_mode.go |
账号池模式(备用) | ~300行 |
service/concurrency_service.go |
并发槽位管理 | ~1000行 |
handler/gateway_handler.go |
网关请求处理 | ~1800行 |
⚠️ 修正:
SelectAccountWithLoadAwareness位于gateway_service.go:1190,负载评分逻辑在openai_account_scheduler.go。
2.2 核心数据结构
// 账号选择结果
type AccountSelection struct {
Account *Account // 选中的账号
SessionKey string // 粘性会话键
IsSticky bool // 是否命中粘性会话
LoadFactor float64 // 当前负载因子
}
// 负载因子
type LoadFactor struct {
ActiveConnections int // 活跃连接数
MaxConcurrency int // 最大并发
RPM int // 每分钟请求数
ErrorRate float64 // 错误率
Calculated float64 // 计算后的负载因子
}
// 故障转移结果
type FailoverResult struct {
Success bool
Account *Account
RetryCount int
DelayMs int
Error error
}
3. 功能详细分析
3.1 账号选择算法
3.1.1 负载感知选择
// service/gateway_service.go - SelectAccountWithLoadAwareness
func (s *GatewayService) SelectAccountWithLoadAwareness(
ctx context.Context,
groupID int64,
sessionKey string,
model string,
failedAccountIDs []int64,
) (*Account, error) {
// 1. 获取分组下所有可用账号
accounts := s.getAvailableAccounts(groupID, model)
// 2. 过滤掉已失败的账号
accounts = s.filterFailedAccounts(accounts, failedAccountIDs)
// 3. 计算每个账号的负载因子
for _, account := range accounts {
account.LoadFactor = s.calculateLoadFactor(account)
}
// 4. 按负载因子排序(低的优先)
sort.Slice(accounts, func(i, j int) bool {
return accounts[i].LoadFactor < accounts[j].LoadFactor
})
// 5. 检查粘性会话
if sessionKey != "" {
stickyAccount := s.getStickyAccount(sessionKey, accounts)
if stickyAccount != nil {
return stickyAccount, nil
}
}
// 6. 返回负载最低的账号
return accounts[0], nil
}
3.1.2 负载因子计算(加权评分系统)
⚠️ 修正:负载评分使用可配置的加权评分系统,不是简单的公式。
// service/openai_account_scheduler.go - Weighted Scoring
type AccountScore struct {
AccountID int64
TotalScore float64
PriorityScore float64 // 优先级得分
LoadScore float64 // 负载得分
QueueScore float64 // 队列得分
ErrorScore float64 // 错误率得分
TTFTScore float64 // 首字节时间得分
}
// 评分权重(可配置)
type SchedulerScoreWeights struct {
PriorityWeight float64 // 默认 1.0
LoadWeight float64 // 默认 1.0
QueueWeight float64 // 默认 0.7
ErrorWeight float64 // 默认 0.8
TTFTWeight float64 // 默认 0.5
}
// 计算公式
func (s *AccountScheduler) calculateScore(account *Account) *AccountScore {
score := &AccountScore{AccountID: account.ID}
weights := s.getScoreWeights()
// 1. 优先级得分(账号配置中的优先级,越高越优先)
score.PriorityScore = float64(account.Priority)
// 2. 负载得分(当前连接数 / 最大并发)
activeConns := s.getActiveConnections(account.ID)
score.LoadScore = 1.0 - (float64(activeConns) / float64(account.MaxConcurrency))
// 3. 队列得分(等待中的请求数)
queueLen := s.getQueueLength(account.ID)
score.QueueScore = 1.0 - (float64(queueLen) / float64(account.MaxConcurrency*2))
// 4. 错误率得分(最近错误率越低越好)
errorRate := s.getErrorRate(account.ID)
score.ErrorScore = 1.0 - errorRate
// 5. TTFT 得分(首字节响应时间,归一化)
avgTTFT := s.getAvgTTFT(account.ID)
score.TTFTScore = calculateTTFTScore(avgTTFT)
// 6. 加权总分
score.TotalScore =
score.PriorityScore * weights.PriorityWeight +
score.LoadScore * weights.LoadWeight +
score.QueueScore * weights.QueueWeight +
score.ErrorScore * weights.ErrorWeight +
score.TTFTScore * weights.TTFTWeight
return score
}
// 选择策略:top-K + 加权随机
func (s *AccountScheduler) selectByWeightedRandom(candidates []*Account) *Account {
topK := s.selectTopK(candidates, 5) // 选前5名
// 加权随机,避免单一账号被过度使用
totalWeight := 0.0
for _, a := range topK {
totalWeight += a.Score.TotalScore
}
rand := mathrand.Float64() * totalWeight
cumulative := 0.0
for _, a := range topK {
cumulative += a.Score.TotalScore
if rand <= cumulative {
return a.Account
}
}
return topK[0]
}
评分维度说明:
| 维度 | 计算方式 | 权重范围 | 说明 |
|---|---|---|---|
| PriorityScore | 账号配置中的 Priority 字段 | 0-100 | 越高越优先 |
| LoadScore | 1 - (活跃连接/最大并发) |
0-1 | 空闲度 |
| QueueScore | 1 - (队列长度/(最大并发×2)) |
0-1 | 等待情况 |
| ErrorScore | 1 - 最近错误率 |
0-1 | 健康度 |
| TTFTScore | 归一化的首字节时间 | 0-1 | 响应速度 |
3.2 粘性会话
3.2.1 会话保持机制
// service/gateway_service.go - BindStickySession
func (s *GatewayService) BindStickySession(ctx context.Context, groupID int64, sessionKey string, accountID int64) error {
// 1. 生成会话键
// 格式:group:{groupID}:session:{sessionHash}
redisKey := fmt.Sprintf("sticky:%d:%s", groupID, sessionKey)
// 2. 存储映射关系
// TTL: 1小时(可配置)
err := s.redis.Set(ctx, redisKey, accountID, 1*time.Hour)
if err != nil {
return err
}
// 3. 记录会话窗口
return s.updateSessionWindow(ctx, accountID, sessionKey)
}
3.2.2 会话验证
// 检查会话是否有效
func (s *GatewayService) getStickyAccount(sessionKey string, accounts []*Account) *Account {
for _, account := range accounts {
// 检查账号是否在会话窗口内
if s.isInSessionWindow(account, sessionKey) {
// 验证会话映射
if mappedID := s.getSessionMapping(groupID, sessionKey); mappedID == account.ID {
return account
}
}
}
return nil
}
3.3 故障转移
3.3.1 重试策略
// service/gateway_service.go - handleUpstreamError
func (s *GatewayService) handleUpstreamError(
ctx context.Context,
account *Account,
err error,
attempt int,
) *FailoverResult {
// 1. 检查是否可重试
if !isRetryableError(err) {
return &FailoverResult{Success: false, Error: err}
}
// 2. 判断重试级别
switch getRetryLevel(err) {
case RetryLevelSameAccount:
// 同账号重试(短延迟)
delay := calculateDelay(attempt, DelayTypeShort)
return &FailoverResult{
RetryCount: 1,
DelayMs: delay,
Account: account, // 继续使用同一账号
}
case RetryLevelSwitchAccount:
// 切换账号(稍长延迟)
nextAccount := s.selectNextAccount(account.GroupID, account.ID)
delay := calculateDelay(attempt, DelayTypeMedium)
return &FailoverResult{
RetryCount: 1,
DelayMs: delay,
Account: nextAccount,
}
case RetryLevelGiveUp:
// 放弃重试
return &FailoverResult{Success: false, Error: err}
}
return &FailoverResult{Success: false, Error: err}
}
3.3.2 延迟计算
// 延迟计算策略
func calculateDelay(attempt int, delayType DelayType) int {
switch delayType {
case DelayTypeShort:
// 短延迟:100ms - 400ms
return 100 + attempt*100
case DelayTypeMedium:
// 中延迟:200ms - 800ms
return 200 + attempt*200
case DelayTypeLong:
// 长延迟:500ms - 2000ms
return 500 + attempt*500
default:
return 100
}
}
3.3.3 账号池模式(备用方案)
⚠️ 修正:
account_pool_mode.go是备用/扩展方案,主要账号选择使用加权评分系统。
// service/account_pool_mode.go - 备用选择模式
type PoolMode struct {
Type string // least_load/round_robin/random/priority
MaxRetries int // 最大切换次数
Backoff string // 退避策略:linear/exponential
}
const (
PoolModeLeastLoad = "least_load" // 最小负载
PoolModeRoundRobin = "round_robin" // 轮询
PoolModeRandom = "random" // 随机
PoolModePriority = "priority" // 优先级
)
// 实际主要选择流程使用 SelectAccountWithLoadAwareness
// account_pool_mode 仅在特定场景下使用
主要选择流程 vs 备用模式:
| 选择方式 | 使用位置 | 说明 |
|---|---|---|
SelectAccountWithLoadAwareness |
gateway_service.go:1190 | 主要流程:加权评分 + 粘性会话 + 并发控制 |
selectByLoadBalance |
openai_account_scheduler.go | OpenAI 专用:WebSocket 场景 |
selectAccount (PoolMode) |
account_pool_mode.go | 备用:特定配置下使用 |
3.4 连接管理
3.4.1 连接池配置
// 上游HTTP客户端配置
type UpstreamClientConfig struct {
MaxIdleConns int // 最大空闲连接
MaxIdleConnsPerHost int // 每个主机最大空闲连接
IdleConnTimeout time.Duration // 空闲连接超时
DialTimeout time.Duration // 建立连接超时
ReadTimeout time.Duration // 读取超时
WriteTimeout time.Duration // 写超时
}
// 默认配置
var DefaultUpstreamConfig = UpstreamClientConfig{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 30,
IdleConnTimeout: 90 * time.Second,
DialTimeout: 10 * time.Second,
ReadTimeout: 120 * time.Second,
WriteTimeout: 30 * time.Second,
}
3.4.2 连接跟踪
// 连接跟踪服务
type ConnectionTracker struct {
mutex sync.RWMutex
// 本地内存:实时连接数
connections map[int64]int64
// Redis:历史统计数据
redisStats map[int64]*HistoricalStats
}
func (t *ConnectionTracker) Increment(accountID int64) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.connections[accountID]++
}
func (t *ConnectionTracker) Decrement(accountID int64) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.connections[accountID]--
}
func (t *ConnectionTracker) GetActiveCount(accountID int64) int64 {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.connections[accountID]
}
4. 高级功能
4.1 预热机制
// 新账号预热
func (s *AccountWarmupService) WarmupAccount(account *Account) error {
// 1. 发送轻量级请求建立连接
req := &Request{
Model: "claude-3-haiku-20240307",
MaxTokens: 1,
}
// 2. 预热连接池
for i := 0; i < 5; i++ {
_, err := s.sendRequest(account, req)
if err != nil {
return err
}
}
// 3. 标记预热完成
account.WarmedUp = true
return nil
}
4.2 健康检查
// 账号健康检查
func (s *HealthCheckService) CheckAccountHealth(accountID int64) HealthStatus {
// 1. 检查最后活跃时间
if time.Since(account.LastUsedAt) > 24*time.Hour {
return HealthStatusStale
}
// 2. 检查错误率
errorRate := s.getErrorRate(accountID)
if errorRate > 0.5 {
return HealthStatusUnhealthy
}
// 3. 检查响应时间
avgLatency := s.getAvgLatency(accountID)
if avgLatency > 10*time.Second {
return HealthStatusSlow
}
return HealthStatusHealthy
}
4.3 自适应负载
// 自适应负载调整
func (s *AdaptiveLoadService) AdjustConcurrency(account *Account) {
// 1. 基于错误率调整
errorRate := s.getErrorRate(account.ID)
if errorRate > 0.1 {
// 错误率高,减少并发
account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.8)
}
// 2. 基于响应时间调整
avgLatency := s.getAvgLatency(account.ID)
if avgLatency > 5*time.Second {
account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.9)
}
// 3. 基于成功率恢复
successRate := 1 - errorRate
if successRate > 0.99 && account.MaxConcurrency < account.OriginalMaxConcurrency {
account.MaxConcurrency = int(float64(account.MaxConcurrency) * 1.1)
}
}
5. 配置参数
5.1 调度配置(config.yaml)
scheduler:
# 账号选择策略
selection:
strategy: "least_load" # least_load/round_robin/random/priority
sticky_session: true
sticky_ttl: 3600
# 故障转移配置
failover:
max_retries: 3
retry_delay_ms: 100
max_retry_delay_ms: 8000
enable_same_account_retry: true
# 连接池配置
connection_pool:
max_idle_conns: 100
max_idle_per_host: 30
idle_timeout: 90s
dial_timeout: 10s
read_timeout: 120s
# 健康检查
health_check:
enabled: true
interval: 5m
timeout: 30s
5.2 环境变量
| 变量 | 说明 | 默认值 |
|---|---|---|
SCHEDULER_SELECTION_STRATEGY |
选择策略 | least_load |
SCHEDULER_MAX_RETRIES |
最大重试次数 | 3 |
SCHEDULER_STICKY_SESSION |
启用粘性会话 | true |
6. 修改和扩展指南
6.1 常见修改场景
场景1:调整选择策略
// 修改为轮询模式
func (s *GatewayService) SelectAccount(ctx context.Context, groupID int64) (*Account, error) {
accounts := s.getAvailableAccounts(groupID)
// 使用轮询选择
index := s.getAndIncrementIndex(groupID)
return accounts[index % len(accounts)], nil
}
场景2:添加新的负载指标
// 添加延迟作为负载指标
func (s *LoadFactorService) CalculateWithLatency(account *Account) float64 {
activeConns := float64(s.getActiveConnections(account.ID)) / float64(account.MaxConcurrency)
latency := s.getAvgLatency(account.ID).Seconds() / 10.0 // 归一化
return activeConns*0.7 + latency*0.3
}
场景3:自定义重试策略
// 实现指数退避
func calculateExponentialBackoff(attempt int) int {
base := 100
maxDelay := 5000
delay := base * (1 << attempt) // 2^attempt
if delay > maxDelay {
delay = maxDelay
}
return delay
}
6.2 注意事项
- 线程安全:负载因子更新需要原子操作
- 数据一致性:Redis和本地缓存需要同步
- 超时设置:重试延迟要考虑用户体验
7. 测试覆盖
7.1 单元测试
| 测试文件 | 覆盖范围 |
|---|---|
account_load_factor_test.go |
负载因子计算 |
failover_service_test.go |
故障转移逻辑 |
account_pool_mode_test.go |
账号池模式 |
7.2 集成测试
| 测试文件 | 场景 |
|---|---|
e2e_gateway_test.go |
完整调度流程 |
8. 监控与运维
8.1 关键指标
| 指标 | 告警阈值 | 说明 |
|---|---|---|
scheduler_failover_count |
> 100/min | 故障转移次数 |
scheduler_avg_latency |
> 5s | 平均延迟 |
scheduler_account_errors |
> 10% | 账号错误率 |
scheduler_sticky_hit_rate |
< 80% | 粘性会话命中率 |
8.2 运维任务
| 任务 | 频率 | 说明 |
|---|---|---|
| 负载分析 | 每小时 | 分析账号负载分布 |
| 故障统计 | 每天 | 统计故障转移情况 |
| 连接池清理 | 每天 | 清理空闲连接 |
9. 总结
调度与负载均衡模块特点:
- 智能选择:负载感知算法确保请求分发到最合适的账号
- 高可用:完善的故障转移机制保障服务连续性
- 会话保持:粘性会话提升用户体验
- 可配置:多种调度模式满足不同场景
潜在改进点:
- 可增加更多自适应能力
- 可支持更复杂的调度规则
修改建议:
- 选择策略调整相对简单
- 重试策略需要充分测试
文档版本:1.1 最后更新:2026-03-23 分析基于:Sub2API v0.1.104 修正内容:负载因子计算(加权评分系统)、账号池模式(备用方案)、实际选择流程