package routing

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"sub2api-cn-relay-manager/internal/store/sqlite"
)

// Defaults applied by NewAsyncLogWriter when the corresponding option is zero
// or negative.
const (
	defaultQueueSize            = 128
	defaultFlushInterval        = 2 * time.Second
	defaultMaxBatchSize         = 32
	defaultFallbackWriteTimeout = 3 * time.Second
)

// RouteDecisionEvent is the payload recorded for a single routing decision.
type RouteDecisionEvent struct {
	RequestID             string
	LogicalGroupID        string
	PublicModel           string
	UserKey               string
	ConversationKey       string
	StickyKey             string
	StickyKeyType         string
	StickyHit             bool
	SelectedRouteID       string
	SelectedShadowGroupID string
	FallbackUsed          bool
	ErrorClass            string
	UpstreamStatus        int
	LatencyMS             int
}

// RouteFailoverEvent is the payload recorded when a request moves from one
// route to another.
type RouteFailoverEvent struct {
	RequestID      string
	LogicalGroupID string
	PublicModel    string
	FromRouteID    string
	ToRouteID      string
	Reason         string
	FailureCount   int
}

// RouteStickyAuditEvent is the payload recorded for a sticky-key audit entry.
type RouteStickyAuditEvent struct {
	StickyKey      string
	StickyKeyType  string
	LogicalGroupID string
	PublicModel    string
	RouteID        string
	Action         string
	ExpiresAt      string
}

// RouteDecisionLogger is the interface callers use to record route log events
// and to flush or shut down the logger.
type RouteDecisionLogger interface {
	AppendDecision(context.Context, RouteDecisionEvent) error
	AppendFailover(context.Context, RouteFailoverEvent) error
	AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
	Flush(context.Context) error
	Close() error
}

// routeLogSink is the synchronous destination that AsyncLogWriter writes
// events to, one event per call.
type routeLogSink interface {
	AppendDecision(context.Context, RouteDecisionEvent) error
	AppendFailover(context.Context, RouteFailoverEvent) error
	AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
	Close() error
}

// Compile-time checks that the concrete types satisfy their interfaces.
var (
	_ RouteDecisionLogger = (*AsyncLogWriter)(nil)
	_ routeLogSink        = (*sqliteRouteLogSink)(nil)
)

// ErrorMetrics holds the writer's error counters. The Record* and Get*
// methods guard the counters with an internal mutex.
type ErrorMetrics struct {
	// FlushErrors is incremented once for every event that fails to be
	// written during a batch flush.
	FlushErrors int64
	// WriteErrors is incremented once for every batch flush in which at
	// least one event failed.
	WriteErrors int64
	// DroppedEvents is incremented each time an event finds the queue full
	// and is routed to the direct fallback write instead, regardless of
	// whether that fallback write succeeds.
	DroppedEvents int64

	mu sync.RWMutex
}

// RecordFlushError increments FlushErrors.
func (e *ErrorMetrics) RecordFlushError() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.FlushErrors++
}

// RecordWriteError increments WriteErrors.
func (e *ErrorMetrics) RecordWriteError() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.WriteErrors++
}

// RecordDroppedEvent increments DroppedEvents.
func (e *ErrorMetrics) RecordDroppedEvent() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.DroppedEvents++
}

// GetFlushErrors returns the current FlushErrors count.
func (e *ErrorMetrics) GetFlushErrors() int64 {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.FlushErrors
}

// GetWriteErrors returns the current WriteErrors count.
func (e *ErrorMetrics) GetWriteErrors() int64 {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.WriteErrors
}

// GetDroppedEvents returns the current DroppedEvents count.
func (e *ErrorMetrics) GetDroppedEvents() int64 {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.DroppedEvents
}

// ErrorHandler is invoked for every event that fails to be written. eventType
// is one of "decision", "failover", "sticky_audit" or "unknown".
type ErrorHandler func(ctx context.Context, err error, eventType string)

// AsyncLogWriterOptions configures NewAsyncLogWriter. Zero or negative numeric
// values are replaced by the package defaults; a nil OnError disables the
// error callback.
type AsyncLogWriterOptions struct {
	QueueSize            int
	FlushInterval        time.Duration
	MaxBatchSize         int
	FallbackWriteTimeout time.Duration
	OnError              ErrorHandler
}

// queuedLogEvent carries exactly one event through the queue; the non-nil
// field identifies the event type.
type queuedLogEvent struct {
	decision *RouteDecisionEvent
	failover *RouteFailoverEvent
	sticky   *RouteStickyAuditEvent
}

// AsyncLogWriter buffers route log events in a bounded queue and writes them
// to a routeLogSink from a single background goroutine. When the queue is
// full the event is written directly to the sink from the caller's goroutine.
type AsyncLogWriter struct {
	sink          routeLogSink
	queue         chan queuedLogEvent
	flushRequests chan chan error
	closeCh       chan struct{} // closed by Close to ask the loop to stop
	closedCh      chan struct{} // closed by the loop once it has exited

	flushInterval        time.Duration
	maxBatchSize         int
	fallbackWriteTimeout time.Duration

	once    sync.Once
	onError ErrorHandler
	metrics ErrorMetrics

	// stateMu guards closed. enqueue holds the read lock while it checks
	// closed and attempts the queue send; Close takes the write lock to set
	// closed before signalling the loop. This guarantees that every event
	// accepted into the queue is already there when the loop performs its
	// final drain.
	stateMu sync.RWMutex
	closed  bool
}

// NewSQLiteLogWriter opens the SQLite store at sqliteDSN and returns an
// AsyncLogWriter that writes to it. The error from sqlite.Open is returned
// unchanged.
func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) {
	store, err := sqlite.Open(ctx, sqliteDSN)
	if err != nil {
		return nil, err
	}
	return NewAsyncLogWriter(newSQLiteRouteLogSink(store), opts), nil
}

// NewAsyncLogWriter builds a writer around sink, fills in defaults for unset
// options and starts the background loop. Call Close to stop the loop and
// close the sink.
func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogWriter {
	if opts.QueueSize <= 0 {
		opts.QueueSize = defaultQueueSize
	}
	if opts.FlushInterval <= 0 {
		opts.FlushInterval = defaultFlushInterval
	}
	if opts.MaxBatchSize <= 0 {
		opts.MaxBatchSize = defaultMaxBatchSize
	}
	if opts.FallbackWriteTimeout <= 0 {
		opts.FallbackWriteTimeout = defaultFallbackWriteTimeout
	}

	writer := &AsyncLogWriter{
		sink:                 sink,
		queue:                make(chan queuedLogEvent, opts.QueueSize),
		flushRequests:        make(chan chan error),
		closeCh:              make(chan struct{}),
		closedCh:             make(chan struct{}),
		flushInterval:        opts.FlushInterval,
		maxBatchSize:         opts.MaxBatchSize,
		fallbackWriteTimeout: opts.FallbackWriteTimeout,
		onError:              opts.OnError,
	}
	go writer.loop()
	return writer
}

// AppendDecision queues a routing decision event. See enqueue for the
// queue-full and closed-writer behavior.
func (w *AsyncLogWriter) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
	return w.enqueue(ctx, queuedLogEvent{decision: &event})
}

// AppendFailover queues a failover event. See enqueue for the queue-full and
// closed-writer behavior.
func (w *AsyncLogWriter) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
	return w.enqueue(ctx, queuedLogEvent{failover: &event})
}

// AppendStickyAudit queues a sticky audit event. See enqueue for the
// queue-full and closed-writer behavior.
func (w *AsyncLogWriter) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
	return w.enqueue(ctx, queuedLogEvent{sticky: &event})
}

// Flush asks the background loop to drain the queue and write everything
// pending, then returns the first write error of that flush (nil if none).
// It returns ctx.Err() if ctx is done before the request is accepted or
// answered, and nil if the loop has already exited.
func (w *AsyncLogWriter) Flush(ctx context.Context) error {
	// Buffered so the loop never blocks on the reply if this call has
	// already returned because ctx was cancelled.
	resp := make(chan error, 1)

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-w.closedCh:
		return nil
	case w.flushRequests <- resp:
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-resp:
		return err
	}
}

// Close stops accepting new events, waits for the background loop to write
// everything still queued, and closes the sink. Only the first call does any
// work and returns the sink's Close error; later calls return nil.
func (w *AsyncLogWriter) Close() error {
	var err error
	w.once.Do(func() {
		// Mark the writer closed before signalling the loop so that no
		// enqueue can slip an event into the queue after the final drain.
		w.stateMu.Lock()
		w.closed = true
		w.stateMu.Unlock()

		close(w.closeCh)
		<-w.closedCh
		err = w.sink.Close()
	})
	return err
}

// loop is the single background goroutine that batches queued events and
// writes them to the sink. It flushes when the batch reaches maxBatchSize, on
// every ticker interval, on an explicit Flush request, and once more when
// Close is called, after which it exits and closes closedCh.
func (w *AsyncLogWriter) loop() {
	ticker := time.NewTicker(w.flushInterval)
	defer ticker.Stop()
	defer close(w.closedCh)

	batch := make([]queuedLogEvent, 0, w.maxBatchSize)
	for {
		select {
		case item := <-w.queue:
			batch = append(batch, item)
			if len(batch) >= w.maxBatchSize {
				// Failures are logged and counted inside flushBatch.
				w.flushBatch(batch)
				batch = batch[:0]
			}
		case resp := <-w.flushRequests:
			batch = w.drainQueue(batch)
			err := w.flushBatch(batch)
			batch = batch[:0]
			resp <- err
		case <-ticker.C:
			batch = w.drainQueue(batch)
			if len(batch) == 0 {
				continue
			}
			w.flushBatch(batch)
			batch = batch[:0]
		case <-w.closeCh:
			batch = w.drainQueue(batch)
			w.flushBatch(batch)
			return
		}
	}
}

// enqueue hands item to the background loop without blocking.
//
// It returns an error if the writer has been closed, or ctx.Err() if ctx is
// already done and that case is selected. If the queue is full, the event is
// counted in DroppedEvents and written directly to the sink with a timeout of
// fallbackWriteTimeout; a failure of that write is reported to onError (when
// set) and returned.
func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error {
	queued, err := w.tryQueue(ctx, item)
	if queued || err != nil {
		return err
	}

	// Queue is full, use fallback direct write. The state lock is not held
	// here so a slow sink cannot delay Close; if Close wins the race the
	// sink's own error is returned to the caller.
	w.metrics.RecordDroppedEvent()
	fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout)
	defer cancel()

	err = w.writeOne(fallbackCtx, item)
	if err != nil && w.onError != nil {
		w.onError(fallbackCtx, err, w.getEventType(item))
	}
	return err
}

// tryQueue attempts a non-blocking send of item into the queue. It reports
// whether the item was queued; a false result with a nil error means the
// queue was full.
func (w *AsyncLogWriter) tryQueue(ctx context.Context, item queuedLogEvent) (bool, error) {
	w.stateMu.RLock()
	defer w.stateMu.RUnlock()

	// Checked under the lock rather than via a select case: a select picks
	// at random among ready cases, so a closed writer with free queue space
	// could otherwise accept an event that is never written.
	if w.closed {
		return false, fmt.Errorf("route log writer is closed")
	}

	select {
	case <-ctx.Done():
		return false, ctx.Err()
	case w.queue <- item:
		return true, nil
	default:
		return false, nil
	}
}

// flushBatch writes every event in batch to the sink and returns the first
// error encountered, or nil. A failure does not stop the remaining events
// from being written. Each failed event is logged, counted in FlushErrors and
// reported to onError (when set); a batch with any failure is counted once in
// WriteErrors.
func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error {
	if len(batch) == 0 {
		return nil
	}

	var firstErr error
	for _, item := range batch {
		err := w.writeOne(context.Background(), item)
		if err == nil {
			continue
		}
		if firstErr == nil {
			firstErr = err
		}
		w.metrics.RecordFlushError()
		log.Printf("routing: flush route log event failed: %v", err)
		// Call error handler if configured
		if w.onError != nil {
			w.onError(context.Background(), err, w.getEventType(item))
		}
	}

	if firstErr != nil {
		w.metrics.RecordWriteError()
	}
	return firstErr
}

// getEventType returns the label passed to the error handler for item, based
// on which payload field is set.
func (w *AsyncLogWriter) getEventType(item queuedLogEvent) string {
	switch {
	case item.decision != nil:
		return "decision"
	case item.failover != nil:
		return "failover"
	case item.sticky != nil:
		return "sticky_audit"
	default:
		return "unknown"
	}
}

// Metrics returns the error metrics for monitoring. The result is a snapshot
// with its own (zero) mutex; it does not track later updates.
func (w *AsyncLogWriter) Metrics() ErrorMetrics {
	return ErrorMetrics{
		FlushErrors:   w.metrics.GetFlushErrors(),
		WriteErrors:   w.metrics.GetWriteErrors(),
		DroppedEvents: w.metrics.GetDroppedEvents(),
	}
}

// drainQueue appends every event currently available in the queue to batch
// without blocking and returns the extended slice.
func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent {
	for {
		select {
		case item := <-w.queue:
			batch = append(batch, item)
		default:
			return batch
		}
	}
}

// writeOne dispatches item to the sink method matching its payload. It
// returns an error if no payload field is set.
func (w *AsyncLogWriter) writeOne(ctx context.Context, item queuedLogEvent) error {
	switch {
	case item.decision != nil:
		return w.sink.AppendDecision(ctx, *item.decision)
	case item.failover != nil:
		return w.sink.AppendFailover(ctx, *item.failover)
	case item.sticky != nil:
		return w.sink.AppendStickyAudit(ctx, *item.sticky)
	default:
		return fmt.Errorf("route log event payload is empty")
	}
}

// sqliteRouteLogSink is the routeLogSink backed by a sqlite.DB.
type sqliteRouteLogSink struct {
	store *sqlite.DB
}

// newSQLiteRouteLogSink wraps store in a sqliteRouteLogSink.
func newSQLiteRouteLogSink(store *sqlite.DB) *sqliteRouteLogSink {
	return &sqliteRouteLogSink{store: store}
}

// AppendDecision copies event field-for-field into a sqlite.RouteDecisionLog
// and creates it in the store. The created record is discarded; only the
// error is returned.
func (s *sqliteRouteLogSink) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
	_, err := s.store.RouteDecisionLogs().Create(ctx, sqlite.RouteDecisionLog{
		RequestID:             event.RequestID,
		LogicalGroupID:        event.LogicalGroupID,
		PublicModel:           event.PublicModel,
		UserKey:               event.UserKey,
		ConversationKey:       event.ConversationKey,
		StickyKey:             event.StickyKey,
		StickyKeyType:         event.StickyKeyType,
		StickyHit:             event.StickyHit,
		SelectedRouteID:       event.SelectedRouteID,
		SelectedShadowGroupID: event.SelectedShadowGroupID,
		FallbackUsed:          event.FallbackUsed,
		ErrorClass:            event.ErrorClass,
		UpstreamStatus:        event.UpstreamStatus,
		LatencyMS:             event.LatencyMS,
	})
	return err
}

// AppendFailover copies event field-for-field into a sqlite.RouteFailoverEvent
// and creates it in the store. The created record is discarded; only the
// error is returned.
func (s *sqliteRouteLogSink) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
	_, err := s.store.RouteFailoverEvents().Create(ctx, sqlite.RouteFailoverEvent{
		RequestID:      event.RequestID,
		LogicalGroupID: event.LogicalGroupID,
		PublicModel:    event.PublicModel,
		FromRouteID:    event.FromRouteID,
		ToRouteID:      event.ToRouteID,
		Reason:         event.Reason,
		FailureCount:   event.FailureCount,
	})
	return err
}

// AppendStickyAudit copies event field-for-field into a
// sqlite.RouteStickyAudit and creates it in the store. The created record is
// discarded; only the error is returned.
func (s *sqliteRouteLogSink) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
	_, err := s.store.RouteStickyAudit().Create(ctx, sqlite.RouteStickyAudit{
		StickyKey:      event.StickyKey,
		StickyKeyType:  event.StickyKeyType,
		LogicalGroupID: event.LogicalGroupID,
		PublicModel:    event.PublicModel,
		RouteID:        event.RouteID,
		Action:         event.Action,
		ExpiresAt:      event.ExpiresAt,
	})
	return err
}

// Close closes the underlying store.
func (s *sqliteRouteLogSink) Close() error {
	return s.store.Close()
}