Files
tokens-reef/deploy/docs-backup/MODULE_06_SCHEDULING.md

626 lines
17 KiB
Markdown
Raw Permalink Normal View History

# Sub2API 模块分析报告:调度与负载均衡模块
## 1. 模块概述
### 1.1 模块定位
调度与负载均衡模块是Sub2API系统的请求分配核心负责在多个上游账号之间智能分配请求实现负载均衡、故障转移和会话保持。该模块确保系统在高并发场景下的稳定性和资源利用率。
### 1.2 核心职责
- **账号选择**:根据负载、优先级选择最合适的账号
- **负载均衡**:将请求均匀分布到多个账号
- **故障转移**:账号故障时自动切换到备用账号
- **会话保持**:同一会话路由到同一账号
- **连接管理**:管理上游连接池
## 2. 代码结构分析
### 2.1 核心文件
| 文件路径 | 职责 | 代码行数 |
|---------|------|----------|
| `service/gateway_service.go` | 账号选择核心逻辑 | **8516行** |
| `service/openai_account_scheduler.go` | OpenAI 账号调度器(加权评分) | ~2000行 |
| `service/account_load_factor.go` | 负载因子计算 | ~400行 |
| `service/account_pool_mode.go` | 账号池模式(备用) | ~300行 |
| `service/concurrency_service.go` | 并发槽位管理 | ~1000行 |
| `handler/gateway_handler.go` | 网关请求处理 | ~1800行 |
> ⚠️ **修正**`SelectAccountWithLoadAwareness` 位于 `gateway_service.go:1190`,负载评分逻辑在 `openai_account_scheduler.go`。
### 2.2 核心数据结构
```go
// 账号选择结果
type AccountSelection struct {
Account *Account // 选中的账号
SessionKey string // 粘性会话键
IsSticky bool // 是否命中粘性会话
LoadFactor float64 // 当前负载因子
}
// 负载因子
type LoadFactor struct {
ActiveConnections int // 活跃连接数
MaxConcurrency int // 最大并发
RPM int // 每分钟请求数
ErrorRate float64 // 错误率
Calculated float64 // 计算后的负载因子
}
// 故障转移结果
type FailoverResult struct {
Success bool
Account *Account
RetryCount int
DelayMs int
Error error
}
```
## 3. 功能详细分析
### 3.1 账号选择算法
#### 3.1.1 负载感知选择
```go
// service/gateway_service.go - SelectAccountWithLoadAwareness
func (s *GatewayService) SelectAccountWithLoadAwareness(
ctx context.Context,
groupID int64,
sessionKey string,
model string,
failedAccountIDs []int64,
) (*Account, error) {
// 1. 获取分组下所有可用账号
accounts := s.getAvailableAccounts(groupID, model)
// 2. 过滤掉已失败的账号
accounts = s.filterFailedAccounts(accounts, failedAccountIDs)
// 3. 计算每个账号的负载因子
for _, account := range accounts {
account.LoadFactor = s.calculateLoadFactor(account)
}
// 4. 按负载因子排序(低的优先)
sort.Slice(accounts, func(i, j int) bool {
return accounts[i].LoadFactor < accounts[j].LoadFactor
})
// 5. 检查粘性会话
if sessionKey != "" {
stickyAccount := s.getStickyAccount(sessionKey, accounts)
if stickyAccount != nil {
return stickyAccount, nil
}
}
// 6. 返回负载最低的账号
return accounts[0], nil
}
```
#### 3.1.2 负载因子计算(加权评分系统)
> ⚠️ **修正**:负载评分使用**可配置的加权评分系统**,不是简单的公式。
```go
// service/openai_account_scheduler.go - Weighted Scoring
type AccountScore struct {
AccountID int64
TotalScore float64
PriorityScore float64 // 优先级得分
LoadScore float64 // 负载得分
QueueScore float64 // 队列得分
ErrorScore float64 // 错误率得分
TTFTScore float64 // 首字节时间得分
}
// 评分权重(可配置)
type SchedulerScoreWeights struct {
PriorityWeight float64 // 默认 1.0
LoadWeight float64 // 默认 1.0
QueueWeight float64 // 默认 0.7
ErrorWeight float64 // 默认 0.8
TTFTWeight float64 // 默认 0.5
}
// 计算公式
func (s *AccountScheduler) calculateScore(account *Account) *AccountScore {
score := &AccountScore{AccountID: account.ID}
weights := s.getScoreWeights()
// 1. 优先级得分(账号配置中的优先级,越高越优先)
score.PriorityScore = float64(account.Priority)
// 2. 负载得分(当前连接数 / 最大并发)
activeConns := s.getActiveConnections(account.ID)
score.LoadScore = 1.0 - (float64(activeConns) / float64(account.MaxConcurrency))
// 3. 队列得分(等待中的请求数)
queueLen := s.getQueueLength(account.ID)
score.QueueScore = 1.0 - (float64(queueLen) / float64(account.MaxConcurrency*2))
// 4. 错误率得分(最近错误率越低越好)
errorRate := s.getErrorRate(account.ID)
score.ErrorScore = 1.0 - errorRate
// 5. TTFT 得分(首字节响应时间,归一化)
avgTTFT := s.getAvgTTFT(account.ID)
score.TTFTScore = calculateTTFTScore(avgTTFT)
// 6. 加权总分
score.TotalScore =
score.PriorityScore * weights.PriorityWeight +
score.LoadScore * weights.LoadWeight +
score.QueueScore * weights.QueueWeight +
score.ErrorScore * weights.ErrorWeight +
score.TTFTScore * weights.TTFTWeight
return score
}
// 选择策略top-K + 加权随机
func (s *AccountScheduler) selectByWeightedRandom(candidates []*Account) *Account {
topK := s.selectTopK(candidates, 5) // 选前5名
// 加权随机,避免单一账号被过度使用
totalWeight := 0.0
for _, a := range topK {
totalWeight += a.Score.TotalScore
}
rand := mathrand.Float64() * totalWeight
cumulative := 0.0
for _, a := range topK {
cumulative += a.Score.TotalScore
if rand <= cumulative {
return a.Account
}
}
return topK[0]
}
```
**评分维度说明**
| 维度 | 计算方式 | 权重范围 | 说明 |
|------|---------|----------|------|
| PriorityScore | 账号配置中的 Priority 字段 | 0-100 | 越高越优先 |
| LoadScore | `1 - (活跃连接/最大并发)` | 0-1 | 空闲度 |
| QueueScore | `1 - (队列长度/(最大并发×2))` | 0-1 | 等待情况 |
| ErrorScore | `1 - 最近错误率` | 0-1 | 健康度 |
| TTFTScore | 归一化的首字节时间 | 0-1 | 响应速度 |
### 3.2 粘性会话
#### 3.2.1 会话保持机制
```go
// service/gateway_service.go - BindStickySession
func (s *GatewayService) BindStickySession(ctx context.Context, groupID int64, sessionKey string, accountID int64) error {
// 1. 生成会话键
// 格式group:{groupID}:session:{sessionHash}
redisKey := fmt.Sprintf("sticky:%d:%s", groupID, sessionKey)
// 2. 存储映射关系
// TTL: 1小时可配置
err := s.redis.Set(ctx, redisKey, accountID, 1*time.Hour)
if err != nil {
return err
}
// 3. 记录会话窗口
return s.updateSessionWindow(ctx, accountID, sessionKey)
}
```
#### 3.2.2 会话验证
```go
// 检查会话是否有效
func (s *GatewayService) getStickyAccount(sessionKey string, accounts []*Account) *Account {
for _, account := range accounts {
// 检查账号是否在会话窗口内
if s.isInSessionWindow(account, sessionKey) {
// 验证会话映射
if mappedID := s.getSessionMapping(groupID, sessionKey); mappedID == account.ID {
return account
}
}
}
return nil
}
```
### 3.3 故障转移
#### 3.3.1 重试策略
```go
// service/gateway_service.go - handleUpstreamError
func (s *GatewayService) handleUpstreamError(
ctx context.Context,
account *Account,
err error,
attempt int,
) *FailoverResult {
// 1. 检查是否可重试
if !isRetryableError(err) {
return &FailoverResult{Success: false, Error: err}
}
// 2. 判断重试级别
switch getRetryLevel(err) {
case RetryLevelSameAccount:
// 同账号重试(短延迟)
delay := calculateDelay(attempt, DelayTypeShort)
return &FailoverResult{
RetryCount: 1,
DelayMs: delay,
Account: account, // 继续使用同一账号
}
case RetryLevelSwitchAccount:
// 切换账号(稍长延迟)
nextAccount := s.selectNextAccount(account.GroupID, account.ID)
delay := calculateDelay(attempt, DelayTypeMedium)
return &FailoverResult{
RetryCount: 1,
DelayMs: delay,
Account: nextAccount,
}
case RetryLevelGiveUp:
// 放弃重试
return &FailoverResult{Success: false, Error: err}
}
return &FailoverResult{Success: false, Error: err}
}
```
#### 3.3.2 延迟计算
```go
// 延迟计算策略
func calculateDelay(attempt int, delayType DelayType) int {
switch delayType {
case DelayTypeShort:
// 短延迟100ms - 400ms
return 100 + attempt*100
case DelayTypeMedium:
// 中延迟200ms - 800ms
return 200 + attempt*200
case DelayTypeLong:
// 长延迟500ms - 2000ms
return 500 + attempt*500
default:
return 100
}
}
```
#### 3.3.3 账号池模式(备用方案)
> ⚠️ **修正**`account_pool_mode.go` 是**备用/扩展方案**,主要账号选择使用加权评分系统。
```go
// service/account_pool_mode.go - 备用选择模式
type PoolMode struct {
Type string // least_load/round_robin/random/priority
MaxRetries int // 最大切换次数
Backoff string // 退避策略linear/exponential
}
const (
PoolModeLeastLoad = "least_load" // 最小负载
PoolModeRoundRobin = "round_robin" // 轮询
PoolModeRandom = "random" // 随机
PoolModePriority = "priority" // 优先级
)
// 实际主要选择流程使用 SelectAccountWithLoadAwareness
// account_pool_mode 仅在特定场景下使用
```
**主要选择流程 vs 备用模式**
| 选择方式 | 使用位置 | 说明 |
|---------|---------|------|
| `SelectAccountWithLoadAwareness` | gateway_service.go:1190 | **主要流程**:加权评分 + 粘性会话 + 并发控制 |
| `selectByLoadBalance` | openai_account_scheduler.go | **OpenAI 专用**WebSocket 场景 |
| `selectAccount` (PoolMode) | account_pool_mode.go | **备用**:特定配置下使用 |
### 3.4 连接管理
#### 3.4.1 连接池配置
```go
// 上游HTTP客户端配置
type UpstreamClientConfig struct {
MaxIdleConns int // 最大空闲连接
MaxIdleConnsPerHost int // 每个主机最大空闲连接
IdleConnTimeout time.Duration // 空闲连接超时
DialTimeout time.Duration // 建立连接超时
ReadTimeout time.Duration // 读取超时
WriteTimeout time.Duration // 写超时
}
// 默认配置
var DefaultUpstreamConfig = UpstreamClientConfig{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 30,
IdleConnTimeout: 90 * time.Second,
DialTimeout: 10 * time.Second,
ReadTimeout: 120 * time.Second,
WriteTimeout: 30 * time.Second,
}
```
#### 3.4.2 连接跟踪
```go
// 连接跟踪服务
type ConnectionTracker struct {
mutex sync.RWMutex
// 本地内存:实时连接数
connections map[int64]int64
// Redis历史统计数据
redisStats map[int64]*HistoricalStats
}
func (t *ConnectionTracker) Increment(accountID int64) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.connections[accountID]++
}
func (t *ConnectionTracker) Decrement(accountID int64) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.connections[accountID]--
}
func (t *ConnectionTracker) GetActiveCount(accountID int64) int64 {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.connections[accountID]
}
```
## 4. 高级功能
### 4.1 预热机制
```go
// 新账号预热
func (s *AccountWarmupService) WarmupAccount(account *Account) error {
// 1. 发送轻量级请求建立连接
req := &Request{
Model: "claude-3-haiku-20240307",
MaxTokens: 1,
}
// 2. 预热连接池
for i := 0; i < 5; i++ {
_, err := s.sendRequest(account, req)
if err != nil {
return err
}
}
// 3. 标记预热完成
account.WarmedUp = true
return nil
}
```
### 4.2 健康检查
```go
// 账号健康检查
func (s *HealthCheckService) CheckAccountHealth(accountID int64) HealthStatus {
// 1. 检查最后活跃时间
if time.Since(account.LastUsedAt) > 24*time.Hour {
return HealthStatusStale
}
// 2. 检查错误率
errorRate := s.getErrorRate(accountID)
if errorRate > 0.5 {
return HealthStatusUnhealthy
}
// 3. 检查响应时间
avgLatency := s.getAvgLatency(accountID)
if avgLatency > 10*time.Second {
return HealthStatusSlow
}
return HealthStatusHealthy
}
```
### 4.3 自适应负载
```go
// 自适应负载调整
func (s *AdaptiveLoadService) AdjustConcurrency(account *Account) {
// 1. 基于错误率调整
errorRate := s.getErrorRate(account.ID)
if errorRate > 0.1 {
// 错误率高,减少并发
account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.8)
}
// 2. 基于响应时间调整
avgLatency := s.getAvgLatency(account.ID)
if avgLatency > 5*time.Second {
account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.9)
}
// 3. 基于成功率恢复
successRate := 1 - errorRate
if successRate > 0.99 && account.MaxConcurrency < account.OriginalMaxConcurrency {
account.MaxConcurrency = int(float64(account.MaxConcurrency) * 1.1)
}
}
```
## 5. 配置参数
### 5.1 调度配置config.yaml
```yaml
scheduler:
# 账号选择策略
selection:
strategy: "least_load" # least_load/round_robin/random/priority
sticky_session: true
sticky_ttl: 3600
# 故障转移配置
failover:
max_retries: 3
retry_delay_ms: 100
max_retry_delay_ms: 8000
enable_same_account_retry: true
# 连接池配置
connection_pool:
max_idle_conns: 100
max_idle_per_host: 30
idle_timeout: 90s
dial_timeout: 10s
read_timeout: 120s
# 健康检查
health_check:
enabled: true
interval: 5m
timeout: 30s
```
### 5.2 环境变量
| 变量 | 说明 | 默认值 |
|------|------|--------|
| `SCHEDULER_SELECTION_STRATEGY` | 选择策略 | least_load |
| `SCHEDULER_MAX_RETRIES` | 最大重试次数 | 3 |
| `SCHEDULER_STICKY_SESSION` | 启用粘性会话 | true |
## 6. 修改和扩展指南
### 6.1 常见修改场景
**场景1调整选择策略**
```go
// 修改为轮询模式
func (s *GatewayService) SelectAccount(ctx context.Context, groupID int64) (*Account, error) {
accounts := s.getAvailableAccounts(groupID)
// 使用轮询选择
index := s.getAndIncrementIndex(groupID)
return accounts[index % len(accounts)], nil
}
```
**场景2添加新的负载指标**
```go
// 添加延迟作为负载指标
func (s *LoadFactorService) CalculateWithLatency(account *Account) float64 {
activeConns := float64(s.getActiveConnections(account.ID)) / float64(account.MaxConcurrency)
latency := s.getAvgLatency(account.ID).Seconds() / 10.0 // 归一化
return activeConns*0.7 + latency*0.3
}
```
**场景3自定义重试策略**
```go
// 实现指数退避
func calculateExponentialBackoff(attempt int) int {
base := 100
maxDelay := 5000
delay := base * (1 << attempt) // 2^attempt
if delay > maxDelay {
delay = maxDelay
}
return delay
}
```
### 6.2 注意事项
1. **线程安全**:负载因子更新需要原子操作
2. **数据一致性**Redis和本地缓存需要同步
3. **超时设置**:重试延迟要考虑用户体验
## 7. 测试覆盖
### 7.1 单元测试
| 测试文件 | 覆盖范围 |
|----------|----------|
| `account_load_factor_test.go` | 负载因子计算 |
| `failover_service_test.go` | 故障转移逻辑 |
| `account_pool_mode_test.go` | 账号池模式 |
### 7.2 集成测试
| 测试文件 | 场景 |
|----------|------|
| `e2e_gateway_test.go` | 完整调度流程 |
## 8. 监控与运维
### 8.1 关键指标
| 指标 | 告警阈值 | 说明 |
|------|----------|------|
| `scheduler_failover_count` | > 100/min | 故障转移次数 |
| `scheduler_avg_latency` | > 5s | 平均延迟 |
| `scheduler_account_errors` | > 10% | 账号错误率 |
| `scheduler_sticky_hit_rate` | < 80% | 粘性会话命中率 |
### 8.2 运维任务
| 任务 | 频率 | 说明 |
|------|------|------|
| 负载分析 | 每小时 | 分析账号负载分布 |
| 故障统计 | 每天 | 统计故障转移情况 |
| 连接池清理 | 每天 | 清理空闲连接 |
## 9. 总结
调度与负载均衡模块特点:
- **智能选择**:负载感知算法确保请求分发到最合适的账号
- **高可用**:完善的故障转移机制保障服务连续性
- **会话保持**:粘性会话提升用户体验
- **可配置**:多种调度模式满足不同场景
**潜在改进点:**
1. 可增加更多自适应能力
2. 可支持更复杂的调度规则
**修改建议:**
- 选择策略调整相对简单
- 重试策略需要充分测试
---
*文档版本1.1*
*最后更新2026-03-23*
*分析基于Sub2API v0.1.104*
*修正内容:负载因子计算(加权评分系统)、账号池模式(备用方案)、实际选择流程*