360 lines
12 KiB
Go
360 lines
12 KiB
Go
package publish_test
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/app"
|
|
"supply-intelligence/internal/domain"
|
|
"supply-intelligence/internal/publish"
|
|
"supply-intelligence/internal/repository"
|
|
)
|
|
|
|
type failingSupplyPackageRepo struct {
|
|
candidate domain.DiscoveryCandidate
|
|
pkg domain.SupplyPackage
|
|
upsertErr error
|
|
appendCalled bool
|
|
statusUpdated bool
|
|
}
|
|
|
|
func (r *failingSupplyPackageRepo) AppendPackageEventContext(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
|
|
r.appendCalled = true
|
|
return evt, nil
|
|
}
|
|
|
|
func (r *failingSupplyPackageRepo) GetLatestDiscoveryCandidateContext(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
return r.candidate, r.candidate.Platform == platform && r.candidate.Model == model
|
|
}
|
|
|
|
func (r *failingSupplyPackageRepo) UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error {
|
|
r.statusUpdated = true
|
|
r.candidate.Status = status
|
|
return nil
|
|
}
|
|
|
|
func (r *failingSupplyPackageRepo) GetSupplyPackage(ctx context.Context, platform, model string) (domain.SupplyPackage, bool) {
|
|
return r.pkg, r.pkg.Platform == platform && r.pkg.Model == model
|
|
}
|
|
|
|
func (r *failingSupplyPackageRepo) UpsertSupplyPackage(ctx context.Context, pkg domain.SupplyPackage) error {
|
|
return r.upsertErr
|
|
}
|
|
|
|
func TestServiceRecordPackagePublished(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
service := publish.NewService(repo)
|
|
occurredAt := time.Unix(1715000000, 0)
|
|
|
|
event, err := service.RecordPackagePublished(context.Background(), publish.RecordPackagePublishedInput{
|
|
EventID: "evt-publish-1",
|
|
PackageID: 1001,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-mini",
|
|
Version: 3,
|
|
OccurredAt: occurredAt,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if event.EventID != "evt-publish-1" || event.EventType != publish.PackagePublishedEventType {
|
|
t.Fatalf("unexpected event: %+v", event)
|
|
}
|
|
if !event.OccurredAt.Equal(occurredAt.UTC()) {
|
|
t.Fatalf("unexpected occurred_at: %s", event.OccurredAt)
|
|
}
|
|
if event.GatewaySyncStatus != domain.GatewaySyncStatusPending {
|
|
t.Fatalf("unexpected sync status: %q", event.GatewaySyncStatus)
|
|
}
|
|
|
|
items := repo.ListPackageEvents(context.Background())
|
|
if len(items) != 1 {
|
|
t.Fatalf("unexpected items length: %d", len(items))
|
|
}
|
|
if items[0].EventID != event.EventID || items[0].Version != 3 {
|
|
t.Fatalf("unexpected stored event: %+v", items[0])
|
|
}
|
|
if items[0].GatewaySyncStatus != domain.GatewaySyncStatusPending {
|
|
t.Fatalf("unexpected stored sync status: %+v", items[0])
|
|
}
|
|
}
|
|
|
|
func TestServiceRecordPackagePublishedRejectsInvalidInput(t *testing.T) {
|
|
service := publish.NewService(repository.NewMemoryRepository())
|
|
|
|
_, err := service.RecordPackagePublished(context.Background(), publish.RecordPackagePublishedInput{
|
|
EventID: " ",
|
|
PackageID: 0,
|
|
Platform: "",
|
|
Model: "",
|
|
Version: 0,
|
|
})
|
|
if err == nil {
|
|
t.Fatal("expected error")
|
|
}
|
|
if err != publish.ErrInvalidPublishInput {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestServicePublishDraftTransitionsCandidatePackageAndEvent(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{
|
|
CandidateID: "cand-publish",
|
|
AccountID: 101,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-mini",
|
|
Source: "admission",
|
|
Status: domain.DiscoveryCandidateStatusTestPassed,
|
|
DiscoveredAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(110, 0).UTC(),
|
|
Version: 2,
|
|
})
|
|
repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{
|
|
PackageID: 11,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-mini",
|
|
Status: "draft",
|
|
Source: "admission",
|
|
UpdatedAt: time.Unix(110, 0).UTC(),
|
|
Version: 1,
|
|
})
|
|
service := publish.NewService(repo)
|
|
|
|
out, err := service.PublishDraft(context.Background(), publish.PublishDraftInput{
|
|
EventID: "evt-publish-real",
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-mini",
|
|
OccurredAt: time.Unix(120, 0).UTC(),
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if out.Candidate.Status != domain.DiscoveryCandidateStatusPublished {
|
|
t.Fatalf("expected published candidate, got %+v", out.Candidate)
|
|
}
|
|
if out.Package.Status != "active" {
|
|
t.Fatalf("expected active package, got %+v", out.Package)
|
|
}
|
|
if out.Event.GatewaySyncStatus != domain.GatewaySyncStatusPending {
|
|
t.Fatalf("expected pending gateway sync, got %+v", out.Event)
|
|
}
|
|
if got, ok := repo.GetLatestDiscoveryCandidateContext(context.Background(), "openai", "gpt-4.1-mini"); !ok || got.Status != domain.DiscoveryCandidateStatusPublished {
|
|
t.Fatalf("expected stored published candidate, got %+v ok=%v", got, ok)
|
|
}
|
|
if pkg, ok := repo.GetSupplyPackage(context.Background(), "openai", "gpt-4.1-mini"); !ok || pkg.Status != "active" {
|
|
t.Fatalf("expected stored active package, got %+v ok=%v", pkg, ok)
|
|
}
|
|
}
|
|
|
|
func TestServicePublishDraftRejectsInvalidState(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{
|
|
CandidateID: "cand-bad",
|
|
AccountID: 102,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1",
|
|
Source: "admission",
|
|
Status: domain.DiscoveryCandidateStatusDiscovered,
|
|
DiscoveredAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(100, 0).UTC(),
|
|
Version: 1,
|
|
})
|
|
repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{
|
|
PackageID: 12,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1",
|
|
Status: "draft",
|
|
Source: "admission",
|
|
UpdatedAt: time.Unix(100, 0).UTC(),
|
|
Version: 1,
|
|
})
|
|
service := publish.NewService(repo)
|
|
|
|
_, err := service.PublishDraft(context.Background(), publish.PublishDraftInput{EventID: "evt-bad", Platform: "openai", Model: "gpt-4.1"})
|
|
if !errors.Is(err, publish.ErrCandidateNotPublishable) {
|
|
t.Fatalf("expected publish.ErrCandidateNotPublishable, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestServicePublishDraftRejectsAlreadyPublishedPackage(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{
|
|
CandidateID: "cand-published",
|
|
AccountID: 103,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-already",
|
|
Source: "admission",
|
|
Status: domain.DiscoveryCandidateStatusPublished,
|
|
DiscoveredAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(120, 0).UTC(),
|
|
Version: 2,
|
|
})
|
|
repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{
|
|
PackageID: 13,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-already",
|
|
Status: "active",
|
|
Source: "admission",
|
|
UpdatedAt: time.Unix(120, 0).UTC(),
|
|
Version: 2,
|
|
})
|
|
service := publish.NewService(repo)
|
|
|
|
_, err := service.PublishDraft(context.Background(), publish.PublishDraftInput{EventID: "evt-again", Platform: "openai", Model: "gpt-4.1-already"})
|
|
if !errors.Is(err, publish.ErrPackageAlreadyPublished) {
|
|
t.Fatalf("expected publish.ErrPackageAlreadyPublished, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestServicePublishDraftTreatsHalfAppliedStateAsAlreadyPublished(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
candidate domain.DiscoveryCandidateStatus
|
|
pkgStatus string
|
|
}{
|
|
{name: "candidate already published", candidate: domain.DiscoveryCandidateStatusPublished, pkgStatus: "draft"},
|
|
{name: "package already active", candidate: domain.DiscoveryCandidateStatusTestPassed, pkgStatus: "active"},
|
|
}
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
repo := repository.NewMemoryRepository()
|
|
repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{
|
|
CandidateID: "cand-half-applied",
|
|
AccountID: 104,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-half",
|
|
Source: "admission",
|
|
Status: tc.candidate,
|
|
DiscoveredAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(120, 0).UTC(),
|
|
Version: 2,
|
|
})
|
|
repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{
|
|
PackageID: 14,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-half",
|
|
Status: tc.pkgStatus,
|
|
Source: "admission",
|
|
UpdatedAt: time.Unix(120, 0).UTC(),
|
|
Version: 2,
|
|
})
|
|
service := publish.NewService(repo)
|
|
|
|
_, err := service.PublishDraft(context.Background(), publish.PublishDraftInput{EventID: "evt-half-applied", Platform: "openai", Model: "gpt-4.1-half"})
|
|
if !errors.Is(err, publish.ErrPackageAlreadyPublished) {
|
|
t.Fatalf("expected publish.ErrPackageAlreadyPublished, got %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestServicePublishDraftReturnsSupplyPackageUpsertError(t *testing.T) {
|
|
repo := &failingSupplyPackageRepo{
|
|
candidate: domain.DiscoveryCandidate{
|
|
CandidateID: "cand-upsert-fail",
|
|
AccountID: 105,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-upsert-fail",
|
|
Source: "admission",
|
|
Status: domain.DiscoveryCandidateStatusTestPassed,
|
|
DiscoveredAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(110, 0).UTC(),
|
|
Version: 2,
|
|
},
|
|
pkg: domain.SupplyPackage{
|
|
PackageID: 15,
|
|
Platform: "openai",
|
|
Model: "gpt-4.1-upsert-fail",
|
|
Status: "draft",
|
|
Source: "admission",
|
|
UpdatedAt: time.Unix(110, 0).UTC(),
|
|
Version: 1,
|
|
},
|
|
upsertErr: errors.New("db write failed"),
|
|
}
|
|
service := publish.NewService(repo)
|
|
|
|
_, err := service.PublishDraft(context.Background(), publish.PublishDraftInput{EventID: "evt-upsert-fail", Platform: "openai", Model: "gpt-4.1-upsert-fail"})
|
|
if !errors.Is(err, repo.upsertErr) {
|
|
t.Fatalf("expected upsert error, got %v", err)
|
|
}
|
|
if !repo.statusUpdated {
|
|
t.Fatal("expected candidate status update attempted before package upsert")
|
|
}
|
|
if repo.appendCalled {
|
|
t.Fatal("did not expect package event append after package upsert failure")
|
|
}
|
|
}
|
|
|
|
func TestPublishEndpointConcurrentDuplicateOnlyOneSucceeds(t *testing.T) {
|
|
application := app.New()
|
|
application.Repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{CandidateID: "cand-concurrent", AccountID: 603, Platform: "openai", Model: "gpt-4.1-race", Source: "admission", Status: domain.DiscoveryCandidateStatusTestPassed, DiscoveredAt: time.Unix(100, 0).UTC(), UpdatedAt: time.Unix(110, 0).UTC(), Version: 2})
|
|
application.Repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{PackageID: 503, Platform: "openai", Model: "gpt-4.1-race", Status: "draft", Source: "admission", UpdatedAt: time.Unix(110, 0).UTC(), Version: 1})
|
|
|
|
handler := application.Server.Routes()
|
|
body := `{"event_id":"evt-concurrent-1","platform":"openai","model":"gpt-4.1-race","occurred_at":"2026-05-06T20:30:00Z"}`
|
|
|
|
type result struct {
|
|
status int
|
|
error string
|
|
}
|
|
results := make(chan result, 2)
|
|
start := make(chan struct{})
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 2; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
<-start
|
|
req := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/publish/package-event", strings.NewReader(body))
|
|
rr := httptest.NewRecorder()
|
|
handler.ServeHTTP(rr, req)
|
|
var payload map[string]any
|
|
_ = json.Unmarshal(rr.Body.Bytes(), &payload)
|
|
errValue, _ := payload["error"].(string)
|
|
results <- result{status: rr.Code, error: errValue}
|
|
}()
|
|
}
|
|
close(start)
|
|
wg.Wait()
|
|
close(results)
|
|
|
|
successCount := 0
|
|
conflictCount := 0
|
|
for res := range results {
|
|
switch res.status {
|
|
case http.StatusOK:
|
|
successCount++
|
|
case http.StatusConflict:
|
|
if res.error != "publish_already_applied" {
|
|
t.Fatalf("unexpected conflict payload: %+v", res)
|
|
}
|
|
conflictCount++
|
|
default:
|
|
t.Fatalf("unexpected response: %+v", res)
|
|
}
|
|
}
|
|
if successCount != 1 || conflictCount != 1 {
|
|
t.Fatalf("expected one success and one conflict, got success=%d conflict=%d", successCount, conflictCount)
|
|
}
|
|
events := application.Repo.ListPackageEvents(context.Background())
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected exactly one event, got %d", len(events))
|
|
}
|
|
if candidate, ok := application.Repo.GetLatestDiscoveryCandidateContext(context.Background(), "openai", "gpt-4.1-race"); !ok || candidate.Status != domain.DiscoveryCandidateStatusPublished {
|
|
t.Fatalf("expected published candidate, got %+v ok=%v", candidate, ok)
|
|
}
|
|
if pkg, ok := application.Repo.GetSupplyPackage(context.Background(), "openai", "gpt-4.1-race"); !ok || pkg.Status != "active" {
|
|
t.Fatalf("expected active package, got %+v ok=%v", pkg, ok)
|
|
}
|
|
}
|