Files
tokens-reef/deploy/docs-backup/MODULE_07_USAGE.md
Developer 349d783fd1 refactor: clean up project structure
- Remove old review reports (keep latest only)
- Move docs/ to deploy/docs-backup/
- Move performance-testing/ to deploy/
- Clean up test output files
- Organize root directory
2026-04-06 23:36:03 +08:00

12 KiB
Raw Permalink Blame History

Sub2API 模块分析报告:用量统计与日志模块

1. 模块概述

1.1 模块定位

用量统计与日志模块是Sub2API的数据核心负责记录、存储和分析所有API请求的详细信息。该模块为计费、监控、审计和数据分析提供基础数据支撑。

1.2 核心职责

  • 用量记录记录每次请求的Token消耗和费用
  • 日志存储:存储详细的请求和响应日志
  • 统计分析:生成使用量、费用、趋势等报表
  • 数据导出:支持数据导出和第三方集成

2. 代码结构分析

2.1 核心文件

文件路径 职责 代码行数
repository/usage_log_repo.go 用量数据访问层 ~4000行
repository/usage_billing_repo.go 计费数据访问层 ~2000行
service/usage_log_service.go 用量记录服务 ~800行
handler/usage_handler.go 用量查询API ~400行
handler/admin/usage_handler.go 管理后台用量API ~600行
service/dashboard_service.go 仪表板数据服务 ~600行

2.2 核心数据模型

// 用量日志 - ent/schema/usagelog.go
type UsageLog struct {
    ID              int64
    RequestID       string    // 请求唯一ID
    ClientRequestID  string    // 客户端请求ID
    UserID          int64
    APIKeyID        *int64
    AccountID       int64
    GroupID         int64
    Model           string
    UpstreamModel   string    // 上游实际模型
    RequestType    string    // chat/completion/embedding/image
    IsStream        bool      // 是否流式响应
    InputTokens     int
    OutputTokens    int
    TotalTokens     int
    Cost            float64
    Currency        string
    Status          string    // success/error/canceled
    ErrorCode       string
    ErrorMessage    string
    DurationMs      int
    Endpoint        string    // 请求端点
    IPAddress       string    // 客户端IP
    UserAgent      string
    CreatedAt       time.Time
}

// 用量统计 - 聚合数据
type UsageStats struct {
    UserID       int64
    APIKeyID     int64
    GroupID      int64
    Model        string
    TotalTokens  int
    TotalCost    float64
    RequestCount int
    AvgLatency   int
    Period       string    // daily/hourly
}

3. 功能详细分析

3.1 用量记录流程

// service/usage_log_service.go - RecordUsage
func (s *UsageLogService) RecordUsage(ctx context.Context, params RecordUsageParams) error {
    // 1. 创建用量记录
    log := &UsageLog{
        RequestID:      params.RequestID,
        ClientRequestID: params.ClientRequestID,
        UserID:         params.UserID,
        APIKeyID:       params.APIKeyID,
        AccountID:      params.AccountID,
        GroupID:        params.GroupID,
        Model:          params.Model,
        UpstreamModel:  params.UpstreamModel,
        RequestType:    params.RequestType,
        InputTokens:    params.InputTokens,
        OutputTokens:   params.OutputTokens,
        Cost:           params.Cost,
        Status:         params.Status,
        DurationMs:     params.DurationMs,
    }
    
    // 2. 批量写入(提高性能)
    return s.batchWrite(ctx, log)
}

3.2 数据存储策略

3.2.1 分表策略

// 按时间分表(支持归档)
func (r *UsageLogRepository) getTableName(startTime time.Time) string {
    year := startTime.Year()
    month := startTime.Month()
    
    // 格式usage_logs_2024_01
    return fmt.Sprintf("usage_logs_%d_%02d", year, month)
}

// 分表查询
func (r *UsageLogRepository) QueryByTimeRange(startTime, endTime time.Time) ([]UsageLog, error) {
    var allLogs []UsageLog
    
    // 按月遍历
    for current := startTime; current.Before(endTime); current = current.AddMonth(1) {
        logs := r.queryTable(r.getTableName(current), startTime, endTime)
        allLogs = append(allLogs, logs...)
    }
    
    return allLogs, nil
}

3.2.2 索引优化

-- 关键索引
CREATE INDEX idx_usage_logs_user_id_time ON usage_logs(user_id, created_at);
CREATE INDEX idx_usage_logs_apikey_id_time ON usage_logs(api_key_id, created_at);
CREATE INDEX idx_usage_logs_group_id_time ON usage_logs(group_id, created_at);
CREATE INDEX idx_usage_logs_model_time ON usage_logs(model, created_at);

3.3 统计分析

3.3.1 用户用量统计

// handler/usage_handler.go - GetUserUsageStats
func (h *UsageHandler) GetUserUsageStats(c *gin.Context) {
    userID := c.GetInt64("user_id")
    startDate := c.Query("start_date")
    endDate := c.Query("end_date")
    
    // 获取时间范围
    startTime, _ := time.Parse("2006-01-02", startDate)
    endTime, _ := time.Parse("2006-01-02", endDate)
    
    // 1. 按模型统计
    byModel, _ := h.usageService.GetStatsByModel(userID, startTime, endTime)
    
    // 2. 按时间统计
    byDay, _ := h.usageService.GetStatsByDay(userID, startTime, endTime)
    
    // 3. 汇总统计
    total, _ := h.usageService.GetTotalUsage(userID, startTime, endTime)
    
    c.JSON(200, gin.H{
        "by_model": byModel,
        "by_day":   byDay,
        "total":    total,
    })
}

3.3.2 实时统计

// 用量实时统计从Redis
func (s *UsageLogService) GetRealtimeStats() *RealtimeStats {
    // 从Redis获取实时数据
    totalRequests := s.redis.Get("realtime:requests:total")
    totalTokens := s.redis.Get("realtime:tokens:total")
    totalCost := s.redis.Get("realtime:cost:total")
    
    // 计算实时QPS
    qps := s.redis.Get("realtime:qps")
    
    return &RealtimeStats{
        TotalRequests: totalRequests,
        TotalTokens:   totalTokens,
        TotalCost:     totalCost,
        QPS:           qps,
    }
}

3.4 数据导出

3.4.1 CSV导出

// handler/admin/usage_handler.go - ExportCSV
func (h *AdminUsageHandler) ExportCSV(c *gin.Context) {
    // 获取查询参数
    startDate := c.Query("start_date")
    endDate := c.Query("end_date")
    userID := c.Query("user_id")
    
    // 查询数据
    logs, _ := h.usageService.QueryLogs(startDate, endDate, userID)
    
    // 生成CSV
    writer := csv.NewWriter(c.Writer)
    defer writer.Flush()
    
    // 写入表头
    writer.Write([]string{
        "时间", "用户", "模型", "输入Token", 
        "输出Token", "总Token", "费用", "状态"
    })
    
    // 写入数据行
    for _, log := range logs {
        writer.Write([]string{
            log.CreatedAt.Format("2006-01-02 15:04:05"),
            fmt.Sprintf("%d", log.UserID),
            log.Model,
            fmt.Sprintf("%d", log.InputTokens),
            fmt.Sprintf("%d", log.OutputTokens),
            fmt.Sprintf("%d", log.TotalTokens),
            fmt.Sprintf("%.4f", log.Cost),
            log.Status,
        })
    }
    
    c.Header("Content-Type", "text/csv")
    c.Header("Content-Disposition", "attachment; filename=usage.csv")
}

4. 性能优化

4.1 写入优化

// 批量写入
func (r *UsageLogRepository) BatchWrite(logs []*UsageLog) error {
    // 1. 批量插入每批1000条
    const batchSize = 1000
    
    for i := 0; i < len(logs); i += batchSize {
        end := i + batchSize
        if end > len(logs) {
            end = len(logs)
        }
        
        batch := logs[i:end]
        if err := r.insertBatch(batch); err != nil {
            return err
        }
    }
    
    return nil
}

4.2 查询优化

// 预聚合查询
func (r *UsageLogRepository) GetPreAggregatedStats(groupID int64, startTime, endTime time.Time) (*UsageStats, error) {
    // 使用预聚合表(按天聚合)
    query := `
        SELECT 
            user_id, 
            SUM(total_tokens) as total_tokens,
            SUM(cost) as total_cost,
            COUNT(*) as request_count
        FROM usage_logs_daily
        WHERE group_id = ? AND date BETWEEN ? AND ?
        GROUP BY user_id
    `
    
    return r.queryStats(query, groupID, startTime, endTime)
}

4.3 缓存策略

// 热点数据缓存
func (s *UsageCacheService) GetUserTodayStats(userID int64) (*UserTodayStats, error) {
    // 1. 先查Redis缓存
    cacheKey := fmt.Sprintf("usage:today:%d", userID)
    if cached := s.redis.Get(cacheKey); cached != nil {
        return cached, nil
    }
    
    // 2. 缓存未命中,查询数据库
    stats := s.queryTodayStats(userID)
    
    // 3. 写入缓存TTL: 1分钟
    s.redis.Set(cacheKey, stats, 1*time.Minute)
    
    return stats, nil
}

5. 配置参数

5.1 用量配置config.yaml

usage:
  # 存储配置
  storage:
    retention_days: 90        # 数据保留天数
    partition_interval: "monthly"  # 分表间隔
    batch_size: 1000         # 批量写入大小
    
  # 缓存配置
  cache:
    enabled: true
    ttl: 1m                 # 缓存TTL
    
  # 导出配置
  export:
    max_rows: 100000         # 每次最大导出行数
    timeout: 300s           # 导出超时时间

5.2 数据库配置

database:
  # PostgreSQL连接池
  pool:
    max_connections: 50
    min_connections: 10
    max_lifetime: 3600s
    
  # 慢查询日志
  slow_query_threshold: 1s

6. 修改和扩展指南

6.1 常见修改场景

场景1调整数据保留期

// 修改归档策略
func (r *UsageLogRepository) ArchiveOldData() error {
    // 保留90天之前的归档到冷存储
    cutoff := time.Now().AddDate(0, 0, -90)
    
    logs, _ := r.QueryBefore(cutoff)
    
    // 导出到S3/OSS
    for _, batch := range batchLogs(logs, 10000) {
        uploadToColdStorage(batch)
    }
    
    // 删除原数据
    return r.DeleteBefore(cutoff)
}

场景2添加新的统计维度

// 添加端点统计
func (s *UsageLogService) GetStatsByEndpoint(userID int64, startTime, endTime time.Time) (map[string]*EndpointStats, error) {
    query := `
        SELECT endpoint, 
               SUM(total_tokens) as tokens,
               COUNT(*) as requests,
               SUM(cost) as cost
        FROM usage_logs
        WHERE user_id = ? AND created_at BETWEEN ? AND ?
        GROUP BY endpoint
    `
    
    return s.queryEndpointStats(query, userID, startTime, endTime)
}

6.2 注意事项

  1. 数据量:生产环境数据量可能很大,需要分表和索引优化
  2. 写入性能:高并发写入需要批量处理
  3. 查询性能:复杂统计查询需要预聚合

7. 测试覆盖

7.1 单元测试

测试文件 覆盖范围
usage_log_repo_test.go 用量记录CRUD
dashboard_service_test.go 统计分析

8. 监控与运维

8.1 关键指标

指标 告警阈值 说明
usage_log_size - 用量日志表大小
usage_write_latency > 100ms 写入延迟
usage_query_slow > 1s 慢查询数量

8.2 运维任务

任务 频率 说明
数据归档 每天 归档历史数据
索引优化 每周 检查索引使用情况
慢查询分析 每周 分析慢查询并优化

9. 总结

用量统计与日志模块特点:

  • 完整记录:详细的请求日志支持多种分析场景
  • 高性能存储:分表和批量写入支持高并发
  • 灵活统计:支持多维度数据统计
  • 数据导出支持CSV导出和API查询

潜在改进点:

  1. 可增加实时流处理
  2. 可增加更丰富的可视化图表

修改建议:

  • 数据保留期调整需要考虑存储成本
  • 统计查询优化需要结合实际查询模式

文档版本1.0 最后更新2025-01 分析基于Sub2API v0.1.104