package repository

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/jackc/pgconn"
	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"

	"supply-intelligence/internal/domain"
	"supply-intelligence/internal/publish"
)

// Shared SELECT prefixes. Each one lists its columns in exactly the order the
// matching scan helper below expects, so a query and its Scan call cannot
// drift apart.
const (
	selectRoutingStates = `
SELECT account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, version
FROM supply_intelligence_account_routing_states`

	selectPackageEvents = `
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
       COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
       retry_count, last_retry_at, next_retry_at,
       COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
FROM supply_intelligence_package_change_events`

	selectDiscoveryCandidates = `
SELECT candidate_id, account_id, platform, model, status, source, reason_code, discovered_at, updated_at, version
FROM supply_intelligence_model_candidates`

	selectSupplyPackages = `
SELECT package_id, platform, model, status, source, created_at, updated_at, version
FROM supply_intelligence_supply_packages`
)

// PostgresRepository implements Repository using pgx.
type PostgresRepository struct {
	db *pgxpool.Pool
}

// NewPostgresRepository connects to PostgreSQL using the given connection
// string and verifies the connection with a ping. On any failure it returns a
// wrapped error and no repository.
func NewPostgresRepository(ctx context.Context, connString string) (*PostgresRepository, error) {
	config, err := pgxpool.ParseConfig(connString)
	if err != nil {
		return nil, fmt.Errorf("parse conn string: %w", err)
	}
	pool, err := pgxpool.ConnectConfig(ctx, config)
	if err != nil {
		return nil, fmt.Errorf("connect to postgres: %w", err)
	}
	if err := pool.Ping(ctx); err != nil {
		// The pool was created successfully; release it so a failed ping
		// does not leak connections.
		pool.Close()
		return nil, fmt.Errorf("ping postgres: %w", err)
	}
	return &PostgresRepository{db: pool}, nil
}

// Close releases the connection pool.
func (r *PostgresRepository) Close() {
	r.db.Close()
}

// dbtx is the subset of query methods used by helpers that must run either
// directly on the pool or inside a transaction.
type dbtx interface {
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
}

// ─── Routing State ────────────────────────────────────────────────────────────

// UpsertRoutingState writes the routing state and discards the returned value.
func (r *PostgresRepository) UpsertRoutingState(ctx context.Context, state domain.AccountRoutingState) {
	r.UpsertRoutingStateContext(ctx, state)
}

// UpsertRoutingStateContext inserts the routing state for state.AccountID, or
// overwrites the existing row and increments its stored version. It returns
// the input state unchanged; the stored version is not read back.
func (r *PostgresRepository) UpsertRoutingStateContext(ctx context.Context, state domain.AccountRoutingState) domain.AccountRoutingState {
	query := `
INSERT INTO supply_intelligence_account_routing_states
    (account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, version)
VALUES ($1,$2,$3,$4,$5,$6,$7,1)
ON CONFLICT (account_id) DO UPDATE SET
    platform=EXCLUDED.platform,
    account_status=EXCLUDED.account_status,
    routing_enabled=EXCLUDED.routing_enabled,
    risk_score=EXCLUDED.risk_score,
    reason_code=EXCLUDED.reason_code,
    last_probe_at=EXCLUDED.last_probe_at,
    version=supply_intelligence_account_routing_states.version+1`
	// The signature has no error result, so a failed write cannot be
	// reported to the caller and is intentionally dropped here.
	// NOTE(review): consider an error-returning variant so callers can
	// detect a failed write.
	_, _ = r.db.Exec(ctx, query,
		state.AccountID, state.Platform, state.AccountStatus, state.RoutingEnabled,
		state.RiskScore, state.ReasonCode, state.LastProbeAt,
	)
	return state
}

// GetRoutingState is an alias for GetRoutingStateContext.
func (r *PostgresRepository) GetRoutingState(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
	return r.GetRoutingStateContext(ctx, accountID)
}

// GetRoutingStateContext loads the routing state for accountID. The boolean
// is false both when no row exists and when the query fails.
func (r *PostgresRepository) GetRoutingStateContext(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
	var s domain.AccountRoutingState
	row := r.db.QueryRow(ctx, selectRoutingStates+` WHERE account_id=$1`, accountID)
	if err := scanRoutingState(row, &s); err != nil {
		return domain.AccountRoutingState{}, false
	}
	return s, true
}

// ListRoutingStatesByPlatform returns every routing state for platform, or
// nil when the query, a scan, or row iteration fails.
func (r *PostgresRepository) ListRoutingStatesByPlatform(ctx context.Context, platform string) []domain.AccountRoutingState {
	rows, err := r.db.Query(ctx, selectRoutingStates+` WHERE platform=$1`, platform)
	if err != nil {
		return nil
	}
	defer rows.Close()

	var result []domain.AccountRoutingState
	for rows.Next() {
		var s domain.AccountRoutingState
		if err := scanRoutingState(rows, &s); err != nil {
			return nil
		}
		result = append(result, s)
	}
	// Read errors only surface once iteration has stopped.
	if rows.Err() != nil {
		return nil
	}
	return result
}

// scanRoutingState reads one row in selectRoutingStates column order.
func scanRoutingState(sc eventScanner, s *domain.AccountRoutingState) error {
	return sc.Scan(&s.AccountID, &s.Platform, &s.AccountStatus, &s.RoutingEnabled,
		&s.RiskScore, &s.ReasonCode, &s.LastProbeAt, &s.Version)
}

// ─── Package Change Events ────────────────────────────────────────────────────

// AppendPackageEvent is an alias for AppendPackageEventContext.
func (r *PostgresRepository) AppendPackageEvent(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
	return r.AppendPackageEventContext(ctx, evt)
}

// AppendPackageEventContext inserts evt and returns it unchanged. The insert
// error is returned as-is (unwrapped) on failure.
func (r *PostgresRepository) AppendPackageEventContext(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
	if err := insertPackageEvent(ctx, r.db, evt); err != nil {
		return domain.PackageChangeEvent{}, err
	}
	return evt, nil
}

// ListPackageEvents returns all package change events ordered by occurred_at
// descending, then event_id ascending. It returns nil on any failure.
func (r *PostgresRepository) ListPackageEvents(ctx context.Context) []domain.PackageChangeEvent {
	rows, err := r.db.Query(ctx, selectPackageEvents+` ORDER BY occurred_at DESC, event_id`)
	if err != nil {
		return nil
	}
	defer rows.Close()
	return scanEvents(rows)
}

// GetLatestPackageEvent returns the most recent event for (platform, model).
// The boolean is false both when no row exists and when the query fails.
func (r *PostgresRepository) GetLatestPackageEvent(ctx context.Context, platform, model string) (domain.PackageChangeEvent, bool) {
	query := selectPackageEvents + `
WHERE platform=$1 AND model=$2
ORDER BY occurred_at DESC, event_id DESC
LIMIT 1`
	var evt domain.PackageChangeEvent
	if err := scanEventScanner(r.db.QueryRow(ctx, query, platform, model), &evt); err != nil {
		return domain.PackageChangeEvent{}, false
	}
	return evt, true
}

// ListPackageEventsAfter returns one page (up to 50 events) ordered by
// occurred_at DESC, event_id DESC. cursor is the event_id of the last event of
// the previous page, or "" for the first page. The second result is the
// cursor for the next page; it is "" when the page came back short, and also
// on any failure (in which case the slice is nil). An unknown cursor yields
// an empty page.
func (r *PostgresRepository) ListPackageEventsAfter(ctx context.Context, cursor string) ([]domain.PackageChangeEvent, string) {
	const pageSize = 50

	var (
		query string
		args  []interface{}
	)
	if cursor == "" {
		query = selectPackageEvents + `
ORDER BY occurred_at DESC, event_id DESC
LIMIT $1`
		args = append(args, pageSize)
	} else {
		// Keyset pagination: continue strictly after the cursor row in
		// (occurred_at DESC, event_id DESC) order. Within the same
		// occurred_at the next rows are those with a SMALLER event_id.
		query = `
WITH cursor_event AS (
    SELECT occurred_at FROM supply_intelligence_package_change_events WHERE event_id=$1
)
SELECT e.event_id, e.account_id, e.event_type, e.package_id, e.platform, e.model, e.occurred_at, e.version,
       COALESCE(e.ack_status,''), COALESCE(e.ack_consumer,''), COALESCE(e.ack_detail,''), e.ack_time,
       e.retry_count, e.last_retry_at, e.next_retry_at,
       COALESCE(e.last_failure_category,''), COALESCE(e.last_failure_detail,'')
FROM supply_intelligence_package_change_events e
JOIN cursor_event c
  ON e.occurred_at < c.occurred_at
  OR (e.occurred_at = c.occurred_at AND e.event_id < $1)
ORDER BY e.occurred_at DESC, e.event_id DESC
LIMIT $2`
		args = append(args, cursor, pageSize)
	}

	rows, err := r.db.Query(ctx, query, args...)
	if err != nil {
		return nil, ""
	}
	defer rows.Close()

	result := scanEvents(rows)

	// A full page means more rows may follow, so hand back a cursor; a short
	// page is the last one.
	next := ""
	if len(result) == pageSize {
		next = result[len(result)-1].EventID
	}
	return result, next
}

// AckPackageEvent records the gateway acknowledgement for eventID, clears
// next_retry_at, and returns the updated event. It returns ErrEventNotFound
// when no row matches eventID.
func (r *PostgresRepository) AckPackageEvent(ctx context.Context, eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error) {
	query := `
UPDATE supply_intelligence_package_change_events
SET ack_status=$2, ack_consumer=$3, ack_detail=$4, ack_time=$5, next_retry_at=NULL
WHERE event_id=$1`
	commandTag, err := r.db.Exec(ctx, query, eventID, string(result), consumer, detail, ackedAt)
	if err != nil {
		return domain.PackageChangeEvent{}, err
	}
	if commandTag.RowsAffected() == 0 {
		return domain.PackageChangeEvent{}, ErrEventNotFound
	}
	return r.getEventByID(ctx, eventID)
}

// getEventByID loads a single event, mapping "no rows" to ErrEventNotFound
// and returning any other error unchanged.
func (r *PostgresRepository) getEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, error) {
	var e domain.PackageChangeEvent
	err := scanEventScanner(r.db.QueryRow(ctx, selectPackageEvents+` WHERE event_id=$1`, eventID), &e)
	if errors.Is(err, pgx.ErrNoRows) {
		return domain.PackageChangeEvent{}, ErrEventNotFound
	}
	if err != nil {
		return domain.PackageChangeEvent{}, err
	}
	return e, nil
}

// GetPackageEventByID loads the event with the given ID. The boolean is false
// both when the event does not exist and when the query fails.
func (r *PostgresRepository) GetPackageEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, bool) {
	evt, err := r.getEventByID(ctx, eventID)
	if err != nil {
		return domain.PackageChangeEvent{}, false
	}
	return evt, true
}

// ─── Gateway Snapshot ─────────────────────────────────────────────────────────

// UpsertGatewayAppliedSnapshot inserts or overwrites the snapshot keyed by
// snapshot.Consumer and returns the stored row. If the write fails, the input
// snapshot is returned unchanged.
func (r *PostgresRepository) UpsertGatewayAppliedSnapshot(ctx context.Context, snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot {
	query := `
INSERT INTO supply_intelligence_gateway_applied_snapshots
    (consumer, last_event_id, last_package_id, last_platform, last_model, last_applied_version, last_result, updated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
ON CONFLICT (consumer) DO UPDATE SET
    last_event_id=EXCLUDED.last_event_id,
    last_package_id=EXCLUDED.last_package_id,
    last_platform=EXCLUDED.last_platform,
    last_model=EXCLUDED.last_model,
    last_applied_version=EXCLUDED.last_applied_version,
    last_result=EXCLUDED.last_result,
    updated_at=EXCLUDED.updated_at
RETURNING consumer, last_event_id, last_package_id, last_platform, last_model, last_applied_version, last_result, updated_at`
	var out domain.GatewayAppliedSnapshot
	err := r.db.QueryRow(ctx, query,
		snapshot.Consumer, snapshot.LastEventID, snapshot.LastPackageID, snapshot.LastPlatform,
		snapshot.LastModel, snapshot.LastAppliedVersion, snapshot.LastResult, snapshot.UpdatedAt,
	).Scan(&out.Consumer, &out.LastEventID, &out.LastPackageID, &out.LastPlatform,
		&out.LastModel, &out.LastAppliedVersion, &out.LastResult, &out.UpdatedAt)
	if err != nil {
		// Never hand back a zero-value snapshot: on any failure the
		// caller's own input is the best available answer.
		return snapshot
	}
	return out
}

// GetGatewayAppliedSnapshot loads the snapshot for consumer. The boolean is
// false both when no row exists and when the query fails.
func (r *PostgresRepository) GetGatewayAppliedSnapshot(ctx context.Context, consumer string) (domain.GatewayAppliedSnapshot, bool) {
	query := `
SELECT consumer, last_event_id, last_package_id, last_platform, last_model, last_applied_version, last_result, updated_at
FROM supply_intelligence_gateway_applied_snapshots
WHERE consumer=$1`
	var s domain.GatewayAppliedSnapshot
	err := r.db.QueryRow(ctx, query, consumer).Scan(&s.Consumer, &s.LastEventID, &s.LastPackageID,
		&s.LastPlatform, &s.LastModel, &s.LastAppliedVersion, &s.LastResult, &s.UpdatedAt)
	if err != nil {
		return domain.GatewayAppliedSnapshot{}, false
	}
	return s, true
}

// ─── Discovery Candidates ─────────────────────────────────────────────────────

// GetDiscoveryCandidateByID is an alias for GetDiscoveryCandidateByIDContext.
func (r *PostgresRepository) GetDiscoveryCandidateByID(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
	return r.GetDiscoveryCandidateByIDContext(ctx, candidateID)
}

// GetDiscoveryCandidateByIDContext loads a candidate by its ID. The boolean
// is false both when no row exists and when the query fails.
func (r *PostgresRepository) GetDiscoveryCandidateByIDContext(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
	return r.queryDiscoveryCandidate(ctx, selectDiscoveryCandidates+` WHERE candidate_id=$1`, candidateID)
}

// FindDiscoveryCandidate is an alias for FindDiscoveryCandidateContext.
func (r *PostgresRepository) FindDiscoveryCandidate(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
	return r.FindDiscoveryCandidateContext(ctx, accountID, platform, model)
}

// FindDiscoveryCandidateContext loads the candidate matching
// (accountID, platform, model). The boolean is false both when no row exists
// and when the query fails.
func (r *PostgresRepository) FindDiscoveryCandidateContext(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
	return r.queryDiscoveryCandidate(ctx,
		selectDiscoveryCandidates+` WHERE account_id=$1 AND platform=$2 AND model=$3`,
		accountID, platform, model)
}

// GetLatestDiscoveryCandidate is an alias for
// GetLatestDiscoveryCandidateContext.
func (r *PostgresRepository) GetLatestDiscoveryCandidate(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
	return r.GetLatestDiscoveryCandidateContext(ctx, platform, model)
}

// GetLatestDiscoveryCandidateContext loads the most recently updated
// candidate for (platform, model). The boolean is false both when no row
// exists and when the query fails.
func (r *PostgresRepository) GetLatestDiscoveryCandidateContext(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
	return r.queryDiscoveryCandidate(ctx, selectDiscoveryCandidates+`
WHERE platform=$1 AND model=$2
ORDER BY updated_at DESC, candidate_id DESC
LIMIT 1`, platform, model)
}

// queryDiscoveryCandidate runs a single-row candidate query and reports
// whether a row was read successfully.
func (r *PostgresRepository) queryDiscoveryCandidate(ctx context.Context, query string, args ...interface{}) (domain.DiscoveryCandidate, bool) {
	var c domain.DiscoveryCandidate
	if err := scanDiscoveryCandidate(r.db.QueryRow(ctx, query, args...), &c); err != nil {
		return domain.DiscoveryCandidate{}, false
	}
	return c, true
}

// scanDiscoveryCandidate reads one row in selectDiscoveryCandidates column
// order.
func scanDiscoveryCandidate(sc eventScanner, c *domain.DiscoveryCandidate) error {
	return sc.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
		&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version)
}

// UpsertDiscoveryCandidate is an alias for UpsertDiscoveryCandidateContext.
func (r *PostgresRepository) UpsertDiscoveryCandidate(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
	return r.UpsertDiscoveryCandidateContext(ctx, candidate)
}

// UpsertDiscoveryCandidateContext inserts the candidate, or on a
// (platform, model) conflict updates the existing row and increments its
// version. It returns the input candidate with Version set to the stored
// version; if the write fails the input is returned unchanged.
//
// NOTE(review): on conflict the stored candidate_id and discovered_at are
// kept, so the returned CandidateID may differ from the persisted one.
func (r *PostgresRepository) UpsertDiscoveryCandidateContext(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
	query := `
INSERT INTO supply_intelligence_model_candidates
    (candidate_id, account_id, platform, model, status, source, reason_code, discovered_at, updated_at, version)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,1)
ON CONFLICT (platform, model) DO UPDATE SET
    account_id=EXCLUDED.account_id,
    status=EXCLUDED.status,
    source=EXCLUDED.source,
    reason_code=EXCLUDED.reason_code,
    updated_at=EXCLUDED.updated_at,
    version=supply_intelligence_model_candidates.version+1
RETURNING version`
	var version int64
	err := r.db.QueryRow(ctx, query,
		candidate.CandidateID, candidate.AccountID, candidate.Platform, candidate.Model,
		candidate.Status, candidate.Source, candidate.ReasonCode,
		candidate.DiscoveredAt, candidate.UpdatedAt,
	).Scan(&version)
	if err != nil {
		// Leave the caller's version untouched rather than zeroing it.
		return candidate
	}
	candidate.Version = version
	return candidate
}

// ListDiscoveryCandidates is an alias for ListDiscoveryCandidatesContext.
func (r *PostgresRepository) ListDiscoveryCandidates(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
	return r.ListDiscoveryCandidatesContext(ctx, status)
}

// ListDiscoveryCandidatesContext returns candidates ordered by discovered_at
// descending. An empty status lists all candidates; otherwise only those with
// the given status. It returns nil when the query, a scan, or row iteration
// fails.
func (r *PostgresRepository) ListDiscoveryCandidatesContext(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
	query := selectDiscoveryCandidates
	var args []interface{}
	if status != "" {
		query += ` WHERE status=$1`
		args = append(args, string(status))
	}
	query += ` ORDER BY discovered_at DESC`

	rows, err := r.db.Query(ctx, query, args...)
	if err != nil {
		return nil
	}
	defer rows.Close()

	var result []domain.DiscoveryCandidate
	for rows.Next() {
		var c domain.DiscoveryCandidate
		if err := scanDiscoveryCandidate(rows, &c); err != nil {
			return nil
		}
		result = append(result, c)
	}
	if rows.Err() != nil {
		return nil
	}
	return result
}

// UpdateCandidateStatus sets the status and reason_code of a candidate and
// stamps updated_at with the database clock. failureSummary is accepted but
// not written by this statement. A candidateID that matches no row is not
// treated as an error.
func (r *PostgresRepository) UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error {
	query := `
UPDATE supply_intelligence_model_candidates
SET status=$2, reason_code=$3, updated_at=now()
WHERE candidate_id=$1`
	_, err := r.db.Exec(ctx, query, candidateID, string(status), failureCode)
	return err
}

// ─── Supply Packages ───────────────────────────────────────────────────────────

// UpsertSupplyPackage inserts the package, or on a (platform, model) conflict
// updates status, source and updated_at and increments the stored version. A
// PackageID of 0 asks the database to allocate one from
// supply_package_id_seq.
func (r *PostgresRepository) UpsertSupplyPackage(ctx context.Context, pkg domain.SupplyPackage) error {
	query := `
INSERT INTO supply_intelligence_supply_packages
    (package_id, platform, model, status, source, created_at, updated_at, version)
VALUES (
    CASE WHEN $1 = 0 THEN nextval('supply_package_id_seq') ELSE $1 END,
    $2,$3,$4,$5,$6,$7,1
)
ON CONFLICT (platform, model) DO UPDATE SET
    status=EXCLUDED.status,
    source=EXCLUDED.source,
    updated_at=EXCLUDED.updated_at,
    version=supply_intelligence_supply_packages.version+1
RETURNING package_id, version`
	// The RETURNING values are read only to drive the statement to
	// completion; the signature has no way to hand them back.
	var packageID, version int64
	return r.db.QueryRow(ctx, query,
		pkg.PackageID, pkg.Platform, pkg.Model, pkg.Status, pkg.Source, pkg.CreatedAt, pkg.UpdatedAt,
	).Scan(&packageID, &version)
}

// GetSupplyPackage loads the package for (platform, model). The boolean is
// false both when no row exists and when the query fails.
func (r *PostgresRepository) GetSupplyPackage(ctx context.Context, platform, model string) (domain.SupplyPackage, bool) {
	var p domain.SupplyPackage
	row := r.db.QueryRow(ctx, selectSupplyPackages+` WHERE platform=$1 AND model=$2`, platform, model)
	if err := scanSupplyPackage(row, &p); err != nil {
		return domain.SupplyPackage{}, false
	}
	return p, true
}

// ListSupplyPackages returns all packages, or only those with the given
// status when status is non-empty. It returns nil when the query, a scan, or
// row iteration fails.
func (r *PostgresRepository) ListSupplyPackages(ctx context.Context, status string) []domain.SupplyPackage {
	query := selectSupplyPackages
	var args []interface{}
	if status != "" {
		query += ` WHERE status=$1`
		args = append(args, status)
	}

	rows, err := r.db.Query(ctx, query, args...)
	if err != nil {
		return nil
	}
	defer rows.Close()

	var result []domain.SupplyPackage
	for rows.Next() {
		var p domain.SupplyPackage
		if err := scanSupplyPackage(rows, &p); err != nil {
			return nil
		}
		result = append(result, p)
	}
	if rows.Err() != nil {
		return nil
	}
	return result
}

// scanSupplyPackage reads one row in selectSupplyPackages column order.
func scanSupplyPackage(sc eventScanner, p *domain.SupplyPackage) error {
	return sc.Scan(&p.PackageID, &p.Platform, &p.Model, &p.Status, &p.Source,
		&p.CreatedAt, &p.UpdatedAt, &p.Version)
}

// ─── Probe Execution Logs ──────────────────────────────────────────────────────

// AppendProbeExecutionLog inserts one probe execution log row with version 1.
func (r *PostgresRepository) AppendProbeExecutionLog(ctx context.Context, log domain.ProbeExecutionLog) error {
	query := `
INSERT INTO supply_intelligence_probe_execution_logs
    (account_id, platform, probe_result, failure_class, http_status, latency_ms, risk_score, evaluated_transition, executed_at, request_id, version)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,1)`
	_, err := r.db.Exec(ctx, query,
		log.AccountID, log.Platform, log.ProbeResult, log.FailureClass, log.HTTPStatus,
		log.LatencyMs, log.RiskScore, log.EvaluatedTransition, log.ExecutedAt, log.RequestID,
	)
	return err
}

// ListProbeExecutionLogs returns up to limit probe logs for accountID, newest
// first. Query, scan and row-iteration errors are returned to the caller
// together with a nil slice.
func (r *PostgresRepository) ListProbeExecutionLogs(ctx context.Context, accountID int64, limit int) ([]domain.ProbeExecutionLog, error) {
	query := `
SELECT log_id, account_id, platform, probe_result, failure_class, http_status, latency_ms, risk_score, evaluated_transition, executed_at, request_id, version
FROM supply_intelligence_probe_execution_logs
WHERE account_id=$1
ORDER BY executed_at DESC
LIMIT $2`
	rows, err := r.db.Query(ctx, query, accountID, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []domain.ProbeExecutionLog
	for rows.Next() {
		var l domain.ProbeExecutionLog
		if err := rows.Scan(&l.LogID, &l.AccountID, &l.Platform, &l.ProbeResult, &l.FailureClass,
			&l.HTTPStatus, &l.LatencyMs, &l.RiskScore, &l.EvaluatedTransition,
			&l.ExecutedAt, &l.RequestID, &l.Version); err != nil {
			return nil, err
		}
		result = append(result, l)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}

// ─── Helpers ──────────────────────────────────────────────────────────────────

// ListRetryablePendingPackageEvents returns pending events whose
// next_retry_at is set and not after now, ordered by next_retry_at ascending.
// A positive limit caps the number of rows; limit <= 0 means no cap. consumer
// is accepted but does not take part in the filter. It returns nil on any
// failure.
func (r *PostgresRepository) ListRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time, limit int) []domain.PackageChangeEvent {
	_ = consumer // not part of the query; kept for interface compatibility

	query := selectPackageEvents + `
WHERE ack_status=$1 AND next_retry_at IS NOT NULL AND next_retry_at <= $2
ORDER BY next_retry_at ASC, occurred_at DESC, event_id DESC`
	args := []interface{}{string(domain.GatewayAckResultPending), now}
	if limit > 0 {
		// Let the database apply the cap instead of loading every due
		// event and truncating in memory.
		query += ` LIMIT $3`
		args = append(args, limit)
	}

	rows, err := r.db.Query(ctx, query, args...)
	if err != nil {
		return nil
	}
	defer rows.Close()
	return scanEvents(rows)
}

// CountPackageEventsBySyncStatus counts events whose ack_status equals
// status. It returns 0 when the query fails.
func (r *PostgresRepository) CountPackageEventsBySyncStatus(ctx context.Context, status domain.GatewaySyncStatus) int {
	query := `SELECT COUNT(*) FROM supply_intelligence_package_change_events WHERE ack_status=$1`
	var count int
	if err := r.db.QueryRow(ctx, query, string(status)).Scan(&count); err != nil {
		return 0
	}
	return count
}

// CountRetryablePendingPackageEvents counts pending events whose
// next_retry_at is set and not after now. consumer is accepted but does not
// take part in the filter. It returns 0 when the query fails.
func (r *PostgresRepository) CountRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time) int {
	_ = consumer // not part of the query; kept for interface compatibility

	query := `SELECT COUNT(*) FROM supply_intelligence_package_change_events WHERE ack_status=$1 AND next_retry_at IS NOT NULL AND next_retry_at <= $2`
	var count int
	if err := r.db.QueryRow(ctx, query, string(domain.GatewayAckResultPending), now).Scan(&count); err != nil {
		return 0
	}
	return count
}

// MarkPackageEventRetry resets eventID to pending, stores the retry
// bookkeeping (count, timestamps, failure category and detail; detail is also
// copied into ack_detail) and returns the updated event. It returns
// ErrEventNotFound when no row matches eventID.
func (r *PostgresRepository) MarkPackageEventRetry(ctx context.Context, eventID string, retryCount int, nextRetryAt time.Time, category domain.GatewayFailureCategory, detail string, retriedAt time.Time) (domain.PackageChangeEvent, error) {
	query := `
UPDATE supply_intelligence_package_change_events
SET ack_status=$2, retry_count=$3, last_retry_at=$4, next_retry_at=$5,
    last_failure_category=$6, last_failure_detail=$7, ack_detail=$7
WHERE event_id=$1`
	commandTag, err := r.db.Exec(ctx, query,
		eventID, string(domain.GatewayAckResultPending), retryCount, retriedAt, nextRetryAt,
		string(category), detail)
	if err != nil {
		return domain.PackageChangeEvent{}, err
	}
	if commandTag.RowsAffected() == 0 {
		return domain.PackageChangeEvent{}, ErrEventNotFound
	}
	return r.getEventByID(ctx, eventID)
}

// scanEvents drains rows into a slice of events. It returns nil if any row
// fails to scan or if iteration ends with an error, so a truncated result is
// never mistaken for a complete one. The caller remains responsible for
// closing rows.
func scanEvents(rows pgx.Rows) []domain.PackageChangeEvent {
	var result []domain.PackageChangeEvent
	for rows.Next() {
		var e domain.PackageChangeEvent
		if err := scanEventRow(rows, &e); err != nil {
			return nil
		}
		result = append(result, e)
	}
	if rows.Err() != nil {
		return nil
	}
	return result
}

// eventScanner is the Scan method shared by pgx.Row and pgx.Rows, letting the
// same scan helper serve single-row and multi-row queries.
type eventScanner interface {
	Scan(dest ...interface{}) error
}

// scanEventScanner reads one row in selectPackageEvents column order.
func scanEventScanner(scanner eventScanner, e *domain.PackageChangeEvent) error {
	return scanner.Scan(
		&e.EventID, &e.AccountID, &e.EventType, &e.PackageID, &e.Platform, &e.Model,
		&e.OccurredAt, &e.Version,
		&e.GatewaySyncStatus, &e.Consumer, &e.ConsumerDetail, &e.AckedAt,
		&e.RetryCount, &e.LastRetryAt, &e.NextRetryAt,
		&e.LastFailureCategory, &e.LastFailureDetail,
	)
}

// scanEventRow scans the current row of rows into e.
func scanEventRow(rows pgx.Rows, e *domain.PackageChangeEvent) error {
	return scanEventScanner(rows, e)
}

// AppendAdmissionTestLog inserts an admission test log entry.
func (r *PostgresRepository) AppendAdmissionTestLog(ctx context.Context, candidateID string, status string, failureCode string, failureSummary string, testedAt time.Time) error {
	query := `
INSERT INTO supply_intelligence_admission_test_logs
    (candidate_id, status, failure_code, failure_summary, tested_at, version)
VALUES ($1,$2,$3,$4,$5,1)`
	_, err := r.db.Exec(ctx, query, candidateID, status, failureCode, failureSummary, testedAt)
	return err
}

// ListAdmissionTestLogsByCandidate returns up to limit admission test logs
// for a candidate, newest first. Query, scan and row-iteration errors are
// returned to the caller together with a nil slice.
func (r *PostgresRepository) ListAdmissionTestLogsByCandidate(ctx context.Context, candidateID string, limit int) ([]domain.AdmissionTestLog, error) {
	query := `
SELECT test_id, candidate_id, status, failure_code, failure_summary, tested_at, version
FROM supply_intelligence_admission_test_logs
WHERE candidate_id=$1
ORDER BY tested_at DESC
LIMIT $2`
	rows, err := r.db.Query(ctx, query, candidateID, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []domain.AdmissionTestLog
	for rows.Next() {
		var l domain.AdmissionTestLog
		if err := rows.Scan(&l.TestID, &l.CandidateID, &l.Status, &l.FailureCode,
			&l.FailureSummary, &l.TestedAt, &l.Version); err != nil {
			return nil, err
		}
		result = append(result, l)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}

// ListActiveAccounts returns all accounts with routing enabled.
func (r *PostgresRepository) ListActiveAccounts(ctx context.Context) []domain.AccountRoutingState { query := ` SELECT account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, created_at, updated_at, version FROM supply_intelligence_account_routing_states WHERE routing_enabled = true` rows, err := r.db.Query(ctx, query) if err != nil { return nil } if rows.Err() != nil { return nil } defer rows.Close() var result []domain.AccountRoutingState for rows.Next() { var rs domain.AccountRoutingState if err := rows.Scan(&rs.AccountID, &rs.Platform, &rs.AccountStatus, &rs.RoutingEnabled, &rs.RiskScore, &rs.ReasonCode, &rs.LastProbeAt, &rs.Version); err != nil { continue } result = append(result, rs) } return result } // ─── Supply Accounts ─────────────────────────────────────────────────────────── func (r *PostgresRepository) UpsertSupplyAccount(ctx context.Context, account domain.SupplyAccount) domain.SupplyAccount { query := ` INSERT INTO supply_intelligence_supply_accounts (account_id, platform, api_key, consumer_tag, status, created_at, updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (account_id) DO UPDATE SET platform=EXCLUDED.platform, api_key=EXCLUDED.api_key, consumer_tag=EXCLUDED.consumer_tag, status=EXCLUDED.status, updated_at=EXCLUDED.updated_at RETURNING account_id, platform, api_key, consumer_tag, status, created_at, updated_at` var a domain.SupplyAccount err := r.db.QueryRow(ctx, query, account.AccountID, account.Platform, account.APIKey, account.ConsumerTag, account.Status, account.CreatedAt, account.UpdatedAt, ).Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt) if err != nil { return account } return a } func (r *PostgresRepository) GetSupplyAccount(ctx context.Context, accountID int64) (domain.SupplyAccount, bool) { query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE 
account_id=$1` row := r.db.QueryRow(ctx, query, accountID) var a domain.SupplyAccount err := row.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt) if errors.Is(err, pgx.ErrNoRows) { return domain.SupplyAccount{}, false } if err != nil { return domain.SupplyAccount{}, false } return a, true } func (r *PostgresRepository) ListSupplyAccountsByPlatform(ctx context.Context, platform string) []domain.SupplyAccount { query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE platform=$1 AND status='active'` rows, err := r.db.Query(ctx, query, platform) if err != nil { return nil } defer rows.Close() var result []domain.SupplyAccount for rows.Next() { var a domain.SupplyAccount if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil { continue } result = append(result, a) } return result } func (r *PostgresRepository) ListSupplyAccounts(ctx context.Context) []domain.SupplyAccount { query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE status='active'` rows, err := r.db.Query(ctx, query) if err != nil { return nil } defer rows.Close() var result []domain.SupplyAccount for rows.Next() { var a domain.SupplyAccount if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil { continue } result = append(result, a) } return result } func (r *PostgresRepository) ListSupplyAccountsByConsumer(ctx context.Context, consumerTag string) []domain.SupplyAccount { query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE consumer_tag=$1 AND status='active'` rows, err := r.db.Query(ctx, query, consumerTag) if err != nil { return nil } defer rows.Close() var result 
[]domain.SupplyAccount for rows.Next() { var a domain.SupplyAccount if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil { continue } result = append(result, a) } return result } func (r *PostgresRepository) PublishPackageAtomically(ctx context.Context, input publish.PublishPackageAtomicInput) (publish.PublishPackageAtomicResult, error) { tx, err := r.db.Begin(ctx) if err != nil { return publish.PublishPackageAtomicResult{}, err } defer tx.Rollback(ctx) commandTag, err := tx.Exec(ctx, ` UPDATE supply_intelligence_model_candidates SET status=$2, reason_code=$3, updated_at=$4, version=$5 WHERE candidate_id=$1 AND status=$6`, input.Candidate.CandidateID, string(input.Candidate.Status), input.Candidate.ReasonCode, input.Candidate.UpdatedAt, input.Candidate.Version, string(domain.DiscoveryCandidateStatusTestPassed), ) if err != nil { return publish.PublishPackageAtomicResult{}, err } if commandTag.RowsAffected() == 0 { currentCandidate, ok := r.GetDiscoveryCandidateByIDContext(ctx, input.Candidate.CandidateID) if ok && currentCandidate.Status == domain.DiscoveryCandidateStatusPublished { return publish.PublishPackageAtomicResult{}, publish.ErrPackageAlreadyPublished } return publish.PublishPackageAtomicResult{}, publish.ErrCandidateNotPublishable } commandTag, err = tx.Exec(ctx, ` INSERT INTO supply_intelligence_supply_packages (package_id, platform, model, status, source, created_at, updated_at, version) VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT (platform, model) DO UPDATE SET package_id=EXCLUDED.package_id, status=EXCLUDED.status, source=EXCLUDED.source, created_at=EXCLUDED.created_at, updated_at=EXCLUDED.updated_at, version=EXCLUDED.version WHERE supply_intelligence_supply_packages.status='draft'`, input.Package.PackageID, input.Package.Platform, input.Package.Model, input.Package.Status, input.Package.Source, input.Package.CreatedAt, input.Package.UpdatedAt, input.Package.Version, ) if err != 
nil { return publish.PublishPackageAtomicResult{}, err } if commandTag.RowsAffected() == 0 { return publish.PublishPackageAtomicResult{}, publish.ErrPackageAlreadyPublished } if err := insertPackageEvent(ctx, tx, input.Event); err != nil { if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23505" { return publish.PublishPackageAtomicResult{}, publish.ErrDuplicatePublishRequest } return publish.PublishPackageAtomicResult{}, err } if err := tx.Commit(ctx); err != nil { return publish.PublishPackageAtomicResult{}, err } return publish.PublishPackageAtomicResult{Candidate: input.Candidate, Package: input.Package, Event: input.Event}, nil } func insertPackageEvent(ctx context.Context, execer dbtx, evt domain.PackageChangeEvent) error { query := ` INSERT INTO supply_intelligence_package_change_events (event_id, account_id, event_type, package_id, platform, model, occurred_at, version, ack_status) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,'pending')` _, err := execer.Exec(ctx, query, evt.EventID, evt.AccountID, evt.EventType, evt.PackageID, evt.Platform, evt.Model, evt.OccurredAt, evt.Version, ) return err }