From 89844518457f82b07ca5940ce71cc4240a0d01e1 Mon Sep 17 00:00:00 2001 From: phamnazage-jpg Date: Tue, 2 Jun 2026 06:51:14 +0800 Subject: [PATCH] =?UTF-8?q?feat(log):=20H-03=20=E6=97=A5=E5=BF=97=20flush?= =?UTF-8?q?=20=E9=94=99=E8=AF=AF=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加 ErrorMetrics 结构体记录 flush/write/drop 错误数 - 添加 ErrorHandler 回调接口用于自定义错误处理 - 在 AsyncLogWriterOptions 中支持配置错误处理器 - 在 flushBatch 中记录 flush 错误指标并回调错误处理器 - 在 enqueue fallback 路径中记录丢弃事件数 - 添加 Metrics() 方法暴露错误统计 --- internal/routing/logwriter.go | 88 ++++++++++++++++++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/internal/routing/logwriter.go b/internal/routing/logwriter.go index a3dd3325..37b7bbd1 100644 --- a/internal/routing/logwriter.go +++ b/internal/routing/logwriter.go @@ -69,11 +69,57 @@ type routeLogSink interface { Close() error } +type ErrorMetrics struct { + FlushErrors int64 + WriteErrors int64 + DroppedEvents int64 + mu sync.RWMutex +} + +func (e *ErrorMetrics) RecordFlushError() { + e.mu.Lock() + defer e.mu.Unlock() + e.FlushErrors++ +} + +func (e *ErrorMetrics) RecordWriteError() { + e.mu.Lock() + defer e.mu.Unlock() + e.WriteErrors++ +} + +func (e *ErrorMetrics) RecordDroppedEvent() { + e.mu.Lock() + defer e.mu.Unlock() + e.DroppedEvents++ +} + +func (e *ErrorMetrics) GetFlushErrors() int64 { + e.mu.RLock() + defer e.mu.RUnlock() + return e.FlushErrors +} + +func (e *ErrorMetrics) GetWriteErrors() int64 { + e.mu.RLock() + defer e.mu.RUnlock() + return e.WriteErrors +} + +func (e *ErrorMetrics) GetDroppedEvents() int64 { + e.mu.RLock() + defer e.mu.RUnlock() + return e.DroppedEvents +} + +type ErrorHandler func(ctx context.Context, err error, eventType string) + type AsyncLogWriterOptions struct { QueueSize int FlushInterval time.Duration MaxBatchSize int FallbackWriteTimeout time.Duration + OnError ErrorHandler } type queuedLogEvent struct { @@ -92,6 +138,8 @@ type AsyncLogWriter struct { maxBatchSize int fallbackWriteTimeout time.Duration once sync.Once + onError ErrorHandler + metrics ErrorMetrics } func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) { @@ -125,6 +173,7 @@ func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogW flushInterval: opts.FlushInterval, maxBatchSize: opts.MaxBatchSize, fallbackWriteTimeout: opts.FallbackWriteTimeout, + onError: opts.OnError, } go writer.loop() return writer @@ -213,9 +262,15 @@ func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error case w.queue <- item: return nil default: + // Queue is full, use fallback direct write + w.metrics.RecordDroppedEvent() fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout) defer cancel() - return w.writeOne(fallbackCtx, item) + err := w.writeOne(fallbackCtx, item) + if err != nil && w.onError != nil { + w.onError(fallbackCtx, err, w.getEventType(item)) + } + return err } } @@ -230,12 +285,43 @@ func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error { if firstErr == nil { firstErr = err } + w.metrics.RecordFlushError() log.Printf("routing: flush route log event failed: %v", err) + + // Call error handler if configured + if w.onError != nil { + w.onError(context.Background(), err, w.getEventType(item)) + } } } + if firstErr != nil { + w.metrics.RecordWriteError() + } return firstErr } +func (w *AsyncLogWriter) getEventType(item queuedLogEvent) string { + switch { + case item.decision != nil: + return "decision" + case item.failover != nil: + return "failover" + case item.sticky != nil: + return "sticky_audit" + default: + return "unknown" + } +} + +// Metrics returns the error metrics for monitoring +func (w *AsyncLogWriter) Metrics() ErrorMetrics { + return ErrorMetrics{ + FlushErrors: w.metrics.GetFlushErrors(), + WriteErrors: w.metrics.GetWriteErrors(), + DroppedEvents: w.metrics.GetDroppedEvents(), + } +} + func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent { for { select {