- Remove old review reports (keep latest only) - Move docs/ to deploy/docs-backup/ - Move performance-testing/ to deploy/ - Clean up test output files - Organize root directory
650 lines
19 KiB
Markdown
650 lines
19 KiB
Markdown
# Sub2API 模块分析报告:计费与配额模块
|
||
|
||
## 1. 模块概述
|
||
|
||
### 1.1 模块定位
|
||
计费与配额模块是Sub2API系统的经济核心,负责追踪用户和API Key的使用量、计算费用、管理配额、控制访问。该模块与网关模块紧密配合,在每个请求完成后进行用量记录和费用扣除。
|
||
|
||
### 1.2 核心职责
|
||
- **用量追踪**:记录每个请求的Token消耗和费用
|
||
- **配额控制**:用户和API Key级别的配额限制
|
||
- **计费计算**:根据模型和用量计算费用
|
||
- **余额管理**:用户余额的充值和扣减
|
||
- **速率限制**:RPM/TPM级别的请求限制
|
||
|
||
## 2. 代码结构分析
|
||
|
||
### 2.1 核心文件
|
||
|
||
| 文件路径 | 职责 | 代码行数 |
|
||
|---------|------|----------|
|
||
| `service/billing_service.go` | 计费核心服务 | ~500行 |
|
||
| `service/billing_cache_service.go` | 计费缓存服务(余额、配额检查) | ~700行 |
|
||
| `service/gateway_service.go` | **用量记录(RecordUsage 在此文件中)** | **8516行** |
|
||
| `repository/usage_log_repo.go` | 用量数据访问层 | ~4000行 |
|
||
| `repository/usage_billing_repo.go` | 计费数据访问层 | ~2000行 |
|
||
| `middleware/rate_limiter.go` | 速率限制中间件 | ~400行 |
|
||
| `service/api_key_rate_limit.go` | API Key级别限流 | ~300行 |
|
||
|
||
> ⚠️ **重要修正**:用量记录功能 `RecordUsage` 位于 `gateway_service.go:7483`,不是独立文件。
|
||
|
||
### 2.2 核心数据模型
|
||
|
||
```go
|
||
// 用量日志 - 每次请求的详细记录
|
||
type UsageLog struct {
|
||
ID int64
|
||
RequestID string // 请求唯一ID
|
||
UserID int64 // 用户ID
|
||
APIKeyID *int64 // API Key ID
|
||
AccountID int64 // 上游账号ID
|
||
GroupID int64 // 分组ID
|
||
Model string // 使用模型
|
||
UpstreamModel string // 上游实际模型
|
||
RequestType string // 请求类型:chat/completion/embedding等
|
||
InputTokens int // 输入Token数
|
||
OutputTokens int // 输出Token数
|
||
TotalTokens int // 总Token数
|
||
Cost float64 // 费用
|
||
Currency string // 货币:USD
|
||
Status string // 状态:success/error
|
||
DurationMs int // 请求耗时(毫秒)
|
||
CreatedAt time.Time
|
||
}
|
||
|
||
// 计费配置 - 模型价格定义
|
||
type PricingRule struct {
|
||
Model string // 模型名称(支持通配符)
|
||
InputPrice float64 // 输入价格(per 1M tokens)
|
||
OutputPrice float64 // 输出价格(per 1M tokens)
|
||
PromptPrice float64 // 提示价格(per 1M tokens)
|
||
CacheDiscount float64 // 缓存读取折扣
|
||
}
|
||
```
|
||
|
||
## 3. 功能详细分析
|
||
|
||
### 3.1 用量记录流程(后置处理)
|
||
|
||
> ⚠️ **重要说明**:用量记录发生在**请求转发成功后**,是后置处理步骤。
|
||
|
||
```go
|
||
// gateway_service.go:7483 - RecordUsage
|
||
func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error {
|
||
// 1. 强制缓存计费(粘性会话切换时)
|
||
if input.ForceCacheBilling && result.Usage.InputTokens > 0 {
|
||
result.Usage.CacheReadInputTokens += result.Usage.InputTokens
|
||
result.Usage.InputTokens = 0
|
||
}
|
||
|
||
// 2. 计算费用(根据媒体类型选择计费方式)
|
||
var cost *CostBreakdown
|
||
if result.MediaType == "image" || result.MediaType == "video" {
|
||
cost = s.billingService.CalculateSoraImageCost(...)
|
||
} else if result.ImageCount > 0 {
|
||
cost = s.billingService.CalculateImageCost(...)
|
||
} else {
|
||
// Token 计费
|
||
cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier)
|
||
}
|
||
|
||
// 3. 创建用量日志(异步,不阻塞响应)
|
||
usageLog := &UsageLog{...}
|
||
go func() {
|
||
s.usageLogRepo.Create(context.Background(), usageLog)
|
||
}()
|
||
|
||
// 4. 异步计费扣费
|
||
billingCmd := &UsageBillingCommand{
|
||
UserID: user.ID,
|
||
APIKeyID: apiKey.ID,
|
||
Cost: cost.TotalCost,
|
||
RequestID: input.RequestID,
|
||
BillingFingerprint: fingerprint,
|
||
}
|
||
go func() {
|
||
s.usageBillingRepo.Apply(context.Background(), billingCmd)
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
```
|
||
|
||
**调用时机**:`RecordUsage` 在 handler 层请求成功返回后调用:
|
||
```go
|
||
// gateway_handler.go
|
||
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{...}); err != nil {
|
||
logger.Error("record usage failed", ...); // 不阻塞响应
|
||
}
|
||
```
|
||
|
||
**特点**:
|
||
- **后置处理**:请求成功后才记录,不影响响应延迟
|
||
- **异步执行**:日志和计费都异步执行
|
||
- **幂等设计**:使用 BillingFingerprint 防止重复计费
|
||
|
||
### 3.2 计费计算逻辑
|
||
|
||
> ⚠️ **修正**:计费通过 `UsageBillingRepository.Apply()` 原子执行,不是简单的 `CalculateCost`。
|
||
|
||
```go
|
||
// gateway_service.go - RecordUsage 中的计费流程
|
||
func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error {
|
||
// 1. 计算费用
|
||
var cost *CostBreakdown
|
||
if result.MediaType == "image" {
|
||
cost = s.billingService.CalculateSoraImageCost(...)
|
||
} else {
|
||
cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier)
|
||
}
|
||
|
||
// 2. 构建计费命令
|
||
billingCmd := &UsageBillingCommand{
|
||
UserID: user.ID,
|
||
APIKeyID: apiKey.ID,
|
||
Cost: cost.TotalCost,
|
||
RequestID: input.RequestID,
|
||
// 幂等指纹,防止重复计费
|
||
BillingFingerprint: generateFingerprint(...),
|
||
}
|
||
|
||
// 3. 异步执行原子扣费
|
||
go func() {
|
||
result := s.usageBillingRepo.Apply(context.Background(), billingCmd)
|
||
if result.Applied {
|
||
logger.Info("billing_applied", "user_id", userID, "cost", cost)
|
||
}
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
// repository/usage_billing_repo.go - Apply (原子操作)
|
||
func (r *usageBillingRepository) Apply(ctx context.Context, cmd *UsageBillingCommand) (*UsageBillingResult) {
|
||
// 使用事务确保原子性
|
||
tx, err := r.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
|
||
if err != nil {
|
||
return &UsageBillingResult{Applied: false}
|
||
}
|
||
|
||
// 1. 检查幂等(防止重复计费)
|
||
if r.existsByFingerprint(ctx, cmd.BillingFingerprint) {
|
||
return &UsageBillingResult{Applied: false, Reason: "duplicate"}
|
||
}
|
||
|
||
// 2. 扣减余额
|
||
if err := tx.User.UpdateOneID(cmd.UserID).
|
||
AddBalance(-cmd.Cost).
|
||
Exec(ctx); err != nil {
|
||
tx.Rollback()
|
||
return &UsageBillingResult{Applied: false, Error: err}
|
||
}
|
||
|
||
// 3. 记录计费
|
||
r.createBillingRecord(ctx, tx, cmd)
|
||
|
||
// 4. 记录幂等指纹
|
||
r.createFingerprint(ctx, tx, cmd)
|
||
|
||
tx.Commit()
|
||
return &UsageBillingResult{Applied: true}
|
||
}
|
||
```
|
||
|
||
**计费因子优先级**:
|
||
```
|
||
费率倍数 = 系统默认值 (1.0)
|
||
或 分组配置 (group.RateMultiplier)
|
||
或 用户专属配置 (userGroupRateRepo)
|
||
```
|
||
|
||
**Token 类型与计费**:
|
||
```
|
||
标准输入 Token: InputTokens × InputPrice
|
||
标准输出 Token: OutputTokens × OutputPrice
|
||
缓存创建 (5分钟): CacheCreation5mTokens × CachePrice × 0.8
|
||
缓存创建 (1小时): CacheCreation1hTokens × CachePrice × 0.9
|
||
缓存读取: CacheReadInputTokens × CachePrice × 0.5
|
||
```
|
||
|
||
**定价配置示例(config.yaml)**:
|
||
```yaml
|
||
billing:
|
||
pricing:
|
||
claude-3-5-sonnet-20241022:
|
||
input: 3.00 # $3.00 / 1M tokens
|
||
output: 15.00 # $15.00 / 1M tokens
|
||
cache_read: 0.30 # 缓存读取折扣
|
||
```
|
||
|
||
### 3.3 配额控制
|
||
|
||
> ⚠️ **修正**:配额检查分散在多个位置,发生在**请求处理前**(前置验证)。
|
||
|
||
#### 3.3.1 实际配额检查流程
|
||
|
||
```
|
||
请求入口 (API Key Auth Middleware)
|
||
│
|
||
▼
|
||
┌─────────────────────────────────────────┐
|
||
│ api_key_auth.go: APIKeyAuth() │
|
||
│ │
|
||
│ 1. APIKeyService.GetByKey() │
|
||
│ → 验证 Key 有效性 │
|
||
│ │
|
||
│ 2. SubscriptionService.Validate() │
|
||
│ → 检查订阅状态和有效期 │
|
||
│ → 检查订阅配额 │
|
||
│ │
|
||
│ 3. BillingCacheService.CheckUserBalance()
|
||
│ → 检查用户余额 │
|
||
│ │
|
||
│ 4. APIKeyRateLimit.CheckRateLimits() │
|
||
│ → 检查 5h/1d/7d 限制 │
|
||
└─────────────────────────────────────────┘
|
||
│
|
||
▼
|
||
通过 → 继续请求
|
||
拒绝 → 返回错误 (402/429)
|
||
```
|
||
|
||
#### 3.3.2 API Key 验证流程(详细)
|
||
|
||
```go
|
||
// service/api_key_auth.go - APIKeyAuth()
|
||
func APIKeyAuth() gin.HandlerFunc {
|
||
return func(c *gin.Context) {
|
||
apiKey, user, err := apiKeyService.GetByKey(ctx, key)
|
||
if err != nil {
|
||
c.AbortWithStatusJSON(401, ...)
|
||
return
|
||
}
|
||
|
||
// 1. 订阅验证
|
||
if err := subscriptionService.Validate(ctx, user.ID, apiKey.GroupID); err != nil {
|
||
c.AbortWithStatusJSON(402, ...) // Payment Required
|
||
return
|
||
}
|
||
|
||
// 2. 余额检查
|
||
if err := billingCacheService.CheckUserBalance(ctx, user.ID); err != nil {
|
||
c.AbortWithStatusJSON(402, ...) // Payment Required
|
||
return
|
||
}
|
||
|
||
// 3. 速率限制检查
|
||
if err := apiKeyRateLimit.CheckRateLimits(ctx, apiKey.ID); err != nil {
|
||
c.AbortWithStatusJSON(429, ...) // Too Many Requests
|
||
return
|
||
}
|
||
|
||
// 4. 设置上下文
|
||
c.Set("api_key", apiKey)
|
||
c.Set("user", user)
|
||
c.Next()
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.3.3 BillingCacheService 余额检查
|
||
|
||
```go
|
||
// service/billing_cache_service.go - CheckUserBalance
|
||
func (s *BillingCacheService) CheckUserBalance(ctx context.Context, userID int64) error {
|
||
// 1. 获取缓存的余额
|
||
balance, err := s.cache.GetUserBalance(ctx, userID)
|
||
if err != nil {
|
||
// 2. 缓存未命中,从数据库加载
|
||
user, err := s.userRepo.GetByID(ctx, userID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
balance = user.Balance
|
||
s.cache.SetUserBalance(ctx, userID, balance)
|
||
}
|
||
|
||
// 3. 检查是否有足够余额(允许少量超支)
|
||
if balance < 0 {
|
||
return ErrInsufficientBalance
|
||
}
|
||
|
||
return nil
|
||
}
|
||
```
|
||
|
||
#### 3.3.4 配额更新(异步)
|
||
|
||
```go
|
||
// gateway_service.go - RecordUsage 中的配额更新
|
||
// 异步更新,不阻塞响应
|
||
go func() {
|
||
s.usageBillingRepo.Apply(ctx, &UsageBillingCommand{
|
||
UserID: userID,
|
||
APIKeyID: apiKeyID,
|
||
Cost: cost,
|
||
// ...
|
||
})
|
||
}()
|
||
```
|
||
|
||
### 3.4 速率限制
|
||
|
||
#### 3.4.1 多级限流架构
|
||
|
||
```
|
||
┌─────────────────────────────────────┐
|
||
│ 第一级:API Key 限流 │
|
||
│ 检查:5h/1d/7d 累计使用量 │
|
||
└─────────────────────────────────────┘
|
||
↓
|
||
┌─────────────────────────────────────┐
|
||
│ 第二级:用户限流 │
|
||
│ 检查:RPM (requests per minute) │
|
||
└─────────────────────────────────────┘
|
||
↓
|
||
┌─────────────────────────────────────┐
|
||
│ 第三级:IP 限流 │
|
||
│ 检查:防止暴力请求 │
|
||
└─────────────────────────────────────┘
|
||
```
|
||
|
||
#### 3.4.2 限流实现
|
||
|
||
```go
|
||
// middleware/rate_limiter.go - AllowRequest
|
||
func (r *RateLimiter) AllowRequest(key string, limit int, window time.Duration) bool {
|
||
// 1. 获取当前计数
|
||
count := r.getCount(key, window)
|
||
|
||
// 2. 检查是否超限
|
||
if count >= int64(limit) {
|
||
return false
|
||
}
|
||
|
||
// 3. 原子递增
|
||
return r.incr(key, window)
|
||
}
|
||
|
||
// 实际检查逻辑
|
||
func (s *BillingCacheService) checkRateLimits(ctx context.Context, apiKey *APIKey) error {
|
||
// 检查 5h 限制
|
||
if apiKey.RateLimit5h > 0 {
|
||
used5h := s.getUsage5h(apiKey.ID)
|
||
if used5h >= apiKey.RateLimit5h {
|
||
return ErrRateLimitExceeded
|
||
}
|
||
}
|
||
|
||
// 检查 1d 限制
|
||
if apiKey.RateLimit1d > 0 {
|
||
used1d := s.getUsage1d(apiKey.ID)
|
||
if used1d >= apiKey.RateLimit1d {
|
||
return ErrRateLimitExceeded
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
```
|
||
|
||
### 3.5 余额管理
|
||
|
||
#### 3.5.1 余额充值
|
||
|
||
```go
|
||
// service/admin_service.go - RechargeBalance
|
||
func (s *AdminService) RechargeBalance(ctx context.Context, userID int64, amount float64, reason string) error {
|
||
// 1. 获取用户
|
||
user, err := s.userRepo.GetByID(ctx, userID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 2. 更新余额
|
||
newBalance := user.Balance + amount
|
||
err = s.userRepo.UpdateBalance(ctx, userID, amount)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 3. 记录充值日志
|
||
s.recordTransaction(ctx, &Transaction{
|
||
UserID: userID,
|
||
Type: "recharge",
|
||
Amount: amount,
|
||
Balance: newBalance,
|
||
Reason: reason,
|
||
Operator: getCurrentUser(ctx),
|
||
})
|
||
|
||
return nil
|
||
}
|
||
```
|
||
|
||
#### 3.5.2 余额扣减
|
||
|
||
```go
|
||
// 在每次请求完成后自动扣减
|
||
func (s *BillingService) DeductBalance(ctx context.Context, userID int64, cost float64) error {
|
||
// 1. 原子操作扣减余额
|
||
err := s.userRepo.DeductBalance(ctx, userID, cost)
|
||
if err != nil {
|
||
// 余额不足,记录欠费
|
||
s.recordOverage(ctx, userID, cost)
|
||
return ErrInsufficientBalance
|
||
}
|
||
|
||
return nil
|
||
}
|
||
```
|
||
|
||
## 4. 缓存策略
|
||
|
||
### 4.1 多级缓存架构
|
||
|
||
```go
|
||
// 计费缓存设计
|
||
type BillingCache struct {
|
||
// L1: 本地内存缓存
|
||
// - 用户余额缓存 (TTL: 10s)
|
||
// - API Key配额缓存 (TTL: 5s)
|
||
// - 用量计数缓存 (TTL: 1s)
|
||
|
||
// L2: Redis缓存
|
||
// - 实时用量统计
|
||
// - 速率限制计数
|
||
|
||
// L3: 数据库
|
||
// - 详细用量日志
|
||
// - 余额变动历史
|
||
}
|
||
```
|
||
|
||
### 4.2 缓存更新策略
|
||
|
||
```go
|
||
// 用量记录更新策略
|
||
const (
|
||
CacheUpdateSync = "sync" // 同步更新
|
||
CacheUpdateAsync = "async" // 异步更新
|
||
CacheUpdateBatch = "batch" // 批量更新
|
||
)
|
||
|
||
// 当前策略:异步更新
|
||
func (s *BillingCacheService) QueueUpdateQuotaUsage(apiKeyID int64, cost float64) {
|
||
// 放入更新队列
|
||
s.updateQueue <- &QuotaUpdate{
|
||
APIKeyID: apiKeyID,
|
||
Cost: cost,
|
||
}
|
||
}
|
||
```
|
||
|
||
## 5. 配置参数
|
||
|
||
### 5.1 计费配置(config.yaml)
|
||
|
||
```yaml
|
||
billing:
|
||
# 默认计费配置
|
||
default:
|
||
rate_multiplier: 1.0
|
||
|
||
# 价格配置(每百万Token价格,USD)
|
||
pricing:
|
||
claude-3-5-sonnet-20241022:
|
||
input: 3.00
|
||
output: 15.00
|
||
claude-3-haiku-20240307:
|
||
input: 0.25
|
||
output: 1.25
|
||
gpt-4o:
|
||
input: 2.50
|
||
output: 10.00
|
||
|
||
# 缓存配置
|
||
cache:
|
||
quota_ttl: 5s
|
||
balance_ttl: 10s
|
||
rate_limit_ttl: 1s
|
||
|
||
# 熔断配置
|
||
circuit_breaker:
|
||
enabled: true
|
||
error_threshold: 0.1
|
||
timeout: 30s
|
||
```
|
||
|
||
### 5.2 速率限制配置
|
||
|
||
```yaml
|
||
rate_limit:
|
||
# API Key 默认限制
|
||
api_key:
|
||
rpm: 1000
|
||
tpm: 100000 # tokens per minute
|
||
|
||
# 用户默认限制
|
||
user:
|
||
rpm: 2000
|
||
|
||
# IP 默认限制
|
||
ip:
|
||
rpm: 5000
|
||
```
|
||
|
||
## 6. 修改和扩展指南
|
||
|
||
### 6.1 常见修改场景
|
||
|
||
**场景1:调整模型价格**
|
||
|
||
```go
|
||
// service/billing_service.go - getPricingRule
|
||
func (s *BillingService) getPricingRule(model string, groupID int64) *PricingRule {
|
||
// 添加新模型定价
|
||
customPricing := map[string]PricingRule{
|
||
"gpt-4-turbo": {
|
||
InputPrice: 10.00,
|
||
OutputPrice: 30.00,
|
||
},
|
||
}
|
||
|
||
if rule, ok := customPricing[model]; ok {
|
||
return &rule
|
||
}
|
||
|
||
return s.defaultPricing[model]
|
||
}
|
||
```
|
||
|
||
**场景2:调整API Key配额**
|
||
|
||
```go
|
||
// 修改默认配额
|
||
const (
|
||
DefaultRateLimit5h = 500000 // 从 100000 改为 500000
|
||
DefaultRateLimit1d = 2000000 // 从 500000 改为 2000000
|
||
)
|
||
```
|
||
|
||
**场景3:添加新的计费维度**
|
||
|
||
```go
|
||
// 例如:按请求次数计费
|
||
type RequestPricing struct {
|
||
Model string
|
||
PerRequestCost float64 // 每次请求固定费用
|
||
PerTokenCost float64 // Token费用
|
||
}
|
||
|
||
func (s *BillingService) CalculateWithRequestFee(model string, tokens int) float64 {
|
||
pricing := s.getRequestPricing(model)
|
||
return pricing.PerRequestCost + (float64(tokens) / 1_000_000 * pricing.PerTokenCost)
|
||
}
|
||
```
|
||
|
||
### 6.2 注意事项
|
||
|
||
1. **计费准确性**:费用计算必须精确,建议保留更多小数位
|
||
2. **并发安全**:余额更新需要原子操作
|
||
3. **数据一致性**:缓存和数据库需要定期同步
|
||
|
||
## 7. 测试覆盖
|
||
|
||
### 7.1 单元测试
|
||
|
||
| 测试文件 | 覆盖范围 |
|
||
|----------|----------|
|
||
| `billing_service_test.go` | 计费计算逻辑 |
|
||
| `billing_cache_service_test.go` | 缓存机制 |
|
||
| `rate_limiter_test.go` | 速率限制 |
|
||
|
||
### 7.2 集成测试
|
||
|
||
| 测试文件 | 场景 |
|
||
|----------|------|
|
||
| `e2e_gateway_test.go` | 完整计费流程 |
|
||
|
||
## 8. 监控与运维
|
||
|
||
### 8.1 关键指标
|
||
|
||
| 指标 | 告警阈值 | 说明 |
|
||
|------|----------|------|
|
||
| `billing_balance_zero` | > 10% | 余额为0用户比例 |
|
||
| `billing_quota_exhausted` | > 20% | 配额耗尽比例 |
|
||
| `billing_rate_limited` | > 15% | 触发限流比例 |
|
||
| `billing_cache_hit_rate` | < 90% | 缓存命中率 |
|
||
|
||
### 8.2 运维任务
|
||
|
||
| 任务 | 频率 | 说明 |
|
||
|------|------|------|
|
||
| 余额对账 | 每天 | 核对余额一致性 |
|
||
| 用量统计 | 每小时 | 生成统计报表 |
|
||
| 异常检测 | 持续 | 检测异常消费模式 |
|
||
|
||
## 9. 总结
|
||
|
||
计费与配额模块特点:
|
||
|
||
- **精细化计费**:按模型、Token类型、用户分组等多维度计费
|
||
- **实时配额控制**:多级限流保障系统稳定性
|
||
- **高性能缓存**:两级缓存确保高并发下的性能
|
||
- **余额保护**:支持少量超支,平衡用户体验和风险
|
||
|
||
**潜在改进点:**
|
||
1. 可增加更灵活的计费规则(如包月套餐)
|
||
2. 可增加更详细的费用分析报表
|
||
|
||
**修改建议:**
|
||
- 价格调整需要同步更新配置
|
||
- 限流参数可根据实际流量调整
|
||
|
||
---
|
||
*文档版本:1.1*
|
||
*最后更新:2026-03-23*
|
||
*分析基于:Sub2API v0.1.104*
|
||
*修正内容:用量记录位置(gateway_service.go)、配额检查流程(前置验证)、计费原子操作* |