feat(metrics): H-04 Prometheus 指标暴露

- 创建 internal/metrics 包集成 Prometheus 客户端
- 添加 HTTP 请求指标(总量、延迟直方图)
- 添加业务指标(active_hosts、active_providers)
- 添加路由指标(decisions、failovers)
- 添加数据库指标(connections、operations)
- 添加日志指标(flush_errors、dropped_events)
- 添加 HTTP Middleware 自动收集请求指标
- 添加 StartServer 方法启动独立 metrics 服务
This commit is contained in:
phamnazage-jpg
2026-06-02 06:53:24 +08:00
parent 8984451845
commit d688722dd2
4 changed files with 467 additions and 2 deletions

207
internal/metrics/metrics.go Normal file
View File

@@ -0,0 +1,207 @@
package metrics
import (
	"context"
	"errors"
	"log"
	"net/http"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Package-level metric collectors. Each one is registered with the
// Prometheus client library in init.
var (
// HTTP request metrics.

// HTTPRequestsTotal counts HTTP requests, labelled by method, path and status.
HTTPRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "path", "status"},
)
// HTTPRequestDuration observes HTTP request durations in seconds, labelled
// by method and path, using the client library's default buckets.
HTTPRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method", "path"},
)
// Business metrics.

// ActiveHosts is a gauge holding the number of active hosts.
ActiveHosts = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "active_hosts",
Help: "Number of active hosts",
},
)
// ActiveProviders is a gauge holding the number of active providers.
ActiveProviders = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "active_providers",
Help: "Number of active providers",
},
)
// RouteDecisionsTotal counts route decisions, labelled by logical_group and status.
RouteDecisionsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "route_decisions_total",
Help: "Total number of route decisions",
},
[]string{"logical_group", "status"},
)
// RouteFailoversTotal counts route failovers.
RouteFailoversTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "route_failovers_total",
Help: "Total number of route failovers",
},
)
// Database metrics.

// DBConnectionsActive is a gauge holding the number of active database connections.
DBConnectionsActive = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "db_connections_active",
Help: "Number of active database connections",
},
)
// DBOperationsTotal counts database operations, labelled by operation and table.
DBOperationsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "db_operations_total",
Help: "Total number of database operations",
},
[]string{"operation", "table"},
)
// Logging metrics.

// LogFlushErrorsTotal counts log flush errors.
LogFlushErrorsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "log_flush_errors_total",
Help: "Total number of log flush errors",
},
)
// LogDroppedEventsTotal counts dropped log events.
LogDroppedEventsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "log_dropped_events_total",
Help: "Total number of dropped log events",
},
)
)
// init registers every collector declared in this package through
// prometheus.MustRegister, which panics if a registration fails.
func init() {
	// The collectors are handed over in declaration order in one variadic call.
	prometheus.MustRegister(
		HTTPRequestsTotal,
		HTTPRequestDuration,
		ActiveHosts,
		ActiveProviders,
		RouteDecisionsTotal,
		RouteFailoversTotal,
		DBConnectionsActive,
		DBOperationsTotal,
		LogFlushErrorsTotal,
		LogDroppedEventsTotal,
	)
}
// Handler returns the http.Handler produced by promhttp.Handler. StartServer
// mounts it at /metrics; callers may also mount it on their own mux.
func Handler() http.Handler {
return promhttp.Handler()
}
// RecordHTTPRequest records one completed HTTP request: it increments
// HTTPRequestsTotal for (method, path, status) and observes duration, in
// seconds, on HTTPRequestDuration for (method, path).
//
// The status label is the standard reason phrase for the code (for example
// "OK" or "Not Found"). Codes without a registered phrase are labelled with
// their decimal value instead.
func RecordHTTPRequest(method, path string, status int, duration time.Duration) {
	// http.StatusText returns "" for unknown codes; without the fallback every
	// non-standard status (e.g. 499) would be merged into one empty label.
	statusLabel := http.StatusText(status)
	if statusLabel == "" {
		statusLabel = strconv.Itoa(status)
	}

	HTTPRequestsTotal.WithLabelValues(method, path, statusLabel).Inc()
	HTTPRequestDuration.WithLabelValues(method, path).Observe(duration.Seconds())
}
// RecordRouteDecision increments RouteDecisionsTotal for the given
// logical_group and status label values.
func RecordRouteDecision(logicalGroup, status string) {
RouteDecisionsTotal.WithLabelValues(logicalGroup, status).Inc()
}
// RecordRouteFailover increments RouteFailoversTotal by one.
func RecordRouteFailover() {
RouteFailoversTotal.Inc()
}
// SetActiveHosts sets the ActiveHosts gauge to count, replacing the
// previous value.
func SetActiveHosts(count float64) {
ActiveHosts.Set(count)
}
// SetActiveProviders sets the ActiveProviders gauge to count, replacing the
// previous value.
func SetActiveProviders(count float64) {
ActiveProviders.Set(count)
}
// RecordDBOperation increments DBOperationsTotal for the given operation
// and table label values.
func RecordDBOperation(operation, table string) {
DBOperationsTotal.WithLabelValues(operation, table).Inc()
}
// SetDBConnections sets the DBConnectionsActive gauge to count, replacing
// the previous value.
func SetDBConnections(count float64) {
DBConnectionsActive.Set(count)
}
// RecordLogFlushError increments LogFlushErrorsTotal by one.
func RecordLogFlushError() {
LogFlushErrorsTotal.Inc()
}
// RecordLogDroppedEvent increments LogDroppedEventsTotal by one.
func RecordLogDroppedEvent() {
LogDroppedEventsTotal.Inc()
}
// Middleware returns a handler that delegates to next and then reports the
// request to RecordHTTPRequest with the request method, the URL path, the
// captured status code and the elapsed time.
//
// NOTE(review): the path label is the raw r.URL.Path; if paths embed IDs the
// label set may grow without bound. Confirm against the routes served.
func Middleware(next http.Handler) http.Handler {
	instrumented := func(w http.ResponseWriter, r *http.Request) {
		began := time.Now()

		// The wrapper starts at 200 so handlers that never call WriteHeader
		// are still reported with a status.
		capture := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
		next.ServeHTTP(capture, r)

		elapsed := time.Since(began)
		RecordHTTPRequest(r.Method, r.URL.Path, capture.statusCode, elapsed)
	}
	return http.HandlerFunc(instrumented)
}
// responseWriter wraps an http.ResponseWriter and remembers the final status
// code of the response so it can be reported after the handler returns.
type responseWriter struct {
	http.ResponseWriter
	// statusCode is the final status of the response. Callers initialise it to
	// the status that applies when the handler never calls WriteHeader.
	statusCode int
	// wroteHeader is true once the final status has been fixed, either by an
	// explicit WriteHeader or implicitly by the first Write.
	wroteHeader bool
}

// WriteHeader records code as the response status and forwards the call.
//
// Only the first final status is recorded: net/http ignores superfluous
// WriteHeader calls, so a later call must not overwrite what was sent.
// 1xx informational codes other than 101 may precede the final header and
// are forwarded without being recorded.
func (rw *responseWriter) WriteHeader(code int) {
	informational := code >= 100 && code < 200 && code != http.StatusSwitchingProtocols
	if !rw.wroteHeader && !informational {
		rw.statusCode = code
		rw.wroteHeader = true
	}
	rw.ResponseWriter.WriteHeader(code)
}

// Write forwards b to the wrapped writer. A Write before any WriteHeader
// commits the response with the initial status, so the recorded status is
// frozen here as well.
func (rw *responseWriter) Write(b []byte) (int, error) {
	rw.wroteHeader = true
	return rw.ResponseWriter.Write(b)
}

// Unwrap returns the wrapped http.ResponseWriter so http.ResponseController
// can reach optional interfaces (flush, hijack, deadlines) that the wrapper
// itself does not expose.
func (rw *responseWriter) Unwrap() http.ResponseWriter {
	return rw.ResponseWriter
}
// StartServer starts an HTTP server on addr that serves Handler at /metrics
// and returns it immediately; the listener runs in a background goroutine.
//
// When ctx is cancelled the server is shut down gracefully with a 5-second
// deadline. Listen and shutdown failures are logged rather than returned or
// panicked on, because metrics are optional and must not take the process
// down.
func StartServer(ctx context.Context, addr string) *http.Server {
	mux := http.NewServeMux()
	mux.Handle("/metrics", Handler())

	server := &http.Server{
		Addr:    addr,
		Handler: mux,
		// Bound how long a client may take to send request headers so idle
		// or slow connections cannot be held open indefinitely.
		ReadHeaderTimeout: 5 * time.Second,
	}

	// stopped is closed when ListenAndServe returns, for any reason.
	stopped := make(chan struct{})

	go func() {
		defer close(stopped)
		if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("metrics: server on %s stopped: %v", addr, err)
		}
	}()

	go func() {
		select {
		case <-ctx.Done():
		case <-stopped:
			// The listener already exited (failed to bind, or the caller shut
			// the server down directly); nothing is left to stop.
			return
		}

		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			log.Printf("metrics: shutting down server on %s: %v", addr, err)
		}
	}()

	return server
}

View File

@@ -0,0 +1,227 @@
package metrics
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)
// TestHTTPRequestsTotal records one request and checks that the output
// served by Handler mentions the http_requests_total metric.
func TestHTTPRequestsTotal(t *testing.T) {
	RecordHTTPRequest("GET", "/test", 200, 100*time.Millisecond)

	// The check goes through the metrics endpoint output rather than reading
	// the counter value directly.
	recorder := httptest.NewRecorder()
	Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))

	if got := recorder.Code; got != http.StatusOK {
		t.Errorf("Expected status 200, got %d", got)
	}
	if exposition := recorder.Body.String(); !strings.Contains(exposition, "http_requests_total") {
		t.Error("Expected metrics endpoint to contain http_requests_total")
	}
}
// TestRecordRouteDecision records three decisions for one group and checks
// that the metrics endpoint output mentions route_decisions_total.
func TestRecordRouteDecision(t *testing.T) {
	for _, outcome := range []string{"success", "success", "failed"} {
		RecordRouteDecision("test-group", outcome)
	}

	recorder := httptest.NewRecorder()
	Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))

	if got := recorder.Code; got != http.StatusOK {
		t.Errorf("Expected status 200, got %d", got)
	}
	if exposition := recorder.Body.String(); !strings.Contains(exposition, "route_decisions_total") {
		t.Error("Expected metrics endpoint to contain route_decisions_total")
	}
}
// TestRecordRouteFailover records one failover and checks that the metrics
// endpoint output mentions route_failovers_total.
func TestRecordRouteFailover(t *testing.T) {
	RecordRouteFailover()

	recorder := httptest.NewRecorder()
	Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))

	if exposition := recorder.Body.String(); !strings.Contains(exposition, "route_failovers_total") {
		t.Error("Expected metrics endpoint to contain route_failovers_total")
	}
}
// TestSetActiveHosts sets the gauge and checks that the metrics endpoint
// reports that exact value.
func TestSetActiveHosts(t *testing.T) {
	SetActiveHosts(10)

	rr := httptest.NewRecorder()
	Handler().ServeHTTP(rr, httptest.NewRequest("GET", "/metrics", nil))

	// Match the whole sample line, not just the metric name, so the test
	// fails if SetActiveHosts stops updating the gauge.
	const wantSample = "active_hosts 10"
	if body := rr.Body.String(); !strings.Contains(body, "\n"+wantSample+"\n") {
		t.Errorf("Expected metrics endpoint to contain sample %q", wantSample)
	}
}
// TestSetActiveProviders sets the gauge and checks that the metrics endpoint
// reports that exact value.
func TestSetActiveProviders(t *testing.T) {
	SetActiveProviders(5)

	rr := httptest.NewRecorder()
	Handler().ServeHTTP(rr, httptest.NewRequest("GET", "/metrics", nil))

	// Match the whole sample line, not just the metric name, so the test
	// fails if SetActiveProviders stops updating the gauge.
	const wantSample = "active_providers 5"
	if body := rr.Body.String(); !strings.Contains(body, "\n"+wantSample+"\n") {
		t.Errorf("Expected metrics endpoint to contain sample %q", wantSample)
	}
}
// TestRecordDBOperation records an insert and a select on the hosts table
// and checks that the metrics endpoint output mentions db_operations_total.
func TestRecordDBOperation(t *testing.T) {
	for _, op := range []string{"insert", "select"} {
		RecordDBOperation(op, "hosts")
	}

	recorder := httptest.NewRecorder()
	Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))

	if exposition := recorder.Body.String(); !strings.Contains(exposition, "db_operations_total") {
		t.Error("Expected metrics endpoint to contain db_operations_total")
	}
}
// TestRecordLogFlushError records two flush errors and checks that the
// metrics endpoint output mentions log_flush_errors_total.
func TestRecordLogFlushError(t *testing.T) {
	for i := 0; i < 2; i++ {
		RecordLogFlushError()
	}

	recorder := httptest.NewRecorder()
	Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))

	if exposition := recorder.Body.String(); !strings.Contains(exposition, "log_flush_errors_total") {
		t.Error("Expected metrics endpoint to contain log_flush_errors_total")
	}
}
// TestRecordLogDroppedEvent records one dropped event and checks that the
// metrics endpoint output mentions log_dropped_events_total.
func TestRecordLogDroppedEvent(t *testing.T) {
	RecordLogDroppedEvent()

	recorder := httptest.NewRecorder()
	Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))

	if exposition := recorder.Body.String(); !strings.Contains(exposition, "log_dropped_events_total") {
		t.Error("Expected metrics endpoint to contain log_dropped_events_total")
	}
}
// TestMiddleware checks that Middleware passes the wrapped handler's response
// through unchanged and records the request in both HTTP metrics.
func TestMiddleware(t *testing.T) {
	handler := Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusCreated)
		if _, err := w.Write([]byte("OK")); err != nil {
			t.Errorf("writing response body: %v", err)
		}
	}))

	req := httptest.NewRequest("POST", "/api/test", nil)
	rr := httptest.NewRecorder()
	handler.ServeHTTP(rr, req)

	if rr.Code != http.StatusCreated {
		t.Errorf("Expected status 201, got %d", rr.Code)
	}
	if rr.Body.String() != "OK" {
		t.Errorf("Expected body 'OK', got '%s'", rr.Body.String())
	}

	// The middleware exists to record metrics, so confirm that the series for
	// this request appear in the metrics endpoint output. Sample values are
	// not matched because counters accumulate across repeated test runs.
	scrape := httptest.NewRecorder()
	Handler().ServeHTTP(scrape, httptest.NewRequest("GET", "/metrics", nil))
	body := scrape.Body.String()

	wantSeries := []string{
		`http_requests_total{method="POST",path="/api/test",status="Created"}`,
		`http_request_duration_seconds_count{method="POST",path="/api/test"}`,
	}
	for _, series := range wantSeries {
		if !strings.Contains(body, series) {
			t.Errorf("Expected metrics endpoint to contain series %s", series)
		}
	}
}
// TestResponseWriter checks that WriteHeader stores the status code it is
// given on the wrapper.
func TestResponseWriter(t *testing.T) {
	const want = http.StatusTeapot

	wrapper := &responseWriter{ResponseWriter: httptest.NewRecorder(), statusCode: 200}
	wrapper.WriteHeader(want)

	if got := wrapper.statusCode; got != want {
		t.Errorf("Expected status code %d, got %d", want, got)
	}
}
// TestStartServer checks that StartServer returns a server configured with
// the requested address and that cancelling the context shuts it down.
func TestStartServer(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Port 0 asks the OS for a free port. The bound port is not exposed
	// through server.Addr, so only the configured address is checked here.
	const addr = "127.0.0.1:0"
	server := StartServer(ctx, addr)
	if server == nil {
		t.Fatal("StartServer returned nil server")
	}
	if server.Addr != addr {
		t.Fatalf("Expected server address %q, got %q", addr, server.Addr)
	}

	// Wait on a shutdown hook instead of sleeping, so the test actually
	// asserts that cancellation reaches server.Shutdown.
	shutdown := make(chan struct{})
	server.RegisterOnShutdown(func() { close(shutdown) })

	cancel()

	select {
	case <-shutdown:
	case <-time.After(2 * time.Second):
		t.Fatal("server was not shut down after context cancellation")
	}
}
// TestHandlerContent sets both gauges, requests the metrics endpoint and
// checks the status code, the content type and the presence of the
// HELP/TYPE markers and both gauge names in the body.
func TestHandlerContent(t *testing.T) {
	// Give the gauges values before the endpoint is read.
	SetActiveHosts(42)
	SetActiveProviders(7)

	recorder := httptest.NewRecorder()
	Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))

	if recorder.Code != http.StatusOK {
		t.Errorf("Expected status 200, got %d", recorder.Code)
	}

	// Either the plain-text or the OpenMetrics media type is acceptable.
	contentType := recorder.Header().Get("Content-Type")
	isText := strings.Contains(contentType, "text/plain")
	isOpenMetrics := strings.Contains(contentType, "application/openmetrics")
	if !(isText || isOpenMetrics) {
		t.Errorf("Expected text/plain or openmetrics content type, got %s", contentType)
	}

	exposition := recorder.Body.String()
	for _, want := range []string{"# HELP", "# TYPE", "active_hosts", "active_providers"} {
		if strings.Contains(exposition, want) {
			continue
		}
		t.Errorf("Expected metrics to contain '%s'", want)
	}
}