package repository

import (
	"context"
	"errors"
	"sort"
	"sync"
	"time"

	"supply-intelligence/internal/domain"
	"supply-intelligence/internal/publish"
)

// Sentinel errors declared by this file. None of the MemoryRepository methods
// below return them; presumably other code in the package does.
var (
	ErrNotFound         = errors.New("row not found")
	ErrDuplicateEventID = errors.New("duplicate event id")
)

// IsGatewayAckResult reports whether result is one of the two terminal
// acknowledgement outcomes, applied or failed.
func IsGatewayAckResult(result domain.GatewayAckResult) bool {
	switch result {
	case domain.GatewayAckResultApplied, domain.GatewayAckResultFailed:
		return true
	default:
		return false
	}
}

// MemoryRepository implements Repository using in-memory maps.
//
// Every method in this file that touches the maps or the admission log slice
// holds mu while doing so, so those methods may be called from multiple
// goroutines. Nothing is persisted; it is meant for testing and local
// development rather than production use.
//
// Construct it with NewMemoryRepository: the zero value has nil maps and
// panics on the first write.
type MemoryRepository struct {
	mu                  sync.RWMutex
	routingStates       map[int64]domain.AccountRoutingState
	supplyAccounts      map[int64]domain.SupplyAccount
	packageEvents       map[string]domain.PackageChangeEvent
	appliedSnapshot     map[string]domain.GatewayAppliedSnapshot
	discoveryCandidates map[string]domain.DiscoveryCandidate
	supplyPackages      map[string]domain.SupplyPackage
	admissionTestLogs   []domain.AdmissionTestLog
	now                 func() time.Time
}

// NewMemoryRepository returns an empty repository whose clock is the UTC wall
// clock.
func NewMemoryRepository() *MemoryRepository {
	return &MemoryRepository{
		routingStates:       map[int64]domain.AccountRoutingState{},
		supplyAccounts:      map[int64]domain.SupplyAccount{},
		packageEvents:       map[string]domain.PackageChangeEvent{},
		appliedSnapshot:     map[string]domain.GatewayAppliedSnapshot{},
		admissionTestLogs:   make([]domain.AdmissionTestLog, 0),
		discoveryCandidates: map[string]domain.DiscoveryCandidate{},
		supplyPackages:      map[string]domain.SupplyPackage{},
		now:                 func() time.Time { return time.Now().UTC() },
	}
}

var _ Repository = (*MemoryRepository)(nil)

// timeNow returns the current time from the injected now function, falling
// back to the UTC wall clock when now is nil.
func (r *MemoryRepository) timeNow() time.Time {
	if r.now != nil {
		return r.now()
	}
	return time.Now().UTC()
}

// storeRoutingStateLocked writes state and returns the stored value. A new
// account starts at Version 1; an existing one gets the previous Version plus
// one and keeps the previously stored LastProbeAt (the incoming LastProbeAt is
// discarded). The caller must hold r.mu for writing.
func (r *MemoryRepository) storeRoutingStateLocked(state domain.AccountRoutingState) domain.AccountRoutingState {
	if existing, ok := r.routingStates[state.AccountID]; ok {
		state.Version = existing.Version + 1
		state.LastProbeAt = existing.LastProbeAt
	} else {
		state.Version = 1
	}
	r.routingStates[state.AccountID] = state
	return state
}

// sortPackageEventsChronologically orders items by OccurredAt ascending,
// breaking ties by EventID ascending.
func sortPackageEventsChronologically(items []domain.PackageChangeEvent) {
	sort.Slice(items, func(i, j int) bool {
		if items[i].OccurredAt.Equal(items[j].OccurredAt) {
			return items[i].EventID < items[j].EventID
		}
		return items[i].OccurredAt.Before(items[j].OccurredAt)
	})
}

// sortSupplyAccountsByID orders items by AccountID ascending.
func sortSupplyAccountsByID(items []domain.SupplyAccount) {
	sort.Slice(items, func(i, j int) bool {
		return items[i].AccountID < items[j].AccountID
	})
}

// supplyPackageKey builds the map key under which a package is stored.
// Because "_" is a plain separator, ("a_b", "c") and ("a", "b_c") map to the
// same key; the format is kept unchanged for compatibility.
func supplyPackageKey(platform, model string) string {
	return platform + "_" + model
}

// UpsertRoutingState inserts or replaces the routing state for
// state.AccountID, assigning Version as described on
// storeRoutingStateLocked.
func (r *MemoryRepository) UpsertRoutingState(ctx context.Context, state domain.AccountRoutingState) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.storeRoutingStateLocked(state)
}

// GetRoutingState returns the routing state stored for accountID and whether
// one exists.
func (r *MemoryRepository) GetRoutingState(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	state, ok := r.routingStates[accountID]
	return state, ok
}

// ListRoutingStatesByPlatform returns the routing states whose Platform equals
// platform, or all of them when platform is empty, ordered by AccountID. The
// result is nil when nothing matches.
func (r *MemoryRepository) ListRoutingStatesByPlatform(ctx context.Context, platform string) []domain.AccountRoutingState {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var result []domain.AccountRoutingState
	for _, state := range r.routingStates {
		if platform == "" || state.Platform == platform {
			result = append(result, state)
		}
	}
	// Map iteration order is random; sort so callers see a stable order.
	sort.Slice(result, func(i, j int) bool {
		return result[i].AccountID < result[j].AccountID
	})
	return result
}

// AppendPackageEventContext stores evt under its EventID and returns the
// stored value. A zero Version becomes 1 and an empty GatewaySyncStatus
// becomes pending. It returns publish.ErrDuplicatePublishRequest when the
// EventID is already present.
func (r *MemoryRepository) AppendPackageEventContext(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.packageEvents[evt.EventID]; exists {
		return domain.PackageChangeEvent{}, publish.ErrDuplicatePublishRequest
	}
	if evt.Version == 0 {
		evt.Version = 1
	}
	if evt.GatewaySyncStatus == "" {
		evt.GatewaySyncStatus = domain.GatewaySyncStatusPending
	}
	r.packageEvents[evt.EventID] = evt
	return evt, nil
}

// AppendPackageEvent is an alias for AppendPackageEventContext.
func (r *MemoryRepository) AppendPackageEvent(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
	return r.AppendPackageEventContext(ctx, evt)
}

// ListPackageEvents returns every stored event ordered by OccurredAt and then
// EventID. The result is non-nil even when empty.
func (r *MemoryRepository) ListPackageEvents(ctx context.Context) []domain.PackageChangeEvent {
	r.mu.RLock()
	defer r.mu.RUnlock()
	events := make([]domain.PackageChangeEvent, 0, len(r.packageEvents))
	for _, evt := range r.packageEvents {
		events = append(events, evt)
	}
	sortPackageEventsChronologically(events)
	return events
}

// GetPackageEventByID returns the event stored under eventID and whether it
// exists.
func (r *MemoryRepository) GetPackageEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	evt, ok := r.packageEvents[eventID]
	return evt, ok
}

// GetLatestPackageEvent returns the event for platform and model with the
// greatest OccurredAt, preferring the greater EventID on a tie. The boolean is
// false when no event matches.
func (r *MemoryRepository) GetLatestPackageEvent(ctx context.Context, platform, model string) (domain.PackageChangeEvent, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var (
		found bool
		best  domain.PackageChangeEvent
	)
	for _, evt := range r.packageEvents {
		if evt.Platform != platform || evt.Model != model {
			continue
		}
		newer := evt.OccurredAt.After(best.OccurredAt) ||
			(evt.OccurredAt.Equal(best.OccurredAt) && evt.EventID > best.EventID)
		if !found || newer {
			best = evt
			found = true
		}
	}
	return best, found
}

// ListPackageEventsAfter returns up to 50 events that follow the event whose
// EventID equals cursor, in OccurredAt then EventID order. An empty cursor
// starts from the first event; a cursor that matches no event yields an empty
// page. The second return value is the EventID of the last returned event
// when at least one further event exists, and "" otherwise.
func (r *MemoryRepository) ListPackageEventsAfter(ctx context.Context, cursor string) ([]domain.PackageChangeEvent, string) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	items := make([]domain.PackageChangeEvent, 0, len(r.packageEvents))
	for _, evt := range r.packageEvents {
		items = append(items, evt)
	}
	sortPackageEventsChronologically(items)

	const pageSize = 50
	result := make([]domain.PackageChangeEvent, 0, pageSize)
	found := cursor == ""
	next := ""
	for _, item := range items {
		if !found {
			// The cursor event itself is skipped; the page starts after it.
			found = item.EventID == cursor
			continue
		}
		if len(result) == pageSize {
			// Only hand out a cursor once we know another event follows, so
			// a page that exactly exhausts the data ends the iteration.
			next = result[len(result)-1].EventID
			break
		}
		result = append(result, item)
	}
	return result, next
}

// ListRetryablePendingPackageEvents returns pending events whose NextRetryAt
// is set and not after now, ordered by NextRetryAt and then EventID. A
// positive limit caps the number of results. consumer is not used by this
// implementation.
func (r *MemoryRepository) ListRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time, limit int) []domain.PackageChangeEvent {
	r.mu.RLock()
	defer r.mu.RUnlock()
	items := make([]domain.PackageChangeEvent, 0)
	for _, evt := range r.packageEvents {
		if evt.GatewaySyncStatus != domain.GatewaySyncStatusPending {
			continue
		}
		if evt.NextRetryAt == nil || evt.NextRetryAt.After(now) {
			continue
		}
		items = append(items, evt)
	}
	// Every retained item has a non-nil NextRetryAt, so the comparator can
	// dereference it unconditionally.
	sort.Slice(items, func(i, j int) bool {
		a, b := items[i].NextRetryAt, items[j].NextRetryAt
		if a.Equal(*b) {
			return items[i].EventID < items[j].EventID
		}
		return a.Before(*b)
	})
	if limit > 0 && len(items) > limit {
		items = items[:limit]
	}
	return items
}

// CountPackageEventsBySyncStatus returns how many stored events have the given
// GatewaySyncStatus.
func (r *MemoryRepository) CountPackageEventsBySyncStatus(ctx context.Context, status domain.GatewaySyncStatus) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	count := 0
	for _, evt := range r.packageEvents {
		if evt.GatewaySyncStatus == status {
			count++
		}
	}
	return count
}

// CountRetryablePendingPackageEvents returns how many pending events have a
// NextRetryAt that is set and not after now. consumer is not used by this
// implementation.
func (r *MemoryRepository) CountRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	count := 0
	for _, evt := range r.packageEvents {
		if evt.GatewaySyncStatus != domain.GatewaySyncStatusPending {
			continue
		}
		if evt.NextRetryAt != nil && !evt.NextRetryAt.After(now) {
			count++
		}
	}
	return count
}

// AckPackageEvent records a consumer acknowledgement on the event and returns
// the updated event. It sets Consumer, ConsumerDetail and AckedAt, derives
// GatewaySyncStatus from result.SyncStatus(), and increments Version. On a
// failed result LastFailureDetail is filled from detail only if it was empty.
// NextRetryAt is cleared unless result is pending. It returns
// ErrEventNotFound when eventID is unknown.
func (r *MemoryRepository) AckPackageEvent(ctx context.Context, eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	evt, ok := r.packageEvents[eventID]
	if !ok {
		// NOTE(review): ErrEventNotFound is not declared in this file;
		// presumably it lives elsewhere in the package.
		return domain.PackageChangeEvent{}, ErrEventNotFound
	}
	evt.Consumer = consumer
	evt.ConsumerDetail = detail
	evt.AckedAt = &ackedAt
	evt.GatewaySyncStatus = result.SyncStatus()
	evt.Version++
	if result == domain.GatewayAckResultFailed && evt.LastFailureDetail == "" {
		evt.LastFailureDetail = detail
	}
	if result != domain.GatewayAckResultPending {
		evt.NextRetryAt = nil
	}
	r.packageEvents[eventID] = evt
	return evt, nil
}

// MarkPackageEventRetry records a retry attempt on the event and returns the
// updated event. It overwrites RetryCount, LastRetryAt, NextRetryAt,
// LastFailureCategory, LastFailureDetail and ConsumerDetail, and increments
// Version. It returns ErrEventNotFound when eventID is unknown.
func (r *MemoryRepository) MarkPackageEventRetry(ctx context.Context, eventID string, retryCount int, nextRetryAt time.Time, category domain.GatewayFailureCategory, detail string, retriedAt time.Time) (domain.PackageChangeEvent, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	evt, ok := r.packageEvents[eventID]
	if !ok {
		return domain.PackageChangeEvent{}, ErrEventNotFound
	}
	evt.RetryCount = retryCount
	evt.LastRetryAt = &retriedAt
	evt.NextRetryAt = &nextRetryAt
	evt.LastFailureCategory = category
	evt.LastFailureDetail = detail
	evt.ConsumerDetail = detail
	evt.Version++
	r.packageEvents[eventID] = evt
	return evt, nil
}

// UpsertGatewayAppliedSnapshot stores snapshot under snapshot.Consumer with
// UpdatedAt set to the current time and returns the stored value.
func (r *MemoryRepository) UpsertGatewayAppliedSnapshot(ctx context.Context, snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot {
	r.mu.Lock()
	defer r.mu.Unlock()
	snapshot.UpdatedAt = r.timeNow()
	r.appliedSnapshot[snapshot.Consumer] = snapshot
	return snapshot
}

// GetGatewayAppliedSnapshot returns the snapshot stored for consumer and
// whether one exists.
func (r *MemoryRepository) GetGatewayAppliedSnapshot(ctx context.Context, consumer string) (domain.GatewayAppliedSnapshot, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	snapshot, ok := r.appliedSnapshot[consumer]
	return snapshot, ok
}

// GetDiscoveryCandidateByID returns the candidate stored under candidateID and
// whether it exists.
func (r *MemoryRepository) GetDiscoveryCandidateByID(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	candidate, ok := r.discoveryCandidates[candidateID]
	return candidate, ok
}

// FindDiscoveryCandidate returns a candidate matching accountID, platform and
// model, and whether one was found.
//
// NOTE(review): if several candidates match, which one is returned depends on
// map iteration order; confirm callers expect at most one match.
func (r *MemoryRepository) FindDiscoveryCandidate(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, c := range r.discoveryCandidates {
		if c.AccountID == accountID && c.Platform == platform && c.Model == model {
			return c, true
		}
	}
	return domain.DiscoveryCandidate{}, false
}

// GetLatestDiscoveryCandidate returns the candidate for platform and model
// with the greatest UpdatedAt, preferring the greater CandidateID on a tie.
// The boolean is false when no candidate matches.
func (r *MemoryRepository) GetLatestDiscoveryCandidate(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var (
		found bool
		best  domain.DiscoveryCandidate
	)
	for _, c := range r.discoveryCandidates {
		if c.Platform != platform || c.Model != model {
			continue
		}
		newer := c.UpdatedAt.After(best.UpdatedAt) ||
			(c.UpdatedAt.Equal(best.UpdatedAt) && c.CandidateID > best.CandidateID)
		if !found || newer {
			best = c
			found = true
		}
	}
	return best, found
}

// UpsertDiscoveryCandidate stores candidate under its CandidateID and returns
// the stored value. UpdatedAt is always set to the current time. A new
// candidate gets Version 1 and, if DiscoveredAt is zero, the current time as
// DiscoveredAt; an existing one gets the previous Version plus one.
func (r *MemoryRepository) UpsertDiscoveryCandidate(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
	r.mu.Lock()
	defer r.mu.Unlock()
	now := r.timeNow()
	candidate.UpdatedAt = now
	if existing, ok := r.discoveryCandidates[candidate.CandidateID]; ok {
		candidate.Version = existing.Version + 1
	} else {
		candidate.Version = 1
		if candidate.DiscoveredAt.IsZero() {
			candidate.DiscoveredAt = now
		}
	}
	r.discoveryCandidates[candidate.CandidateID] = candidate
	return candidate
}

// ListDiscoveryCandidates returns the candidates with the given status, or all
// of them when status is empty, ordered by DiscoveredAt and then CandidateID.
func (r *MemoryRepository) ListDiscoveryCandidates(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
	r.mu.RLock()
	defer r.mu.RUnlock()
	items := make([]domain.DiscoveryCandidate, 0, len(r.discoveryCandidates))
	for _, c := range r.discoveryCandidates {
		if status == "" || c.Status == status {
			items = append(items, c)
		}
	}
	sort.Slice(items, func(i, j int) bool {
		if items[i].DiscoveredAt.Equal(items[j].DiscoveredAt) {
			return items[i].CandidateID < items[j].CandidateID
		}
		return items[i].DiscoveredAt.Before(items[j].DiscoveredAt)
	})
	return items
}

// UpdateCandidateStatus sets Status and ReasonCode (from failureCode) on the
// candidate, refreshes UpdatedAt and increments Version. It returns an error
// when candidateID is unknown.
//
// NOTE(review): failureSummary is accepted but not stored anywhere; confirm
// whether the candidate should carry it.
func (r *MemoryRepository) UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	c, ok := r.discoveryCandidates[candidateID]
	if !ok {
		return errors.New("candidate not found")
	}
	c.Status = status
	c.ReasonCode = failureCode
	c.UpdatedAt = r.timeNow()
	c.Version++
	r.discoveryCandidates[candidateID] = c
	return nil
}

// UpsertSupplyPackage stores pkg keyed by its Platform and Model. An existing
// entry keeps its PackageID and CreatedAt and has its Version incremented; a
// new entry gets Version 1 and, if CreatedAt is zero, the current time.
// UpdatedAt is always set to the current time. It always returns nil.
func (r *MemoryRepository) UpsertSupplyPackage(ctx context.Context, pkg domain.SupplyPackage) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	now := r.timeNow()
	key := supplyPackageKey(pkg.Platform, pkg.Model)
	if existing, ok := r.supplyPackages[key]; ok {
		pkg.PackageID = existing.PackageID
		pkg.Version = existing.Version + 1
		pkg.CreatedAt = existing.CreatedAt
	} else {
		pkg.Version = 1
		if pkg.CreatedAt.IsZero() {
			pkg.CreatedAt = now
		}
	}
	pkg.UpdatedAt = now
	r.supplyPackages[key] = pkg
	return nil
}

// GetSupplyPackage returns the package stored for platform and model and
// whether one exists.
func (r *MemoryRepository) GetSupplyPackage(ctx context.Context, platform, model string) (domain.SupplyPackage, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	pkg, ok := r.supplyPackages[supplyPackageKey(platform, model)]
	return pkg, ok
}

// ListSupplyPackages returns the packages with the given status, or all of
// them when status is empty, ordered by UpdatedAt, then Platform, then Model.
func (r *MemoryRepository) ListSupplyPackages(ctx context.Context, status string) []domain.SupplyPackage {
	r.mu.RLock()
	defer r.mu.RUnlock()
	items := make([]domain.SupplyPackage, 0, len(r.supplyPackages))
	for _, pkg := range r.supplyPackages {
		if status == "" || pkg.Status == status {
			items = append(items, pkg)
		}
	}
	sort.Slice(items, func(i, j int) bool {
		a, b := items[i], items[j]
		switch {
		case !a.UpdatedAt.Equal(b.UpdatedAt):
			return a.UpdatedAt.Before(b.UpdatedAt)
		case a.Platform != b.Platform:
			return a.Platform < b.Platform
		default:
			return a.Model < b.Model
		}
	})
	return items
}

// AppendProbeExecutionLog is a no-op in this implementation: the log is
// discarded and nil is returned.
func (r *MemoryRepository) AppendProbeExecutionLog(ctx context.Context, log domain.ProbeExecutionLog) error {
	return nil
}

// ListProbeExecutionLogs always returns a nil slice and a nil error, since
// this implementation does not keep probe execution logs.
func (r *MemoryRepository) ListProbeExecutionLogs(ctx context.Context, accountID int64, limit int) ([]domain.ProbeExecutionLog, error) {
	return nil, nil
}

// AppendAdmissionTestLog appends an admission test log entry whose Version is
// its 1-based position in the log. It always returns nil.
func (r *MemoryRepository) AppendAdmissionTestLog(ctx context.Context, candidateID string, status string, failureCode string, failureSummary string, testedAt time.Time) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry := domain.AdmissionTestLog{
		CandidateID:    candidateID,
		Status:         status,
		FailureCode:    failureCode,
		FailureSummary: failureSummary,
		TestedAt:       testedAt,
		Version:        int64(len(r.admissionTestLogs) + 1),
	}
	r.admissionTestLogs = append(r.admissionTestLogs, entry)
	return nil
}

// ListAdmissionTestLogsByCandidate returns the log entries for candidateID,
// most recently appended first. A positive limit caps the number of results.
// The error is always nil.
func (r *MemoryRepository) ListAdmissionTestLogsByCandidate(ctx context.Context, candidateID string, limit int) ([]domain.AdmissionTestLog, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	items := make([]domain.AdmissionTestLog, 0)
	// Walk backwards so the newest entries come first.
	for i := len(r.admissionTestLogs) - 1; i >= 0; i-- {
		entry := r.admissionTestLogs[i]
		if entry.CandidateID != candidateID {
			continue
		}
		items = append(items, entry)
		if limit > 0 && len(items) >= limit {
			break
		}
	}
	return items, nil
}

// UpsertSupplyAccount stores account under its AccountID and returns the
// stored value. A zero CreatedAt is filled from the existing entry when there
// is one and from the current time otherwise; a zero UpdatedAt is filled with
// the current time. Non-zero timestamps supplied by the caller are kept.
func (r *MemoryRepository) UpsertSupplyAccount(ctx context.Context, account domain.SupplyAccount) domain.SupplyAccount {
	r.mu.Lock()
	defer r.mu.Unlock()
	now := r.timeNow()
	if account.CreatedAt.IsZero() {
		if existing, ok := r.supplyAccounts[account.AccountID]; ok {
			account.CreatedAt = existing.CreatedAt
		} else {
			account.CreatedAt = now
		}
	}
	if account.UpdatedAt.IsZero() {
		account.UpdatedAt = now
	}
	r.supplyAccounts[account.AccountID] = account
	return account
}

// GetSupplyAccount returns the account stored for accountID and whether one
// exists.
func (r *MemoryRepository) GetSupplyAccount(ctx context.Context, accountID int64) (domain.SupplyAccount, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	account, ok := r.supplyAccounts[accountID]
	return account, ok
}

// ListSupplyAccountsByPlatform returns the accounts whose Platform equals
// platform, or all of them when platform is empty, ordered by AccountID.
func (r *MemoryRepository) ListSupplyAccountsByPlatform(ctx context.Context, platform string) []domain.SupplyAccount {
	r.mu.RLock()
	defer r.mu.RUnlock()
	items := make([]domain.SupplyAccount, 0)
	for _, account := range r.supplyAccounts {
		if platform == "" || account.Platform == platform {
			items = append(items, account)
		}
	}
	sortSupplyAccountsByID(items)
	return items
}

// ListSupplyAccounts returns every stored account ordered by AccountID.
func (r *MemoryRepository) ListSupplyAccounts(ctx context.Context) []domain.SupplyAccount {
	r.mu.RLock()
	defer r.mu.RUnlock()
	items := make([]domain.SupplyAccount, 0, len(r.supplyAccounts))
	for _, account := range r.supplyAccounts {
		items = append(items, account)
	}
	sortSupplyAccountsByID(items)
	return items
}

// ListSupplyAccountsByConsumer returns the accounts whose ConsumerTag equals
// consumerTag, or all of them when consumerTag is empty, ordered by AccountID.
func (r *MemoryRepository) ListSupplyAccountsByConsumer(ctx context.Context, consumerTag string) []domain.SupplyAccount {
	r.mu.RLock()
	defer r.mu.RUnlock()
	items := make([]domain.SupplyAccount, 0)
	for _, account := range r.supplyAccounts {
		if consumerTag == "" || account.ConsumerTag == consumerTag {
			items = append(items, account)
		}
	}
	sortSupplyAccountsByID(items)
	return items
}

// UpsertRoutingStateContext behaves like UpsertRoutingState and additionally
// returns the stored state. The write and the returned value come from a
// single critical section, so a concurrent writer cannot slip in between.
func (r *MemoryRepository) UpsertRoutingStateContext(ctx context.Context, state domain.AccountRoutingState) domain.AccountRoutingState {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.storeRoutingStateLocked(state)
}

// GetRoutingStateContext is an alias for GetRoutingState.
func (r *MemoryRepository) GetRoutingStateContext(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
	return r.GetRoutingState(ctx, accountID)
}

// GetDiscoveryCandidateByIDContext is an alias for GetDiscoveryCandidateByID.
func (r *MemoryRepository) GetDiscoveryCandidateByIDContext(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
	return r.GetDiscoveryCandidateByID(ctx, candidateID)
}

// FindDiscoveryCandidateContext is an alias for FindDiscoveryCandidate.
func (r *MemoryRepository) FindDiscoveryCandidateContext(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
	return r.FindDiscoveryCandidate(ctx, accountID, platform, model)
}

// GetLatestDiscoveryCandidateContext is an alias for
// GetLatestDiscoveryCandidate.
func (r *MemoryRepository) GetLatestDiscoveryCandidateContext(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
	return r.GetLatestDiscoveryCandidate(ctx, platform, model)
}

// UpsertDiscoveryCandidateContext is an alias for UpsertDiscoveryCandidate.
func (r *MemoryRepository) UpsertDiscoveryCandidateContext(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
	return r.UpsertDiscoveryCandidate(ctx, candidate)
}

// ListDiscoveryCandidatesContext is an alias for ListDiscoveryCandidates.
func (r *MemoryRepository) ListDiscoveryCandidatesContext(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
	return r.ListDiscoveryCandidates(ctx, status)
}

// ListActiveAccounts returns the routing states, across all platforms, whose
// AccountStatus is active and whose RoutingEnabled flag is set, ordered by
// AccountID. The result is non-nil even when empty.
func (r *MemoryRepository) ListActiveAccounts(ctx context.Context) []domain.AccountRoutingState {
	states := r.ListRoutingStatesByPlatform(ctx, "")
	result := make([]domain.AccountRoutingState, 0, len(states))
	for _, state := range states {
		if state.AccountStatus == domain.AccountStatusActive && state.RoutingEnabled {
			result = append(result, state)
		}
	}
	return result
}