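# Sub2API Module Analysis Report: Scheduling and Load Balancing

## 1. Module Overview

### 1.1 Role of the Module

The scheduling and load-balancing module is the request-dispatch core of Sub2API. It distributes requests across multiple upstream accounts and provides load balancing, failover, and session affinity. This keeps the system stable and makes good use of account capacity under high concurrency.

### 1.2 Core Responsibilities

- **Account selection**: pick the most suitable account based on load and priority
- **Load balancing**: spread requests evenly across multiple accounts
- **Failover**: switch automatically to a backup account when an account fails
- **Session affinity**: route requests from the same session to the same account
- **Connection management**: manage the upstream connection pool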

## 2. Code Structure Analysis

### 2.1 Core Files

| File path | Responsibility | Lines of code |
|-----------|----------------|---------------|
| `service/gateway_service.go` | Core account-selection logic | **8,516** |
| `service/openai_account_scheduler.go` | OpenAI account scheduler (weighted scoring) | ~2,000 |
| `service/account_load_factor.go` | Load-factor calculation | ~400 |
| `service/account_pool_mode.go` | Account pool mode (fallback) | ~300 |
| `service/concurrency_service.go` | Concurrency slot management | ~1,000 |
| `handler/gateway_handler.go` | Gateway request handling | ~1,800 |

> ⚠️ **Correction**: `SelectAccountWithLoadAwareness` is defined at `gateway_service.go:1190`, and the load-scoring logic lives in `openai_account_scheduler.go`.
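
### 2.2 Core Data Structures

```go
// Core scheduling types (excerpt; Account is defined elsewhere in the service package).

// AccountSelection is the result of an account selection.
type AccountSelection struct {
	Account    *Account // selected account
	SessionKey string   // sticky-session key
	IsSticky   bool     // whether the selection hit an existing sticky session
	LoadFactor float64  // current load factor of the selected account
}

// LoadFactor holds the inputs and the result of a load-factor calculation.
type LoadFactor struct {
	ActiveConnections int     // active connections
	MaxConcurrency    int     // maximum concurrency
	RPM               int     // requests per minute
	ErrorRate         float64 // error rate
	Calculated        float64 // computed load factor
}

// FailoverResult is the outcome of a failover decision.
type FailoverResult struct {
	Success    bool
	Account    *Account // account to use for the next attempt
	RetryCount int
	DelayMs    int // delay before the next attempt, in milliseconds
	Error      error
}
```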

## 3. Detailed Functional Analysis

### 3.1 Account Selection Algorithm

#### 3.1.1 Load-Aware Selection

```go
// service/gateway_service.go - SelectAccountWithLoadAwareness
func (s *GatewayService) SelectAccountWithLoadAwareness(
	ctx context.Context,
	groupID int64,
	sessionKey string,
	model string,
	failedAccountIDs []int64,
) (*Account, error) {
	// 1. Fetch all available accounts in the group that serve this model
	accounts := s.getAvailableAccounts(groupID, model)

	// 2. Drop accounts that have already failed for this request
	accounts = s.filterFailedAccounts(accounts, failedAccountIDs)
	if len(accounts) == 0 {
		// Guard: indexing accounts[0] below would panic on an empty slice
		return nil, fmt.Errorf("no available account for group %d (model %q)", groupID, model)
	}

	// 3. Compute the load factor of each account
	for _, account := range accounts {
		account.LoadFactor = s.calculateLoadFactor(account)
	}

	// 4. Sort by load factor, lowest first
	sort.Slice(accounts, func(i, j int) bool {
		return accounts[i].LoadFactor < accounts[j].LoadFactor
	})

	// 5. Prefer the account bound to the sticky session, if it is still a candidate
	if sessionKey != "" {
		if stickyAccount := s.getStickyAccount(sessionKey, accounts); stickyAccount != nil {
			return stickyAccount, nil
		}
	}

	// 6. Otherwise return the least-loaded account
	return accounts[0], nil
}
```
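
The flow above can be reduced to a self-contained sketch for experimentation. The trimmed `Account` type, the `failed` set, the `stickyMap` lookup, and the function name `selectLeastLoaded` are hypothetical stand-ins for the repository's types and Redis lookups. The load factor here is simply active connections divided by max concurrency, which is an assumption.

```go
package main

import (
	"errors"
	"sort"
)

// Account is a hypothetical, trimmed-down stand-in for the real Account type.
type Account struct {
	ID             int64
	ActiveConns    int
	MaxConcurrency int
}

var errNoAccount = errors.New("no available account")

// loadFactor returns active/max; an account without capacity counts as fully loaded.
func loadFactor(a *Account) float64 {
	if a.MaxConcurrency <= 0 {
		return 1
	}
	return float64(a.ActiveConns) / float64(a.MaxConcurrency)
}

// selectLeastLoaded mirrors steps 2-6: drop failed accounts, honour the sticky
// binding if the bound account is still a candidate, else pick the lowest load.
func selectLeastLoaded(accounts []*Account, failed map[int64]bool, stickyMap map[string]int64, sessionKey string) (*Account, error) {
	candidates := make([]*Account, 0, len(accounts))
	for _, a := range accounts {
		if !failed[a.ID] {
			candidates = append(candidates, a)
		}
	}
	if len(candidates) == 0 {
		return nil, errNoAccount
	}
	if boundID, ok := stickyMap[sessionKey]; ok && sessionKey != "" {
		for _, a := range candidates {
			if a.ID == boundID {
				return a, nil
			}
		}
	}
	sort.SliceStable(candidates, func(i, j int) bool {
		return loadFactor(candidates[i]) < loadFactor(candidates[j])
	})
	return candidates[0], nil
}
```

Unlike the excerpt, this sketch checks the sticky binding before sorting, so a session hit skips the sort entirely. The selected account is the same either way.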

#### 3.1.2 Load-Factor Calculation (Weighted Scoring System)

> ⚠️ **Correction**: load scoring uses a **configurable weighted scoring system**, not a single fixed formula.

```go
// service/openai_account_scheduler.go - weighted scoring
type AccountScore struct {
	AccountID     int64
	TotalScore    float64
	PriorityScore float64 // priority score
	LoadScore     float64 // load score
	QueueScore    float64 // queue score
	ErrorScore    float64 // error-rate score
	TTFTScore     float64 // time-to-first-token score
}

// Scoring weights (configurable)
type SchedulerScoreWeights struct {
	PriorityWeight float64 // default 1.0
	LoadWeight     float64 // default 1.0
	QueueWeight    float64 // default 0.7
	ErrorWeight    float64 // default 0.8
	TTFTWeight     float64 // default 0.5
}

// accountCandidate pairs an account with its computed score
// (type name assumed for illustration).
type accountCandidate struct {
	Account *Account
	Score   *AccountScore
}

// freeRatio returns 1 - used/capacity, clamped at 0; zero capacity counts as fully used.
func freeRatio(used, capacity int) float64 {
	if capacity <= 0 {
		return 0
	}
	if r := 1.0 - float64(used)/float64(capacity); r > 0 {
		return r
	}
	return 0
}

// Scoring formula
func (s *AccountScheduler) calculateScore(account *Account) *AccountScore {
	score := &AccountScore{AccountID: account.ID}
	weights := s.getScoreWeights()

	// 1. Priority score (Priority from the account configuration; higher is preferred)
	score.PriorityScore = float64(account.Priority)

	// 2. Load score: 1 - active connections / max concurrency
	score.LoadScore = freeRatio(s.getActiveConnections(account.ID), account.MaxConcurrency)

	// 3. Queue score: 1 - waiting requests / (2 × max concurrency)
	score.QueueScore = freeRatio(s.getQueueLength(account.ID), account.MaxConcurrency*2)

	// 4. Error score: the lower the recent error rate, the better
	score.ErrorScore = 1.0 - s.getErrorRate(account.ID)

	// 5. TTFT score (normalized time to first token)
	score.TTFTScore = calculateTTFTScore(s.getAvgTTFT(account.ID))

	// 6. Weighted total
	score.TotalScore = score.PriorityScore*weights.PriorityWeight +
		score.LoadScore*weights.LoadWeight +
		score.QueueScore*weights.QueueWeight +
		score.ErrorScore*weights.ErrorWeight +
		score.TTFTScore*weights.TTFTWeight

	return score
}

// Selection strategy: top-K + weighted random
func (s *AccountScheduler) selectByWeightedRandom(candidates []accountCandidate) *Account {
	topK := s.selectTopK(candidates, 5) // keep the 5 best-scored candidates
	if len(topK) == 0 {
		return nil
	}

	// Weighted random draw so that no single account is overused.
	// Negative totals are treated as zero weight.
	totalWeight := 0.0
	for _, c := range topK {
		totalWeight += math.Max(c.Score.TotalScore, 0)
	}
	if totalWeight <= 0 {
		return topK[0].Account
	}

	r := mathrand.Float64() * totalWeight // r in [0, totalWeight)
	cumulative := 0.0
	for _, c := range topK {
		cumulative += math.Max(c.Score.TotalScore, 0)
		if r < cumulative {
			return c.Account
		}
	}
	return topK[0].Account // floating-point fallback
}
```
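
**Scoring dimensions**:

| Dimension | Calculation | Score range | Meaning |
|-----------|-------------|-------------|---------|
| PriorityScore | `Priority` field from the account configuration | 0-100 | higher is preferred |
| LoadScore | `1 - (active connections / max concurrency)` | 0-1 | idleness |
| QueueScore | `1 - (queue length / (max concurrency × 2))` | 0-1 | queueing pressure |
| ErrorScore | `1 - recent error rate` | 0-1 | health |
| TTFTScore | normalized time to first token | 0-1 | responsiveness |

In the excerpt above, PriorityScore is on a 0-100 scale while the other dimensions are on 0-1. With the default weights, the priority term therefore dominates the weighted total unless it is normalized first.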
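
A standalone sketch of the weighted total follows, with every dimension mapped to 0-1. It makes two assumptions that are not confirmed by the source. First, priority is divided by 100 so it no longer dominates. Second, TTFT is normalized as `1 - avgTTFT/maxTTFT`, clamped, because `calculateTTFTScore` is not shown. `Weights`, `Stats`, and `totalScore` are hypothetical names.

```go
package main

import "time"

// Weights mirrors SchedulerScoreWeights from the excerpt above.
type Weights struct {
	Priority, Load, Queue, Error, TTFT float64
}

// DefaultWeights uses the defaults listed in the excerpt.
var DefaultWeights = Weights{Priority: 1.0, Load: 1.0, Queue: 0.7, Error: 0.8, TTFT: 0.5}

// Stats is a hypothetical snapshot of the per-account metrics the scorer reads.
type Stats struct {
	Priority       int // 0-100, higher is preferred
	Active, Queued int
	MaxConcurrency int
	ErrorRate      float64
	AvgTTFT        time.Duration
}

func clamp01(x float64) float64 {
	if x < 0 {
		return 0
	}
	if x > 1 {
		return 1
	}
	return x
}

// totalScore computes the weighted sum with every dimension in [0, 1].
// maxTTFT is the TTFT at which the TTFT score reaches 0 (assumed normalization).
func totalScore(s Stats, w Weights, maxTTFT time.Duration) float64 {
	capacity := float64(s.MaxConcurrency)
	load, queue := 0.0, 0.0
	if capacity > 0 {
		load = clamp01(1 - float64(s.Active)/capacity)
		queue = clamp01(1 - float64(s.Queued)/(2*capacity))
	}
	ttft := 0.0
	if maxTTFT > 0 {
		ttft = clamp01(1 - float64(s.AvgTTFT)/float64(maxTTFT))
	}
	return clamp01(float64(s.Priority)/100)*w.Priority +
		load*w.Load +
		queue*w.Queue +
		clamp01(1-s.ErrorRate)*w.Error +
		ttft*w.TTFT
}
```

Consider priority 50, 5 of 10 slots busy, 10 queued requests, a 10% error rate, and a 1s TTFT against a 4s ceiling. The total is 0.5 + 0.5 + 0.35 + 0.72 + 0.375 = 2.445.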

### 3.2 Sticky Sessions

#### 3.2.1 Session Binding

```go
// service/gateway_service.go - BindStickySession
func (s *GatewayService) BindStickySession(ctx context.Context, groupID int64, sessionKey string, accountID int64) error {
	// 1. Build the Redis key
	// Format: sticky:{groupID}:{sessionKey}
	redisKey := fmt.Sprintf("sticky:%d:%s", groupID, sessionKey)

	// 2. Store the session -> account mapping
	// TTL: 1 hour here (configurable via scheduler.selection.sticky_ttl)
	if err := s.redis.Set(ctx, redisKey, accountID, time.Hour); err != nil {
		return err
	}

	// 3. Record the session in the account's session window
	return s.updateSessionWindow(ctx, accountID, sessionKey)
}
```

#### 3.2.2 Session Validation

```go
// Return the candidate account that the session is still bound to, if any
func (s *GatewayService) getStickyAccount(sessionKey string, accounts []*Account) *Account {
	for _, account := range accounts {
		// Skip accounts whose session window does not contain this session
		if !s.isInSessionWindow(account, sessionKey) {
			continue
		}
		// Verify the stored mapping; groupID is not a parameter here,
		// so the account's own GroupID is used
		if mappedID := s.getSessionMapping(account.GroupID, sessionKey); mappedID == account.ID {
			return account
		}
	}
	return nil
}
```

### 3.3 Failover

#### 3.3.1 Retry Strategy

```go
// service/gateway_service.go - handleUpstreamError
func (s *GatewayService) handleUpstreamError(
	ctx context.Context,
	account *Account,
	err error,
	attempt int,
) *FailoverResult {
	// 1. Non-retryable errors are returned immediately
	if !isRetryableError(err) {
		return &FailoverResult{Success: false, Error: err}
	}

	// 2. Decide the retry level
	switch getRetryLevel(err) {
	case RetryLevelSameAccount:
		// Retry on the same account after a short delay
		return &FailoverResult{
			RetryCount: 1,
			DelayMs:    calculateDelay(attempt, DelayTypeShort),
			Account:    account, // keep using the same account
		}

	case RetryLevelSwitchAccount:
		// Switch to another account after a medium delay
		nextAccount := s.selectNextAccount(account.GroupID, account.ID)
		if nextAccount == nil {
			// No other account left: give up rather than retry with a nil account
			return &FailoverResult{Success: false, Error: err}
		}
		return &FailoverResult{
			RetryCount: 1,
			DelayMs:    calculateDelay(attempt, DelayTypeMedium),
			Account:    nextAccount,
		}

	case RetryLevelGiveUp:
		// Give up retrying
		return &FailoverResult{Success: false, Error: err}
	}

	return &FailoverResult{Success: false, Error: err}
}
```
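
The excerpt does not show how `getRetryLevel` classifies errors. The sketch below is one plausible, **hypothetical** mapping from upstream HTTP status codes to the three retry levels. Account-specific failures (rate limit, rejected credentials, overload) switch accounts, transient 5xx errors retry on the same account, and everything else gives up. The constant values and the mapping itself are assumptions.

```go
package main

import "net/http"

// RetryLevel mirrors the three levels used in handleUpstreamError (values illustrative).
type RetryLevel int

const (
	RetryLevelGiveUp RetryLevel = iota
	RetryLevelSameAccount
	RetryLevelSwitchAccount
)

// retryLevelForStatus is a hypothetical classification; the real getRetryLevel may differ.
func retryLevelForStatus(status int) RetryLevel {
	switch status {
	case http.StatusTooManyRequests, // 429: this account is rate-limited
		http.StatusUnauthorized, // 401: this account's credentials were rejected
		http.StatusForbidden,    // 403
		529:                     // "overloaded", returned by Anthropic's API
		return RetryLevelSwitchAccount
	case http.StatusInternalServerError, http.StatusBadGateway,
		http.StatusServiceUnavailable, http.StatusGatewayTimeout:
		return RetryLevelSameAccount // likely transient on the upstream side
	default:
		return RetryLevelGiveUp // other 4xx and anything unrecognized
	}
}
```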

#### 3.3.2 Delay Calculation

```go
// Delay calculation (milliseconds; the ranges below assume attempt = 0..3)
func calculateDelay(attempt int, delayType DelayType) int {
	switch delayType {
	case DelayTypeShort:
		// Short delay: 100ms - 400ms
		return 100 + attempt*100
	case DelayTypeMedium:
		// Medium delay: 200ms - 800ms
		return 200 + attempt*200
	case DelayTypeLong:
		// Long delay: 500ms - 2000ms
		return 500 + attempt*500
	default:
		return 100
	}
}
```

#### 3.3.3 Account Pool Mode (Fallback)

> ⚠️ **Correction**: `account_pool_mode.go` is a **fallback/extension mechanism**; primary account selection uses the weighted scoring system.

```go
// service/account_pool_mode.go - fallback selection mode
type PoolMode struct {
	Type       string // least_load / round_robin / random / priority
	MaxRetries int    // maximum number of account switches
	Backoff    string // backoff strategy: linear / exponential
}

const (
	PoolModeLeastLoad  = "least_load"  // least load
	PoolModeRoundRobin = "round_robin" // round robin
	PoolModeRandom     = "random"      // random
	PoolModePriority   = "priority"    // priority
)

// The main selection flow uses SelectAccountWithLoadAwareness;
// account_pool_mode is only used in specific scenarios.
```

**Primary selection flow vs. fallback modes**:

| Selection method | Location | Description |
|------------------|----------|-------------|
| `SelectAccountWithLoadAwareness` | gateway_service.go:1190 | **Primary flow**: weighted scoring + sticky sessions + concurrency control |
| `selectByLoadBalance` | openai_account_scheduler.go | **OpenAI-specific**: WebSocket scenarios |
| `selectAccount` (PoolMode) | account_pool_mode.go | **Fallback**: used under specific configurations |

### 3.4 Connection Management

#### 3.4.1 Connection Pool Configuration

```go
// Upstream HTTP client configuration
type UpstreamClientConfig struct {
	MaxIdleConns        int           // max idle connections across all hosts
	MaxIdleConnsPerHost int           // max idle connections per host
	IdleConnTimeout     time.Duration // idle connection timeout
	DialTimeout         time.Duration // connection establishment timeout
	ReadTimeout         time.Duration // read timeout
	WriteTimeout        time.Duration // write timeout
}

// Default configuration
var DefaultUpstreamConfig = UpstreamClientConfig{
	MaxIdleConns:        100,
	MaxIdleConnsPerHost: 30,
	IdleConnTimeout:     90 * time.Second,
	DialTimeout:         10 * time.Second,
	ReadTimeout:         120 * time.Second,
	WriteTimeout:        30 * time.Second,
}
```

#### 3.4.2 Connection Tracking

```go
// Connection tracking service
type ConnectionTracker struct {
	mutex sync.RWMutex
	// Local memory: real-time connection count per account
	connections map[int64]int64
	// Historical statistics (persisted in Redis)
	redisStats map[int64]*HistoricalStats
}

// NewConnectionTracker initializes the maps; writing to a nil map would panic.
func NewConnectionTracker() *ConnectionTracker {
	return &ConnectionTracker{
		connections: make(map[int64]int64),
		redisStats:  make(map[int64]*HistoricalStats),
	}
}

func (t *ConnectionTracker) Increment(accountID int64) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	t.connections[accountID]++
}

func (t *ConnectionTracker) Decrement(accountID int64) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	// Never go below zero, and drop idle entries so the map does not grow without bound
	if t.connections[accountID] <= 1 {
		delete(t.connections, accountID)
		return
	}
	t.connections[accountID]--
}

func (t *ConnectionTracker) GetActiveCount(accountID int64) int64 {
	t.mutex.RLock()
	defer t.mutex.RUnlock()
	return t.connections[accountID]
}
```
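
Separate `Increment`/`Decrement` calls make it easy to leak a count on an error path. The sketch below shows a common alternative: `TryAcquire` checks the concurrency limit and returns a release function that callers `defer`. A `sync.Once` guarantees the count drops exactly once. `slotTracker` and its methods are hypothetical names, and the source's `concurrency_service.go` may work differently.

```go
package main

import "sync"

// slotTracker is a hypothetical acquire/release wrapper around a per-account counter.
type slotTracker struct {
	mu     sync.Mutex
	active map[int64]int
}

func newSlotTracker() *slotTracker {
	return &slotTracker{active: make(map[int64]int)}
}

// TryAcquire reserves a slot if the account is below maxConcurrency.
// The returned release func is idempotent.
func (t *slotTracker) TryAcquire(accountID int64, maxConcurrency int) (release func(), ok bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.active[accountID] >= maxConcurrency {
		return nil, false
	}
	t.active[accountID]++
	var once sync.Once
	return func() {
		once.Do(func() {
			t.mu.Lock()
			defer t.mu.Unlock()
			t.active[accountID]--
			if t.active[accountID] <= 0 {
				delete(t.active, accountID)
			}
		})
	}, true
}

// Active returns the number of slots currently held for the account.
func (t *slotTracker) Active(accountID int64) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.active[accountID]
}
```

Typical use in a request handler: `release, ok := tracker.TryAcquire(id, max); if !ok { /* pick another account */ }; defer release()`.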

## 4. Advanced Features

### 4.1 Warm-up Mechanism

```go
// Warm up a newly added account
func (s *AccountWarmupService) WarmupAccount(account *Account) error {
	// 1. Build a lightweight request (a single output token) to establish connections
	req := &Request{
		Model:     "claude-3-haiku-20240307",
		MaxTokens: 1,
	}

	// 2. Warm up the connection pool. Sequential requests typically reuse one
	// keep-alive connection; concurrent requests are needed to open several.
	for i := 0; i < 5; i++ {
		if _, err := s.sendRequest(account, req); err != nil {
			return fmt.Errorf("warm-up request %d for account %d: %w", i+1, account.ID, err)
		}
	}

	// 3. Mark warm-up as complete
	account.WarmedUp = true
	return nil
}
```

### 4.2 Health Checks

```go
// Account health check
func (s *HealthCheckService) CheckAccountHealth(accountID int64) HealthStatus {
	// Load the account record (getAccount is an assumed lookup helper)
	account := s.getAccount(accountID)
	if account == nil {
		return HealthStatusUnhealthy
	}

	// 1. Not used for more than 24 hours: stale
	if time.Since(account.LastUsedAt) > 24*time.Hour {
		return HealthStatusStale
	}

	// 2. Error rate above 50%: unhealthy
	if s.getErrorRate(accountID) > 0.5 {
		return HealthStatusUnhealthy
	}

	// 3. Average latency above 10s: slow
	if s.getAvgLatency(accountID) > 10*time.Second {
		return HealthStatusSlow
	}

	return HealthStatusHealthy
}
```
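
### 4.3 Adaptive Load

```go
// Adaptive concurrency adjustment
func (s *AdaptiveLoadService) AdjustConcurrency(account *Account) {
	errorRate := s.getErrorRate(account.ID)
	avgLatency := s.getAvgLatency(account.ID)
	reduced := false

	// 1. High error rate: cut concurrency by 20%
	if errorRate > 0.1 {
		account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.8)
		reduced = true
	}

	// 2. Slow responses: cut concurrency by 10%
	if avgLatency > 5*time.Second {
		account.MaxConcurrency = int(float64(account.MaxConcurrency) * 0.9)
		reduced = true
	}

	// 3. Recover only when nothing was cut in this round and the success rate is above 99%.
	// Grow by 10% but by at least 1 (int truncation would otherwise keep values
	// below 10 from ever growing), and never above the original limit.
	if !reduced && 1-errorRate > 0.99 && account.MaxConcurrency < account.OriginalMaxConcurrency {
		grown := int(float64(account.MaxConcurrency) * 1.1)
		if grown <= account.MaxConcurrency {
			grown = account.MaxConcurrency + 1
		}
		if grown > account.OriginalMaxConcurrency {
			grown = account.OriginalMaxConcurrency
		}
		account.MaxConcurrency = grown
	}

	// Keep at least one slot so the account is never starved completely
	if account.MaxConcurrency < 1 {
		account.MaxConcurrency = 1
	}
}
```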
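
The integer arithmetic is easiest to check as a pure function. `nextMaxConcurrency` is a hypothetical name, and the thresholds follow the excerpt. One case worth noting: with plain `int(x * 1.1)`, a limit of 5 would stay at 5 forever (5 × 1.1 = 5.5 truncates to 5). That is why recovery adds at least one slot.

```go
package main

import "time"

// nextMaxConcurrency is a pure-function sketch of the adjustment above:
// error rate > 10% -> ×0.8, average latency > 5s -> ×0.9 (both may apply),
// otherwise success rate > 99% -> ×1.1 (at least +1), capped at original, floor of 1.
func nextMaxConcurrency(current, original int, errorRate float64, avgLatency time.Duration) int {
	next := current
	reduced := false
	if errorRate > 0.1 {
		next = int(float64(next) * 0.8)
		reduced = true
	}
	if avgLatency > 5*time.Second {
		next = int(float64(next) * 0.9)
		reduced = true
	}
	if !reduced && 1-errorRate > 0.99 && next < original {
		grown := int(float64(next) * 1.1)
		if grown <= next {
			grown = next + 1 // truncation guard: 5 × 1.1 = 5.5 -> 5 would never grow
		}
		if grown > original {
			grown = original
		}
		next = grown
	}
	if next < 1 {
		next = 1
	}
	return next
}
```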

## 5. Configuration Parameters

### 5.1 Scheduler Configuration (config.yaml)

```yaml
scheduler:
  # Account selection strategy
  selection:
    strategy: "least_load"   # least_load / round_robin / random / priority
    sticky_session: true
    sticky_ttl: 3600         # seconds (1 hour)

  # Failover
  failover:
    max_retries: 3
    retry_delay_ms: 100
    max_retry_delay_ms: 8000
    enable_same_account_retry: true

  # Connection pool
  connection_pool:
    max_idle_conns: 100
    max_idle_per_host: 30
    idle_timeout: 90s
    dial_timeout: 10s
    read_timeout: 120s

  # Health checks
  health_check:
    enabled: true
    interval: 5m
    timeout: 30s
```

### 5.2 Environment Variables

| Variable | Description | Default |
|----------|-------------|---------|
| `SCHEDULER_SELECTION_STRATEGY` | Selection strategy | least_load |
| `SCHEDULER_MAX_RETRIES` | Maximum number of retries | 3 |
| `SCHEDULER_STICKY_SESSION` | Enable sticky sessions | true |
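
## 6. Modification and Extension Guide

### 6.1 Common Modification Scenarios

**Scenario 1: Change the selection strategy**

```go
// Switch to round-robin selection
func (s *GatewayService) SelectAccount(ctx context.Context, groupID int64) (*Account, error) {
	accounts := s.getAvailableAccounts(groupID)
	if len(accounts) == 0 {
		return nil, fmt.Errorf("no available account for group %d", groupID)
	}

	// Round-robin via a per-group counter (assumed to return a uint64, so the
	// modulo can never produce a negative index after overflow)
	index := s.getAndIncrementIndex(groupID)
	return accounts[index%uint64(len(accounts))], nil
}
```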
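
**Scenario 2: Add a new load metric**

```go
// Add latency as a load metric (lower result = less loaded)
func (s *LoadFactorService) CalculateWithLatency(account *Account) float64 {
	if account.MaxConcurrency <= 0 {
		return 1 // no capacity: treat as fully loaded
	}
	connLoad := float64(s.getActiveConnections(account.ID)) / float64(account.MaxConcurrency)

	// Normalize latency against 10s and clamp so a single slow outlier cannot exceed 1
	latency := math.Min(s.getAvgLatency(account.ID).Seconds()/10.0, 1.0)

	// 70% connection load, 30% latency
	return connLoad*0.7 + latency*0.3
}
```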
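
**Scenario 3: Custom retry strategy**

```go
// Exponential backoff: 100ms, 200ms, 400ms, ... capped at 5000ms
func calculateExponentialBackoff(attempt int) int {
	const (
		base     = 100
		maxDelay = 5000
	)
	if attempt < 0 {
		attempt = 0
	}
	if attempt > 6 { // 100 << 6 = 6400 already exceeds maxDelay; also avoids shift overflow
		return maxDelay
	}
	delay := base << attempt // base * 2^attempt
	if delay > maxDelay {
		delay = maxDelay
	}
	return delay
}
```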
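
### 6.2 Caveats

1. **Thread safety**: load-factor updates must be atomic (or guarded by a lock)
2. **Data consistency**: Redis and the local cache must be kept in sync
3. **Timeouts**: retry delays must be weighed against user-perceived latency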

## 7. Test Coverage

### 7.1 Unit Tests

| Test file | Coverage |
|-----------|----------|
| `account_load_factor_test.go` | Load-factor calculation |
| `failover_service_test.go` | Failover logic |
| `account_pool_mode_test.go` | Account pool mode |

### 7.2 Integration Tests

| Test file | Scenario |
|-----------|----------|
| `e2e_gateway_test.go` | End-to-end scheduling flow |

## 8. Monitoring and Operations

### 8.1 Key Metrics

| Metric | Alert threshold | Description |
|--------|-----------------|-------------|
| `scheduler_failover_count` | > 100/min | Number of failovers |
| `scheduler_avg_latency` | > 5s | Average latency |
| `scheduler_account_errors` | > 10% | Account error rate |
| `scheduler_sticky_hit_rate` | < 80% | Sticky-session hit rate |

### 8.2 Operational Tasks

| Task | Frequency | Description |
|------|-----------|-------------|
| Load analysis | Hourly | Analyze how load is distributed across accounts |
| Failure statistics | Daily | Summarize failover activity |
| Connection pool cleanup | Daily | Close idle connections |
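
## 9. Summary

Key characteristics of the scheduling and load-balancing module:

- **Smart selection**: load-aware scoring routes each request to the most suitable account
- **High availability**: layered failover (same-account retry, account switch, give up) keeps the service running
- **Session affinity**: sticky sessions keep a conversation on the same account, which improves the user experience
- **Configurability**: several scheduling modes cover different scenarios

**Potential improvements:**

1. Add more adaptive behavior
2. Support more complex scheduling rules

**Modification advice:**

- Changing the selection strategy is relatively straightforward
- Changes to the retry strategy need thorough testing

---

*Document version: 1.1*

*Last updated: 2026-03-23*

*Based on: Sub2API v0.1.104*

*Corrections: load-factor calculation (weighted scoring system), account pool mode (fallback), actual selection flow*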