577 lines
17 KiB
Go
577 lines
17 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/domain"
|
|
"supply-intelligence/internal/publish"
|
|
)
|
|
|
|
var (
|
|
ErrNotFound = errors.New("row not found")
|
|
ErrDuplicateEventID = errors.New("duplicate event id")
|
|
)
|
|
|
|
func IsGatewayAckResult(result domain.GatewayAckResult) bool {
|
|
return result == domain.GatewayAckResultApplied || result == domain.GatewayAckResultFailed
|
|
}
|
|
|
|
// MemoryRepository implements Repository using in-memory maps.
|
|
// NOT thread-safe for production use; use for testing and local development.
|
|
type MemoryRepository struct {
|
|
mu sync.RWMutex
|
|
routingStates map[int64]domain.AccountRoutingState
|
|
supplyAccounts map[int64]domain.SupplyAccount
|
|
packageEvents map[string]domain.PackageChangeEvent
|
|
appliedSnapshot map[string]domain.GatewayAppliedSnapshot
|
|
discoveryCandidates map[string]domain.DiscoveryCandidate
|
|
supplyPackages map[string]domain.SupplyPackage
|
|
admissionTestLogs []domain.AdmissionTestLog
|
|
now func() time.Time
|
|
}
|
|
|
|
func NewMemoryRepository() *MemoryRepository {
|
|
return &MemoryRepository{
|
|
routingStates: map[int64]domain.AccountRoutingState{},
|
|
supplyAccounts: map[int64]domain.SupplyAccount{},
|
|
packageEvents: map[string]domain.PackageChangeEvent{},
|
|
appliedSnapshot: map[string]domain.GatewayAppliedSnapshot{},
|
|
admissionTestLogs: make([]domain.AdmissionTestLog, 0),
|
|
discoveryCandidates: map[string]domain.DiscoveryCandidate{},
|
|
supplyPackages: map[string]domain.SupplyPackage{},
|
|
now: func() time.Time { return time.Now().UTC() },
|
|
}
|
|
}
|
|
|
|
var _ Repository = (*MemoryRepository)(nil)
|
|
|
|
func (r *MemoryRepository) UpsertRoutingState(ctx context.Context, state domain.AccountRoutingState) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if existing, ok := r.routingStates[state.AccountID]; ok {
|
|
state.Version = existing.Version + 1
|
|
state.LastProbeAt = existing.LastProbeAt
|
|
} else {
|
|
state.Version = 1
|
|
}
|
|
r.routingStates[state.AccountID] = state
|
|
_ = ctx
|
|
}
|
|
|
|
func (r *MemoryRepository) GetRoutingState(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
s, ok := r.routingStates[accountID]
|
|
return s, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) ListRoutingStatesByPlatform(ctx context.Context, platform string) []domain.AccountRoutingState {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
var result []domain.AccountRoutingState
|
|
for _, s := range r.routingStates {
|
|
if platform == "" || s.Platform == platform {
|
|
result = append(result, s)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (r *MemoryRepository) AppendPackageEventContext(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if _, exists := r.packageEvents[evt.EventID]; exists {
|
|
return domain.PackageChangeEvent{}, publish.ErrDuplicatePublishRequest
|
|
}
|
|
if evt.Version == 0 {
|
|
evt.Version = 1
|
|
}
|
|
if evt.GatewaySyncStatus == "" {
|
|
evt.GatewaySyncStatus = domain.GatewaySyncStatusPending
|
|
}
|
|
r.packageEvents[evt.EventID] = evt
|
|
_ = ctx
|
|
return evt, nil
|
|
}
|
|
|
|
func (r *MemoryRepository) AppendPackageEvent(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
|
|
return r.AppendPackageEventContext(ctx, evt)
|
|
}
|
|
|
|
func (r *MemoryRepository) ListPackageEvents(ctx context.Context) []domain.PackageChangeEvent {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
events := make([]domain.PackageChangeEvent, 0, len(r.packageEvents))
|
|
for _, e := range r.packageEvents {
|
|
events = append(events, e)
|
|
}
|
|
return events
|
|
}
|
|
|
|
func (r *MemoryRepository) GetPackageEventByID(ctx context.Context, eventID string) (domain.PackageChangeEvent, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
evt, ok := r.packageEvents[eventID]
|
|
_ = ctx
|
|
return evt, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) GetLatestPackageEvent(ctx context.Context, platform, model string) (domain.PackageChangeEvent, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
var (
|
|
found bool
|
|
best domain.PackageChangeEvent
|
|
)
|
|
for _, evt := range r.packageEvents {
|
|
if evt.Platform != platform || evt.Model != model {
|
|
continue
|
|
}
|
|
if !found || evt.OccurredAt.After(best.OccurredAt) || (evt.OccurredAt.Equal(best.OccurredAt) && evt.EventID > best.EventID) {
|
|
best = evt
|
|
found = true
|
|
}
|
|
}
|
|
return best, found
|
|
}
|
|
|
|
func (r *MemoryRepository) ListPackageEventsAfter(ctx context.Context, cursor string) ([]domain.PackageChangeEvent, string) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.PackageChangeEvent, 0, len(r.packageEvents))
|
|
for _, evt := range r.packageEvents {
|
|
items = append(items, evt)
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].OccurredAt.Equal(items[j].OccurredAt) {
|
|
return items[i].EventID < items[j].EventID
|
|
}
|
|
return items[i].OccurredAt.Before(items[j].OccurredAt)
|
|
})
|
|
const pageSize = 50
|
|
result := make([]domain.PackageChangeEvent, 0, pageSize)
|
|
found := cursor == ""
|
|
hasMore := false
|
|
for _, item := range items {
|
|
if !found {
|
|
if item.EventID == cursor {
|
|
found = true
|
|
}
|
|
continue
|
|
}
|
|
result = append(result, item)
|
|
if len(result) >= pageSize {
|
|
hasMore = true
|
|
break
|
|
}
|
|
}
|
|
next := ""
|
|
if hasMore && len(result) > 0 {
|
|
next = result[len(result)-1].EventID
|
|
}
|
|
_ = ctx
|
|
return result, next
|
|
}
|
|
|
|
func (r *MemoryRepository) ListRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time, limit int) []domain.PackageChangeEvent {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.PackageChangeEvent, 0)
|
|
for _, evt := range r.packageEvents {
|
|
if evt.GatewaySyncStatus != domain.GatewaySyncStatusPending || evt.NextRetryAt == nil || evt.NextRetryAt.After(now) {
|
|
continue
|
|
}
|
|
items = append(items, evt)
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].NextRetryAt != nil && items[j].NextRetryAt != nil && items[i].NextRetryAt.Equal(*items[j].NextRetryAt) {
|
|
return items[i].EventID < items[j].EventID
|
|
}
|
|
if items[i].NextRetryAt == nil {
|
|
return false
|
|
}
|
|
if items[j].NextRetryAt == nil {
|
|
return true
|
|
}
|
|
return items[i].NextRetryAt.Before(*items[j].NextRetryAt)
|
|
})
|
|
if limit > 0 && len(items) > limit {
|
|
items = items[:limit]
|
|
}
|
|
_ = ctx
|
|
_ = consumer
|
|
return items
|
|
}
|
|
|
|
func (r *MemoryRepository) CountPackageEventsBySyncStatus(ctx context.Context, status domain.GatewaySyncStatus) int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
count := 0
|
|
for _, evt := range r.packageEvents {
|
|
if evt.GatewaySyncStatus == status {
|
|
count++
|
|
}
|
|
}
|
|
_ = ctx
|
|
return count
|
|
}
|
|
|
|
func (r *MemoryRepository) CountRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time) int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
count := 0
|
|
for _, evt := range r.packageEvents {
|
|
if evt.GatewaySyncStatus == domain.GatewaySyncStatusPending && evt.NextRetryAt != nil && !evt.NextRetryAt.After(now) {
|
|
count++
|
|
}
|
|
}
|
|
_ = ctx
|
|
_ = consumer
|
|
return count
|
|
}
|
|
|
|
func (r *MemoryRepository) AckPackageEvent(ctx context.Context, eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
evt, ok := r.packageEvents[eventID]
|
|
if !ok {
|
|
return domain.PackageChangeEvent{}, ErrEventNotFound
|
|
}
|
|
evt.Consumer = consumer
|
|
evt.ConsumerDetail = detail
|
|
evt.AckedAt = &ackedAt
|
|
evt.GatewaySyncStatus = result.SyncStatus()
|
|
evt.Version++
|
|
if result == domain.GatewayAckResultFailed && evt.LastFailureDetail == "" {
|
|
evt.LastFailureDetail = detail
|
|
}
|
|
if result != domain.GatewayAckResultPending {
|
|
evt.NextRetryAt = nil
|
|
}
|
|
r.packageEvents[eventID] = evt
|
|
_ = ctx
|
|
return evt, nil
|
|
}
|
|
|
|
func (r *MemoryRepository) MarkPackageEventRetry(ctx context.Context, eventID string, retryCount int, nextRetryAt time.Time, category domain.GatewayFailureCategory, detail string, retriedAt time.Time) (domain.PackageChangeEvent, error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
evt, ok := r.packageEvents[eventID]
|
|
if !ok {
|
|
return domain.PackageChangeEvent{}, ErrEventNotFound
|
|
}
|
|
evt.RetryCount = retryCount
|
|
evt.LastRetryAt = &retriedAt
|
|
evt.NextRetryAt = &nextRetryAt
|
|
evt.LastFailureCategory = category
|
|
evt.LastFailureDetail = detail
|
|
evt.ConsumerDetail = detail
|
|
evt.Version++
|
|
r.packageEvents[eventID] = evt
|
|
_ = ctx
|
|
return evt, nil
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertGatewayAppliedSnapshot(ctx context.Context, snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
snapshot.UpdatedAt = time.Now().UTC()
|
|
r.appliedSnapshot[snapshot.Consumer] = snapshot
|
|
_ = ctx
|
|
return snapshot
|
|
}
|
|
|
|
func (r *MemoryRepository) GetGatewayAppliedSnapshot(ctx context.Context, consumer string) (domain.GatewayAppliedSnapshot, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
s, ok := r.appliedSnapshot[consumer]
|
|
return s, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) GetDiscoveryCandidateByID(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
c, ok := r.discoveryCandidates[candidateID]
|
|
return c, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) FindDiscoveryCandidate(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
for _, c := range r.discoveryCandidates {
|
|
if c.AccountID == accountID && c.Platform == platform && c.Model == model {
|
|
return c, true
|
|
}
|
|
}
|
|
return domain.DiscoveryCandidate{}, false
|
|
}
|
|
|
|
func (r *MemoryRepository) GetLatestDiscoveryCandidate(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
var (
|
|
found bool
|
|
best domain.DiscoveryCandidate
|
|
)
|
|
for _, c := range r.discoveryCandidates {
|
|
if c.Platform != platform || c.Model != model {
|
|
continue
|
|
}
|
|
if !found || c.UpdatedAt.After(best.UpdatedAt) || (c.UpdatedAt.Equal(best.UpdatedAt) && c.CandidateID > best.CandidateID) {
|
|
best = c
|
|
found = true
|
|
}
|
|
}
|
|
return best, found
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertDiscoveryCandidate(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
now := time.Now().UTC()
|
|
candidate.UpdatedAt = now
|
|
if existing, ok := r.discoveryCandidates[candidate.CandidateID]; ok {
|
|
candidate.Version = existing.Version + 1
|
|
} else {
|
|
candidate.Version = 1
|
|
if candidate.DiscoveredAt.IsZero() {
|
|
candidate.DiscoveredAt = now
|
|
}
|
|
}
|
|
r.discoveryCandidates[candidate.CandidateID] = candidate
|
|
return candidate
|
|
}
|
|
|
|
func (r *MemoryRepository) ListDiscoveryCandidates(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.DiscoveryCandidate, 0, len(r.discoveryCandidates))
|
|
for _, c := range r.discoveryCandidates {
|
|
if status != "" && c.Status != status {
|
|
continue
|
|
}
|
|
items = append(items, c)
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].DiscoveredAt.Equal(items[j].DiscoveredAt) {
|
|
return items[i].CandidateID < items[j].CandidateID
|
|
}
|
|
return items[i].DiscoveredAt.Before(items[j].DiscoveredAt)
|
|
})
|
|
return items
|
|
}
|
|
|
|
func (r *MemoryRepository) UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
c, ok := r.discoveryCandidates[candidateID]
|
|
if !ok {
|
|
return errors.New("candidate not found")
|
|
}
|
|
c.Status = status
|
|
c.ReasonCode = failureCode
|
|
c.UpdatedAt = time.Now().UTC()
|
|
c.Version++
|
|
r.discoveryCandidates[candidateID] = c
|
|
_ = ctx
|
|
return nil
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertSupplyPackage(ctx context.Context, pkg domain.SupplyPackage) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
now := time.Now().UTC()
|
|
key := pkg.Platform + "_" + pkg.Model
|
|
if existing, ok := r.supplyPackages[key]; ok {
|
|
pkg.PackageID = existing.PackageID
|
|
pkg.Version = existing.Version + 1
|
|
pkg.CreatedAt = existing.CreatedAt
|
|
} else {
|
|
pkg.Version = 1
|
|
if pkg.CreatedAt.IsZero() {
|
|
pkg.CreatedAt = now
|
|
}
|
|
}
|
|
pkg.UpdatedAt = now
|
|
r.supplyPackages[key] = pkg
|
|
_ = ctx
|
|
return nil
|
|
}
|
|
|
|
func (r *MemoryRepository) GetSupplyPackage(ctx context.Context, platform, model string) (domain.SupplyPackage, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
key := platform + "_" + model
|
|
pkg, ok := r.supplyPackages[key]
|
|
return pkg, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) ListSupplyPackages(ctx context.Context, status string) []domain.SupplyPackage {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.SupplyPackage, 0, len(r.supplyPackages))
|
|
for _, pkg := range r.supplyPackages {
|
|
if status != "" && pkg.Status != status {
|
|
continue
|
|
}
|
|
items = append(items, pkg)
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].UpdatedAt.Equal(items[j].UpdatedAt) {
|
|
if items[i].Platform == items[j].Platform {
|
|
return items[i].Model < items[j].Model
|
|
}
|
|
return items[i].Platform < items[j].Platform
|
|
}
|
|
return items[i].UpdatedAt.Before(items[j].UpdatedAt)
|
|
})
|
|
return items
|
|
}
|
|
|
|
func (r *MemoryRepository) AppendProbeExecutionLog(ctx context.Context, log domain.ProbeExecutionLog) error {
|
|
_ = ctx
|
|
_ = log
|
|
return nil
|
|
}
|
|
|
|
func (r *MemoryRepository) ListProbeExecutionLogs(ctx context.Context, accountID int64, limit int) ([]domain.ProbeExecutionLog, error) {
|
|
_ = ctx
|
|
_ = accountID
|
|
_ = limit
|
|
return nil, nil
|
|
}
|
|
|
|
func (r *MemoryRepository) AppendAdmissionTestLog(ctx context.Context, candidateID string, status string, failureCode string, failureSummary string, testedAt time.Time) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
log := domain.AdmissionTestLog{CandidateID: candidateID, Status: status, FailureCode: failureCode, FailureSummary: failureSummary, TestedAt: testedAt, Version: int64(len(r.admissionTestLogs) + 1)}
|
|
r.admissionTestLogs = append(r.admissionTestLogs, log)
|
|
_ = ctx
|
|
return nil
|
|
}
|
|
|
|
func (r *MemoryRepository) ListAdmissionTestLogsByCandidate(ctx context.Context, candidateID string, limit int) ([]domain.AdmissionTestLog, error) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.AdmissionTestLog, 0)
|
|
for i := len(r.admissionTestLogs) - 1; i >= 0; i-- {
|
|
if r.admissionTestLogs[i].CandidateID != candidateID {
|
|
continue
|
|
}
|
|
items = append(items, r.admissionTestLogs[i])
|
|
if limit > 0 && len(items) >= limit {
|
|
break
|
|
}
|
|
}
|
|
_ = ctx
|
|
return items, nil
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertSupplyAccount(ctx context.Context, account domain.SupplyAccount) domain.SupplyAccount {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if existing, ok := r.supplyAccounts[account.AccountID]; ok {
|
|
if account.CreatedAt.IsZero() {
|
|
account.CreatedAt = existing.CreatedAt
|
|
}
|
|
} else if account.CreatedAt.IsZero() {
|
|
account.CreatedAt = time.Now().UTC()
|
|
}
|
|
if account.UpdatedAt.IsZero() {
|
|
account.UpdatedAt = time.Now().UTC()
|
|
}
|
|
r.supplyAccounts[account.AccountID] = account
|
|
_ = ctx
|
|
return account
|
|
}
|
|
|
|
func (r *MemoryRepository) GetSupplyAccount(ctx context.Context, accountID int64) (domain.SupplyAccount, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
account, ok := r.supplyAccounts[accountID]
|
|
_ = ctx
|
|
return account, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) ListSupplyAccountsByPlatform(ctx context.Context, platform string) []domain.SupplyAccount {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.SupplyAccount, 0)
|
|
for _, account := range r.supplyAccounts {
|
|
if platform == "" || account.Platform == platform {
|
|
items = append(items, account)
|
|
}
|
|
}
|
|
_ = ctx
|
|
return items
|
|
}
|
|
|
|
func (r *MemoryRepository) ListSupplyAccounts(ctx context.Context) []domain.SupplyAccount {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.SupplyAccount, 0, len(r.supplyAccounts))
|
|
for _, account := range r.supplyAccounts {
|
|
items = append(items, account)
|
|
}
|
|
_ = ctx
|
|
return items
|
|
}
|
|
|
|
func (r *MemoryRepository) ListSupplyAccountsByConsumer(ctx context.Context, consumerTag string) []domain.SupplyAccount {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.SupplyAccount, 0)
|
|
for _, account := range r.supplyAccounts {
|
|
if consumerTag == "" || account.ConsumerTag == consumerTag {
|
|
items = append(items, account)
|
|
}
|
|
}
|
|
_ = ctx
|
|
return items
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertRoutingStateContext(ctx context.Context, state domain.AccountRoutingState) domain.AccountRoutingState {
|
|
r.UpsertRoutingState(ctx, state)
|
|
stored, _ := r.GetRoutingState(ctx, state.AccountID)
|
|
return stored
|
|
}
|
|
|
|
func (r *MemoryRepository) GetRoutingStateContext(ctx context.Context, accountID int64) (domain.AccountRoutingState, bool) {
|
|
return r.GetRoutingState(ctx, accountID)
|
|
}
|
|
|
|
func (r *MemoryRepository) GetDiscoveryCandidateByIDContext(ctx context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
|
|
return r.GetDiscoveryCandidateByID(ctx, candidateID)
|
|
}
|
|
|
|
func (r *MemoryRepository) FindDiscoveryCandidateContext(ctx context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
return r.FindDiscoveryCandidate(ctx, accountID, platform, model)
|
|
}
|
|
|
|
func (r *MemoryRepository) GetLatestDiscoveryCandidateContext(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
return r.GetLatestDiscoveryCandidate(ctx, platform, model)
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertDiscoveryCandidateContext(ctx context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
|
|
return r.UpsertDiscoveryCandidate(ctx, candidate)
|
|
}
|
|
|
|
func (r *MemoryRepository) ListDiscoveryCandidatesContext(ctx context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
|
|
return r.ListDiscoveryCandidates(ctx, status)
|
|
}
|
|
|
|
func (r *MemoryRepository) ListActiveAccounts(ctx context.Context) []domain.AccountRoutingState {
|
|
states := r.ListRoutingStatesByPlatform(ctx, "")
|
|
result := make([]domain.AccountRoutingState, 0, len(states))
|
|
for _, state := range states {
|
|
if state.AccountStatus == domain.AccountStatusActive && state.RoutingEnabled {
|
|
result = append(result, state)
|
|
}
|
|
}
|
|
return result
|
|
}
|