// Source file: sub2api-cn-relay-manager/backend/internal/service/content_moderation.go
// (Go, 2049 lines, 67 KiB)

package service
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
// Content moderation modes, decision actions, supported request protocols and
// the defaults / upper bounds used by the moderation configuration and runtime.
const (
	// Operating modes compared against ContentModerationConfig.Mode.
	ContentModerationModeOff      = "off"
	ContentModerationModeObserve  = "observe"
	ContentModerationModePreBlock = "pre_block"

	// Values accepted for UpdateContentModerationConfigInput.APIKeysMode.
	contentModerationAPIKeysModeAppend  = "append"
	contentModerationAPIKeysModeReplace = "replace"

	// Actions reported in decisions and moderation logs.
	ContentModerationActionAllow     = "allow"
	ContentModerationActionBlock     = "block"
	ContentModerationActionHashBlock = "hash_block"
	ContentModerationActionError     = "error"

	// Request protocols from which moderation input can be extracted.
	ContentModerationProtocolAnthropicMessages = "anthropic_messages"
	ContentModerationProtocolOpenAIResponses   = "openai_responses"
	ContentModerationProtocolOpenAIChat        = "openai_chat_completions"
	ContentModerationProtocolGemini            = "gemini"
	ContentModerationProtocolOpenAIImages      = "openai_images"

	// Upstream moderation endpoint defaults; timeouts are in milliseconds.
	defaultContentModerationBaseURL   = "https://api.openai.com"
	defaultContentModerationModel     = "omni-moderation-latest"
	defaultContentModerationTimeoutMS = 3000
	maxContentModerationTimeoutMS     = 30000

	// Size limits, in runes, for moderated text and stored excerpts.
	maxModerationInputRunes   = 12000
	maxModerationExcerptRunes = 240

	// Async worker pool and queue sizing.
	defaultContentModerationWorkerCount = 4
	maxContentModerationWorkerCount     = 32
	defaultContentModerationQueueSize   = 32768
	maxContentModerationQueueSize       = 100000

	// Enforcement defaults.
	defaultContentModerationBanThreshold         = 10
	defaultContentModerationViolationWindowHours = 720
	defaultContentModerationBlockHTTPStatus      = http.StatusForbidden
	defaultContentModerationBlockMessage         = "内容审计命中风险规则,请调整输入后重试"

	// Upstream call retries.
	defaultContentModerationRetryCount = 2
	maxContentModerationRetryCount     = 5

	// Log retention, in days.
	defaultContentModerationHitRetentionDays    = 180
	defaultContentModerationNonHitRetentionDays = 3
	maxContentModerationRetentionDays           = 3650
	maxContentModerationNonHitRetentionDays     = 3

	// Freeze durations for an audit API key, named after the failure class
	// (rate limit, auth, other HTTP error).
	contentModerationKeyRateLimitFreezeDuration = time.Minute
	contentModerationKeyAuthFreezeDuration      = 10 * time.Minute
	contentModerationKeyHTTPErrorFreezeDuration = 10 * time.Second

	// Image limits for moderation input and for the admin key test.
	maxContentModerationInputImages           = 1
	maxContentModerationTestImages            = maxContentModerationInputImages
	maxContentModerationTestImageBytes        = 8 * 1024 * 1024
	maxContentModerationTestImageDataURLBytes = 12 * 1024 * 1024

	// Scheduling of the background log cleanup.
	contentModerationCleanupInterval = 24 * time.Hour
	contentModerationCleanupTimeout  = 30 * time.Minute
	contentModerationCleanupDelay    = 5 * time.Minute
)
// contentModerationCategoryOrder lists the moderation category names in a
// fixed order. ContentModerationCategories returns a copy of this slice; the
// same names are the keys of ContentModerationDefaultThresholds.
var contentModerationCategoryOrder = []string{
"harassment",
"harassment/threatening",
"hate",
"hate/threatening",
"illicit",
"illicit/violent",
"self-harm",
"self-harm/intent",
"self-harm/instructions",
"sexual",
"sexual/minors",
"violence",
"violence/graphic",
}
// ContentModerationDefaultThresholds returns a freshly allocated map from
// moderation category name to its default score threshold. Callers may mutate
// the result without affecting later calls.
func ContentModerationDefaultThresholds() map[string]float64 {
	defaults := make(map[string]float64, 13)

	defaults["harassment"] = 0.98
	defaults["harassment/threatening"] = 0.90

	defaults["hate"] = 0.65
	defaults["hate/threatening"] = 0.65

	defaults["illicit"] = 0.95
	defaults["illicit/violent"] = 0.95

	defaults["self-harm"] = 0.65
	defaults["self-harm/intent"] = 0.85
	defaults["self-harm/instructions"] = 0.65

	defaults["sexual"] = 0.65
	defaults["sexual/minors"] = 0.65

	defaults["violence"] = 0.95
	defaults["violence/graphic"] = 0.95

	return defaults
}
// ContentModerationCategories returns a copy of the ordered category list so
// callers cannot modify the package-level slice.
func ContentModerationCategories() []string {
	categories := make([]string, 0, len(contentModerationCategoryOrder))
	categories = append(categories, contentModerationCategoryOrder...)
	return categories
}
// ContentModerationConfig is the moderation configuration. UpdateConfig
// marshals it to JSON and stores it under SettingKeyContentModerationConfig.
type ContentModerationConfig struct {
Enabled bool `json:"enabled"`
// Mode is compared against the ContentModerationMode* constants.
Mode string `json:"mode"`
BaseURL string `json:"base_url"`
Model string `json:"model"`
// APIKey is a single-key field; UpdateConfig clears it whenever it changes
// the key list and keeps keys in APIKeys instead.
APIKey string `json:"api_key,omitempty"`
APIKeys []string `json:"api_keys,omitempty"`
TimeoutMS int `json:"timeout_ms"`
SampleRate int `json:"sample_rate"`
// AllGroups and GroupIDs select which groups are in scope (see includesGroup).
AllGroups bool `json:"all_groups"`
GroupIDs []int64 `json:"group_ids"`
// RecordNonHits makes checkSync log non-flagged results and upstream errors too.
RecordNonHits bool `json:"record_non_hits"`
// Thresholds maps category name to the score threshold used for flagging.
Thresholds map[string]float64 `json:"thresholds"`
WorkerCount int `json:"worker_count"`
QueueSize int `json:"queue_size"`
// BlockStatus and BlockMessage are copied into blocking decisions.
BlockStatus int `json:"block_status"`
BlockMessage string `json:"block_message"`
EmailOnHit bool `json:"email_on_hit"`
AutoBanEnabled bool `json:"auto_ban_enabled"`
BanThreshold int `json:"ban_threshold"`
ViolationWindowHours int `json:"violation_window_hours"`
RetryCount int `json:"retry_count"`
HitRetentionDays int `json:"hit_retention_days"`
NonHitRetentionDays int `json:"non_hit_retention_days"`
// PreHashCheckEnabled lets Check block inputs whose hash is already in the
// flagged-hash cache before calling the moderation API.
PreHashCheckEnabled bool `json:"pre_hash_check_enabled"`
}
// ContentModerationConfigView is the configuration shape returned by
// GetConfig and UpdateConfig. It has no raw API key fields; keys are
// described by masked strings, a count and per-key statuses.
type ContentModerationConfigView struct {
Enabled bool `json:"enabled"`
Mode string `json:"mode"`
BaseURL string `json:"base_url"`
Model string `json:"model"`
APIKeyConfigured bool `json:"api_key_configured"`
APIKeyMasked string `json:"api_key_masked"`
APIKeyCount int `json:"api_key_count"`
APIKeyMasks []string `json:"api_key_masks"`
APIKeyStatuses []ContentModerationAPIKeyStatus `json:"api_key_statuses"`
TimeoutMS int `json:"timeout_ms"`
SampleRate int `json:"sample_rate"`
AllGroups bool `json:"all_groups"`
GroupIDs []int64 `json:"group_ids"`
RecordNonHits bool `json:"record_non_hits"`
WorkerCount int `json:"worker_count"`
QueueSize int `json:"queue_size"`
BlockStatus int `json:"block_status"`
BlockMessage string `json:"block_message"`
EmailOnHit bool `json:"email_on_hit"`
AutoBanEnabled bool `json:"auto_ban_enabled"`
BanThreshold int `json:"ban_threshold"`
ViolationWindowHours int `json:"violation_window_hours"`
RetryCount int `json:"retry_count"`
HitRetentionDays int `json:"hit_retention_days"`
NonHitRetentionDays int `json:"non_hit_retention_days"`
PreHashCheckEnabled bool `json:"pre_hash_check_enabled"`
}
// ContentModerationAPIKeyStatus reports the health of one audit API key. The
// key is identified by its hash and a masked form, never by its raw value.
type ContentModerationAPIKeyStatus struct {
Index int `json:"index"`
KeyHash string `json:"key_hash"`
Masked string `json:"masked"`
Status string `json:"status"`
FailureCount int `json:"failure_count"`
SuccessCount int64 `json:"success_count"`
LastError string `json:"last_error"`
LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
FrozenUntil *time.Time `json:"frozen_until,omitempty"`
LastLatencyMS int `json:"last_latency_ms"`
LastHTTPStatus int `json:"last_http_status"`
// LastTested is set to true by TestAPIKeys on every status it returns for a tested key.
LastTested bool `json:"last_tested"`
// Configured is true when the key came from the saved configuration rather
// than from the test request (see TestAPIKeys).
Configured bool `json:"configured"`
}
// TestContentModerationAPIKeysInput is the request for TestAPIKeys. When
// APIKeys is empty after normalization the saved keys are tested; BaseURL,
// Model and TimeoutMS override the saved configuration when set.
type TestContentModerationAPIKeysInput struct {
APIKeys []string `json:"api_keys"`
BaseURL string `json:"base_url"`
Model string `json:"model"`
TimeoutMS int `json:"timeout_ms"`
// Prompt and Images form the sample input sent to the moderation API.
Prompt string `json:"prompt"`
Images []string `json:"images"`
}
// TestContentModerationAPIKeysResult is the response of TestAPIKeys: one
// status per tested key, the audit result of the first successful call (if
// any), and the number of images in the test input.
type TestContentModerationAPIKeysResult struct {
Items []ContentModerationAPIKeyStatus `json:"items"`
AuditResult *ContentModerationTestAuditResult `json:"audit_result,omitempty"`
ImageCount int `json:"image_count"`
}
// ContentModerationTestAuditResult summarizes the moderation outcome of a test
// call: the flag verdict, the top category and score, all category scores and
// the thresholds they were evaluated against.
type ContentModerationTestAuditResult struct {
Flagged bool `json:"flagged"`
HighestCategory string `json:"highest_category"`
HighestScore float64 `json:"highest_score"`
CompositeScore float64 `json:"composite_score"`
CategoryScores map[string]float64 `json:"category_scores"`
Thresholds map[string]float64 `json:"thresholds"`
}
// UpdateContentModerationConfigInput is the partial-update request accepted by
// UpdateConfig. Pointer fields left nil keep the stored value unchanged.
type UpdateContentModerationConfigInput struct {
	Enabled *bool   `json:"enabled"`
	Mode    *string `json:"mode"`
	BaseURL *string `json:"base_url"`
	Model   *string `json:"model"`
	// APIKey, when non-blank, is added to the stored key list.
	APIKey *string `json:"api_key"`
	// APIKeys is merged into or replaces the stored key list, depending on
	// APIKeysMode.
	APIKeys *[]string `json:"api_keys"`
	// APIKeysMode is normalized by normalizeContentModerationAPIKeysMode; the
	// known values are "append" and "replace".
	APIKeysMode string `json:"api_keys_mode"`
	// DeleteAPIKeyHashes removes stored keys by hash; it is ignored in
	// replace mode.
	DeleteAPIKeyHashes *[]string `json:"delete_api_key_hashes"`
	// ClearAPIKey removes every stored key and takes precedence over all
	// other key fields.
	ClearAPIKey          bool     `json:"clear_api_key"`
	TimeoutMS            *int     `json:"timeout_ms"`
	SampleRate           *int     `json:"sample_rate"`
	AllGroups            *bool    `json:"all_groups"`
	GroupIDs             *[]int64 `json:"group_ids"`
	RecordNonHits        *bool    `json:"record_non_hits"`
	WorkerCount          *int     `json:"worker_count"`
	QueueSize            *int     `json:"queue_size"`
	BlockStatus          *int     `json:"block_status"`
	BlockMessage         *string  `json:"block_message"`
	EmailOnHit           *bool    `json:"email_on_hit"`
	AutoBanEnabled       *bool    `json:"auto_ban_enabled"`
	BanThreshold         *int     `json:"ban_threshold"`
	ViolationWindowHours *int     `json:"violation_window_hours"`
	RetryCount           *int     `json:"retry_count"`
	HitRetentionDays     *int     `json:"hit_retention_days"`
	NonHitRetentionDays  *int     `json:"non_hit_retention_days"`
	PreHashCheckEnabled  *bool    `json:"pre_hash_check_enabled"`
}
// ContentModerationCheckInput describes one request handed to Check: who sent
// it, through which API key and group, and the request itself.
type ContentModerationCheckInput struct {
RequestID string
UserID int64
UserEmail string
APIKeyID int64
APIKeyName string
// GroupID may be nil; it is passed to cfg.includesGroup to decide scope.
GroupID *int64
GroupName string
Endpoint string
Provider string
Model string
// Protocol and Body are passed to ExtractContentModerationInput to obtain
// the text and images to moderate.
Protocol string
Body []byte
}
// ContentModerationInput is the content extracted from a request for
// moderation: its text plus the image references found in it.
type ContentModerationInput struct {
Text string
// Images are used as the URL of image_url parts in ModerationInput.
Images []string
}
func (in *ContentModerationInput) Normalize() {
if in == nil {
return
}
in.Text = trimRunes(normalizeContentModerationText(in.Text), maxModerationInputRunes)
2026-05-07 15:11:38 +08:00
in.Images = normalizeModerationImages(in.Images)
}
// IsEmpty reports whether there is nothing to moderate: no images and no text
// other than whitespace.
func (in ContentModerationInput) IsEmpty() bool {
	if len(in.Images) > 0 {
		return false
	}
	return strings.TrimSpace(in.Text) == ""
}
func (in ContentModerationInput) ModerationInput() any {
2026-05-07 14:31:19 +08:00
images := limitContentModerationImages(in.Images)
if len(images) == 0 {
return in.Text
}
2026-05-07 14:31:19 +08:00
parts := make([]moderationAPIInputPart, 0, len(images)+1)
if strings.TrimSpace(in.Text) != "" {
parts = append(parts, moderationAPIInputPart{Type: "text", Text: in.Text})
}
2026-05-07 14:31:19 +08:00
for _, image := range images {
parts = append(parts, moderationAPIInputPart{
Type: "image_url",
ImageURL: &moderationAPIImageURLRef{URL: image},
})
}
return parts
}
// ExcerptText returns the text used as the source of a log excerpt, which is
// the input text unchanged.
func (in ContentModerationInput) ExcerptText() string {
	excerpt := in.Text
	return excerpt
}
// Hash returns the hex-encoded SHA-256 digest identifying this input. The
// digest covers "text:" followed by the text, then for every image, in order,
// "\nimage:" followed by the hex SHA-256 of that image string.
func (in ContentModerationInput) Hash() string {
	digest := sha256.New()
	// hash.Hash.Write never returns an error, so the results are discarded.
	_, _ = digest.Write([]byte("text:" + in.Text))
	for i := range in.Images {
		sum := sha256.Sum256([]byte(in.Images[i]))
		_, _ = digest.Write([]byte("\nimage:" + hex.EncodeToString(sum[:])))
	}
	return hex.EncodeToString(digest.Sum(nil))
}
// ContentModerationDecision is the outcome returned by Check. Allowed and
// Blocked state whether the request may proceed; Flagged reports a moderation
// hit even when the request is still allowed.
type ContentModerationDecision struct {
Allowed bool `json:"allowed"`
Blocked bool `json:"blocked"`
Flagged bool `json:"flagged"`
// Message and StatusCode come from the configured block message and status
// on blocking decisions.
Message string `json:"message"`
StatusCode int `json:"status_code"`
// InputHash is set by Check on hash-cache blocks.
InputHash string `json:"input_hash,omitempty"`
HighestCategory string `json:"highest_category"`
HighestScore float64 `json:"highest_score"`
CategoryScores map[string]float64 `json:"category_scores"`
// Action is one of the ContentModerationAction* constants.
Action string `json:"action"`
}
// ContentModerationLog is one persisted moderation record: the request
// identity, the moderation verdict and scores, and follow-up outcomes such as
// violation count, auto-ban and email notification.
type ContentModerationLog struct {
ID int64 `json:"id"`
RequestID string `json:"request_id"`
UserID *int64 `json:"user_id,omitempty"`
UserEmail string `json:"user_email"`
APIKeyID *int64 `json:"api_key_id,omitempty"`
APIKeyName string `json:"api_key_name"`
GroupID *int64 `json:"group_id,omitempty"`
GroupName string `json:"group_name"`
Endpoint string `json:"endpoint"`
Provider string `json:"provider"`
Model string `json:"model"`
Mode string `json:"mode"`
Action string `json:"action"`
Flagged bool `json:"flagged"`
HighestCategory string `json:"highest_category"`
HighestScore float64 `json:"highest_score"`
CategoryScores map[string]float64 `json:"category_scores"`
ThresholdSnapshot map[string]float64 `json:"threshold_snapshot"`
InputExcerpt string `json:"input_excerpt"`
UpstreamLatencyMS *int `json:"upstream_latency_ms,omitempty"`
Error string `json:"error"`
ViolationCount int `json:"violation_count"`
AutoBanned bool `json:"auto_banned"`
EmailSent bool `json:"email_sent"`
UserStatus string `json:"user_status"`
// QueueDelayMS is set for asynchronously processed tasks (time spent queued).
QueueDelayMS *int `json:"queue_delay_ms,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
// ContentModerationLogFilter holds the query options for ListLogs. ListLogs
// fills in pagination defaults before handing the filter to the repository.
type ContentModerationLogFilter struct {
Pagination pagination.PaginationParams
Result string
GroupID *int64
Endpoint string
Search string
// From and To bound the time range; nil means unbounded (presumably — the
// repository implementation is not shown here).
From *time.Time
To *time.Time
}
// ContentModerationCleanupResult reports how many hit and non-hit log rows a
// cleanup run deleted and when it finished.
type ContentModerationCleanupResult struct {
DeletedHit int64 `json:"deleted_hit"`
DeletedNonHit int64 `json:"deleted_non_hit"`
FinishedAt time.Time `json:"finished_at"`
}
// ContentModerationRuntimeStatus is the runtime snapshot returned by
// GetStatus: configuration flags, worker and queue usage, async counters,
// API key health, flagged-hash count and the last cleanup outcome.
type ContentModerationRuntimeStatus struct {
Enabled bool `json:"enabled"`
RiskControlEnabled bool `json:"risk_control_enabled"`
Mode string `json:"mode"`
WorkerCount int `json:"worker_count"`
MaxWorkers int `json:"max_workers"`
ActiveWorkers int `json:"active_workers"`
IdleWorkers int `json:"idle_workers"`
QueueSize int `json:"queue_size"`
QueueLength int `json:"queue_length"`
QueueUsagePercent float64 `json:"queue_usage_percent"`
Enqueued int64 `json:"enqueued"`
Dropped int64 `json:"dropped"`
Processed int64 `json:"processed"`
Errors int64 `json:"errors"`
APIKeyStatuses []ContentModerationAPIKeyStatus `json:"api_key_statuses"`
FlaggedHashCount int64 `json:"flagged_hash_count"`
LastCleanupAt *time.Time `json:"last_cleanup_at,omitempty"`
LastCleanupDeletedHit int64 `json:"last_cleanup_deleted_hit"`
LastCleanupDeletedNonHit int64 `json:"last_cleanup_deleted_non_hit"`
}
// ContentModerationUnbanUserResult is returned by UnbanUser with the user's ID
// and resulting status.
type ContentModerationUnbanUserResult struct {
UserID int64 `json:"user_id"`
Status string `json:"status"`
}
// ContentModerationDeleteHashResult is returned by DeleteFlaggedInputHash with
// the normalized hash and whether the cache reported a deletion.
type ContentModerationDeleteHashResult struct {
InputHash string `json:"input_hash"`
Deleted bool `json:"deleted"`
}
// ContentModerationClearHashesResult is returned by ClearFlaggedInputHashes
// with the count reported by the hash cache.
type ContentModerationClearHashesResult struct {
Deleted int64 `json:"deleted"`
}
// ContentModerationRepository is the persistence interface for moderation
// logs: creating and listing them, counting a user's flagged entries since a
// point in time, and deleting expired entries.
type ContentModerationRepository interface {
CreateLog(ctx context.Context, log *ContentModerationLog) error
ListLogs(ctx context.Context, filter ContentModerationLogFilter) ([]ContentModerationLog, *pagination.PaginationResult, error)
CountFlaggedByUserSince(ctx context.Context, userID int64, since time.Time) (int, error)
CleanupExpiredLogs(ctx context.Context, hitBefore time.Time, nonHitBefore time.Time) (*ContentModerationCleanupResult, error)
}
// ContentModerationHashCache stores the hashes of inputs that were flagged so
// Check can block repeats without calling the moderation API.
type ContentModerationHashCache interface {
RecordFlaggedInputHash(ctx context.Context, inputHash string) error
HasFlaggedInputHash(ctx context.Context, inputHash string) (bool, error)
DeleteFlaggedInputHash(ctx context.Context, inputHash string) (bool, error)
ClearFlaggedInputHashes(ctx context.Context) (int64, error)
CountFlaggedInputHashes(ctx context.Context) (int64, error)
}
// ContentModerationService implements content moderation: configuration
// management, synchronous and queued (async) checks, log access and
// flagged-hash maintenance. Construct it with NewContentModerationService.
type ContentModerationService struct {
settingRepo SettingRepository
repo ContentModerationRepository
hashCache ContentModerationHashCache
groupRepo GroupRepository
userRepo UserRepository
authCacheInvalidator APIKeyAuthCacheInvalidator
emailService *EmailService
httpClient *http.Client
// asyncQueue carries observe-mode tasks from enqueueAsync to the workers.
asyncQueue chan contentModerationTask
// workerCount is the number of worker goroutines started by the constructor.
workerCount int
apiKeyCursor atomic.Uint64
// Counters for async processing, exposed through GetStatus.
asyncActive atomic.Int64
asyncEnqueued atomic.Int64
asyncDropped atomic.Int64
asyncProcessed atomic.Int64
asyncErrors atomic.Int64
lastCleanupUnix atomic.Int64
lastCleanupDeletedHit atomic.Int64
lastCleanupDeletedNonHit atomic.Int64
// keyHealthMu presumably guards keyHealth — confirm against the key-health
// helpers, which are outside this section.
keyHealthMu sync.Mutex
keyHealth map[string]*contentModerationKeyHealth
}
// contentModerationTask is one queued async moderation job: the original
// check input, the extracted content, its hash, and the enqueue time used to
// compute queue delay.
type contentModerationTask struct {
input ContentModerationCheckInput
content ContentModerationInput
inputHash string
enqueuedAt time.Time
}
// contentModerationKeyHealth is the in-memory health record kept per audit
// API key in ContentModerationService.keyHealth.
type contentModerationKeyHealth struct {
Hash string
Masked string
FailureCount int
SuccessCount int64
LastError string
LastCheckedAt time.Time
FrozenUntil time.Time
LastLatencyMS int
LastHTTPStatus int
LastTested bool
}
// NewContentModerationService wires up the service with its dependencies. The
// async queue is allocated at maxContentModerationQueueSize and the worker
// count is fixed at maxContentModerationWorkerCount. Background goroutines
// (the workers and the cleanup worker) are started only when both settingRepo
// and repo are non-nil.
func NewContentModerationService(
	settingRepo SettingRepository,
	repo ContentModerationRepository,
	hashCache ContentModerationHashCache,
	groupRepo GroupRepository,
	userRepo UserRepository,
	authCacheInvalidator APIKeyAuthCacheInvalidator,
	emailService *EmailService,
) *ContentModerationService {
	service := &ContentModerationService{
		settingRepo:          settingRepo,
		repo:                 repo,
		hashCache:            hashCache,
		groupRepo:            groupRepo,
		userRepo:             userRepo,
		authCacheInvalidator: authCacheInvalidator,
		emailService:         emailService,
		httpClient:           &http.Client{},
		workerCount:          maxContentModerationWorkerCount,
		asyncQueue:           make(chan contentModerationTask, maxContentModerationQueueSize),
		keyHealth:            make(map[string]*contentModerationKeyHealth),
	}

	// Without both repositories there is nothing for background work to do.
	if settingRepo == nil || repo == nil {
		return service
	}

	for workerID := 0; workerID < service.workerCount; workerID++ {
		go service.worker(workerID)
	}
	go service.cleanupWorker()
	return service
}
// GetConfig loads the stored moderation configuration and returns its view
// form. Any load error is returned unchanged.
func (s *ContentModerationService) GetConfig(ctx context.Context) (*ContentModerationConfigView, error) {
	cfg, loadErr := s.loadConfig(ctx)
	if loadErr == nil {
		return s.configView(cfg), nil
	}
	return nil, loadErr
}
func (s *ContentModerationService) UpdateConfig(ctx context.Context, input UpdateContentModerationConfigInput) (*ContentModerationConfigView, error) {
cfg, err := s.loadConfig(ctx)
if err != nil {
return nil, err
}
if input.Enabled != nil {
cfg.Enabled = *input.Enabled
}
if input.Mode != nil {
cfg.Mode = strings.TrimSpace(*input.Mode)
}
if input.BaseURL != nil {
cfg.BaseURL = strings.TrimSpace(*input.BaseURL)
}
if input.Model != nil {
cfg.Model = strings.TrimSpace(*input.Model)
}
if input.TimeoutMS != nil {
cfg.TimeoutMS = *input.TimeoutMS
}
if input.SampleRate != nil {
cfg.SampleRate = *input.SampleRate
}
if input.WorkerCount != nil {
cfg.WorkerCount = *input.WorkerCount
}
if input.QueueSize != nil {
cfg.QueueSize = *input.QueueSize
}
if input.BlockStatus != nil {
cfg.BlockStatus = *input.BlockStatus
}
if input.BlockMessage != nil {
cfg.BlockMessage = strings.TrimSpace(*input.BlockMessage)
}
if input.EmailOnHit != nil {
cfg.EmailOnHit = *input.EmailOnHit
}
if input.AutoBanEnabled != nil {
cfg.AutoBanEnabled = *input.AutoBanEnabled
}
if input.BanThreshold != nil {
cfg.BanThreshold = *input.BanThreshold
}
if input.ViolationWindowHours != nil {
cfg.ViolationWindowHours = *input.ViolationWindowHours
}
if input.RetryCount != nil {
cfg.RetryCount = *input.RetryCount
}
if input.HitRetentionDays != nil {
cfg.HitRetentionDays = *input.HitRetentionDays
}
if input.NonHitRetentionDays != nil {
cfg.NonHitRetentionDays = *input.NonHitRetentionDays
}
if input.PreHashCheckEnabled != nil {
cfg.PreHashCheckEnabled = *input.PreHashCheckEnabled
}
if input.AllGroups != nil {
cfg.AllGroups = *input.AllGroups
}
if input.GroupIDs != nil {
cfg.GroupIDs = normalizeInt64IDs(*input.GroupIDs)
}
if input.RecordNonHits != nil {
cfg.RecordNonHits = *input.RecordNonHits
}
if input.ClearAPIKey {
cfg.APIKey = ""
cfg.APIKeys = []string{}
} else {
2026-05-07 14:31:19 +08:00
apiKeysMode := normalizeContentModerationAPIKeysMode(input.APIKeysMode)
if input.DeleteAPIKeyHashes != nil && apiKeysMode != contentModerationAPIKeysModeReplace {
cfg.APIKeys = deleteModerationAPIKeysByHash(cfg.apiKeys(), *input.DeleteAPIKeyHashes)
cfg.APIKey = ""
}
if input.APIKeys != nil {
2026-05-07 14:31:19 +08:00
if apiKeysMode == contentModerationAPIKeysModeReplace {
cfg.APIKeys = normalizeModerationAPIKeys(*input.APIKeys)
} else {
cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.apiKeys(), *input.APIKeys...))
}
cfg.APIKey = ""
}
if input.APIKey != nil && strings.TrimSpace(*input.APIKey) != "" {
cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.APIKeys, *input.APIKey))
cfg.APIKey = ""
}
}
if err := s.validateConfig(ctx, cfg); err != nil {
return nil, err
}
cfg.normalize()
raw, err := json.Marshal(cfg)
if err != nil {
return nil, fmt.Errorf("marshal content moderation config: %w", err)
}
if err := s.settingRepo.Set(ctx, SettingKeyContentModerationConfig, string(raw)); err != nil {
return nil, fmt.Errorf("save content moderation config: %w", err)
}
return s.configView(cfg), nil
}
func (s *ContentModerationService) TestAPIKeys(ctx context.Context, input TestContentModerationAPIKeysInput) (*TestContentModerationAPIKeysResult, error) {
cfg, err := s.loadConfig(ctx)
if err != nil {
return nil, err
}
keys := normalizeModerationAPIKeys(input.APIKeys)
configured := false
if len(keys) == 0 {
keys = cfg.apiKeys()
configured = true
}
if strings.TrimSpace(input.BaseURL) != "" {
cfg.BaseURL = input.BaseURL
}
if strings.TrimSpace(input.Model) != "" {
cfg.Model = input.Model
}
if input.TimeoutMS > 0 {
cfg.TimeoutMS = input.TimeoutMS
}
cfg.normalize()
testInput, imageCount, err := buildModerationTestInput(input.Prompt, input.Images)
if err != nil {
return nil, err
}
auditOnly := contentModerationTestHasAuditInput(input.Prompt, input.Images)
if configured && auditOnly {
key, ok := s.nextUsableAPIKey(cfg)
if !ok {
return &TestContentModerationAPIKeysResult{
Items: s.apiKeyStatuses(keys),
ImageCount: imageCount,
}, nil
}
keys = []string{key}
}
if len(keys) == 0 {
return &TestContentModerationAPIKeysResult{Items: []ContentModerationAPIKeyStatus{}, ImageCount: imageCount}, nil
}
items := make([]ContentModerationAPIKeyStatus, 0, len(keys))
var auditResult *ContentModerationTestAuditResult
for idx, key := range keys {
start := time.Now()
httpStatus := 0
result, err := s.callModerationOnceWithInput(ctx, cfg, key, testInput, &httpStatus)
latency := int(time.Since(start).Milliseconds())
keyHash := moderationAPIKeyHash(key)
if err != nil {
2026-05-07 14:31:19 +08:00
s.markAPIKeyError(key, err.Error(), latency, httpStatus)
} else {
s.markAPIKeySuccess(key, latency, httpStatus)
if auditResult == nil {
auditResult = buildContentModerationTestAuditResult(result, cfg.Thresholds)
}
}
status := s.apiKeyStatusForHash(idx, keyHash, maskSecretTail(key), configured)
status.LastTested = true
items = append(items, status)
}
return &TestContentModerationAPIKeysResult{Items: items, AuditResult: auditResult, ImageCount: imageCount}, nil
}
// Check decides whether a request may proceed. It always returns a nil error;
// every failure on the way (service unavailable, config load error, hash
// cache error) results in an allow decision.
//
// The request is allowed without moderation when the service or the risk
// control feature is unavailable/disabled, the config is disabled or in mode
// "off", the group is out of scope, the extracted input is empty, the input is
// not sampled, or no audit API keys are configured. With the pre-hash check
// enabled, an input whose hash is in the flagged-hash cache is blocked
// immediately. In observe mode the input is queued for asynchronous
// moderation and the request is allowed; otherwise the moderation API is
// called synchronously via checkSync.
func (s *ContentModerationService) Check(ctx context.Context, input ContentModerationCheckInput) (*ContentModerationDecision, error) {
	pass := &ContentModerationDecision{Allowed: true, Action: ContentModerationActionAllow}

	// requestAttrs returns the five attributes shared by most log lines,
	// followed by event-specific extras, keeping the attribute order stable.
	requestAttrs := func(extra ...any) []any {
		attrs := []any{
			"user_id", input.UserID,
			"api_key_id", input.APIKeyID,
			"group_id", contentModerationLogGroupID(input.GroupID),
			"endpoint", input.Endpoint,
			"protocol", input.Protocol,
		}
		return append(attrs, extra...)
	}

	if s == nil || s.settingRepo == nil || s.repo == nil {
		slog.Info("content_moderation.skip_unavailable", requestAttrs()...)
		return pass, nil
	}
	if !s.isRiskControlEnabled(ctx) {
		slog.Info("content_moderation.skip_feature_disabled", requestAttrs()...)
		return pass, nil
	}

	cfg, err := s.loadConfig(ctx)
	if err != nil {
		slog.Warn("content_moderation.skip_config_load_failed", requestAttrs("error", err)...)
		return pass, nil
	}

	inScope := cfg.includesGroup(input.GroupID)
	slog.Info("content_moderation.config_loaded",
		"user_id", input.UserID,
		"api_key_id", input.APIKeyID,
		"group_id", contentModerationLogGroupID(input.GroupID),
		"group_name", input.GroupName,
		"endpoint", input.Endpoint,
		"provider", input.Provider,
		"protocol", input.Protocol,
		"model", input.Model,
		"enabled", cfg.Enabled,
		"mode", cfg.Mode,
		"all_groups", cfg.AllGroups,
		"configured_group_ids", cfg.GroupIDs,
		"in_scope", inScope,
		"sample_rate", cfg.SampleRate,
		"api_key_count", len(cfg.apiKeys()),
		"pre_hash_check_enabled", cfg.PreHashCheckEnabled,
		"record_non_hits", cfg.RecordNonHits)

	switch {
	case !cfg.Enabled:
		slog.Info("content_moderation.skip_config_disabled", requestAttrs()...)
		return pass, nil
	case cfg.Mode == ContentModerationModeOff:
		slog.Info("content_moderation.skip_mode_off", requestAttrs()...)
		return pass, nil
	case !inScope:
		slog.Info("content_moderation.skip_group_out_of_scope",
			"user_id", input.UserID,
			"api_key_id", input.APIKeyID,
			"group_id", contentModerationLogGroupID(input.GroupID),
			"group_name", input.GroupName,
			"endpoint", input.Endpoint,
			"protocol", input.Protocol,
			"all_groups", cfg.AllGroups,
			"configured_group_ids", cfg.GroupIDs)
		return pass, nil
	}

	content := ExtractContentModerationInput(input.Protocol, input.Body)
	if content.IsEmpty() {
		slog.Info("content_moderation.skip_empty_input", requestAttrs("body_bytes", len(input.Body))...)
		return pass, nil
	}
	content.Normalize()
	slog.Info("content_moderation.input_extracted", requestAttrs(
		"text_runes", len([]rune(content.Text)),
		"image_count", len(content.Images))...)

	hashText := content.Hash()
	if cfg.PreHashCheckEnabled && s.hashCache != nil {
		matched, hashErr := s.hashCache.HasFlaggedInputHash(ctx, hashText)
		if hashErr != nil {
			// A cache failure only disables the shortcut; the check continues.
			slog.Warn("content_moderation.hash_check_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", hashErr)
		}
		if matched {
			slog.Info("content_moderation.hash_block", requestAttrs("input_hash", hashText)...)
			message := cfg.BlockMessage
			if message != "" {
				message = fmt.Sprintf("%shash: %s", message, hashText)
			}
			return &ContentModerationDecision{
				Allowed:    false,
				Blocked:    true,
				Flagged:    true,
				Message:    message,
				StatusCode: cfg.BlockStatus,
				InputHash:  hashText,
				Action:     ContentModerationActionHashBlock,
			}, nil
		}
	}

	if !cfg.shouldSample(hashText) {
		slog.Info("content_moderation.skip_sample_rate", requestAttrs("sample_rate", cfg.SampleRate)...)
		return pass, nil
	}
	if len(cfg.apiKeys()) == 0 {
		slog.Warn("content_moderation.skip_no_audit_api_keys", requestAttrs()...)
		return pass, nil
	}

	if cfg.Mode == ContentModerationModeObserve {
		slog.Info("content_moderation.enqueue_observe", requestAttrs("queue_len", len(s.asyncQueue))...)
		s.enqueueAsync(input, cfg, content, hashText)
		return pass, nil
	}
	return s.checkSync(ctx, input, cfg, content, hashText, nil, true), nil
}
// checkSync calls the moderation API for content and turns the result into a
// decision. It is used directly by Check and by the async workers.
//
// queueDelay is non-nil only for async tasks; it is recorded in the log and,
// on API failure, makes the failure count towards asyncErrors. allowBlock
// must be true for a flagged result to block; blocking additionally requires
// pre_block mode.
//
// On API failure the request is allowed, and an error log is stored when
// RecordNonHits is set. On success a log is stored when the input is flagged
// or RecordNonHits is set; a flagged input's hash is also recorded in the
// hash cache. Failures to store a log or record a hash are logged as warnings
// and never change the decision.
func (s *ContentModerationService) checkSync(ctx context.Context, input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string, queueDelay *int, allowBlock bool) *ContentModerationDecision {
	allow := &ContentModerationDecision{Allowed: true, Action: ContentModerationActionAllow}

	start := time.Now()
	result, err := s.callModeration(ctx, cfg, content.ModerationInput())
	latency := int(time.Since(start).Milliseconds())
	if err != nil {
		slog.Warn("content_moderation.audit_api_failed",
			"user_id", input.UserID,
			"api_key_id", input.APIKeyID,
			"group_id", contentModerationLogGroupID(input.GroupID),
			"endpoint", input.Endpoint,
			"protocol", input.Protocol,
			"mode", cfg.Mode,
			"allow_block", allowBlock,
			"queue_delay_ms", queueDelay,
			"latency_ms", latency,
			"error", err)
		if queueDelay != nil {
			s.asyncErrors.Add(1)
		}
		if cfg.RecordNonHits {
			entry := s.buildLog(input, cfg, ContentModerationActionError, false, "", 0, nil, content.ExcerptText(), &latency, queueDelay, err.Error())
			if logErr := s.repo.CreateLog(ctx, entry); logErr != nil {
				// Persisting the audit trail is best effort, but a failure
				// must be visible to operators.
				slog.Warn("content_moderation.create_log_failed",
					"user_id", input.UserID,
					"endpoint", input.Endpoint,
					"action", ContentModerationActionError,
					"error", logErr)
			}
		}
		return allow
	}

	flagged, highestCategory, highestScore := evaluateModerationScores(result.CategoryScores, cfg.Thresholds)
	action := ContentModerationActionAllow
	blocked := false
	if allowBlock && flagged && cfg.Mode == ContentModerationModePreBlock {
		action = ContentModerationActionBlock
		blocked = true
	}
	slog.Info("content_moderation.audit_result",
		"user_id", input.UserID,
		"api_key_id", input.APIKeyID,
		"group_id", contentModerationLogGroupID(input.GroupID),
		"group_name", input.GroupName,
		"endpoint", input.Endpoint,
		"protocol", input.Protocol,
		"mode", cfg.Mode,
		"allow_block", allowBlock,
		"flagged", flagged,
		"blocked", blocked,
		"action", action,
		"highest_category", highestCategory,
		"highest_score", highestScore,
		"latency_ms", latency,
		"queue_delay_ms", queueDelay)

	if flagged || cfg.RecordNonHits {
		entry := s.buildLog(input, cfg, action, flagged, highestCategory, highestScore, result.CategoryScores, content.ExcerptText(), &latency, queueDelay, "")
		if flagged && s.hashCache != nil {
			if hashErr := s.hashCache.RecordFlaggedInputHash(ctx, hashText); hashErr != nil {
				slog.Warn("content_moderation.record_hash_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", hashErr)
			}
		}
		// Side effects run before the log is stored, since they receive the
		// entry by pointer.
		s.applyFlaggedSideEffects(ctx, cfg, entry)
		if logErr := s.repo.CreateLog(ctx, entry); logErr != nil {
			slog.Warn("content_moderation.create_log_failed",
				"user_id", input.UserID,
				"endpoint", input.Endpoint,
				"action", action,
				"error", logErr)
		}
	}

	if blocked {
		return &ContentModerationDecision{
			Allowed:         false,
			Blocked:         true,
			Flagged:         true,
			Message:         cfg.BlockMessage,
			StatusCode:      cfg.BlockStatus,
			HighestCategory: highestCategory,
			HighestScore:    highestScore,
			CategoryScores:  result.CategoryScores,
			Action:          action,
		}
	}
	return &ContentModerationDecision{
		Allowed:         true,
		Flagged:         flagged,
		Message:         "",
		HighestCategory: highestCategory,
		HighestScore:    highestScore,
		CategoryScores:  result.CategoryScores,
		Action:          action,
	}
}
// enqueueAsync queues an input for asynchronous moderation without blocking.
// The task is dropped, and counted in asyncDropped, when the queue already
// holds the configured number of tasks (cfg.QueueSize, or the default when
// cfg is nil or the size is not positive) or when the channel itself is full.
func (s *ContentModerationService) enqueueAsync(input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string) {
	if s == nil || s.asyncQueue == nil {
		return
	}

	limit := defaultContentModerationQueueSize
	if cfg != nil && cfg.QueueSize > 0 {
		limit = cfg.QueueSize
	}
	// Soft limit: the channel is allocated at the maximum size, so the
	// configured size is enforced here.
	if backlog := len(s.asyncQueue); backlog >= limit {
		slog.Warn("content_moderation.async_queue_full", "user_id", input.UserID, "endpoint", input.Endpoint, "queue_size", limit)
		s.asyncDropped.Add(1)
		return
	}

	queued := contentModerationTask{
		input:      input,
		content:    content,
		inputHash:  hashText,
		enqueuedAt: time.Now(),
	}
	select {
	case s.asyncQueue <- queued:
		s.asyncEnqueued.Add(1)
	default:
		slog.Warn("content_moderation.async_queue_full", "user_id", input.UserID, "endpoint", input.Endpoint)
		s.asyncDropped.Add(1)
	}
}
// worker is the endless loop run by each async worker goroutine. On every
// iteration it reloads the configuration and sleeps for a second when the
// config cannot be loaded, moderation is disabled or off, no API keys exist,
// or this worker's id is at or above the configured worker count. Otherwise
// it waits up to a second for a task and runs it through checkSync with
// blocking disabled. Tasks whose group is no longer in scope are discarded.
// A panic while handling a task is recovered and logged.
func (s *ContentModerationService) worker(id int) {
	const perTaskBudget = maxContentModerationTimeoutMS*time.Millisecond + 10*time.Second

	for {
		ctx, cancel := context.WithTimeout(context.Background(), perTaskBudget)

		cfg, err := s.loadConfig(ctx)
		standby := err != nil ||
			!cfg.Enabled ||
			cfg.Mode == ContentModerationModeOff ||
			len(cfg.apiKeys()) == 0 ||
			id >= cfg.WorkerCount
		if standby {
			cancel()
			time.Sleep(time.Second)
			continue
		}

		task, received := s.dequeueAsyncTask(ctx, time.Second)
		if !received {
			cancel()
			continue
		}

		handle := func() {
			defer cancel()
			defer func() {
				if recovered := recover(); recovered != nil {
					slog.Error("content_moderation.worker_panic", "worker_id", id, "recover", recovered)
				}
			}()

			if !cfg.includesGroup(task.input.GroupID) {
				return
			}

			s.asyncActive.Add(1)
			defer s.asyncActive.Add(-1)

			waitedMS := int(time.Since(task.enqueuedAt).Milliseconds())
			_ = s.checkSync(ctx, task.input, cfg, task.content, task.inputHash, &waitedMS, false)
			s.asyncProcessed.Add(1)
		}
		handle()
	}
}
// dequeueAsyncTask waits for the next queued task. It returns false when the
// service or queue is nil, when ctx is done, or when idleWait elapses first;
// a non-positive idleWait is treated as one second.
func (s *ContentModerationService) dequeueAsyncTask(ctx context.Context, idleWait time.Duration) (contentModerationTask, bool) {
	if s == nil || s.asyncQueue == nil {
		return contentModerationTask{}, false
	}

	wait := idleWait
	if wait <= 0 {
		wait = time.Second
	}
	idle := time.NewTimer(wait)
	defer idle.Stop()

	select {
	case next, open := <-s.asyncQueue:
		return next, open
	case <-ctx.Done():
	case <-idle.C:
	}
	return contentModerationTask{}, false
}
// ListLogs returns a page of moderation logs matching filter. Pagination
// defaults are applied first: page 1, page size 20 (capped at 100) and
// descending sort order.
//
// It returns an internal-server error when the service or its log repository
// is unavailable, instead of dereferencing a nil repository; otherwise it
// returns whatever the repository returns.
func (s *ContentModerationService) ListLogs(ctx context.Context, filter ContentModerationLogFilter) ([]ContentModerationLog, *pagination.PaginationResult, error) {
	// The constructor accepts a nil repository, so guard before using it.
	if s == nil || s.repo == nil {
		return nil, nil, infraerrors.InternalServer("CONTENT_MODERATION_REPOSITORY_UNAVAILABLE", "内容审计仓储不可用")
	}

	if filter.Pagination.Page <= 0 {
		filter.Pagination.Page = 1
	}
	if filter.Pagination.PageSize <= 0 {
		filter.Pagination.PageSize = 20
	}
	if filter.Pagination.PageSize > 100 {
		filter.Pagination.PageSize = 100
	}
	if filter.Pagination.SortOrder == "" {
		filter.Pagination.SortOrder = pagination.SortOrderDesc
	}
	return s.repo.ListLogs(ctx, filter)
}
// UnbanUser sets the user's status to active (when it is not already) and
// invalidates the user's auth cache.
//
// It returns an internal-server error when the user repository is
// unavailable, a bad-request error for a non-positive userID, a not-found
// error when the user does not exist, and a wrapped error when loading or
// updating the user fails.
func (s *ContentModerationService) UnbanUser(ctx context.Context, userID int64) (*ContentModerationUnbanUserResult, error) {
	if s == nil || s.userRepo == nil {
		return nil, infraerrors.InternalServer("CONTENT_MODERATION_USER_REPOSITORY_UNAVAILABLE", "用户仓储不可用")
	}
	if userID <= 0 {
		return nil, infraerrors.BadRequest("INVALID_USER_ID", "用户 ID 无效")
	}

	account, err := s.userRepo.GetByID(ctx, userID)
	switch {
	case err == nil:
		// Found; continue below.
	case errors.Is(err, ErrUserNotFound):
		return nil, infraerrors.NotFound("USER_NOT_FOUND", "用户不存在")
	default:
		return nil, fmt.Errorf("get content moderation unban user: %w", err)
	}

	if account.Status != StatusActive {
		account.Status = StatusActive
		if updateErr := s.userRepo.Update(ctx, account); updateErr != nil {
			return nil, fmt.Errorf("update content moderation unban user: %w", updateErr)
		}
	}

	// The cache is invalidated even when the user was already active.
	if s.authCacheInvalidator != nil {
		s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, userID)
	}

	result := &ContentModerationUnbanUserResult{UserID: userID, Status: StatusActive}
	return result, nil
}
// DeleteFlaggedInputHash removes one flagged input hash from the hash cache.
//
// The hash is normalized first; an invalid hash yields a bad-request error
// (checked before the service/cache availability check). The result carries
// the normalized hash and the cache's "deleted" answer.
func (s *ContentModerationService) DeleteFlaggedInputHash(ctx context.Context, inputHash string) (*ContentModerationDeleteHashResult, error) {
	normalized := normalizeContentModerationHash(inputHash)
	if normalized == "" {
		return nil, infraerrors.BadRequest("INVALID_CONTENT_MODERATION_HASH", "风险输入哈希无效")
	}
	if s == nil || s.hashCache == nil {
		return nil, infraerrors.InternalServer("CONTENT_MODERATION_HASH_CACHE_UNAVAILABLE", "内容审计哈希缓存不可用")
	}

	removed, err := s.hashCache.DeleteFlaggedInputHash(ctx, normalized)
	if err != nil {
		return nil, fmt.Errorf("delete content moderation flagged hash: %w", err)
	}
	out := &ContentModerationDeleteHashResult{InputHash: normalized}
	out.Deleted = removed
	return out, nil
}
// ClearFlaggedInputHashes removes every flagged input hash from the hash cache
// and reports the value the cache returned as Deleted. It returns an
// internal-server error when the service or cache is unavailable.
func (s *ContentModerationService) ClearFlaggedInputHashes(ctx context.Context) (*ContentModerationClearHashesResult, error) {
	if s == nil || s.hashCache == nil {
		return nil, infraerrors.InternalServer("CONTENT_MODERATION_HASH_CACHE_UNAVAILABLE", "内容审计哈希缓存不可用")
	}
	removed, err := s.hashCache.ClearFlaggedInputHashes(ctx)
	if err == nil {
		return &ContentModerationClearHashesResult{Deleted: removed}, nil
	}
	return nil, fmt.Errorf("clear content moderation flagged hashes: %w", err)
}
// GetStatus assembles a snapshot of the moderation runtime: configuration
// summary, worker and queue figures, async counters, per-key health, flagged
// hash count and the outcome of the last cleanup run.
//
// A nil service yields an empty status. A config load failure is returned
// as-is. A failure to count flagged hashes is only logged and leaves the
// count at zero.
func (s *ContentModerationService) GetStatus(ctx context.Context) (*ContentModerationRuntimeStatus, error) {
	if s == nil {
		return &ContentModerationRuntimeStatus{}, nil
	}
	cfg, err := s.loadConfig(ctx)
	if err != nil {
		return nil, err
	}
	riskControl := s.isRiskControlEnabled(ctx)

	// Clamp the active counter into [0, WorkerCount] so derived idle figures
	// stay sensible even if the counter is momentarily out of range.
	busy := int(s.asyncActive.Load())
	if busy < 0 {
		busy = 0
	}
	if busy > cfg.WorkerCount {
		busy = cfg.WorkerCount
	}

	var pending int
	if s.asyncQueue != nil {
		pending = len(s.asyncQueue)
	}
	var usagePercent float64
	if cfg.QueueSize > 0 {
		usagePercent = float64(pending) * 100 / float64(cfg.QueueSize)
	}

	var flaggedHashes int64
	if s.hashCache != nil {
		n, countErr := s.hashCache.CountFlaggedInputHashes(ctx)
		if countErr != nil {
			slog.Warn("content_moderation.hash_count_failed", "error", countErr)
		} else {
			flaggedHashes = n
		}
	}

	var cleanedAt *time.Time
	if sec := s.lastCleanupUnix.Load(); sec > 0 {
		at := time.Unix(sec, 0)
		cleanedAt = &at
	}

	status := &ContentModerationRuntimeStatus{
		Enabled:                  cfg.Enabled,
		RiskControlEnabled:       riskControl,
		Mode:                     cfg.Mode,
		WorkerCount:              cfg.WorkerCount,
		MaxWorkers:               maxContentModerationWorkerCount,
		ActiveWorkers:            busy,
		IdleWorkers:              cfg.WorkerCount - busy,
		QueueSize:                cfg.QueueSize,
		QueueLength:              pending,
		QueueUsagePercent:        usagePercent,
		Enqueued:                 s.asyncEnqueued.Load(),
		Dropped:                  s.asyncDropped.Load(),
		Processed:                s.asyncProcessed.Load(),
		Errors:                   s.asyncErrors.Load(),
		APIKeyStatuses:           s.apiKeyStatuses(cfg.apiKeys()),
		FlaggedHashCount:         flaggedHashes,
		LastCleanupAt:            cleanedAt,
		LastCleanupDeletedHit:    s.lastCleanupDeletedHit.Load(),
		LastCleanupDeletedNonHit: s.lastCleanupDeletedNonHit.Load(),
	}
	return status, nil
}
// cleanupWorker runs log cleanup on a timer: the first run fires after
// contentModerationCleanupDelay, and each later run is scheduled
// contentModerationCleanupInterval after the previous one finishes.
//
// The loop has no exit condition in this function, so it is expected to run
// for the lifetime of the process in its own goroutine.
func (s *ContentModerationService) cleanupWorker() {
	schedule := time.NewTimer(contentModerationCleanupDelay)
	defer schedule.Stop()
	// The timer channel is never closed, so this range only ends with the process.
	for range schedule.C {
		s.runCleanupOnce()
		schedule.Reset(contentModerationCleanupInterval)
	}
}
// runCleanupOnce deletes expired moderation logs once, using the retention
// periods from the current config, and records the outcome in the service's
// last-cleanup counters.
//
// It is a no-op when the service or its repositories are nil. Failures are
// logged and otherwise ignored; a nil repository result leaves the counters
// untouched.
func (s *ContentModerationService) runCleanupOnce() {
	if s == nil || s.repo == nil || s.settingRepo == nil {
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), contentModerationCleanupTimeout)
	defer cancel()

	cfg, err := s.loadConfig(ctx)
	if err != nil {
		slog.Warn("content_moderation.cleanup_load_config_failed", "error", err)
		return
	}

	// Both cut-offs are derived from the same instant.
	reference := time.Now()
	hitCutoff := reference.AddDate(0, 0, -cfg.HitRetentionDays)
	nonHitCutoff := reference.AddDate(0, 0, -cfg.NonHitRetentionDays)

	summary, err := s.repo.CleanupExpiredLogs(ctx, hitCutoff, nonHitCutoff)
	switch {
	case err != nil:
		slog.Warn("content_moderation.cleanup_failed", "error", err)
		return
	case summary == nil:
		return
	}
	s.lastCleanupUnix.Store(summary.FinishedAt.Unix())
	s.lastCleanupDeletedHit.Store(summary.DeletedHit)
	s.lastCleanupDeletedNonHit.Store(summary.DeletedNonHit)
}
// loadConfig reads the stored moderation config and returns it normalized.
//
// Starting from the defaults, the stored JSON (if any) is unmarshalled on top.
// A missing setting (ErrSettingNotFound) or a blank value yields the
// normalized defaults. Other repository errors are wrapped and returned;
// malformed JSON yields a bad-request error.
func (s *ContentModerationService) loadConfig(ctx context.Context) (*ContentModerationConfig, error) {
	cfg := defaultContentModerationConfig()
	stored, err := s.settingRepo.GetValue(ctx, SettingKeyContentModerationConfig)
	switch {
	case err != nil && !errors.Is(err, ErrSettingNotFound):
		return nil, fmt.Errorf("get content moderation config: %w", err)
	case err != nil, strings.TrimSpace(stored) == "":
		// Nothing stored: fall through with the defaults.
	default:
		if decodeErr := json.Unmarshal([]byte(stored), cfg); decodeErr != nil {
			return nil, infraerrors.BadRequest("INVALID_CONTENT_MODERATION_CONFIG", "内容审计配置不是有效 JSON")
		}
	}
	cfg.normalize()
	return cfg, nil
}
// isRiskControlEnabled reports whether the risk-control setting is stored as
// the exact string "true". Any repository error counts as disabled.
func (s *ContentModerationService) isRiskControlEnabled(ctx context.Context) bool {
	value, err := s.settingRepo.GetValue(ctx, SettingKeyRiskControlEnabled)
	return err == nil && value == "true"
}
// validateConfig normalizes cfg in place and then checks that it is usable.
//
// It returns a bad-request error when cfg is nil, the mode is not one of the
// known modes, the base URL is not an absolute http(s) URL with a host, the
// block status is outside 400-599, or (when specific groups are selected and a
// group repository is available) any selected group cannot be loaded.
//
// NOTE(review): every error from the group lookup is reported as "group does
// not exist", including transient repository failures — confirm that is
// intended.
func (s *ContentModerationService) validateConfig(ctx context.Context, cfg *ContentModerationConfig) error {
	if cfg == nil {
		return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_CONFIG", "内容审计配置不能为空")
	}
	cfg.normalize()
	switch cfg.Mode {
	case ContentModerationModeOff, ContentModerationModeObserve, ContentModerationModePreBlock:
	default:
		return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_MODE", "内容审计模式无效")
	}
	// url.ParseRequestURI also accepts a bare absolute path such as "/v1", which
	// can never be used to reach the moderation API, so the scheme and host are
	// checked explicitly.
	parsed, err := url.ParseRequestURI(cfg.BaseURL)
	if err != nil || parsed.Host == "" || (parsed.Scheme != "http" && parsed.Scheme != "https") {
		return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_BASE_URL", "OpenAI Base URL 无效")
	}
	if cfg.BlockStatus < 400 || cfg.BlockStatus > 599 {
		return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_BLOCK_STATUS", "拦截 HTTP 状态码必须在 400-599 之间")
	}
	if !cfg.AllGroups && len(cfg.GroupIDs) > 0 && s.groupRepo != nil {
		for _, groupID := range cfg.GroupIDs {
			if _, err := s.groupRepo.GetByIDLite(ctx, groupID); err != nil {
				return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_GROUP", fmt.Sprintf("审计分组不存在: %d", groupID))
			}
		}
	}
	return nil
}
func (s *ContentModerationService) callModeration(ctx context.Context, cfg *ContentModerationConfig, input any) (*moderationAPIResult, error) {
attempts := cfg.RetryCount + 1
if attempts <= 0 {
attempts = 1
}
if attempts > maxContentModerationRetryCount+1 {
attempts = maxContentModerationRetryCount + 1
}
var lastErr error
for attempt := 0; attempt < attempts; attempt++ {
key, ok := s.nextUsableAPIKey(cfg)
if !ok {
lastErr = errors.New("no moderation api key available")
break
}
start := time.Now()
httpStatus := 0
result, err := s.callModerationOnceWithInput(ctx, cfg, key, input, &httpStatus)
latency := int(time.Since(start).Milliseconds())
if err == nil {
s.markAPIKeySuccess(key, latency, httpStatus)
return result, nil
}
2026-05-07 14:31:19 +08:00
s.markAPIKeyError(key, err.Error(), latency, httpStatus)
lastErr = err
2026-05-07 14:31:19 +08:00
if httpStatus == http.StatusBadRequest {
break
}
if attempt == attempts-1 {
break
}
wait := time.Duration(100*(attempt+1)) * time.Millisecond
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(wait):
}
}
return nil, lastErr
}
// callModerationOnceWithInput performs a single POST to
// <BaseURL>/v1/moderations with the given key and input.
//
// The request is bounded by cfg.TimeoutMS. When httpStatus is non-nil it is
// set to the response status code once a response has been received. A
// non-2xx response becomes an error that includes up to 512 bytes of the
// response body. On success the first entry of the response's results is
// returned; an empty result list is an error.
func (s *ContentModerationService) callModerationOnceWithInput(ctx context.Context, cfg *ContentModerationConfig, apiKey string, input any, httpStatus *int) (*moderationAPIResult, error) {
	endpoint, err := url.JoinPath(strings.TrimRight(cfg.BaseURL, "/"), "/v1/moderations")
	if err != nil {
		return nil, err
	}
	encoded, err := json.Marshal(moderationAPIRequest{Model: cfg.Model, Input: input})
	if err != nil {
		return nil, err
	}

	reqCtx, cancel := context.WithTimeout(ctx, time.Duration(cfg.TimeoutMS)*time.Millisecond)
	defer cancel()
	req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, endpoint, bytes.NewReader(encoded))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+apiKey)
	req.Header.Set("Content-Type", "application/json")

	doer := s.httpClient
	if doer == nil {
		doer = http.DefaultClient
	}
	resp, err := doer.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	if httpStatus != nil {
		*httpStatus = resp.StatusCode
	}
	if code := resp.StatusCode; code < 200 || code >= 300 {
		// Best-effort read of a short error snippet; a read failure just
		// shortens the message.
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
		return nil, fmt.Errorf("moderation api status %d: %s", code, strings.TrimSpace(string(snippet)))
	}

	var decoded moderationAPIResponse
	if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil {
		return nil, err
	}
	if len(decoded.Results) == 0 {
		return nil, errors.New("moderation api returned empty results")
	}
	first := &decoded.Results[0]
	return first, nil
}
// buildLog creates a moderation log entry from the request metadata, the
// config in effect and the evaluation outcome.
//
// UserID and APIKeyID are only set when positive. Score and threshold maps are
// copied, the group ID pointer is cloned, and the input text is passed through
// secret redaction and truncated to maxModerationExcerptRunes runes.
func (s *ContentModerationService) buildLog(input ContentModerationCheckInput, cfg *ContentModerationConfig, action string, flagged bool, highestCategory string, highestScore float64, scores map[string]float64, text string, latency *int, queueDelay *int, errText string) *ContentModerationLog {
	entry := &ContentModerationLog{
		RequestID:         input.RequestID,
		UserEmail:         input.UserEmail,
		APIKeyName:        input.APIKeyName,
		GroupID:           cloneInt64Ptr(input.GroupID),
		GroupName:         input.GroupName,
		Endpoint:          input.Endpoint,
		Provider:          input.Provider,
		Model:             input.Model,
		Mode:              cfg.Mode,
		Action:            action,
		Flagged:           flagged,
		HighestCategory:   highestCategory,
		HighestScore:      highestScore,
		CategoryScores:    cloneFloatMap(scores),
		ThresholdSnapshot: cloneFloatMap(cfg.Thresholds),
		InputExcerpt:      trimRunes(redactContentModerationSecrets(text), maxModerationExcerptRunes),
		UpstreamLatencyMS: latency,
		QueueDelayMS:      queueDelay,
		Error:             errText,
	}
	// Non-positive IDs are treated as "unknown" and left nil.
	if input.UserID > 0 {
		uid := input.UserID
		entry.UserID = &uid
	}
	if input.APIKeyID > 0 {
		kid := input.APIKeyID
		entry.APIKeyID = &kid
	}
	return entry
}
// applyFlaggedSideEffects performs the follow-up actions for a flagged log
// entry that belongs to a known user, and records what happened on the entry
// (ViolationCount, AutoBanned, EmailSent).
//
// Steps, in order:
//  1. Count the user's flagged entries inside the violation window and add
//     one for the current entry (a count failure leaves the count at 1).
//  2. If auto-ban is enabled and the count reaches the threshold, disable the
//     user (unless already disabled) and invalidate the auth cache. A failure
//     to load or update the user is logged and ends processing.
//  3. If an email service and a user email are available, send the violation
//     notice (when EmailOnHit is set) and, if the ban was applied just now,
//     the account-disabled notice. Send failures are logged only.
func (s *ContentModerationService) applyFlaggedSideEffects(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) {
	if s == nil || cfg == nil || log == nil {
		return
	}
	if !log.Flagged || log.UserID == nil || *log.UserID <= 0 {
		return
	}
	uid := *log.UserID

	violations := 1
	if s.repo != nil && cfg.ViolationWindowHours > 0 {
		windowStart := time.Now().Add(-time.Duration(cfg.ViolationWindowHours) * time.Hour)
		if prior, err := s.repo.CountFlaggedByUserSince(ctx, uid, windowStart); err == nil {
			violations = prior + 1
		}
	}
	log.ViolationCount = violations

	bannedNow := false
	if cfg.AutoBanEnabled && cfg.BanThreshold > 0 && violations >= cfg.BanThreshold && s.userRepo != nil {
		account, err := s.userRepo.GetByID(ctx, uid)
		if err != nil {
			slog.Warn("content_moderation.ban_get_user_failed", "user_id", uid, "error", err)
			return
		}
		if account.Status != StatusDisabled {
			account.Status = StatusDisabled
			if err := s.userRepo.Update(ctx, account); err != nil {
				slog.Warn("content_moderation.ban_update_user_failed", "user_id", uid, "error", err)
				return
			}
			if s.authCacheInvalidator != nil {
				s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, uid)
			}
			bannedNow = true
		}
		// Set whether the ban is new or the user was already disabled.
		log.AutoBanned = true
	}

	if s.emailService == nil || strings.TrimSpace(log.UserEmail) == "" {
		return
	}
	delivered := false
	if cfg.EmailOnHit {
		if err := s.sendViolationEmail(ctx, cfg, log); err == nil {
			delivered = true
		} else {
			slog.Warn("content_moderation.email_failed", "user_id", uid, "email", log.UserEmail, "error", err)
		}
	}
	if bannedNow {
		if err := s.sendAccountDisabledEmail(ctx, cfg, log); err == nil {
			delivered = true
		} else {
			slog.Warn("content_moderation.ban_email_failed", "user_id", uid, "email", log.UserEmail, "error", err)
		}
	}
	log.EmailSent = delivered
}
// sendViolationEmail sends the risk-control notice to the user on the log
// entry. The subject carries the sanitized site name. The caller must ensure
// s.emailService is non-nil.
func (s *ContentModerationService) sendViolationEmail(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) error {
	site := s.siteName(ctx)
	return s.emailService.SendEmail(
		ctx,
		log.UserEmail,
		fmt.Sprintf("[%s] 账户风控提醒 / Risk Control Notice", sanitizeEmailHeader(site)),
		buildContentModerationViolationEmailBody(site, log, cfg),
	)
}
// sendAccountDisabledEmail sends the account-disabled notice to the user on
// the log entry. The subject carries the sanitized site name. The caller must
// ensure s.emailService is non-nil.
func (s *ContentModerationService) sendAccountDisabledEmail(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) error {
	site := s.siteName(ctx)
	return s.emailService.SendEmail(
		ctx,
		log.UserEmail,
		fmt.Sprintf("[%s] 账户已被禁用 / Account Disabled", sanitizeEmailHeader(site)),
		buildContentModerationAccountDisabledEmailBody(site, log, cfg),
	)
}
// siteName returns the trimmed site name from settings, or "Sub2API" when the
// service or setting repository is nil, the lookup fails, or the stored name
// is blank.
func (s *ContentModerationService) siteName(ctx context.Context) string {
	const fallback = "Sub2API"
	if s == nil || s.settingRepo == nil {
		return fallback
	}
	stored, err := s.settingRepo.GetValue(ctx, SettingKeySiteName)
	if err != nil {
		return fallback
	}
	if trimmed := strings.TrimSpace(stored); trimmed != "" {
		return trimmed
	}
	return fallback
}
// defaultContentModerationConfig returns a fresh config populated with the
// package defaults: moderation disabled, pre-block mode, all groups covered,
// full sampling, email-on-hit and auto-ban switched on.
func defaultContentModerationConfig() *ContentModerationConfig {
	cfg := new(ContentModerationConfig)

	// Switches.
	cfg.Enabled = false
	cfg.RecordNonHits = false
	cfg.PreHashCheckEnabled = false
	cfg.AllGroups = true
	cfg.EmailOnHit = true
	cfg.AutoBanEnabled = true

	// Upstream moderation API.
	cfg.Mode = ContentModerationModePreBlock
	cfg.BaseURL = defaultContentModerationBaseURL
	cfg.Model = defaultContentModerationModel
	cfg.TimeoutMS = defaultContentModerationTimeoutMS
	cfg.RetryCount = defaultContentModerationRetryCount

	// Scope and evaluation.
	cfg.SampleRate = 100
	cfg.GroupIDs = []int64{}
	cfg.Thresholds = ContentModerationDefaultThresholds()

	// Async processing.
	cfg.WorkerCount = defaultContentModerationWorkerCount
	cfg.QueueSize = defaultContentModerationQueueSize

	// Blocking and enforcement.
	cfg.BlockStatus = defaultContentModerationBlockHTTPStatus
	cfg.BlockMessage = defaultContentModerationBlockMessage
	cfg.BanThreshold = defaultContentModerationBanThreshold
	cfg.ViolationWindowHours = defaultContentModerationViolationWindowHours

	// Log retention.
	cfg.HitRetentionDays = defaultContentModerationHitRetentionDays
	cfg.NonHitRetentionDays = defaultContentModerationNonHitRetentionDays
	return cfg
}
// normalize rewrites cfg in place so every field holds a usable value.
//
//   - A legacy single APIKey is folded into APIKeys; keys are trimmed and
//     de-duplicated.
//   - Empty Mode, BaseURL, Model and BlockMessage fall back to their defaults.
//     BaseURL, Model and BlockMessage are trimmed first, so whitespace-only
//     values (or a BaseURL consisting only of slashes) also fall back instead
//     of ending up empty.
//   - Numeric settings are defaulted when not positive and clamped to their
//     documented maxima; SampleRate is clamped to 0-100 and RetryCount to
//     0-maxContentModerationRetryCount.
//   - GroupIDs are normalized and Thresholds are merged over the defaults.
func (cfg *ContentModerationConfig) normalize() {
	if cfg.APIKey != "" {
		cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.APIKeys, cfg.APIKey))
		cfg.APIKey = ""
	} else {
		cfg.APIKeys = normalizeModerationAPIKeys(cfg.APIKeys)
	}
	if cfg.Mode == "" {
		cfg.Mode = ContentModerationModePreBlock
	}

	// Trim before testing for emptiness so that "  " or "/" cannot slip past
	// the default and leave an unusable empty value.
	cfg.BaseURL = strings.TrimRight(strings.TrimSpace(cfg.BaseURL), "/")
	if cfg.BaseURL == "" {
		cfg.BaseURL = defaultContentModerationBaseURL
	}
	cfg.Model = strings.TrimSpace(cfg.Model)
	if cfg.Model == "" {
		cfg.Model = defaultContentModerationModel
	}

	if cfg.TimeoutMS <= 0 {
		cfg.TimeoutMS = defaultContentModerationTimeoutMS
	}
	if cfg.TimeoutMS > maxContentModerationTimeoutMS {
		cfg.TimeoutMS = maxContentModerationTimeoutMS
	}
	if cfg.SampleRate < 0 {
		cfg.SampleRate = 0
	}
	if cfg.SampleRate > 100 {
		cfg.SampleRate = 100
	}
	if cfg.WorkerCount <= 0 {
		cfg.WorkerCount = defaultContentModerationWorkerCount
	}
	if cfg.WorkerCount > maxContentModerationWorkerCount {
		cfg.WorkerCount = maxContentModerationWorkerCount
	}
	if cfg.QueueSize <= 0 {
		cfg.QueueSize = defaultContentModerationQueueSize
	}
	if cfg.QueueSize > maxContentModerationQueueSize {
		cfg.QueueSize = maxContentModerationQueueSize
	}

	cfg.BlockMessage = strings.TrimSpace(cfg.BlockMessage)
	if cfg.BlockMessage == "" {
		cfg.BlockMessage = defaultContentModerationBlockMessage
	}
	if cfg.BlockStatus <= 0 {
		cfg.BlockStatus = defaultContentModerationBlockHTTPStatus
	}
	if cfg.BanThreshold <= 0 {
		cfg.BanThreshold = defaultContentModerationBanThreshold
	}
	if cfg.ViolationWindowHours <= 0 {
		cfg.ViolationWindowHours = defaultContentModerationViolationWindowHours
	}
	if cfg.RetryCount < 0 {
		cfg.RetryCount = 0
	}
	if cfg.RetryCount > maxContentModerationRetryCount {
		cfg.RetryCount = maxContentModerationRetryCount
	}
	if cfg.HitRetentionDays <= 0 {
		cfg.HitRetentionDays = defaultContentModerationHitRetentionDays
	}
	if cfg.HitRetentionDays > maxContentModerationRetentionDays {
		cfg.HitRetentionDays = maxContentModerationRetentionDays
	}
	if cfg.NonHitRetentionDays <= 0 {
		cfg.NonHitRetentionDays = defaultContentModerationNonHitRetentionDays
	}
	if cfg.NonHitRetentionDays > maxContentModerationNonHitRetentionDays {
		cfg.NonHitRetentionDays = maxContentModerationNonHitRetentionDays
	}

	cfg.GroupIDs = normalizeInt64IDs(cfg.GroupIDs)
	cfg.Thresholds = mergeContentModerationThresholds(ContentModerationDefaultThresholds(), cfg.Thresholds)
}
// includesGroup reports whether moderation applies to the given group: always
// when AllGroups is set, never for a nil group otherwise, and else only when
// the group ID is listed in GroupIDs.
func (cfg *ContentModerationConfig) includesGroup(groupID *int64) bool {
	switch {
	case cfg.AllGroups:
		return true
	case groupID == nil:
		return false
	}
	wanted := *groupID
	for i := range cfg.GroupIDs {
		if cfg.GroupIDs[i] == wanted {
			return true
		}
	}
	return false
}
// contentModerationLogGroupID dereferences groupID, mapping nil to 0.
func contentModerationLogGroupID(groupID *int64) int64 {
	if groupID != nil {
		return *groupID
	}
	return 0
}
// shouldSample decides whether an input is selected for moderation.
//
// A rate of 100 or more always samples and a rate of 0 or less never does.
// Otherwise the first two bytes of the hex-decoded hash are read as a
// big-endian integer and reduced modulo 100; the input is sampled when that
// bucket is below SampleRate. A hash that cannot be decoded, or is shorter
// than two bytes, is always sampled.
func (cfg *ContentModerationConfig) shouldSample(hashText string) bool {
	switch {
	case cfg.SampleRate >= 100:
		return true
	case cfg.SampleRate <= 0:
		return false
	}
	decoded, err := hex.DecodeString(hashText)
	if err != nil || len(decoded) < 2 {
		return true
	}
	bucket := int(binary.BigEndian.Uint16(decoded[:2]) % 100)
	return bucket < cfg.SampleRate
}
// apiKeys returns the configured API keys after normalization, or nil when
// cfg itself is nil.
func (cfg *ContentModerationConfig) apiKeys() []string {
	if cfg != nil {
		return normalizeModerationAPIKeys(cfg.APIKeys)
	}
	return nil
}
// nextUsableAPIKey picks the next non-frozen API key in round-robin order.
//
// The shared cursor is advanced once per candidate examined, and at most
// len(keys) candidates are tried. It returns "" and false when no keys are
// configured or every examined key is frozen.
func (s *ContentModerationService) nextUsableAPIKey(cfg *ContentModerationConfig) (string, bool) {
	candidates := cfg.apiKeys()
	total := len(candidates)
	if total == 0 {
		return "", false
	}
	checkedAt := time.Now()
	for remaining := total; remaining > 0; remaining-- {
		slot := int(s.apiKeyCursor.Add(1)-1) % total
		if candidate := candidates[slot]; !s.isAPIKeyFrozen(candidate, checkedAt) {
			return candidate, true
		}
	}
	return "", false
}
// isAPIKeyFrozen reports whether the key has a recorded freeze that ends after
// now. Blank keys, a nil service and keys without health state are not frozen.
func (s *ContentModerationService) isAPIKeyFrozen(key string, now time.Time) bool {
	keyHash := moderationAPIKeyHash(key)
	if s == nil || keyHash == "" {
		return false
	}
	s.keyHealthMu.Lock()
	defer s.keyHealthMu.Unlock()
	if health := s.keyHealth[keyHash]; health != nil {
		return health.FrozenUntil.After(now)
	}
	return false
}
// markAPIKeySuccess records a successful call for the key: the failure count,
// last error and any freeze are cleared, the success count is incremented,
// and the latest latency, HTTP status and check time are stored.
// Blank keys and a nil service are ignored.
func (s *ContentModerationService) markAPIKeySuccess(key string, latencyMS int, httpStatus int) {
	keyHash := moderationAPIKeyHash(key)
	if s == nil || keyHash == "" {
		return
	}
	s.keyHealthMu.Lock()
	defer s.keyHealthMu.Unlock()

	health := s.ensureAPIKeyHealthLocked(keyHash, maskSecretTail(key))
	// Reset everything that marks the key as unhealthy.
	health.FailureCount = 0
	health.LastError = ""
	health.FrozenUntil = time.Time{}
	// Record the successful observation.
	health.SuccessCount++
	health.LastCheckedAt = time.Now()
	health.LastLatencyMS = latencyMS
	health.LastHTTPStatus = httpStatus
	health.LastTested = true
}
2026-05-07 14:31:19 +08:00
func (s *ContentModerationService) markAPIKeyError(key string, errText string, latencyMS int, httpStatus int) {
hash := moderationAPIKeyHash(key)
if hash == "" || s == nil {
return
}
s.keyHealthMu.Lock()
defer s.keyHealthMu.Unlock()
state := s.ensureAPIKeyHealthLocked(hash, maskSecretTail(key))
2026-05-07 14:31:19 +08:00
if contentModerationFreezeDurationForHTTPStatus(httpStatus) > 0 {
state.FailureCount++
}
state.LastError = trimRunes(errText, 180)
state.LastCheckedAt = time.Now()
state.LastLatencyMS = latencyMS
state.LastHTTPStatus = httpStatus
state.LastTested = true
2026-05-07 14:31:19 +08:00
if freezeDuration := contentModerationFreezeDurationForHTTPStatus(httpStatus); freezeDuration > 0 {
state.FrozenUntil = time.Now().Add(freezeDuration)
}
}
// contentModerationFreezeDurationForHTTPStatus maps the HTTP status of a
// failed moderation call to how long the API key should be frozen.
//
// Status 0 (no response recorded) and 400 do not freeze the key. 401/403 use
// the auth freeze duration, 429 and 529 the rate-limit duration, and every
// other status the generic HTTP-error duration.
func contentModerationFreezeDurationForHTTPStatus(httpStatus int) time.Duration {
	if httpStatus == 0 || httpStatus == http.StatusBadRequest {
		return 0
	}
	if httpStatus == http.StatusUnauthorized || httpStatus == http.StatusForbidden {
		return contentModerationKeyAuthFreezeDuration
	}
	if httpStatus == http.StatusTooManyRequests || httpStatus == 529 {
		return contentModerationKeyRateLimitFreezeDuration
	}
	return contentModerationKeyHTTPErrorFreezeDuration
}
// ensureAPIKeyHealthLocked returns the health record for hash, creating the
// map and the record on first use. A non-blank masked value replaces the
// stored mask. As the name indicates, the caller is expected to hold
// keyHealthMu.
func (s *ContentModerationService) ensureAPIKeyHealthLocked(hash string, masked string) *contentModerationKeyHealth {
	if s.keyHealth == nil {
		s.keyHealth = make(map[string]*contentModerationKeyHealth)
	}
	health, found := s.keyHealth[hash]
	if !found || health == nil {
		health = &contentModerationKeyHealth{Hash: hash}
		s.keyHealth[hash] = health
	}
	if strings.TrimSpace(masked) != "" {
		health.Masked = masked
	}
	return health
}
// configView converts cfg into the outward-facing view. API keys are exposed
// only as masked tails, a count and per-key health statuses; the remaining
// fields are copied verbatim (GroupIDs as a copy of the slice).
func (s *ContentModerationService) configView(cfg *ContentModerationConfig) *ContentModerationConfigView {
	keys := cfg.apiKeys()
	masked := make([]string, 0, len(keys))
	for i := range keys {
		masked = append(masked, maskSecretTail(keys[i]))
	}
	// The single-key field mirrors the first configured key.
	var firstMask string
	if len(masked) > 0 {
		firstMask = masked[0]
	}

	view := &ContentModerationConfigView{
		Enabled:              cfg.Enabled,
		Mode:                 cfg.Mode,
		BaseURL:              cfg.BaseURL,
		Model:                cfg.Model,
		APIKeyConfigured:     len(keys) > 0,
		APIKeyMasked:         firstMask,
		APIKeyCount:          len(keys),
		APIKeyMasks:          masked,
		APIKeyStatuses:       s.apiKeyStatuses(keys),
		TimeoutMS:            cfg.TimeoutMS,
		SampleRate:           cfg.SampleRate,
		AllGroups:            cfg.AllGroups,
		GroupIDs:             append([]int64(nil), cfg.GroupIDs...),
		RecordNonHits:        cfg.RecordNonHits,
		WorkerCount:          cfg.WorkerCount,
		QueueSize:            cfg.QueueSize,
		BlockStatus:          cfg.BlockStatus,
		BlockMessage:         cfg.BlockMessage,
		EmailOnHit:           cfg.EmailOnHit,
		AutoBanEnabled:       cfg.AutoBanEnabled,
		BanThreshold:         cfg.BanThreshold,
		ViolationWindowHours: cfg.ViolationWindowHours,
		RetryCount:           cfg.RetryCount,
		HitRetentionDays:     cfg.HitRetentionDays,
		NonHitRetentionDays:  cfg.NonHitRetentionDays,
		PreHashCheckEnabled:  cfg.PreHashCheckEnabled,
	}
	return view
}
// apiKeyStatuses returns one health status per key, in input order, each
// marked as configured.
func (s *ContentModerationService) apiKeyStatuses(keys []string) []ContentModerationAPIKeyStatus {
	statuses := make([]ContentModerationAPIKeyStatus, len(keys))
	for i, key := range keys {
		statuses[i] = s.apiKeyStatusForHash(i, moderationAPIKeyHash(key), maskSecretTail(key), true)
	}
	return statuses
}
// apiKeyStatusForHash builds the status view for one key.
//
// Without a hash, a service or a recorded health state the status stays
// "unknown". Otherwise the recorded counters and last-call details are copied
// and the status is, in order of precedence: "frozen" while the freeze is
// still in the future, "error" when a last error is recorded, "ok" when the
// key has succeeded or been tested, and "unknown" otherwise.
func (s *ContentModerationService) apiKeyStatusForHash(index int, hash string, masked string, configured bool) ContentModerationAPIKeyStatus {
	view := ContentModerationAPIKeyStatus{
		Index:      index,
		KeyHash:    hash,
		Masked:     masked,
		Status:     "unknown",
		Configured: configured,
	}
	if s == nil || hash == "" {
		return view
	}
	observedAt := time.Now()

	s.keyHealthMu.Lock()
	defer s.keyHealthMu.Unlock()
	health := s.keyHealth[hash]
	if health == nil {
		return view
	}

	view.FailureCount = health.FailureCount
	view.SuccessCount = health.SuccessCount
	view.LastError = health.LastError
	view.LastLatencyMS = health.LastLatencyMS
	view.LastHTTPStatus = health.LastHTTPStatus
	view.LastTested = health.LastTested
	if !health.LastCheckedAt.IsZero() {
		// Copy so the view does not point into the shared state.
		checked := health.LastCheckedAt
		view.LastCheckedAt = &checked
	}

	switch {
	case health.FrozenUntil.After(observedAt):
		until := health.FrozenUntil
		view.FrozenUntil = &until
		view.Status = "frozen"
	case health.LastError != "":
		view.Status = "error"
	case health.SuccessCount > 0 || health.LastTested:
		view.Status = "ok"
	}
	return view
}
// moderationAPIKeyHash returns the lowercase hex SHA-256 of the trimmed key,
// or "" when the key is blank.
func moderationAPIKeyHash(key string) string {
	if trimmed := strings.TrimSpace(key); trimmed != "" {
		digest := sha256.Sum256([]byte(trimmed))
		return hex.EncodeToString(digest[:])
	}
	return ""
}
// buildModerationTestInput builds the moderation API input for a manual test.
//
// The prompt is normalized and truncated to maxModerationInputRunes. Blank
// images are skipped; each remaining image must be a valid data URL, and more
// than maxContentModerationTestImages of them is a bad-request error.
//
// It returns, together with the number of accepted images:
//   - the string "hello" when there is neither a prompt nor an image,
//   - the prompt string when there are no images,
//   - otherwise a slice of input parts: the text part (if any) followed by one
//     image_url part per image.
func buildModerationTestInput(prompt string, images []string) (any, int, error) {
	text := trimRunes(normalizeContentModerationText(prompt), maxModerationInputRunes)

	accepted := make([]string, 0, len(images))
	for _, candidate := range images {
		dataURL := strings.TrimSpace(candidate)
		if dataURL == "" {
			continue
		}
		if len(accepted) >= maxContentModerationTestImages {
			return nil, 0, infraerrors.BadRequest("TOO_MANY_MODERATION_TEST_IMAGES", fmt.Sprintf("最多上传 %d 张测试图片", maxContentModerationTestImages))
		}
		if err := validateModerationTestImageDataURL(dataURL); err != nil {
			return nil, 0, err
		}
		accepted = append(accepted, dataURL)
	}

	if len(accepted) == 0 {
		if text == "" {
			// Nothing supplied: send a harmless placeholder.
			return "hello", 0, nil
		}
		return text, 0, nil
	}

	parts := make([]moderationAPIInputPart, 0, len(accepted)+1)
	if text != "" {
		parts = append(parts, moderationAPIInputPart{Type: "text", Text: text})
	}
	for i := range accepted {
		ref := &moderationAPIImageURLRef{URL: accepted[i]}
		parts = append(parts, moderationAPIInputPart{Type: "image_url", ImageURL: ref})
	}
	return parts, len(accepted), nil
}
// contentModerationTestHasAuditInput reports whether a manual test carries
// anything to audit: a prompt that is non-empty after normalization, or at
// least one non-blank image.
func contentModerationTestHasAuditInput(prompt string, images []string) bool {
	hasInput := normalizeContentModerationText(prompt) != ""
	for i := 0; !hasInput && i < len(images); i++ {
		hasInput = strings.TrimSpace(images[i]) != ""
	}
	return hasInput
}
// validateModerationTestImageDataURL checks that value is a base64 image data
// URL within the size limits.
//
// It rejects, with a bad-request error: a value longer than
// maxContentModerationTestImageDataURLBytes, a value not starting with
// "data:image/", a value with no comma or whose header lacks ";base64", a
// payload that is not valid standard base64, and a decoded payload larger
// than maxContentModerationTestImageBytes.
func validateModerationTestImageDataURL(value string) error {
	if len(value) > maxContentModerationTestImageDataURLBytes {
		return infraerrors.BadRequest("MODERATION_TEST_IMAGE_TOO_LARGE", "测试图片不能超过 8MB")
	}
	if !strings.HasPrefix(value, "data:image/") {
		return infraerrors.BadRequest("INVALID_MODERATION_TEST_IMAGE", "测试图片必须是 data:image/* base64")
	}
	// Split at the first comma: header on the left, payload on the right.
	header, payload, hasComma := strings.Cut(value, ",")
	if !hasComma || !strings.Contains(header, ";base64") {
		return infraerrors.BadRequest("INVALID_MODERATION_TEST_IMAGE", "测试图片必须是 base64 data URL")
	}
	decoded, err := base64.StdEncoding.DecodeString(payload)
	switch {
	case err != nil:
		return infraerrors.BadRequest("INVALID_MODERATION_TEST_IMAGE", "测试图片 base64 无效")
	case len(decoded) > maxContentModerationTestImageBytes:
		return infraerrors.BadRequest("MODERATION_TEST_IMAGE_TOO_LARGE", "测试图片不能超过 8MB")
	}
	return nil
}
// buildContentModerationTestAuditResult evaluates a raw moderation API result
// against the given thresholds (merged over the defaults) and packages the
// outcome for the manual test response. A nil result yields nil. The
// composite score is set to the highest category score.
func buildContentModerationTestAuditResult(result *moderationAPIResult, thresholds map[string]float64) *ContentModerationTestAuditResult {
	if result == nil {
		return nil
	}
	// Work on a copy so the caller's result is not shared with the response.
	categoryScores := cloneFloatMap(result.CategoryScores)
	effective := mergeContentModerationThresholds(ContentModerationDefaultThresholds(), thresholds)
	hit, topCategory, topScore := evaluateModerationScores(categoryScores, effective)

	audit := &ContentModerationTestAuditResult{}
	audit.Flagged = hit
	audit.HighestCategory = topCategory
	audit.HighestScore = topScore
	audit.CompositeScore = topScore
	audit.CategoryScores = categoryScores
	audit.Thresholds = effective
	return audit
}
// moderationAPIRequest is the JSON body posted to the moderation endpoint.
type moderationAPIRequest struct {
// Model is the moderation model name.
Model string `json:"model"`
// Input is the content to moderate; this file passes either a plain string
// or a slice of moderationAPIInputPart.
Input any `json:"input"`
}
// moderationAPIInputPart is one element of a multi-part moderation input.
type moderationAPIInputPart struct {
// Type names the part kind; this file uses "text" and "image_url".
Type string `json:"type"`
// Text is the text content, omitted from the JSON when empty.
Text string `json:"text,omitempty"`
// ImageURL references an image, omitted from the JSON when nil.
ImageURL *moderationAPIImageURLRef `json:"image_url,omitempty"`
}
// moderationAPIImageURLRef wraps the URL of an image part.
type moderationAPIImageURLRef struct {
URL string `json:"url"`
}
// moderationAPIResponse is the subset of the moderation response that is decoded.
type moderationAPIResponse struct {
Results []moderationAPIResult `json:"results"`
}
// moderationAPIResult is a single moderation verdict from the response.
type moderationAPIResult struct {
// Flagged is the upstream's own flagged verdict.
Flagged bool `json:"flagged"`
// CategoryScores maps category name to its score.
CategoryScores map[string]float64 `json:"category_scores"`
}
// evaluateModerationScores compares category scores against thresholds.
//
// It returns whether any category in contentModerationCategoryOrder has a
// score at or above its threshold, plus the category with the highest score
// and that score. Only categories in contentModerationCategoryOrder can flag;
// categories that appear only in scores still take part in the
// highest-score search.
//
// Ties are resolved deterministically: categories in
// contentModerationCategoryOrder win in that order, then the remaining
// categories in lexical order. (Ranging over the map directly would make the
// reported category depend on Go's randomized map iteration.)
//
// NOTE(review): a category missing from thresholds is compared against 0 and
// therefore always flags — callers are expected to pass thresholds merged
// over the defaults.
func evaluateModerationScores(scores map[string]float64, thresholds map[string]float64) (bool, string, float64) {
	flagged := false
	highestCategory := ""
	highestScore := 0.0
	for _, category := range contentModerationCategoryOrder {
		score := scores[category]
		if score > highestScore || highestCategory == "" {
			highestScore = score
			highestCategory = category
		}
		if score >= thresholds[category] {
			flagged = true
		}
	}

	// Visit the reported categories in a stable order so equal scores always
	// resolve to the same category.
	reported := make([]string, 0, len(scores))
	for category := range scores {
		reported = append(reported, category)
	}
	sort.Strings(reported)
	for _, category := range reported {
		score := scores[category]
		if score > highestScore || highestCategory == "" {
			highestScore = score
			highestCategory = category
		}
	}
	return flagged, highestCategory, highestScore
}
// mergeContentModerationThresholds returns a copy of base with the values from
// override applied on top. Only categories listed in
// contentModerationCategoryOrder are taken from override, and each taken value
// is clamped to the range [0, 1]. Neither input map is modified.
func mergeContentModerationThresholds(base map[string]float64, override map[string]float64) map[string]float64 {
	merged := cloneFloatMap(base)
	if merged == nil {
		merged = map[string]float64{}
	}
	for _, category := range contentModerationCategoryOrder {
		value, present := override[category]
		if !present {
			continue
		}
		switch {
		case value < 0:
			value = 0
		case value > 1:
			value = 1
		}
		merged[category] = value
	}
	return merged
}
// normalizeInt64IDs returns the positive IDs from ids, de-duplicated and
// sorted ascending. The result is always a non-nil slice, and the input is
// not modified.
func normalizeInt64IDs(ids []int64) []int64 {
	positive := make([]int64, 0, len(ids))
	for i := range ids {
		if ids[i] > 0 {
			positive = append(positive, ids[i])
		}
	}
	sort.Slice(positive, func(a, b int) bool { return positive[a] < positive[b] })

	// After sorting, duplicates are adjacent; keep the first of each run.
	unique := positive[:0]
	for _, id := range positive {
		if n := len(unique); n == 0 || unique[n-1] != id {
			unique = append(unique, id)
		}
	}
	return unique
}
// normalizeModerationAPIKeys trims every key, drops blank ones and removes
// duplicates while keeping first-seen order. The result is always a non-nil
// slice.
func normalizeModerationAPIKeys(keys []string) []string {
	unique := make([]string, 0, len(keys))
	known := make(map[string]bool, len(keys))
	for i := range keys {
		trimmed := strings.TrimSpace(keys[i])
		if trimmed == "" || known[trimmed] {
			continue
		}
		known[trimmed] = true
		unique = append(unique, trimmed)
	}
	return unique
}
2026-05-07 14:31:19 +08:00
// deleteModerationAPIKeysByHash returns the normalized keys with every key
// removed whose hash appears in hashes. Hashes that do not normalize to a
// valid hash are ignored; when none remain, the normalized keys are returned
// unchanged.
func deleteModerationAPIKeysByHash(keys []string, hashes []string) []string {
	normalized := normalizeModerationAPIKeys(keys)

	doomed := make(map[string]struct{}, len(hashes))
	for i := range hashes {
		if valid := normalizeContentModerationHash(hashes[i]); valid != "" {
			doomed[valid] = struct{}{}
		}
	}
	if len(doomed) == 0 {
		return normalized
	}

	kept := make([]string, 0, len(normalized))
	for _, key := range normalized {
		if _, remove := doomed[moderationAPIKeyHash(key)]; !remove {
			kept = append(kept, key)
		}
	}
	return kept
}
// normalizeContentModerationAPIKeysMode maps mode to one of the two supported
// API-key update modes: "replace" (matched case-insensitively after trimming)
// or, for anything else, "append".
func normalizeContentModerationAPIKeysMode(mode string) string {
	if strings.ToLower(strings.TrimSpace(mode)) == contentModerationAPIKeysModeReplace {
		return contentModerationAPIKeysModeReplace
	}
	return contentModerationAPIKeysModeAppend
}
// normalizeContentModerationHash trims and lowercases inputHash and returns it
// only if it is a hex string of SHA-256 length (64 characters); otherwise it
// returns "".
func normalizeContentModerationHash(inputHash string) string {
	candidate := strings.ToLower(strings.TrimSpace(inputHash))
	const wantLen = 2 * sha256.Size
	if len(candidate) == wantLen {
		if _, err := hex.DecodeString(candidate); err == nil {
			return candidate
		}
	}
	return ""
}
// cloneFloatMap returns an independent copy of in. A nil input yields an
// empty, non-nil map.
func cloneFloatMap(in map[string]float64) map[string]float64 {
	// len(nil) is 0 and ranging over nil is a no-op, so nil needs no special case.
	dup := make(map[string]float64, len(in))
	for key := range in {
		dup[key] = in[key]
	}
	return dup
}
// cloneInt64Ptr returns a pointer to a fresh copy of *in, or nil when in is nil.
func cloneInt64Ptr(in *int64) *int64 {
	if in != nil {
		dup := new(int64)
		*dup = *in
		return dup
	}
	return nil
}
// trimRunes returns text limited to at most max runes. A non-positive max
// yields "". Text that already fits is returned unchanged.
func trimRunes(text string, max int) string {
	if max <= 0 {
		return ""
	}
	if all := []rune(text); len(all) > max {
		return string(all[:max])
	}
	return text
}
// maskSecretTail returns a display-safe form of secret: eight asterisks
// followed by its last four characters.
//
// A blank secret yields "". A secret of four characters or fewer yields
// "****" so that it is never revealed in full. Length and tail are measured
// in runes rather than bytes, so a multi-byte UTF-8 character is never cut in
// half (which would produce invalid UTF-8 in the output).
func maskSecretTail(secret string) string {
	secret = strings.TrimSpace(secret)
	if secret == "" {
		return ""
	}
	runes := []rune(secret)
	if len(runes) <= 4 {
		return "****"
	}
	return strings.Repeat("*", 8) + string(runes[len(runes)-4:])
}