package repository import ( "context" "errors" "fmt" "net" "os/exec" "path/filepath" "runtime" "strconv" "strings" "sync" "testing" "time" "supply-intelligence/internal/domain" "supply-intelligence/internal/publish" ) func requireDocker(t *testing.T) { t.Helper() if _, err := exec.LookPath("docker"); err != nil { t.Skip("docker not installed") } } func freeTCPPort(t *testing.T) int { t.Helper() ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("allocate free tcp port: %v", err) } defer ln.Close() addr, ok := ln.Addr().(*net.TCPAddr) if !ok { t.Fatalf("unexpected listener addr type: %T", ln.Addr()) } return addr.Port } func waitForPostgresReady(t *testing.T, port int, user, dbName, containerName string) { t.Helper() deadline := time.Now().Add(45 * time.Second) var lastOut string for time.Now().Before(deadline) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) cmd := exec.CommandContext(ctx, "pg_isready", "-h", "127.0.0.1", "-p", strconv.Itoa(port), "-U", user, "-d", dbName) out, err := cmd.CombinedOutput() cancel() lastOut = strings.TrimSpace(string(out)) if err == nil { return } time.Sleep(1 * time.Second) } logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() t.Fatalf("postgres container did not become ready on port %d within timeout; last pg_isready=%q logs=%s", port, lastOut, string(logs)) } func newPostgresTestRepository(t *testing.T) *PostgresRepository { t.Helper() requireDocker(t) if _, err := exec.LookPath("pg_isready"); err != nil { t.Skip("pg_isready not installed") } _, currentFile, _, ok := runtime.Caller(0) if !ok { t.Fatal("resolve current test file") } projectRoot := filepath.Clean(filepath.Join(filepath.Dir(currentFile), "..", "..")) migrationsDir := filepath.Join(projectRoot, "migrations") hostPort := freeTCPPort(t) containerName := fmt.Sprintf("supply-intelligence-repo-test-%d", time.Now().UnixNano()) dbName := "supply_intelligence" dbUser := "supply" dbPassword := "supply123" cmd := exec.Command("docker", "run", "-d", "--name", containerName, "-e", "POSTGRES_DB="+dbName, "-e", "POSTGRES_USER="+dbUser, "-e", "POSTGRES_PASSWORD="+dbPassword, "-p", fmt.Sprintf("127.0.0.1:%d:5432", hostPort), "-v", migrationsDir+":/docker-entrypoint-initdb.d:ro", "postgres:16-alpine", ) cmd.Dir = projectRoot if out, err := cmd.CombinedOutput(); err != nil { t.Skipf("start isolated postgres container failed: %v output=%s", err, string(out)) } t.Cleanup(func() { rmCmd := exec.Command("docker", "rm", "-f", containerName) rmCmd.Dir = projectRoot _, _ = rmCmd.CombinedOutput() }) waitForPostgresReady(t, hostPort, dbUser, dbName, containerName) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) t.Cleanup(cancel) dsn := fmt.Sprintf("host=127.0.0.1 port=%d user=%s password=%s dbname=%s sslmode=disable", hostPort, dbUser, dbPassword, dbName) repo, err := NewPostgresRepository(ctx, dsn) if err != nil { t.Fatalf("postgres not ready: %v", err) } return repo } func seedPublishCandidateAndPackage(t *testing.T, repo *PostgresRepository, candidateID string, accountID int64, platform, model string) { t.Helper() ctx := context.Background() repo.UpsertDiscoveryCandidateContext(ctx, domain.DiscoveryCandidate{CandidateID: candidateID, AccountID: accountID, Platform: platform, Model: model, Source: "admission", Status: domain.DiscoveryCandidateStatusTestPassed, DiscoveredAt: time.Unix(100,0).UTC(), UpdatedAt: time.Unix(110,0).UTC()}) repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{PackageID: 1, Platform: platform, Model: model, Status: "draft", Source: "admission", CreatedAt: time.Unix(90,0).UTC(), UpdatedAt: time.Unix(110,0).UTC()}) } func mustLatestCandidate(t *testing.T, repo *PostgresRepository, ctx context.Context, platform, model string) domain.DiscoveryCandidate { t.Helper() v, ok := repo.GetLatestDiscoveryCandidateContext(ctx, platform, model) if !ok { t.Fatalf("candidate missing") } return v } func mustCandidateByID(t *testing.T, repo *PostgresRepository, ctx context.Context, id string) domain.DiscoveryCandidate { t.Helper() v, ok := repo.GetDiscoveryCandidateByIDContext(ctx, id) if !ok { t.Fatalf("candidate id missing") } return v } func mustPackage(t *testing.T, repo *PostgresRepository, ctx context.Context, platform, model string) domain.SupplyPackage { t.Helper() v, ok := repo.GetSupplyPackage(ctx, platform, model) if !ok { t.Fatalf("package missing") } return v } func TestPostgresPublishPackageAtomicallyConcurrentDoublePublish(t *testing.T) { repo := newPostgresTestRepository(t) ctx := context.Background() model := fmt.Sprintf("gpt-concurrent-%d", time.Now().UnixNano()) seedPublishCandidateAndPackage(t, repo, "cand-tx-concurrent", 7102, "openai", model) firstCandidate := mustLatestCandidate(t, repo, ctx, "openai", model) firstPackage := mustPackage(t, repo, ctx, "openai", model) firstCandidate.Status = domain.DiscoveryCandidateStatusPublished firstCandidate.UpdatedAt = time.Unix(300, 0).UTC() firstCandidate.Version++ firstPackage.Status = "active" firstPackage.UpdatedAt = time.Unix(300, 0).UTC() firstPackage.Version++ var wg sync.WaitGroup wg.Add(2) results := make(chan error, 2) for i := 0; i < 2; i++ { go func(idx int) { defer wg.Done() evtID := fmt.Sprintf("evt-concurrent-%d-%d", time.Now().UnixNano(), idx) _, err := repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{ Candidate: firstCandidate, Package: firstPackage, Event: domain.PackageChangeEvent{ EventID: evtID, AccountID: 7102, EventType: publish.PackagePublishedEventType, PackageID: firstPackage.PackageID, Platform: "openai", Model: model, OccurredAt: time.Unix(300+int64(idx), 0).UTC(), Version: firstPackage.Version, GatewaySyncStatus: domain.GatewaySyncStatusPending, }, }) results <- err }(i) } wg.Wait() close(results) successCount := 0 failCount := 0 for err := range results { if err == nil { successCount++ } else { failCount++ if !errors.Is(err, publish.ErrPackageAlreadyPublished) && !errors.Is(err, publish.ErrCandidateNotPublishable) { t.Fatalf("unexpected concurrent error: %v", err) } } } if successCount != 1 { t.Fatalf("expected exactly 1 success, got %d", successCount) } if failCount != 1 { t.Fatalf("expected exactly 1 failure, got %d", failCount) } candidateAfter := mustCandidateByID(t, repo, ctx, "cand-tx-concurrent") if candidateAfter.Status != domain.DiscoveryCandidateStatusPublished { t.Fatalf("expected published candidate after concurrent publish, got %+v", candidateAfter) } pkgAfter := mustPackage(t, repo, ctx, "openai", model) if pkgAfter.Status != "active" { t.Fatalf("expected active package after concurrent publish, got %+v", pkgAfter) } events := repo.ListPackageEvents(ctx) var modelEvents int for _, e := range events { if e.Platform == "openai" && e.Model == model { modelEvents++ } } if modelEvents != 1 { t.Fatalf("expected exactly 1 event for model after concurrent publish, got %d", modelEvents) } } func TestPostgresPublishPackageAtomicallyRollsBackOnDuplicateEvent(t *testing.T) { repo := newPostgresTestRepository(t) ctx := context.Background() model := fmt.Sprintf("gpt-rollback-%d", time.Now().UnixNano()) seedPublishCandidateAndPackage(t, repo, "cand-tx-rollback", 7101, "openai", model) firstCandidate := mustLatestCandidate(t, repo, ctx, "openai", model) firstPackage := mustPackage(t, repo, ctx, "openai", model) firstCandidate.Status = domain.DiscoveryCandidateStatusPublished firstCandidate.UpdatedAt = time.Unix(200, 0).UTC() firstCandidate.Version++ firstPackage.Status = "active" firstPackage.UpdatedAt = time.Unix(200, 0).UTC() firstPackage.Version++ _, err := repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{Candidate: firstCandidate, Package: firstPackage, Event: domain.PackageChangeEvent{EventID: "evt-rollback-1", AccountID: 7101, EventType: publish.PackagePublishedEventType, PackageID: firstPackage.PackageID, Platform: "openai", Model: model, OccurredAt: time.Unix(200, 0).UTC(), Version: firstPackage.Version, GatewaySyncStatus: domain.GatewaySyncStatusPending}}) if err != nil { t.Fatalf("seed publish failed: %v", err) } candidateBefore := mustCandidateByID(t, repo, ctx, "cand-tx-rollback") pkgBefore := mustPackage(t, repo, ctx, "openai", model) _, err = repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{Candidate: candidateBefore, Package: pkgBefore, Event: domain.PackageChangeEvent{EventID: "evt-rollback-1", AccountID: 7101, EventType: publish.PackagePublishedEventType, PackageID: pkgBefore.PackageID, Platform: "openai", Model: model, OccurredAt: time.Unix(201, 0).UTC(), Version: pkgBefore.Version + 1, GatewaySyncStatus: domain.GatewaySyncStatusPending}}) if err == nil { t.Fatal("expected duplicate event error") } candidateAfter := mustCandidateByID(t, repo, ctx, "cand-tx-rollback") if candidateAfter.Status != candidateBefore.Status || candidateAfter.Version != candidateBefore.Version { t.Fatalf("candidate changed despite rollback: before=%+v after=%+v", candidateBefore, candidateAfter) } pkgAfter := mustPackage(t, repo, ctx, "openai", model) if pkgAfter.Status != pkgBefore.Status || pkgAfter.Version != pkgBefore.Version { t.Fatalf("package changed despite rollback: before=%+v after=%+v", pkgBefore, pkgAfter) } } func TestPostgresUpsertSupplyPackageAllocatesDistinctPackageIDsForZeroInput(t *testing.T) { repo := newPostgresTestRepository(t) ctx := context.Background() baseTime := time.Unix(100, 0).UTC() repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{ Platform: "openai", Model: fmt.Sprintf("gpt-zero-id-a-%d", time.Now().UnixNano()), Status: "draft", Source: "admission", CreatedAt: baseTime, UpdatedAt: baseTime, }) repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{ Platform: "openai", Model: fmt.Sprintf("gpt-zero-id-b-%d", time.Now().UnixNano()), Status: "draft", Source: "admission", CreatedAt: baseTime.Add(time.Second), UpdatedAt: baseTime.Add(time.Second), }) pkgs := repo.ListSupplyPackages(ctx, "") if len(pkgs) != 2 { t.Fatalf("expected 2 packages after zero-id upserts, got %d: %+v", len(pkgs), pkgs) } if pkgs[0].PackageID == 0 || pkgs[1].PackageID == 0 { t.Fatalf("expected non-zero package ids, got %+v", pkgs) } if pkgs[0].PackageID == pkgs[1].PackageID { t.Fatalf("expected distinct package ids, got %+v", pkgs) } }