feat(batch): add validation service for final access status
This commit is contained in:
133
internal/batch/validation.go
Normal file
133
internal/batch/validation.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/host/sub2api"
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
type ValidationItemStore interface {
|
||||
Upsert(ctx context.Context, item sqlite.ImportRunItem) error
|
||||
}
|
||||
|
||||
type ValidationRunStore interface {
|
||||
GetByRunID(ctx context.Context, runID string) (sqlite.ImportRun, error)
|
||||
Update(ctx context.Context, run sqlite.ImportRun) error
|
||||
}
|
||||
|
||||
type ValidationService struct {
|
||||
ItemStore ValidationItemStore
|
||||
RunStore ValidationRunStore
|
||||
Validator func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error)
|
||||
}
|
||||
|
||||
func (s ValidationService) ValidateItem(ctx context.Context, item sqlite.ImportRunItem) error {
|
||||
if s.ItemStore == nil {
|
||||
return fmt.Errorf("item store is required")
|
||||
}
|
||||
if s.RunStore == nil {
|
||||
return fmt.Errorf("run store is required")
|
||||
}
|
||||
if s.Validator == nil {
|
||||
return fmt.Errorf("validator is required")
|
||||
}
|
||||
if strings.TrimSpace(item.CurrentStage) != string(ItemStageValidate) {
|
||||
return fmt.Errorf("item %s is not ready for validation", strings.TrimSpace(item.ItemID))
|
||||
}
|
||||
|
||||
completion, err := s.Validator(ctx, item)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
item.CurrentStage = string(ItemStageDone)
|
||||
item.AccessStatus = string(resolveValidationAccessStatus(item.ConfirmationStatus, completion))
|
||||
if item.AccessStatus == string(AccessStatusDegraded) {
|
||||
item.AdvisoryMessagesJSON = appendAdvisoryJSON(item.AdvisoryMessagesJSON, "gateway_warmup_retry_succeeded")
|
||||
}
|
||||
if !completion.OK {
|
||||
item.LastErrorStage = string(ItemStageValidate)
|
||||
item.LastError = strings.TrimSpace(completion.BodyPreview)
|
||||
}
|
||||
|
||||
if err := s.ItemStore.Upsert(ctx, item); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
run, err := s.RunStore.GetByRunID(ctx, item.RunID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
run.CompletedItems++
|
||||
switch item.AccessStatus {
|
||||
case string(AccessStatusActive):
|
||||
run.ActiveItems++
|
||||
case string(AccessStatusDegraded):
|
||||
run.DegradedItems++
|
||||
run.WarningItems++
|
||||
case string(AccessStatusBroken):
|
||||
run.BrokenItems++
|
||||
}
|
||||
run.State = deriveRunState(run)
|
||||
|
||||
return s.RunStore.Update(ctx, run)
|
||||
}
|
||||
|
||||
func resolveValidationAccessStatus(confirmationStatus string, completion sub2api.GatewayCompletionResult) AccessStatus {
|
||||
switch strings.TrimSpace(confirmationStatus) {
|
||||
case string(ConfirmationFailed):
|
||||
return AccessStatusBroken
|
||||
case string(ConfirmationConfirmed), string(ConfirmationAdvisory):
|
||||
if completion.OK && completion.StatusCode >= 200 && completion.StatusCode < 300 {
|
||||
return AccessStatusActive
|
||||
}
|
||||
if isTransientValidationFailure(completion) {
|
||||
return AccessStatusDegraded
|
||||
}
|
||||
return AccessStatusBroken
|
||||
default:
|
||||
return AccessStatusBroken
|
||||
}
|
||||
}
|
||||
|
||||
func isTransientValidationFailure(result sub2api.GatewayCompletionResult) bool {
|
||||
if result.OK {
|
||||
return false
|
||||
}
|
||||
if result.StatusCode != 0 && result.StatusCode != 429 && result.StatusCode != 502 && result.StatusCode != 503 && result.StatusCode != 504 {
|
||||
return false
|
||||
}
|
||||
|
||||
body := strings.ToLower(strings.TrimSpace(result.BodyPreview))
|
||||
return strings.Contains(body, "service temporarily unavailable") ||
|
||||
strings.Contains(body, "no available accounts") ||
|
||||
strings.Contains(body, "temporar") ||
|
||||
strings.Contains(body, "try again")
|
||||
}
|
||||
|
||||
func deriveRunState(run sqlite.ImportRun) string {
|
||||
if run.TotalItems > 0 && run.CompletedItems >= run.TotalItems {
|
||||
switch {
|
||||
case run.BrokenItems > 0:
|
||||
return string(RunStateFailed)
|
||||
case run.WarningItems > 0 || run.DegradedItems > 0:
|
||||
return string(RunStateCompletedWithWarnings)
|
||||
default:
|
||||
return string(RunStateCompleted)
|
||||
}
|
||||
}
|
||||
return firstNonEmptyRunState(run.State, string(RunStateRunning))
|
||||
}
|
||||
|
||||
func firstNonEmptyRunState(values ...string) string {
|
||||
for _, value := range values {
|
||||
if trimmed := strings.TrimSpace(value); trimmed != "" {
|
||||
return trimmed
|
||||
}
|
||||
}
|
||||
return string(RunStateRunning)
|
||||
}
|
||||
212
internal/batch/validation_test.go
Normal file
212
internal/batch/validation_test.go
Normal file
@@ -0,0 +1,212 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/host/sub2api"
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
func TestValidationService(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("confirmed or advisory with chat 200 becomes active", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
for _, confirmationStatus := range []string{string(ConfirmationConfirmed), string(ConfirmationAdvisory)} {
|
||||
confirmationStatus := confirmationStatus
|
||||
t.Run(confirmationStatus, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
itemStore := &fakeValidationItemStore{}
|
||||
runStore := &fakeValidationRunStore{
|
||||
run: sqlite.ImportRun{RunID: "run-1", TotalItems: 1},
|
||||
}
|
||||
service := ValidationService{
|
||||
ItemStore: itemStore,
|
||||
RunStore: runStore,
|
||||
Validator: func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) {
|
||||
return sub2api.GatewayCompletionResult{OK: true, StatusCode: 200, ContentType: "application/json"}, nil
|
||||
},
|
||||
}
|
||||
|
||||
item := sqlite.ImportRunItem{
|
||||
ItemID: "item-1",
|
||||
RunID: "run-1",
|
||||
CurrentStage: string(ItemStageValidate),
|
||||
ConfirmationStatus: confirmationStatus,
|
||||
AccessStatus: string(AccessStatusUnknown),
|
||||
ResolvedSmokeModel: "kimi-k2.6",
|
||||
APIKeyFingerprint: "sha256:abc",
|
||||
MatchedAccountState: string(MatchedAccountStateActive),
|
||||
AccountResolution: string(AccountResolutionReused),
|
||||
}
|
||||
|
||||
if err := service.ValidateItem(context.Background(), item); err != nil {
|
||||
t.Fatalf("ValidateItem() error = %v", err)
|
||||
}
|
||||
got := itemStore.last
|
||||
if got.AccessStatus != string(AccessStatusActive) {
|
||||
t.Fatalf("AccessStatus = %q, want active", got.AccessStatus)
|
||||
}
|
||||
if got.CurrentStage != string(ItemStageDone) {
|
||||
t.Fatalf("CurrentStage = %q, want done", got.CurrentStage)
|
||||
}
|
||||
if runStore.updated.ActiveItems != 1 || runStore.updated.CompletedItems != 1 {
|
||||
t.Fatalf("run summary = %+v, want active/completed increment", runStore.updated)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("exhausted transient completion becomes degraded", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
itemStore := &fakeValidationItemStore{}
|
||||
runStore := &fakeValidationRunStore{
|
||||
run: sqlite.ImportRun{RunID: "run-1", TotalItems: 1},
|
||||
}
|
||||
service := ValidationService{
|
||||
ItemStore: itemStore,
|
||||
RunStore: runStore,
|
||||
Validator: func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) {
|
||||
return sub2api.GatewayCompletionResult{
|
||||
OK: false,
|
||||
StatusCode: 503,
|
||||
BodyPreview: `{"error":{"message":"no available accounts"}}`,
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
|
||||
item := sqlite.ImportRunItem{
|
||||
ItemID: "item-1",
|
||||
RunID: "run-1",
|
||||
CurrentStage: string(ItemStageValidate),
|
||||
ConfirmationStatus: string(ConfirmationConfirmed),
|
||||
AccessStatus: string(AccessStatusUnknown),
|
||||
ResolvedSmokeModel: "kimi-k2.6",
|
||||
APIKeyFingerprint: "sha256:abc",
|
||||
MatchedAccountState: string(MatchedAccountStateActive),
|
||||
AccountResolution: string(AccountResolutionReused),
|
||||
}
|
||||
|
||||
if err := service.ValidateItem(context.Background(), item); err != nil {
|
||||
t.Fatalf("ValidateItem() error = %v", err)
|
||||
}
|
||||
if itemStore.last.AccessStatus != string(AccessStatusDegraded) {
|
||||
t.Fatalf("AccessStatus = %q, want degraded", itemStore.last.AccessStatus)
|
||||
}
|
||||
if !strings.Contains(itemStore.last.AdvisoryMessagesJSON, "gateway_warmup_retry_succeeded") {
|
||||
t.Fatalf("AdvisoryMessagesJSON = %q, want warmup advisory", itemStore.last.AdvisoryMessagesJSON)
|
||||
}
|
||||
if runStore.updated.DegradedItems != 1 || runStore.updated.WarningItems != 1 {
|
||||
t.Fatalf("run summary = %+v, want degraded/warning increment", runStore.updated)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("definitive invalid path becomes broken", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
itemStore := &fakeValidationItemStore{}
|
||||
runStore := &fakeValidationRunStore{
|
||||
run: sqlite.ImportRun{RunID: "run-1", TotalItems: 1},
|
||||
}
|
||||
service := ValidationService{
|
||||
ItemStore: itemStore,
|
||||
RunStore: runStore,
|
||||
Validator: func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) {
|
||||
return sub2api.GatewayCompletionResult{
|
||||
OK: false,
|
||||
StatusCode: 404,
|
||||
BodyPreview: `{"error":"route missing"}`,
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
|
||||
item := sqlite.ImportRunItem{
|
||||
ItemID: "item-1",
|
||||
RunID: "run-1",
|
||||
CurrentStage: string(ItemStageValidate),
|
||||
ConfirmationStatus: string(ConfirmationConfirmed),
|
||||
AccessStatus: string(AccessStatusUnknown),
|
||||
ResolvedSmokeModel: "kimi-k2.6",
|
||||
APIKeyFingerprint: "sha256:abc",
|
||||
MatchedAccountState: string(MatchedAccountStateActive),
|
||||
AccountResolution: string(AccountResolutionReused),
|
||||
}
|
||||
|
||||
if err := service.ValidateItem(context.Background(), item); err != nil {
|
||||
t.Fatalf("ValidateItem() error = %v", err)
|
||||
}
|
||||
if itemStore.last.AccessStatus != string(AccessStatusBroken) {
|
||||
t.Fatalf("AccessStatus = %q, want broken", itemStore.last.AccessStatus)
|
||||
}
|
||||
if runStore.updated.BrokenItems != 1 || runStore.updated.CompletedItems != 1 {
|
||||
t.Fatalf("run summary = %+v, want broken/completed increment", runStore.updated)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("only validation stage may write access status", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
itemStore := &fakeValidationItemStore{}
|
||||
runStore := &fakeValidationRunStore{
|
||||
run: sqlite.ImportRun{RunID: "run-1", TotalItems: 1},
|
||||
}
|
||||
service := ValidationService{
|
||||
ItemStore: itemStore,
|
||||
RunStore: runStore,
|
||||
Validator: func(ctx context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) {
|
||||
return sub2api.GatewayCompletionResult{OK: true, StatusCode: 200}, nil
|
||||
},
|
||||
}
|
||||
|
||||
item := sqlite.ImportRunItem{
|
||||
ItemID: "item-1",
|
||||
RunID: "run-1",
|
||||
CurrentStage: string(ItemStageConfirm),
|
||||
ConfirmationStatus: string(ConfirmationConfirmed),
|
||||
AccessStatus: string(AccessStatusUnknown),
|
||||
ResolvedSmokeModel: "kimi-k2.6",
|
||||
APIKeyFingerprint: "sha256:abc",
|
||||
MatchedAccountState: string(MatchedAccountStateActive),
|
||||
AccountResolution: string(AccountResolutionReused),
|
||||
}
|
||||
|
||||
err := service.ValidateItem(context.Background(), item)
|
||||
if err == nil {
|
||||
t.Fatal("ValidateItem() error = nil, want validation stage guard")
|
||||
}
|
||||
if itemStore.calls != 0 {
|
||||
t.Fatalf("item upsert calls = %d, want 0", itemStore.calls)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type fakeValidationItemStore struct {
|
||||
last sqlite.ImportRunItem
|
||||
calls int
|
||||
}
|
||||
|
||||
func (f *fakeValidationItemStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error {
|
||||
f.last = item
|
||||
f.calls++
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakeValidationRunStore struct {
|
||||
run sqlite.ImportRun
|
||||
updated sqlite.ImportRun
|
||||
}
|
||||
|
||||
func (f *fakeValidationRunStore) GetByRunID(ctx context.Context, runID string) (sqlite.ImportRun, error) {
|
||||
return f.run, nil
|
||||
}
|
||||
|
||||
func (f *fakeValidationRunStore) Update(ctx context.Context, run sqlite.ImportRun) error {
|
||||
f.updated = run
|
||||
f.run = run
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user