Files
tokens-reef/deploy/docs-backup/MODULE_05_BILLING.md
Developer 349d783fd1 refactor: clean up project structure
- Remove old review reports (keep latest only)
- Move docs/ to deploy/docs-backup/
- Move performance-testing/ to deploy/
- Clean up test output files
- Organize root directory
2026-04-06 23:36:03 +08:00

19 KiB
Raw Permalink Blame History

Sub2API 模块分析报告:计费与配额模块

1. 模块概述

1.1 模块定位

计费与配额模块是Sub2API系统的经济核心负责追踪用户和API Key的使用量、计算费用、管理配额、控制访问。该模块与网关模块紧密配合在每个请求完成后进行用量记录和费用扣除。

1.2 核心职责

  • 用量追踪记录每个请求的Token消耗和费用
  • 配额控制用户和API Key级别的配额限制
  • 计费计算:根据模型和用量计算费用
  • 余额管理:用户余额的充值和扣减
  • 速率限制RPM/TPM级别的请求限制

2. 代码结构分析

2.1 核心文件

文件路径 职责 代码行数
service/billing_service.go 计费核心服务 ~500行
service/billing_cache_service.go 计费缓存服务(余额、配额检查) ~700行
service/gateway_service.go 用量记录RecordUsage 在此文件中) 8516行
repository/usage_log_repo.go 用量数据访问层 ~4000行
repository/usage_billing_repo.go 计费数据访问层 ~2000行
middleware/rate_limiter.go 速率限制中间件 ~400行
service/api_key_rate_limit.go API Key级别限流 ~300行

⚠️ 重要修正:用量记录功能 RecordUsage 位于 gateway_service.go:7483,不是独立文件。

2.2 核心数据模型

// 用量日志 - 每次请求的详细记录
type UsageLog struct {
    ID              int64
    RequestID       string     // 请求唯一ID
    UserID          int64      // 用户ID
    APIKeyID        *int64    // API Key ID
    AccountID       int64      // 上游账号ID
    GroupID         int64      // 分组ID
    Model           string     // 使用模型
    UpstreamModel   string     // 上游实际模型
    RequestType     string     // 请求类型chat/completion/embedding等
    InputTokens     int        // 输入Token数
    OutputTokens    int        // 输出Token数
    TotalTokens     int        // 总Token数
    Cost            float64   // 费用
    Currency        string     // 货币USD
    Status          string     // 状态success/error
    DurationMs      int        // 请求耗时(毫秒)
    CreatedAt       time.Time
}

// 计费配置 - 模型价格定义
type PricingRule struct {
    Model         string    // 模型名称(支持通配符)
    InputPrice   float64   // 输入价格per 1M tokens
    OutputPrice  float64   // 输出价格per 1M tokens
    PromptPrice  float64   // 提示价格per 1M tokens
    CacheDiscount float64  // 缓存读取折扣
}

3. 功能详细分析

3.1 用量记录流程(后置处理)

⚠️ 重要说明:用量记录发生在请求转发成功后,是后置处理步骤。

// gateway_service.go:7483 - RecordUsage
func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error {
    // 1. 强制缓存计费(粘性会话切换时)
    if input.ForceCacheBilling && result.Usage.InputTokens > 0 {
        result.Usage.CacheReadInputTokens += result.Usage.InputTokens
        result.Usage.InputTokens = 0
    }
    
    // 2. 计算费用(根据媒体类型选择计费方式)
    var cost *CostBreakdown
    if result.MediaType == "image" || result.MediaType == "video" {
        cost = s.billingService.CalculateSoraImageCost(...)
    } else if result.ImageCount > 0 {
        cost = s.billingService.CalculateImageCost(...)
    } else {
        // Token 计费
        cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier)
    }
    
    // 3. 创建用量日志(异步,不阻塞响应)
    usageLog := &UsageLog{...}
    go func() {
        s.usageLogRepo.Create(context.Background(), usageLog)
    }()
    
    // 4. 异步计费扣费
    billingCmd := &UsageBillingCommand{
        UserID:         user.ID,
        APIKeyID:       apiKey.ID,
        Cost:           cost.TotalCost,
        RequestID:      input.RequestID,
        BillingFingerprint: fingerprint,
    }
    go func() {
        s.usageBillingRepo.Apply(context.Background(), billingCmd)
    }()
    
    return nil
}

调用时机RecordUsage 在 handler 层请求成功返回后调用:

// gateway_handler.go
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{...}); err != nil {
    logger.Error("record usage failed", ...);  // 不阻塞响应
}

特点

  • 后置处理:请求成功后才记录,不影响响应延迟
  • 异步执行:日志和计费都异步执行
  • 幂等设计:使用 BillingFingerprint 防止重复计费

3.2 计费计算逻辑

⚠️ 修正:计费通过 UsageBillingRepository.Apply() 原子执行,不是简单的 CalculateCost

// gateway_service.go - RecordUsage 中的计费流程
func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error {
    // 1. 计算费用
    var cost *CostBreakdown
    if result.MediaType == "image" {
        cost = s.billingService.CalculateSoraImageCost(...)
    } else {
        cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier)
    }
    
    // 2. 构建计费命令
    billingCmd := &UsageBillingCommand{
        UserID:         user.ID,
        APIKeyID:       apiKey.ID,
        Cost:           cost.TotalCost,
        RequestID:      input.RequestID,
        // 幂等指纹,防止重复计费
        BillingFingerprint: generateFingerprint(...),
    }
    
    // 3. 异步执行原子扣费
    go func() {
        result := s.usageBillingRepo.Apply(context.Background(), billingCmd)
        if result.Applied {
            logger.Info("billing_applied", "user_id", userID, "cost", cost)
        }
    }()
    
    return nil
}

// repository/usage_billing_repo.go - Apply (原子操作)
func (r *usageBillingRepository) Apply(ctx context.Context, cmd *UsageBillingCommand) (*UsageBillingResult) {
    // 使用事务确保原子性
    tx, err := r.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
    if err != nil {
        return &UsageBillingResult{Applied: false}
    }
    
    // 1. 检查幂等(防止重复计费)
    if r.existsByFingerprint(ctx, cmd.BillingFingerprint) {
        return &UsageBillingResult{Applied: false, Reason: "duplicate"}
    }
    
    // 2. 扣减余额
    if err := tx.User.UpdateOneID(cmd.UserID).
        AddBalance(-cmd.Cost).
        Exec(ctx); err != nil {
        tx.Rollback()
        return &UsageBillingResult{Applied: false, Error: err}
    }
    
    // 3. 记录计费
    r.createBillingRecord(ctx, tx, cmd)
    
    // 4. 记录幂等指纹
    r.createFingerprint(ctx, tx, cmd)
    
    tx.Commit()
    return &UsageBillingResult{Applied: true}
}

计费因子优先级

费率倍数 = 系统默认值 (1.0)
       或 分组配置 (group.RateMultiplier)
       或 用户专属配置 (userGroupRateRepo)

Token 类型与计费

标准输入 Token:     InputTokens × InputPrice
标准输出 Token:     OutputTokens × OutputPrice
缓存创建 (5分钟):   CacheCreation5mTokens × CachePrice × 0.8
缓存创建 (1小时):   CacheCreation1hTokens × CachePrice × 0.9
缓存读取:           CacheReadInputTokens × CachePrice × 0.5

定价配置示例config.yaml

billing:
  pricing:
    claude-3-5-sonnet-20241022:
      input: 3.00      # $3.00 / 1M tokens
      output: 15.00    # $15.00 / 1M tokens
      cache_read: 0.30  # 缓存读取折扣

3.3 配额控制

⚠️ 修正:配额检查分散在多个位置,发生在请求处理前(前置验证)。

3.3.1 实际配额检查流程

请求入口 (API Key Auth Middleware)
         │
         ▼
┌─────────────────────────────────────────┐
│  api_key_auth.go: APIKeyAuth()          │
│                                         │
│  1. APIKeyService.GetByKey()           │
│     → 验证 Key 有效性                   │
│                                         │
│  2. SubscriptionService.Validate()      │
│     → 检查订阅状态和有效期              │
│     → 检查订阅配额                      │
│                                         │
│  3. BillingCacheService.CheckUserBalance()
│     → 检查用户余额                      │
│                                         │
│  4. APIKeyRateLimit.CheckRateLimits()   │
│     → 检查 5h/1d/7d 限制               │
└─────────────────────────────────────────┘
         │
         ▼
    通过 → 继续请求
    拒绝 → 返回错误 (402/429)

3.3.2 API Key 验证流程(详细)

// service/api_key_auth.go - APIKeyAuth()
func APIKeyAuth() gin.HandlerFunc {
    return func(c *gin.Context) {
        apiKey, user, err := apiKeyService.GetByKey(ctx, key)
        if err != nil {
            c.AbortWithStatusJSON(401, ...)
            return
        }
        
        // 1. 订阅验证
        if err := subscriptionService.Validate(ctx, user.ID, apiKey.GroupID); err != nil {
            c.AbortWithStatusJSON(402, ...)  // Payment Required
            return
        }
        
        // 2. 余额检查
        if err := billingCacheService.CheckUserBalance(ctx, user.ID); err != nil {
            c.AbortWithStatusJSON(402, ...)  // Payment Required
            return
        }
        
        // 3. 速率限制检查
        if err := apiKeyRateLimit.CheckRateLimits(ctx, apiKey.ID); err != nil {
            c.AbortWithStatusJSON(429, ...)  // Too Many Requests
            return
        }
        
        // 4. 设置上下文
        c.Set("api_key", apiKey)
        c.Set("user", user)
        c.Next()
    }
}

3.3.3 BillingCacheService 余额检查

// service/billing_cache_service.go - CheckUserBalance
func (s *BillingCacheService) CheckUserBalance(ctx context.Context, userID int64) error {
    // 1. 获取缓存的余额
    balance, err := s.cache.GetUserBalance(ctx, userID)
    if err != nil {
        // 2. 缓存未命中,从数据库加载
        user, err := s.userRepo.GetByID(ctx, userID)
        if err != nil {
            return err
        }
        balance = user.Balance
        s.cache.SetUserBalance(ctx, userID, balance)
    }
    
    // 3. 检查是否有足够余额(允许少量超支)
    if balance < 0 {
        return ErrInsufficientBalance
    }
    
    return nil
}

3.3.4 配额更新(异步)

// gateway_service.go - RecordUsage 中的配额更新
// 异步更新,不阻塞响应
go func() {
    s.usageBillingRepo.Apply(ctx, &UsageBillingCommand{
        UserID: userID,
        APIKeyID: apiKeyID,
        Cost: cost,
        // ...
    })
}()

3.4 速率限制

3.4.1 多级限流架构

┌─────────────────────────────────────┐
│        第一级API Key 限流          │
│   检查5h/1d/7d 累计使用量          │
└─────────────────────────────────────┘
                ↓
┌─────────────────────────────────────┐
│        第二级:用户限流              │
│   检查RPM (requests per minute)   │
└─────────────────────────────────────┘
                ↓
┌─────────────────────────────────────┐
│        第三级IP 限流               │
│   检查:防止暴力请求                │
└─────────────────────────────────────┘

3.4.2 限流实现

// middleware/rate_limiter.go - AllowRequest
func (r *RateLimiter) AllowRequest(key string, limit int, window time.Duration) bool {
    // 1. 获取当前计数
    count := r.getCount(key, window)
    
    // 2. 检查是否超限
    if count >= int64(limit) {
        return false
    }
    
    // 3. 原子递增
    return r.incr(key, window)
}

// 实际检查逻辑
func (s *BillingCacheService) checkRateLimits(ctx context.Context, apiKey *APIKey) error {
    // 检查 5h 限制
    if apiKey.RateLimit5h > 0 {
        used5h := s.getUsage5h(apiKey.ID)
        if used5h >= apiKey.RateLimit5h {
            return ErrRateLimitExceeded
        }
    }
    
    // 检查 1d 限制
    if apiKey.RateLimit1d > 0 {
        used1d := s.getUsage1d(apiKey.ID)
        if used1d >= apiKey.RateLimit1d {
            return ErrRateLimitExceeded
        }
    }
    
    return nil
}

3.5 余额管理

3.5.1 余额充值

// service/admin_service.go - RechargeBalance
func (s *AdminService) RechargeBalance(ctx context.Context, userID int64, amount float64, reason string) error {
    // 1. 获取用户
    user, err := s.userRepo.GetByID(ctx, userID)
    if err != nil {
        return err
    }
    
    // 2. 更新余额
    newBalance := user.Balance + amount
    err = s.userRepo.UpdateBalance(ctx, userID, amount)
    if err != nil {
        return err
    }
    
    // 3. 记录充值日志
    s.recordTransaction(ctx, &Transaction{
        UserID:    userID,
        Type:      "recharge",
        Amount:    amount,
        Balance:   newBalance,
        Reason:    reason,
        Operator:  getCurrentUser(ctx),
    })
    
    return nil
}

3.5.2 余额扣减

// 在每次请求完成后自动扣减
func (s *BillingService) DeductBalance(ctx context.Context, userID int64, cost float64) error {
    // 1. 原子操作扣减余额
    err := s.userRepo.DeductBalance(ctx, userID, cost)
    if err != nil {
        // 余额不足,记录欠费
        s.recordOverage(ctx, userID, cost)
        return ErrInsufficientBalance
    }
    
    return nil
}

4. 缓存策略

4.1 多级缓存架构

// 计费缓存设计
type BillingCache struct {
    // L1: 本地内存缓存
    // - 用户余额缓存 (TTL: 10s)
    // - API Key配额缓存 (TTL: 5s)
    // - 用量计数缓存 (TTL: 1s)
    
    // L2: Redis缓存
    // - 实时用量统计
    // - 速率限制计数
    
    // L3: 数据库
    // - 详细用量日志
    // - 余额变动历史
}

4.2 缓存更新策略

// 用量记录更新策略
const (
    CacheUpdateSync    = "sync"     // 同步更新
    CacheUpdateAsync   = "async"    // 异步更新
    CacheUpdateBatch   = "batch"    // 批量更新
)

// 当前策略:异步更新
func (s *BillingCacheService) QueueUpdateQuotaUsage(apiKeyID int64, cost float64) {
    // 放入更新队列
    s.updateQueue <- &QuotaUpdate{
        APIKeyID: apiKeyID,
        Cost:     cost,
    }
}

5. 配置参数

5.1 计费配置config.yaml

billing:
  # 默认计费配置
  default:
    rate_multiplier: 1.0
    
  # 价格配置每百万Token价格USD
  pricing:
    claude-3-5-sonnet-20241022:
      input: 3.00
      output: 15.00
    claude-3-haiku-20240307:
      input: 0.25
      output: 1.25
    gpt-4o:
      input: 2.50
      output: 10.00
      
  # 缓存配置
  cache:
    quota_ttl: 5s
    balance_ttl: 10s
    rate_limit_ttl: 1s
    
  # 熔断配置
  circuit_breaker:
    enabled: true
    error_threshold: 0.1
    timeout: 30s

5.2 速率限制配置

rate_limit:
  # API Key 默认限制
  api_key:
    rpm: 1000
    tpm: 100000  # tokens per minute
    
  # 用户默认限制
  user:
    rpm: 2000
    
  # IP 默认限制  
  ip:
    rpm: 5000

6. 修改和扩展指南

6.1 常见修改场景

场景1调整模型价格

// service/billing_service.go - getPricingRule
func (s *BillingService) getPricingRule(model string, groupID int64) *PricingRule {
    // 添加新模型定价
    customPricing := map[string]PricingRule{
        "gpt-4-turbo": {
            InputPrice:  10.00,
            OutputPrice: 30.00,
        },
    }
    
    if rule, ok := customPricing[model]; ok {
        return &rule
    }
    
    return s.defaultPricing[model]
}

场景2调整API Key配额

// 修改默认配额
const (
    DefaultRateLimit5h = 500000   // 从 100000 改为 500000
    DefaultRateLimit1d = 2000000  // 从 500000 改为 2000000
)

场景3添加新的计费维度

// 例如:按请求次数计费
type RequestPricing struct {
    Model         string
    PerRequestCost float64  // 每次请求固定费用
    PerTokenCost  float64   // Token费用
}

func (s *BillingService) CalculateWithRequestFee(model string, tokens int) float64 {
    pricing := s.getRequestPricing(model)
    return pricing.PerRequestCost + (float64(tokens) / 1_000_000 * pricing.PerTokenCost)
}

6.2 注意事项

  1. 计费准确性:费用计算必须精确,建议保留更多小数位
  2. 并发安全:余额更新需要原子操作
  3. 数据一致性:缓存和数据库需要定期同步

7. 测试覆盖

7.1 单元测试

测试文件 覆盖范围
billing_service_test.go 计费计算逻辑
billing_cache_service_test.go 缓存机制
rate_limiter_test.go 速率限制

7.2 集成测试

测试文件 场景
e2e_gateway_test.go 完整计费流程

8. 监控与运维

8.1 关键指标

指标 告警阈值 说明
billing_balance_zero > 10% 余额为0用户比例
billing_quota_exhausted > 20% 配额耗尽比例
billing_rate_limited > 15% 触发限流比例
billing_cache_hit_rate < 90% 缓存命中率

8.2 运维任务

任务 频率 说明
余额对账 每天 核对余额一致性
用量统计 每小时 生成统计报表
异常检测 持续 检测异常消费模式

9. 总结

计费与配额模块特点:

  • 精细化计费按模型、Token类型、用户分组等多维度计费
  • 实时配额控制:多级限流保障系统稳定性
  • 高性能缓存:两级缓存确保高并发下的性能
  • 余额保护:支持少量超支,平衡用户体验和风险

潜在改进点:

  1. 可增加更灵活的计费规则(如包月套餐)
  2. 可增加更详细的费用分析报表

修改建议:

  • 价格调整需要同步更新配置
  • 限流参数可根据实际流量调整

文档版本1.1 最后更新2026-03-23 分析基于Sub2API v0.1.104 修正内容用量记录位置gateway_service.go、配额检查流程前置验证、计费原子操作