Files
2026-05-12 18:49:52 +08:00

577 lines
17 KiB
Go

package repository
import (
"context"
"errors"
"sort"
"sync"
"time"
"supply-intelligence/internal/domain"
"supply-intelligence/internal/publish"
)
var (
ErrNotFound = errors.New("row not found")
ErrDuplicateEventID = errors.New("duplicate event id")
)
func IsGatewayAckResult(result domain.GatewayAckResult) bool {
return result == domain.GatewayAckResultApplied || result == domain.GatewayAckResultFailed
}
// MemoryRepository implements Repository using in-memory maps.
// NOT thread-safe for production use; use for testing and local development.
type MemoryRepository struct {
mu sync.RWMutex
routingStates map[int64]domain.AccountRoutingState
supplyAccounts map[int64]domain.SupplyAccount
packageEvents map[string]domain.PackageChangeEvent
appliedSnapshot map[string]domain.GatewayAppliedSnapshot
discoveryCandidates map[string]domain.DiscoveryCandidate
supplyPackages map[string]domain.SupplyPackage
admissionTestLogs []domain.AdmissionTestLog
now func() time.Time
}
func NewMemoryRepository() *MemoryRepository {
return &MemoryRepository{
routingStates: map[int64]domain.AccountRoutingState{},
supplyAccounts: map[int64]domain.SupplyAccount{},
packageEvents: map[string]domain.PackageChangeEvent{},
appliedSnapshot: map[string]domain.GatewayAppliedSnapshot{},
admissionTestLogs: make([]domain.AdmissionTestLog, 0),
discoveryCandidates: map[string]domain.DiscoveryCandidate{},
supplyPackages: map[string]domain.SupplyPackage{},
now: func() time.Time { return time.Now().UTC() },
}
}
var _ Repository = (*MemoryRepository)(nil)
func (r *MemoryRepository) UpsertRoutingState(ctx context.Context, state domain.AccountRoutingState) {
r.mu.Lock()
defer r.mu.Unlock()
if existing, ok := r.routingStates[state.AccountID]; ok {
state.Version = existing.Version + 1
state.LastProbeAt = existing.LastProbeAt
} else {
state.Version = 1
}
r.routingStates[state.AccountID] = state
_ = ctx
}
func (r *MemoryRepository) GetRoutingState(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
s, ok := r.routingStates[accountID]
return s, ok
}
func (r *MemoryRepository) ListRoutingStatesByPlatform(ctx context.Context, platform string) []domain.AccountRoutingState {
r.mu.RLock()
defer r.mu.RUnlock()
var result []domain.AccountRoutingState
for _, s := range r.routingStates {
if platform == "" || s.Platform == platform {
result = append(result, s)
}
}
return result
}
func (r *MemoryRepository) AppendPackageEventContext(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
r.mu.Lock()
defer r.mu.Unlock()
if _, exists := r.packageEvents[evt.EventID]; exists {
return domain.PackageChangeEvent{}, publish.ErrDuplicatePublishRequest
}
if evt.Version == 0 {
evt.Version = 1
}
if evt.GatewaySyncStatus == "" {
evt.GatewaySyncStatus = domain.GatewaySyncStatusPending
}
r.packageEvents[evt.EventID] = evt
_ = ctx
return evt, nil
}
func (r *MemoryRepository) AppendPackageEvent(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
return r.AppendPackageEventContext(ctx, evt)
}
func (r *MemoryRepository) ListPackageEvents(ctx context.Context) []domain.PackageChangeEvent {
r.mu.RLock()
defer r.mu.RUnlock()
events := make([]domain.PackageChangeEvent, 0, len(r.packageEvents))
for _, e := range r.packageEvents {
events = append(events, e)
}
return events
}
func (r *MemoryRepository) GetPackageEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
evt, ok := r.packageEvents[eventID]
_ = ctx
return evt, ok
}
func (r *MemoryRepository) GetLatestPackageEvent(ctx context.Context, platform, model string) (domain.PackageChangeEvent, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
var (
found bool
best domain.PackageChangeEvent
)
for _, evt := range r.packageEvents {
if evt.Platform != platform || evt.Model != model {
continue
}
if !found || evt.OccurredAt.After(best.OccurredAt) || (evt.OccurredAt.Equal(best.OccurredAt) && evt.EventID > best.EventID) {
best = evt
found = true
}
}
return best, found
}
func (r *MemoryRepository) ListPackageEventsAfter(ctx context.Context, cursor string) ([]domain.PackageChangeEvent, string) {
r.mu.RLock()
defer r.mu.RUnlock()
items := make([]domain.PackageChangeEvent, 0, len(r.packageEvents))
for _, evt := range r.packageEvents {
items = append(items, evt)
}
sort.Slice(items, func(i, j int) bool {
if items[i].OccurredAt.Equal(items[j].OccurredAt) {
return items[i].EventID < items[j].EventID
}
return items[i].OccurredAt.Before(items[j].OccurredAt)
})
const pageSize = 50
result := make([]domain.PackageChangeEvent, 0, pageSize)
found := cursor == ""
hasMore := false
for _, item := range items {
if !found {
if item.EventID == cursor {
found = true
}
continue
}
result = append(result, item)
if len(result) >= pageSize {
hasMore = true
break
}
}
next := ""
if hasMore && len(result) > 0 {
next = result[len(result)-1].EventID
}
_ = ctx
return result, next
}
func (r *MemoryRepository) ListRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time, limit int) []domain.PackageChangeEvent {
r.mu.RLock()
defer r.mu.RUnlock()
items := make([]domain.PackageChangeEvent, 0)
for _, evt := range r.packageEvents {
if evt.GatewaySyncStatus != domain.GatewaySyncStatusPending || evt.NextRetryAt == nil || evt.NextRetryAt.After(now) {
continue
}
items = append(items, evt)
}
sort.Slice(items, func(i, j int) bool {
if items[i].NextRetryAt != nil && items[j].NextRetryAt != nil && items[i].NextRetryAt.Equal(*items[j].NextRetryAt) {
return items[i].EventID < items[j].EventID
}
if items[i].NextRetryAt == nil {
return false
}
if items[j].NextRetryAt == nil {
return true
}
return items[i].NextRetryAt.Before(*items[j].NextRetryAt)
})
if limit > 0 && len(items) > limit {
items = items[:limit]
}
_ = ctx
_ = consumer
return items
}
func (r *MemoryRepository) CountPackageEventsBySyncStatus(ctx context.Context, status domain.GatewaySyncStatus) int {
r.mu.RLock()
defer r.mu.RUnlock()
count := 0
for _, evt := range r.packageEvents {
if evt.GatewaySyncStatus == status {
count++
}
}
_ = ctx
return count
}
func (r *MemoryRepository) CountRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time) int {
r.mu.RLock()
defer r.mu.RUnlock()
count := 0
for _, evt := range r.packageEvents {
if evt.GatewaySyncStatus == domain.GatewaySyncStatusPending && evt.NextRetryAt != nil && !evt.NextRetryAt.After(now) {
count++
}
}
_ = ctx
_ = consumer
return count
}
func (r *MemoryRepository) AckPackageEvent(ctx context.Context, eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error) {
r.mu.Lock()
defer r.mu.Unlock()
evt, ok := r.packageEvents[eventID]
if !ok {
return domain.PackageChangeEvent{}, ErrEventNotFound
}
evt.Consumer = consumer
evt.ConsumerDetail = detail
evt.AckedAt = &ackedAt
evt.GatewaySyncStatus = result.SyncStatus()
evt.Version++
if result == domain.GatewayAckResultFailed && evt.LastFailureDetail == "" {
evt.LastFailureDetail = detail
}
if result != domain.GatewayAckResultPending {
evt.NextRetryAt = nil
}
r.packageEvents[eventID] = evt
_ = ctx
return evt, nil
}
func (r *MemoryRepository) MarkPackageEventRetry(ctx context.Context, eventID string, retryCount int, nextRetryAt time.Time, category domain.GatewayFailureCategory, detail string, retriedAt time.Time) (domain.PackageChangeEvent, error) {
r.mu.Lock()
defer r.mu.Unlock()
evt, ok := r.packageEvents[eventID]
if !ok {
return domain.PackageChangeEvent{}, ErrEventNotFound
}
evt.RetryCount = retryCount
evt.LastRetryAt = &retriedAt
evt.NextRetryAt = &nextRetryAt
evt.LastFailureCategory = category
evt.LastFailureDetail = detail
evt.ConsumerDetail = detail
evt.Version++
r.packageEvents[eventID] = evt
_ = ctx
return evt, nil
}
func (r *MemoryRepository) UpsertGatewayAppliedSnapshot(ctx context.Context, snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot {
r.mu.Lock()
defer r.mu.Unlock()
snapshot.UpdatedAt = time.Now().UTC()
r.appliedSnapshot[snapshot.Consumer] = snapshot
_ = ctx
return snapshot
}
func (r *MemoryRepository) GetGatewayAppliedSnapshot(ctx context.Context, consumer string) (domain.GatewayAppliedSnapshot, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
s, ok := r.appliedSnapshot[consumer]
return s, ok
}
func (r *MemoryRepository) GetDiscoveryCandidateByID(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
c, ok := r.discoveryCandidates[candidateID]
return c, ok
}
func (r *MemoryRepository) FindDiscoveryCandidate(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
for _, c := range r.discoveryCandidates {
if c.AccountID == accountID && c.Platform == platform && c.Model == model {
return c, true
}
}
return domain.DiscoveryCandidate{}, false
}
func (r *MemoryRepository) GetLatestDiscoveryCandidate(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
var (
found bool
best domain.DiscoveryCandidate
)
for _, c := range r.discoveryCandidates {
if c.Platform != platform || c.Model != model {
continue
}
if !found || c.UpdatedAt.After(best.UpdatedAt) || (c.UpdatedAt.Equal(best.UpdatedAt) && c.CandidateID > best.CandidateID) {
best = c
found = true
}
}
return best, found
}
func (r *MemoryRepository) UpsertDiscoveryCandidate(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
r.mu.Lock()
defer r.mu.Unlock()
now := time.Now().UTC()
candidate.UpdatedAt = now
if existing, ok := r.discoveryCandidates[candidate.CandidateID]; ok {
candidate.Version = existing.Version + 1
} else {
candidate.Version = 1
if candidate.DiscoveredAt.IsZero() {
candidate.DiscoveredAt = now
}
}
r.discoveryCandidates[candidate.CandidateID] = candidate
return candidate
}
func (r *MemoryRepository) ListDiscoveryCandidates(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
r.mu.RLock()
defer r.mu.RUnlock()
items := make([]domain.DiscoveryCandidate, 0, len(r.discoveryCandidates))
for _, c := range r.discoveryCandidates {
if status != "" && c.Status != status {
continue
}
items = append(items, c)
}
sort.Slice(items, func(i, j int) bool {
if items[i].DiscoveredAt.Equal(items[j].DiscoveredAt) {
return items[i].CandidateID < items[j].CandidateID
}
return items[i].DiscoveredAt.Before(items[j].DiscoveredAt)
})
return items
}
func (r *MemoryRepository) UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error {
r.mu.Lock()
defer r.mu.Unlock()
c, ok := r.discoveryCandidates[candidateID]
if !ok {
return errors.New("candidate not found")
}
c.Status = status
c.ReasonCode = failureCode
c.UpdatedAt = time.Now().UTC()
c.Version++
r.discoveryCandidates[candidateID] = c
_ = ctx
return nil
}
func (r *MemoryRepository) UpsertSupplyPackage(ctx context.Context, pkg domain.SupplyPackage) error {
r.mu.Lock()
defer r.mu.Unlock()
now := time.Now().UTC()
key := pkg.Platform + "_" + pkg.Model
if existing, ok := r.supplyPackages[key]; ok {
pkg.PackageID = existing.PackageID
pkg.Version = existing.Version + 1
pkg.CreatedAt = existing.CreatedAt
} else {
pkg.Version = 1
if pkg.CreatedAt.IsZero() {
pkg.CreatedAt = now
}
}
pkg.UpdatedAt = now
r.supplyPackages[key] = pkg
_ = ctx
return nil
}
func (r *MemoryRepository) GetSupplyPackage(ctx context.Context, platform, model string) (domain.SupplyPackage, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
key := platform + "_" + model
pkg, ok := r.supplyPackages[key]
return pkg, ok
}
func (r *MemoryRepository) ListSupplyPackages(ctx context.Context, status string) []domain.SupplyPackage {
r.mu.RLock()
defer r.mu.RUnlock()
items := make([]domain.SupplyPackage, 0, len(r.supplyPackages))
for _, pkg := range r.supplyPackages {
if status != "" && pkg.Status != status {
continue
}
items = append(items, pkg)
}
sort.Slice(items, func(i, j int) bool {
if items[i].UpdatedAt.Equal(items[j].UpdatedAt) {
if items[i].Platform == items[j].Platform {
return items[i].Model < items[j].Model
}
return items[i].Platform < items[j].Platform
}
return items[i].UpdatedAt.Before(items[j].UpdatedAt)
})
return items
}
func (r *MemoryRepository) AppendProbeExecutionLog(ctx context.Context, log domain.ProbeExecutionLog) error {
_ = ctx
_ = log
return nil
}
func (r *MemoryRepository) ListProbeExecutionLogs(ctx context.Context, accountID int64, limit int) ([]domain.ProbeExecutionLog, error) {
_ = ctx
_ = accountID
_ = limit
return nil, nil
}
func (r *MemoryRepository) AppendAdmissionTestLog(ctx context.Context, candidateID string, status string, failureCode string, failureSummary string, testedAt time.Time) error {
r.mu.Lock()
defer r.mu.Unlock()
log := domain.AdmissionTestLog{CandidateID: candidateID, Status: status, FailureCode: failureCode, FailureSummary: failureSummary, TestedAt: testedAt, Version: int64(len(r.admissionTestLogs) + 1)}
r.admissionTestLogs = append(r.admissionTestLogs, log)
_ = ctx
return nil
}
func (r *MemoryRepository) ListAdmissionTestLogsByCandidate(ctx context.Context, candidateID string, limit int) ([]domain.AdmissionTestLog, error) {
r.mu.RLock()
defer r.mu.RUnlock()
items := make([]domain.AdmissionTestLog, 0)
for i := len(r.admissionTestLogs) - 1; i >= 0; i-- {
if r.admissionTestLogs[i].CandidateID != candidateID {
continue
}
items = append(items, r.admissionTestLogs[i])
if limit > 0 && len(items) >= limit {
break
}
}
_ = ctx
return items, nil
}
func (r *MemoryRepository) UpsertSupplyAccount(ctx context.Context, account domain.SupplyAccount) domain.SupplyAccount {
r.mu.Lock()
defer r.mu.Unlock()
if existing, ok := r.supplyAccounts[account.AccountID]; ok {
if account.CreatedAt.IsZero() {
account.CreatedAt = existing.CreatedAt
}
} else if account.CreatedAt.IsZero() {
account.CreatedAt = time.Now().UTC()
}
if account.UpdatedAt.IsZero() {
account.UpdatedAt = time.Now().UTC()
}
r.supplyAccounts[account.AccountID] = account
_ = ctx
return account
}
func (r *MemoryRepository) GetSupplyAccount(ctx context.Context, accountID int64) (domain.SupplyAccount, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
account, ok := r.supplyAccounts[accountID]
_ = ctx
return account, ok
}
func (r *MemoryRepository) ListSupplyAccountsByPlatform(ctx context.Context, platform string) []domain.SupplyAccount {
r.mu.RLock()
defer r.mu.RUnlock()
items := make([]domain.SupplyAccount, 0)
for _, account := range r.supplyAccounts {
if platform == "" || account.Platform == platform {
items = append(items, account)
}
}
_ = ctx
return items
}
func (r *MemoryRepository) ListSupplyAccounts(ctx context.Context) []domain.SupplyAccount {
r.mu.RLock()
defer r.mu.RUnlock()
items := make([]domain.SupplyAccount, 0, len(r.supplyAccounts))
for _, account := range r.supplyAccounts {
items = append(items, account)
}
_ = ctx
return items
}
func (r *MemoryRepository) ListSupplyAccountsByConsumer(ctx context.Context, consumerTag string) []domain.SupplyAccount {
r.mu.RLock()
defer r.mu.RUnlock()
items := make([]domain.SupplyAccount, 0)
for _, account := range r.supplyAccounts {
if consumerTag == "" || account.ConsumerTag == consumerTag {
items = append(items, account)
}
}
_ = ctx
return items
}
func (r *MemoryRepository) UpsertRoutingStateContext(ctx context.Context, state domain.AccountRoutingState) domain.AccountRoutingState {
r.UpsertRoutingState(ctx, state)
stored, _ := r.GetRoutingState(ctx, state.AccountID)
return stored
}
func (r *MemoryRepository) GetRoutingStateContext(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
return r.GetRoutingState(ctx, accountID)
}
func (r *MemoryRepository) GetDiscoveryCandidateByIDContext(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
return r.GetDiscoveryCandidateByID(ctx, candidateID)
}
func (r *MemoryRepository) FindDiscoveryCandidateContext(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
return r.FindDiscoveryCandidate(ctx, accountID, platform, model)
}
func (r *MemoryRepository) GetLatestDiscoveryCandidateContext(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
return r.GetLatestDiscoveryCandidate(ctx, platform, model)
}
func (r *MemoryRepository) UpsertDiscoveryCandidateContext(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
return r.UpsertDiscoveryCandidate(ctx, candidate)
}
func (r *MemoryRepository) ListDiscoveryCandidatesContext(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
return r.ListDiscoveryCandidates(ctx, status)
}
func (r *MemoryRepository) ListActiveAccounts(ctx context.Context) []domain.AccountRoutingState {
states := r.ListRoutingStatesByPlatform(ctx, "")
result := make([]domain.AccountRoutingState, 0, len(states))
for _, state := range states {
if state.AccountStatus == domain.AccountStatusActive && state.RoutingEnabled {
result = append(result, state)
}
}
return result
}