# Sub2API 模块分析报告:计费与配额模块 ## 1. 模块概述 ### 1.1 模块定位 计费与配额模块是Sub2API系统的经济核心,负责追踪用户和API Key的使用量、计算费用、管理配额、控制访问。该模块与网关模块紧密配合,在每个请求完成后进行用量记录和费用扣除。 ### 1.2 核心职责 - **用量追踪**:记录每个请求的Token消耗和费用 - **配额控制**:用户和API Key级别的配额限制 - **计费计算**:根据模型和用量计算费用 - **余额管理**:用户余额的充值和扣减 - **速率限制**:RPM/TPM级别的请求限制 ## 2. 代码结构分析 ### 2.1 核心文件 | 文件路径 | 职责 | 代码行数 | |---------|------|----------| | `service/billing_service.go` | 计费核心服务 | ~500行 | | `service/billing_cache_service.go` | 计费缓存服务(余额、配额检查) | ~700行 | | `service/gateway_service.go` | **用量记录(RecordUsage 在此文件中)** | **8516行** | | `repository/usage_log_repo.go` | 用量数据访问层 | ~4000行 | | `repository/usage_billing_repo.go` | 计费数据访问层 | ~2000行 | | `middleware/rate_limiter.go` | 速率限制中间件 | ~400行 | | `service/api_key_rate_limit.go` | API Key级别限流 | ~300行 | > ⚠️ **重要修正**:用量记录功能 `RecordUsage` 位于 `gateway_service.go:7483`,不是独立文件。 ### 2.2 核心数据模型 ```go // 用量日志 - 每次请求的详细记录 type UsageLog struct { ID int64 RequestID string // 请求唯一ID UserID int64 // 用户ID APIKeyID *int64 // API Key ID AccountID int64 // 上游账号ID GroupID int64 // 分组ID Model string // 使用模型 UpstreamModel string // 上游实际模型 RequestType string // 请求类型:chat/completion/embedding等 InputTokens int // 输入Token数 OutputTokens int // 输出Token数 TotalTokens int // 总Token数 Cost float64 // 费用 Currency string // 货币:USD Status string // 状态:success/error DurationMs int // 请求耗时(毫秒) CreatedAt time.Time } // 计费配置 - 模型价格定义 type PricingRule struct { Model string // 模型名称(支持通配符) InputPrice float64 // 输入价格(per 1M tokens) OutputPrice float64 // 输出价格(per 1M tokens) PromptPrice float64 // 提示价格(per 1M tokens) CacheDiscount float64 // 缓存读取折扣 } ``` ## 3. 功能详细分析 ### 3.1 用量记录流程(后置处理) > ⚠️ **重要说明**:用量记录发生在**请求转发成功后**,是后置处理步骤。 ```go // gateway_service.go:7483 - RecordUsage func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error { // 1. 强制缓存计费(粘性会话切换时) if input.ForceCacheBilling && result.Usage.InputTokens > 0 { result.Usage.CacheReadInputTokens += result.Usage.InputTokens result.Usage.InputTokens = 0 } // 2. 计算费用(根据媒体类型选择计费方式) var cost *CostBreakdown if result.MediaType == "image" || result.MediaType == "video" { cost = s.billingService.CalculateSoraImageCost(...) } else if result.ImageCount > 0 { cost = s.billingService.CalculateImageCost(...) } else { // Token 计费 cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier) } // 3. 创建用量日志(异步,不阻塞响应) usageLog := &UsageLog{...} go func() { s.usageLogRepo.Create(context.Background(), usageLog) }() // 4. 异步计费扣费 billingCmd := &UsageBillingCommand{ UserID: user.ID, APIKeyID: apiKey.ID, Cost: cost.TotalCost, RequestID: input.RequestID, BillingFingerprint: fingerprint, } go func() { s.usageBillingRepo.Apply(context.Background(), billingCmd) }() return nil } ``` **调用时机**:`RecordUsage` 在 handler 层请求成功返回后调用: ```go // gateway_handler.go if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{...}); err != nil { logger.Error("record usage failed", ...); // 不阻塞响应 } ``` **特点**: - **后置处理**:请求成功后才记录,不影响响应延迟 - **异步执行**:日志和计费都异步执行 - **幂等设计**:使用 BillingFingerprint 防止重复计费 ### 3.2 计费计算逻辑 > ⚠️ **修正**:计费通过 `UsageBillingRepository.Apply()` 原子执行,不是简单的 `CalculateCost`。 ```go // gateway_service.go - RecordUsage 中的计费流程 func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error { // 1. 计算费用 var cost *CostBreakdown if result.MediaType == "image" { cost = s.billingService.CalculateSoraImageCost(...) } else { cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier) } // 2. 构建计费命令 billingCmd := &UsageBillingCommand{ UserID: user.ID, APIKeyID: apiKey.ID, Cost: cost.TotalCost, RequestID: input.RequestID, // 幂等指纹,防止重复计费 BillingFingerprint: generateFingerprint(...), } // 3. 异步执行原子扣费 go func() { result := s.usageBillingRepo.Apply(context.Background(), billingCmd) if result.Applied { logger.Info("billing_applied", "user_id", userID, "cost", cost) } }() return nil } // repository/usage_billing_repo.go - Apply (原子操作) func (r *usageBillingRepository) Apply(ctx context.Context, cmd *UsageBillingCommand) (*UsageBillingResult) { // 使用事务确保原子性 tx, err := r.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) if err != nil { return &UsageBillingResult{Applied: false} } // 1. 检查幂等(防止重复计费) if r.existsByFingerprint(ctx, cmd.BillingFingerprint) { return &UsageBillingResult{Applied: false, Reason: "duplicate"} } // 2. 扣减余额 if err := tx.User.UpdateOneID(cmd.UserID). AddBalance(-cmd.Cost). Exec(ctx); err != nil { tx.Rollback() return &UsageBillingResult{Applied: false, Error: err} } // 3. 记录计费 r.createBillingRecord(ctx, tx, cmd) // 4. 记录幂等指纹 r.createFingerprint(ctx, tx, cmd) tx.Commit() return &UsageBillingResult{Applied: true} } ``` **计费因子优先级**: ``` 费率倍数 = 系统默认值 (1.0) 或 分组配置 (group.RateMultiplier) 或 用户专属配置 (userGroupRateRepo) ``` **Token 类型与计费**: ``` 标准输入 Token: InputTokens × InputPrice 标准输出 Token: OutputTokens × OutputPrice 缓存创建 (5分钟): CacheCreation5mTokens × CachePrice × 0.8 缓存创建 (1小时): CacheCreation1hTokens × CachePrice × 0.9 缓存读取: CacheReadInputTokens × CachePrice × 0.5 ``` **定价配置示例(config.yaml)**: ```yaml billing: pricing: claude-3-5-sonnet-20241022: input: 3.00 # $3.00 / 1M tokens output: 15.00 # $15.00 / 1M tokens cache_read: 0.30 # 缓存读取折扣 ``` ### 3.3 配额控制 > ⚠️ **修正**:配额检查分散在多个位置,发生在**请求处理前**(前置验证)。 #### 3.3.1 实际配额检查流程 ``` 请求入口 (API Key Auth Middleware) │ ▼ ┌─────────────────────────────────────────┐ │ api_key_auth.go: APIKeyAuth() │ │ │ │ 1. APIKeyService.GetByKey() │ │ → 验证 Key 有效性 │ │ │ │ 2. SubscriptionService.Validate() │ │ → 检查订阅状态和有效期 │ │ → 检查订阅配额 │ │ │ │ 3. BillingCacheService.CheckUserBalance() │ → 检查用户余额 │ │ │ │ 4. APIKeyRateLimit.CheckRateLimits() │ │ → 检查 5h/1d/7d 限制 │ └─────────────────────────────────────────┘ │ ▼ 通过 → 继续请求 拒绝 → 返回错误 (402/429) ``` #### 3.3.2 API Key 验证流程(详细) ```go // service/api_key_auth.go - APIKeyAuth() func APIKeyAuth() gin.HandlerFunc { return func(c *gin.Context) { apiKey, user, err := apiKeyService.GetByKey(ctx, key) if err != nil { c.AbortWithStatusJSON(401, ...) return } // 1. 订阅验证 if err := subscriptionService.Validate(ctx, user.ID, apiKey.GroupID); err != nil { c.AbortWithStatusJSON(402, ...) // Payment Required return } // 2. 余额检查 if err := billingCacheService.CheckUserBalance(ctx, user.ID); err != nil { c.AbortWithStatusJSON(402, ...) // Payment Required return } // 3. 速率限制检查 if err := apiKeyRateLimit.CheckRateLimits(ctx, apiKey.ID); err != nil { c.AbortWithStatusJSON(429, ...) // Too Many Requests return } // 4. 设置上下文 c.Set("api_key", apiKey) c.Set("user", user) c.Next() } } ``` #### 3.3.3 BillingCacheService 余额检查 ```go // service/billing_cache_service.go - CheckUserBalance func (s *BillingCacheService) CheckUserBalance(ctx context.Context, userID int64) error { // 1. 获取缓存的余额 balance, err := s.cache.GetUserBalance(ctx, userID) if err != nil { // 2. 缓存未命中,从数据库加载 user, err := s.userRepo.GetByID(ctx, userID) if err != nil { return err } balance = user.Balance s.cache.SetUserBalance(ctx, userID, balance) } // 3. 检查是否有足够余额(允许少量超支) if balance < 0 { return ErrInsufficientBalance } return nil } ``` #### 3.3.4 配额更新(异步) ```go // gateway_service.go - RecordUsage 中的配额更新 // 异步更新,不阻塞响应 go func() { s.usageBillingRepo.Apply(ctx, &UsageBillingCommand{ UserID: userID, APIKeyID: apiKeyID, Cost: cost, // ... }) }() ``` ### 3.4 速率限制 #### 3.4.1 多级限流架构 ``` ┌─────────────────────────────────────┐ │ 第一级:API Key 限流 │ │ 检查:5h/1d/7d 累计使用量 │ └─────────────────────────────────────┘ ↓ ┌─────────────────────────────────────┐ │ 第二级:用户限流 │ │ 检查:RPM (requests per minute) │ └─────────────────────────────────────┘ ↓ ┌─────────────────────────────────────┐ │ 第三级:IP 限流 │ │ 检查:防止暴力请求 │ └─────────────────────────────────────┘ ``` #### 3.4.2 限流实现 ```go // middleware/rate_limiter.go - AllowRequest func (r *RateLimiter) AllowRequest(key string, limit int, window time.Duration) bool { // 1. 获取当前计数 count := r.getCount(key, window) // 2. 检查是否超限 if count >= int64(limit) { return false } // 3. 原子递增 return r.incr(key, window) } // 实际检查逻辑 func (s *BillingCacheService) checkRateLimits(ctx context.Context, apiKey *APIKey) error { // 检查 5h 限制 if apiKey.RateLimit5h > 0 { used5h := s.getUsage5h(apiKey.ID) if used5h >= apiKey.RateLimit5h { return ErrRateLimitExceeded } } // 检查 1d 限制 if apiKey.RateLimit1d > 0 { used1d := s.getUsage1d(apiKey.ID) if used1d >= apiKey.RateLimit1d { return ErrRateLimitExceeded } } return nil } ``` ### 3.5 余额管理 #### 3.5.1 余额充值 ```go // service/admin_service.go - RechargeBalance func (s *AdminService) RechargeBalance(ctx context.Context, userID int64, amount float64, reason string) error { // 1. 获取用户 user, err := s.userRepo.GetByID(ctx, userID) if err != nil { return err } // 2. 更新余额 newBalance := user.Balance + amount err = s.userRepo.UpdateBalance(ctx, userID, amount) if err != nil { return err } // 3. 记录充值日志 s.recordTransaction(ctx, &Transaction{ UserID: userID, Type: "recharge", Amount: amount, Balance: newBalance, Reason: reason, Operator: getCurrentUser(ctx), }) return nil } ``` #### 3.5.2 余额扣减 ```go // 在每次请求完成后自动扣减 func (s *BillingService) DeductBalance(ctx context.Context, userID int64, cost float64) error { // 1. 原子操作扣减余额 err := s.userRepo.DeductBalance(ctx, userID, cost) if err != nil { // 余额不足,记录欠费 s.recordOverage(ctx, userID, cost) return ErrInsufficientBalance } return nil } ``` ## 4. 缓存策略 ### 4.1 多级缓存架构 ```go // 计费缓存设计 type BillingCache struct { // L1: 本地内存缓存 // - 用户余额缓存 (TTL: 10s) // - API Key配额缓存 (TTL: 5s) // - 用量计数缓存 (TTL: 1s) // L2: Redis缓存 // - 实时用量统计 // - 速率限制计数 // L3: 数据库 // - 详细用量日志 // - 余额变动历史 } ``` ### 4.2 缓存更新策略 ```go // 用量记录更新策略 const ( CacheUpdateSync = "sync" // 同步更新 CacheUpdateAsync = "async" // 异步更新 CacheUpdateBatch = "batch" // 批量更新 ) // 当前策略:异步更新 func (s *BillingCacheService) QueueUpdateQuotaUsage(apiKeyID int64, cost float64) { // 放入更新队列 s.updateQueue <- &QuotaUpdate{ APIKeyID: apiKeyID, Cost: cost, } } ``` ## 5. 配置参数 ### 5.1 计费配置(config.yaml) ```yaml billing: # 默认计费配置 default: rate_multiplier: 1.0 # 价格配置(每百万Token价格,USD) pricing: claude-3-5-sonnet-20241022: input: 3.00 output: 15.00 claude-3-haiku-20240307: input: 0.25 output: 1.25 gpt-4o: input: 2.50 output: 10.00 # 缓存配置 cache: quota_ttl: 5s balance_ttl: 10s rate_limit_ttl: 1s # 熔断配置 circuit_breaker: enabled: true error_threshold: 0.1 timeout: 30s ``` ### 5.2 速率限制配置 ```yaml rate_limit: # API Key 默认限制 api_key: rpm: 1000 tpm: 100000 # tokens per minute # 用户默认限制 user: rpm: 2000 # IP 默认限制 ip: rpm: 5000 ``` ## 6. 修改和扩展指南 ### 6.1 常见修改场景 **场景1:调整模型价格** ```go // service/billing_service.go - getPricingRule func (s *BillingService) getPricingRule(model string, groupID int64) *PricingRule { // 添加新模型定价 customPricing := map[string]PricingRule{ "gpt-4-turbo": { InputPrice: 10.00, OutputPrice: 30.00, }, } if rule, ok := customPricing[model]; ok { return &rule } return s.defaultPricing[model] } ``` **场景2:调整API Key配额** ```go // 修改默认配额 const ( DefaultRateLimit5h = 500000 // 从 100000 改为 500000 DefaultRateLimit1d = 2000000 // 从 500000 改为 2000000 ) ``` **场景3:添加新的计费维度** ```go // 例如:按请求次数计费 type RequestPricing struct { Model string PerRequestCost float64 // 每次请求固定费用 PerTokenCost float64 // Token费用 } func (s *BillingService) CalculateWithRequestFee(model string, tokens int) float64 { pricing := s.getRequestPricing(model) return pricing.PerRequestCost + (float64(tokens) / 1_000_000 * pricing.PerTokenCost) } ``` ### 6.2 注意事项 1. **计费准确性**:费用计算必须精确,建议保留更多小数位 2. **并发安全**:余额更新需要原子操作 3. **数据一致性**:缓存和数据库需要定期同步 ## 7. 测试覆盖 ### 7.1 单元测试 | 测试文件 | 覆盖范围 | |----------|----------| | `billing_service_test.go` | 计费计算逻辑 | | `billing_cache_service_test.go` | 缓存机制 | | `rate_limiter_test.go` | 速率限制 | ### 7.2 集成测试 | 测试文件 | 场景 | |----------|------| | `e2e_gateway_test.go` | 完整计费流程 | ## 8. 监控与运维 ### 8.1 关键指标 | 指标 | 告警阈值 | 说明 | |------|----------|------| | `billing_balance_zero` | > 10% | 余额为0用户比例 | | `billing_quota_exhausted` | > 20% | 配额耗尽比例 | | `billing_rate_limited` | > 15% | 触发限流比例 | | `billing_cache_hit_rate` | < 90% | 缓存命中率 | ### 8.2 运维任务 | 任务 | 频率 | 说明 | |------|------|------| | 余额对账 | 每天 | 核对余额一致性 | | 用量统计 | 每小时 | 生成统计报表 | | 异常检测 | 持续 | 检测异常消费模式 | ## 9. 总结 计费与配额模块特点: - **精细化计费**:按模型、Token类型、用户分组等多维度计费 - **实时配额控制**:多级限流保障系统稳定性 - **高性能缓存**:两级缓存确保高并发下的性能 - **余额保护**:支持少量超支,平衡用户体验和风险 **潜在改进点:** 1. 可增加更灵活的计费规则(如包月套餐) 2. 可增加更详细的费用分析报表 **修改建议:** - 价格调整需要同步更新配置 - 限流参数可根据实际流量调整 --- *文档版本:1.1* *最后更新:2026-03-23* *分析基于:Sub2API v0.1.104* *修正内容:用量记录位置(gateway_service.go)、配额检查流程(前置验证)、计费原子操作*