Files
2026-05-12 18:49:52 +08:00

914 lines
35 KiB
Go

package repository
import (
"context"
"errors"
"fmt"
"time"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"supply-intelligence/internal/domain"
"supply-intelligence/internal/publish"
)
// PostgresRepository implements Repository using pgx.
type PostgresRepository struct {
db *pgxpool.Pool
}
// NewPostgresRepository connects to PostgreSQL using the given connection string.
func NewPostgresRepository(ctx context.Context, connString string) (*PostgresRepository, error) {
config, err := pgxpool.ParseConfig(connString)
if err != nil {
return nil, fmt.Errorf("parse conn string: %w", err)
}
pool, err := pgxpool.ConnectConfig(ctx, config)
if err != nil {
return nil, fmt.Errorf("connect to postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
return nil, fmt.Errorf("ping postgres: %w", err)
}
return &PostgresRepository{db: pool}, nil
}
// Close releases the connection pool.
func (r *PostgresRepository) Close() { r.db.Close() }
type dbtx interface {
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
}
// ─── Routing State ────────────────────────────────────────────────────────────
func (r *PostgresRepository) UpsertRoutingState(ctx context.Context, state domain.AccountRoutingState) {
r.UpsertRoutingStateContext(ctx, state)
}
func (r *PostgresRepository) UpsertRoutingStateContext(ctx context.Context, state domain.AccountRoutingState) domain.AccountRoutingState {
query := `
INSERT INTO supply_intelligence_account_routing_states
(account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, version)
VALUES ($1,$2,$3,$4,$5,$6,$7,1)
ON CONFLICT (account_id) DO UPDATE SET
platform=EXCLUDED.platform,
account_status=EXCLUDED.account_status,
routing_enabled=EXCLUDED.routing_enabled,
risk_score=EXCLUDED.risk_score,
reason_code=EXCLUDED.reason_code,
last_probe_at=EXCLUDED.last_probe_at,
version=supply_intelligence_account_routing_states.version+1`
_, _ = r.db.Exec(ctx, query,
state.AccountID, state.Platform,
state.AccountStatus, state.RoutingEnabled,
state.RiskScore, state.ReasonCode, state.LastProbeAt,
)
return state
}
func (r *PostgresRepository) GetRoutingState(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
return r.GetRoutingStateContext(ctx, accountID)
}
func (r *PostgresRepository) GetRoutingStateContext(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
query := `
SELECT account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, version
FROM supply_intelligence_account_routing_states WHERE account_id=$1`
row := r.db.QueryRow(ctx, query, accountID)
var s domain.AccountRoutingState
err := row.Scan(&s.AccountID, &s.Platform, &s.AccountStatus, &s.RoutingEnabled, &s.RiskScore, &s.ReasonCode, &s.LastProbeAt, &s.Version)
if errors.Is(err, pgx.ErrNoRows) {
return domain.AccountRoutingState{}, false
}
if err != nil {
return domain.AccountRoutingState{}, false
}
return s, true
}
func (r *PostgresRepository) ListRoutingStatesByPlatform(ctx context.Context, platform string) []domain.AccountRoutingState {
query := `
SELECT account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, version
FROM supply_intelligence_account_routing_states WHERE platform=$1`
rows, err := r.db.Query(ctx, query, platform)
if err != nil {
return nil
}
if rows.Err() != nil {
return nil
}
defer rows.Close()
var result []domain.AccountRoutingState
for rows.Next() {
var s domain.AccountRoutingState
if err := rows.Scan(&s.AccountID, &s.Platform, &s.AccountStatus, &s.RoutingEnabled, &s.RiskScore, &s.ReasonCode, &s.LastProbeAt, &s.Version); err != nil {
continue
}
result = append(result, s)
}
return result
}
// ─── Package Change Events ────────────────────────────────────────────────────
func (r *PostgresRepository) AppendPackageEvent(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
return r.AppendPackageEventContext(ctx, evt)
}
func (r *PostgresRepository) AppendPackageEventContext(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
if err := insertPackageEvent(ctx, r.db, evt); err != nil {
return domain.PackageChangeEvent{}, err
}
return evt, nil
}
func (r *PostgresRepository) ListPackageEvents(ctx context.Context) []domain.PackageChangeEvent {
query := `
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
retry_count, last_retry_at, next_retry_at,
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
FROM supply_intelligence_package_change_events
ORDER BY occurred_at DESC, event_id`
rows, err := r.db.Query(ctx, query)
if err != nil {
return nil
}
if rows.Err() != nil {
return nil
}
defer rows.Close()
return scanEvents(rows)
}
func (r *PostgresRepository) GetLatestPackageEvent(ctx context.Context, platform, model string) (domain.PackageChangeEvent, bool) {
query := `
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at,
version, COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
retry_count, last_retry_at, next_retry_at,
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
FROM supply_intelligence_package_change_events
WHERE platform=$1 AND model=$2
ORDER BY occurred_at DESC, event_id DESC
LIMIT 1`
row := r.db.QueryRow(ctx, query, platform, model)
var evt domain.PackageChangeEvent
err := scanEventScanner(row, &evt)
if errors.Is(err, pgx.ErrNoRows) {
return domain.PackageChangeEvent{}, false
}
if err != nil {
return domain.PackageChangeEvent{}, false
}
return evt, true
}
func (r *PostgresRepository) ListPackageEventsAfter(ctx context.Context, cursor string) ([]domain.PackageChangeEvent, string) {
const pageSize = 50
var args []interface{}
var query string
if cursor == "" {
args = append(args, pageSize)
query = `
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
retry_count, last_retry_at, next_retry_at,
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
FROM supply_intelligence_package_change_events
ORDER BY occurred_at DESC, event_id DESC
LIMIT $1`
} else {
args = append(args, cursor, pageSize)
query = `
WITH cursor_event AS (
SELECT occurred_at FROM supply_intelligence_package_change_events WHERE event_id=$1
)
SELECT e.event_id, e.account_id, e.event_type, e.package_id, e.platform, e.model, e.occurred_at, e.version,
COALESCE(e.ack_status,''), COALESCE(e.ack_consumer,''), COALESCE(e.ack_detail,''), e.ack_time,
e.retry_count, e.last_retry_at, e.next_retry_at,
COALESCE(e.last_failure_category,''), COALESCE(e.last_failure_detail,'')
FROM supply_intelligence_package_change_events e
JOIN cursor_event c ON e.occurred_at < c.occurred_at
OR (e.occurred_at = c.occurred_at AND e.event_id > $1)
ORDER BY e.occurred_at DESC, e.event_id DESC
LIMIT $2`
}
rows, err := r.db.Query(ctx, query, args...)
if err != nil {
return nil, ""
}
if rows.Err() != nil {
return nil, ""
}
defer rows.Close()
var result []domain.PackageChangeEvent
for rows.Next() {
var e domain.PackageChangeEvent
if err := scanEventRow(rows, &e); err != nil {
continue
}
result = append(result, e)
}
// next cursor is last eventID only if there is another page
next := ""
if len(result) == pageSize && len(result) > 0 {
next = result[len(result)-1].EventID
}
return result, next
}
func (r *PostgresRepository) AckPackageEvent(ctx context.Context, eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error) {
query := `
UPDATE supply_intelligence_package_change_events
SET ack_status=$2, ack_consumer=$3, ack_detail=$4, ack_time=$5, next_retry_at=NULL
WHERE event_id=$1`
commandTag, err := r.db.Exec(ctx, query, eventID, string(result), consumer, detail, ackedAt)
if err != nil {
return domain.PackageChangeEvent{}, err
}
if commandTag.RowsAffected() == 0 {
return domain.PackageChangeEvent{}, ErrEventNotFound
}
return r.getEventByID(ctx, eventID)
}
func (r *PostgresRepository) getEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, error) {
query := `
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
retry_count, last_retry_at, next_retry_at,
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
FROM supply_intelligence_package_change_events WHERE event_id=$1`
row := r.db.QueryRow(ctx, query, eventID)
var e domain.PackageChangeEvent
if err := scanEventScanner(row, &e); errors.Is(err, pgx.ErrNoRows) {
return domain.PackageChangeEvent{}, ErrEventNotFound
} else if err != nil {
return domain.PackageChangeEvent{}, err
}
return e, nil
}
func (r *PostgresRepository) GetPackageEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, bool) {
evt, err := r.getEventByID(ctx, eventID)
if errors.Is(err, ErrEventNotFound) {
return domain.PackageChangeEvent{}, false
}
if err != nil {
return domain.PackageChangeEvent{}, false
}
return evt, true
}
// ─── Gateway Snapshot ─────────────────────────────────────────────────────────
func (r *PostgresRepository) UpsertGatewayAppliedSnapshot(ctx context.Context, snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot {
query := `
INSERT INTO supply_intelligence_gateway_applied_snapshots
(consumer, last_event_id, last_package_id, last_platform, last_model,
last_applied_version, last_result, updated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
ON CONFLICT (consumer) DO UPDATE SET
last_event_id=EXCLUDED.last_event_id,
last_package_id=EXCLUDED.last_package_id,
last_platform=EXCLUDED.last_platform,
last_model=EXCLUDED.last_model,
last_applied_version=EXCLUDED.last_applied_version,
last_result=EXCLUDED.last_result,
updated_at=EXCLUDED.updated_at
RETURNING consumer, last_event_id, last_package_id, last_platform, last_model, last_applied_version, last_result, updated_at`
var out domain.GatewayAppliedSnapshot
err := r.db.QueryRow(ctx, query,
snapshot.Consumer, snapshot.LastEventID, snapshot.LastPackageID,
snapshot.LastPlatform, snapshot.LastModel, snapshot.LastAppliedVersion,
snapshot.LastResult, snapshot.UpdatedAt,
).Scan(&out.Consumer, &out.LastEventID, &out.LastPackageID,
&out.LastPlatform, &out.LastModel, &out.LastAppliedVersion, &out.LastResult, &out.UpdatedAt)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return snapshot
}
return out
}
func (r *PostgresRepository) GetGatewayAppliedSnapshot(ctx context.Context, consumer string) (domain.GatewayAppliedSnapshot, bool) {
query := `
SELECT consumer, last_event_id, last_package_id, last_platform, last_model,
last_applied_version, last_result, updated_at
FROM supply_intelligence_gateway_applied_snapshots WHERE consumer=$1`
row := r.db.QueryRow(ctx, query, consumer)
var s domain.GatewayAppliedSnapshot
err := row.Scan(&s.Consumer, &s.LastEventID, &s.LastPackageID,
&s.LastPlatform, &s.LastModel, &s.LastAppliedVersion, &s.LastResult, &s.UpdatedAt)
if errors.Is(err, pgx.ErrNoRows) {
return domain.GatewayAppliedSnapshot{}, false
}
if err != nil {
return domain.GatewayAppliedSnapshot{}, false
}
return s, true
}
// ─── Discovery Candidates ─────────────────────────────────────────────────────
func (r *PostgresRepository) GetDiscoveryCandidateByID(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
return r.GetDiscoveryCandidateByIDContext(ctx, candidateID)
}
func (r *PostgresRepository) GetDiscoveryCandidateByIDContext(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
query := `
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
discovered_at, updated_at, version
FROM supply_intelligence_model_candidates WHERE candidate_id=$1`
row := r.db.QueryRow(ctx, query, candidateID)
var c domain.DiscoveryCandidate
err := row.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version)
if errors.Is(err, pgx.ErrNoRows) {
return domain.DiscoveryCandidate{}, false
}
if err != nil {
return domain.DiscoveryCandidate{}, false
}
return c, true
}
func (r *PostgresRepository) FindDiscoveryCandidate(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
return r.FindDiscoveryCandidateContext(ctx, accountID, platform, model)
}
func (r *PostgresRepository) FindDiscoveryCandidateContext(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
query := `
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
discovered_at, updated_at, version
FROM supply_intelligence_model_candidates WHERE account_id=$1 AND platform=$2 AND model=$3`
row := r.db.QueryRow(ctx, query, accountID, platform, model)
var c domain.DiscoveryCandidate
err := row.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version)
if errors.Is(err, pgx.ErrNoRows) {
return domain.DiscoveryCandidate{}, false
}
if err != nil {
return domain.DiscoveryCandidate{}, false
}
return c, true
}
func (r *PostgresRepository) GetLatestDiscoveryCandidate(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
return r.GetLatestDiscoveryCandidateContext(ctx, platform, model)
}
func (r *PostgresRepository) GetLatestDiscoveryCandidateContext(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
query := `
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
discovered_at, updated_at, version
FROM supply_intelligence_model_candidates
WHERE platform=$1 AND model=$2
ORDER BY updated_at DESC, candidate_id DESC
LIMIT 1`
row := r.db.QueryRow(ctx, query, platform, model)
var c domain.DiscoveryCandidate
err := row.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version)
if errors.Is(err, pgx.ErrNoRows) {
return domain.DiscoveryCandidate{}, false
}
if err != nil {
return domain.DiscoveryCandidate{}, false
}
return c, true
}
func (r *PostgresRepository) UpsertDiscoveryCandidate(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
return r.UpsertDiscoveryCandidateContext(ctx, candidate)
}
func (r *PostgresRepository) UpsertDiscoveryCandidateContext(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
query := `
INSERT INTO supply_intelligence_model_candidates
(candidate_id, account_id, platform, model, status, source, reason_code,
discovered_at, updated_at, version)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,1)
ON CONFLICT (platform, model) DO UPDATE SET
account_id=EXCLUDED.account_id,
status=EXCLUDED.status,
source=EXCLUDED.source,
reason_code=EXCLUDED.reason_code,
updated_at=EXCLUDED.updated_at,
version=supply_intelligence_model_candidates.version+1
RETURNING version`
var version int64
err := r.db.QueryRow(ctx, query,
candidate.CandidateID, candidate.AccountID, candidate.Platform, candidate.Model,
candidate.Status, candidate.Source, candidate.ReasonCode,
candidate.DiscoveredAt, candidate.UpdatedAt,
).Scan(&version)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return candidate
}
candidate.Version = version
return candidate
}
func (r *PostgresRepository) ListDiscoveryCandidates(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
return r.ListDiscoveryCandidatesContext(ctx, status)
}
func (r *PostgresRepository) ListDiscoveryCandidatesContext(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
var query string
var args []interface{}
if status == "" {
query = `
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
discovered_at, updated_at, version
FROM supply_intelligence_model_candidates ORDER BY discovered_at DESC`
} else {
query = `
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
discovered_at, updated_at, version
FROM supply_intelligence_model_candidates WHERE status=$1 ORDER BY discovered_at DESC`
args = append(args, string(status))
}
rows, err := r.db.Query(ctx, query, args...)
if err != nil {
return nil
}
if rows.Err() != nil {
return nil
}
defer rows.Close()
var result []domain.DiscoveryCandidate
for rows.Next() {
var c domain.DiscoveryCandidate
if err := rows.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version); err != nil {
continue
}
result = append(result, c)
}
return result
}
func (r *PostgresRepository) UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error {
query := `
UPDATE supply_intelligence_model_candidates
SET status=$2, reason_code=$3, updated_at=now()
WHERE candidate_id=$1`
_, err := r.db.Exec(ctx, query, candidateID, string(status), failureCode)
return err
}
// ─── Supply Packages ───────────────────────────────────────────────────────────
func (r *PostgresRepository) UpsertSupplyPackage(ctx context.Context, pkg domain.SupplyPackage) error {
query := `
INSERT INTO supply_intelligence_supply_packages
(package_id, platform, model, status, source, created_at, updated_at, version)
VALUES (
CASE WHEN $1 = 0 THEN nextval('supply_package_id_seq') ELSE $1 END,
$2,$3,$4,$5,$6,$7,1
)
ON CONFLICT (platform, model) DO UPDATE SET
status=EXCLUDED.status,
source=EXCLUDED.source,
updated_at=EXCLUDED.updated_at,
version=supply_intelligence_supply_packages.version+1
RETURNING package_id, version`
var packageID int64
var version int64
if err := r.db.QueryRow(ctx, query,
pkg.PackageID, pkg.Platform, pkg.Model, pkg.Status, pkg.Source,
pkg.CreatedAt, pkg.UpdatedAt,
).Scan(&packageID, &version); err != nil {
return err
}
_ = packageID
_ = version
return nil
}
func (r *PostgresRepository) GetSupplyPackage(ctx context.Context, platform, model string) (domain.SupplyPackage, bool) {
query := `
SELECT package_id, platform, model, status, source, created_at, updated_at, version
FROM supply_intelligence_supply_packages WHERE platform=$1 AND model=$2`
row := r.db.QueryRow(ctx, query, platform, model)
var p domain.SupplyPackage
err := row.Scan(&p.PackageID, &p.Platform, &p.Model, &p.Status, &p.Source, &p.CreatedAt, &p.UpdatedAt, &p.Version)
if errors.Is(err, pgx.ErrNoRows) {
return domain.SupplyPackage{}, false
}
if err != nil {
return domain.SupplyPackage{}, false
}
return p, true
}
func (r *PostgresRepository) ListSupplyPackages(ctx context.Context, status string) []domain.SupplyPackage {
var query string
var args []interface{}
if status == "" {
query = `SELECT package_id, platform, model, status, source, created_at, updated_at, version FROM supply_intelligence_supply_packages`
} else {
query = `SELECT package_id, platform, model, status, source, created_at, updated_at, version FROM supply_intelligence_supply_packages WHERE status=$1`
args = append(args, status)
}
rows, err := r.db.Query(ctx, query, args...)
if err != nil {
return nil
}
if rows.Err() != nil {
return nil
}
defer rows.Close()
var result []domain.SupplyPackage
for rows.Next() {
var p domain.SupplyPackage
if err := rows.Scan(&p.PackageID, &p.Platform, &p.Model, &p.Status, &p.Source, &p.CreatedAt, &p.UpdatedAt, &p.Version); err != nil {
continue
}
result = append(result, p)
}
return result
}
// ─── Probe Execution Logs ──────────────────────────────────────────────────────
func (r *PostgresRepository) AppendProbeExecutionLog(ctx context.Context, log domain.ProbeExecutionLog) error {
query := `
INSERT INTO supply_intelligence_probe_execution_logs
(account_id, platform, probe_result, failure_class, http_status, latency_ms,
risk_score, evaluated_transition, executed_at, request_id, version)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,1)`
_, err := r.db.Exec(ctx, query,
log.AccountID, log.Platform, log.ProbeResult, log.FailureClass,
log.HTTPStatus, log.LatencyMs, log.RiskScore, log.EvaluatedTransition,
log.ExecutedAt, log.RequestID,
)
return err
}
func (r *PostgresRepository) ListProbeExecutionLogs(ctx context.Context, accountID int64, limit int) ([]domain.ProbeExecutionLog, error) {
query := `
SELECT log_id, account_id, platform, probe_result, failure_class, http_status, latency_ms,
risk_score, evaluated_transition, executed_at, request_id, version
FROM supply_intelligence_probe_execution_logs
WHERE account_id=$1
ORDER BY executed_at DESC LIMIT $2`
rows, err := r.db.Query(ctx, query, accountID, limit)
if err != nil {
return nil, err
}
if rows.Err() != nil {
return nil, rows.Err()
}
defer rows.Close()
var result []domain.ProbeExecutionLog
for rows.Next() {
var l domain.ProbeExecutionLog
if err := rows.Scan(&l.LogID, &l.AccountID, &l.Platform, &l.ProbeResult,
&l.FailureClass, &l.HTTPStatus, &l.LatencyMs, &l.RiskScore,
&l.EvaluatedTransition, &l.ExecutedAt, &l.RequestID, &l.Version); err != nil {
continue
}
result = append(result, l)
}
return result, nil
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
func (r *PostgresRepository) ListRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time, limit int) []domain.PackageChangeEvent {
query := `
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
retry_count, last_retry_at, next_retry_at,
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
FROM supply_intelligence_package_change_events
WHERE ack_status=$1 AND next_retry_at IS NOT NULL AND next_retry_at <= $2
ORDER BY next_retry_at ASC, occurred_at DESC, event_id DESC`
rows, err := r.db.Query(ctx, query, string(domain.GatewayAckResultPending), now)
if err != nil {
return nil
}
if rows.Err() != nil {
return nil
}
defer rows.Close()
items := scanEvents(rows)
if limit > 0 && len(items) > limit {
items = items[:limit]
}
_ = consumer
return items
}
func (r *PostgresRepository) CountPackageEventsBySyncStatus(ctx context.Context, status domain.GatewaySyncStatus) int {
query := `SELECT COUNT(*) FROM supply_intelligence_package_change_events WHERE ack_status=$1`
row := r.db.QueryRow(ctx, query, string(status))
var count int
if err := row.Scan(&count); err != nil {
return 0
}
return count
}
func (r *PostgresRepository) CountRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time) int {
query := `SELECT COUNT(*) FROM supply_intelligence_package_change_events WHERE ack_status=$1 AND next_retry_at IS NOT NULL AND next_retry_at <= $2`
row := r.db.QueryRow(ctx, query, string(domain.GatewayAckResultPending), now)
var count int
if err := row.Scan(&count); err != nil {
return 0
}
_ = consumer
return count
}
func (r *PostgresRepository) MarkPackageEventRetry(ctx context.Context, eventID string, retryCount int, nextRetryAt time.Time, category domain.GatewayFailureCategory, detail string, retriedAt time.Time) (domain.PackageChangeEvent, error) {
query := `
UPDATE supply_intelligence_package_change_events
SET ack_status=$2, retry_count=$3, last_retry_at=$4, next_retry_at=$5,
last_failure_category=$6, last_failure_detail=$7, ack_detail=$7
WHERE event_id=$1`
commandTag, err := r.db.Exec(ctx, query, eventID, string(domain.GatewayAckResultPending), retryCount, retriedAt, nextRetryAt, string(category), detail)
if err != nil {
return domain.PackageChangeEvent{}, err
}
if commandTag.RowsAffected() == 0 {
return domain.PackageChangeEvent{}, ErrEventNotFound
}
return r.getEventByID(ctx, eventID)
}
func scanEvents(rows pgx.Rows) []domain.PackageChangeEvent {
var result []domain.PackageChangeEvent
for rows.Next() {
var e domain.PackageChangeEvent
if err := scanEventRow(rows, &e); err != nil {
continue
}
result = append(result, e)
}
return result
}
type eventScanner interface {
Scan(dest ...interface{}) error
}
func scanEventScanner(scanner eventScanner, e *domain.PackageChangeEvent) error {
return scanner.Scan(
&e.EventID, &e.AccountID, &e.EventType, &e.PackageID, &e.Platform, &e.Model,
&e.OccurredAt, &e.Version,
&e.GatewaySyncStatus, &e.Consumer, &e.ConsumerDetail, &e.AckedAt,
&e.RetryCount, &e.LastRetryAt, &e.NextRetryAt,
&e.LastFailureCategory, &e.LastFailureDetail,
)
}
func scanEventRow(rows pgx.Rows, e *domain.PackageChangeEvent) error {
return scanEventScanner(rows, e)
}
// AppendAdmissionTestLog inserts an admission test log entry.
func (r *PostgresRepository) AppendAdmissionTestLog(ctx context.Context, candidateID string, status string, failureCode string, failureSummary string, testedAt time.Time) error {
query := `
INSERT INTO supply_intelligence_admission_test_logs
(candidate_id, status, failure_code, failure_summary, tested_at, version)
VALUES ($1,$2,$3,$4,$5,1)`
_, err := r.db.Exec(ctx, query, candidateID, status, failureCode, failureSummary, testedAt)
return err
}
// ListAdmissionTestLogsByCandidate returns admission test logs for a candidate.
func (r *PostgresRepository) ListAdmissionTestLogsByCandidate(ctx context.Context, candidateID string, limit int) ([]domain.AdmissionTestLog, error) {
query := `
SELECT test_id, candidate_id, status, failure_code, failure_summary, tested_at, version
FROM supply_intelligence_admission_test_logs
WHERE candidate_id=$1
ORDER BY tested_at DESC LIMIT $2`
rows, err := r.db.Query(ctx, query, candidateID, limit)
if err != nil {
return nil, err
}
if rows.Err() != nil {
return nil, rows.Err()
}
defer rows.Close()
var result []domain.AdmissionTestLog
for rows.Next() {
var l domain.AdmissionTestLog
if err := rows.Scan(&l.TestID, &l.CandidateID, &l.Status, &l.FailureCode, &l.FailureSummary, &l.TestedAt, &l.Version); err != nil {
continue
}
result = append(result, l)
}
return result, nil
}
// ListActiveAccounts returns all accounts with routing enabled.
func (r *PostgresRepository) ListActiveAccounts(ctx context.Context) []domain.AccountRoutingState {
query := `
SELECT account_id, platform, account_status, routing_enabled,
risk_score, reason_code, last_probe_at, created_at, updated_at, version
FROM supply_intelligence_account_routing_states
WHERE routing_enabled = true`
rows, err := r.db.Query(ctx, query)
if err != nil {
return nil
}
if rows.Err() != nil {
return nil
}
defer rows.Close()
var result []domain.AccountRoutingState
for rows.Next() {
var rs domain.AccountRoutingState
if err := rows.Scan(&rs.AccountID, &rs.Platform, &rs.AccountStatus, &rs.RoutingEnabled,
&rs.RiskScore, &rs.ReasonCode, &rs.LastProbeAt, &rs.Version); err != nil {
continue
}
result = append(result, rs)
}
return result
}
// ─── Supply Accounts ───────────────────────────────────────────────────────────
func (r *PostgresRepository) UpsertSupplyAccount(ctx context.Context, account domain.SupplyAccount) domain.SupplyAccount {
query := `
INSERT INTO supply_intelligence_supply_accounts (account_id, platform, api_key, consumer_tag, status, created_at, updated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7)
ON CONFLICT (account_id) DO UPDATE SET
platform=EXCLUDED.platform,
api_key=EXCLUDED.api_key,
consumer_tag=EXCLUDED.consumer_tag,
status=EXCLUDED.status,
updated_at=EXCLUDED.updated_at
RETURNING account_id, platform, api_key, consumer_tag, status, created_at, updated_at`
var a domain.SupplyAccount
err := r.db.QueryRow(ctx, query,
account.AccountID, account.Platform, account.APIKey, account.ConsumerTag,
account.Status, account.CreatedAt, account.UpdatedAt,
).Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt)
if err != nil {
return account
}
return a
}
func (r *PostgresRepository) GetSupplyAccount(ctx context.Context, accountID int64) (domain.SupplyAccount, bool) {
query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE account_id=$1`
row := r.db.QueryRow(ctx, query, accountID)
var a domain.SupplyAccount
err := row.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt)
if errors.Is(err, pgx.ErrNoRows) {
return domain.SupplyAccount{}, false
}
if err != nil {
return domain.SupplyAccount{}, false
}
return a, true
}
func (r *PostgresRepository) ListSupplyAccountsByPlatform(ctx context.Context, platform string) []domain.SupplyAccount {
query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE platform=$1 AND status='active'`
rows, err := r.db.Query(ctx, query, platform)
if err != nil {
return nil
}
defer rows.Close()
var result []domain.SupplyAccount
for rows.Next() {
var a domain.SupplyAccount
if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil {
continue
}
result = append(result, a)
}
return result
}
func (r *PostgresRepository) ListSupplyAccounts(ctx context.Context) []domain.SupplyAccount {
query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE status='active'`
rows, err := r.db.Query(ctx, query)
if err != nil {
return nil
}
defer rows.Close()
var result []domain.SupplyAccount
for rows.Next() {
var a domain.SupplyAccount
if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil {
continue
}
result = append(result, a)
}
return result
}
func (r *PostgresRepository) ListSupplyAccountsByConsumer(ctx context.Context, consumerTag string) []domain.SupplyAccount {
query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE consumer_tag=$1 AND status='active'`
rows, err := r.db.Query(ctx, query, consumerTag)
if err != nil {
return nil
}
defer rows.Close()
var result []domain.SupplyAccount
for rows.Next() {
var a domain.SupplyAccount
if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil {
continue
}
result = append(result, a)
}
return result
}
func (r *PostgresRepository) PublishPackageAtomically(ctx context.Context, input publish.PublishPackageAtomicInput) (publish.PublishPackageAtomicResult, error) {
tx, err := r.db.Begin(ctx)
if err != nil {
return publish.PublishPackageAtomicResult{}, err
}
defer tx.Rollback(ctx)
commandTag, err := tx.Exec(ctx, `
UPDATE supply_intelligence_model_candidates
SET status=$2, reason_code=$3, updated_at=$4, version=$5
WHERE candidate_id=$1 AND status=$6`,
input.Candidate.CandidateID,
string(input.Candidate.Status),
input.Candidate.ReasonCode,
input.Candidate.UpdatedAt,
input.Candidate.Version,
string(domain.DiscoveryCandidateStatusTestPassed),
)
if err != nil {
return publish.PublishPackageAtomicResult{}, err
}
if commandTag.RowsAffected() == 0 {
currentCandidate, ok := r.GetDiscoveryCandidateByIDContext(ctx, input.Candidate.CandidateID)
if ok && currentCandidate.Status == domain.DiscoveryCandidateStatusPublished {
return publish.PublishPackageAtomicResult{}, publish.ErrPackageAlreadyPublished
}
return publish.PublishPackageAtomicResult{}, publish.ErrCandidateNotPublishable
}
commandTag, err = tx.Exec(ctx, `
INSERT INTO supply_intelligence_supply_packages
(package_id, platform, model, status, source, created_at, updated_at, version)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
ON CONFLICT (platform, model) DO UPDATE SET
package_id=EXCLUDED.package_id,
status=EXCLUDED.status,
source=EXCLUDED.source,
created_at=EXCLUDED.created_at,
updated_at=EXCLUDED.updated_at,
version=EXCLUDED.version
WHERE supply_intelligence_supply_packages.status='draft'`,
input.Package.PackageID,
input.Package.Platform,
input.Package.Model,
input.Package.Status,
input.Package.Source,
input.Package.CreatedAt,
input.Package.UpdatedAt,
input.Package.Version,
)
if err != nil {
return publish.PublishPackageAtomicResult{}, err
}
if commandTag.RowsAffected() == 0 {
return publish.PublishPackageAtomicResult{}, publish.ErrPackageAlreadyPublished
}
if err := insertPackageEvent(ctx, tx, input.Event); err != nil {
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23505" {
return publish.PublishPackageAtomicResult{}, publish.ErrDuplicatePublishRequest
}
return publish.PublishPackageAtomicResult{}, err
}
if err := tx.Commit(ctx); err != nil {
return publish.PublishPackageAtomicResult{}, err
}
return publish.PublishPackageAtomicResult{Candidate: input.Candidate, Package: input.Package, Event: input.Event}, nil
}
func insertPackageEvent(ctx context.Context, execer dbtx, evt domain.PackageChangeEvent) error {
query := `
INSERT INTO supply_intelligence_package_change_events
(event_id, account_id, event_type, package_id, platform, model, occurred_at, version, ack_status)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,'pending')`
_, err := execer.Exec(ctx, query,
evt.EventID, evt.AccountID, evt.EventType, evt.PackageID,
evt.Platform, evt.Model, evt.OccurredAt, evt.Version,
)
return err
}