From fded346295e774ba651cb27ea1573cfa29100678 Mon Sep 17 00:00:00 2001 From: User Date: Sat, 18 Apr 2026 10:43:00 +0800 Subject: [PATCH] refactor(goroutine): replace bare goroutines with managed executors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add internal/pkg/safego/ utility for safe goroutine launching with panic recovery, structured logging (slog + zap), and optional error callback support - Replace 8 fire-and-forget goroutines in service layer: user_service.go (1), subscription_service.go (4), redeem_service.go (2), promo_service.go (1) — all now use safego.Go() - Add inline panic recovery to 6 long-running goroutines: pricing_service.go scheduler loop Redis PubSub subscribers: tls_fingerprint/error_passthrough/api_key caches OAuth session cleanup: openai/oauth/geminicli S3 stream upload in sora_s3_storage.go - All changes verified: go build PASS, service tests ALL PASS, repository tests ALL PASS --- backend/internal/pkg/geminicli/oauth.go | 6 +++ backend/internal/pkg/oauth/oauth.go | 6 +++ backend/internal/pkg/openai/oauth.go | 6 +++ backend/internal/pkg/safego/safego.go | 46 +++++++++++++++++++ backend/internal/repository/api_key_cache.go | 3 ++ .../repository/error_passthrough_cache.go | 5 ++ .../tls_fingerprint_profile_cache.go | 5 ++ backend/internal/service/pricing_service.go | 6 +++ backend/internal/service/promo_service.go | 6 ++- backend/internal/service/redeem_service.go | 9 ++-- backend/internal/service/sora_s3_storage.go | 6 +++ .../internal/service/subscription_service.go | 17 +++---- backend/internal/service/user_service.go | 7 ++- 13 files changed, 112 insertions(+), 16 deletions(-) create mode 100644 backend/internal/pkg/safego/safego.go diff --git a/backend/internal/pkg/geminicli/oauth.go b/backend/internal/pkg/geminicli/oauth.go index b10b5750..71bb148a 100644 --- a/backend/internal/pkg/geminicli/oauth.go +++ b/backend/internal/pkg/geminicli/oauth.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/hex" "fmt" + "log" "net/http" "net/url" "os" @@ -86,6 +87,11 @@ func (s *SessionStore) Stop() { } func (s *SessionStore) cleanup() { + defer func() { + if r := recover(); r != nil { + log.Printf("[GeminiCLI.OAuth] Cleanup goroutine panic recovered: %v", r) + } + }() ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { diff --git a/backend/internal/pkg/oauth/oauth.go b/backend/internal/pkg/oauth/oauth.go index c5ef3c6e..0d1f52c2 100644 --- a/backend/internal/pkg/oauth/oauth.go +++ b/backend/internal/pkg/oauth/oauth.go @@ -7,6 +7,7 @@ import ( "encoding/base64" "encoding/hex" "fmt" + "log" "net/url" "strings" "sync" @@ -99,6 +100,11 @@ func (s *SessionStore) Delete(sessionID string) { // cleanup removes expired sessions periodically func (s *SessionStore) cleanup() { + defer func() { + if r := recover(); r != nil { + log.Printf("[OAuth] Cleanup goroutine panic recovered: %v", r) + } + }() ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { diff --git a/backend/internal/pkg/openai/oauth.go b/backend/internal/pkg/openai/oauth.go index 8c765bef..adad29f7 100644 --- a/backend/internal/pkg/openai/oauth.go +++ b/backend/internal/pkg/openai/oauth.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "log" "net/url" "strings" "sync" @@ -109,6 +110,11 @@ func (s *SessionStore) Stop() { // cleanup removes expired sessions periodically func (s *SessionStore) cleanup() { + defer func() { + if r := recover(); r != nil { + log.Printf("[OpenAI.OAuth] Cleanup goroutine panic recovered: %v", r) + } + }() ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { diff --git a/backend/internal/pkg/safego/safego.go b/backend/internal/pkg/safego/safego.go new file mode 100644 index 00000000..5afee8f2 --- /dev/null +++ b/backend/internal/pkg/safego/safego.go @@ -0,0 +1,46 @@ +// Package safego provides managed goroutine helpers with panic recovery, +// structured logging, and optional error callbacks. +// +// Use SafeGo instead of bare go func(){} to ensure panics are caught +// and logged rather than silently crashing the process. +package safego + +import ( + "log/slog" + + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" + "go.uber.org/zap" +) + +// ErrorHandler is an optional callback invoked when a panic is recovered. +type ErrorHandler func(recovered any) + +// Go launches f in a new goroutine with built-in panic recovery. +// Panics are logged as errors with the given component tag. +// If onError is non-nil it is called after recovery (useful for metrics). +func Go(component string, f func(), onError ErrorHandler) { + go runSafe(component, f, onError) +} + +func runSafe(component string, f func(), onError ErrorHandler) { + defer func() { + if r := recover(); r != nil { + handlePanic(component, r, onError) + } + }() + f() +} + +func handlePanic(component string, recovered any, onError ErrorHandler) { + slog.Error("goroutine_panic", + "component", component, + "panic", recovered, + ) + logger.L().With( + zap.String("component", component), + zap.Any("panic", recovered), + ).Error("safego.panic") + if onError != nil { + onError(recovered) + } +} diff --git a/backend/internal/repository/api_key_cache.go b/backend/internal/repository/api_key_cache.go index a1072057..4d57809f 100644 --- a/backend/internal/repository/api_key_cache.go +++ b/backend/internal/repository/api_key_cache.go @@ -112,6 +112,9 @@ func (c *apiKeyCache) SubscribeAuthCacheInvalidation(ctx context.Context, handle go func() { defer func() { + if r := recover(); r != nil { + log.Printf("[APIKeyCache] Auth cache subscriber panic recovered: %v", r) + } if err := pubsub.Close(); err != nil { log.Printf("Warning: failed to close auth cache invalidation pubsub: %v", err) } diff --git a/backend/internal/repository/error_passthrough_cache.go b/backend/internal/repository/error_passthrough_cache.go index 5584ffc8..8752797e 100644 --- a/backend/internal/repository/error_passthrough_cache.go +++ b/backend/internal/repository/error_passthrough_cache.go @@ -103,6 +103,11 @@ func (c *errorPassthroughCache) NotifyUpdate(ctx context.Context) error { // SubscribeUpdates 订阅缓存更新通知 func (c *errorPassthroughCache) SubscribeUpdates(ctx context.Context, handler func()) { go func() { + defer func() { + if r := recover(); r != nil { + log.Printf("[ErrorPassthroughCache] Subscriber panic recovered: %v", r) + } + }() sub := c.rdb.Subscribe(ctx, errorPassthroughPubSubKey) defer func() { _ = sub.Close() }() diff --git a/backend/internal/repository/tls_fingerprint_profile_cache.go b/backend/internal/repository/tls_fingerprint_profile_cache.go index 81ee0434..898d75ab 100644 --- a/backend/internal/repository/tls_fingerprint_profile_cache.go +++ b/backend/internal/repository/tls_fingerprint_profile_cache.go @@ -97,6 +97,11 @@ func (c *tlsFingerprintProfileCache) NotifyUpdate(ctx context.Context) error { // SubscribeUpdates 订阅缓存更新通知 func (c *tlsFingerprintProfileCache) SubscribeUpdates(ctx context.Context, handler func()) { go func() { + defer func() { + if r := recover(); r != nil { + slog.Error("tls_fp_subscriber_panic", "panic", r) + } + }() sub := c.rdb.Subscribe(ctx, tlsFPProfilePubSubKey) defer func() { _ = sub.Close() }() diff --git a/backend/internal/service/pricing_service.go b/backend/internal/service/pricing_service.go index 3b3f31c3..add29d57 100644 --- a/backend/internal/service/pricing_service.go +++ b/backend/internal/service/pricing_service.go @@ -163,6 +163,12 @@ func (s *PricingService) startUpdateScheduler() { s.wg.Add(1) go func() { defer s.wg.Done() + defer func() { + if r := recover(); r != nil { + logger.LegacyPrintf("service.pricing", "[Pricing] Scheduler panic recovered: %v", r) + logger.L().With(zap.String("component", "service.pricing"), zap.Any("panic", r)).Error("pricing.scheduler_panic") + } + }() ticker := time.NewTicker(hashInterval) defer ticker.Stop() diff --git a/backend/internal/service/promo_service.go b/backend/internal/service/promo_service.go index 5ff63bdc..8cd04fa9 100644 --- a/backend/internal/service/promo_service.go +++ b/backend/internal/service/promo_service.go @@ -11,6 +11,7 @@ import ( dbent "github.com/Wei-Shaw/sub2api/ent" infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" "github.com/Wei-Shaw/sub2api/internal/pkg/pagination" + "github.com/Wei-Shaw/sub2api/internal/pkg/safego" ) var ( @@ -152,11 +153,12 @@ func (s *PromoService) ApplyPromoCode(ctx context.Context, userID int64, code st // 失效余额缓存 if s.billingCacheService != nil { - go func() { + userID := userID + safego.Go("service.promo", func() { cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = s.billingCacheService.InvalidateUserBalance(cacheCtx, userID) - }() + }, nil) } return nil diff --git a/backend/internal/service/redeem_service.go b/backend/internal/service/redeem_service.go index 9ced6201..fea8361f 100644 --- a/backend/internal/service/redeem_service.go +++ b/backend/internal/service/redeem_service.go @@ -12,6 +12,7 @@ import ( dbent "github.com/Wei-Shaw/sub2api/ent" infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" "github.com/Wei-Shaw/sub2api/internal/pkg/pagination" + "github.com/Wei-Shaw/sub2api/internal/pkg/safego" ) var ( @@ -388,11 +389,11 @@ func (s *RedeemService) invalidateRedeemCaches(ctx context.Context, userID int64 if s.billingCacheService == nil { return } - go func() { + safego.Go("service.redeem", func() { cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = s.billingCacheService.InvalidateUserBalance(cacheCtx, userID) - }() + }, nil) case RedeemTypeConcurrency: if s.authCacheInvalidator != nil { s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, userID) @@ -409,11 +410,11 @@ func (s *RedeemService) invalidateRedeemCaches(ctx context.Context, userID int64 } if redeemCode.GroupID != nil { groupID := *redeemCode.GroupID - go func() { + safego.Go("service.redeem", func() { cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID) - }() + }, nil) } } } diff --git a/backend/internal/service/sora_s3_storage.go b/backend/internal/service/sora_s3_storage.go index 4c573905..320aa217 100644 --- a/backend/internal/service/sora_s3_storage.go +++ b/backend/internal/service/sora_s3_storage.go @@ -258,6 +258,12 @@ func (s *SoraS3Storage) UploadFromURL(ctx context.Context, userID int64, sourceU uploadErrCh := make(chan error, 1) go func() { defer close(uploadErrCh) + defer func() { + if r := recover(); r != nil { + logger.LegacyPrintf("service.sora_s3", "[SoraS3] Upload goroutine panic recovered: %v", r) + uploadErrCh <- fmt.Errorf("upload panic: %v", r) + } + }() input := &s3.PutObjectInput{ Bucket: &cfg.Bucket, Key: &objectKey, diff --git a/backend/internal/service/subscription_service.go b/backend/internal/service/subscription_service.go index f0a5540e..0a304b20 100644 --- a/backend/internal/service/subscription_service.go +++ b/backend/internal/service/subscription_service.go @@ -13,6 +13,7 @@ import ( "github.com/Wei-Shaw/sub2api/internal/config" infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" "github.com/Wei-Shaw/sub2api/internal/pkg/pagination" + "github.com/Wei-Shaw/sub2api/internal/pkg/safego" "github.com/dgraph-io/ristretto" "golang.org/x/sync/singleflight" ) @@ -252,11 +253,11 @@ func (s *SubscriptionService) AssignOrExtendSubscription(ctx context.Context, in s.InvalidateSubCache(input.UserID, input.GroupID) if s.billingCacheService != nil { userID, groupID := input.UserID, input.GroupID - go func() { + safego.Go("service.subscription", func() { cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID) - }() + }, nil) } // 返回更新后的订阅 @@ -274,11 +275,11 @@ func (s *SubscriptionService) AssignOrExtendSubscription(ctx context.Context, in s.InvalidateSubCache(input.UserID, input.GroupID) if s.billingCacheService != nil { userID, groupID := input.UserID, input.GroupID - go func() { + safego.Go("service.subscription", func() { cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID) - }() + }, nil) } return sub, false, nil // false 表示是新建 @@ -417,11 +418,11 @@ func (s *SubscriptionService) assignSubscriptionWithReuse(ctx context.Context, i s.InvalidateSubCache(input.UserID, input.GroupID) if s.billingCacheService != nil { userID, groupID := input.UserID, input.GroupID - go func() { + safego.Go("service.subscription", func() { cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID) - }() + }, nil) } return sub, false, nil @@ -478,11 +479,11 @@ func (s *SubscriptionService) RevokeSubscription(ctx context.Context, subscripti s.InvalidateSubCache(sub.UserID, sub.GroupID) if s.billingCacheService != nil { userID, groupID := sub.UserID, sub.GroupID - go func() { + safego.Go("service.subscription", func() { cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID) - }() + }, nil) } return nil diff --git a/backend/internal/service/user_service.go b/backend/internal/service/user_service.go index 4045c0aa..54152237 100644 --- a/backend/internal/service/user_service.go +++ b/backend/internal/service/user_service.go @@ -6,6 +6,8 @@ import ( "log" "time" + "github.com/Wei-Shaw/sub2api/internal/pkg/safego" + infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" "github.com/Wei-Shaw/sub2api/internal/pkg/pagination" ) @@ -197,13 +199,14 @@ func (s *UserService) UpdateBalance(ctx context.Context, userID int64, amount fl s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, userID) } if s.billingCache != nil { - go func() { + userID := userID + safego.Go("service.user", func() { cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := s.billingCache.InvalidateUserBalance(cacheCtx, userID); err != nil { log.Printf("invalidate user balance cache failed: user_id=%d err=%v", userID, err) } - }() + }, nil) } return nil }