package poller import ( "context" "testing" "time" "supply-intelligence/internal/domain" "supply-intelligence/internal/gatewayconsumer" "supply-intelligence/internal/repository" ) func TestRuntimeStartsBackgroundPolling(t *testing.T) { repo := repository.NewMemoryRepository() repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{ EventID: "evt-runtime-1", EventType: "supply_package_published", PackageID: 1, Platform: "openai", Model: "gpt-4.1-mini", OccurredAt: time.Unix(1, 0).UTC(), Version: 1, GatewaySyncStatus: domain.GatewaySyncStatusPending, }) service := gatewayconsumer.NewService(repo) poller := NewGatewayPackagePoller(service) runtime := NewRuntime(poller, 10*time.Millisecond) ctx, cancel := context.WithCancel(context.Background()) defer cancel() if !runtime.Start(ctx) { t.Fatalf("expected runtime to start") } defer runtime.Stop() deadline := time.Now().Add(500 * time.Millisecond) for time.Now().Before(deadline) { items, _ := repo.ListPackageEventsAfter(context.Background(), "") if len(items) == 1 && items[0].GatewaySyncStatus == domain.GatewaySyncStatusApplied { return } time.Sleep(10 * time.Millisecond) } items, _ := repo.ListPackageEventsAfter(context.Background(), "") t.Fatalf("expected background polling to apply event, got %+v", items) } func TestRuntimeStartRequiresPoller(t *testing.T) { if (&Runtime{}).Start(context.Background()) { t.Fatalf("expected runtime without poller to refuse start") } } func TestRuntimePauseResumeAndStatus(t *testing.T) { repo := repository.NewMemoryRepository() repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{ EventID: "evt-runtime-paused", EventType: "supply_package_published", PackageID: 2, Platform: "openai", Model: "gpt-4.1-runtime-paused", OccurredAt: time.Unix(2, 0).UTC(), Version: 1, GatewaySyncStatus: domain.GatewaySyncStatusPending, }) service := gatewayconsumer.NewService(repo) service.SetApplier(func(context.Context, domain.PackageChangeEvent) (gatewayconsumer.GatewayApplyResult, error) { return gatewayconsumer.GatewayApplyResult{AckResult: domain.GatewayAckResultApplied, Detail: "applied"}, nil }) poller := NewGatewayPackagePoller(service) runtime := NewRuntime(poller, 10*time.Millisecond) if !runtime.Pause() { t.Fatalf("expected pause before start to succeed") } ctx, cancel := context.WithCancel(context.Background()) defer cancel() if !runtime.Start(ctx) { t.Fatalf("expected runtime to start") } defer runtime.Stop() time.Sleep(50 * time.Millisecond) items, _ := repo.ListPackageEventsAfter(context.Background(), "") if len(items) != 1 || items[0].GatewaySyncStatus != domain.GatewaySyncStatusPending { t.Fatalf("expected paused runtime to keep event pending, got %+v", items) } status := runtime.Status() if !status.Started || !status.Paused { t.Fatalf("expected started+paused status, got %+v", status) } if status.Cursor != "" { t.Fatalf("expected empty cursor before processing, got %+v", status) } if !runtime.Resume() { t.Fatalf("expected resume to succeed") } deadline := time.Now().Add(500 * time.Millisecond) for time.Now().Before(deadline) { items, _ = repo.ListPackageEventsAfter(context.Background(), "") if len(items) == 1 && items[0].GatewaySyncStatus == domain.GatewaySyncStatusApplied { break } time.Sleep(10 * time.Millisecond) } items, _ = repo.ListPackageEventsAfter(context.Background(), "") if len(items) != 1 || items[0].GatewaySyncStatus != domain.GatewaySyncStatusApplied { t.Fatalf("expected resumed runtime to apply event, got %+v", items) } status = runtime.Status() if !status.Started || status.Paused { t.Fatalf("expected started and not paused after resume, got %+v", status) } if status.LastPollAt == nil { t.Fatalf("expected last poll timestamp after processing, got %+v", status) } if status.LastError != "" { t.Fatalf("expected no last error, got %+v", status) } }