Files
tokens-reef/deploy/docs-backup/MODULE_06_SCHEDULING.md
Developer 349d783fd1 refactor: clean up project structure
- Remove old review reports (keep latest only)
- Move docs/ to deploy/docs-backup/
- Move performance-testing/ to deploy/
- Clean up test output files
- Organize root directory
2026-04-06 23:36:03 +08:00

17 KiB
Raw Permalink Blame History

Sub2API 模块分析报告:调度与负载均衡模块

1. 模块概述

1.1 模块定位

调度与负载均衡模块是Sub2API系统的请求分配核心负责在多个上游账号之间智能分配请求实现负载均衡、故障转移和会话保持。该模块确保系统在高并发场景下的稳定性和资源利用率。

1.2 核心职责

  • 账号选择:根据负载、优先级选择最合适的账号
  • 负载均衡:将请求均匀分布到多个账号
  • 故障转移:账号故障时自动切换到备用账号
  • 会话保持:同一会话路由到同一账号
  • 连接管理:管理上游连接池

2. 代码结构分析

2.1 核心文件

文件路径 职责 代码行数
service/gateway_service.go 账号选择核心逻辑 8516行
service/openai_account_scheduler.go OpenAI 账号调度器(加权评分) ~2000行
service/account_load_factor.go 负载因子计算 ~400行
service/account_pool_mode.go 账号池模式(备用) ~300行
service/concurrency_service.go 并发槽位管理 ~1000行
handler/gateway_handler.go 网关请求处理 ~1800行

⚠️ 修正SelectAccountWithLoadAwareness 位于 gateway_service.go:1190,负载评分逻辑在 openai_account_scheduler.go

2.2 核心数据结构

// 账号选择结果
type AccountSelection struct {
    Account       *Account   // 选中的账号
    SessionKey    string     // 粘性会话键
    IsSticky      bool       // 是否命中粘性会话
    LoadFactor    float64    // 当前负载因子
}

// 负载因子
type LoadFactor struct {
    ActiveConnections int     // 活跃连接数
    MaxConcurrency    int     // 最大并发
    RPM               int     // 每分钟请求数
    ErrorRate         float64 // 错误率
    Calculated         float64 // 计算后的负载因子
}

// 故障转移结果
type FailoverResult struct {
    Success       bool
    Account      *Account
    RetryCount   int
    DelayMs      int
    Error        error
}

3. 功能详细分析

3.1 账号选择算法

3.1.1 负载感知选择

// service/gateway_service.go - SelectAccountWithLoadAwareness
func (s *GatewayService) SelectAccountWithLoadAwareness(
    ctx context.Context,
    groupID int64,
    sessionKey string,
    model string,
    failedAccountIDs []int64,
) (*Account, error) {
    
    // 1. 获取分组下所有可用账号
    accounts := s.getAvailableAccounts(groupID, model)
    
    // 2. 过滤掉已失败的账号
    accounts = s.filterFailedAccounts(accounts, failedAccountIDs)
    
    // 3. 计算每个账号的负载因子
    for _, account := range accounts {
        account.LoadFactor = s.calculateLoadFactor(account)
    }
    
    // 4. 按负载因子排序(低的优先)
    sort.Slice(accounts, func(i, j int) bool {
        return accounts[i].LoadFactor < accounts[j].LoadFactor
    })
    
    // 5. 检查粘性会话
    if sessionKey != "" {
        stickyAccount := s.getStickyAccount(sessionKey, accounts)
        if stickyAccount != nil {
            return stickyAccount, nil
        }
    }
    
    // 6. 返回负载最低的账号
    return accounts[0], nil
}

3.1.2 负载因子计算(加权评分系统)

⚠️ 修正:负载评分使用可配置的加权评分系统,不是简单的公式。

// service/openai_account_scheduler.go - Weighted Scoring
type AccountScore struct {
    AccountID    int64
    TotalScore   float64
    PriorityScore float64  // 优先级得分
    LoadScore     float64  // 负载得分
    QueueScore    float64  // 队列得分
    ErrorScore   float64  // 错误率得分
    TTFTScore    float64  // 首字节时间得分
}

// 评分权重(可配置)
type SchedulerScoreWeights struct {
    PriorityWeight float64 // 默认 1.0
    LoadWeight     float64 // 默认 1.0
    QueueWeight    float64 // 默认 0.7
    ErrorWeight    float64 // 默认 0.8
    TTFTWeight    float64 // 默认 0.5
}

// 计算公式
func (s *AccountScheduler) calculateScore(account *Account) *AccountScore {
    score := &AccountScore{AccountID: account.ID}
    weights := s.getScoreWeights()
    
    // 1. 优先级得分(账号配置中的优先级,越高越优先)
    score.PriorityScore = float64(account.Priority)
    
    // 2. 负载得分(当前连接数 / 最大并发)
    activeConns := s.getActiveConnections(account.ID)
    score.LoadScore = 1.0 - (float64(activeConns) / float64(account.MaxConcurrency))
    
    // 3. 队列得分(等待中的请求数)
    queueLen := s.getQueueLength(account.ID)
    score.QueueScore = 1.0 - (float64(queueLen) / float64(account.MaxConcurrency*2))
    
    // 4. 错误率得分(最近错误率越低越好)
    errorRate := s.getErrorRate(account.ID)
    score.ErrorScore = 1.0 - errorRate
    
    // 5. TTFT 得分(首字节响应时间,归一化)
    avgTTFT := s.getAvgTTFT(account.ID)
    score.TTFTScore = calculateTTFTScore(avgTTFT)
    
    // 6. 加权总分
    score.TotalScore = 
        score.PriorityScore * weights.PriorityWeight +
        score.LoadScore * weights.LoadWeight +
        score.QueueScore * weights.QueueWeight +
        score.ErrorScore * weights.ErrorWeight +
        score.TTFTScore * weights.TTFTWeight
    
    return score
}

// 选择策略top-K + 加权随机
func (s *AccountScheduler) selectByWeightedRandom(candidates []*Account) *Account {
    topK := s.selectTopK(candidates, 5)  // 选前5名
    
    // 加权随机,避免单一账号被过度使用
    totalWeight := 0.0
    for _, a := range topK {
        totalWeight += a.Score.TotalScore
    }
    
    rand := mathrand.Float64() * totalWeight
    cumulative := 0.0
    for _, a := range topK {
        cumulative += a.Score.TotalScore
        if rand <= cumulative {
            return a.Account
        }
    }
    return topK[0]
}

评分维度说明

维度 计算方式 权重范围 说明
PriorityScore 账号配置中的 Priority 字段 0-100 越高越优先
LoadScore 1 - (活跃连接/最大并发) 0-1 空闲度
QueueScore 1 - (队列长度/(最大并发×2)) 0-1 等待情况
ErrorScore 1 - 最近错误率 0-1 健康度
TTFTScore 归一化的首字节时间 0-1 响应速度

3.2 粘性会话

3.2.1 会话保持机制

// service/gateway_service.go - BindStickySession
func (s *GatewayService) BindStickySession(ctx context.Context, groupID int64, sessionKey string, accountID int64) error {
    // 1. 生成会话键
    // 格式group:{groupID}:session:{sessionHash}
    redisKey := fmt.Sprintf("sticky:%d:%s", groupID, sessionKey)
    
    // 2. 存储映射关系
    // TTL: 1小时可配置
    err := s.redis.Set(ctx, redisKey, accountID, 1*time.Hour)
    if err != nil {
        return err
    }
    
    // 3. 记录会话窗口
    return s.updateSessionWindow(ctx, accountID, sessionKey)
}

3.2.2 会话验证

// 检查会话是否有效
func (s *GatewayService) getStickyAccount(sessionKey string, accounts []*Account) *Account {
    for _, account := range accounts {
        // 检查账号是否在会话窗口内
        if s.isInSessionWindow(account, sessionKey) {
            // 验证会话映射
            if mappedID := s.getSessionMapping(groupID, sessionKey); mappedID == account.ID {
                return account
            }
        }
    }
    return nil
}

3.3 故障转移

3.3.1 重试策略

// service/gateway_service.go - handleUpstreamError
func (s *GatewayService) handleUpstreamError(
    ctx context.Context,
    account *Account,
    err error,
    attempt int,
) *FailoverResult {
    
    // 1. 检查是否可重试
    if !isRetryableError(err) {
        return &FailoverResult{Success: false, Error: err}
    }
    
    // 2. 判断重试级别
    switch getRetryLevel(err) {
    case RetryLevelSameAccount:
        // 同账号重试(短延迟)
        delay := calculateDelay(attempt, DelayTypeShort)
        return &FailoverResult{
            RetryCount: 1,
            DelayMs:    delay,
            Account:    account, // 继续使用同一账号
        }
        
    case RetryLevelSwitchAccount:
        // 切换账号(稍长延迟)
        nextAccount := s.selectNextAccount(account.GroupID, account.ID)
        delay := calculateDelay(attempt, DelayTypeMedium)
        return &FailoverResult{
            RetryCount: 1,
            DelayMs:    delay,
            Account:    nextAccount,
        }
        
    case RetryLevelGiveUp:
        // 放弃重试
        return &FailoverResult{Success: false, Error: err}
    }
    
    return &FailoverResult{Success: false, Error: err}
}

3.3.2 延迟计算

// 延迟计算策略
func calculateDelay(attempt int, delayType DelayType) int {
    switch delayType {
    case DelayTypeShort:
        // 短延迟100ms - 400ms
        return 100 + attempt*100
        
    case DelayTypeMedium:
        // 中延迟200ms - 800ms  
        return 200 + attempt*200
        
    case DelayTypeLong:
        // 长延迟500ms - 2000ms
        return 500 + attempt*500
        
    default:
        return 100
    }
}

3.3.3 账号池模式(备用方案)

⚠️ 修正account_pool_mode.go备用/扩展方案,主要账号选择使用加权评分系统。

// service/account_pool_mode.go - 备用选择模式
type PoolMode struct {
    Type        string  // least_load/round_robin/random/priority
    MaxRetries  int     // 最大切换次数
    Backoff     string  // 退避策略linear/exponential
}

const (
    PoolModeLeastLoad   = "least_load"    // 最小负载
    PoolModeRoundRobin = "round_robin"   // 轮询
    PoolModeRandom     = "random"        // 随机
    PoolModePriority   = "priority"      // 优先级
)

// 实际主要选择流程使用 SelectAccountWithLoadAwareness
// account_pool_mode 仅在特定场景下使用

主要选择流程 vs 备用模式

选择方式 使用位置 说明
SelectAccountWithLoadAwareness gateway_service.go:1190 主要流程:加权评分 + 粘性会话 + 并发控制
selectByLoadBalance openai_account_scheduler.go OpenAI 专用WebSocket 场景
selectAccount (PoolMode) account_pool_mode.go 备用:特定配置下使用

3.4 连接管理

3.4.1 连接池配置

// 上游HTTP客户端配置
type UpstreamClientConfig struct {
    MaxIdleConns        int           // 最大空闲连接
    MaxIdleConnsPerHost int           // 每个主机最大空闲连接
    IdleConnTimeout    time.Duration // 空闲连接超时
    DialTimeout        time.Duration // 建立连接超时
    ReadTimeout        time.Duration // 读取超时
    WriteTimeout       time.Duration // 写超时
}

// 默认配置
var DefaultUpstreamConfig = UpstreamClientConfig{
    MaxIdleConns:        100,
    MaxIdleConnsPerHost:  30,
    IdleConnTimeout:      90 * time.Second,
    DialTimeout:          10 * time.Second,
    ReadTimeout:          120 * time.Second,
    WriteTimeout:         30 * time.Second,
}

3.4.2 连接跟踪

// 连接跟踪服务
type ConnectionTracker struct {
    mutex sync.RWMutex
    // 本地内存:实时连接数
    connections map[int64]int64
    
    // Redis历史统计数据
    redisStats map[int64]*HistoricalStats
}

func (t *ConnectionTracker) Increment(accountID int64) {
    t.mutex.Lock()
    defer t.mutex.Unlock()
    t.connections[accountID]++
}

func (t *ConnectionTracker) Decrement(accountID int64) {
    t.mutex.Lock()
    defer t.mutex.Unlock()
    t.connections[accountID]--
}

func (t *ConnectionTracker) GetActiveCount(accountID int64) int64 {
    t.mutex.RLock()
    defer t.mutex.RUnlock()
    return t.connections[accountID]
}

4. 高级功能

4.1 预热机制

// 新账号预热
func (s *AccountWarmupService) WarmupAccount(account *Account) error {
    // 1. 发送轻量级请求建立连接
    req := &Request{
        Model:     "claude-3-haiku-20240307",
        MaxTokens: 1,
    }
    
    // 2. 预热连接池
    for i := 0; i < 5; i++ {
        _, err := s.sendRequest(account, req)
        if err != nil {
            return err
        }
    }
    
    // 3. 标记预热完成
    account.WarmedUp = true
    return nil
}

4.2 健康检查

// 账号健康检查
func (s *HealthCheckService) CheckAccountHealth(accountID int64) HealthStatus {
    // 1. 检查最后活跃时间
    if time.Since(account.LastUsedAt) > 24*time.Hour {
        return HealthStatusStale
    }
    
    // 2. 检查错误率
    errorRate := s.getErrorRate(accountID)
    if errorRate > 0.5 {
        return HealthStatusUnhealthy
    }
    
    // 3. 检查响应时间
    avgLatency := s.getAvgLatency(accountID)
    if avgLatency > 10*time.Second {
        return HealthStatusSlow
    }
    
    return HealthStatusHealthy
}

4.3 自适应负载

// 自适应负载调整
func (s *AdaptiveLoadService) AdjustConcurrency(account *Account) {
    // 1. 基于错误率调整
    errorRate := s.getErrorRate(account.ID)
    if errorRate > 0.1 {
        // 错误率高,减少并发
        account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.8)
    }
    
    // 2. 基于响应时间调整
    avgLatency := s.getAvgLatency(account.ID)
    if avgLatency > 5*time.Second {
        account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.9)
    }
    
    // 3. 基于成功率恢复
    successRate := 1 - errorRate
    if successRate > 0.99 && account.MaxConcurrency < account.OriginalMaxConcurrency {
        account.MaxConcurrency = int(float64(account.MaxConcurrency) * 1.1)
    }
}

5. 配置参数

5.1 调度配置config.yaml

scheduler:
  # 账号选择策略
  selection:
    strategy: "least_load"  # least_load/round_robin/random/priority
    sticky_session: true
    sticky_ttl: 3600
    
  # 故障转移配置
  failover:
    max_retries: 3
    retry_delay_ms: 100
    max_retry_delay_ms: 8000
    enable_same_account_retry: true
    
  # 连接池配置
  connection_pool:
    max_idle_conns: 100
    max_idle_per_host: 30
    idle_timeout: 90s
    dial_timeout: 10s
    read_timeout: 120s
    
  # 健康检查
  health_check:
    enabled: true
    interval: 5m
    timeout: 30s

5.2 环境变量

变量 说明 默认值
SCHEDULER_SELECTION_STRATEGY 选择策略 least_load
SCHEDULER_MAX_RETRIES 最大重试次数 3
SCHEDULER_STICKY_SESSION 启用粘性会话 true

6. 修改和扩展指南

6.1 常见修改场景

场景1调整选择策略

// 修改为轮询模式
func (s *GatewayService) SelectAccount(ctx context.Context, groupID int64) (*Account, error) {
    accounts := s.getAvailableAccounts(groupID)
    
    // 使用轮询选择
    index := s.getAndIncrementIndex(groupID)
    return accounts[index % len(accounts)], nil
}

场景2添加新的负载指标

// 添加延迟作为负载指标
func (s *LoadFactorService) CalculateWithLatency(account *Account) float64 {
    activeConns := float64(s.getActiveConnections(account.ID)) / float64(account.MaxConcurrency)
    latency := s.getAvgLatency(account.ID).Seconds() / 10.0  // 归一化
    
    return activeConns*0.7 + latency*0.3
}

场景3自定义重试策略

// 实现指数退避
func calculateExponentialBackoff(attempt int) int {
    base := 100
    maxDelay := 5000
    delay := base * (1 << attempt)  // 2^attempt
    if delay > maxDelay {
        delay = maxDelay
    }
    return delay
}

6.2 注意事项

  1. 线程安全:负载因子更新需要原子操作
  2. 数据一致性Redis和本地缓存需要同步
  3. 超时设置:重试延迟要考虑用户体验

7. 测试覆盖

7.1 单元测试

测试文件 覆盖范围
account_load_factor_test.go 负载因子计算
failover_service_test.go 故障转移逻辑
account_pool_mode_test.go 账号池模式

7.2 集成测试

测试文件 场景
e2e_gateway_test.go 完整调度流程

8. 监控与运维

8.1 关键指标

指标 告警阈值 说明
scheduler_failover_count > 100/min 故障转移次数
scheduler_avg_latency > 5s 平均延迟
scheduler_account_errors > 10% 账号错误率
scheduler_sticky_hit_rate < 80% 粘性会话命中率

8.2 运维任务

任务 频率 说明
负载分析 每小时 分析账号负载分布
故障统计 每天 统计故障转移情况
连接池清理 每天 清理空闲连接

9. 总结

调度与负载均衡模块特点:

  • 智能选择:负载感知算法确保请求分发到最合适的账号
  • 高可用:完善的故障转移机制保障服务连续性
  • 会话保持:粘性会话提升用户体验
  • 可配置:多种调度模式满足不同场景

潜在改进点:

  1. 可增加更多自适应能力
  2. 可支持更复杂的调度规则

修改建议:

  • 选择策略调整相对简单
  • 重试策略需要充分测试

文档版本1.1 最后更新2026-03-23 分析基于Sub2API v0.1.104 修正内容:负载因子计算(加权评分系统)、账号池模式(备用方案)、实际选择流程