- Remove old review reports (keep latest only) - Move docs/ to deploy/docs-backup/ - Move performance-testing/ to deploy/ - Clean up test output files - Organize root directory
12 KiB
12 KiB
Sub2API 模块分析报告:用量统计与日志模块
1. 模块概述
1.1 模块定位
用量统计与日志模块是Sub2API的数据核心,负责记录、存储和分析所有API请求的详细信息。该模块为计费、监控、审计和数据分析提供基础数据支撑。
1.2 核心职责
- 用量记录:记录每次请求的Token消耗和费用
- 日志存储:存储详细的请求和响应日志
- 统计分析:生成使用量、费用、趋势等报表
- 数据导出:支持数据导出和第三方集成
2. 代码结构分析
2.1 核心文件
| 文件路径 | 职责 | 代码行数 |
|---|---|---|
repository/usage_log_repo.go |
用量数据访问层 | ~4000行 |
repository/usage_billing_repo.go |
计费数据访问层 | ~2000行 |
service/usage_log_service.go |
用量记录服务 | ~800行 |
handler/usage_handler.go |
用量查询API | ~400行 |
handler/admin/usage_handler.go |
管理后台用量API | ~600行 |
service/dashboard_service.go |
仪表板数据服务 | ~600行 |
2.2 核心数据模型
// 用量日志 - ent/schema/usagelog.go
type UsageLog struct {
ID int64
RequestID string // 请求唯一ID
ClientRequestID string // 客户端请求ID
UserID int64
APIKeyID *int64
AccountID int64
GroupID int64
Model string
UpstreamModel string // 上游实际模型
RequestType string // chat/completion/embedding/image
IsStream bool // 是否流式响应
InputTokens int
OutputTokens int
TotalTokens int
Cost float64
Currency string
Status string // success/error/canceled
ErrorCode string
ErrorMessage string
DurationMs int
Endpoint string // 请求端点
IPAddress string // 客户端IP
UserAgent string
CreatedAt time.Time
}
// 用量统计 - 聚合数据
type UsageStats struct {
UserID int64
APIKeyID int64
GroupID int64
Model string
TotalTokens int
TotalCost float64
RequestCount int
AvgLatency int
Period string // daily/hourly
}
3. 功能详细分析
3.1 用量记录流程
// service/usage_log_service.go - RecordUsage
func (s *UsageLogService) RecordUsage(ctx context.Context, params RecordUsageParams) error {
// 1. 创建用量记录
log := &UsageLog{
RequestID: params.RequestID,
ClientRequestID: params.ClientRequestID,
UserID: params.UserID,
APIKeyID: params.APIKeyID,
AccountID: params.AccountID,
GroupID: params.GroupID,
Model: params.Model,
UpstreamModel: params.UpstreamModel,
RequestType: params.RequestType,
InputTokens: params.InputTokens,
OutputTokens: params.OutputTokens,
Cost: params.Cost,
Status: params.Status,
DurationMs: params.DurationMs,
}
// 2. 批量写入(提高性能)
return s.batchWrite(ctx, log)
}
3.2 数据存储策略
3.2.1 分表策略
// 按时间分表(支持归档)
func (r *UsageLogRepository) getTableName(startTime time.Time) string {
year := startTime.Year()
month := startTime.Month()
// 格式:usage_logs_2024_01
return fmt.Sprintf("usage_logs_%d_%02d", year, month)
}
// 分表查询
func (r *UsageLogRepository) QueryByTimeRange(startTime, endTime time.Time) ([]UsageLog, error) {
var allLogs []UsageLog
// 按月遍历
for current := startTime; current.Before(endTime); current = current.AddMonth(1) {
logs := r.queryTable(r.getTableName(current), startTime, endTime)
allLogs = append(allLogs, logs...)
}
return allLogs, nil
}
3.2.2 索引优化
-- 关键索引
CREATE INDEX idx_usage_logs_user_id_time ON usage_logs(user_id, created_at);
CREATE INDEX idx_usage_logs_apikey_id_time ON usage_logs(api_key_id, created_at);
CREATE INDEX idx_usage_logs_group_id_time ON usage_logs(group_id, created_at);
CREATE INDEX idx_usage_logs_model_time ON usage_logs(model, created_at);
3.3 统计分析
3.3.1 用户用量统计
// handler/usage_handler.go - GetUserUsageStats
func (h *UsageHandler) GetUserUsageStats(c *gin.Context) {
userID := c.GetInt64("user_id")
startDate := c.Query("start_date")
endDate := c.Query("end_date")
// 获取时间范围
startTime, _ := time.Parse("2006-01-02", startDate)
endTime, _ := time.Parse("2006-01-02", endDate)
// 1. 按模型统计
byModel, _ := h.usageService.GetStatsByModel(userID, startTime, endTime)
// 2. 按时间统计
byDay, _ := h.usageService.GetStatsByDay(userID, startTime, endTime)
// 3. 汇总统计
total, _ := h.usageService.GetTotalUsage(userID, startTime, endTime)
c.JSON(200, gin.H{
"by_model": byModel,
"by_day": byDay,
"total": total,
})
}
3.3.2 实时统计
// 用量实时统计(从Redis)
func (s *UsageLogService) GetRealtimeStats() *RealtimeStats {
// 从Redis获取实时数据
totalRequests := s.redis.Get("realtime:requests:total")
totalTokens := s.redis.Get("realtime:tokens:total")
totalCost := s.redis.Get("realtime:cost:total")
// 计算实时QPS
qps := s.redis.Get("realtime:qps")
return &RealtimeStats{
TotalRequests: totalRequests,
TotalTokens: totalTokens,
TotalCost: totalCost,
QPS: qps,
}
}
3.4 数据导出
3.4.1 CSV导出
// handler/admin/usage_handler.go - ExportCSV
func (h *AdminUsageHandler) ExportCSV(c *gin.Context) {
// 获取查询参数
startDate := c.Query("start_date")
endDate := c.Query("end_date")
userID := c.Query("user_id")
// 查询数据
logs, _ := h.usageService.QueryLogs(startDate, endDate, userID)
// 生成CSV
writer := csv.NewWriter(c.Writer)
defer writer.Flush()
// 写入表头
writer.Write([]string{
"时间", "用户", "模型", "输入Token",
"输出Token", "总Token", "费用", "状态"
})
// 写入数据行
for _, log := range logs {
writer.Write([]string{
log.CreatedAt.Format("2006-01-02 15:04:05"),
fmt.Sprintf("%d", log.UserID),
log.Model,
fmt.Sprintf("%d", log.InputTokens),
fmt.Sprintf("%d", log.OutputTokens),
fmt.Sprintf("%d", log.TotalTokens),
fmt.Sprintf("%.4f", log.Cost),
log.Status,
})
}
c.Header("Content-Type", "text/csv")
c.Header("Content-Disposition", "attachment; filename=usage.csv")
}
4. 性能优化
4.1 写入优化
// 批量写入
func (r *UsageLogRepository) BatchWrite(logs []*UsageLog) error {
// 1. 批量插入(每批1000条)
const batchSize = 1000
for i := 0; i < len(logs); i += batchSize {
end := i + batchSize
if end > len(logs) {
end = len(logs)
}
batch := logs[i:end]
if err := r.insertBatch(batch); err != nil {
return err
}
}
return nil
}
4.2 查询优化
// 预聚合查询
func (r *UsageLogRepository) GetPreAggregatedStats(groupID int64, startTime, endTime time.Time) (*UsageStats, error) {
// 使用预聚合表(按天聚合)
query := `
SELECT
user_id,
SUM(total_tokens) as total_tokens,
SUM(cost) as total_cost,
COUNT(*) as request_count
FROM usage_logs_daily
WHERE group_id = ? AND date BETWEEN ? AND ?
GROUP BY user_id
`
return r.queryStats(query, groupID, startTime, endTime)
}
4.3 缓存策略
// 热点数据缓存
func (s *UsageCacheService) GetUserTodayStats(userID int64) (*UserTodayStats, error) {
// 1. 先查Redis缓存
cacheKey := fmt.Sprintf("usage:today:%d", userID)
if cached := s.redis.Get(cacheKey); cached != nil {
return cached, nil
}
// 2. 缓存未命中,查询数据库
stats := s.queryTodayStats(userID)
// 3. 写入缓存(TTL: 1分钟)
s.redis.Set(cacheKey, stats, 1*time.Minute)
return stats, nil
}
5. 配置参数
5.1 用量配置(config.yaml)
usage:
# 存储配置
storage:
retention_days: 90 # 数据保留天数
partition_interval: "monthly" # 分表间隔
batch_size: 1000 # 批量写入大小
# 缓存配置
cache:
enabled: true
ttl: 1m # 缓存TTL
# 导出配置
export:
max_rows: 100000 # 每次最大导出行数
timeout: 300s # 导出超时时间
5.2 数据库配置
database:
# PostgreSQL连接池
pool:
max_connections: 50
min_connections: 10
max_lifetime: 3600s
# 慢查询日志
slow_query_threshold: 1s
6. 修改和扩展指南
6.1 常见修改场景
场景1:调整数据保留期
// 修改归档策略
func (r *UsageLogRepository) ArchiveOldData() error {
// 保留90天,之前的归档到冷存储
cutoff := time.Now().AddDate(0, 0, -90)
logs, _ := r.QueryBefore(cutoff)
// 导出到S3/OSS
for _, batch := range batchLogs(logs, 10000) {
uploadToColdStorage(batch)
}
// 删除原数据
return r.DeleteBefore(cutoff)
}
场景2:添加新的统计维度
// 添加端点统计
func (s *UsageLogService) GetStatsByEndpoint(userID int64, startTime, endTime time.Time) (map[string]*EndpointStats, error) {
query := `
SELECT endpoint,
SUM(total_tokens) as tokens,
COUNT(*) as requests,
SUM(cost) as cost
FROM usage_logs
WHERE user_id = ? AND created_at BETWEEN ? AND ?
GROUP BY endpoint
`
return s.queryEndpointStats(query, userID, startTime, endTime)
}
6.2 注意事项
- 数据量:生产环境数据量可能很大,需要分表和索引优化
- 写入性能:高并发写入需要批量处理
- 查询性能:复杂统计查询需要预聚合
7. 测试覆盖
7.1 单元测试
| 测试文件 | 覆盖范围 |
|---|---|
usage_log_repo_test.go |
用量记录CRUD |
dashboard_service_test.go |
统计分析 |
8. 监控与运维
8.1 关键指标
| 指标 | 告警阈值 | 说明 |
|---|---|---|
usage_log_size |
- | 用量日志表大小 |
usage_write_latency |
> 100ms | 写入延迟 |
usage_query_slow |
> 1s | 慢查询数量 |
8.2 运维任务
| 任务 | 频率 | 说明 |
|---|---|---|
| 数据归档 | 每天 | 归档历史数据 |
| 索引优化 | 每周 | 检查索引使用情况 |
| 慢查询分析 | 每周 | 分析慢查询并优化 |
9. 总结
用量统计与日志模块特点:
- 完整记录:详细的请求日志支持多种分析场景
- 高性能存储:分表和批量写入支持高并发
- 灵活统计:支持多维度数据统计
- 数据导出:支持CSV导出和API查询
潜在改进点:
- 可增加实时流处理
- 可增加更丰富的可视化图表
修改建议:
- 数据保留期调整需要考虑存储成本
- 统计查询优化需要结合实际查询模式
文档版本:1.0 最后更新:2025-01 分析基于:Sub2API v0.1.104