package httpapi_test import ( "bytes" "context" "encoding/json" "fmt" "net" "net/http" "net/http/httptest" "os/exec" "path/filepath" "runtime" "strconv" "strings" "testing" "time" "supply-intelligence/internal/app" "supply-intelligence/internal/domain" ) func requireDockerForPostgresE2E(t *testing.T) { t.Helper() if _, err := exec.LookPath("docker"); err != nil { t.Skip("docker not installed") } if _, err := exec.LookPath("pg_isready"); err != nil { t.Skip("pg_isready not installed") } } func freeTCPPort(t *testing.T) int { t.Helper() ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("allocate free tcp port: %v", err) } defer ln.Close() addr, ok := ln.Addr().(*net.TCPAddr) if !ok { t.Fatalf("unexpected listener addr type: %T", ln.Addr()) } return addr.Port } func waitForPostgresReady(t *testing.T, port int, user, dbName, containerName string) { t.Helper() deadline := time.Now().Add(45 * time.Second) var lastOut string for time.Now().Before(deadline) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) cmd := exec.CommandContext(ctx, "pg_isready", "-h", "127.0.0.1", "-p", strconv.Itoa(port), "-U", user, "-d", dbName) out, err := cmd.CombinedOutput() cancel() lastOut = strings.TrimSpace(string(out)) if err == nil { return } time.Sleep(1 * time.Second) } logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() t.Fatalf("postgres container did not become ready on port %d within timeout; last pg_isready=%q logs=%s", port, lastOut, string(logs)) } func newPostgresApplicationForE2E(t *testing.T) *app.Application { t.Helper() requireDockerForPostgresE2E(t) _, currentFile, _, ok := runtime.Caller(0) if !ok { t.Fatal("resolve current test file") } projectRoot := filepath.Clean(filepath.Join(filepath.Dir(currentFile), "..", "..")) migrationsDir := filepath.Join(projectRoot, "migrations") hostPort := freeTCPPort(t) containerName := fmt.Sprintf("supply-intelligence-e2e-%d", time.Now().UnixNano()) dbName := "supply_intelligence" dbUser := "supply" dbPassword := "supply123" runArgs := []string{ "run", "-d", "--name", containerName, "-e", "POSTGRES_DB=" + dbName, "-e", "POSTGRES_USER=" + dbUser, "-e", "POSTGRES_PASSWORD=" + dbPassword, "-p", fmt.Sprintf("127.0.0.1:%d:5432", hostPort), "-v", migrationsDir + ":/docker-entrypoint-initdb.d:ro", "postgres:16-alpine", } runCmd := exec.Command("docker", runArgs...) runCmd.Dir = projectRoot if out, err := runCmd.CombinedOutput(); err != nil { t.Skipf("start isolated postgres container failed: %v output=%s", err, string(out)) } t.Cleanup(func() { rmCmd := exec.Command("docker", "rm", "-f", containerName) rmCmd.Dir = projectRoot _, _ = rmCmd.CombinedOutput() }) waitForPostgresReady(t, hostPort, dbUser, dbName, containerName) connString := fmt.Sprintf("postgres://%s:%s@127.0.0.1:%d/%s?sslmode=disable", dbUser, dbPassword, hostPort, dbName) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) t.Cleanup(cancel) application, err := app.NewWithPostgres(ctx, connString) if err != nil { t.Fatalf("connect isolated postgres app: %v", err) } application.GatewayConsumerService.SetConsumer("gateway") if application.GatewayConsumerService == nil { t.Fatal("expected gateway consumer service") } t.Cleanup(application.Close) return application } func TestPostgresE2EPublishConsumeAckAdmissionState(t *testing.T) { application := newPostgresApplicationForE2E(t) handler := application.Server.Routes() model := fmt.Sprintf("gpt-4.1-e2e-%d", time.Now().UnixNano()) candidateID := fmt.Sprintf("cand-e2e-%d", time.Now().UnixNano()) eventID := fmt.Sprintf("evt-e2e-%d", time.Now().UnixNano()) application.Repo.UpsertSupplyAccount(context.Background(), domain.SupplyAccount{ AccountID: 8801, Platform: "openai", APIKey: "test-key", ConsumerTag: "gateway", Status: "active", CreatedAt: time.Unix(90, 0).UTC(), UpdatedAt: time.Unix(90, 0).UTC(), }) application.Repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{ CandidateID: candidateID, AccountID: 8801, Platform: "openai", Model: model, Source: "admission", Status: domain.DiscoveryCandidateStatusTestPassed, DiscoveredAt: time.Unix(100, 0).UTC(), UpdatedAt: time.Unix(110, 0).UTC(), Version: 2, }) application.Repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{ Platform: "openai", Model: model, Status: "draft", Source: "admission", CreatedAt: time.Unix(100, 0).UTC(), UpdatedAt: time.Unix(110, 0).UTC(), Version: 1, }) publishReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/publish/package-event", bytes.NewBufferString(fmt.Sprintf(`{"event_id":"%s","platform":"openai","model":"%s","occurred_at":"2026-05-06T20:40:00Z"}`, eventID, model))) publishRR := httptest.NewRecorder() handler.ServeHTTP(publishRR, publishReq) if publishRR.Code != http.StatusOK { t.Fatalf("unexpected publish status: %d body=%s", publishRR.Code, publishRR.Body.String()) } consumeReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/gateway/consume-once", bytes.NewBufferString(`{"consumer":"gateway"}`)) consumeRR := httptest.NewRecorder() handler.ServeHTTP(consumeRR, consumeReq) if consumeRR.Code != http.StatusOK { t.Fatalf("unexpected consume status: %d body=%s", consumeRR.Code, consumeRR.Body.String()) } var consumeBody struct { Items []struct { EventID string `json:"event_id"` GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"` Result domain.GatewayAckResult `json:"result"` } `json:"items"` } if err := json.NewDecoder(consumeRR.Body).Decode(&consumeBody); err != nil { t.Fatalf("decode consume response: %v", err) } if len(consumeBody.Items) != 1 { t.Fatalf("expected one consumed item, got %+v", consumeBody.Items) } lastConsumed := consumeBody.Items[0] if lastConsumed.EventID != eventID { t.Fatalf("expected consumed event %s, got %+v", eventID, lastConsumed) } if lastConsumed.GatewaySyncStatus != domain.GatewaySyncStatusApplied || lastConsumed.Result != domain.GatewayAckResultApplied { t.Fatalf("expected applied consume result, got %+v", lastConsumed) } stateReq := httptest.NewRequest(http.MethodGet, "/internal/supply-intelligence/models/openai/"+model+"/admission-state", nil) stateRR := httptest.NewRecorder() handler.ServeHTTP(stateRR, stateReq) if stateRR.Code != http.StatusOK { t.Fatalf("unexpected admission-state status after consume: %d body=%s", stateRR.Code, stateRR.Body.String()) } var stateBody struct { Candidate *domain.DiscoveryCandidate `json:"candidate"` Package *domain.SupplyPackage `json:"package"` GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"` LastEvent *domain.PackageChangeEvent `json:"last_event"` } if err := json.NewDecoder(stateRR.Body).Decode(&stateBody); err != nil { t.Fatalf("decode admission-state response: %v", err) } if stateBody.Candidate == nil || stateBody.Candidate.Status != domain.DiscoveryCandidateStatusPublished { t.Fatalf("expected published candidate, got %+v", stateBody.Candidate) } if stateBody.Package == nil || stateBody.Package.Status != "active" { t.Fatalf("expected active package, got %+v", stateBody.Package) } if stateBody.LastEvent == nil || stateBody.LastEvent.EventID != eventID { t.Fatalf("expected latest event %s, got %+v", eventID, stateBody.LastEvent) } if stateBody.GatewaySyncStatus != domain.GatewaySyncStatusApplied { t.Fatalf("expected applied sync status after consume, got %q", stateBody.GatewaySyncStatus) } ackReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/gateway/package-changes/"+eventID+"/ack", bytes.NewBufferString(`{"consumer":"gateway","result":"applied","detail":"manual confirm"}`)) ackRR := httptest.NewRecorder() handler.ServeHTTP(ackRR, ackReq) if ackRR.Code != http.StatusNoContent { t.Fatalf("unexpected ack status: %d body=%s", ackRR.Code, ackRR.Body.String()) } finalStateReq := httptest.NewRequest(http.MethodGet, "/internal/supply-intelligence/models/openai/"+model+"/admission-state", nil) finalStateRR := httptest.NewRecorder() handler.ServeHTTP(finalStateRR, finalStateReq) if finalStateRR.Code != http.StatusOK { t.Fatalf("unexpected final admission-state status: %d body=%s", finalStateRR.Code, finalStateRR.Body.String()) } var finalStateBody struct { Candidate *domain.DiscoveryCandidate `json:"candidate"` Package *domain.SupplyPackage `json:"package"` GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"` LastEvent *domain.PackageChangeEvent `json:"last_event"` } if err := json.NewDecoder(finalStateRR.Body).Decode(&finalStateBody); err != nil { t.Fatalf("decode final admission-state response: %v", err) } if finalStateBody.GatewaySyncStatus != domain.GatewaySyncStatusApplied { t.Fatalf("expected applied sync status after explicit ack, got %q", finalStateBody.GatewaySyncStatus) } if finalStateBody.LastEvent == nil || finalStateBody.LastEvent.Consumer != "gateway" || finalStateBody.LastEvent.ConsumerDetail != "manual confirm" { t.Fatalf("expected ack details persisted, got %+v", finalStateBody.LastEvent) } storedEvent, ok := application.Repo.GetLatestPackageEvent(context.Background(), "openai", model) if !ok { t.Fatal("expected stored package event") } if storedEvent.EventID != eventID || storedEvent.GatewaySyncStatus != domain.GatewaySyncStatusApplied { t.Fatalf("unexpected stored event: %+v", storedEvent) } if storedEvent.AckedAt == nil { t.Fatalf("expected stored ack timestamp, got %+v", storedEvent) } storedSnapshot, ok := application.Repo.GetGatewayAppliedSnapshot(context.Background(), "gateway") if !ok { t.Fatal("expected gateway applied snapshot") } if storedSnapshot.LastEventID != eventID || storedSnapshot.LastModel != model || storedSnapshot.LastResult != string(domain.GatewayAckResultApplied) { t.Fatalf("unexpected gateway snapshot: %+v", storedSnapshot) } } func TestPostgresE2EPublishConsumeAckAdmissionStateRequiresAuthorizedConsumer(t *testing.T) { application := newPostgresApplicationForE2E(t) handler := application.Server.Routes() model := fmt.Sprintf("gpt-4.1-e2e-unauth-%d", time.Now().UnixNano()) candidateID := fmt.Sprintf("cand-e2e-unauth-%d", time.Now().UnixNano()) eventID := fmt.Sprintf("evt-e2e-unauth-%d", time.Now().UnixNano()) application.Repo.UpsertSupplyAccount(context.Background(), domain.SupplyAccount{ AccountID: 9901, Platform: "openai", APIKey: "test-key", ConsumerTag: "other-consumer", Status: "active", CreatedAt: time.Unix(90, 0).UTC(), UpdatedAt: time.Unix(90, 0).UTC(), }) application.Repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{ CandidateID: candidateID, AccountID: 9901, Platform: "openai", Model: model, Source: "admission", Status: domain.DiscoveryCandidateStatusTestPassed, DiscoveredAt: time.Unix(100, 0).UTC(), UpdatedAt: time.Unix(110, 0).UTC(), Version: 2, }) application.Repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{ Platform: "openai", Model: model, Status: "draft", Source: "admission", CreatedAt: time.Unix(100, 0).UTC(), UpdatedAt: time.Unix(110, 0).UTC(), Version: 1, }) publishReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/publish/package-event", bytes.NewBufferString(fmt.Sprintf(`{"event_id":"%s","platform":"openai","model":"%s","occurred_at":"2026-05-06T20:45:00Z"}`, eventID, model))) publishRR := httptest.NewRecorder() handler.ServeHTTP(publishRR, publishReq) if publishRR.Code != http.StatusOK { t.Fatalf("unexpected publish status: %d body=%s", publishRR.Code, publishRR.Body.String()) } authorizedAccounts := application.Repo.ListSupplyAccountsByConsumer(context.Background(), "gateway") if len(authorizedAccounts) != 0 { t.Fatalf("expected no accounts authorized for gateway, got %+v", authorizedAccounts) } consumeReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/gateway/consume-once", bytes.NewBufferString(`{"consumer":"gateway"}`)) consumeRR := httptest.NewRecorder() handler.ServeHTTP(consumeRR, consumeReq) if consumeRR.Code != http.StatusOK { t.Fatalf("unexpected consume status: %d body=%s", consumeRR.Code, consumeRR.Body.String()) } var consumeBody struct { Items []any `json:"items"` } if err := json.NewDecoder(consumeRR.Body).Decode(&consumeBody); err != nil { t.Fatalf("decode consume response: %v", err) } if len(consumeBody.Items) != 0 { t.Fatalf("expected unauthorized event to be skipped, got %+v", consumeBody.Items) } stateReq := httptest.NewRequest(http.MethodGet, "/internal/supply-intelligence/models/openai/"+model+"/admission-state", nil) stateRR := httptest.NewRecorder() handler.ServeHTTP(stateRR, stateReq) if stateRR.Code != http.StatusOK { t.Fatalf("unexpected admission-state status: %d body=%s", stateRR.Code, stateRR.Body.String()) } var stateBody struct { GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"` LastEvent *domain.PackageChangeEvent `json:"last_event"` } if err := json.NewDecoder(stateRR.Body).Decode(&stateBody); err != nil { t.Fatalf("decode admission-state response: %v", err) } if stateBody.GatewaySyncStatus != domain.GatewaySyncStatusPending { t.Fatalf("expected pending sync status when unauthorized consumer skips event, got %q", stateBody.GatewaySyncStatus) } if stateBody.LastEvent == nil || !strings.EqualFold(stateBody.LastEvent.EventID, eventID) { t.Fatalf("expected last event to remain pending, got %+v", stateBody.LastEvent) } }