refactor(goroutine): replace bare goroutines with managed executors
Some checks failed
CI / test (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
Security Scan / backend-security (push) Has been cancelled
Security Scan / frontend-security (push) Has been cancelled

- Add internal/pkg/safego/ utility for safe goroutine launching
  with panic recovery, structured logging (slog + zap), and optional
  error callback support

- Replace 8 fire-and-forget goroutines in service layer:
  user_service.go (1), subscription_service.go (4), redeem_service.go (2),
  promo_service.go (1) — all now use safego.Go()

- Add inline panic recovery to 6 long-running goroutines:
  pricing_service.go scheduler loop
  Redis PubSub subscribers: tls_fingerprint/error_passthrough/api_key caches
  OAuth session cleanup: openai/oauth/geminicli
  S3 stream upload in sora_s3_storage.go

- All changes verified: go build PASS, service tests ALL PASS,
  repository tests ALL PASS
This commit is contained in:
User
2026-04-18 10:43:00 +08:00
parent d1bf033f24
commit fded346295
13 changed files with 112 additions and 16 deletions

View File

@@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
"log"
"net/http"
"net/url"
"os"
@@ -86,6 +87,11 @@ func (s *SessionStore) Stop() {
}
func (s *SessionStore) cleanup() {
defer func() {
if r := recover(); r != nil {
log.Printf("[GeminiCLI.OAuth] Cleanup goroutine panic recovered: %v", r)
}
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {

View File

@@ -7,6 +7,7 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
"log"
"net/url"
"strings"
"sync"
@@ -99,6 +100,11 @@ func (s *SessionStore) Delete(sessionID string) {
// cleanup removes expired sessions periodically
func (s *SessionStore) cleanup() {
defer func() {
if r := recover(); r != nil {
log.Printf("[OAuth] Cleanup goroutine panic recovered: %v", r)
}
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {

View File

@@ -7,6 +7,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"log"
"net/url"
"strings"
"sync"
@@ -109,6 +110,11 @@ func (s *SessionStore) Stop() {
// cleanup removes expired sessions periodically
func (s *SessionStore) cleanup() {
defer func() {
if r := recover(); r != nil {
log.Printf("[OpenAI.OAuth] Cleanup goroutine panic recovered: %v", r)
}
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {

View File

@@ -0,0 +1,46 @@
// Package safego provides managed goroutine helpers with panic recovery,
// structured logging, and optional error callbacks.
//
// Use SafeGo instead of bare go func(){} to ensure panics are caught
// and logged rather than silently crashing the process.
package safego
import (
"log/slog"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"go.uber.org/zap"
)
// ErrorHandler is an optional callback invoked when a panic is recovered.
type ErrorHandler func(recovered any)
// Go launches f in a new goroutine with built-in panic recovery.
// Panics are logged as errors with the given component tag.
// If onError is non-nil it is called after recovery (useful for metrics).
func Go(component string, f func(), onError ErrorHandler) {
go runSafe(component, f, onError)
}
func runSafe(component string, f func(), onError ErrorHandler) {
defer func() {
if r := recover(); r != nil {
handlePanic(component, r, onError)
}
}()
f()
}
func handlePanic(component string, recovered any, onError ErrorHandler) {
slog.Error("goroutine_panic",
"component", component,
"panic", recovered,
)
logger.L().With(
zap.String("component", component),
zap.Any("panic", recovered),
).Error("safego.panic")
if onError != nil {
onError(recovered)
}
}

View File

@@ -112,6 +112,9 @@ func (c *apiKeyCache) SubscribeAuthCacheInvalidation(ctx context.Context, handle
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[APIKeyCache] Auth cache subscriber panic recovered: %v", r)
}
if err := pubsub.Close(); err != nil {
log.Printf("Warning: failed to close auth cache invalidation pubsub: %v", err)
}

View File

@@ -103,6 +103,11 @@ func (c *errorPassthroughCache) NotifyUpdate(ctx context.Context) error {
// SubscribeUpdates 订阅缓存更新通知
func (c *errorPassthroughCache) SubscribeUpdates(ctx context.Context, handler func()) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[ErrorPassthroughCache] Subscriber panic recovered: %v", r)
}
}()
sub := c.rdb.Subscribe(ctx, errorPassthroughPubSubKey)
defer func() { _ = sub.Close() }()

View File

@@ -97,6 +97,11 @@ func (c *tlsFingerprintProfileCache) NotifyUpdate(ctx context.Context) error {
// SubscribeUpdates 订阅缓存更新通知
func (c *tlsFingerprintProfileCache) SubscribeUpdates(ctx context.Context, handler func()) {
go func() {
defer func() {
if r := recover(); r != nil {
slog.Error("tls_fp_subscriber_panic", "panic", r)
}
}()
sub := c.rdb.Subscribe(ctx, tlsFPProfilePubSubKey)
defer func() { _ = sub.Close() }()

View File

@@ -163,6 +163,12 @@ func (s *PricingService) startUpdateScheduler() {
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer func() {
if r := recover(); r != nil {
logger.LegacyPrintf("service.pricing", "[Pricing] Scheduler panic recovered: %v", r)
logger.L().With(zap.String("component", "service.pricing"), zap.Any("panic", r)).Error("pricing.scheduler_panic")
}
}()
ticker := time.NewTicker(hashInterval)
defer ticker.Stop()

View File

@@ -11,6 +11,7 @@ import (
dbent "github.com/Wei-Shaw/sub2api/ent"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
"github.com/Wei-Shaw/sub2api/internal/pkg/safego"
)
var (
@@ -152,11 +153,12 @@ func (s *PromoService) ApplyPromoCode(ctx context.Context, userID int64, code st
// 失效余额缓存
if s.billingCacheService != nil {
go func() {
userID := userID
safego.Go("service.promo", func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateUserBalance(cacheCtx, userID)
}()
}, nil)
}
return nil

View File

@@ -12,6 +12,7 @@ import (
dbent "github.com/Wei-Shaw/sub2api/ent"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
"github.com/Wei-Shaw/sub2api/internal/pkg/safego"
)
var (
@@ -388,11 +389,11 @@ func (s *RedeemService) invalidateRedeemCaches(ctx context.Context, userID int64
if s.billingCacheService == nil {
return
}
go func() {
safego.Go("service.redeem", func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateUserBalance(cacheCtx, userID)
}()
}, nil)
case RedeemTypeConcurrency:
if s.authCacheInvalidator != nil {
s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, userID)
@@ -409,11 +410,11 @@ func (s *RedeemService) invalidateRedeemCaches(ctx context.Context, userID int64
}
if redeemCode.GroupID != nil {
groupID := *redeemCode.GroupID
go func() {
safego.Go("service.redeem", func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID)
}()
}, nil)
}
}
}

View File

@@ -258,6 +258,12 @@ func (s *SoraS3Storage) UploadFromURL(ctx context.Context, userID int64, sourceU
uploadErrCh := make(chan error, 1)
go func() {
defer close(uploadErrCh)
defer func() {
if r := recover(); r != nil {
logger.LegacyPrintf("service.sora_s3", "[SoraS3] Upload goroutine panic recovered: %v", r)
uploadErrCh <- fmt.Errorf("upload panic: %v", r)
}
}()
input := &s3.PutObjectInput{
Bucket: &cfg.Bucket,
Key: &objectKey,

View File

@@ -13,6 +13,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/config"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
"github.com/Wei-Shaw/sub2api/internal/pkg/safego"
"github.com/dgraph-io/ristretto"
"golang.org/x/sync/singleflight"
)
@@ -252,11 +253,11 @@ func (s *SubscriptionService) AssignOrExtendSubscription(ctx context.Context, in
s.InvalidateSubCache(input.UserID, input.GroupID)
if s.billingCacheService != nil {
userID, groupID := input.UserID, input.GroupID
go func() {
safego.Go("service.subscription", func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID)
}()
}, nil)
}
// 返回更新后的订阅
@@ -274,11 +275,11 @@ func (s *SubscriptionService) AssignOrExtendSubscription(ctx context.Context, in
s.InvalidateSubCache(input.UserID, input.GroupID)
if s.billingCacheService != nil {
userID, groupID := input.UserID, input.GroupID
go func() {
safego.Go("service.subscription", func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID)
}()
}, nil)
}
return sub, false, nil // false 表示是新建
@@ -417,11 +418,11 @@ func (s *SubscriptionService) assignSubscriptionWithReuse(ctx context.Context, i
s.InvalidateSubCache(input.UserID, input.GroupID)
if s.billingCacheService != nil {
userID, groupID := input.UserID, input.GroupID
go func() {
safego.Go("service.subscription", func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID)
}()
}, nil)
}
return sub, false, nil
@@ -478,11 +479,11 @@ func (s *SubscriptionService) RevokeSubscription(ctx context.Context, subscripti
s.InvalidateSubCache(sub.UserID, sub.GroupID)
if s.billingCacheService != nil {
userID, groupID := sub.UserID, sub.GroupID
go func() {
safego.Go("service.subscription", func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID)
}()
}, nil)
}
return nil

View File

@@ -6,6 +6,8 @@ import (
"log"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/safego"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
@@ -197,13 +199,14 @@ func (s *UserService) UpdateBalance(ctx context.Context, userID int64, amount fl
s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, userID)
}
if s.billingCache != nil {
go func() {
userID := userID
safego.Go("service.user", func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.billingCache.InvalidateUserBalance(cacheCtx, userID); err != nil {
log.Printf("invalidate user balance cache failed: user_id=%d err=%v", userID, err)
}
}()
}, nil)
}
return nil
}