// Package httpapi exposes the supply-intelligence services over the internal
// HTTP endpoints registered in (*Server).Routes.
package httpapi

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"supply-intelligence/internal/admission"
	"supply-intelligence/internal/discovery"
	"supply-intelligence/internal/domain"
	"supply-intelligence/internal/gatewayconsumer"
	"supply-intelligence/internal/poller"
	"supply-intelligence/internal/probe"
	"supply-intelligence/internal/publish"
	"supply-intelligence/internal/repository"
)

// Server holds the dependencies used by the HTTP handlers. Most handlers
// check their service for nil and answer with an "*_unavailable" error
// instead of panicking; dashboard and discovery-scan routes are only
// registered when their dependency is non-nil.
type Server struct {
	repo                   repository.Repository
	probeService           *probe.Service
	publishService         *publish.Service
	gatewayConsumerService *gatewayconsumer.Service
	gatewayRuntime         *poller.Runtime
	discoveryService       *discovery.Service
	admissionService       *admission.Service
	discoveryScheduler     *discovery.DiscoveryScheduler
	dashboardHandler       *DashboardHandler
}

// packageChangesResponse is the JSON body returned by handleListPackageChanges.
type packageChangesResponse struct {
	Items      []domain.PackageChangeEvent `json:"items"`
	NextCursor string                      `json:"next_cursor"`
}

// discoveryCandidatesResponse is the JSON body returned by
// handleListDiscoveryCandidates.
type discoveryCandidatesResponse struct {
	Items []domain.DiscoveryCandidate `json:"items"`
}

// NewServer builds a Server from the given dependencies. No argument is
// validated here; nil services are handled per request by the handlers.
func NewServer(
	repo repository.Repository,
	probeService *probe.Service,
	publishService *publish.Service,
	gatewayConsumerService *gatewayconsumer.Service,
	gatewayRuntime *poller.Runtime,
	discoveryService *discovery.Service,
	admissionService *admission.Service,
	discoveryScheduler *discovery.DiscoveryScheduler,
	dashboardHandler *DashboardHandler,
) *Server {
	return &Server{
		repo:                   repo,
		probeService:           probeService,
		publishService:         publishService,
		gatewayConsumerService: gatewayConsumerService,
		gatewayRuntime:         gatewayRuntime,
		discoveryService:       discoveryService,
		admissionService:       admissionService,
		discoveryScheduler:     discoveryScheduler,
		dashboardHandler:       dashboardHandler,
	}
}

// Routes returns a new ServeMux with every endpoint of the server registered.
// Patterns ending in "/" are subtree patterns; the corresponding handlers
// parse the remainder of the path themselves.
func (s *Server) Routes() http.Handler {
	mux := http.NewServeMux()

	mux.HandleFunc("/healthz", s.handleHealth)
	mux.Handle("/metrics", promhttp.Handler())
	mux.HandleFunc("/internal/supply-intelligence/accounts/", s.handleGetRoutingState)
	mux.HandleFunc("/internal/supply-intelligence/probe/evaluate", s.handleEvaluateProbe)
	mux.HandleFunc("/internal/supply-intelligence/publish/package-event", s.handlePublishPackageEvent)
	mux.HandleFunc("/internal/supply-intelligence/discovery/candidates", s.handleDiscoveryCandidates)
	mux.HandleFunc("/internal/supply-intelligence/gateway/package-changes", s.handleListPackageChanges)
	mux.HandleFunc("/internal/supply-intelligence/gateway/package-changes/", s.handleAckPackageChange)
	mux.HandleFunc("/internal/supply-intelligence/gateway/consume-once", s.handleConsumeOnce)
	mux.HandleFunc("/internal/supply-intelligence/gateway/runtime-status", s.handleGatewayRuntimeStatus)
	mux.HandleFunc("/internal/supply-intelligence/gateway/runtime/pause", s.handleGatewayRuntimePause)
	mux.HandleFunc("/internal/supply-intelligence/gateway/runtime/resume", s.handleGatewayRuntimeResume)
	mux.HandleFunc("/internal/supply-intelligence/admission/run", s.handleAdmissionRun)
	mux.HandleFunc("/internal/supply-intelligence/admission/candidates", s.handleAdmissionCandidates)
	mux.HandleFunc("/internal/supply-intelligence/models/", s.handleModelAdmissionState)

	// Dashboard endpoints
	if s.dashboardHandler != nil {
		mux.HandleFunc("/internal/supply-intelligence/dashboard/accounts", s.dashboardHandler.ListAccounts)
		mux.HandleFunc("/internal/supply-intelligence/dashboard/accounts/", s.dashboardHandler.GetProbeHistory)
		mux.HandleFunc("/internal/supply-intelligence/dashboard/models", s.dashboardHandler.ListModels)
		mux.HandleFunc("/internal/supply-intelligence/dashboard/candidates", s.dashboardHandler.ListCandidates)
	}

	// Discovery scan endpoints
	if s.discoveryScheduler != nil {
		mux.HandleFunc("/internal/supply-intelligence/discovery/scan", s.handleDiscoveryScan)
		mux.HandleFunc("/internal/supply-intelligence/discovery/scan-platform", s.handleDiscoveryScanPlatform)
	}

	return mux
}

// handleHealth always answers 200 with {"status":"ok"}, for any method.
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}

// handleGetRoutingState serves
// GET /internal/supply-intelligence/accounts/{account_id}/routing-state.
// It answers 404 when the path does not end in "/routing-state" or the
// repository has no state for the account, and 400 when the account id does
// not parse.
func (s *Server) handleGetRoutingState(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}

	prefix := "/internal/supply-intelligence/accounts/"
	path := strings.TrimPrefix(r.URL.Path, prefix)
	if !strings.HasSuffix(path, "/routing-state") {
		writeJSON(w, http.StatusNotFound, map[string]string{"error": "not_found"})
		return
	}

	accountIDPart := strings.TrimSuffix(path, "/routing-state")
	var accountID int64
	if _, err := parseInt64(accountIDPart, &accountID); err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_account_id"})
		return
	}

	state, ok := s.repo.GetRoutingState(r.Context(), accountID)
	if !ok {
		writeJSON(w, http.StatusNotFound, map[string]string{"error": "not_found"})
		return
	}
	writeJSON(w, http.StatusOK, state)
}

// handleEvaluateProbe serves POST /internal/supply-intelligence/probe/evaluate.
// It validates the JSON payload (positive account_id, non-empty platform and
// current_status) and forwards it to the probe service. Any error returned by
// the service is reported as 400 with the error text as the "error" value.
func (s *Server) handleEvaluateProbe(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.probeService == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "probe_service_unavailable"})
		return
	}

	var payload struct {
		AccountID      int64  `json:"account_id"`
		Platform       string `json:"platform"`
		CurrentStatus  string `json:"current_status"`
		StatusCode     int    `json:"status_code"`
		TransportError string `json:"transport_error"`
	}
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_json"})
		return
	}

	switch {
	case payload.AccountID <= 0:
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_account_id"})
		return
	case payload.Platform == "":
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing_platform"})
		return
	case payload.CurrentStatus == "":
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing_current_status"})
		return
	}

	// A non-empty transport_error string is turned back into an error value
	// for the probe service; an empty string means "no transport error".
	var transportErr error
	if payload.TransportError != "" {
		transportErr = errors.New(payload.TransportError)
	}

	// NOTE(review): this is the only handler that passes context.Background()
	// instead of r.Context(), so the evaluation is not cancelled when the
	// client disconnects. Confirm whether that is intentional.
	result, err := s.probeService.EvaluateHTTPResult(context.Background(), probe.EvaluateInput{
		AccountID:      payload.AccountID,
		Platform:       payload.Platform,
		CurrentStatus:  domainAccountStatus(payload.CurrentStatus),
		StatusCode:     payload.StatusCode,
		TransportError: transportErr,
	})
	if err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
		return
	}
	writeJSON(w, http.StatusOK, result)
}

// handlePublishPackageEvent serves
// POST /internal/supply-intelligence/publish/package-event.
// occurred_at is optional; when present it must be RFC 3339. Errors from the
// publish service are mapped to 400, 404, 409 or 500 by sentinel.
func (s *Server) handlePublishPackageEvent(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.publishService == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "publish_service_unavailable"})
		return
	}

	var payload struct {
		EventID    string `json:"event_id"`
		Platform   string `json:"platform"`
		Model      string `json:"model"`
		OccurredAt string `json:"occurred_at"`
	}
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_json"})
		return
	}

	// An absent occurred_at leaves the zero time.Time for the service.
	var occurredAt time.Time
	if payload.OccurredAt != "" {
		parsed, err := time.Parse(time.RFC3339, payload.OccurredAt)
		if err != nil {
			writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_occurred_at"})
			return
		}
		occurredAt = parsed
	}

	out, err := s.publishService.PublishDraft(r.Context(), publish.PublishDraftInput{
		EventID:    payload.EventID,
		Platform:   payload.Platform,
		Model:      payload.Model,
		OccurredAt: occurredAt,
	})
	if err != nil {
		switch {
		case errors.Is(err, publish.ErrInvalidPublishInput):
			writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_publish_input"})
		case errors.Is(err, publish.ErrCandidateOrPackageMissing):
			writeJSON(w, http.StatusNotFound, map[string]string{"error": "candidate_or_package_missing"})
		case errors.Is(err, publish.ErrDuplicatePublishRequest):
			writeJSON(w, http.StatusConflict, map[string]string{"error": "duplicate_publish_request"})
		case errors.Is(err, publish.ErrPackageAlreadyPublished):
			writeJSON(w, http.StatusConflict, map[string]string{"error": "publish_already_applied"})
		case errors.Is(err, publish.ErrCandidateNotPublishable), errors.Is(err, publish.ErrPackageNotPublishable):
			writeJSON(w, http.StatusConflict, map[string]string{"error": "publish_precondition_failed"})
		default:
			writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "internal_error"})
		}
		return
	}
	writeJSON(w, http.StatusOK, out)
}

// handleDiscoveryCandidates dispatches
// /internal/supply-intelligence/discovery/candidates by method: POST records
// a candidate, GET lists candidates, anything else is 405.
func (s *Server) handleDiscoveryCandidates(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodPost:
		s.handleCreateDiscoveryCandidate(w, r)
	case http.MethodGet:
		s.handleListDiscoveryCandidates(w, r)
	default:
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
	}
}

// handleCreateDiscoveryCandidate decodes a candidate from the request body
// and records it through the discovery service. discovered_at is optional;
// when it is not blank it must be RFC 3339.
func (s *Server) handleCreateDiscoveryCandidate(w http.ResponseWriter, r *http.Request) {
	if s.discoveryService == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "discovery_service_unavailable"})
		return
	}

	var payload struct {
		CandidateID  string `json:"candidate_id"`
		AccountID    int64  `json:"account_id"`
		Platform     string `json:"platform"`
		Model        string `json:"model"`
		Source       string `json:"source"`
		ReasonCode   string `json:"reason_code"`
		DiscoveredAt string `json:"discovered_at"`
	}
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_json"})
		return
	}

	// Whitespace-only values count as absent, but a non-blank value is parsed
	// untrimmed, so surrounding whitespace makes it invalid.
	var discoveredAt time.Time
	if strings.TrimSpace(payload.DiscoveredAt) != "" {
		parsed, err := time.Parse(time.RFC3339, payload.DiscoveredAt)
		if err != nil {
			writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_discovered_at"})
			return
		}
		discoveredAt = parsed
	}

	out, err := s.discoveryService.RecordCandidate(r.Context(), discovery.RecordCandidateInput{
		CandidateID:  payload.CandidateID,
		AccountID:    payload.AccountID,
		Platform:     payload.Platform,
		Model:        payload.Model,
		Source:       payload.Source,
		ReasonCode:   payload.ReasonCode,
		DiscoveredAt: discoveredAt,
	})
	if err != nil {
		if errors.Is(err, discovery.ErrInvalidCandidateInput) {
			writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_candidate_input"})
			return
		}
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "internal_error"})
		return
	}
	writeJSON(w, http.StatusOK, out)
}

// handleListDiscoveryCandidates lists candidates, optionally filtered by the
// "status" query parameter. An unrecognised status is rejected with 400.
func (s *Server) handleListDiscoveryCandidates(w http.ResponseWriter, r *http.Request) {
	if s.discoveryService == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "discovery_service_unavailable"})
		return
	}

	status, ok := parseDiscoveryCandidateStatus(strings.TrimSpace(r.URL.Query().Get("status")))
	if !ok {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_status"})
		return
	}

	writeJSON(w, http.StatusOK, discoveryCandidatesResponse{
		Items: s.discoveryService.ListCandidates(r.Context(), status),
	})
}

// parseDiscoveryCandidateStatus converts raw into a known candidate status.
// The empty string is accepted and returned as the empty status (the caller
// passes it on as "no filter"). The boolean is false for any value that is
// not one of the statuses listed below.
func parseDiscoveryCandidateStatus(raw string) (domain.DiscoveryCandidateStatus, bool) {
	if raw == "" {
		return "", true
	}

	status := domain.DiscoveryCandidateStatus(raw)
	switch status {
	case domain.DiscoveryCandidateStatusDiscovered,
		domain.DiscoveryCandidateStatusTesting,
		domain.DiscoveryCandidateStatusTestPassed,
		domain.DiscoveryCandidateStatusTestFailed,
		domain.DiscoveryCandidateStatusRetryPending,
		domain.DiscoveryCandidateStatusIgnored,
		domain.DiscoveryCandidateStatusPublished,
		domain.DiscoveryCandidateStatusDeprecated,
		domain.DiscoveryCandidateStatusClosed:
		return status, true
	default:
		return "", false
	}
}

// handleListPackageChanges serves
// GET /internal/supply-intelligence/gateway/package-changes and returns the
// events after the trimmed "cursor" query parameter together with the next
// cursor reported by the repository.
func (s *Server) handleListPackageChanges(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}

	cursor := strings.TrimSpace(r.URL.Query().Get("cursor"))
	items, nextCursor := s.repo.ListPackageEventsAfter(r.Context(), cursor)
	writeJSON(w, http.StatusOK, packageChangesResponse{Items: items, NextCursor: nextCursor})
}

// handleAckPackageChange serves
// POST /internal/supply-intelligence/gateway/package-changes/{event_id}/ack.
// A blank consumer defaults to "gateway". On success it answers 204 with no
// body.
func (s *Server) handleAckPackageChange(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}

	prefix := "/internal/supply-intelligence/gateway/package-changes/"
	path := strings.TrimPrefix(r.URL.Path, prefix)
	if !strings.HasSuffix(path, "/ack") {
		writeJSON(w, http.StatusNotFound, map[string]string{"error": "not_found"})
		return
	}
	eventID := strings.TrimSuffix(path, "/ack")

	var payload struct {
		Consumer string `json:"consumer"`
		Result   string `json:"result"`
		Detail   string `json:"detail"`
	}
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_json"})
		return
	}

	ackResult := domain.GatewayAckResult(payload.Result)
	if !repository.IsGatewayAckResult(ackResult) {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_result"})
		return
	}

	consumer := strings.TrimSpace(payload.Consumer)
	if consumer == "" {
		consumer = "gateway"
	}

	if _, err := s.repo.AckPackageEvent(r.Context(), eventID, consumer, ackResult, payload.Detail, time.Now().UTC()); err != nil {
		if errors.Is(err, repository.ErrEventNotFound) {
			writeJSON(w, http.StatusNotFound, map[string]string{"error": "not_found"})
			return
		}
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "internal_error"})
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// handleConsumeOnce serves
// POST /internal/supply-intelligence/gateway/consume-once. The JSON body is
// optional: an empty body leaves consumer and cursor as empty strings.
func (s *Server) handleConsumeOnce(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.gatewayConsumerService == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "gateway_consumer_unavailable"})
		return
	}

	var payload struct {
		Consumer string `json:"consumer"`
		Cursor   string `json:"cursor"`
	}
	if r.Body != nil {
		// Decode reports io.EOF when the body holds no JSON value at all;
		// that case is accepted. Compare against the sentinel rather than
		// the error text.
		if err := json.NewDecoder(r.Body).Decode(&payload); err != nil && !errors.Is(err, io.EOF) {
			writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_json"})
			return
		}
	}

	out, err := s.gatewayConsumerService.ConsumeOnce(r.Context(), gatewayconsumer.ConsumeOnceInput{
		Consumer: payload.Consumer,
		Cursor:   payload.Cursor,
	})
	if err != nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "consume_failed"})
		return
	}
	writeJSON(w, http.StatusOK, out)
}

// handleGatewayRuntimeStatus serves
// GET /internal/supply-intelligence/gateway/runtime-status. It combines the
// runtime's status fields with two repository counters. The "consumer" query
// parameter defaults to "gateway" and is used only for the pending-retry
// count.
func (s *Server) handleGatewayRuntimeStatus(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.gatewayRuntime == nil || s.repo == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "gateway_runtime_unavailable"})
		return
	}

	now := time.Now().UTC()
	status := s.gatewayRuntime.Status()

	consumer := strings.TrimSpace(r.URL.Query().Get("consumer"))
	if consumer == "" {
		consumer = "gateway"
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"started":              status.Started,
		"paused":               status.Paused,
		"cursor":               status.Cursor,
		"last_poll_at":         status.LastPollAt,
		"last_error":           status.LastError,
		"pending_retry_events": s.repo.CountRetryablePendingPackageEvents(r.Context(), consumer, now),
		"failed_events":        s.repo.CountPackageEventsBySyncStatus(r.Context(), domain.GatewaySyncStatusFailed),
	})
}

// handleGatewayRuntimePause serves
// POST /internal/supply-intelligence/gateway/runtime/pause and answers 500
// with "pause_failed" when the runtime's Pause returns false.
func (s *Server) handleGatewayRuntimePause(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.gatewayRuntime == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "gateway_runtime_unavailable"})
		return
	}
	if !s.gatewayRuntime.Pause() {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "pause_failed"})
		return
	}
	writeJSON(w, http.StatusOK, map[string]bool{"paused": true})
}

// handleGatewayRuntimeResume serves
// POST /internal/supply-intelligence/gateway/runtime/resume and answers 500
// with "resume_failed" when the runtime's Resume returns false.
func (s *Server) handleGatewayRuntimeResume(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.gatewayRuntime == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "gateway_runtime_unavailable"})
		return
	}
	if !s.gatewayRuntime.Resume() {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "resume_failed"})
		return
	}
	writeJSON(w, http.StatusOK, map[string]bool{"paused": false})
}

// writeJSON sets the JSON content type, writes the status line and encodes
// body as the response.
func writeJSON(w http.ResponseWriter, status int, body any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	// The status line has already been written, so an encoding or write
	// failure can no longer be reported to the client; it is dropped on
	// purpose.
	_ = json.NewEncoder(w).Encode(body)
}

// handleAdmissionRun runs the admission test for a specific candidate.
// POST /internal/supply-intelligence/admission/run
// Body: {"candidate_id": "..."}
func (s *Server) handleAdmissionRun(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.admissionService == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "admission_service_unavailable"})
		return
	}

	var payload struct {
		CandidateID string `json:"candidate_id"`
	}
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_json"})
		return
	}
	if strings.TrimSpace(payload.CandidateID) == "" {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing_candidate_id"})
		return
	}

	result, err := s.admissionService.RunAdmission(r.Context(), payload.CandidateID)
	if err != nil {
		switch {
		case errors.Is(err, admission.ErrCandidateNotFound):
			writeJSON(w, http.StatusNotFound, map[string]string{"error": "candidate_not_found"})
		case errors.Is(err, admission.ErrCandidateNotRunnable):
			writeJSON(w, http.StatusConflict, map[string]string{"error": "candidate_not_runnable"})
		default:
			writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "admission_run_failed"})
		}
		return
	}
	writeJSON(w, http.StatusOK, result)
}

// handleAdmissionCandidates lists candidates currently runnable for admission
// testing.
// GET /internal/supply-intelligence/admission/candidates
func (s *Server) handleAdmissionCandidates(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.admissionService == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "admission_service_unavailable"})
		return
	}

	candidates := s.admissionService.GetRunnableCandidates(r.Context())
	writeJSON(w, http.StatusOK, map[string]any{"items": candidates})
}

// handleModelAdmissionState serves
// GET /internal/supply-intelligence/models/{platform}/{model}/admission-state.
// The response reports the latest discovery candidate, the supply package and
// its latest package event for the model; each is null when the repository
// has none. The latest event is only looked up when a package exists.
func (s *Server) handleModelAdmissionState(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}

	prefix := "/internal/supply-intelligence/models/"
	path := strings.TrimPrefix(r.URL.Path, prefix)
	parts := strings.Split(path, "/")
	if len(parts) != 3 || parts[2] != "admission-state" {
		writeJSON(w, http.StatusNotFound, map[string]string{"error": "not_found"})
		return
	}

	platform := strings.TrimSpace(parts[0])
	model := strings.TrimSpace(parts[1])
	if platform == "" || model == "" {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_model_path"})
		return
	}

	// The pointers stay nil when nothing is found so the fields encode as
	// JSON null.
	var candidate *domain.DiscoveryCandidate
	if latest, ok := s.repo.GetLatestDiscoveryCandidateContext(r.Context(), platform, model); ok {
		candidateCopy := latest
		candidate = &candidateCopy
	}

	pkg, hasPackage := s.repo.GetSupplyPackage(r.Context(), platform, model)

	var lastEvent *domain.PackageChangeEvent
	if hasPackage {
		if latestEvent, ok := s.repo.GetLatestPackageEvent(r.Context(), platform, model); ok {
			eventCopy := latestEvent
			lastEvent = &eventCopy
		}
	}

	gatewaySyncStatus := domain.GatewaySyncStatus("")
	if lastEvent != nil {
		gatewaySyncStatus = lastEvent.GatewaySyncStatus
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"platform":            platform,
		"model":               model,
		"candidate":           candidate,
		"package":             packageOrNil(hasPackage, pkg),
		"gateway_sync_status": gatewaySyncStatus,
		"last_event":          lastEvent,
	})
}

// packageOrNil returns pkg when ok is true and an untyped nil otherwise, so
// a missing package encodes as JSON null rather than as a zero-valued object.
func packageOrNil(ok bool, pkg domain.SupplyPackage) any {
	if !ok {
		return nil
	}
	return pkg
}

// domainAccountStatus converts raw to a domain.AccountStatus without
// validating it.
func domainAccountStatus(raw string) domain.AccountStatus {
	return domain.AccountStatus(raw)
}

// handleDiscoveryScan runs discovery across all registered platforms.
// POST /internal/supply-intelligence/discovery/scan
func (s *Server) handleDiscoveryScan(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.discoveryScheduler == nil {
		writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "discovery_scheduler_unavailable"})
		return
	}

	results, err := s.discoveryScheduler.ScanAllPlatforms(r.Context())
	if err != nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
		return
	}

	type scanResultRow struct {
		Platform      string   `json:"platform"`
		NewModels     int      `json:"new_models"`
		RemovedModels []string `json:"removed_models,omitempty"`
		Errors        []string `json:"errors,omitempty"`
	}

	rows := make([]scanResultRow, 0, len(results))
	// The loop variable is deliberately not named r, to avoid shadowing the
	// request.
	for _, res := range results {
		rows = append(rows, scanResultRow{
			Platform:      res.Platform,
			NewModels:     res.NewModels,
			RemovedModels: res.RemovedModels,
			Errors:        res.Errors,
		})
	}
	writeJSON(w, http.StatusOK, map[string]any{"results": rows, "total_platforms": len(results)})
}

// handleDiscoveryScanPlatform runs discovery for a single platform.
// POST /internal/supply-intelligence/discovery/scan-platform
// Body: {"platform": "openai"}
func (s *Server) handleDiscoveryScanPlatform(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"})
		return
	}
	if s.discoveryScheduler == nil {
		writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "discovery_scheduler_unavailable"})
		return
	}

	var payload struct {
		Platform string `json:"platform"`
	}
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid_json"})
		return
	}
	if strings.TrimSpace(payload.Platform) == "" {
		writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing_platform"})
		return
	}

	// The platform is validated trimmed but passed to the scheduler as
	// received.
	result, err := s.discoveryScheduler.ScanPlatform(r.Context(), payload.Platform)
	if err != nil {
		writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
		return
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"platform":       result.Platform,
		"new_models":     result.NewModels,
		"removed_models": result.RemovedModels,
		"errors":         result.Errors,
	})
}