287 lines
10 KiB
Go
287 lines
10 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/domain"
|
|
"supply-intelligence/internal/publish"
|
|
)
|
|
|
|
// requireDocker skips the calling test when no "docker" executable can be
// resolved via exec.LookPath, so Docker-backed tests do not fail on machines
// without Docker.
func requireDocker(t *testing.T) {
	t.Helper()

	_, lookupErr := exec.LookPath("docker")
	if lookupErr == nil {
		return
	}
	t.Skip("docker not installed")
}
|
|
|
|
// freeTCPPort asks the OS for an unused loopback TCP port by listening on
// 127.0.0.1:0 and returns the port that was assigned. The listener is closed
// before returning, so another process may claim the port before the caller
// binds it.
func freeTCPPort(t *testing.T) int {
	t.Helper()

	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("allocate free tcp port: %v", err)
	}
	defer listener.Close()

	switch bound := listener.Addr().(type) {
	case *net.TCPAddr:
		return bound.Port
	default:
		t.Fatalf("unexpected listener addr type: %T", bound)
		return 0 // unreachable: Fatalf stops the test goroutine
	}
}
|
|
|
|
// waitForPostgresReady polls pg_isready against 127.0.0.1:port until it exits
// successfully or 45 seconds have elapsed. Each probe is limited to 3 seconds
// and failed probes are retried after a 1 second pause. On timeout the test is
// failed with the last pg_isready output and the output of `docker logs` for
// containerName.
func waitForPostgresReady(t *testing.T, port int, user, dbName, containerName string) {
	t.Helper()

	probeArgs := []string{"-h", "127.0.0.1", "-p", strconv.Itoa(port), "-U", user, "-d", dbName}
	probe := func() (string, error) {
		probeCtx, stop := context.WithTimeout(context.Background(), 3*time.Second)
		defer stop()
		raw, err := exec.CommandContext(probeCtx, "pg_isready", probeArgs...).CombinedOutput()
		return strings.TrimSpace(string(raw)), err
	}

	var lastProbeOutput string
	// The post statement sleeps between attempts, after each failed probe.
	for giveUpAt := time.Now().Add(45 * time.Second); time.Now().Before(giveUpAt); time.Sleep(time.Second) {
		var err error
		if lastProbeOutput, err = probe(); err == nil {
			return
		}
	}

	// Best-effort diagnostics: a failure to fetch logs must not mask the timeout.
	containerLogs, _ := exec.Command("docker", "logs", containerName).CombinedOutput()
	t.Fatalf("postgres container did not become ready on port %d within timeout; last pg_isready=%q logs=%s", port, lastProbeOutput, string(containerLogs))
}
|
|
|
|
func newPostgresTestRepository(t *testing.T) *PostgresRepository {
|
|
t.Helper()
|
|
requireDocker(t)
|
|
if _, err := exec.LookPath("pg_isready"); err != nil {
|
|
t.Skip("pg_isready not installed")
|
|
}
|
|
_, currentFile, _, ok := runtime.Caller(0)
|
|
if !ok {
|
|
t.Fatal("resolve current test file")
|
|
}
|
|
projectRoot := filepath.Clean(filepath.Join(filepath.Dir(currentFile), "..", ".."))
|
|
migrationsDir := filepath.Join(projectRoot, "migrations")
|
|
hostPort := freeTCPPort(t)
|
|
containerName := fmt.Sprintf("supply-intelligence-repo-test-%d", time.Now().UnixNano())
|
|
dbName := "supply_intelligence"
|
|
dbUser := "supply"
|
|
dbPassword := "supply123"
|
|
|
|
cmd := exec.Command("docker", "run", "-d",
|
|
"--name", containerName,
|
|
"-e", "POSTGRES_DB="+dbName,
|
|
"-e", "POSTGRES_USER="+dbUser,
|
|
"-e", "POSTGRES_PASSWORD="+dbPassword,
|
|
"-p", fmt.Sprintf("127.0.0.1:%d:5432", hostPort),
|
|
"-v", migrationsDir+":/docker-entrypoint-initdb.d:ro",
|
|
"postgres:16-alpine",
|
|
)
|
|
cmd.Dir = projectRoot
|
|
if out, err := cmd.CombinedOutput(); err != nil {
|
|
t.Skipf("start isolated postgres container failed: %v output=%s", err, string(out))
|
|
}
|
|
t.Cleanup(func() {
|
|
rmCmd := exec.Command("docker", "rm", "-f", containerName)
|
|
rmCmd.Dir = projectRoot
|
|
_, _ = rmCmd.CombinedOutput()
|
|
})
|
|
waitForPostgresReady(t, hostPort, dbUser, dbName, containerName)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
t.Cleanup(cancel)
|
|
dsn := fmt.Sprintf("host=127.0.0.1 port=%d user=%s password=%s dbname=%s sslmode=disable", hostPort, dbUser, dbPassword, dbName)
|
|
repo, err := NewPostgresRepository(ctx, dsn)
|
|
if err != nil {
|
|
t.Fatalf("postgres not ready: %v", err)
|
|
}
|
|
return repo
|
|
}
|
|
|
|
func seedPublishCandidateAndPackage(t *testing.T, repo *PostgresRepository, candidateID string, accountID int64, platform, model string) {
|
|
t.Helper()
|
|
ctx := context.Background()
|
|
repo.UpsertDiscoveryCandidateContext(ctx, domain.DiscoveryCandidate{CandidateID: candidateID, AccountID: accountID, Platform: platform, Model: model, Source: "admission", Status: domain.DiscoveryCandidateStatusTestPassed, DiscoveredAt: time.Unix(100,0).UTC(), UpdatedAt: time.Unix(110,0).UTC()})
|
|
repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{PackageID: 1, Platform: platform, Model: model, Status: "draft", Source: "admission", CreatedAt: time.Unix(90,0).UTC(), UpdatedAt: time.Unix(110,0).UTC()})
|
|
}
|
|
|
|
func mustLatestCandidate(t *testing.T, repo *PostgresRepository, ctx context.Context, platform, model string) domain.DiscoveryCandidate {
|
|
t.Helper()
|
|
v, ok := repo.GetLatestDiscoveryCandidateContext(ctx, platform, model)
|
|
if !ok { t.Fatalf("candidate missing") }
|
|
return v
|
|
}
|
|
func mustCandidateByID(t *testing.T, repo *PostgresRepository, ctx context.Context, id string) domain.DiscoveryCandidate {
|
|
t.Helper()
|
|
v, ok := repo.GetDiscoveryCandidateByIDContext(ctx, id)
|
|
if !ok { t.Fatalf("candidate id missing") }
|
|
return v
|
|
}
|
|
func mustPackage(t *testing.T, repo *PostgresRepository, ctx context.Context, platform, model string) domain.SupplyPackage {
|
|
t.Helper()
|
|
v, ok := repo.GetSupplyPackage(ctx, platform, model)
|
|
if !ok { t.Fatalf("package missing") }
|
|
return v
|
|
}
|
|
|
|
func TestPostgresPublishPackageAtomicallyConcurrentDoublePublish(t *testing.T) {
|
|
repo := newPostgresTestRepository(t)
|
|
ctx := context.Background()
|
|
model := fmt.Sprintf("gpt-concurrent-%d", time.Now().UnixNano())
|
|
seedPublishCandidateAndPackage(t, repo, "cand-tx-concurrent", 7102, "openai", model)
|
|
|
|
firstCandidate := mustLatestCandidate(t, repo, ctx, "openai", model)
|
|
firstPackage := mustPackage(t, repo, ctx, "openai", model)
|
|
firstCandidate.Status = domain.DiscoveryCandidateStatusPublished
|
|
firstCandidate.UpdatedAt = time.Unix(300, 0).UTC()
|
|
firstCandidate.Version++
|
|
firstPackage.Status = "active"
|
|
firstPackage.UpdatedAt = time.Unix(300, 0).UTC()
|
|
firstPackage.Version++
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
results := make(chan error, 2)
|
|
for i := 0; i < 2; i++ {
|
|
go func(idx int) {
|
|
defer wg.Done()
|
|
evtID := fmt.Sprintf("evt-concurrent-%d-%d", time.Now().UnixNano(), idx)
|
|
_, err := repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{
|
|
Candidate: firstCandidate,
|
|
Package: firstPackage,
|
|
Event: domain.PackageChangeEvent{
|
|
EventID: evtID,
|
|
AccountID: 7102,
|
|
EventType: publish.PackagePublishedEventType,
|
|
PackageID: firstPackage.PackageID,
|
|
Platform: "openai",
|
|
Model: model,
|
|
OccurredAt: time.Unix(300+int64(idx), 0).UTC(),
|
|
Version: firstPackage.Version,
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
},
|
|
})
|
|
results <- err
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
close(results)
|
|
|
|
successCount := 0
|
|
failCount := 0
|
|
for err := range results {
|
|
if err == nil {
|
|
successCount++
|
|
} else {
|
|
failCount++
|
|
if !errors.Is(err, publish.ErrPackageAlreadyPublished) && !errors.Is(err, publish.ErrCandidateNotPublishable) {
|
|
t.Fatalf("unexpected concurrent error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
if successCount != 1 {
|
|
t.Fatalf("expected exactly 1 success, got %d", successCount)
|
|
}
|
|
if failCount != 1 {
|
|
t.Fatalf("expected exactly 1 failure, got %d", failCount)
|
|
}
|
|
|
|
candidateAfter := mustCandidateByID(t, repo, ctx, "cand-tx-concurrent")
|
|
if candidateAfter.Status != domain.DiscoveryCandidateStatusPublished {
|
|
t.Fatalf("expected published candidate after concurrent publish, got %+v", candidateAfter)
|
|
}
|
|
pkgAfter := mustPackage(t, repo, ctx, "openai", model)
|
|
if pkgAfter.Status != "active" {
|
|
t.Fatalf("expected active package after concurrent publish, got %+v", pkgAfter)
|
|
}
|
|
events := repo.ListPackageEvents(ctx)
|
|
var modelEvents int
|
|
for _, e := range events {
|
|
if e.Platform == "openai" && e.Model == model {
|
|
modelEvents++
|
|
}
|
|
}
|
|
if modelEvents != 1 {
|
|
t.Fatalf("expected exactly 1 event for model after concurrent publish, got %d", modelEvents)
|
|
}
|
|
}
|
|
|
|
func TestPostgresPublishPackageAtomicallyRollsBackOnDuplicateEvent(t *testing.T) {
|
|
repo := newPostgresTestRepository(t)
|
|
ctx := context.Background()
|
|
model := fmt.Sprintf("gpt-rollback-%d", time.Now().UnixNano())
|
|
seedPublishCandidateAndPackage(t, repo, "cand-tx-rollback", 7101, "openai", model)
|
|
|
|
firstCandidate := mustLatestCandidate(t, repo, ctx, "openai", model)
|
|
firstPackage := mustPackage(t, repo, ctx, "openai", model)
|
|
firstCandidate.Status = domain.DiscoveryCandidateStatusPublished
|
|
firstCandidate.UpdatedAt = time.Unix(200, 0).UTC()
|
|
firstCandidate.Version++
|
|
firstPackage.Status = "active"
|
|
firstPackage.UpdatedAt = time.Unix(200, 0).UTC()
|
|
firstPackage.Version++
|
|
_, err := repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{Candidate: firstCandidate, Package: firstPackage, Event: domain.PackageChangeEvent{EventID: "evt-rollback-1", AccountID: 7101, EventType: publish.PackagePublishedEventType, PackageID: firstPackage.PackageID, Platform: "openai", Model: model, OccurredAt: time.Unix(200, 0).UTC(), Version: firstPackage.Version, GatewaySyncStatus: domain.GatewaySyncStatusPending}})
|
|
if err != nil {
|
|
t.Fatalf("seed publish failed: %v", err)
|
|
}
|
|
|
|
candidateBefore := mustCandidateByID(t, repo, ctx, "cand-tx-rollback")
|
|
pkgBefore := mustPackage(t, repo, ctx, "openai", model)
|
|
|
|
_, err = repo.PublishPackageAtomically(ctx, publish.PublishPackageAtomicInput{Candidate: candidateBefore, Package: pkgBefore, Event: domain.PackageChangeEvent{EventID: "evt-rollback-1", AccountID: 7101, EventType: publish.PackagePublishedEventType, PackageID: pkgBefore.PackageID, Platform: "openai", Model: model, OccurredAt: time.Unix(201, 0).UTC(), Version: pkgBefore.Version + 1, GatewaySyncStatus: domain.GatewaySyncStatusPending}})
|
|
if err == nil {
|
|
t.Fatal("expected duplicate event error")
|
|
}
|
|
|
|
candidateAfter := mustCandidateByID(t, repo, ctx, "cand-tx-rollback")
|
|
if candidateAfter.Status != candidateBefore.Status || candidateAfter.Version != candidateBefore.Version {
|
|
t.Fatalf("candidate changed despite rollback: before=%+v after=%+v", candidateBefore, candidateAfter)
|
|
}
|
|
pkgAfter := mustPackage(t, repo, ctx, "openai", model)
|
|
if pkgAfter.Status != pkgBefore.Status || pkgAfter.Version != pkgBefore.Version {
|
|
t.Fatalf("package changed despite rollback: before=%+v after=%+v", pkgBefore, pkgAfter)
|
|
}
|
|
}
|
|
|
|
func TestPostgresUpsertSupplyPackageAllocatesDistinctPackageIDsForZeroInput(t *testing.T) {
|
|
repo := newPostgresTestRepository(t)
|
|
ctx := context.Background()
|
|
baseTime := time.Unix(100, 0).UTC()
|
|
|
|
repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{
|
|
Platform: "openai",
|
|
Model: fmt.Sprintf("gpt-zero-id-a-%d", time.Now().UnixNano()),
|
|
Status: "draft",
|
|
Source: "admission",
|
|
CreatedAt: baseTime,
|
|
UpdatedAt: baseTime,
|
|
})
|
|
repo.UpsertSupplyPackage(ctx, domain.SupplyPackage{
|
|
Platform: "openai",
|
|
Model: fmt.Sprintf("gpt-zero-id-b-%d", time.Now().UnixNano()),
|
|
Status: "draft",
|
|
Source: "admission",
|
|
CreatedAt: baseTime.Add(time.Second),
|
|
UpdatedAt: baseTime.Add(time.Second),
|
|
})
|
|
|
|
pkgs := repo.ListSupplyPackages(ctx, "")
|
|
if len(pkgs) != 2 {
|
|
t.Fatalf("expected 2 packages after zero-id upserts, got %d: %+v", len(pkgs), pkgs)
|
|
}
|
|
if pkgs[0].PackageID == 0 || pkgs[1].PackageID == 0 {
|
|
t.Fatalf("expected non-zero package ids, got %+v", pkgs)
|
|
}
|
|
if pkgs[0].PackageID == pkgs[1].PackageID {
|
|
t.Fatalf("expected distinct package ids, got %+v", pkgs)
|
|
}
|
|
}
|