Files
tokens-reef/deploy/docs-backup/MODULE_05_BILLING.md

650 lines
19 KiB
Markdown
Raw Permalink Normal View History

# Sub2API 模块分析报告:计费与配额模块
## 1. 模块概述
### 1.1 模块定位
计费与配额模块是Sub2API系统的经济核心负责追踪用户和API Key的使用量、计算费用、管理配额、控制访问。该模块与网关模块紧密配合在每个请求完成后进行用量记录和费用扣除。
### 1.2 核心职责
- **用量追踪**记录每个请求的Token消耗和费用
- **配额控制**用户和API Key级别的配额限制
- **计费计算**:根据模型和用量计算费用
- **余额管理**:用户余额的充值和扣减
- **速率限制**RPM/TPM级别的请求限制
## 2. 代码结构分析
### 2.1 核心文件
| 文件路径 | 职责 | 代码行数 |
|---------|------|----------|
| `service/billing_service.go` | 计费核心服务 | ~500行 |
| `service/billing_cache_service.go` | 计费缓存服务(余额、配额检查) | ~700行 |
| `service/gateway_service.go` | **用量记录RecordUsage 在此文件中)** | **8516行** |
| `repository/usage_log_repo.go` | 用量数据访问层 | ~4000行 |
| `repository/usage_billing_repo.go` | 计费数据访问层 | ~2000行 |
| `middleware/rate_limiter.go` | 速率限制中间件 | ~400行 |
| `service/api_key_rate_limit.go` | API Key级别限流 | ~300行 |
> ⚠️ **重要修正**:用量记录功能 `RecordUsage` 位于 `gateway_service.go:7483`,不是独立文件。
### 2.2 核心数据模型
```go
// 用量日志 - 每次请求的详细记录
type UsageLog struct {
ID int64
RequestID string // 请求唯一ID
UserID int64 // 用户ID
APIKeyID *int64 // API Key ID
AccountID int64 // 上游账号ID
GroupID int64 // 分组ID
Model string // 使用模型
UpstreamModel string // 上游实际模型
RequestType string // 请求类型chat/completion/embedding等
InputTokens int // 输入Token数
OutputTokens int // 输出Token数
TotalTokens int // 总Token数
Cost float64 // 费用
Currency string // 货币USD
Status string // 状态success/error
DurationMs int // 请求耗时(毫秒)
CreatedAt time.Time
}
// 计费配置 - 模型价格定义
type PricingRule struct {
Model string // 模型名称(支持通配符)
InputPrice float64 // 输入价格per 1M tokens
OutputPrice float64 // 输出价格per 1M tokens
PromptPrice float64 // 提示价格per 1M tokens
CacheDiscount float64 // 缓存读取折扣
}
```
## 3. 功能详细分析
### 3.1 用量记录流程(后置处理)
> ⚠️ **重要说明**:用量记录发生在**请求转发成功后**,是后置处理步骤。
```go
// gateway_service.go:7483 - RecordUsage
func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error {
// 1. 强制缓存计费(粘性会话切换时)
if input.ForceCacheBilling && result.Usage.InputTokens > 0 {
result.Usage.CacheReadInputTokens += result.Usage.InputTokens
result.Usage.InputTokens = 0
}
// 2. 计算费用(根据媒体类型选择计费方式)
var cost *CostBreakdown
if result.MediaType == "image" || result.MediaType == "video" {
cost = s.billingService.CalculateSoraImageCost(...)
} else if result.ImageCount > 0 {
cost = s.billingService.CalculateImageCost(...)
} else {
// Token 计费
cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier)
}
// 3. 创建用量日志(异步,不阻塞响应)
usageLog := &UsageLog{...}
go func() {
s.usageLogRepo.Create(context.Background(), usageLog)
}()
// 4. 异步计费扣费
billingCmd := &UsageBillingCommand{
UserID: user.ID,
APIKeyID: apiKey.ID,
Cost: cost.TotalCost,
RequestID: input.RequestID,
BillingFingerprint: fingerprint,
}
go func() {
s.usageBillingRepo.Apply(context.Background(), billingCmd)
}()
return nil
}
```
**调用时机**`RecordUsage` 在 handler 层请求成功返回后调用:
```go
// gateway_handler.go
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{...}); err != nil {
logger.Error("record usage failed", ...); // 不阻塞响应
}
```
**特点**
- **后置处理**:请求成功后才记录,不影响响应延迟
- **异步执行**:日志和计费都异步执行
- **幂等设计**:使用 BillingFingerprint 防止重复计费
### 3.2 计费计算逻辑
> ⚠️ **修正**:计费通过 `UsageBillingRepository.Apply()` 原子执行,不是简单的 `CalculateCost`。
```go
// gateway_service.go - RecordUsage 中的计费流程
func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error {
// 1. 计算费用
var cost *CostBreakdown
if result.MediaType == "image" {
cost = s.billingService.CalculateSoraImageCost(...)
} else {
cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier)
}
// 2. 构建计费命令
billingCmd := &UsageBillingCommand{
UserID: user.ID,
APIKeyID: apiKey.ID,
Cost: cost.TotalCost,
RequestID: input.RequestID,
// 幂等指纹,防止重复计费
BillingFingerprint: generateFingerprint(...),
}
// 3. 异步执行原子扣费
go func() {
result := s.usageBillingRepo.Apply(context.Background(), billingCmd)
if result.Applied {
logger.Info("billing_applied", "user_id", userID, "cost", cost)
}
}()
return nil
}
// repository/usage_billing_repo.go - Apply (原子操作)
func (r *usageBillingRepository) Apply(ctx context.Context, cmd *UsageBillingCommand) (*UsageBillingResult) {
// 使用事务确保原子性
tx, err := r.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil {
return &UsageBillingResult{Applied: false}
}
// 1. 检查幂等(防止重复计费)
if r.existsByFingerprint(ctx, cmd.BillingFingerprint) {
return &UsageBillingResult{Applied: false, Reason: "duplicate"}
}
// 2. 扣减余额
if err := tx.User.UpdateOneID(cmd.UserID).
AddBalance(-cmd.Cost).
Exec(ctx); err != nil {
tx.Rollback()
return &UsageBillingResult{Applied: false, Error: err}
}
// 3. 记录计费
r.createBillingRecord(ctx, tx, cmd)
// 4. 记录幂等指纹
r.createFingerprint(ctx, tx, cmd)
tx.Commit()
return &UsageBillingResult{Applied: true}
}
```
**计费因子优先级**
```
费率倍数 = 系统默认值 (1.0)
或 分组配置 (group.RateMultiplier)
或 用户专属配置 (userGroupRateRepo)
```
**Token 类型与计费**
```
标准输入 Token: InputTokens × InputPrice
标准输出 Token: OutputTokens × OutputPrice
缓存创建 (5分钟): CacheCreation5mTokens × CachePrice × 0.8
缓存创建 (1小时): CacheCreation1hTokens × CachePrice × 0.9
缓存读取: CacheReadInputTokens × CachePrice × 0.5
```
**定价配置示例config.yaml**
```yaml
billing:
pricing:
claude-3-5-sonnet-20241022:
input: 3.00 # $3.00 / 1M tokens
output: 15.00 # $15.00 / 1M tokens
cache_read: 0.30 # 缓存读取折扣
```
### 3.3 配额控制
> ⚠️ **修正**:配额检查分散在多个位置,发生在**请求处理前**(前置验证)。
#### 3.3.1 实际配额检查流程
```
请求入口 (API Key Auth Middleware)
┌─────────────────────────────────────────┐
│ api_key_auth.go: APIKeyAuth() │
│ │
│ 1. APIKeyService.GetByKey() │
│ → 验证 Key 有效性 │
│ │
│ 2. SubscriptionService.Validate() │
│ → 检查订阅状态和有效期 │
│ → 检查订阅配额 │
│ │
│ 3. BillingCacheService.CheckUserBalance()
│ → 检查用户余额 │
│ │
│ 4. APIKeyRateLimit.CheckRateLimits() │
│ → 检查 5h/1d/7d 限制 │
└─────────────────────────────────────────┘
通过 → 继续请求
拒绝 → 返回错误 (402/429)
```
#### 3.3.2 API Key 验证流程(详细)
```go
// service/api_key_auth.go - APIKeyAuth()
func APIKeyAuth() gin.HandlerFunc {
return func(c *gin.Context) {
apiKey, user, err := apiKeyService.GetByKey(ctx, key)
if err != nil {
c.AbortWithStatusJSON(401, ...)
return
}
// 1. 订阅验证
if err := subscriptionService.Validate(ctx, user.ID, apiKey.GroupID); err != nil {
c.AbortWithStatusJSON(402, ...) // Payment Required
return
}
// 2. 余额检查
if err := billingCacheService.CheckUserBalance(ctx, user.ID); err != nil {
c.AbortWithStatusJSON(402, ...) // Payment Required
return
}
// 3. 速率限制检查
if err := apiKeyRateLimit.CheckRateLimits(ctx, apiKey.ID); err != nil {
c.AbortWithStatusJSON(429, ...) // Too Many Requests
return
}
// 4. 设置上下文
c.Set("api_key", apiKey)
c.Set("user", user)
c.Next()
}
}
```
#### 3.3.3 BillingCacheService 余额检查
```go
// service/billing_cache_service.go - CheckUserBalance
func (s *BillingCacheService) CheckUserBalance(ctx context.Context, userID int64) error {
// 1. 获取缓存的余额
balance, err := s.cache.GetUserBalance(ctx, userID)
if err != nil {
// 2. 缓存未命中,从数据库加载
user, err := s.userRepo.GetByID(ctx, userID)
if err != nil {
return err
}
balance = user.Balance
s.cache.SetUserBalance(ctx, userID, balance)
}
// 3. 检查是否有足够余额(允许少量超支)
if balance < 0 {
return ErrInsufficientBalance
}
return nil
}
```
#### 3.3.4 配额更新(异步)
```go
// gateway_service.go - RecordUsage 中的配额更新
// 异步更新,不阻塞响应
go func() {
s.usageBillingRepo.Apply(ctx, &UsageBillingCommand{
UserID: userID,
APIKeyID: apiKeyID,
Cost: cost,
// ...
})
}()
```
### 3.4 速率限制
#### 3.4.1 多级限流架构
```
┌─────────────────────────────────────┐
│ 第一级API Key 限流 │
│ 检查5h/1d/7d 累计使用量 │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 第二级:用户限流 │
│ 检查RPM (requests per minute) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 第三级IP 限流 │
│ 检查:防止暴力请求 │
└─────────────────────────────────────┘
```
#### 3.4.2 限流实现
```go
// middleware/rate_limiter.go - AllowRequest
func (r *RateLimiter) AllowRequest(key string, limit int, window time.Duration) bool {
// 1. 获取当前计数
count := r.getCount(key, window)
// 2. 检查是否超限
if count >= int64(limit) {
return false
}
// 3. 原子递增
return r.incr(key, window)
}
// 实际检查逻辑
func (s *BillingCacheService) checkRateLimits(ctx context.Context, apiKey *APIKey) error {
// 检查 5h 限制
if apiKey.RateLimit5h > 0 {
used5h := s.getUsage5h(apiKey.ID)
if used5h >= apiKey.RateLimit5h {
return ErrRateLimitExceeded
}
}
// 检查 1d 限制
if apiKey.RateLimit1d > 0 {
used1d := s.getUsage1d(apiKey.ID)
if used1d >= apiKey.RateLimit1d {
return ErrRateLimitExceeded
}
}
return nil
}
```
### 3.5 余额管理
#### 3.5.1 余额充值
```go
// service/admin_service.go - RechargeBalance
func (s *AdminService) RechargeBalance(ctx context.Context, userID int64, amount float64, reason string) error {
// 1. 获取用户
user, err := s.userRepo.GetByID(ctx, userID)
if err != nil {
return err
}
// 2. 更新余额
newBalance := user.Balance + amount
err = s.userRepo.UpdateBalance(ctx, userID, amount)
if err != nil {
return err
}
// 3. 记录充值日志
s.recordTransaction(ctx, &Transaction{
UserID: userID,
Type: "recharge",
Amount: amount,
Balance: newBalance,
Reason: reason,
Operator: getCurrentUser(ctx),
})
return nil
}
```
#### 3.5.2 余额扣减
```go
// 在每次请求完成后自动扣减
func (s *BillingService) DeductBalance(ctx context.Context, userID int64, cost float64) error {
// 1. 原子操作扣减余额
err := s.userRepo.DeductBalance(ctx, userID, cost)
if err != nil {
// 余额不足,记录欠费
s.recordOverage(ctx, userID, cost)
return ErrInsufficientBalance
}
return nil
}
```
## 4. 缓存策略
### 4.1 多级缓存架构
```go
// 计费缓存设计
type BillingCache struct {
// L1: 本地内存缓存
// - 用户余额缓存 (TTL: 10s)
// - API Key配额缓存 (TTL: 5s)
// - 用量计数缓存 (TTL: 1s)
// L2: Redis缓存
// - 实时用量统计
// - 速率限制计数
// L3: 数据库
// - 详细用量日志
// - 余额变动历史
}
```
### 4.2 缓存更新策略
```go
// 用量记录更新策略
const (
CacheUpdateSync = "sync" // 同步更新
CacheUpdateAsync = "async" // 异步更新
CacheUpdateBatch = "batch" // 批量更新
)
// 当前策略:异步更新
func (s *BillingCacheService) QueueUpdateQuotaUsage(apiKeyID int64, cost float64) {
// 放入更新队列
s.updateQueue <- &QuotaUpdate{
APIKeyID: apiKeyID,
Cost: cost,
}
}
```
## 5. 配置参数
### 5.1 计费配置config.yaml
```yaml
billing:
# 默认计费配置
default:
rate_multiplier: 1.0
# 价格配置每百万Token价格USD
pricing:
claude-3-5-sonnet-20241022:
input: 3.00
output: 15.00
claude-3-haiku-20240307:
input: 0.25
output: 1.25
gpt-4o:
input: 2.50
output: 10.00
# 缓存配置
cache:
quota_ttl: 5s
balance_ttl: 10s
rate_limit_ttl: 1s
# 熔断配置
circuit_breaker:
enabled: true
error_threshold: 0.1
timeout: 30s
```
### 5.2 速率限制配置
```yaml
rate_limit:
# API Key 默认限制
api_key:
rpm: 1000
tpm: 100000 # tokens per minute
# 用户默认限制
user:
rpm: 2000
# IP 默认限制
ip:
rpm: 5000
```
## 6. 修改和扩展指南
### 6.1 常见修改场景
**场景1调整模型价格**
```go
// service/billing_service.go - getPricingRule
func (s *BillingService) getPricingRule(model string, groupID int64) *PricingRule {
// 添加新模型定价
customPricing := map[string]PricingRule{
"gpt-4-turbo": {
InputPrice: 10.00,
OutputPrice: 30.00,
},
}
if rule, ok := customPricing[model]; ok {
return &rule
}
return s.defaultPricing[model]
}
```
**场景2调整API Key配额**
```go
// 修改默认配额
const (
DefaultRateLimit5h = 500000 // 从 100000 改为 500000
DefaultRateLimit1d = 2000000 // 从 500000 改为 2000000
)
```
**场景3添加新的计费维度**
```go
// 例如:按请求次数计费
type RequestPricing struct {
Model string
PerRequestCost float64 // 每次请求固定费用
PerTokenCost float64 // Token费用
}
func (s *BillingService) CalculateWithRequestFee(model string, tokens int) float64 {
pricing := s.getRequestPricing(model)
return pricing.PerRequestCost + (float64(tokens) / 1_000_000 * pricing.PerTokenCost)
}
```
### 6.2 注意事项
1. **计费准确性**:费用计算必须精确,建议保留更多小数位
2. **并发安全**:余额更新需要原子操作
3. **数据一致性**:缓存和数据库需要定期同步
## 7. 测试覆盖
### 7.1 单元测试
| 测试文件 | 覆盖范围 |
|----------|----------|
| `billing_service_test.go` | 计费计算逻辑 |
| `billing_cache_service_test.go` | 缓存机制 |
| `rate_limiter_test.go` | 速率限制 |
### 7.2 集成测试
| 测试文件 | 场景 |
|----------|------|
| `e2e_gateway_test.go` | 完整计费流程 |
## 8. 监控与运维
### 8.1 关键指标
| 指标 | 告警阈值 | 说明 |
|------|----------|------|
| `billing_balance_zero` | > 10% | 余额为0用户比例 |
| `billing_quota_exhausted` | > 20% | 配额耗尽比例 |
| `billing_rate_limited` | > 15% | 触发限流比例 |
| `billing_cache_hit_rate` | < 90% | 缓存命中率 |
### 8.2 运维任务
| 任务 | 频率 | 说明 |
|------|------|------|
| 余额对账 | 每天 | 核对余额一致性 |
| 用量统计 | 每小时 | 生成统计报表 |
| 异常检测 | 持续 | 检测异常消费模式 |
## 9. 总结
计费与配额模块特点:
- **精细化计费**按模型、Token类型、用户分组等多维度计费
- **实时配额控制**:多级限流保障系统稳定性
- **高性能缓存**:两级缓存确保高并发下的性能
- **余额保护**:支持少量超支,平衡用户体验和风险
**潜在改进点:**
1. 可增加更灵活的计费规则(如包月套餐)
2. 可增加更详细的费用分析报表
**修改建议:**
- 价格调整需要同步更新配置
- 限流参数可根据实际流量调整
---
*文档版本1.1*
*最后更新2026-03-23*
*分析基于Sub2API v0.1.104*
*修正内容用量记录位置gateway_service.go、配额检查流程前置验证、计费原子操作*