437 lines
16 KiB
Go
437 lines
16 KiB
Go
package gatewayconsumer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"testing"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/domain"
|
|
"supply-intelligence/internal/repository"
|
|
)
|
|
|
|
func TestServiceConsumeOnceAppliedAndFailed(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-applied",
|
|
EventType: "supply_package_published",
|
|
PackageID: 101,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-mini",
|
|
Version: 3,
|
|
OccurredAt: time.Unix(10, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-failed",
|
|
EventType: "supply_package_published",
|
|
PackageID: 102,
|
|
Platform: "openai",
|
|
Model: "gpt-fail-model",
|
|
Version: 4,
|
|
OccurredAt: time.Unix(20, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
|
|
service := NewService(repo)
|
|
service.now = func() time.Time { return time.Unix(30, 0).UTC() }
|
|
|
|
out, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if len(out.Items) != 2 {
|
|
t.Fatalf("unexpected item count: %d", len(out.Items))
|
|
}
|
|
if out.Items[0].GatewaySyncStatus != domain.GatewaySyncStatusApplied {
|
|
t.Fatalf("unexpected first status: %+v", out.Items[0])
|
|
}
|
|
if out.Items[1].GatewaySyncStatus != domain.GatewaySyncStatusFailed {
|
|
t.Fatalf("unexpected second status: %+v", out.Items[1])
|
|
}
|
|
|
|
events := repo.ListPackageEvents(context.Background())
|
|
var appliedEvt, failedEvt domain.PackageChangeEvent
|
|
for _, e := range events {
|
|
if e.EventID == "evt-applied" {
|
|
appliedEvt = e
|
|
} else if e.EventID == "evt-failed" {
|
|
failedEvt = e
|
|
}
|
|
}
|
|
if appliedEvt.GatewaySyncStatus != domain.GatewaySyncStatusApplied {
|
|
t.Fatalf("expected applied event, got %+v", appliedEvt)
|
|
}
|
|
if failedEvt.GatewaySyncStatus != domain.GatewaySyncStatusFailed {
|
|
t.Fatalf("expected failed event, got %+v", failedEvt)
|
|
}
|
|
snapshot, ok := repo.GetGatewayAppliedSnapshot(context.Background(), "gateway")
|
|
if !ok {
|
|
t.Fatal("expected applied snapshot")
|
|
}
|
|
if snapshot.LastEventID != "evt-applied" || snapshot.LastPackageID != 101 {
|
|
t.Fatalf("unexpected snapshot: %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestServiceConsumeOnceRejectsInvalidNilService(t *testing.T) {
|
|
var service *Service
|
|
_, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{})
|
|
if err != ErrInvalidConsumeInput {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestServiceConsumeOnceSkipsNonPendingEvents(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-applied-existing",
|
|
EventType: "supply_package_published",
|
|
PackageID: 201,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-applied",
|
|
Version: 5,
|
|
OccurredAt: time.Unix(10, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusApplied,
|
|
})
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-failed-existing",
|
|
EventType: "supply_package_published",
|
|
PackageID: 202,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-failed",
|
|
Version: 6,
|
|
OccurredAt: time.Unix(11, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusFailed,
|
|
})
|
|
|
|
service := NewService(repo)
|
|
out, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if len(out.Items) != 0 {
|
|
t.Fatalf("expected no items for non-pending events, got %+v", out.Items)
|
|
}
|
|
if _, ok := repo.GetGatewayAppliedSnapshot(context.Background(), "gateway"); ok {
|
|
t.Fatalf("expected no snapshot update when no pending events were consumed")
|
|
}
|
|
}
|
|
|
|
func TestServiceConsumeOnceSkipsUnauthorizedEvents(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.UpsertSupplyAccount(context.Background(), domain.SupplyAccount{
|
|
AccountID: 301,
|
|
Platform: "openai",
|
|
APIKey: "key-other",
|
|
ConsumerTag: "other-consumer",
|
|
Status: "active",
|
|
CreatedAt: time.Unix(1, 0).UTC(),
|
|
UpdatedAt: time.Unix(1, 0).UTC(),
|
|
})
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-unauthorized",
|
|
EventType: "supply_package_published",
|
|
PackageID: 301,
|
|
AccountID: 301,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-unauthorized",
|
|
Version: 7,
|
|
OccurredAt: time.Unix(12, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
|
|
service := NewService(repo)
|
|
out, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if len(out.Items) != 0 {
|
|
t.Fatalf("expected unauthorized event to be skipped, got %+v", out.Items)
|
|
}
|
|
events := repo.ListPackageEvents(context.Background())
|
|
if len(events) != 1 || events[0].GatewaySyncStatus != domain.GatewaySyncStatusPending {
|
|
t.Fatalf("expected unauthorized event to remain pending, got %+v", events)
|
|
}
|
|
if _, ok := repo.GetGatewayAppliedSnapshot(context.Background(), "gateway"); ok {
|
|
t.Fatalf("expected no snapshot update for unauthorized event")
|
|
}
|
|
}
|
|
|
|
func TestServiceConsumeOnceFailedDoesNotDriftSnapshot(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-apply-first",
|
|
EventType: "supply_package_published",
|
|
PackageID: 401,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-first",
|
|
Version: 8,
|
|
OccurredAt: time.Unix(20, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-fail-second",
|
|
EventType: "supply_package_published",
|
|
PackageID: 402,
|
|
Platform: "openai",
|
|
Model: "gpt-fail-second",
|
|
Version: 9,
|
|
OccurredAt: time.Unix(21, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
|
|
service := NewService(repo)
|
|
service.now = func() time.Time { return time.Unix(30, 0).UTC() }
|
|
out, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if len(out.Items) != 2 {
|
|
t.Fatalf("unexpected item count: %d", len(out.Items))
|
|
}
|
|
snapshot, ok := repo.GetGatewayAppliedSnapshot(context.Background(), "gateway")
|
|
if !ok {
|
|
t.Fatal("expected snapshot after applied event")
|
|
}
|
|
if snapshot.LastEventID != "evt-apply-first" || snapshot.LastPackageID != 401 || snapshot.LastResult != string(domain.GatewayAckResultApplied) {
|
|
t.Fatalf("expected snapshot to stay on last applied event, got %+v", snapshot)
|
|
}
|
|
events := repo.ListPackageEvents(context.Background())
|
|
statusByID := map[string]domain.GatewaySyncStatus{}
|
|
for _, event := range events {
|
|
statusByID[event.EventID] = event.GatewaySyncStatus
|
|
}
|
|
if statusByID["evt-apply-first"] != domain.GatewaySyncStatusApplied {
|
|
t.Fatalf("expected first event applied, got %+v", statusByID)
|
|
}
|
|
if statusByID["evt-fail-second"] != domain.GatewaySyncStatusFailed {
|
|
t.Fatalf("expected second event failed, got %+v", statusByID)
|
|
}
|
|
}
|
|
|
|
func TestServiceConsumeOnceRetriesTransientFailureUntilApplied(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-retry-success",
|
|
EventType: "supply_package_published",
|
|
PackageID: 501,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-retry-success",
|
|
Version: 1,
|
|
OccurredAt: time.Unix(10, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
|
|
service := NewService(repo)
|
|
times := []time.Time{
|
|
time.Unix(60, 0).UTC(),
|
|
time.Unix(61, 0).UTC(),
|
|
time.Unix(120, 0).UTC(),
|
|
time.Unix(121, 0).UTC(),
|
|
time.Unix(420, 0).UTC(),
|
|
time.Unix(421, 0).UTC(),
|
|
}
|
|
service.now = func() time.Time {
|
|
if len(times) == 0 {
|
|
return time.Unix(421, 0).UTC()
|
|
}
|
|
now := times[0]
|
|
times = times[1:]
|
|
return now
|
|
}
|
|
attempts := 0
|
|
service.SetApplier(func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error) {
|
|
attempts++
|
|
switch attempts {
|
|
case 1, 2:
|
|
return GatewayApplyResult{Retryable: true, FailureCategory: domain.GatewayFailureCategoryTemporaryTimeout, Detail: "gateway timeout"}, nil
|
|
case 3:
|
|
return GatewayApplyResult{AckResult: domain.GatewayAckResultApplied, Detail: "applied after retry"}, nil
|
|
default:
|
|
return GatewayApplyResult{}, errors.New("unexpected extra attempt")
|
|
}
|
|
})
|
|
|
|
first, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected first consume error: %v", err)
|
|
}
|
|
if len(first.Items) != 1 {
|
|
t.Fatalf("expected one first item, got %+v", first.Items)
|
|
}
|
|
if first.Items[0].Result != domain.GatewayAckResultPending || first.Items[0].GatewaySyncStatus != domain.GatewaySyncStatusPending {
|
|
t.Fatalf("expected first item pending retry, got %+v", first.Items[0])
|
|
}
|
|
if first.Items[0].RetryCount != 1 {
|
|
t.Fatalf("expected first retry count 1, got %+v", first.Items[0])
|
|
}
|
|
if first.Items[0].NextRetryAt == nil || !first.Items[0].NextRetryAt.Equal(time.Unix(120, 0).UTC()) {
|
|
t.Fatalf("expected first next retry at +1m, got %+v", first.Items[0].NextRetryAt)
|
|
}
|
|
|
|
second, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected second consume error: %v", err)
|
|
}
|
|
if len(second.Items) != 1 {
|
|
t.Fatalf("expected one second item at first retry window, got %+v", second.Items)
|
|
}
|
|
if second.Items[0].Result != domain.GatewayAckResultPending || second.Items[0].RetryCount != 2 {
|
|
t.Fatalf("expected second retry state, got %+v", second.Items[0])
|
|
}
|
|
if second.Items[0].NextRetryAt == nil || !second.Items[0].NextRetryAt.Equal(time.Unix(361, 0).UTC()) {
|
|
t.Fatalf("expected second next retry at +5m from retry attempt, got %+v", second.Items[0].NextRetryAt)
|
|
}
|
|
if second.Items[0].FailureCategory != domain.GatewayFailureCategoryTemporaryTimeout {
|
|
t.Fatalf("expected retry item to carry timeout category, got %+v", second.Items[0])
|
|
}
|
|
|
|
third, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected third consume error: %v", err)
|
|
}
|
|
if len(third.Items) != 1 {
|
|
t.Fatalf("expected one third item after retry window opens, got %+v", third.Items)
|
|
}
|
|
if third.Items[0].Result != domain.GatewayAckResultApplied || third.Items[0].GatewaySyncStatus != domain.GatewaySyncStatusApplied {
|
|
t.Fatalf("expected final applied item on third consume, got %+v", third.Items[0])
|
|
}
|
|
|
|
fourth, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected fourth consume error: %v", err)
|
|
}
|
|
if len(fourth.Items) != 0 {
|
|
t.Fatalf("expected no fourth item after event already applied, got %+v", fourth.Items)
|
|
}
|
|
|
|
fifth, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected fifth consume error: %v", err)
|
|
}
|
|
if len(fifth.Items) != 0 {
|
|
t.Fatalf("expected no fifth item after event already applied, got %+v", fifth.Items)
|
|
}
|
|
if attempts != 3 {
|
|
t.Fatalf("expected three attempts, got %d", attempts)
|
|
}
|
|
events := repo.ListPackageEvents(context.Background())
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected one event, got %+v", events)
|
|
}
|
|
evt := events[0]
|
|
if evt.GatewaySyncStatus != domain.GatewaySyncStatusApplied || evt.RetryCount != 2 {
|
|
t.Fatalf("expected applied event with retry history, got %+v", evt)
|
|
}
|
|
if evt.LastFailureCategory != domain.GatewayFailureCategoryTemporaryTimeout {
|
|
t.Fatalf("expected last failure category persisted, got %+v", evt)
|
|
}
|
|
snapshot, ok := repo.GetGatewayAppliedSnapshot(context.Background(), "gateway")
|
|
if !ok || snapshot.LastEventID != "evt-retry-success" {
|
|
t.Fatalf("expected applied snapshot for retried event, got %+v ok=%v", snapshot, ok)
|
|
}
|
|
}
|
|
|
|
func TestServiceConsumeOnceMarksRetryExhaustedAsFailed(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-retry-exhausted",
|
|
EventType: "supply_package_published",
|
|
PackageID: 601,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-retry-exhausted",
|
|
Version: 1,
|
|
OccurredAt: time.Unix(10, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
|
|
service := NewService(repo)
|
|
times := []time.Time{
|
|
time.Unix(60, 0).UTC(),
|
|
time.Unix(120, 0).UTC(),
|
|
time.Unix(121, 0).UTC(),
|
|
time.Unix(420, 0).UTC(),
|
|
time.Unix(421, 0).UTC(),
|
|
}
|
|
service.now = func() time.Time {
|
|
if len(times) == 0 {
|
|
return time.Unix(421, 0).UTC()
|
|
}
|
|
now := times[0]
|
|
times = times[1:]
|
|
return now
|
|
}
|
|
attempts := 0
|
|
service.SetApplier(func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error) {
|
|
attempts++
|
|
return GatewayApplyResult{Retryable: true, FailureCategory: domain.GatewayFailureCategoryTemporary5xx, Detail: "upstream 502"}, nil
|
|
})
|
|
|
|
for i := 0; i < 5; i++ {
|
|
_, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected consume error at step %d: %v", i+1, err)
|
|
}
|
|
}
|
|
|
|
if attempts != 3 {
|
|
t.Fatalf("expected three attempts before terminal failure, got %d", attempts)
|
|
}
|
|
events := repo.ListPackageEvents(context.Background())
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected one event, got %+v", events)
|
|
}
|
|
evt := events[0]
|
|
if evt.GatewaySyncStatus != domain.GatewaySyncStatusFailed {
|
|
t.Fatalf("expected failed terminal status, got %+v", evt)
|
|
}
|
|
if evt.RetryCount != 2 {
|
|
t.Fatalf("expected retry_count=2 after exhausting two scheduled retries, got %+v", evt)
|
|
}
|
|
if evt.NextRetryAt != nil {
|
|
t.Fatalf("expected next retry cleared after terminal failure, got %+v", evt)
|
|
}
|
|
if evt.LastFailureCategory != domain.GatewayFailureCategoryTemporary5xx {
|
|
t.Fatalf("expected persisted category temporary_5xx, got %+v", evt)
|
|
}
|
|
}
|
|
|
|
func TestServiceConsumeOnceMarksNonRetryableFailureAsFailed(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-non-retryable",
|
|
EventType: "supply_package_published",
|
|
PackageID: 701,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-non-retryable",
|
|
Version: 1,
|
|
OccurredAt: time.Unix(10, 0).UTC(),
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
|
|
service := NewService(repo)
|
|
service.now = func() time.Time { return time.Unix(60, 0).UTC() }
|
|
service.SetApplier(func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error) {
|
|
return GatewayApplyResult{Retryable: false, FailureCategory: domain.GatewayFailureCategoryContractInvalid, Detail: "schema mismatch"}, nil
|
|
})
|
|
|
|
out, err := service.ConsumeOnce(context.Background(), ConsumeOnceInput{Consumer: "gateway"})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if len(out.Items) != 1 {
|
|
t.Fatalf("expected one item, got %+v", out.Items)
|
|
}
|
|
if out.Items[0].Result != domain.GatewayAckResultFailed || out.Items[0].GatewaySyncStatus != domain.GatewaySyncStatusFailed {
|
|
t.Fatalf("expected failed item, got %+v", out.Items[0])
|
|
}
|
|
if out.Items[0].FailureCategory != domain.GatewayFailureCategoryContractInvalid {
|
|
t.Fatalf("expected contract_invalid category, got %+v", out.Items[0])
|
|
}
|
|
events := repo.ListPackageEvents(context.Background())
|
|
if len(events) != 1 || events[0].RetryCount != 0 || events[0].GatewaySyncStatus != domain.GatewaySyncStatusFailed {
|
|
t.Fatalf("expected non-retryable immediate failure, got %+v", events)
|
|
}
|
|
}
|