Files
supply-intelligence/internal/poller/runtime_test.go
2026-05-12 18:49:52 +08:00

125 lines
4.0 KiB
Go

package poller
import (
"context"
"testing"
"time"
"supply-intelligence/internal/domain"
"supply-intelligence/internal/gatewayconsumer"
"supply-intelligence/internal/repository"
)
func TestRuntimeStartsBackgroundPolling(t *testing.T) {
repo := repository.NewMemoryRepository()
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
EventID: "evt-runtime-1",
EventType: "supply_package_published",
PackageID: 1,
Platform: "openai",
Model: "gpt-4.1-mini",
OccurredAt: time.Unix(1, 0).UTC(),
Version: 1,
GatewaySyncStatus: domain.GatewaySyncStatusPending,
})
service := gatewayconsumer.NewService(repo)
poller := NewGatewayPackagePoller(service)
runtime := NewRuntime(poller, 10*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if !runtime.Start(ctx) {
t.Fatalf("expected runtime to start")
}
defer runtime.Stop()
deadline := time.Now().Add(500 * time.Millisecond)
for time.Now().Before(deadline) {
items, _ := repo.ListPackageEventsAfter(context.Background(), "")
if len(items) == 1 && items[0].GatewaySyncStatus == domain.GatewaySyncStatusApplied {
return
}
time.Sleep(10 * time.Millisecond)
}
items, _ := repo.ListPackageEventsAfter(context.Background(), "")
t.Fatalf("expected background polling to apply event, got %+v", items)
}
func TestRuntimeStartRequiresPoller(t *testing.T) {
if (&Runtime{}).Start(context.Background()) {
t.Fatalf("expected runtime without poller to refuse start")
}
}
func TestRuntimePauseResumeAndStatus(t *testing.T) {
repo := repository.NewMemoryRepository()
repo.AppendPackageEvent(context.Background(), domain.PackageChangeEvent{
EventID: "evt-runtime-paused",
EventType: "supply_package_published",
PackageID: 2,
Platform: "openai",
Model: "gpt-4.1-runtime-paused",
OccurredAt: time.Unix(2, 0).UTC(),
Version: 1,
GatewaySyncStatus: domain.GatewaySyncStatusPending,
})
service := gatewayconsumer.NewService(repo)
service.SetApplier(func(context.Context, domain.PackageChangeEvent) (gatewayconsumer.GatewayApplyResult, error) {
return gatewayconsumer.GatewayApplyResult{AckResult: domain.GatewayAckResultApplied, Detail: "applied"}, nil
})
poller := NewGatewayPackagePoller(service)
runtime := NewRuntime(poller, 10*time.Millisecond)
if !runtime.Pause() {
t.Fatalf("expected pause before start to succeed")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if !runtime.Start(ctx) {
t.Fatalf("expected runtime to start")
}
defer runtime.Stop()
time.Sleep(50 * time.Millisecond)
items, _ := repo.ListPackageEventsAfter(context.Background(), "")
if len(items) != 1 || items[0].GatewaySyncStatus != domain.GatewaySyncStatusPending {
t.Fatalf("expected paused runtime to keep event pending, got %+v", items)
}
status := runtime.Status()
if !status.Started || !status.Paused {
t.Fatalf("expected started+paused status, got %+v", status)
}
if status.Cursor != "" {
t.Fatalf("expected empty cursor before processing, got %+v", status)
}
if !runtime.Resume() {
t.Fatalf("expected resume to succeed")
}
deadline := time.Now().Add(500 * time.Millisecond)
for time.Now().Before(deadline) {
items, _ = repo.ListPackageEventsAfter(context.Background(), "")
if len(items) == 1 && items[0].GatewaySyncStatus == domain.GatewaySyncStatusApplied {
break
}
time.Sleep(10 * time.Millisecond)
}
items, _ = repo.ListPackageEventsAfter(context.Background(), "")
if len(items) != 1 || items[0].GatewaySyncStatus != domain.GatewaySyncStatusApplied {
t.Fatalf("expected resumed runtime to apply event, got %+v", items)
}
status = runtime.Status()
if !status.Started || status.Paused {
t.Fatalf("expected started and not paused after resume, got %+v", status)
}
if status.LastPollAt == nil {
t.Fatalf("expected last poll timestamp after processing, got %+v", status)
}
if status.LastError != "" {
t.Fatalf("expected no last error, got %+v", status)
}
}