279 lines
8.3 KiB
Go
279 lines
8.3 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/domain"
|
|
)
|
|
|
|
// ErrEventNotFound is returned by AckPackageEvent when no stored package
// event has the requested event ID.
var ErrEventNotFound = errors.New("event not found")
|
|
|
|
func IsGatewayAckResult(result domain.GatewayAckResult) bool {
|
|
return result == domain.GatewayAckResultApplied || result == domain.GatewayAckResultFailed
|
|
}
|
|
|
|
type MemoryRepository struct {
|
|
mu sync.RWMutex
|
|
routingStates map[int64]domain.AccountRoutingState
|
|
packageEvents map[string]domain.PackageChangeEvent
|
|
appliedSnapshot map[string]domain.GatewayAppliedSnapshot
|
|
discoveryCandidates map[string]domain.DiscoveryCandidate
|
|
supplyPackages map[string]domain.SupplyPackage // key: platform+"_"+model
|
|
}
|
|
|
|
func NewMemoryRepository() *MemoryRepository {
|
|
return &MemoryRepository{
|
|
routingStates: map[int64]domain.AccountRoutingState{},
|
|
packageEvents: map[string]domain.PackageChangeEvent{},
|
|
appliedSnapshot: map[string]domain.GatewayAppliedSnapshot{},
|
|
discoveryCandidates: map[string]domain.DiscoveryCandidate{},
|
|
supplyPackages: map[string]domain.SupplyPackage{},
|
|
}
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertRoutingState(state domain.AccountRoutingState) {
|
|
r.upsertRoutingState(state)
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertRoutingStateContext(_ context.Context, state domain.AccountRoutingState) domain.AccountRoutingState {
|
|
return r.upsertRoutingState(state)
|
|
}
|
|
|
|
func (r *MemoryRepository) upsertRoutingState(state domain.AccountRoutingState) domain.AccountRoutingState {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.routingStates[state.AccountID] = state
|
|
return state
|
|
}
|
|
|
|
func (r *MemoryRepository) GetRoutingState(accountID int64) (domain.AccountRoutingState, bool) {
|
|
return r.getRoutingState(accountID)
|
|
}
|
|
|
|
func (r *MemoryRepository) GetRoutingStateContext(_ context.Context, accountID int64) (domain.AccountRoutingState, bool) {
|
|
return r.getRoutingState(accountID)
|
|
}
|
|
|
|
func (r *MemoryRepository) getRoutingState(accountID int64) (domain.AccountRoutingState, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
state, ok := r.routingStates[accountID]
|
|
return state, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) AppendPackageEvent(evt domain.PackageChangeEvent) {
|
|
_, _ = r.AppendPackageEventContext(context.Background(), evt)
|
|
}
|
|
|
|
func (r *MemoryRepository) AppendPackageEventContext(_ context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if evt.OccurredAt.IsZero() {
|
|
evt.OccurredAt = time.Now().UTC()
|
|
}
|
|
if evt.GatewaySyncStatus == "" {
|
|
evt.GatewaySyncStatus = domain.GatewaySyncStatusPending
|
|
}
|
|
r.packageEvents[evt.EventID] = evt
|
|
return evt, nil
|
|
}
|
|
|
|
func (r *MemoryRepository) ListPackageEvents() []domain.PackageChangeEvent {
|
|
items, _ := r.ListPackageEventsAfter("")
|
|
return items
|
|
}
|
|
|
|
func (r *MemoryRepository) ListPackageEventsAfter(cursor string) ([]domain.PackageChangeEvent, string) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.PackageChangeEvent, 0, len(r.packageEvents))
|
|
for _, evt := range r.packageEvents {
|
|
items = append(items, evt)
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].OccurredAt.Equal(items[j].OccurredAt) {
|
|
return items[i].EventID < items[j].EventID
|
|
}
|
|
return items[i].OccurredAt.Before(items[j].OccurredAt)
|
|
})
|
|
if cursor == "" {
|
|
return items, nextCursorFor(items)
|
|
}
|
|
start := 0
|
|
if idx, err := strconv.Atoi(cursor); err == nil {
|
|
if idx < 0 {
|
|
idx = 0
|
|
}
|
|
if idx > len(items) {
|
|
idx = len(items)
|
|
}
|
|
start = idx
|
|
} else {
|
|
for i, evt := range items {
|
|
if evt.EventID == cursor {
|
|
start = i + 1
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if start >= len(items) {
|
|
return []domain.PackageChangeEvent{}, ""
|
|
}
|
|
filtered := append([]domain.PackageChangeEvent(nil), items[start:]...)
|
|
return filtered, nextCursorFor(items)
|
|
}
|
|
|
|
func nextCursorFor(items []domain.PackageChangeEvent) string {
|
|
if len(items) == 0 {
|
|
return ""
|
|
}
|
|
return strconv.Itoa(len(items))
|
|
}
|
|
|
|
func (r *MemoryRepository) AckPackageEvent(eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
evt, ok := r.packageEvents[eventID]
|
|
if !ok {
|
|
return domain.PackageChangeEvent{}, ErrEventNotFound
|
|
}
|
|
if ackedAt.IsZero() {
|
|
ackedAt = time.Now().UTC()
|
|
}
|
|
evt.Consumer = consumer
|
|
evt.ConsumerDetail = detail
|
|
evt.GatewaySyncStatus = result.SyncStatus()
|
|
evt.AckedAt = &ackedAt
|
|
r.packageEvents[eventID] = evt
|
|
return evt, nil
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertGatewayAppliedSnapshot(snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if snapshot.UpdatedAt.IsZero() {
|
|
snapshot.UpdatedAt = time.Now().UTC()
|
|
}
|
|
r.appliedSnapshot[snapshot.Consumer] = snapshot
|
|
return snapshot
|
|
}
|
|
|
|
func (r *MemoryRepository) GetGatewayAppliedSnapshot(consumer string) (domain.GatewayAppliedSnapshot, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
snapshot, ok := r.appliedSnapshot[consumer]
|
|
return snapshot, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) GetDiscoveryCandidateByIDContext(_ context.Context, candidateID string) (domain.DiscoveryCandidate, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
candidate, ok := r.discoveryCandidates[candidateID]
|
|
return candidate, ok
|
|
}
|
|
|
|
func (r *MemoryRepository) FindDiscoveryCandidateContext(_ context.Context, accountID int64, platform, model string) (domain.DiscoveryCandidate, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
for _, candidate := range r.discoveryCandidates {
|
|
if candidate.AccountID == accountID && candidate.Platform == platform && candidate.Model == model {
|
|
return candidate, true
|
|
}
|
|
}
|
|
return domain.DiscoveryCandidate{}, false
|
|
}
|
|
|
|
func (r *MemoryRepository) UpsertDiscoveryCandidateContext(_ context.Context, candidate domain.DiscoveryCandidate) domain.DiscoveryCandidate {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if candidate.DiscoveredAt.IsZero() {
|
|
candidate.DiscoveredAt = time.Now().UTC()
|
|
}
|
|
if candidate.UpdatedAt.IsZero() {
|
|
candidate.UpdatedAt = candidate.DiscoveredAt
|
|
}
|
|
r.discoveryCandidates[candidate.CandidateID] = candidate
|
|
return candidate
|
|
}
|
|
|
|
func (r *MemoryRepository) ListDiscoveryCandidatesContext(_ context.Context, status domain.DiscoveryCandidateStatus) []domain.DiscoveryCandidate {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.DiscoveryCandidate, 0, len(r.discoveryCandidates))
|
|
for _, candidate := range r.discoveryCandidates {
|
|
if status != "" && candidate.Status != status {
|
|
continue
|
|
}
|
|
items = append(items, candidate)
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].DiscoveredAt.Equal(items[j].DiscoveredAt) {
|
|
return items[i].CandidateID < items[j].CandidateID
|
|
}
|
|
return items[i].DiscoveredAt.Before(items[j].DiscoveredAt)
|
|
})
|
|
return items
|
|
}
|
|
|
|
// --- SupplyPackage methods ---
|
|
|
|
// UpsertSupplyPackage creates or updates a supply package
|
|
func (r *MemoryRepository) UpsertSupplyPackage(pkg domain.SupplyPackage) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
key := pkg.Platform + "_" + pkg.Model
|
|
if existing, ok := r.supplyPackages[key]; ok {
|
|
pkg.PackageID = existing.PackageID
|
|
pkg.Version = existing.Version + 1
|
|
pkg.CreatedAt = existing.CreatedAt
|
|
}
|
|
if pkg.CreatedAt.IsZero() {
|
|
pkg.CreatedAt = time.Now().UTC()
|
|
}
|
|
pkg.UpdatedAt = time.Now().UTC()
|
|
r.supplyPackages[key] = pkg
|
|
}
|
|
|
|
// GetSupplyPackage retrieves a supply package by platform and model
|
|
func (r *MemoryRepository) GetSupplyPackage(platform, model string) (domain.SupplyPackage, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
key := platform + "_" + model
|
|
pkg, ok := r.supplyPackages[key]
|
|
return pkg, ok
|
|
}
|
|
|
|
// ListSupplyPackages returns all supply packages, optionally filtered by status
|
|
func (r *MemoryRepository) ListSupplyPackages(status string) []domain.SupplyPackage {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
items := make([]domain.SupplyPackage, 0, len(r.supplyPackages))
|
|
for _, pkg := range r.supplyPackages {
|
|
if status == "" || pkg.Status == status {
|
|
items = append(items, pkg)
|
|
}
|
|
}
|
|
return items
|
|
}
|
|
|
|
// UpdateCandidateStatus updates a candidate's status (used by admission service)
|
|
func (r *MemoryRepository) UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if _, ok := r.discoveryCandidates[candidateID]; !ok {
|
|
return errors.New("candidate not found")
|
|
}
|
|
c := r.discoveryCandidates[candidateID]
|
|
c.Status = status
|
|
c.ReasonCode = failureCode
|
|
c.UpdatedAt = time.Now().UTC()
|
|
c.Version++
|
|
r.discoveryCandidates[candidateID] = c
|
|
return nil
|
|
}
|