125 lines
4.0 KiB
Go
125 lines
4.0 KiB
Go
package poller
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/domain"
|
|
"supply-intelligence/internal/gatewayconsumer"
|
|
"supply-intelligence/internal/repository"
|
|
)
|
|
|
|
func TestRuntimeStartsBackgroundPolling(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-runtime-1",
|
|
EventType: "supply_package_published",
|
|
PackageID: 1,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-mini",
|
|
OccurredAt: time.Unix(1, 0).UTC(),
|
|
Version: 1,
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
|
|
service := gatewayconsumer.NewService(repo)
|
|
poller := NewGatewayPackagePoller(service)
|
|
runtime := NewRuntime(poller, 10*time.Millisecond)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
if !runtime.Start(ctx) {
|
|
t.Fatalf("expected runtime to start")
|
|
}
|
|
defer runtime.Stop()
|
|
|
|
deadline := time.Now().Add(500 * time.Millisecond)
|
|
for time.Now().Before(deadline) {
|
|
items, _ := repo.ListPackageEventsAfter(context.Background(), "")
|
|
if len(items) == 1 && items[0].GatewaySyncStatus == domain.GatewaySyncStatusApplied {
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
items, _ := repo.ListPackageEventsAfter(context.Background(), "")
|
|
t.Fatalf("expected background polling to apply event, got %+v", items)
|
|
}
|
|
|
|
func TestRuntimeStartRequiresPoller(t *testing.T) {
|
|
if (&Runtime{}).Start(context.Background()) {
|
|
t.Fatalf("expected runtime without poller to refuse start")
|
|
}
|
|
}
|
|
|
|
func TestRuntimePauseResumeAndStatus(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
|
|
EventID: "evt-runtime-paused",
|
|
EventType: "supply_package_published",
|
|
PackageID: 2,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-runtime-paused",
|
|
OccurredAt: time.Unix(2, 0).UTC(),
|
|
Version: 1,
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
})
|
|
|
|
service := gatewayconsumer.NewService(repo)
|
|
service.SetApplier(func(context.Context, domain.PackageChangeEvent) (gatewayconsumer.GatewayApplyResult, error) {
|
|
return gatewayconsumer.GatewayApplyResult{AckResult: domain.GatewayAckResultApplied, Detail: "applied"}, nil
|
|
})
|
|
poller := NewGatewayPackagePoller(service)
|
|
runtime := NewRuntime(poller, 10*time.Millisecond)
|
|
|
|
if !runtime.Pause() {
|
|
t.Fatalf("expected pause before start to succeed")
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
if !runtime.Start(ctx) {
|
|
t.Fatalf("expected runtime to start")
|
|
}
|
|
defer runtime.Stop()
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
items, _ := repo.ListPackageEventsAfter(context.Background(), "")
|
|
if len(items) != 1 || items[0].GatewaySyncStatus != domain.GatewaySyncStatusPending {
|
|
t.Fatalf("expected paused runtime to keep event pending, got %+v", items)
|
|
}
|
|
status := runtime.Status()
|
|
if !status.Started || !status.Paused {
|
|
t.Fatalf("expected started+paused status, got %+v", status)
|
|
}
|
|
if status.Cursor != "" {
|
|
t.Fatalf("expected empty cursor before processing, got %+v", status)
|
|
}
|
|
|
|
if !runtime.Resume() {
|
|
t.Fatalf("expected resume to succeed")
|
|
}
|
|
deadline := time.Now().Add(500 * time.Millisecond)
|
|
for time.Now().Before(deadline) {
|
|
items, _ = repo.ListPackageEventsAfter(context.Background(), "")
|
|
if len(items) == 1 && items[0].GatewaySyncStatus == domain.GatewaySyncStatusApplied {
|
|
break
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
items, _ = repo.ListPackageEventsAfter(context.Background(), "")
|
|
if len(items) != 1 || items[0].GatewaySyncStatus != domain.GatewaySyncStatusApplied {
|
|
t.Fatalf("expected resumed runtime to apply event, got %+v", items)
|
|
}
|
|
status = runtime.Status()
|
|
if !status.Started || status.Paused {
|
|
t.Fatalf("expected started and not paused after resume, got %+v", status)
|
|
}
|
|
if status.LastPollAt == nil {
|
|
t.Fatalf("expected last poll timestamp after processing, got %+v", status)
|
|
}
|
|
if status.LastError != "" {
|
|
t.Fatalf("expected no last error, got %+v", status)
|
|
}
|
|
}
|