Files
llm-intelligence/internal/retry/retry.go
phamnazage-jpg f5b373caf4
Some checks failed
CI / go-test (push) Has been cancelled
CI / scripts-regression (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / docker-build (push) Has been cancelled
feat(report): improve daily intelligence UX and price tracking
2026-05-27 17:23:08 +08:00

248 lines
5.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// internal/retry/retry.go
// 指数退避重试机制
package retry
import (
"context"
"errors"
"fmt"
"io"
"math"
"net"
"strings"
"time"
)
type temporaryError interface {
Temporary() bool
}
type timeoutError interface {
Timeout() bool
}
type HTTPStatusError struct {
StatusCode int
Body string
}
func (e HTTPStatusError) Error() string {
if e.Body == "" {
return fmt.Sprintf("http status %d", e.StatusCode)
}
return fmt.Sprintf("http status %d: %s", e.StatusCode, e.Body)
}
// Strategy 重试策略
type Strategy struct {
MaxRetries int // 最大重试次数0=不重试)
BaseDelay time.Duration // 基础延迟
MaxDelay time.Duration // 最大延迟上限
Multiplier float64 // 乘数默认2.0
Jitter bool // 是否添加随机抖动
Retryable func(error) bool // 判断错误是否可重试
}
// DefaultStrategy 返回默认重试策略
func DefaultStrategy() Strategy {
return Strategy{
MaxRetries: 3,
BaseDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0,
Jitter: true,
Retryable: IsRetryable,
}
}
// IsRetryable 默认重试判定仅临时网络错误、429、5xx 等可重试
func IsRetryable(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
var statusErr HTTPStatusError
if errors.As(err, &statusErr) {
return statusErr.StatusCode == 429 || (statusErr.StatusCode >= 500 && statusErr.StatusCode < 600)
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true
}
message := strings.ToLower(err.Error())
if strings.Contains(message, "json 解析失败") ||
strings.Contains(message, "invalid character") ||
strings.Contains(message, "unmarshal") ||
strings.Contains(message, "decode") ||
strings.Contains(message, "schema") {
return false
}
var tempErr temporaryError
if errors.As(err, &tempErr) && tempErr.Temporary() {
return true
}
var timeoutErr timeoutError
if errors.As(err, &timeoutErr) && timeoutErr.Timeout() {
return true
}
var netErr net.Error
if errors.As(err, &netErr) && (netErr.Timeout() || netErr.Temporary()) {
return true
}
retriableMarkers := []string{
"transport closed",
"connection reset",
"connection refused",
"tls handshake timeout",
"i/o timeout",
"no such host",
"temporarily unavailable",
"too many requests",
"rate limit",
}
for _, marker := range retriableMarkers {
if strings.Contains(message, marker) {
return true
}
}
return false
}
// Do 执行带重试的操作
func Do(ctx context.Context, strategy Strategy, fn func() error) error {
var lastErr error
for attempt := 0; attempt <= strategy.MaxRetries; attempt++ {
if err := fn(); err != nil {
lastErr = err
// 不判断最后一次是否需要重试
if attempt == strategy.MaxRetries {
break
}
// 检查是否可重试
if strategy.Retryable != nil && !strategy.Retryable(err) {
return fmt.Errorf("non-retryable error on attempt %d: %w", attempt+1, err)
}
// 计算退避延迟
delay := calculateDelay(strategy, attempt)
// 检查上下文是否已取消
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled after attempt %d: %w", attempt+1, ctx.Err())
case <-time.After(delay):
// 继续重试
}
} else {
return nil
}
}
return fmt.Errorf("all %d attempts failed, last error: %w", strategy.MaxRetries+1, lastErr)
}
// calculateDelay 计算指数退避延迟
func calculateDelay(s Strategy, attempt int) time.Duration {
// 指数退避: base * multiplier^attempt
delay := float64(s.BaseDelay) * math.Pow(s.Multiplier, float64(attempt))
// 添加上限
if max := float64(s.MaxDelay); delay > max {
delay = max
}
// 添加抖动±25%
if s.Jitter {
jitter := delay * 0.25
delay = delay - jitter + (jitter * 2 * float64(time.Now().Nanosecond()%1000) / 1000)
}
return time.Duration(delay)
}
// DoWithResult 执行带重试的操作并返回结果
func DoWithResult[T any](ctx context.Context, strategy Strategy, fn func() (T, error)) (T, error) {
var zero T
var lastErr error
for attempt := 0; attempt <= strategy.MaxRetries; attempt++ {
result, err := fn()
if err == nil {
return result, nil
}
lastErr = err
if attempt == strategy.MaxRetries {
break
}
if strategy.Retryable != nil && !strategy.Retryable(err) {
return zero, fmt.Errorf("non-retryable error on attempt %d: %w", attempt+1, err)
}
delay := calculateDelay(strategy, attempt)
select {
case <-ctx.Done():
return zero, fmt.Errorf("context cancelled after attempt %d: %w", attempt+1, ctx.Err())
case <-time.After(delay):
}
}
return zero, fmt.Errorf("all %d attempts failed, last error: %w", strategy.MaxRetries+1, lastErr)
}
// Metrics 重试统计
type Metrics struct {
Attempts int
Success bool
TotalDelay time.Duration
}
// DoWithMetrics 执行带重试并返回统计信息
func DoWithMetrics(ctx context.Context, strategy Strategy, fn func() error) (Metrics, error) {
m := Metrics{}
var lastErr error
start := time.Now()
for attempt := 0; attempt <= strategy.MaxRetries; attempt++ {
m.Attempts = attempt + 1
if err := fn(); err != nil {
lastErr = err
if attempt == strategy.MaxRetries {
break
}
if strategy.Retryable != nil && !strategy.Retryable(err) {
m.TotalDelay = time.Since(start)
return m, fmt.Errorf("non-retryable error on attempt %d: %w", attempt+1, err)
}
delay := calculateDelay(strategy, attempt)
select {
case <-ctx.Done():
m.TotalDelay = time.Since(start)
return m, fmt.Errorf("context cancelled after attempt %d: %w", attempt+1, ctx.Err())
case <-time.After(delay):
}
} else {
m.Success = true
m.TotalDelay = time.Since(start)
return m, nil
}
}
m.TotalDelay = time.Since(start)
return m, fmt.Errorf("all %d attempts failed, last error: %w", strategy.MaxRetries+1, lastErr)
}