Files
tokens-reef/deploy/docs-backup/MODULE_01_API_GATEWAY.md
Developer 349d783fd1 refactor: clean up project structure
- Remove old review reports (keep latest only)
- Move docs/ to deploy/docs-backup/
- Move performance-testing/ to deploy/
- Clean up test output files
- Organize root directory
2026-04-06 23:36:03 +08:00

487 lines
16 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Sub2API 模块分析报告API Gateway 核心
## 1. 模块概述
### 1.1 模块定位
API Gateway 是 Sub2API 的核心模块,负责接收用户请求、选择合适的上游账号、转发请求到 AI 提供商(如 Anthropic、OpenAI、Google Gemini 等),并处理响应。该模块是整个系统流量入口和转发核心。
### 1.2 核心职责
- **请求接收与路由**:接收来自用户的 AI API 请求
- **账号选择**:根据负载、可用性、优先级选择合适的上游账号
- **请求转发**:将请求转发到上游 AI 服务
- **故障处理**:账号故障时的自动重试和切换
- **响应返回**:将上游响应返回给用户
## 2. 代码结构分析
### 2.1 核心文件
| 文件路径 | 职责 | 代码行数 |
|---------|------|----------|
| `handler/gateway_handler.go` | Claude /v1/messages 主入口 | ~1800 行 |
| `handler/openai_chat_completions.go` | OpenAI /v1/chat/completions 处理器 | ~2500 行 |
| `handler/gemini_v1beta_handler.go` | Google Gemini /v1beta/* 处理器 | ~600 行 |
| `handler/sora_gateway_handler.go` | Sora 视频生成处理器 | ~500 行 |
| `service/gateway_service.go` | 核心网关逻辑,账号选择、重试、**8516 行** | 8516 行 |
| `service/openai_gateway_service.go` | OpenAI 特定处理WebSocket、流式 | ~4000 行 |
| `service/antigravity_gateway_service.go` | Antigravity 平台支持 | ~1500 行 |
### 2.2 目录结构
```
backend/internal/
├── handler/
│ ├── gateway_handler.go # Claude /v1/messages 主入口
│ ├── openai_chat_completions.go # OpenAI /v1/chat/completions
│ ├── openai_gateway_handler.go # OpenAI 兼容接口(含 SSE、WebSocket
│ ├── gemini_v1beta_handler.go # Google Gemini /v1beta/*
│ ├── sora_gateway_handler.go # Sora 视频生成
│ └── endpoint.go # 端点辅助函数
└── service/
├── gateway_service.go # 核心网关服务8516行
├── openai_gateway_service.go # OpenAI 特定处理
├── openai_ws_v2/ # OpenAI WebSocket v2 支持
├── antigravity_gateway_service.go # Antigravity 平台
├── openai_account_scheduler.go # OpenAI 账号调度器
└── gateway_anthropic_apikey_passthrough.go # API Key 透传
```
## 3. 功能详细分析
### 3.1 请求处理流程
```
用户请求 → 认证中间件 → 网关处理器 → 账号选择 → 请求转发 → 响应处理 → 返回结果
```
**关键步骤:**
1. **请求认证** (`api_key_auth.go`)
- 验证 API Key 有效性
- 获取用户和分组信息
- 检查配额和限流
2. **账号选择** (`gateway_service.go:SelectAccountWithLoadAwareness`)
- 检查分组下的可用账号
- 根据负载因子排序
- 支持粘性会话(同一会话路由到同一账号)
3. **请求转发** (`forwardRequest`)
- 构建上游请求
- 添加必要的认证头
- 处理流式/非流式响应
4. **故障处理** (`handleUpstreamError`)
- 单账号重试(同一账号重试)
- 账号切换(切换到其他账号)
- 退避策略(线性退避)
### 3.2 账号选择算法
```go
// gateway_service.go - SelectAccountWithLoadAwareness (实际实现)
func (s *GatewayService) SelectAccountWithLoadAwareness(
ctx context.Context,
groupID *int64,
sessionHash string,
requestedModel string,
excludedIDs map[int64]struct{},
metadataUserID string, // 已废弃参数
) (*AccountSelectionResult, error) {
// 1. 检查 Claude Code 限制(可能会替换 groupID 为降级分组)
group, groupID, err := s.checkClaudeCodeRestriction(ctx, groupID)
if err != nil {
return nil, err
}
ctx = s.withGroupContext(ctx, group)
// 2. 获取粘性会话绑定的账号
var stickyAccountID int64
if accountID, err := s.cache.GetSessionAccountID(ctx, derefGroupID(groupID), sessionHash); err == nil {
stickyAccountID = accountID
}
// 3. 尝试获取账号槽位
for {
account, err := s.SelectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, localExcluded)
if err != nil {
return nil, err
}
// 3.1 尝试获取并发槽位
result, err := s.tryAcquireAccountSlot(ctx, account.ID, account.Concurrency)
if err == nil && result.Acquired {
// 3.2 检查会话限制
if !s.checkAndRegisterSession(ctx, account, sessionHash) {
result.ReleaseFunc()
localExcluded[account.ID] = struct{}{}
continue
}
return &AccountSelectionResult{Account: account, Acquired: true, ...}, nil
}
// 3.3 支持等待计划WaitPlan
// ...
}
}
```
**实际流程6 步):**
1. **Claude Code 限制检查** (`checkClaudeCodeRestriction`)
- 检测 Claude Code 客户端
- 可能替换 groupID 为降级分组
2. **粘性会话获取** (`cache.GetSessionAccountID`)
- 从 Redis 获取 sessionHash 绑定的账号
- 用于保持同一会话路由到同一账号
3. **并发槽位获取** (`tryAcquireAccountSlot`)
- 检查账号并发限制
- 使用 `ConcurrencyService` 管理槽位
4. **会话限制检查** (`checkAndRegisterSession`)
- 检查每个账号的会话数上限
- 支持等待队列
5. **等待计划 (WaitPlan)**
- 如果账号繁忙,返回等待计划
- 客户端可以等待重试
6. **混合调度** (`selectAccountWithMixedScheduling`)
- Anthropic/Gemini 分组支持混合调度
- 包含启用了 mixed_scheduling 的 Antigravity 账号
**关键特性:**
- **负载感知**:根据并发槽位和负载因子选择
- **粘性会话**:基于 sessionHash 保持会话
- **会话限制**:每个账号有最大会话数限制
- **等待计划**:支持队列等待而非直接拒绝
- **Claude Code 适配**:自动降级处理
### 3.3 故障重试策略
```go
// gateway_service.go - handleUpstreamError
func handleUpstreamError(...) *handleResult {
// 1. 同账号重试(短延迟)
if canRetrySameAccount(statusCode) {
return retryWithShortDelay(account)
}
// 2. 切换账号(稍长延迟)
if hasMoreAccounts(groupID) {
return switchToNextAccount(groupID, failedAccount)
}
// 3. 线性退避
return retryWithLinearBackoff(attemptNum)
}
```
**重试参数(可配置):**
- 单账号重试次数:默认 1 次
- 账号切换次数:默认 3 次
- 重试延迟100ms ~ 8000ms线性增长
### 3.4 支持的上游平台
| 平台 | 端点 | 认证方式 | 特性 |
|------|------|----------|------|
| **Anthropic Claude** | `/v1/messages` | OAuth/API Key | 流式响应,消息历史 |
| **OpenAI** | `/v1/chat/completions` | API Key | 多模型支持 |
| **Google Gemini** | `/v1beta/models/:generateContent` | API Key | 多模态支持 |
| **Antigravity** | `/v1/messages`, `/v1beta/` | OAuth | 独立配额系统 |
| **AWS Bedrock** | `/model invocation` | AWS 签名 | Claude on Bedrock |
## 4. 关键数据结构
### 4.1 GatewayService 依赖的服务
```go
// gateway_service.go - GatewayService 结构体
type GatewayService struct {
// === Repositories ===
accountRepo AccountRepository // 账号数据访问
groupRepo GroupRepository // 分组数据访问
usageLogRepo UsageLogRepository // 用量日志
usageBillingRepo UsageBillingRepository // 用量计费
userRepo UserRepository // 用户数据
userSubRepo UserSubscriptionRepository // 订阅数据
userGroupRateRepo UserGroupRateRepository // 用户分组费率
// === Services ===
billingService *BillingService // 计费服务
rateLimitService *RateLimitService // 限流服务
billingCacheService *BillingCacheService // 计费缓存
identityService *IdentityService // 身份识别
concurrencyService *ConcurrencyService // 并发控制
deferredService *DeferredService // 延迟操作
schedulerSnapshot *SchedulerSnapshotService // 调度快照
settingService SettingService // 系统设置
// === Cache & HTTP ===
cache GatewayCache // 网关缓存(粘性会话)
digestStore DigestSessionStore // 会话存储
httpUpstream HTTPUpstream // 上游 HTTP 客户端
}
```
### 4.2 账号选择结果
```go
type AccountSelectionResult struct {
Account *Account // 选中的账号
Acquired bool // 是否成功获取槽位
ReleaseFunc func() // 释放槽位函数
WaitPlan *AccountWaitPlan // 等待计划(如果 Acquired=false
}
type AccountWaitPlan struct {
AccountID int64 // 账号ID
MaxConcurrency int // 最大并发
Timeout time.Duration // 超时时间
MaxWaiting int // 最大等待数
}
```
### 4.3 用量记录输入
```go
type RecordUsageInput struct {
Result *ForwardResult // 转发结果
APIKey *APIKey // API Key
User *User // 用户
Account *Account // 上游账号
Subscription *Subscription // 订阅(如果有)
RequestID string // 请求ID
}
```
```
## 5. 性能与优化
### 5.1 性能瓶颈分析
| 瓶颈点 | 影响 | 优化建议 |
|-------|------|----------|
| 账号选择锁 | 高并发下争用 | 使用读写锁或本地缓存 |
| 重试延迟 | 响应时间增加 | 自适应退避算法 |
| 日志写入 | I/O 延迟 | 批量异步写入 |
### 5.2 GatewayService 核心方法
| 方法 | 位置 | 职责 |
|------|------|------|
| `HandleRequest` | gateway_service.go | 处理 Claude /v1/messages 请求 |
| `HandleOpenAIRequest` | openai_gateway_service.go | 处理 OpenAI 格式请求 |
| `SelectAccountWithLoadAwareness` | gateway_service.go:1190 | 负载感知账号选择 |
| `RecordUsage` | gateway_service.go:7483 | 用量记录与计费 |
| `tryAcquireAccountSlot` | gateway_service.go | 尝试获取并发槽位 |
| `checkAndRegisterSession` | gateway_service.go | 检查会话限制 |
### 5.3 缓存策略
- **API Key 缓存**两级缓存L1 内存 + L2 Redis
- **账号状态缓存**本地内存缓存5 秒过期
- **配额缓存**Redis 缓存,实时更新
- **粘性会话缓存**RedisTTL 1 小时
- **调度快照缓存**:定期快照加速选择
### 5.4 连接池管理
```go
// gateway_service.go - HTTPUpstream 接口
type HTTPUpstream interface {
Do(req *http.Request) (*http.Response, error)
// 每个账号维护独立的 HTTP 客户端
}
```
**连接管理策略:**
- 每个上游账号维护独立的 HTTP 客户端
- 使用连接池复用连接
- 超时配置:连接 10s读取 120s
- 支持 WebSocketSSE长连接
## 6. 安全考虑
### 6.1 请求验证
- **模型白名单**:验证请求模型是否在分组允许列表中
- **IP 限制**:支持 API Key 级别的 IP 白名单
- **请求限流**:基于用户/分组的 RPM/TPM 限制
### 6.2 上游保护
- **URL 白名单**:限制可访问的上游域名
- **Header 过滤**:移除敏感的上游响应头
- **超时控制**:防止上游响应超时
## 7. 可扩展性设计
### 7.1 新增上游支持
要支持新的 AI 提供商,需要:
1. **实现账号类型**`account.go`
```go
type AccountType string
const (
AccountTypeOAuth AccountType = "oauth"
AccountTypeAPIKey AccountType = "apikey"
AccountTypeAnthropic AccountType = "anthropic"
// 新增类型
AccountTypeCustom AccountType = "custom"
)
```
2. **实现请求转换器**`service/request_transformer.go`
- 将标准请求格式转换为目标平台格式
- 处理认证、模型映射等
3. **注册到网关**`server/routes/gateway.go`
```go
router.POST("/custom/v1/*path", middleware.APIKeyAuth, customHandler)
```
### 7.2 插件化设计
当前架构支持:
- **错误处理插件**:通过 `ErrorPassthroughRule` 配置
- **请求/响应拦截**:通过 Hook 机制
- **自定义认证**:支持 OAuth、API Key 等多种方式
## 8. 测试覆盖
### 8.1 单元测试
| 测试文件 | 覆盖范围 |
|----------|----------|
| `gateway_account_selection_test.go` | 账号选择算法 |
| `gateway_handler_stream_failover_test.go` | 流式响应故障转移 |
| `gateway_helper_backoff_test.go` | 退避算法 |
### 8.2 集成测试
| 测试文件 | 场景 |
|----------|------|
| `e2e_gateway_test.go` | 完整请求流程 |
| `e2e_user_flow_test.go` | 用户使用场景 |
## 9. 配置参数
### 9.1 网关配置config.yaml
```yaml
gateway:
# 重试配置
max_retries: 3
retry_delay_ms: 100
max_retry_delay_ms: 8000
# 超时配置
request_timeout: 120s
connect_timeout: 10s
# 粘性会话
sticky_session_ttl: 3600
# 流式响应
stream_buffer_size: 32768
```
### 9.2 环境变量
| 变量 | 说明 | 默认值 |
|------|------|--------|
| `GATEWAY_MAX_RETRIES` | 最大重试次数 | 3 |
| `GATEWAY_REQUEST_TIMEOUT` | 请求超时 | 120s |
| `GATEWAY_STICKY_SESSION` | 启用粘性会话 | true |
## 10. 监控与运维
### 10.1 关键指标
| 指标 | 告警阈值 | 说明 |
|------|----------|------|
| `gateway_request_duration` | > 30s | 请求延迟 |
| `gateway_account_failover` | > 10% | 账号切换率 |
| `gateway_upstream_errors` | > 1% | 上游错误率 |
### 10.2 日志分析
关键日志字段:
- `request_id`:请求唯一标识
- `account_id`:使用的账号 ID
- `upstream_status`:上游响应状态
- `retry_count`:重试次数
- `failover_reason`:故障切换原因
## 11. 修改和扩展指南
### 11.1 常见修改场景
**场景 1修改重试策略**
修改文件:`service/gateway_service.go`
```go
func (s *GatewayService) handleUpstreamError(...) {
// 修改重试次数
maxRetries := 5 // 从 3 改为 5
// 修改退避算法
delay := calculateExponentialBackoff(attempt) // 改为指数退避
}
```
**场景 2添加新的上游支持**
1. 在 `domain/constants.go` 添加账号类型
2. 实现账号创建/验证逻辑
3. 添加请求转发函数
4. 注册路由
**场景 3调整账号选择算法**
修改文件:`service/gateway_service.go`
```go
func (s *GatewayService) selectAccountByStrategy(...) {
// 可以添加更多选择策略
switch strategy {
case "least_load":
return selectByLeastLoad(accounts)
case "round_robin":
return selectByRoundRobin(accounts)
case "priority":
return selectByPriority(accounts)
}
}
```
### 11.2 注意事项
1. **线程安全**:账号选择逻辑需要考虑并发安全
2. **向后兼容**:新增配置需要设置合理的默认值
3. **测试覆盖**:重大修改需要添加对应的单元测试
## 12. 总结
API Gateway 是 Sub2API 系统的核心模块,设计具有良好的灵活性和可扩展性。关键特点:
- **多平台支持**:统一接口接入多种 AI 提供商
- **智能调度**:负载感知 + 粘性会话
- **故障容忍**:自动重试 + 账号切换
- **高性能**:多级缓存 + 连接池复用
修改建议:
- 重试策略调整相对简单,风险较低
- 新增上游支持需要完善测试
- 账号选择算法优化需充分验证
---
*文档版本1.1*
*最后更新2026-03-23*
*分析基于Sub2API v0.1.104*
*修正内容文件路径、账号选择算法6步流程、GatewayService依赖服务*