Files
supply-intelligence/internal/repository/postgres_publish_tx_test.go
2026-05-12 18:49:52 +08:00

287 lines
10 KiB
Go

package repository
import (
"context"
"errors"
"fmt"
"net"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"supply-intelligence/internal/domain"
"supply-intelligence/internal/publish"
)
func requireDocker(t *testing.T) {
t.Helper()
if _, err := exec.LookPath("docker"); err != nil {
t.Skip("docker not installed")
}
}
func freeTCPPort(t *testing.T) int {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("allocate free tcp port: %v", err)
}
defer ln.Close()
addr, ok := ln.Addr().(*net.TCPAddr)
if !ok {
t.Fatalf("unexpected listener addr type: %T", ln.Addr())
}
return addr.Port
}
func waitForPostgresReady(t *testing.T, port int, user, dbName, containerName string) {
t.Helper()
deadline := time.Now().Add(45 * time.Second)
var lastOut string
for time.Now().Before(deadline) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
cmd := exec.CommandContext(ctx, "pg_isready", "-h", "127.0.0.1", "-p", strconv.Itoa(port), "-U", user, "-d", dbName)
out, err := cmd.CombinedOutput()
cancel()
lastOut = strings.TrimSpace(string(out))
if err == nil {
return
}
time.Sleep(1 * time.Second)
}
logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput()
t.Fatalf("postgres container did not become ready on port %d within timeout; last pg_isready=%q logs=%s", port, lastOut, string(logs))
}
func newPostgresTestRepository(t *testing.T) *PostgresRepository {
t.Helper()
requireDocker(t)
if _, err := exec.LookPath("pg_isready"); err != nil {
t.Skip("pg_isready not installed")
}
_, currentFile, _, ok := runtime.Caller(0)
if !ok {
t.Fatal("resolve current test file")
}
projectRoot := filepath.Clean(filepath.Join(filepath.Dir(currentFile), "..", ".."))
migrationsDir := filepath.Join(projectRoot, "migrations")
hostPort := freeTCPPort(t)
containerName := fmt.Sprintf("supply-intelligence-repo-test-%d", time.Now().UnixNano())
dbName := "supply_intelligence"
dbUser := "supply"
dbPassword := "supply123"
cmd := exec.Command("docker", "run", "-d",
"--name", containerName,
"-e", "POSTGRES_DB="+dbName,
"-e", "POSTGRES_USER="+dbUser,
"-e", "POSTGRES_PASSWORD="+dbPassword,
"-p", fmt.Sprintf("127.0.0.1:%d:5432", hostPort),
"-v", migrationsDir+":/docker-entrypoint-initdb.d:ro",
"postgres:16-alpine",
)
cmd.Dir = projectRoot
if out, err := cmd.CombinedOutput(); err != nil {
t.Skipf("start isolated postgres container failed: %v output=%s", err, string(out))
}
t.Cleanup(func() {
rmCmd := exec.Command("docker", "rm", "-f", containerName)
rmCmd.Dir = projectRoot
_, _ = rmCmd.CombinedOutput()
})
waitForPostgresReady(t, hostPort, dbUser, dbName, containerName)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)
dsn := fmt.Sprintf("host=127.0.0.1 port=%d user=%s password=%s dbname=%s sslmode=disable", hostPort, dbUser, dbPassword, dbName)
repo, err := NewPostgresRepository(ctx, dsn)
if err != nil {
t.Fatalf("postgres not ready: %v", err)
}
return repo
}
func seedPublishCandidateAndPackage(t *testing.T, repo *PostgresRepository, candidateID string, accountID int64, platform, model string) {
t.Helper()
ctx := context.Background()
repo.UpsertDiscoveryCandidateContext(ctx, domain.DiscoveryCandidate{CandidateID: candidateID, AccountID: accountID, Platform: platform, Model: model, Source: "admission", Status: domain.DiscoveryCandidateStatusTestPassed, DiscoveredAt: time.Unix(100,0).UTC(), UpdatedAt: time.Unix(110,0).UTC()})
repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{PackageID: 1, Platform: platform, Model: model, Status: "draft", Source: "admission", CreatedAt: time.Unix(90,0).UTC(), UpdatedAt: time.Unix(110,0).UTC()})
}
func mustLatestCandidate(t *testing.T, repo *PostgresRepository, ctx context.Context, platform, model string) domain.DiscoveryCandidate {
t.Helper()
v, ok := repo.GetLatestDiscoveryCandidateContext(ctx, platform, model)
if !ok { t.Fatalf("candidate missing") }
return v
}
func mustCandidateByID(t *testing.T, repo *PostgresRepository, ctx context.Context, id string) domain.DiscoveryCandidate {
t.Helper()
v, ok := repo.GetDiscoveryCandidateByIDContext(ctx, id)
if !ok { t.Fatalf("candidate id missing") }
return v
}
func mustPackage(t *testing.T, repo *PostgresRepository, ctx context.Context, platform, model string) domain.SupplyPackage {
t.Helper()
v, ok := repo.GetSupplyPackage(ctx, platform, model)
if !ok { t.Fatalf("package missing") }
return v
}
func TestPostgresPublishPackageAtomicallyConcurrentDoublePublish(t *testing.T) {
repo := newPostgresTestRepository(t)
ctx := context.Background()
model := fmt.Sprintf("gpt-concurrent-%d", time.Now().UnixNano())
seedPublishCandidateAndPackage(t, repo, "cand-tx-concurrent", 7102, "openai", model)
firstCandidate := mustLatestCandidate(t, repo, ctx, "openai", model)
firstPackage := mustPackage(t, repo, ctx, "openai", model)
firstCandidate.Status = domain.DiscoveryCandidateStatusPublished
firstCandidate.UpdatedAt = time.Unix(300, 0).UTC()
firstCandidate.Version++
firstPackage.Status = "active"
firstPackage.UpdatedAt = time.Unix(300, 0).UTC()
firstPackage.Version++
var wg sync.WaitGroup
wg.Add(2)
results := make(chan error, 2)
for i := 0; i < 2; i++ {
go func(idx int) {
defer wg.Done()
evtID := fmt.Sprintf("evt-concurrent-%d-%d", time.Now().UnixNano(), idx)
_, err := repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{
Candidate: firstCandidate,
Package: firstPackage,
Event: domain.PackageChangeEvent{
EventID: evtID,
AccountID: 7102,
EventType: publish.PackagePublishedEventType,
PackageID: firstPackage.PackageID,
Platform: "openai",
Model: model,
OccurredAt: time.Unix(300+int64(idx), 0).UTC(),
Version: firstPackage.Version,
GatewaySyncStatus: domain.GatewaySyncStatusPending,
},
})
results <- err
}(i)
}
wg.Wait()
close(results)
successCount := 0
failCount := 0
for err := range results {
if err == nil {
successCount++
} else {
failCount++
if !errors.Is(err, publish.ErrPackageAlreadyPublished) && !errors.Is(err, publish.ErrCandidateNotPublishable) {
t.Fatalf("unexpected concurrent error: %v", err)
}
}
}
if successCount != 1 {
t.Fatalf("expected exactly 1 success, got %d", successCount)
}
if failCount != 1 {
t.Fatalf("expected exactly 1 failure, got %d", failCount)
}
candidateAfter := mustCandidateByID(t, repo, ctx, "cand-tx-concurrent")
if candidateAfter.Status != domain.DiscoveryCandidateStatusPublished {
t.Fatalf("expected published candidate after concurrent publish, got %+v", candidateAfter)
}
pkgAfter := mustPackage(t, repo, ctx, "openai", model)
if pkgAfter.Status != "active" {
t.Fatalf("expected active package after concurrent publish, got %+v", pkgAfter)
}
events := repo.ListPackageEvents(ctx)
var modelEvents int
for _, e := range events {
if e.Platform == "openai" && e.Model == model {
modelEvents++
}
}
if modelEvents != 1 {
t.Fatalf("expected exactly 1 event for model after concurrent publish, got %d", modelEvents)
}
}
func TestPostgresPublishPackageAtomicallyRollsBackOnDuplicateEvent(t *testing.T) {
repo := newPostgresTestRepository(t)
ctx := context.Background()
model := fmt.Sprintf("gpt-rollback-%d", time.Now().UnixNano())
seedPublishCandidateAndPackage(t, repo, "cand-tx-rollback", 7101, "openai", model)
firstCandidate := mustLatestCandidate(t, repo, ctx, "openai", model)
firstPackage := mustPackage(t, repo, ctx, "openai", model)
firstCandidate.Status = domain.DiscoveryCandidateStatusPublished
firstCandidate.UpdatedAt = time.Unix(200, 0).UTC()
firstCandidate.Version++
firstPackage.Status = "active"
firstPackage.UpdatedAt = time.Unix(200, 0).UTC()
firstPackage.Version++
_, err := repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{Candidate: firstCandidate, Package: firstPackage, Event: domain.PackageChangeEvent{EventID: "evt-rollback-1", AccountID: 7101, EventType: publish.PackagePublishedEventType, PackageID: firstPackage.PackageID, Platform: "openai", Model: model, OccurredAt: time.Unix(200, 0).UTC(), Version: firstPackage.Version, GatewaySyncStatus: domain.GatewaySyncStatusPending}})
if err != nil {
t.Fatalf("seed publish failed: %v", err)
}
candidateBefore := mustCandidateByID(t, repo, ctx, "cand-tx-rollback")
pkgBefore := mustPackage(t, repo, ctx, "openai", model)
_, err = repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{Candidate: candidateBefore, Package: pkgBefore, Event: domain.PackageChangeEvent{EventID: "evt-rollback-1", AccountID: 7101, EventType: publish.PackagePublishedEventType, PackageID: pkgBefore.PackageID, Platform: "openai", Model: model, OccurredAt: time.Unix(201, 0).UTC(), Version: pkgBefore.Version + 1, GatewaySyncStatus: domain.GatewaySyncStatusPending}})
if err == nil {
t.Fatal("expected duplicate event error")
}
candidateAfter := mustCandidateByID(t, repo, ctx, "cand-tx-rollback")
if candidateAfter.Status != candidateBefore.Status || candidateAfter.Version != candidateBefore.Version {
t.Fatalf("candidate changed despite rollback: before=%+v after=%+v", candidateBefore, candidateAfter)
}
pkgAfter := mustPackage(t, repo, ctx, "openai", model)
if pkgAfter.Status != pkgBefore.Status || pkgAfter.Version != pkgBefore.Version {
t.Fatalf("package changed despite rollback: before=%+v after=%+v", pkgBefore, pkgAfter)
}
}
func TestPostgresUpsertSupplyPackageAllocatesDistinctPackageIDsForZeroInput(t *testing.T) {
repo := newPostgresTestRepository(t)
ctx := context.Background()
baseTime := time.Unix(100, 0).UTC()
repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{
Platform: "openai",
Model: fmt.Sprintf("gpt-zero-id-a-%d", time.Now().UnixNano()),
Status: "draft",
Source: "admission",
CreatedAt: baseTime,
UpdatedAt: baseTime,
})
repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{
Platform: "openai",
Model: fmt.Sprintf("gpt-zero-id-b-%d", time.Now().UnixNano()),
Status: "draft",
Source: "admission",
CreatedAt: baseTime.Add(time.Second),
UpdatedAt: baseTime.Add(time.Second),
})
pkgs := repo.ListSupplyPackages(ctx, "")
if len(pkgs) != 2 {
t.Fatalf("expected 2 packages after zero-id upserts, got %d: %+v", len(pkgs), pkgs)
}
if pkgs[0].PackageID == 0 || pkgs[1].PackageID == 0 {
t.Fatalf("expected non-zero package ids, got %+v", pkgs)
}
if pkgs[0].PackageID == pkgs[1].PackageID {
t.Fatalf("expected distinct package ids, got %+v", pkgs)
}
}