Files
2026-05-12 18:49:52 +08:00

437 lines
16 KiB
Go

package gatewayconsumer
import (
"context"
"errors"
"testing"
"time"
"supply-intelligence/internal/domain"
"supply-intelligence/internal/repository"
)
// TestServiceConsumeOnceAppliedAndFailed seeds two pending package events and
// runs one consume pass for the "gateway" consumer without installing a custom
// applier. It expects the first event to be reported and stored as applied, the
// second (model "gpt-fail-model") to be reported and stored as failed, and the
// consumer snapshot to reference the applied event.
//
// NOTE(review): presumably the built-in applier rejects the second event
// because of its model name — confirm against the service implementation.
func TestServiceConsumeOnceAppliedAndFailed(t *testing.T) {
	ctx := context.Background()
	store := repository.NewMemoryRepository()
	seeds := []domain.PackageChangeEvent{
		{
			EventID:           "evt-applied",
			EventType:         "supply_package_published",
			PackageID:         101,
			Platform:          "openai",
			Model:             "gpt-4.1-mini",
			Version:           3,
			OccurredAt:        time.Unix(10, 0).UTC(),
			GatewaySyncStatus: domain.GatewaySyncStatusPending,
		},
		{
			EventID:           "evt-failed",
			EventType:         "supply_package_published",
			PackageID:         102,
			Platform:          "openai",
			Model:             "gpt-fail-model",
			Version:           4,
			OccurredAt:        time.Unix(20, 0).UTC(),
			GatewaySyncStatus: domain.GatewaySyncStatusPending,
		},
	}
	for _, seed := range seeds {
		store.AppendPackageEvent(ctx, seed)
	}

	svc := NewService(store)
	// Pin the service clock to a constant instant.
	svc.now = func() time.Time { return time.Unix(30, 0).UTC() }

	result, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if count := len(result.Items); count != 2 {
		t.Fatalf("unexpected item count: %d", count)
	}
	if got := result.Items[0]; got.GatewaySyncStatus != domain.GatewaySyncStatusApplied {
		t.Fatalf("unexpected first status: %+v", got)
	}
	if got := result.Items[1]; got.GatewaySyncStatus != domain.GatewaySyncStatusFailed {
		t.Fatalf("unexpected second status: %+v", got)
	}

	// Look the two events up by ID so the assertions do not depend on the
	// order in which the repository lists them.
	var storedApplied, storedFailed domain.PackageChangeEvent
	for _, stored := range store.ListPackageEvents(ctx) {
		switch stored.EventID {
		case "evt-applied":
			storedApplied = stored
		case "evt-failed":
			storedFailed = stored
		}
	}
	if storedApplied.GatewaySyncStatus != domain.GatewaySyncStatusApplied {
		t.Fatalf("expected applied event, got %+v", storedApplied)
	}
	if storedFailed.GatewaySyncStatus != domain.GatewaySyncStatusFailed {
		t.Fatalf("expected failed event, got %+v", storedFailed)
	}

	snap, found := store.GetGatewayAppliedSnapshot(ctx, "gateway")
	if !found {
		t.Fatal("expected applied snapshot")
	}
	if !(snap.LastEventID == "evt-applied" && snap.LastPackageID == 101) {
		t.Fatalf("unexpected snapshot: %+v", snap)
	}
}
// TestServiceConsumeOnceRejectsInvalidNilService calls ConsumeOnce on a nil
// *Service with a zero-value input and expects ErrInvalidConsumeInput back.
func TestServiceConsumeOnceRejectsInvalidNilService(t *testing.T) {
	var service *Service
	_, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{})
	// errors.Is (rather than !=) keeps this assertion valid even if the
	// sentinel is wrapped with additional context.
	if !errors.Is(err, ErrInvalidConsumeInput) {
		t.Fatalf("unexpected error: %v", err)
	}
}
// TestServiceConsumeOnceSkipsNonPendingEvents seeds one event already marked
// applied and one already marked failed, then runs a consume pass for the
// "gateway" consumer. It expects no items to be returned and no applied
// snapshot to exist for that consumer afterwards.
func TestServiceConsumeOnceSkipsNonPendingEvents(t *testing.T) {
	ctx := context.Background()
	store := repository.NewMemoryRepository()
	seeds := []domain.PackageChangeEvent{
		{
			EventID:           "evt-applied-existing",
			EventType:         "supply_package_published",
			PackageID:         201,
			Platform:          "openai",
			Model:             "gpt-4.1-applied",
			Version:           5,
			OccurredAt:        time.Unix(10, 0).UTC(),
			GatewaySyncStatus: domain.GatewaySyncStatusApplied,
		},
		{
			EventID:           "evt-failed-existing",
			EventType:         "supply_package_published",
			PackageID:         202,
			Platform:          "openai",
			Model:             "gpt-4.1-failed",
			Version:           6,
			OccurredAt:        time.Unix(11, 0).UTC(),
			GatewaySyncStatus: domain.GatewaySyncStatusFailed,
		},
	}
	for _, seed := range seeds {
		store.AppendPackageEvent(ctx, seed)
	}

	svc := NewService(store)
	result, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(result.Items) != 0 {
		t.Fatalf("expected no items for non-pending events, got %+v", result.Items)
	}

	_, found := store.GetGatewayAppliedSnapshot(ctx, "gateway")
	if found {
		t.Fatalf("expected no snapshot update when no pending events were consumed")
	}
}
// TestServiceConsumeOnceSkipsUnauthorizedEvents registers supply account 301
// with ConsumerTag "other-consumer", appends a pending event tied to that
// account, and consumes as "gateway". It expects no items to be returned, the
// single stored event to stay pending, and no applied snapshot for "gateway".
//
// NOTE(review): presumably the event is skipped because the account's
// ConsumerTag differs from the consuming consumer — confirm in the service.
func TestServiceConsumeOnceSkipsUnauthorizedEvents(t *testing.T) {
	ctx := context.Background()
	store := repository.NewMemoryRepository()

	accountStamp := time.Unix(1, 0).UTC()
	store.UpsertSupplyAccount(ctx, domain.SupplyAccount{
		AccountID:   301,
		Platform:    "openai",
		APIKey:      "key-other",
		ConsumerTag: "other-consumer",
		Status:      "active",
		CreatedAt:   accountStamp,
		UpdatedAt:   accountStamp,
	})
	store.AppendPackageEvent(ctx, domain.PackageChangeEvent{
		EventID:           "evt-unauthorized",
		EventType:         "supply_package_published",
		PackageID:         301,
		AccountID:         301,
		Platform:          "openai",
		Model:             "gpt-4.1-unauthorized",
		Version:           7,
		OccurredAt:        time.Unix(12, 0).UTC(),
		GatewaySyncStatus: domain.GatewaySyncStatusPending,
	})

	svc := NewService(store)
	result, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(result.Items) != 0 {
		t.Fatalf("expected unauthorized event to be skipped, got %+v", result.Items)
	}

	stored := store.ListPackageEvents(ctx)
	untouched := len(stored) == 1 && stored[0].GatewaySyncStatus == domain.GatewaySyncStatusPending
	if !untouched {
		t.Fatalf("expected unauthorized event to remain pending, got %+v", stored)
	}

	_, found := store.GetGatewayAppliedSnapshot(ctx, "gateway")
	if found {
		t.Fatalf("expected no snapshot update for unauthorized event")
	}
}
// TestServiceConsumeOnceFailedDoesNotDriftSnapshot seeds two pending events and
// consumes them in a single pass. It expects two items back, the "gateway"
// snapshot to reference the first event with an applied result (not the later
// event), and the stored statuses to be applied for the first event and failed
// for the second.
func TestServiceConsumeOnceFailedDoesNotDriftSnapshot(t *testing.T) {
	ctx := context.Background()
	store := repository.NewMemoryRepository()
	seeds := []domain.PackageChangeEvent{
		{
			EventID:           "evt-apply-first",
			EventType:         "supply_package_published",
			PackageID:         401,
			Platform:          "openai",
			Model:             "gpt-4.1-first",
			Version:           8,
			OccurredAt:        time.Unix(20, 0).UTC(),
			GatewaySyncStatus: domain.GatewaySyncStatusPending,
		},
		{
			EventID:           "evt-fail-second",
			EventType:         "supply_package_published",
			PackageID:         402,
			Platform:          "openai",
			Model:             "gpt-fail-second",
			Version:           9,
			OccurredAt:        time.Unix(21, 0).UTC(),
			GatewaySyncStatus: domain.GatewaySyncStatusPending,
		},
	}
	for _, seed := range seeds {
		store.AppendPackageEvent(ctx, seed)
	}

	svc := NewService(store)
	// Pin the service clock to a constant instant.
	svc.now = func() time.Time { return time.Unix(30, 0).UTC() }

	result, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if count := len(result.Items); count != 2 {
		t.Fatalf("unexpected item count: %d", count)
	}

	snap, found := store.GetGatewayAppliedSnapshot(ctx, "gateway")
	if !found {
		t.Fatal("expected snapshot after applied event")
	}
	onFirstApplied := snap.LastEventID == "evt-apply-first" &&
		snap.LastPackageID == 401 &&
		snap.LastResult == string(domain.GatewayAckResultApplied)
	if !onFirstApplied {
		t.Fatalf("expected snapshot to stay on last applied event, got %+v", snap)
	}

	// Index stored statuses by event ID so the checks are order-independent.
	stored := store.ListPackageEvents(ctx)
	statuses := make(map[string]domain.GatewaySyncStatus, len(stored))
	for i := range stored {
		statuses[stored[i].EventID] = stored[i].GatewaySyncStatus
	}
	if statuses["evt-apply-first"] != domain.GatewaySyncStatusApplied {
		t.Fatalf("expected first event applied, got %+v", statuses)
	}
	if statuses["evt-fail-second"] != domain.GatewaySyncStatusFailed {
		t.Fatalf("expected second event failed, got %+v", statuses)
	}
}
// TestServiceConsumeOnceRetriesTransientFailureUntilApplied drives one pending
// event through five consecutive consume passes using a scripted clock and a
// scripted applier. The applier reports a retryable timeout on its first two
// calls and an applied ack on the third; any further call returns an error.
//
// Expectations checked per pass:
//   - pass 1: one pending item, RetryCount 1, NextRetryAt at unix second 120;
//   - pass 2: one pending item, RetryCount 2, NextRetryAt at unix second 361,
//     carrying the timeout failure category;
//   - pass 3: one applied item;
//   - passes 4 and 5: no items.
//
// Afterwards the applier must have been called exactly three times, the stored
// event must be applied with RetryCount 2 and the timeout category retained,
// and the "gateway" snapshot must reference the event.
func TestServiceConsumeOnceRetriesTransientFailureUntilApplied(t *testing.T) {
	ctx := context.Background()
	store := repository.NewMemoryRepository()
	store.AppendPackageEvent(ctx, domain.PackageChangeEvent{
		EventID:           "evt-retry-success",
		EventType:         "supply_package_published",
		PackageID:         501,
		Platform:          "openai",
		Model:             "gpt-4.1-retry-success",
		Version:           1,
		OccurredAt:        time.Unix(10, 0).UTC(),
		GatewaySyncStatus: domain.GatewaySyncStatusPending,
	})

	svc := NewService(store)

	// Scripted clock: every now() call hands out the next instant; once the
	// script is used up the last instant is returned on every later call.
	script := []time.Time{
		time.Unix(60, 0).UTC(),
		time.Unix(61, 0).UTC(),
		time.Unix(120, 0).UTC(),
		time.Unix(121, 0).UTC(),
		time.Unix(420, 0).UTC(),
		time.Unix(421, 0).UTC(),
	}
	cursor := 0
	svc.now = func() time.Time {
		if cursor >= len(script) {
			return time.Unix(421, 0).UTC()
		}
		instant := script[cursor]
		cursor++
		return instant
	}

	// Scripted applier: two retryable timeouts, then success, then an error
	// for any call beyond the third.
	calls := 0
	svc.SetApplier(func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error) {
		calls++
		if calls == 1 || calls == 2 {
			return GatewayApplyResult{
				Retryable:       true,
				FailureCategory: domain.GatewayFailureCategoryTemporaryTimeout,
				Detail:          "gateway timeout",
			}, nil
		}
		if calls == 3 {
			return GatewayApplyResult{
				AckResult: domain.GatewayAckResultApplied,
				Detail:    "applied after retry",
			}, nil
		}
		return GatewayApplyResult{}, errors.New("unexpected extra attempt")
	})

	// Pass 1: first attempt times out and is scheduled for retry.
	first, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected first consume error: %v", err)
	}
	if len(first.Items) != 1 {
		t.Fatalf("expected one first item, got %+v", first.Items)
	}
	firstItem := first.Items[0]
	if !(firstItem.Result == domain.GatewayAckResultPending && firstItem.GatewaySyncStatus == domain.GatewaySyncStatusPending) {
		t.Fatalf("expected first item pending retry, got %+v", firstItem)
	}
	if firstItem.RetryCount != 1 {
		t.Fatalf("expected first retry count 1, got %+v", firstItem)
	}
	if firstItem.NextRetryAt == nil || !firstItem.NextRetryAt.Equal(time.Unix(120, 0).UTC()) {
		t.Fatalf("expected first next retry at +1m, got %+v", firstItem.NextRetryAt)
	}

	// Pass 2: second attempt times out again.
	second, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected second consume error: %v", err)
	}
	if len(second.Items) != 1 {
		t.Fatalf("expected one second item at first retry window, got %+v", second.Items)
	}
	secondItem := second.Items[0]
	if !(secondItem.Result == domain.GatewayAckResultPending && secondItem.RetryCount == 2) {
		t.Fatalf("expected second retry state, got %+v", secondItem)
	}
	if secondItem.NextRetryAt == nil || !secondItem.NextRetryAt.Equal(time.Unix(361, 0).UTC()) {
		t.Fatalf("expected second next retry at +5m from retry attempt, got %+v", secondItem.NextRetryAt)
	}
	if secondItem.FailureCategory != domain.GatewayFailureCategoryTemporaryTimeout {
		t.Fatalf("expected retry item to carry timeout category, got %+v", secondItem)
	}

	// Pass 3: third attempt succeeds.
	third, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected third consume error: %v", err)
	}
	if len(third.Items) != 1 {
		t.Fatalf("expected one third item after retry window opens, got %+v", third.Items)
	}
	thirdItem := third.Items[0]
	if !(thirdItem.Result == domain.GatewayAckResultApplied && thirdItem.GatewaySyncStatus == domain.GatewaySyncStatusApplied) {
		t.Fatalf("expected final applied item on third consume, got %+v", thirdItem)
	}

	// Passes 4 and 5: nothing left to consume.
	fourth, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected fourth consume error: %v", err)
	}
	if len(fourth.Items) != 0 {
		t.Fatalf("expected no fourth item after event already applied, got %+v", fourth.Items)
	}
	fifth, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected fifth consume error: %v", err)
	}
	if len(fifth.Items) != 0 {
		t.Fatalf("expected no fifth item after event already applied, got %+v", fifth.Items)
	}

	if calls != 3 {
		t.Fatalf("expected three attempts, got %d", calls)
	}

	stored := store.ListPackageEvents(ctx)
	if len(stored) != 1 {
		t.Fatalf("expected one event, got %+v", stored)
	}
	final := stored[0]
	if !(final.GatewaySyncStatus == domain.GatewaySyncStatusApplied && final.RetryCount == 2) {
		t.Fatalf("expected applied event with retry history, got %+v", final)
	}
	if final.LastFailureCategory != domain.GatewayFailureCategoryTemporaryTimeout {
		t.Fatalf("expected last failure category persisted, got %+v", final)
	}

	snap, found := store.GetGatewayAppliedSnapshot(ctx, "gateway")
	if !(found && snap.LastEventID == "evt-retry-success") {
		t.Fatalf("expected applied snapshot for retried event, got %+v ok=%v", snap, found)
	}
}
// TestServiceConsumeOnceMarksRetryExhaustedAsFailed runs five consume passes
// over one pending event while the applier always reports a retryable
// temporary-5xx failure. It expects the applier to be invoked exactly three
// times, and the stored event to end up failed with RetryCount 2, a nil
// NextRetryAt, and the temporary-5xx category recorded as the last failure.
func TestServiceConsumeOnceMarksRetryExhaustedAsFailed(t *testing.T) {
	ctx := context.Background()
	store := repository.NewMemoryRepository()
	store.AppendPackageEvent(ctx, domain.PackageChangeEvent{
		EventID:           "evt-retry-exhausted",
		EventType:         "supply_package_published",
		PackageID:         601,
		Platform:          "openai",
		Model:             "gpt-4.1-retry-exhausted",
		Version:           1,
		OccurredAt:        time.Unix(10, 0).UTC(),
		GatewaySyncStatus: domain.GatewaySyncStatusPending,
	})

	svc := NewService(store)

	// Scripted clock: every now() call hands out the next instant; once the
	// script is used up the last instant is returned on every later call.
	script := []time.Time{
		time.Unix(60, 0).UTC(),
		time.Unix(120, 0).UTC(),
		time.Unix(121, 0).UTC(),
		time.Unix(420, 0).UTC(),
		time.Unix(421, 0).UTC(),
	}
	cursor := 0
	svc.now = func() time.Time {
		if cursor >= len(script) {
			return time.Unix(421, 0).UTC()
		}
		instant := script[cursor]
		cursor++
		return instant
	}

	// The applier never succeeds; it only counts how often it is called.
	calls := 0
	svc.SetApplier(func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error) {
		calls++
		return GatewayApplyResult{
			Retryable:       true,
			FailureCategory: domain.GatewayFailureCategoryTemporary5xx,
			Detail:          "upstream 502",
		}, nil
	})

	for step := 1; step <= 5; step++ {
		if _, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"}); err != nil {
			t.Fatalf("unexpected consume error at step %d: %v", step, err)
		}
	}

	if calls != 3 {
		t.Fatalf("expected three attempts before terminal failure, got %d", calls)
	}

	stored := store.ListPackageEvents(ctx)
	if len(stored) != 1 {
		t.Fatalf("expected one event, got %+v", stored)
	}
	final := stored[0]
	if final.GatewaySyncStatus != domain.GatewaySyncStatusFailed {
		t.Fatalf("expected failed terminal status, got %+v", final)
	}
	if final.RetryCount != 2 {
		t.Fatalf("expected retry_count=2 after exhausting two scheduled retries, got %+v", final)
	}
	if final.NextRetryAt != nil {
		t.Fatalf("expected next retry cleared after terminal failure, got %+v", final)
	}
	if final.LastFailureCategory != domain.GatewayFailureCategoryTemporary5xx {
		t.Fatalf("expected persisted category temporary_5xx, got %+v", final)
	}
}
// TestServiceConsumeOnceMarksNonRetryableFailureAsFailed consumes one pending
// event with an applier that reports a non-retryable contract-invalid failure.
// It expects a single failed item carrying that category, and the stored event
// to be failed with RetryCount 0.
func TestServiceConsumeOnceMarksNonRetryableFailureAsFailed(t *testing.T) {
	ctx := context.Background()
	store := repository.NewMemoryRepository()
	store.AppendPackageEvent(ctx, domain.PackageChangeEvent{
		EventID:           "evt-non-retryable",
		EventType:         "supply_package_published",
		PackageID:         701,
		Platform:          "openai",
		Model:             "gpt-4.1-non-retryable",
		Version:           1,
		OccurredAt:        time.Unix(10, 0).UTC(),
		GatewaySyncStatus: domain.GatewaySyncStatusPending,
	})

	svc := NewService(store)
	// Pin the service clock to a constant instant.
	svc.now = func() time.Time { return time.Unix(60, 0).UTC() }
	svc.SetApplier(func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error) {
		return GatewayApplyResult{
			Retryable:       false,
			FailureCategory: domain.GatewayFailureCategoryContractInvalid,
			Detail:          "schema mismatch",
		}, nil
	})

	result, err := svc.ConsumeOnce(ctx, ConsumeOnceInput{Consumer: "gateway"})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(result.Items) != 1 {
		t.Fatalf("expected one item, got %+v", result.Items)
	}
	item := result.Items[0]
	if !(item.Result == domain.GatewayAckResultFailed && item.GatewaySyncStatus == domain.GatewaySyncStatusFailed) {
		t.Fatalf("expected failed item, got %+v", item)
	}
	if item.FailureCategory != domain.GatewayFailureCategoryContractInvalid {
		t.Fatalf("expected contract_invalid category, got %+v", item)
	}

	stored := store.ListPackageEvents(ctx)
	failedImmediately := len(stored) == 1 &&
		stored[0].RetryCount == 0 &&
		stored[0].GatewaySyncStatus == domain.GatewaySyncStatusFailed
	if !failedImmediately {
		t.Fatalf("expected non-retryable immediate failure, got %+v", stored)
	}
}