feat(log): H-03 日志 flush 错误监控

- 添加 ErrorMetrics 结构体记录 flush/write/drop 错误数
- 添加 ErrorHandler 回调接口用于自定义错误处理
- 在 AsyncLogWriterOptions 中支持配置错误处理器
- 在 flushBatch 中记录 flush 错误指标并回调错误处理器
- 在 enqueue fallback 路径中记录丢弃事件数
- 添加 Metrics() 方法暴露错误统计
This commit is contained in:
phamnazage-jpg
2026-06-02 06:51:14 +08:00
parent 97502b8a86
commit 8984451845

View File

@@ -69,11 +69,57 @@ type routeLogSink interface {
Close() error
}
type ErrorMetrics struct {
FlushErrors int64
WriteErrors int64
DroppedEvents int64
mu sync.RWMutex
}
func (e *ErrorMetrics) RecordFlushError() {
e.mu.Lock()
defer e.mu.Unlock()
e.FlushErrors++
}
func (e *ErrorMetrics) RecordWriteError() {
e.mu.Lock()
defer e.mu.Unlock()
e.WriteErrors++
}
func (e *ErrorMetrics) RecordDroppedEvent() {
e.mu.Lock()
defer e.mu.Unlock()
e.DroppedEvents++
}
func (e *ErrorMetrics) GetFlushErrors() int64 {
e.mu.RLock()
defer e.mu.RUnlock()
return e.FlushErrors
}
func (e *ErrorMetrics) GetWriteErrors() int64 {
e.mu.RLock()
defer e.mu.RUnlock()
return e.WriteErrors
}
func (e *ErrorMetrics) GetDroppedEvents() int64 {
e.mu.RLock()
defer e.mu.RUnlock()
return e.DroppedEvents
}
type ErrorHandler func(ctx context.Context, err error, eventType string)
type AsyncLogWriterOptions struct {
QueueSize int
FlushInterval time.Duration
MaxBatchSize int
FallbackWriteTimeout time.Duration
OnError ErrorHandler
}
type queuedLogEvent struct {
@@ -92,6 +138,8 @@ type AsyncLogWriter struct {
maxBatchSize int
fallbackWriteTimeout time.Duration
once sync.Once
onError ErrorHandler
metrics ErrorMetrics
}
func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) {
@@ -125,6 +173,7 @@ func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogW
flushInterval: opts.FlushInterval,
maxBatchSize: opts.MaxBatchSize,
fallbackWriteTimeout: opts.FallbackWriteTimeout,
onError: opts.OnError,
}
go writer.loop()
return writer
@@ -213,9 +262,15 @@ func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error
case w.queue <- item:
return nil
default:
// Queue is full, use fallback direct write
w.metrics.RecordDroppedEvent()
fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout)
defer cancel()
return w.writeOne(fallbackCtx, item)
err := w.writeOne(fallbackCtx, item)
if err != nil && w.onError != nil {
w.onError(fallbackCtx, err, w.getEventType(item))
}
return err
}
}
@@ -230,12 +285,43 @@ func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error {
if firstErr == nil {
firstErr = err
}
w.metrics.RecordFlushError()
log.Printf("routing: flush route log event failed: %v", err)
// Call error handler if configured
if w.onError != nil {
w.onError(context.Background(), err, w.getEventType(item))
}
}
}
if firstErr != nil {
w.metrics.RecordWriteError()
}
return firstErr
}
func (w *AsyncLogWriter) getEventType(item queuedLogEvent) string {
switch {
case item.decision != nil:
return "decision"
case item.failover != nil:
return "failover"
case item.sticky != nil:
return "sticky_audit"
default:
return "unknown"
}
}
// Metrics returns the error metrics for monitoring
func (w *AsyncLogWriter) Metrics() ErrorMetrics {
return ErrorMetrics{
FlushErrors: w.metrics.GetFlushErrors(),
WriteErrors: w.metrics.GetWriteErrors(),
DroppedEvents: w.metrics.GetDroppedEvents(),
}
}
func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent {
for {
select {