212 lines
6.7 KiB
Go
212 lines
6.7 KiB
Go
package publish
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/domain"
|
|
)
|
|
|
|
// PackagePublishedEventType is the EventType value set on every change event
// built by RecordPackagePublished and PublishDraft.
const PackagePublishedEventType = "supply_package_published"
|
|
|
|
// Sentinel errors returned by the publish service. Compare with errors.Is.
var (
	// ErrInvalidPublishInput is returned when the service or its repository
	// is nil, or when a required input field is blank or non-positive.
	ErrInvalidPublishInput = errors.New("invalid publish input")

	// ErrCandidateNotPublishable is returned when the candidate is not in the
	// test-passed status.
	ErrCandidateNotPublishable = errors.New("candidate not publishable")

	// ErrPackageNotPublishable is returned when the package is not a draft.
	ErrPackageNotPublishable = errors.New("package not publishable")

	// ErrCandidateOrPackageMissing is returned when either the candidate or
	// the package lookup reports nothing found.
	ErrCandidateOrPackageMissing = errors.New("candidate or package missing")

	// ErrDuplicatePublishRequest is the error PublishDraft recognises (via
	// errors.Is) from its repository and returns as the bare sentinel.
	ErrDuplicatePublishRequest = errors.New("duplicate publish request")

	// ErrPackageAlreadyPublished is returned when the candidate is already
	// published or the package is already active.
	ErrPackageAlreadyPublished = errors.New("package already published")
)
|
|
|
|
type PublishPackageAtomicInput struct {
|
|
Candidate domain.DiscoveryCandidate
|
|
Package domain.SupplyPackage
|
|
Event domain.PackageChangeEvent
|
|
}
|
|
|
|
type PublishPackageAtomicResult struct {
|
|
Candidate domain.DiscoveryCandidate
|
|
Package domain.SupplyPackage
|
|
Event domain.PackageChangeEvent
|
|
}
|
|
|
|
type AtomicPublishRepository interface {
|
|
PublishPackageAtomically(ctx context.Context, input PublishPackageAtomicInput) (PublishPackageAtomicResult, error)
|
|
}
|
|
|
|
type PackageEventRepository interface {
|
|
AppendPackageEventContext(ctx context.Context, evt domain.PackageChangeEvent) (domain.PackageChangeEvent, error)
|
|
GetLatestDiscoveryCandidateContext(ctx context.Context, platform, model string) (domain.DiscoveryCandidate, bool)
|
|
UpdateCandidateStatus(ctx context.Context, candidateID string, status domain.DiscoveryCandidateStatus, failureCode, failureSummary string) error
|
|
GetSupplyPackage(ctx context.Context, platform, model string) (domain.SupplyPackage, bool)
|
|
UpsertSupplyPackage(ctx context.Context, pkg domain.SupplyPackage) error
|
|
}
|
|
|
|
type Service struct {
|
|
repo PackageEventRepository
|
|
now func() time.Time
|
|
}
|
|
|
|
// RecordPackagePublishedInput is the argument of
// Service.RecordPackagePublished.
type RecordPackagePublishedInput struct {
	// EventID identifies the event; must be non-blank after trimming.
	EventID string
	// PackageID must be greater than zero.
	PackageID int64
	// Platform must be non-blank after trimming.
	Platform string
	// Model must be non-blank after trimming.
	Model string
	// Version must be greater than zero.
	Version int64
	// OccurredAt is converted to UTC; the zero value is replaced by the
	// service clock.
	OccurredAt time.Time
}
|
|
|
|
// PublishDraftInput is the argument of Service.PublishDraft.
type PublishDraftInput struct {
	// EventID identifies the resulting change event; must be non-blank
	// after trimming.
	EventID string
	// Platform selects the candidate and package; must be non-blank after
	// trimming.
	Platform string
	// Model selects the candidate and package; must be non-blank after
	// trimming.
	Model string
	// OccurredAt is converted to UTC; the zero value is replaced by the
	// service clock.
	OccurredAt time.Time
}
|
|
|
|
type PublishDraftOutput struct {
|
|
Candidate domain.DiscoveryCandidate `json:"candidate"`
|
|
Package domain.SupplyPackage `json:"package"`
|
|
Event domain.PackageChangeEvent `json:"event"`
|
|
GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"`
|
|
}
|
|
|
|
func NewService(repo PackageEventRepository) *Service {
|
|
return &Service{repo: repo, now: func() time.Time { return time.Now().UTC() }}
|
|
}
|
|
|
|
func (s *Service) RecordPackagePublished(ctx context.Context, input RecordPackagePublishedInput) (domain.PackageChangeEvent, error) {
|
|
if s == nil || s.repo == nil {
|
|
return domain.PackageChangeEvent{}, ErrInvalidPublishInput
|
|
}
|
|
if strings.TrimSpace(input.EventID) == "" || input.PackageID <= 0 || strings.TrimSpace(input.Platform) == "" || strings.TrimSpace(input.Model) == "" || input.Version <= 0 {
|
|
return domain.PackageChangeEvent{}, ErrInvalidPublishInput
|
|
}
|
|
|
|
event := domain.PackageChangeEvent{
|
|
EventID: strings.TrimSpace(input.EventID),
|
|
EventType: PackagePublishedEventType,
|
|
PackageID: input.PackageID,
|
|
Platform: strings.TrimSpace(input.Platform),
|
|
Model: strings.TrimSpace(input.Model),
|
|
OccurredAt: input.OccurredAt.UTC(),
|
|
Version: input.Version,
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
}
|
|
if event.OccurredAt.IsZero() {
|
|
event.OccurredAt = s.now()
|
|
}
|
|
return s.repo.AppendPackageEventContext(ctx, event)
|
|
}
|
|
|
|
func (s *Service) PublishDraft(ctx context.Context, input PublishDraftInput) (PublishDraftOutput, error) {
|
|
if s == nil || s.repo == nil {
|
|
return PublishDraftOutput{}, ErrInvalidPublishInput
|
|
}
|
|
platform := strings.TrimSpace(input.Platform)
|
|
model := strings.TrimSpace(input.Model)
|
|
eventID := strings.TrimSpace(input.EventID)
|
|
if eventID == "" || platform == "" || model == "" {
|
|
return PublishDraftOutput{}, ErrInvalidPublishInput
|
|
}
|
|
|
|
candidate, ok := s.repo.GetLatestDiscoveryCandidateContext(ctx, platform, model)
|
|
if !ok {
|
|
return PublishDraftOutput{}, ErrCandidateOrPackageMissing
|
|
}
|
|
pkg, ok := s.repo.GetSupplyPackage(ctx, platform, model)
|
|
if !ok {
|
|
return PublishDraftOutput{}, ErrCandidateOrPackageMissing
|
|
}
|
|
if candidate.Status == domain.DiscoveryCandidateStatusPublished && pkg.Status == "active" {
|
|
return PublishDraftOutput{}, ErrPackageAlreadyPublished
|
|
}
|
|
if candidate.Status == domain.DiscoveryCandidateStatusPublished || pkg.Status == "active" {
|
|
return PublishDraftOutput{}, ErrPackageAlreadyPublished
|
|
}
|
|
if candidate.Status != domain.DiscoveryCandidateStatusTestPassed {
|
|
return PublishDraftOutput{}, ErrCandidateNotPublishable
|
|
}
|
|
if pkg.Status != "draft" {
|
|
return PublishDraftOutput{}, ErrPackageNotPublishable
|
|
}
|
|
|
|
now := s.now()
|
|
candidate.Status = domain.DiscoveryCandidateStatusPublished
|
|
candidate.ReasonCode = ""
|
|
candidate.UpdatedAt = now
|
|
candidate.Version++
|
|
|
|
pkg.Status = "active"
|
|
pkg.UpdatedAt = now
|
|
pkg.Version++
|
|
|
|
version := pkg.Version
|
|
if version <= 0 {
|
|
version = 1
|
|
}
|
|
occurredAt := input.OccurredAt.UTC()
|
|
if occurredAt.IsZero() {
|
|
occurredAt = now
|
|
}
|
|
event := domain.PackageChangeEvent{
|
|
EventID: eventID,
|
|
AccountID: candidate.AccountID,
|
|
EventType: PackagePublishedEventType,
|
|
PackageID: pkg.PackageID,
|
|
Platform: platform,
|
|
Model: model,
|
|
OccurredAt: occurredAt,
|
|
Version: version,
|
|
GatewaySyncStatus: domain.GatewaySyncStatusPending,
|
|
}
|
|
|
|
if atomicRepo, ok := s.repo.(AtomicPublishRepository); ok {
|
|
result, err := atomicRepo.PublishPackageAtomically(ctx, PublishPackageAtomicInput{
|
|
Candidate: candidate,
|
|
Package: pkg,
|
|
Event: event,
|
|
})
|
|
if err != nil {
|
|
if errors.Is(err, ErrDuplicatePublishRequest) {
|
|
return PublishDraftOutput{}, ErrDuplicatePublishRequest
|
|
}
|
|
return PublishDraftOutput{}, err
|
|
}
|
|
return PublishDraftOutput{
|
|
Candidate: result.Candidate,
|
|
Package: result.Package,
|
|
Event: result.Event,
|
|
GatewaySyncStatus: result.Event.GatewaySyncStatus,
|
|
}, nil
|
|
}
|
|
|
|
if err := s.repo.UpdateCandidateStatus(ctx, candidate.CandidateID, domain.DiscoveryCandidateStatusPublished, "", ""); err != nil {
|
|
return PublishDraftOutput{}, err
|
|
}
|
|
if err := s.repo.UpsertSupplyPackage(ctx, pkg); err != nil {
|
|
return PublishDraftOutput{}, err
|
|
}
|
|
updatedPkg, ok := s.repo.GetSupplyPackage(ctx, platform, model)
|
|
if ok {
|
|
pkg = updatedPkg
|
|
event.PackageID = pkg.PackageID
|
|
event.Version = pkg.Version
|
|
}
|
|
storedEvent, err := s.repo.AppendPackageEventContext(ctx, event)
|
|
if err != nil {
|
|
if errors.Is(err, ErrDuplicatePublishRequest) {
|
|
return PublishDraftOutput{}, ErrDuplicatePublishRequest
|
|
}
|
|
return PublishDraftOutput{}, err
|
|
}
|
|
|
|
return PublishDraftOutput{
|
|
Candidate: candidate,
|
|
Package: pkg,
|
|
Event: storedEvent,
|
|
GatewaySyncStatus: storedEvent.GatewaySyncStatus,
|
|
}, nil
|
|
}
|