914 lines
35 KiB
Go
914 lines
35 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/jackc/pgconn"
|
|
"github.com/jackc/pgx/v4"
|
|
"github.com/jackc/pgx/v4/pgxpool"
|
|
"supply-intelligence/internal/domain"
|
|
"supply-intelligence/internal/publish"
|
|
)
|
|
|
|
// PostgresRepository implements Repository using pgx.
|
|
type PostgresRepository struct {
|
|
db *pgxpool.Pool
|
|
}
|
|
|
|
// NewPostgresRepository connects to PostgreSQL using the given connection string.
|
|
func NewPostgresRepository(ctx context.Context, connString string) (*PostgresRepository, error) {
|
|
config, err := pgxpool.ParseConfig(connString)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse conn string: %w", err)
|
|
}
|
|
pool, err := pgxpool.ConnectConfig(ctx, config)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connect to postgres: %w", err)
|
|
}
|
|
if err := pool.Ping(ctx); err != nil {
|
|
return nil, fmt.Errorf("ping postgres: %w", err)
|
|
}
|
|
return &PostgresRepository{db: pool}, nil
|
|
}
|
|
|
|
// Close releases the connection pool.
|
|
func (r *PostgresRepository) Close() { r.db.Close() }
|
|
|
|
type dbtx interface {
|
|
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
|
|
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
|
|
}
|
|
|
|
// ─── Routing State ────────────────────────────────────────────────────────────
|
|
|
|
func (r *PostgresRepository) UpsertRoutingState(ctx context.Context, state domain.AccountRoutingState) {
|
|
r.UpsertRoutingStateContext(ctx, state)
|
|
}
|
|
|
|
func (r *PostgresRepository) UpsertRoutingStateContext(ctx context.Context, state domain.AccountRoutingState) domain.AccountRoutingState {
|
|
query := `
|
|
INSERT INTO supply_intelligence_account_routing_states
|
|
(account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, version)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,1)
|
|
ON CONFLICT (account_id) DO UPDATE SET
|
|
platform=EXCLUDED.platform,
|
|
account_status=EXCLUDED.account_status,
|
|
routing_enabled=EXCLUDED.routing_enabled,
|
|
risk_score=EXCLUDED.risk_score,
|
|
reason_code=EXCLUDED.reason_code,
|
|
last_probe_at=EXCLUDED.last_probe_at,
|
|
version=supply_intelligence_account_routing_states.version+1`
|
|
_, _ = r.db.Exec(ctx, query,
|
|
state.AccountID, state.Platform,
|
|
state.AccountStatus, state.RoutingEnabled,
|
|
state.RiskScore, state.ReasonCode, state.LastProbeAt,
|
|
)
|
|
return state
|
|
}
|
|
|
|
func (r *PostgresRepository) GetRoutingState(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
|
|
return r.GetRoutingStateContext(ctx, accountID)
|
|
}
|
|
|
|
func (r *PostgresRepository) GetRoutingStateContext(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
|
|
query := `
|
|
SELECT account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, version
|
|
FROM supply_intelligence_account_routing_states WHERE account_id=$1`
|
|
row := r.db.QueryRow(ctx, query, accountID)
|
|
var s domain.AccountRoutingState
|
|
err := row.Scan(&s.AccountID, &s.Platform, &s.AccountStatus, &s.RoutingEnabled, &s.RiskScore, &s.ReasonCode, &s.LastProbeAt, &s.Version)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.AccountRoutingState{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.AccountRoutingState{}, false
|
|
}
|
|
return s, true
|
|
}
|
|
|
|
func (r *PostgresRepository) ListRoutingStatesByPlatform(ctx context.Context, platform string) []domain.AccountRoutingState {
|
|
query := `
|
|
SELECT account_id, platform, account_status, routing_enabled, risk_score, reason_code, last_probe_at, version
|
|
FROM supply_intelligence_account_routing_states WHERE platform=$1`
|
|
rows, err := r.db.Query(ctx, query, platform)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.AccountRoutingState
|
|
for rows.Next() {
|
|
var s domain.AccountRoutingState
|
|
if err := rows.Scan(&s.AccountID, &s.Platform, &s.AccountStatus, &s.RoutingEnabled, &s.RiskScore, &s.ReasonCode, &s.LastProbeAt, &s.Version); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, s)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// ─── Package Change Events ────────────────────────────────────────────────────
|
|
|
|
func (r *PostgresRepository) AppendPackageEvent(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
|
|
return r.AppendPackageEventContext(ctx, evt)
|
|
}
|
|
|
|
func (r *PostgresRepository) AppendPackageEventContext(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
|
|
if err := insertPackageEvent(ctx, r.db, evt); err != nil {
|
|
return domain.PackageChangeEvent{}, err
|
|
}
|
|
return evt, nil
|
|
}
|
|
|
|
func (r *PostgresRepository) ListPackageEvents(ctx context.Context) []domain.PackageChangeEvent {
|
|
query := `
|
|
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
|
|
COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
|
|
retry_count, last_retry_at, next_retry_at,
|
|
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
|
|
FROM supply_intelligence_package_change_events
|
|
ORDER BY occurred_at DESC, event_id`
|
|
rows, err := r.db.Query(ctx, query)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
return scanEvents(rows)
|
|
}
|
|
|
|
func (r *PostgresRepository) GetLatestPackageEvent(ctx context.Context, platform, model string) (domain.PackageChangeEvent, bool) {
|
|
query := `
|
|
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at,
|
|
version, COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
|
|
retry_count, last_retry_at, next_retry_at,
|
|
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
|
|
FROM supply_intelligence_package_change_events
|
|
WHERE platform=$1 AND model=$2
|
|
ORDER BY occurred_at DESC, event_id DESC
|
|
LIMIT 1`
|
|
row := r.db.QueryRow(ctx, query, platform, model)
|
|
var evt domain.PackageChangeEvent
|
|
err := scanEventScanner(row, &evt)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.PackageChangeEvent{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.PackageChangeEvent{}, false
|
|
}
|
|
return evt, true
|
|
}
|
|
|
|
func (r *PostgresRepository) ListPackageEventsAfter(ctx context.Context, cursor string) ([]domain.PackageChangeEvent, string) {
|
|
const pageSize = 50
|
|
var args []interface{}
|
|
var query string
|
|
|
|
if cursor == "" {
|
|
args = append(args, pageSize)
|
|
query = `
|
|
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
|
|
COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
|
|
retry_count, last_retry_at, next_retry_at,
|
|
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
|
|
FROM supply_intelligence_package_change_events
|
|
ORDER BY occurred_at DESC, event_id DESC
|
|
LIMIT $1`
|
|
} else {
|
|
args = append(args, cursor, pageSize)
|
|
query = `
|
|
WITH cursor_event AS (
|
|
SELECT occurred_at FROM supply_intelligence_package_change_events WHERE event_id=$1
|
|
)
|
|
SELECT e.event_id, e.account_id, e.event_type, e.package_id, e.platform, e.model, e.occurred_at, e.version,
|
|
COALESCE(e.ack_status,''), COALESCE(e.ack_consumer,''), COALESCE(e.ack_detail,''), e.ack_time,
|
|
e.retry_count, e.last_retry_at, e.next_retry_at,
|
|
COALESCE(e.last_failure_category,''), COALESCE(e.last_failure_detail,'')
|
|
FROM supply_intelligence_package_change_events e
|
|
JOIN cursor_event c ON e.occurred_at < c.occurred_at
|
|
OR (e.occurred_at = c.occurred_at AND e.event_id > $1)
|
|
ORDER BY e.occurred_at DESC, e.event_id DESC
|
|
LIMIT $2`
|
|
}
|
|
|
|
rows, err := r.db.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, ""
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil, ""
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []domain.PackageChangeEvent
|
|
for rows.Next() {
|
|
var e domain.PackageChangeEvent
|
|
if err := scanEventRow(rows, &e); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, e)
|
|
}
|
|
|
|
// next cursor is last eventID only if there is another page
|
|
next := ""
|
|
if len(result) == pageSize && len(result) > 0 {
|
|
next = result[len(result)-1].EventID
|
|
}
|
|
return result, next
|
|
}
|
|
|
|
func (r *PostgresRepository) AckPackageEvent(ctx context.Context, eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error) {
|
|
query := `
|
|
UPDATE supply_intelligence_package_change_events
|
|
SET ack_status=$2, ack_consumer=$3, ack_detail=$4, ack_time=$5, next_retry_at=NULL
|
|
WHERE event_id=$1`
|
|
commandTag, err := r.db.Exec(ctx, query, eventID, string(result), consumer, detail, ackedAt)
|
|
if err != nil {
|
|
return domain.PackageChangeEvent{}, err
|
|
}
|
|
if commandTag.RowsAffected() == 0 {
|
|
return domain.PackageChangeEvent{}, ErrEventNotFound
|
|
}
|
|
return r.getEventByID(ctx, eventID)
|
|
}
|
|
|
|
func (r *PostgresRepository) getEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, error) {
|
|
query := `
|
|
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
|
|
COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
|
|
retry_count, last_retry_at, next_retry_at,
|
|
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
|
|
FROM supply_intelligence_package_change_events WHERE event_id=$1`
|
|
row := r.db.QueryRow(ctx, query, eventID)
|
|
var e domain.PackageChangeEvent
|
|
if err := scanEventScanner(row, &e); errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.PackageChangeEvent{}, ErrEventNotFound
|
|
} else if err != nil {
|
|
return domain.PackageChangeEvent{}, err
|
|
}
|
|
return e, nil
|
|
}
|
|
|
|
func (r *PostgresRepository) GetPackageEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, bool) {
|
|
evt, err := r.getEventByID(ctx, eventID)
|
|
if errors.Is(err, ErrEventNotFound) {
|
|
return domain.PackageChangeEvent{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.PackageChangeEvent{}, false
|
|
}
|
|
return evt, true
|
|
}
|
|
|
|
// ─── Gateway Snapshot ─────────────────────────────────────────────────────────
|
|
|
|
func (r *PostgresRepository) UpsertGatewayAppliedSnapshot(ctx context.Context, snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot {
|
|
query := `
|
|
INSERT INTO supply_intelligence_gateway_applied_snapshots
|
|
(consumer, last_event_id, last_package_id, last_platform, last_model,
|
|
last_applied_version, last_result, updated_at)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
|
|
ON CONFLICT (consumer) DO UPDATE SET
|
|
last_event_id=EXCLUDED.last_event_id,
|
|
last_package_id=EXCLUDED.last_package_id,
|
|
last_platform=EXCLUDED.last_platform,
|
|
last_model=EXCLUDED.last_model,
|
|
last_applied_version=EXCLUDED.last_applied_version,
|
|
last_result=EXCLUDED.last_result,
|
|
updated_at=EXCLUDED.updated_at
|
|
RETURNING consumer, last_event_id, last_package_id, last_platform, last_model, last_applied_version, last_result, updated_at`
|
|
var out domain.GatewayAppliedSnapshot
|
|
err := r.db.QueryRow(ctx, query,
|
|
snapshot.Consumer, snapshot.LastEventID, snapshot.LastPackageID,
|
|
snapshot.LastPlatform, snapshot.LastModel, snapshot.LastAppliedVersion,
|
|
snapshot.LastResult, snapshot.UpdatedAt,
|
|
).Scan(&out.Consumer, &out.LastEventID, &out.LastPackageID,
|
|
&out.LastPlatform, &out.LastModel, &out.LastAppliedVersion, &out.LastResult, &out.UpdatedAt)
|
|
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
|
return snapshot
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (r *PostgresRepository) GetGatewayAppliedSnapshot(ctx context.Context, consumer string) (domain.GatewayAppliedSnapshot, bool) {
|
|
query := `
|
|
SELECT consumer, last_event_id, last_package_id, last_platform, last_model,
|
|
last_applied_version, last_result, updated_at
|
|
FROM supply_intelligence_gateway_applied_snapshots WHERE consumer=$1`
|
|
row := r.db.QueryRow(ctx, query, consumer)
|
|
var s domain.GatewayAppliedSnapshot
|
|
err := row.Scan(&s.Consumer, &s.LastEventID, &s.LastPackageID,
|
|
&s.LastPlatform, &s.LastModel, &s.LastAppliedVersion, &s.LastResult, &s.UpdatedAt)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.GatewayAppliedSnapshot{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.GatewayAppliedSnapshot{}, false
|
|
}
|
|
return s, true
|
|
}
|
|
|
|
// ─── Discovery Candidates ─────────────────────────────────────────────────────
|
|
|
|
func (r *PostgresRepository) GetDiscoveryCandidateByID(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
|
|
return r.GetDiscoveryCandidateByIDContext(ctx, candidateID)
|
|
}
|
|
|
|
func (r *PostgresRepository) GetDiscoveryCandidateByIDContext(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
|
|
query := `
|
|
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
|
|
discovered_at, updated_at, version
|
|
FROM supply_intelligence_model_candidates WHERE candidate_id=$1`
|
|
row := r.db.QueryRow(ctx, query, candidateID)
|
|
var c domain.DiscoveryCandidate
|
|
err := row.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
|
|
&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.DiscoveryCandidate{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.DiscoveryCandidate{}, false
|
|
}
|
|
return c, true
|
|
}
|
|
|
|
func (r *PostgresRepository) FindDiscoveryCandidate(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
return r.FindDiscoveryCandidateContext(ctx, accountID, platform, model)
|
|
}
|
|
|
|
func (r *PostgresRepository) FindDiscoveryCandidateContext(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
query := `
|
|
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
|
|
discovered_at, updated_at, version
|
|
FROM supply_intelligence_model_candidates WHERE account_id=$1 AND platform=$2 AND model=$3`
|
|
row := r.db.QueryRow(ctx, query, accountID, platform, model)
|
|
var c domain.DiscoveryCandidate
|
|
err := row.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
|
|
&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.DiscoveryCandidate{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.DiscoveryCandidate{}, false
|
|
}
|
|
return c, true
|
|
}
|
|
|
|
func (r *PostgresRepository) GetLatestDiscoveryCandidate(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
return r.GetLatestDiscoveryCandidateContext(ctx, platform, model)
|
|
}
|
|
|
|
func (r *PostgresRepository) GetLatestDiscoveryCandidateContext(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
query := `
|
|
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
|
|
discovered_at, updated_at, version
|
|
FROM supply_intelligence_model_candidates
|
|
WHERE platform=$1 AND model=$2
|
|
ORDER BY updated_at DESC, candidate_id DESC
|
|
LIMIT 1`
|
|
row := r.db.QueryRow(ctx, query, platform, model)
|
|
var c domain.DiscoveryCandidate
|
|
err := row.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
|
|
&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.DiscoveryCandidate{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.DiscoveryCandidate{}, false
|
|
}
|
|
return c, true
|
|
}
|
|
|
|
func (r *PostgresRepository) UpsertDiscoveryCandidate(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
|
|
return r.UpsertDiscoveryCandidateContext(ctx, candidate)
|
|
}
|
|
|
|
func (r *PostgresRepository) UpsertDiscoveryCandidateContext(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
|
|
query := `
|
|
INSERT INTO supply_intelligence_model_candidates
|
|
(candidate_id, account_id, platform, model, status, source, reason_code,
|
|
discovered_at, updated_at, version)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,1)
|
|
ON CONFLICT (platform, model) DO UPDATE SET
|
|
account_id=EXCLUDED.account_id,
|
|
status=EXCLUDED.status,
|
|
source=EXCLUDED.source,
|
|
reason_code=EXCLUDED.reason_code,
|
|
updated_at=EXCLUDED.updated_at,
|
|
version=supply_intelligence_model_candidates.version+1
|
|
RETURNING version`
|
|
var version int64
|
|
err := r.db.QueryRow(ctx, query,
|
|
candidate.CandidateID, candidate.AccountID, candidate.Platform, candidate.Model,
|
|
candidate.Status, candidate.Source, candidate.ReasonCode,
|
|
candidate.DiscoveredAt, candidate.UpdatedAt,
|
|
).Scan(&version)
|
|
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
|
return candidate
|
|
}
|
|
candidate.Version = version
|
|
return candidate
|
|
}
|
|
|
|
func (r *PostgresRepository) ListDiscoveryCandidates(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
|
|
return r.ListDiscoveryCandidatesContext(ctx, status)
|
|
}
|
|
|
|
func (r *PostgresRepository) ListDiscoveryCandidatesContext(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
|
|
var query string
|
|
var args []interface{}
|
|
if status == "" {
|
|
query = `
|
|
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
|
|
discovered_at, updated_at, version
|
|
FROM supply_intelligence_model_candidates ORDER BY discovered_at DESC`
|
|
} else {
|
|
query = `
|
|
SELECT candidate_id, account_id, platform, model, status, source, reason_code,
|
|
discovered_at, updated_at, version
|
|
FROM supply_intelligence_model_candidates WHERE status=$1 ORDER BY discovered_at DESC`
|
|
args = append(args, string(status))
|
|
}
|
|
rows, err := r.db.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.DiscoveryCandidate
|
|
for rows.Next() {
|
|
var c domain.DiscoveryCandidate
|
|
if err := rows.Scan(&c.CandidateID, &c.AccountID, &c.Platform, &c.Model, &c.Status,
|
|
&c.Source, &c.ReasonCode, &c.DiscoveredAt, &c.UpdatedAt, &c.Version); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, c)
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (r *PostgresRepository) UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error {
|
|
query := `
|
|
UPDATE supply_intelligence_model_candidates
|
|
SET status=$2, reason_code=$3, updated_at=now()
|
|
WHERE candidate_id=$1`
|
|
_, err := r.db.Exec(ctx, query, candidateID, string(status), failureCode)
|
|
return err
|
|
}
|
|
|
|
// ─── Supply Packages ───────────────────────────────────────────────────────────
|
|
|
|
func (r *PostgresRepository) UpsertSupplyPackage(ctx context.Context, pkg domain.SupplyPackage) error {
|
|
query := `
|
|
INSERT INTO supply_intelligence_supply_packages
|
|
(package_id, platform, model, status, source, created_at, updated_at, version)
|
|
VALUES (
|
|
CASE WHEN $1 = 0 THEN nextval('supply_package_id_seq') ELSE $1 END,
|
|
$2,$3,$4,$5,$6,$7,1
|
|
)
|
|
ON CONFLICT (platform, model) DO UPDATE SET
|
|
status=EXCLUDED.status,
|
|
source=EXCLUDED.source,
|
|
updated_at=EXCLUDED.updated_at,
|
|
version=supply_intelligence_supply_packages.version+1
|
|
RETURNING package_id, version`
|
|
var packageID int64
|
|
var version int64
|
|
if err := r.db.QueryRow(ctx, query,
|
|
pkg.PackageID, pkg.Platform, pkg.Model, pkg.Status, pkg.Source,
|
|
pkg.CreatedAt, pkg.UpdatedAt,
|
|
).Scan(&packageID, &version); err != nil {
|
|
return err
|
|
}
|
|
_ = packageID
|
|
_ = version
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgresRepository) GetSupplyPackage(ctx context.Context, platform, model string) (domain.SupplyPackage, bool) {
|
|
query := `
|
|
SELECT package_id, platform, model, status, source, created_at, updated_at, version
|
|
FROM supply_intelligence_supply_packages WHERE platform=$1 AND model=$2`
|
|
row := r.db.QueryRow(ctx, query, platform, model)
|
|
var p domain.SupplyPackage
|
|
err := row.Scan(&p.PackageID, &p.Platform, &p.Model, &p.Status, &p.Source, &p.CreatedAt, &p.UpdatedAt, &p.Version)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.SupplyPackage{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.SupplyPackage{}, false
|
|
}
|
|
return p, true
|
|
}
|
|
|
|
func (r *PostgresRepository) ListSupplyPackages(ctx context.Context, status string) []domain.SupplyPackage {
|
|
var query string
|
|
var args []interface{}
|
|
if status == "" {
|
|
query = `SELECT package_id, platform, model, status, source, created_at, updated_at, version FROM supply_intelligence_supply_packages`
|
|
} else {
|
|
query = `SELECT package_id, platform, model, status, source, created_at, updated_at, version FROM supply_intelligence_supply_packages WHERE status=$1`
|
|
args = append(args, status)
|
|
}
|
|
rows, err := r.db.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.SupplyPackage
|
|
for rows.Next() {
|
|
var p domain.SupplyPackage
|
|
if err := rows.Scan(&p.PackageID, &p.Platform, &p.Model, &p.Status, &p.Source, &p.CreatedAt, &p.UpdatedAt, &p.Version); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, p)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// ─── Probe Execution Logs ──────────────────────────────────────────────────────
|
|
|
|
func (r *PostgresRepository) AppendProbeExecutionLog(ctx context.Context, log domain.ProbeExecutionLog) error {
|
|
query := `
|
|
INSERT INTO supply_intelligence_probe_execution_logs
|
|
(account_id, platform, probe_result, failure_class, http_status, latency_ms,
|
|
risk_score, evaluated_transition, executed_at, request_id, version)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,1)`
|
|
_, err := r.db.Exec(ctx, query,
|
|
log.AccountID, log.Platform, log.ProbeResult, log.FailureClass,
|
|
log.HTTPStatus, log.LatencyMs, log.RiskScore, log.EvaluatedTransition,
|
|
log.ExecutedAt, log.RequestID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (r *PostgresRepository) ListProbeExecutionLogs(ctx context.Context, accountID int64, limit int) ([]domain.ProbeExecutionLog, error) {
|
|
query := `
|
|
SELECT log_id, account_id, platform, probe_result, failure_class, http_status, latency_ms,
|
|
risk_score, evaluated_transition, executed_at, request_id, version
|
|
FROM supply_intelligence_probe_execution_logs
|
|
WHERE account_id=$1
|
|
ORDER BY executed_at DESC LIMIT $2`
|
|
rows, err := r.db.Query(ctx, query, accountID, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil, rows.Err()
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.ProbeExecutionLog
|
|
for rows.Next() {
|
|
var l domain.ProbeExecutionLog
|
|
if err := rows.Scan(&l.LogID, &l.AccountID, &l.Platform, &l.ProbeResult,
|
|
&l.FailureClass, &l.HTTPStatus, &l.LatencyMs, &l.RiskScore,
|
|
&l.EvaluatedTransition, &l.ExecutedAt, &l.RequestID, &l.Version); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, l)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
func (r *PostgresRepository) ListRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time, limit int) []domain.PackageChangeEvent {
|
|
query := `
|
|
SELECT event_id, account_id, event_type, package_id, platform, model, occurred_at, version,
|
|
COALESCE(ack_status,''), COALESCE(ack_consumer,''), COALESCE(ack_detail,''), ack_time,
|
|
retry_count, last_retry_at, next_retry_at,
|
|
COALESCE(last_failure_category,''), COALESCE(last_failure_detail,'')
|
|
FROM supply_intelligence_package_change_events
|
|
WHERE ack_status=$1 AND next_retry_at IS NOT NULL AND next_retry_at <= $2
|
|
ORDER BY next_retry_at ASC, occurred_at DESC, event_id DESC`
|
|
rows, err := r.db.Query(ctx, query, string(domain.GatewayAckResultPending), now)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
items := scanEvents(rows)
|
|
if limit > 0 && len(items) > limit {
|
|
items = items[:limit]
|
|
}
|
|
_ = consumer
|
|
return items
|
|
}
|
|
|
|
func (r *PostgresRepository) CountPackageEventsBySyncStatus(ctx context.Context, status domain.GatewaySyncStatus) int {
|
|
query := `SELECT COUNT(*) FROM supply_intelligence_package_change_events WHERE ack_status=$1`
|
|
row := r.db.QueryRow(ctx, query, string(status))
|
|
var count int
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0
|
|
}
|
|
return count
|
|
}
|
|
|
|
func (r *PostgresRepository) CountRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time) int {
|
|
query := `SELECT COUNT(*) FROM supply_intelligence_package_change_events WHERE ack_status=$1 AND next_retry_at IS NOT NULL AND next_retry_at <= $2`
|
|
row := r.db.QueryRow(ctx, query, string(domain.GatewayAckResultPending), now)
|
|
var count int
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0
|
|
}
|
|
_ = consumer
|
|
return count
|
|
}
|
|
|
|
func (r *PostgresRepository) MarkPackageEventRetry(ctx context.Context, eventID string, retryCount int, nextRetryAt time.Time, category domain.GatewayFailureCategory, detail string, retriedAt time.Time) (domain.PackageChangeEvent, error) {
|
|
query := `
|
|
UPDATE supply_intelligence_package_change_events
|
|
SET ack_status=$2, retry_count=$3, last_retry_at=$4, next_retry_at=$5,
|
|
last_failure_category=$6, last_failure_detail=$7, ack_detail=$7
|
|
WHERE event_id=$1`
|
|
commandTag, err := r.db.Exec(ctx, query, eventID, string(domain.GatewayAckResultPending), retryCount, retriedAt, nextRetryAt, string(category), detail)
|
|
if err != nil {
|
|
return domain.PackageChangeEvent{}, err
|
|
}
|
|
if commandTag.RowsAffected() == 0 {
|
|
return domain.PackageChangeEvent{}, ErrEventNotFound
|
|
}
|
|
return r.getEventByID(ctx, eventID)
|
|
}
|
|
|
|
func scanEvents(rows pgx.Rows) []domain.PackageChangeEvent {
|
|
var result []domain.PackageChangeEvent
|
|
for rows.Next() {
|
|
var e domain.PackageChangeEvent
|
|
if err := scanEventRow(rows, &e); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, e)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// eventScanner is anything that exposes a Scan method with this signature.
// In this file both the result of QueryRow and a pgx.Rows value are passed
// where an eventScanner is expected.
type eventScanner interface {
	Scan(targets ...interface{}) error
}
|
|
|
|
func scanEventScanner(scanner eventScanner, e *domain.PackageChangeEvent) error {
|
|
return scanner.Scan(
|
|
&e.EventID, &e.AccountID, &e.EventType, &e.PackageID, &e.Platform, &e.Model,
|
|
&e.OccurredAt, &e.Version,
|
|
&e.GatewaySyncStatus, &e.Consumer, &e.ConsumerDetail, &e.AckedAt,
|
|
&e.RetryCount, &e.LastRetryAt, &e.NextRetryAt,
|
|
&e.LastFailureCategory, &e.LastFailureDetail,
|
|
)
|
|
}
|
|
|
|
func scanEventRow(rows pgx.Rows, e *domain.PackageChangeEvent) error {
|
|
return scanEventScanner(rows, e)
|
|
}
|
|
|
|
// AppendAdmissionTestLog inserts an admission test log entry.
|
|
func (r *PostgresRepository) AppendAdmissionTestLog(ctx context.Context, candidateID string, status string, failureCode string, failureSummary string, testedAt time.Time) error {
|
|
query := `
|
|
INSERT INTO supply_intelligence_admission_test_logs
|
|
(candidate_id, status, failure_code, failure_summary, tested_at, version)
|
|
VALUES ($1,$2,$3,$4,$5,1)`
|
|
_, err := r.db.Exec(ctx, query, candidateID, status, failureCode, failureSummary, testedAt)
|
|
return err
|
|
}
|
|
|
|
// ListAdmissionTestLogsByCandidate returns admission test logs for a candidate.
|
|
func (r *PostgresRepository) ListAdmissionTestLogsByCandidate(ctx context.Context, candidateID string, limit int) ([]domain.AdmissionTestLog, error) {
|
|
query := `
|
|
SELECT test_id, candidate_id, status, failure_code, failure_summary, tested_at, version
|
|
FROM supply_intelligence_admission_test_logs
|
|
WHERE candidate_id=$1
|
|
ORDER BY tested_at DESC LIMIT $2`
|
|
rows, err := r.db.Query(ctx, query, candidateID, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil, rows.Err()
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.AdmissionTestLog
|
|
for rows.Next() {
|
|
var l domain.AdmissionTestLog
|
|
if err := rows.Scan(&l.TestID, &l.CandidateID, &l.Status, &l.FailureCode, &l.FailureSummary, &l.TestedAt, &l.Version); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, l)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// ListActiveAccounts returns all accounts with routing enabled.
|
|
func (r *PostgresRepository) ListActiveAccounts(ctx context.Context) []domain.AccountRoutingState {
|
|
query := `
|
|
SELECT account_id, platform, account_status, routing_enabled,
|
|
risk_score, reason_code, last_probe_at, created_at, updated_at, version
|
|
FROM supply_intelligence_account_routing_states
|
|
WHERE routing_enabled = true`
|
|
rows, err := r.db.Query(ctx, query)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.AccountRoutingState
|
|
for rows.Next() {
|
|
var rs domain.AccountRoutingState
|
|
if err := rows.Scan(&rs.AccountID, &rs.Platform, &rs.AccountStatus, &rs.RoutingEnabled,
|
|
&rs.RiskScore, &rs.ReasonCode, &rs.LastProbeAt, &rs.Version); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, rs)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// ─── Supply Accounts ───────────────────────────────────────────────────────────
|
|
|
|
func (r *PostgresRepository) UpsertSupplyAccount(ctx context.Context, account domain.SupplyAccount) domain.SupplyAccount {
|
|
query := `
|
|
INSERT INTO supply_intelligence_supply_accounts (account_id, platform, api_key, consumer_tag, status, created_at, updated_at)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7)
|
|
ON CONFLICT (account_id) DO UPDATE SET
|
|
platform=EXCLUDED.platform,
|
|
api_key=EXCLUDED.api_key,
|
|
consumer_tag=EXCLUDED.consumer_tag,
|
|
status=EXCLUDED.status,
|
|
updated_at=EXCLUDED.updated_at
|
|
RETURNING account_id, platform, api_key, consumer_tag, status, created_at, updated_at`
|
|
var a domain.SupplyAccount
|
|
err := r.db.QueryRow(ctx, query,
|
|
account.AccountID, account.Platform, account.APIKey, account.ConsumerTag,
|
|
account.Status, account.CreatedAt, account.UpdatedAt,
|
|
).Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt)
|
|
if err != nil {
|
|
return account
|
|
}
|
|
return a
|
|
}
|
|
|
|
func (r *PostgresRepository) GetSupplyAccount(ctx context.Context, accountID int64) (domain.SupplyAccount, bool) {
|
|
query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE account_id=$1`
|
|
row := r.db.QueryRow(ctx, query, accountID)
|
|
var a domain.SupplyAccount
|
|
err := row.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return domain.SupplyAccount{}, false
|
|
}
|
|
if err != nil {
|
|
return domain.SupplyAccount{}, false
|
|
}
|
|
return a, true
|
|
}
|
|
|
|
func (r *PostgresRepository) ListSupplyAccountsByPlatform(ctx context.Context, platform string) []domain.SupplyAccount {
|
|
query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE platform=$1 AND status='active'`
|
|
rows, err := r.db.Query(ctx, query, platform)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.SupplyAccount
|
|
for rows.Next() {
|
|
var a domain.SupplyAccount
|
|
if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, a)
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (r *PostgresRepository) ListSupplyAccounts(ctx context.Context) []domain.SupplyAccount {
|
|
query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE status='active'`
|
|
rows, err := r.db.Query(ctx, query)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.SupplyAccount
|
|
for rows.Next() {
|
|
var a domain.SupplyAccount
|
|
if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, a)
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (r *PostgresRepository) ListSupplyAccountsByConsumer(ctx context.Context, consumerTag string) []domain.SupplyAccount {
|
|
query := `SELECT account_id, platform, api_key, consumer_tag, status, created_at, updated_at FROM supply_intelligence_supply_accounts WHERE consumer_tag=$1 AND status='active'`
|
|
rows, err := r.db.Query(ctx, query, consumerTag)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
var result []domain.SupplyAccount
|
|
for rows.Next() {
|
|
var a domain.SupplyAccount
|
|
if err := rows.Scan(&a.AccountID, &a.Platform, &a.APIKey, &a.ConsumerTag, &a.Status, &a.CreatedAt, &a.UpdatedAt); err != nil {
|
|
continue
|
|
}
|
|
result = append(result, a)
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (r *PostgresRepository) PublishPackageAtomically(ctx context.Context, input publish.PublishPackageAtomicInput) (publish.PublishPackageAtomicResult, error) {
|
|
tx, err := r.db.Begin(ctx)
|
|
if err != nil {
|
|
return publish.PublishPackageAtomicResult{}, err
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
commandTag, err := tx.Exec(ctx, `
|
|
UPDATE supply_intelligence_model_candidates
|
|
SET status=$2, reason_code=$3, updated_at=$4, version=$5
|
|
WHERE candidate_id=$1 AND status=$6`,
|
|
input.Candidate.CandidateID,
|
|
string(input.Candidate.Status),
|
|
input.Candidate.ReasonCode,
|
|
input.Candidate.UpdatedAt,
|
|
input.Candidate.Version,
|
|
string(domain.DiscoveryCandidateStatusTestPassed),
|
|
)
|
|
if err != nil {
|
|
return publish.PublishPackageAtomicResult{}, err
|
|
}
|
|
if commandTag.RowsAffected() == 0 {
|
|
currentCandidate, ok := r.GetDiscoveryCandidateByIDContext(ctx, input.Candidate.CandidateID)
|
|
if ok && currentCandidate.Status == domain.DiscoveryCandidateStatusPublished {
|
|
return publish.PublishPackageAtomicResult{}, publish.ErrPackageAlreadyPublished
|
|
}
|
|
return publish.PublishPackageAtomicResult{}, publish.ErrCandidateNotPublishable
|
|
}
|
|
|
|
commandTag, err = tx.Exec(ctx, `
|
|
INSERT INTO supply_intelligence_supply_packages
|
|
(package_id, platform, model, status, source, created_at, updated_at, version)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
|
|
ON CONFLICT (platform, model) DO UPDATE SET
|
|
package_id=EXCLUDED.package_id,
|
|
status=EXCLUDED.status,
|
|
source=EXCLUDED.source,
|
|
created_at=EXCLUDED.created_at,
|
|
updated_at=EXCLUDED.updated_at,
|
|
version=EXCLUDED.version
|
|
WHERE supply_intelligence_supply_packages.status='draft'`,
|
|
input.Package.PackageID,
|
|
input.Package.Platform,
|
|
input.Package.Model,
|
|
input.Package.Status,
|
|
input.Package.Source,
|
|
input.Package.CreatedAt,
|
|
input.Package.UpdatedAt,
|
|
input.Package.Version,
|
|
)
|
|
if err != nil {
|
|
return publish.PublishPackageAtomicResult{}, err
|
|
}
|
|
if commandTag.RowsAffected() == 0 {
|
|
return publish.PublishPackageAtomicResult{}, publish.ErrPackageAlreadyPublished
|
|
}
|
|
|
|
if err := insertPackageEvent(ctx, tx, input.Event); err != nil {
|
|
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23505" {
|
|
return publish.PublishPackageAtomicResult{}, publish.ErrDuplicatePublishRequest
|
|
}
|
|
return publish.PublishPackageAtomicResult{}, err
|
|
}
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return publish.PublishPackageAtomicResult{}, err
|
|
}
|
|
return publish.PublishPackageAtomicResult{Candidate: input.Candidate, Package: input.Package, Event: input.Event}, nil
|
|
}
|
|
|
|
func insertPackageEvent(ctx context.Context, execer dbtx, evt domain.PackageChangeEvent) error {
|
|
query := `
|
|
INSERT INTO supply_intelligence_package_change_events
|
|
(event_id, account_id, event_type, package_id, platform, model, occurred_at, version, ack_status)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,'pending')`
|
|
_, err := execer.Exec(ctx, query,
|
|
evt.EventID, evt.AccountID, evt.EventType, evt.PackageID,
|
|
evt.Platform, evt.Model, evt.OccurredAt, evt.Version,
|
|
)
|
|
return err
|
|
}
|