package routing import ( "context" "fmt" "log" "sync" "time" "sub2api-cn-relay-manager/internal/store/sqlite" ) const ( defaultQueueSize = 128 defaultFlushInterval = 2 * time.Second defaultMaxBatchSize = 32 defaultFallbackWriteTimeout = 3 * time.Second ) type RouteDecisionEvent struct { RequestID string LogicalGroupID string PublicModel string UserKey string ConversationKey string StickyKey string StickyKeyType string StickyHit bool SelectedRouteID string SelectedShadowGroupID string FallbackUsed bool ErrorClass string UpstreamStatus int LatencyMS int } type RouteFailoverEvent struct { RequestID string LogicalGroupID string PublicModel string FromRouteID string ToRouteID string Reason string FailureCount int } type RouteStickyAuditEvent struct { StickyKey string StickyKeyType string LogicalGroupID string PublicModel string RouteID string Action string ExpiresAt string } type RouteDecisionLogger interface { AppendDecision(context.Context, RouteDecisionEvent) error AppendFailover(context.Context, RouteFailoverEvent) error AppendStickyAudit(context.Context, RouteStickyAuditEvent) error Flush(context.Context) error Close() error } type routeLogSink interface { AppendDecision(context.Context, RouteDecisionEvent) error AppendFailover(context.Context, RouteFailoverEvent) error AppendStickyAudit(context.Context, RouteStickyAuditEvent) error Close() error } type AsyncLogWriterOptions struct { QueueSize int FlushInterval time.Duration MaxBatchSize int FallbackWriteTimeout time.Duration } type queuedLogEvent struct { decision *RouteDecisionEvent failover *RouteFailoverEvent sticky *RouteStickyAuditEvent } type AsyncLogWriter struct { sink routeLogSink queue chan queuedLogEvent flushRequests chan chan error closeCh chan struct{} closedCh chan struct{} flushInterval time.Duration maxBatchSize int fallbackWriteTimeout time.Duration once sync.Once } func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) { store, err := sqlite.Open(ctx, sqliteDSN) if err != nil { return nil, err } return NewAsyncLogWriter(newSQLiteRouteLogSink(store), opts), nil } func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogWriter { if opts.QueueSize <= 0 { opts.QueueSize = defaultQueueSize } if opts.FlushInterval <= 0 { opts.FlushInterval = defaultFlushInterval } if opts.MaxBatchSize <= 0 { opts.MaxBatchSize = defaultMaxBatchSize } if opts.FallbackWriteTimeout <= 0 { opts.FallbackWriteTimeout = defaultFallbackWriteTimeout } writer := &AsyncLogWriter{ sink: sink, queue: make(chan queuedLogEvent, opts.QueueSize), flushRequests: make(chan chan error), closeCh: make(chan struct{}), closedCh: make(chan struct{}), flushInterval: opts.FlushInterval, maxBatchSize: opts.MaxBatchSize, fallbackWriteTimeout: opts.FallbackWriteTimeout, } go writer.loop() return writer } func (w *AsyncLogWriter) AppendDecision(ctx context.Context, event RouteDecisionEvent) error { return w.enqueue(ctx, queuedLogEvent{decision: &event}) } func (w *AsyncLogWriter) AppendFailover(ctx context.Context, event RouteFailoverEvent) error { return w.enqueue(ctx, queuedLogEvent{failover: &event}) } func (w *AsyncLogWriter) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error { return w.enqueue(ctx, queuedLogEvent{sticky: &event}) } func (w *AsyncLogWriter) Flush(ctx context.Context) error { resp := make(chan error, 1) select { case <-ctx.Done(): return ctx.Err() case <-w.closedCh: return nil case w.flushRequests <- resp: } select { case <-ctx.Done(): return ctx.Err() case err := <-resp: return err } } func (w *AsyncLogWriter) Close() error { var err error w.once.Do(func() { close(w.closeCh) <-w.closedCh err = w.sink.Close() }) return err } func (w *AsyncLogWriter) loop() { ticker := time.NewTicker(w.flushInterval) defer ticker.Stop() defer close(w.closedCh) batch := make([]queuedLogEvent, 0, w.maxBatchSize) for { select { case item := <-w.queue: batch = append(batch, item) if len(batch) >= w.maxBatchSize { w.flushBatch(batch) batch = batch[:0] } case resp := <-w.flushRequests: batch = w.drainQueue(batch) err := w.flushBatch(batch) batch = batch[:0] resp <- err case <-ticker.C: batch = w.drainQueue(batch) if len(batch) == 0 { continue } w.flushBatch(batch) batch = batch[:0] case <-w.closeCh: batch = w.drainQueue(batch) w.flushBatch(batch) return } } } func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error { select { case <-ctx.Done(): return ctx.Err() case <-w.closedCh: return fmt.Errorf("route log writer is closed") case w.queue <- item: return nil default: fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout) defer cancel() return w.writeOne(fallbackCtx, item) } } func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error { if len(batch) == 0 { return nil } var firstErr error for _, item := range batch { if err := w.writeOne(context.Background(), item); err != nil { if firstErr == nil { firstErr = err } log.Printf("routing: flush route log event failed: %v", err) } } return firstErr } func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent { for { select { case item := <-w.queue: batch = append(batch, item) default: return batch } } } func (w *AsyncLogWriter) writeOne(ctx context.Context, item queuedLogEvent) error { switch { case item.decision != nil: return w.sink.AppendDecision(ctx, *item.decision) case item.failover != nil: return w.sink.AppendFailover(ctx, *item.failover) case item.sticky != nil: return w.sink.AppendStickyAudit(ctx, *item.sticky) default: return fmt.Errorf("route log event payload is empty") } } type sqliteRouteLogSink struct { store *sqlite.DB } func newSQLiteRouteLogSink(store *sqlite.DB) *sqliteRouteLogSink { return &sqliteRouteLogSink{store: store} } func (s *sqliteRouteLogSink) AppendDecision(ctx context.Context, event RouteDecisionEvent) error { _, err := s.store.RouteDecisionLogs().Create(ctx, sqlite.RouteDecisionLog{ RequestID: event.RequestID, LogicalGroupID: event.LogicalGroupID, PublicModel: event.PublicModel, UserKey: event.UserKey, ConversationKey: event.ConversationKey, StickyKey: event.StickyKey, StickyKeyType: event.StickyKeyType, StickyHit: event.StickyHit, SelectedRouteID: event.SelectedRouteID, SelectedShadowGroupID: event.SelectedShadowGroupID, FallbackUsed: event.FallbackUsed, ErrorClass: event.ErrorClass, UpstreamStatus: event.UpstreamStatus, LatencyMS: event.LatencyMS, }) return err } func (s *sqliteRouteLogSink) AppendFailover(ctx context.Context, event RouteFailoverEvent) error { _, err := s.store.RouteFailoverEvents().Create(ctx, sqlite.RouteFailoverEvent{ RequestID: event.RequestID, LogicalGroupID: event.LogicalGroupID, PublicModel: event.PublicModel, FromRouteID: event.FromRouteID, ToRouteID: event.ToRouteID, Reason: event.Reason, FailureCount: event.FailureCount, }) return err } func (s *sqliteRouteLogSink) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error { _, err := s.store.RouteStickyAudit().Create(ctx, sqlite.RouteStickyAudit{ StickyKey: event.StickyKey, StickyKeyType: event.StickyKeyType, LogicalGroupID: event.LogicalGroupID, PublicModel: event.PublicModel, RouteID: event.RouteID, Action: event.Action, ExpiresAt: event.ExpiresAt, }) return err } func (s *sqliteRouteLogSink) Close() error { return s.store.Close() }