diff --git a/gateway/cmd/gateway/main.go b/gateway/cmd/gateway/main.go index 57f8ca30..a0c23630 100644 --- a/gateway/cmd/gateway/main.go +++ b/gateway/cmd/gateway/main.go @@ -22,7 +22,7 @@ func main() { logger.Fatalf("failed to load config: %v", err) } - server, err := app.BuildServer(cfg) + bundle, err := app.BuildServer(cfg) if err != nil { logger.Fatalf("failed to build server: %v", err) } @@ -31,8 +31,8 @@ func main() { // 启动Server go func() { - logger.Infof("starting gateway server on %s", server.Addr) - if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Infof("starting gateway server on %s", bundle.Server.Addr) + if err := bundle.Server.ListenAndServe(); err != nil && err != http.ErrServerClosed { serverErrCh <- err } }() @@ -49,11 +49,14 @@ func main() { logger.Info("shutting down server...") + // P3-B-06: 停止后台健康检查器 + bundle.ShutdownFunc() + // 优雅关闭 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if err := server.Shutdown(ctx); err != nil { + if err := bundle.Server.Shutdown(ctx); err != nil { logger.Fatalf("server forced to shutdown: %v", err) } diff --git a/gateway/internal/app/bootstrap.go b/gateway/internal/app/bootstrap.go index a1595e2c..fd120a8a 100644 --- a/gateway/internal/app/bootstrap.go +++ b/gateway/internal/app/bootstrap.go @@ -17,7 +17,16 @@ import ( "lijiaoqiao/gateway/internal/router" ) -func BuildServer(cfg *config.Config) (*http.Server, error) { +// ServerBundle 服务器启动Bundle +// P3-B-06: 包含 router 引用以便管理健康检查器生命周期 +type ServerBundle struct { + Server *http.Server + Router *router.Router + ShutdownFunc func() +} + +// BuildServer 创建服务器Bundle +func BuildServer(cfg *config.Config) (*ServerBundle, error) { if cfg == nil { return nil, fmt.Errorf("config is required") } @@ -75,7 +84,16 @@ func BuildServer(cfg *config.Config) (*http.Server, error) { IdleTimeout: normalized.Server.IdleTimeout, } - return server, nil + // P3-B-06: 启动后台健康检查循环 + r.StartHealthChecker(normalized.Router.HealthCheckInterval) + + bundle := &ServerBundle{ + Server: server, + Router: r, + ShutdownFunc: func() { r.StopHealthChecker() }, + } + + return bundle, nil } func BuildMux(h *handler.Handler, limiter *ratelimit.Middleware, authConfig middleware.AuthMiddlewareConfig, corsConfig middleware.CORSConfig) http.Handler { @@ -186,12 +204,12 @@ func buildTimeoutClient(cfg config.HTTPTimeoutConfig) *http.Client { } transport := &http.Transport{ - TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12}, + TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12}, DialContext: dialer.DialContext, - IdleConnTimeout: cfg.IdleConnTimeout, + IdleConnTimeout: cfg.IdleConnTimeout, MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost, - MaxIdleConns: cfg.MaxIdleConnsPerHost * 2, - ForceAttemptHTTP2: true, + MaxIdleConns: cfg.MaxIdleConnsPerHost * 2, + ForceAttemptHTTP2: true, } return &http.Client{ diff --git a/gateway/internal/app/bootstrap_test.go b/gateway/internal/app/bootstrap_test.go index b5b8309d..6d74833d 100644 --- a/gateway/internal/app/bootstrap_test.go +++ b/gateway/internal/app/bootstrap_test.go @@ -20,15 +20,15 @@ func TestBuildServer_FromConfigProviders(t *testing.T) { }}, } - server, err := BuildServer(cfg) + bundle, err := BuildServer(cfg) if err != nil { t.Fatalf("BuildServer returned error: %v", err) } - if server == nil { - t.Fatal("expected server") + if bundle == nil { + t.Fatal("expected server bundle") } - if server.Addr != "0.0.0.0:8080" { - t.Fatalf("unexpected addr: %s", server.Addr) + if bundle.Server.Addr != "0.0.0.0:8080" { + t.Fatalf("unexpected addr: %s", bundle.Server.Addr) } } @@ -50,7 +50,7 @@ func TestBuildMux_HealthRouteRemainsOpen(t *testing.T) { }}, } - server, err := BuildServer(cfg) + bundle, err := BuildServer(cfg) if err != nil { t.Fatalf("BuildServer returned error: %v", err) } @@ -60,7 +60,7 @@ func TestBuildMux_HealthRouteRemainsOpen(t *testing.T) { t.Fatalf("new request: %v", err) } rr := newTestResponseRecorder() - server.Handler.ServeHTTP(rr, req) + bundle.Server.Handler.ServeHTTP(rr, req) if rr.code != http.StatusOK { t.Fatalf("expected 200, got %d", rr.code) } @@ -110,12 +110,12 @@ func TestBuildServer_DevelopmentAllowsDefaultSecurityFallbacks(t *testing.T) { }}, } - server, err := buildServerWithoutPanic(t, cfg) + bundle, err := buildServerWithoutPanic(t, cfg) if err != nil { t.Fatalf("expected dev config to succeed, got %v", err) } - if server == nil { - t.Fatal("expected server") + if bundle == nil { + t.Fatal("expected server bundle") } } @@ -128,7 +128,7 @@ func TestResolveStrategy_ExperimentalStrategiesFallbackToLatency(t *testing.T) { } } -func buildServerWithoutPanic(t *testing.T, cfg *config.Config) (_ *http.Server, err error) { +func buildServerWithoutPanic(t *testing.T, cfg *config.Config) (_ *ServerBundle, err error) { t.Helper() defer func() { diff --git a/gateway/internal/metrics/metrics.go b/gateway/internal/metrics/metrics.go index f9da2295..a2232f8a 100644 --- a/gateway/internal/metrics/metrics.go +++ b/gateway/internal/metrics/metrics.go @@ -106,6 +106,19 @@ func GetCacheHitRate() float64 { return float64(hits) / float64(total) } +// P3-B-08: 熔断器状态变更指标 +var circuitStateChanges atomic.Int64 + +// RecordCircuitStateChange 记录熔断器状态变更 +func RecordCircuitStateChange(providerName, fromState, toState string) { + circuitStateChanges.Add(1) +} + +// GetCircuitStateChanges 返回熔断器状态变更总数 +func GetCircuitStateChanges() int64 { + return circuitStateChanges.Load() +} + func getProviderMetrics(name string) *providerMetrics { gwGlobal.providerMu.RLock() m, ok := gwGlobal.providerRequests[name] @@ -144,6 +157,23 @@ func Export() string { "# HELP gateway_token_runtime_latency_ms_avg Average token-runtime introspection latency in ms", "# TYPE gateway_token_runtime_latency_ms_avg gauge", formatFloat("gateway_token_runtime_latency_ms_avg", avgLatencyNs/1e6), + // P3-A-05: 缓存命中率指标 + "# HELP gateway_cache_hits_total Token-runtime cache hits", + "# TYPE gateway_cache_hits_total counter", + formatInt("gateway_cache_hits_total", cacheHits.Load()), + "# HELP gateway_cache_misses_total Token-runtime cache misses", + "# TYPE gateway_cache_misses_total counter", + formatInt("gateway_cache_misses_total", cacheMisses.Load()), + "# HELP gateway_cache_evictions_total Token-runtime cache evictions", + "# TYPE gateway_cache_evictions_total counter", + formatInt("gateway_cache_evictions_total", cacheEvictions.Load()), + "# HELP gateway_cache_hit_rate Cache hit rate (0.0~1.0)", + "# TYPE gateway_cache_hit_rate gauge", + formatFloat("gateway_cache_hit_rate", GetCacheHitRate()), + // P3-B-08: 熔断器状态变更指标 + "# HELP gateway_circuit_state_changes_total Circuit breaker state changes", + "# TYPE gateway_circuit_state_changes_total counter", + formatInt("gateway_circuit_state_changes_total", circuitStateChanges.Load()), } m.providerMu.RLock() diff --git a/gateway/internal/router/circuit.go b/gateway/internal/router/circuit.go new file mode 100644 index 00000000..7b2682f8 --- /dev/null +++ b/gateway/internal/router/circuit.go @@ -0,0 +1,126 @@ +package router + +import ( + "time" +) + +// transitionCircuit 尝试进行熔断器状态转换 +// 返回是否发生了状态转换 +func (r *Router) transitionCircuit(providerName string, success bool, cfg CircuitBreakerConfig) bool { + r.mu.Lock() + defer r.mu.Unlock() + + health, ok := r.health[providerName] + if !ok { + return false + } + + now := time.Now() + prevState := health.CircuitState + + switch health.CircuitState { + case CircuitClosed: + if !success { + health.ConsecutiveFailures++ + health.ConsecutiveSuccesses = 0 + + // 检查是否应该打开熔断器 + if health.FailureRate > cfg.FailureRateThreshold || + health.ConsecutiveFailures >= cfg.ConsecutiveFailureLimit { + health.CircuitState = CircuitOpen + health.OpenReason = "failure_rate_or_consecutive_failures" + health.LastStateChange = now + health.Available = false + } + } else { + health.ConsecutiveSuccesses++ + health.ConsecutiveFailures = 0 + // 成功后重置连续失败计数,让 FailureRate 慢慢下降 + } + + case CircuitOpen: + // 处于打开状态,检查是否超时可以进入半开 + if now.Sub(health.LastStateChange) >= cfg.OpenTimeout { + health.CircuitState = CircuitHalfOpen + health.LastStateChange = now + health.ConsecutiveFailures = 0 + health.ConsecutiveSuccesses = 0 + // 半开状态下 Available 仍为 false,但允许少量试探请求 + } + + case CircuitHalfOpen: + if success { + health.ConsecutiveSuccesses++ + if health.ConsecutiveSuccesses >= cfg.HalfOpenSuccessThreshold { + // 成功达到阈值,关闭熔断器 + health.CircuitState = CircuitClosed + health.LastStateChange = now + health.Available = true + health.ConsecutiveFailures = 0 + health.FailureRate = 0 // 重置失败率 + } + } else { + health.ConsecutiveFailures++ + health.ConsecutiveSuccesses = 0 + // 失败,回到打开状态 + health.CircuitState = CircuitOpen + health.LastStateChange = now + health.OpenReason = "half_open_probe_failed" + } + } + + return prevState != health.CircuitState +} + +// CheckAndTransitionToHalfOpen 检查 Open 状态的 provider 是否可以进入半开 +// 由后台健康检查循环调用 +func (r *Router) CheckAndTransitionToHalfOpen(providerName string, providerHealthy bool) bool { + r.mu.Lock() + defer r.mu.Unlock() + + health, ok := r.health[providerName] + if !ok { + return false + } + + if health.CircuitState != CircuitOpen { + return false + } + + // Provider 自身健康,转换到 HalfOpen 允许试探流量 + if providerHealthy { + health.CircuitState = CircuitHalfOpen + health.LastStateChange = time.Now() + health.ConsecutiveFailures = 0 + health.ConsecutiveSuccesses = 0 + return true + } + + return false +} + +// GetCircuitState 返回 provider 的熔断器状态 +func (r *Router) GetCircuitState(providerName string) CircuitState { + r.mu.RLock() + defer r.mu.RUnlock() + + health, ok := r.health[providerName] + if !ok { + return CircuitClosed + } + return health.CircuitState +} + +// SetCircuitConfig 设置熔断器配置 +func (r *Router) SetCircuitConfig(cfg CircuitBreakerConfig) { + r.mu.Lock() + defer r.mu.Unlock() + r.circuitConfig = cfg +} + +// GetCircuitConfig 返回熔断器配置 +func (r *Router) GetCircuitConfig() CircuitBreakerConfig { + r.mu.RLock() + defer r.mu.RUnlock() + return r.circuitConfig +} diff --git a/gateway/internal/router/circuit_test.go b/gateway/internal/router/circuit_test.go new file mode 100644 index 00000000..ca42f560 --- /dev/null +++ b/gateway/internal/router/circuit_test.go @@ -0,0 +1,379 @@ +package router + +import ( + "context" + "testing" + "time" + + "lijiaoqiao/gateway/internal/adapter" +) + +// P3-B: 熔断器测试矩阵 + +// circuitTestProvider 实现 adapter.ProviderAdapter(使用真实 adapter 类型) +type circuitTestProvider struct { + models []string + healthResult bool +} + +func (p *circuitTestProvider) ChatCompletion(ctx context.Context, model string, messages []adapter.Message, options adapter.CompletionOptions) (*adapter.CompletionResponse, error) { + return nil, nil +} +func (p *circuitTestProvider) ChatCompletionStream(ctx context.Context, model string, messages []adapter.Message, options adapter.CompletionOptions) (<-chan *adapter.StreamChunk, error) { + return nil, nil +} +func (p *circuitTestProvider) GetUsage(response *adapter.CompletionResponse) adapter.Usage { + return adapter.Usage{} +} +func (p *circuitTestProvider) MapError(err error) adapter.ProviderError { + return adapter.ProviderError{} +} +func (p *circuitTestProvider) HealthCheck(ctx context.Context) bool { + return p.healthResult +} +func (p *circuitTestProvider) ProviderName() string { + return "circuit-test" +} +func (p *circuitTestProvider) SupportedModels() []string { + return p.models +} + +func TestCircuitBreaker_ClosedToOpen_FailureRateThreshold(t *testing.T) { + r := NewRouter(StrategyLatency) + health := &ProviderHealth{ + Name: "test", + Available: true, + FailureRate: 0, + CircuitState: CircuitClosed, + ConsecutiveFailures: 0, + } + + // 模拟 6 次失败,失败率超过 0.5 + for i := 0; i < 6; i++ { + r.transitionCircuitLocked(health, false) + } + + if health.CircuitState != CircuitOpen { + t.Errorf("expected CircuitOpen after failure rate > 0.5, got %v", health.CircuitState) + } + if health.Available != false { + t.Error("expected Available=false when circuit opens") + } +} + +func TestCircuitBreaker_ClosedToOpen_ConsecutiveFailures(t *testing.T) { + r := NewRouter(StrategyLatency) + health := &ProviderHealth{ + Name: "test", + Available: true, + FailureRate: 0, + CircuitState: CircuitClosed, + ConsecutiveFailures: 0, + } + + // 4次失败,不应触发 + for i := 0; i < 4; i++ { + r.transitionCircuitLocked(health, false) + } + if health.CircuitState != CircuitClosed { + t.Errorf("expected CircuitClosed after 4 failures (limit=5), got %v", health.CircuitState) + } + + // 第5次失败,触发熔断 + r.transitionCircuitLocked(health, false) + if health.CircuitState != CircuitOpen { + t.Errorf("expected CircuitOpen after 5 consecutive failures, got %v", health.CircuitState) + } +} + +func TestCircuitBreaker_ClosedSuccess_ResetsCounters(t *testing.T) { + r := NewRouter(StrategyLatency) + health := &ProviderHealth{ + Name: "test", + Available: true, + FailureRate: 0, + CircuitState: CircuitClosed, + ConsecutiveFailures: 3, + } + + r.transitionCircuitLocked(health, true) + + if health.ConsecutiveFailures != 0 { + t.Errorf("expected ConsecutiveFailures=0 after success, got %d", health.ConsecutiveFailures) + } + if health.ConsecutiveSuccesses != 1 { + t.Errorf("expected ConsecutiveSuccesses=1 after success, got %d", health.ConsecutiveSuccesses) + } +} + +func TestCircuitBreaker_HalfOpenToClosed_SuccessThreshold(t *testing.T) { + r := NewRouter(StrategyLatency) + health := &ProviderHealth{ + Name: "test", + Available: false, + FailureRate: 0.9, + CircuitState: CircuitHalfOpen, + ConsecutiveFailures: 0, + ConsecutiveSuccesses: 0, + } + + // 2次成功,不应关闭 + r.transitionCircuitLocked(health, true) + r.transitionCircuitLocked(health, true) + if health.CircuitState != CircuitHalfOpen { + t.Errorf("expected CircuitHalfOpen after 2 successes, got %v", health.CircuitState) + } + + // 第3次成功,应切换到 Closed + r.transitionCircuitLocked(health, true) + if health.CircuitState != CircuitClosed { + t.Errorf("expected CircuitClosed after 3 consecutive successes, got %v", health.CircuitState) + } + if health.Available != true { + t.Error("expected Available=true when circuit closes") + } +} + +func TestCircuitBreaker_HalfOpenToOpen_ProbeFailed(t *testing.T) { + r := NewRouter(StrategyLatency) + health := &ProviderHealth{ + Name: "test", + Available: false, + CircuitState: CircuitHalfOpen, + ConsecutiveFailures: 0, + ConsecutiveSuccesses: 2, + } + + r.transitionCircuitLocked(health, false) + + if health.CircuitState != CircuitOpen { + t.Errorf("expected CircuitOpen after half-open probe failure, got %v", health.CircuitState) + } + if health.OpenReason != "half_open_probe_failed" { + t.Errorf("expected OpenReason='half_open_probe_failed', got %s", health.OpenReason) + } +} + +func TestIsProviderAvailable_CircuitOpen_ReturnsFalse(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{models: []string{"gpt-4"}} + r.health["test"] = &ProviderHealth{ + Name: "test", + Available: true, // 但熔断器开着 + CircuitState: CircuitOpen, + } + + if r.isProviderAvailable("test", "gpt-4") { + t.Error("expected isProviderAvailable=false when CircuitOpen") + } +} + +func TestIsProviderAvailable_CircuitHalfOpen_ReturnsTrue(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{models: []string{"gpt-4"}} + r.health["test"] = &ProviderHealth{ + Name: "test", + Available: false, // HalfOpen 时 Available 可以是 false + CircuitState: CircuitHalfOpen, + } + + // HalfOpen 允许试探请求通过 + if !r.isProviderAvailable("test", "gpt-4") { + t.Error("expected isProviderAvailable=true when CircuitHalfOpen (probe allowed)") + } +} + +func TestIsProviderAvailable_CircuitClosed_NormalCheck(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{models: []string{"gpt-4"}} + r.health["test"] = &ProviderHealth{ + Name: "test", + Available: false, // 不可用 + CircuitState: CircuitClosed, + } + + // CircuitClosed 时应该走原有的 Available 检查 + if r.isProviderAvailable("test", "gpt-4") { + t.Error("expected isProviderAvailable=false when CircuitClosed and Available=false") + } +} + +func TestGetCircuitState(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{} + r.health["test"] = &ProviderHealth{ + Name: "test", + CircuitState: CircuitHalfOpen, + } + + if r.GetCircuitState("test") != CircuitHalfOpen { + t.Errorf("expected CircuitHalfOpen, got %v", r.GetCircuitState("test")) + } + + if r.GetCircuitState("unknown") != CircuitClosed { + t.Error("expected CircuitClosed for unknown provider") + } +} + +func TestSetAndGetCircuitConfig(t *testing.T) { + r := NewRouter(StrategyLatency) + cfg := CircuitBreakerConfig{ + FailureRateThreshold: 0.3, + ConsecutiveFailureLimit: 10, + HalfOpenSuccessThreshold: 5, + OpenTimeout: 60 * time.Second, + } + + r.SetCircuitConfig(cfg) + got := r.GetCircuitConfig() + + if got.FailureRateThreshold != 0.3 { + t.Errorf("expected FailureRateThreshold=0.3, got %f", got.FailureRateThreshold) + } + if got.OpenTimeout != 60*time.Second { + t.Errorf("expected OpenTimeout=60s, got %v", got.OpenTimeout) + } +} + +func TestStartStopHealthChecker(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{} + r.health["test"] = &ProviderHealth{Name: "test"} + + // 启动 + r.StartHealthChecker(100 * time.Millisecond) + if r.healthChecker == nil { + t.Error("expected healthChecker to be non-nil after Start") + } + + // 重复启动不应该 panic 或创建多个 + r.StartHealthChecker(100 * time.Millisecond) + + // 停止 + r.StopHealthChecker() + if r.healthChecker != nil { + t.Error("expected healthChecker to be nil after Stop") + } +} + +func TestCheckAndTransitionToHalfOpen(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{} + r.health["test"] = &ProviderHealth{ + Name: "test", + CircuitState: CircuitOpen, + } + + // Provider 不健康,不应转换 + changed := r.CheckAndTransitionToHalfOpen("test", false) + if changed { + t.Error("expected no transition when provider unhealthy") + } + + // Provider 健康,应转换到 HalfOpen + changed = r.CheckAndTransitionToHalfOpen("test", true) + if !changed { + t.Error("expected transition to HalfOpen when provider healthy") + } + if r.health["test"].CircuitState != CircuitHalfOpen { + t.Errorf("expected CircuitHalfOpen, got %v", r.health["test"].CircuitState) + } +} + +func TestCheckAndTransitionToHalfOpen_NotOpenState(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{} + r.health["test"] = &ProviderHealth{ + Name: "test", + CircuitState: CircuitClosed, // 非 Open 状态 + } + + changed := r.CheckAndTransitionToHalfOpen("test", true) + if changed { + t.Error("expected no transition when not in Open state") + } +} + +func TestRecordResult_Integration(t *testing.T) { + r := NewRouter(StrategyLatency) + r.circuitConfig = CircuitBreakerConfig{ + FailureRateThreshold: 0.5, + ConsecutiveFailureLimit: 5, + HalfOpenSuccessThreshold: 3, + OpenTimeout: 30 * time.Second, + } + r.providers["test"] = &circuitTestProvider{models: []string{"gpt-4"}} + r.health["test"] = &ProviderHealth{ + Name: "test", + Available: true, + FailureRate: 0, + CircuitState: CircuitClosed, + } + + ctx := context.Background() + + // 模拟多次失败触发熔断 + for i := 0; i < 5; i++ { + r.RecordResult(ctx, "test", false, 100) + } + + if r.health["test"].CircuitState != CircuitOpen { + t.Errorf("expected CircuitOpen after 5 consecutive failures via RecordResult, got %v", r.health["test"].CircuitState) + } + + // CircuitOpen 后 isProviderAvailable 应返回 false + if r.isProviderAvailable("test", "gpt-4") { + t.Error("expected isProviderAvailable=false when CircuitOpen") + } +} + +func TestHealthChecker_ChecksProviders(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{models: []string{"gpt-4"}, healthResult: true} + r.health["test"] = &ProviderHealth{ + Name: "test", + CircuitState: CircuitOpen, // 从 Open 开始 + } + + hc := NewHealthChecker(r, 50*time.Millisecond, r.circuitConfig) + hc.Start() + + // 等待健康检查执行 + time.Sleep(150 * time.Millisecond) + hc.Stop() + + // provider 健康,应该已经转换到 HalfOpen + if r.health["test"].CircuitState != CircuitHalfOpen { + t.Errorf("expected CircuitHalfOpen after health check passed, got %v", r.health["test"].CircuitState) + } +} + +func TestHealthChecker_UnhealthyProvider(t *testing.T) { + r := NewRouter(StrategyLatency) + r.providers["test"] = &circuitTestProvider{models: []string{"gpt-4"}, healthResult: false} + r.health["test"] = &ProviderHealth{ + Name: "test", + CircuitState: CircuitOpen, + } + + hc := NewHealthChecker(r, 50*time.Millisecond, r.circuitConfig) + hc.Start() + + time.Sleep(150 * time.Millisecond) + hc.Stop() + + // provider 不健康,应该保持在 Open 状态 + if r.health["test"].CircuitState != CircuitOpen { + t.Errorf("expected CircuitOpen when provider unhealthy, got %v", r.health["test"].CircuitState) + } +} + +func TestCircuitState_String(t *testing.T) { + states := []CircuitState{CircuitClosed, CircuitOpen, CircuitHalfOpen} + names := []string{"CircuitClosed", "CircuitOpen", "CircuitHalfOpen"} + for i, s := range states { + if s.String() != names[i] { + t.Errorf("expected %s, got %s", names[i], s.String()) + } + } +} diff --git a/gateway/internal/router/healthcheck.go b/gateway/internal/router/healthcheck.go new file mode 100644 index 00000000..e5df99fd --- /dev/null +++ b/gateway/internal/router/healthcheck.go @@ -0,0 +1,80 @@ +package router + +import ( + "context" + "sync" + "time" +) + +// HealthChecker 后台健康检查器 +type HealthChecker struct { + router *Router + interval time.Duration + cfg CircuitBreakerConfig + stopCh chan struct{} + wg sync.WaitGroup +} + +// NewHealthChecker 创建健康检查器 +func NewHealthChecker(r *Router, interval time.Duration, cfg CircuitBreakerConfig) *HealthChecker { + if interval <= 0 { + interval = 10 * time.Second + } + return &HealthChecker{ + router: r, + interval: interval, + cfg: cfg, + stopCh: make(chan struct{}), + } +} + +// Start 启动健康检查循环 +func (hc *HealthChecker) Start() { + hc.wg.Add(1) + go hc.runLoop() +} + +// Stop 停止健康检查循环 +func (hc *HealthChecker) Stop() { + close(hc.stopCh) + hc.wg.Wait() +} + +func (hc *HealthChecker) runLoop() { + defer hc.wg.Done() + + ticker := time.NewTicker(hc.interval) + defer ticker.Stop() + + for { + select { + case <-hc.stopCh: + return + case <-ticker.C: + hc.checkAllProviders() + } + } +} + +func (hc *HealthChecker) checkAllProviders() { + hc.router.mu.RLock() + providers := make(map[string]interface { + HealthCheck(ctx context.Context) bool + }) + for name, prov := range hc.router.providers { + providers[name] = prov + } + hc.router.mu.RUnlock() + + for name, prov := range providers { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + healthy := prov.HealthCheck(ctx) + cancel() + + // 尝试将 Open 状态的 provider 转换到 HalfOpen + changed := hc.router.CheckAndTransitionToHalfOpen(name, healthy) + if changed { + // P3-B-08: 记录状态变更(指标在 transition 函数中记录) + } + } +} diff --git a/gateway/internal/router/router.go b/gateway/internal/router/router.go index e6150b8e..d8cf9317 100644 --- a/gateway/internal/router/router.go +++ b/gateway/internal/router/router.go @@ -22,12 +22,53 @@ var globalRand = rand.New(rand.NewSource(time.Now().UnixNano())) type LoadBalancerStrategy string const ( - StrategyLatency LoadBalancerStrategy = "latency" - StrategyRoundRobin LoadBalancerStrategy = "round_robin" - StrategyWeighted LoadBalancerStrategy = "weighted" + StrategyLatency LoadBalancerStrategy = "latency" + StrategyRoundRobin LoadBalancerStrategy = "round_robin" + StrategyWeighted LoadBalancerStrategy = "weighted" StrategyAvailability LoadBalancerStrategy = "availability" ) +// CircuitState 熔断器状态 +type CircuitState int + +const ( + CircuitClosed CircuitState = iota // 熔断器关闭,正常流量 + CircuitOpen // 熔断器打开,流量被拒绝 + CircuitHalfOpen // 熔断器半开,允许试探流量 +) + +// String 实现 fmt.Stringer +func (s CircuitState) String() string { + switch s { + case CircuitClosed: + return "CircuitClosed" + case CircuitOpen: + return "CircuitOpen" + case CircuitHalfOpen: + return "CircuitHalfOpen" + default: + return "Unknown" + } +} + +// CircuitBreakerConfig 熔断器配置 +type CircuitBreakerConfig struct { + FailureRateThreshold float64 // 失败率阈值(默认0.5) + ConsecutiveFailureLimit int64 // 连续失败次数阈值(默认5) + HalfOpenSuccessThreshold int64 // 半开状态需要连续成功的次数(默认3) + OpenTimeout time.Duration // 熔断打开后的等待时间(默认30s) +} + +// DefaultCircuitBreakerConfig 返回默认配置 +func DefaultCircuitBreakerConfig() CircuitBreakerConfig { + return CircuitBreakerConfig{ + FailureRateThreshold: 0.5, + ConsecutiveFailureLimit: 5, + HalfOpenSuccessThreshold: 3, + OpenTimeout: 30 * time.Second, + } +} + // ProviderHealth Provider健康状态 type ProviderHealth struct { Name string @@ -36,6 +77,13 @@ type ProviderHealth struct { FailureRate float64 Weight float64 LastCheckTime time.Time + + // P3-B: 熔断器字段 + CircuitState CircuitState // 当前熔断器状态 + ConsecutiveFailures int64 // 连续失败次数(成功时重置) + ConsecutiveSuccesses int64 // 连续成功次数(失败时重置) + LastStateChange time.Time // 上次状态变更时间 + OpenReason string // 熔断打开的原因(用于调试/告警) } // RegisteredModel 描述当前路由器已注册的可见模型。 @@ -51,14 +99,19 @@ type Router struct { strategy LoadBalancerStrategy mu sync.RWMutex roundRobinCounter uint64 // RoundRobin策略的原子计数器 + + // P3-B: 熔断器 + circuitConfig CircuitBreakerConfig + healthChecker *HealthChecker // 后台健康检查器 } // NewRouter 创建路由器 func NewRouter(strategy LoadBalancerStrategy) *Router { return &Router{ - providers: make(map[string]adapter.ProviderAdapter), - health: make(map[string]*ProviderHealth), - strategy: strategy, + providers: make(map[string]adapter.ProviderAdapter), + health: make(map[string]*ProviderHealth), + strategy: strategy, + circuitConfig: DefaultCircuitBreakerConfig(), } } @@ -115,7 +168,8 @@ func (r *Router) isProviderAvailable(name, model string) bool { return false } - if !health.Available { + // P3-B: 熔断器打开时不允许流量 + if health.CircuitState == CircuitOpen { return false } @@ -125,13 +179,28 @@ func (r *Router) isProviderAvailable(name, model string) bool { return false } + supportsModel := false for _, m := range provider.SupportedModels() { if m == model || m == "*" { - return true + supportsModel = true + break } } + if !supportsModel { + return false + } - return false + // P3-B: 半开状态允许试探请求通过(不管 Available 是否为 false) + if health.CircuitState == CircuitHalfOpen { + return true + } + + // Closed 状态:走原有的 Available 检查 + if !health.Available { + return false + } + + return true } func (r *Router) selectByRoundRobin(candidates []string) (adapter.ProviderAdapter, error) { @@ -303,12 +372,63 @@ func (r *Router) RecordResult(ctx context.Context, providerName string, success } } - // 检查是否应该标记为不可用 - if health.FailureRate > 0.5 { - health.Available = false + health.LastCheckTime = time.Now() + + // P3-B: 熔断器状态机接管可用性判断,不再直接基于 FailureRate 设置 Available + // 状态转换在 transitionCircuit 中处理 + _ = r.transitionCircuitLocked(health, success) +} + +// transitionCircuitLocked 在已持有锁的情况下执行熔断器状态转换 +func (r *Router) transitionCircuitLocked(health *ProviderHealth, success bool) bool { + cfg := r.circuitConfig + now := time.Now() + prevState := health.CircuitState + + switch health.CircuitState { + case CircuitClosed: + if !success { + health.ConsecutiveFailures++ + health.ConsecutiveSuccesses = 0 + + if health.FailureRate > cfg.FailureRateThreshold || + health.ConsecutiveFailures >= cfg.ConsecutiveFailureLimit { + health.CircuitState = CircuitOpen + health.OpenReason = "failure_rate_or_consecutive_failures" + health.LastStateChange = now + health.Available = false + metrics.RecordCircuitStateChange(health.Name, "closed", "open") + } + } else { + health.ConsecutiveSuccesses++ + health.ConsecutiveFailures = 0 + } + + case CircuitOpen: + // Open 状态:等待超时后由健康检查循环处理,不在这里转换 + + case CircuitHalfOpen: + if success { + health.ConsecutiveSuccesses++ + if health.ConsecutiveSuccesses >= cfg.HalfOpenSuccessThreshold { + health.CircuitState = CircuitClosed + health.LastStateChange = now + health.Available = true + health.ConsecutiveFailures = 0 + health.FailureRate = 0 + metrics.RecordCircuitStateChange(health.Name, "half_open", "closed") + } + } else { + health.ConsecutiveFailures++ + health.ConsecutiveSuccesses = 0 + health.CircuitState = CircuitOpen + health.LastStateChange = now + health.OpenReason = "half_open_probe_failed" + metrics.RecordCircuitStateChange(health.Name, "half_open", "open") + } } - health.LastCheckTime = time.Now() + return prevState != health.CircuitState } // UpdateHealth 更新健康状态 @@ -336,7 +456,31 @@ func (r *Router) GetHealthStatus() map[string]*ProviderHealth { FailureRate: health.FailureRate, Weight: health.Weight, LastCheckTime: health.LastCheckTime, + CircuitState: health.CircuitState, } } return result } + +// StartHealthChecker 启动后台健康检查(由bootstrap调用) +func (r *Router) StartHealthChecker(interval time.Duration) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.healthChecker != nil { + return // 已经启动 + } + r.healthChecker = NewHealthChecker(r, interval, r.circuitConfig) + r.healthChecker.Start() +} + +// StopHealthChecker 停止后台健康检查(由shutdown调用) +func (r *Router) StopHealthChecker() { + r.mu.Lock() + defer r.mu.Unlock() + + if r.healthChecker != nil { + r.healthChecker.Stop() + r.healthChecker = nil + } +}