diff --git a/internal/routing/logwriter.go b/internal/routing/logwriter.go
index a3dd3325..37b7bbd1 100644
--- a/internal/routing/logwriter.go
+++ b/internal/routing/logwriter.go
@@ -69,11 +69,72 @@ type routeLogSink interface {
 	Close() error
 }
 
+// ErrorMetrics holds the failure counters of an AsyncLogWriter. Access the
+// counters through the Record*/Get* methods, which guard them with mu.
+//
+//   - FlushErrors: events whose write failed during a batch flush.
+//   - WriteErrors: batch flushes that returned an error.
+//   - DroppedEvents: events lost because the queue was full and the fallback
+//     direct write failed as well.
+type ErrorMetrics struct {
+	FlushErrors   int64
+	WriteErrors   int64
+	DroppedEvents int64
+	mu            sync.RWMutex
+}
+
+// RecordFlushError increments FlushErrors.
+func (e *ErrorMetrics) RecordFlushError() {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.FlushErrors++
+}
+
+// RecordWriteError increments WriteErrors.
+func (e *ErrorMetrics) RecordWriteError() {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.WriteErrors++
+}
+
+// RecordDroppedEvent increments DroppedEvents.
+func (e *ErrorMetrics) RecordDroppedEvent() {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.DroppedEvents++
+}
+
+// GetFlushErrors returns the current FlushErrors count.
+func (e *ErrorMetrics) GetFlushErrors() int64 {
+	e.mu.RLock()
+	defer e.mu.RUnlock()
+	return e.FlushErrors
+}
+
+// GetWriteErrors returns the current WriteErrors count.
+func (e *ErrorMetrics) GetWriteErrors() int64 {
+	e.mu.RLock()
+	defer e.mu.RUnlock()
+	return e.WriteErrors
+}
+
+// GetDroppedEvents returns the current DroppedEvents count.
+func (e *ErrorMetrics) GetDroppedEvents() int64 {
+	e.mu.RLock()
+	defer e.mu.RUnlock()
+	return e.DroppedEvents
+}
+
+// ErrorHandler is invoked when writing an event fails. eventType is one of
+// "decision", "failover", "sticky_audit" or "unknown".
+type ErrorHandler func(ctx context.Context, err error, eventType string)
+
 type AsyncLogWriterOptions struct {
 	QueueSize            int
 	FlushInterval        time.Duration
 	MaxBatchSize         int
 	FallbackWriteTimeout time.Duration
+	OnError              ErrorHandler // optional; called for each event write that fails
 }
 
 type queuedLogEvent struct {
@@ -92,6 +153,8 @@ type AsyncLogWriter struct {
 	maxBatchSize         int
 	fallbackWriteTimeout time.Duration
 	once                 sync.Once
+	onError              ErrorHandler
+	metrics              ErrorMetrics
 }
 
 func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) {
@@ -125,6 +188,7 @@ func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogW
 		flushInterval:        opts.FlushInterval,
 		maxBatchSize:         opts.MaxBatchSize,
 		fallbackWriteTimeout: opts.FallbackWriteTimeout,
+		onError:              opts.OnError,
 	}
 	go writer.loop()
 	return writer
@@ -213,9 +277,20 @@ func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error
 	case w.queue <- item:
 		return nil
 	default:
+		// Queue is full: write the event synchronously instead of queueing it.
 		fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout)
 		defer cancel()
-		return w.writeOne(fallbackCtx, item)
+		err := w.writeOne(fallbackCtx, item)
+		if err != nil {
+			// Only a failed fallback write actually loses the event.
+			w.metrics.RecordDroppedEvent()
+			if w.onError != nil {
+				// Hand the caller's ctx to the handler: fallbackCtx may already
+				// be past its deadline and is cancelled when enqueue returns.
+				w.onError(ctx, err, w.getEventType(item))
+			}
+		}
+		return err
 	}
 }
 
@@ -230,12 +305,49 @@ func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error {
 			if firstErr == nil {
 				firstErr = err
 			}
+			w.metrics.RecordFlushError()
 			log.Printf("routing: flush route log event failed: %v", err)
+
+			// Call error handler if configured
+			if w.onError != nil {
+				w.onError(context.Background(), err, w.getEventType(item))
+			}
 		}
 	}
+	if firstErr != nil {
+		w.metrics.RecordWriteError()
+	}
 	return firstErr
 }
 
+// getEventType returns a label for the payload carried by item: "decision",
+// "failover" or "sticky_audit" for the first non-nil field in that order, and
+// "unknown" when none of them is set.
+func (w *AsyncLogWriter) getEventType(item queuedLogEvent) string {
+	switch {
+	case item.decision != nil:
+		return "decision"
+	case item.failover != nil:
+		return "failover"
+	case item.sticky != nil:
+		return "sticky_audit"
+	default:
+		return "unknown"
+	}
+}
+
+// Metrics returns a copy of the error counters for monitoring. Each counter is
+// read under its own lock acquisition, so the three values are not guaranteed
+// to be mutually consistent.
+func (w *AsyncLogWriter) Metrics() ErrorMetrics {
+	return ErrorMetrics{
+		FlushErrors:   w.metrics.GetFlushErrors(),
+		WriteErrors:   w.metrics.GetWriteErrors(),
+		DroppedEvents: w.metrics.GetDroppedEvents(),
+	}
+}
+
 func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent {
 	for {
 		select {