87 lines
2.5 KiB
Go
87 lines
2.5 KiB
Go
package poller
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/admission"
|
|
"supply-intelligence/internal/metrics"
|
|
)
|
|
|
|
// AdmissionRuntime periodically runs admission tests for eligible candidates.
|
|
type AdmissionRuntime struct {
|
|
admissionService *admission.Service
|
|
interval time.Duration
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// NewAdmissionRuntime creates an admission runtime with the given service and interval.
|
|
func NewAdmissionRuntime(admissionService *admission.Service, interval time.Duration) *AdmissionRuntime {
|
|
return &AdmissionRuntime{admissionService: admissionService, interval: interval}
|
|
}
|
|
|
|
// Start begins periodic admission testing. Does nothing if already started.
|
|
func (r *AdmissionRuntime) Start(parent context.Context) bool {
|
|
if r == nil || r.admissionService == nil || r.cancel != nil {
|
|
return false
|
|
}
|
|
ctx, cancel := context.WithCancel(parent)
|
|
r.cancel = cancel
|
|
r.wg.Add(1)
|
|
go func() {
|
|
defer r.wg.Done()
|
|
// Run immediately on startup, then on interval
|
|
r.runTests(context.Background())
|
|
ticker := time.NewTicker(r.interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
r.runTests(context.Background())
|
|
case <-ctx.Done():
|
|
log.Println("[admission-runtime] stopped")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
log.Printf("[admission-runtime] started with interval=%v", r.interval)
|
|
return true
|
|
}
|
|
|
|
// Stop halts periodic testing.
|
|
func (r *AdmissionRuntime) Stop() {
|
|
if r == nil || r.cancel == nil {
|
|
return
|
|
}
|
|
r.cancel()
|
|
r.wg.Wait()
|
|
}
|
|
|
|
func (r *AdmissionRuntime) runTests(ctx context.Context) {
|
|
candidates := r.admissionService.GetRunnableCandidates(ctx)
|
|
if len(candidates) == 0 {
|
|
return
|
|
}
|
|
log.Printf("[admission-runtime] running admission tests for %d candidates", len(candidates))
|
|
for _, c := range candidates {
|
|
start := time.Now()
|
|
result, err := r.admissionService.RunAdmission(ctx, c.CandidateID)
|
|
elapsed := time.Since(start).Seconds()
|
|
metrics.AdmissionLatencySeconds.WithLabelValues(c.Platform).Observe(elapsed)
|
|
if err != nil {
|
|
log.Printf("[admission-runtime] candidate=%s error=%v", c.CandidateID, err)
|
|
continue
|
|
}
|
|
if result.Passed {
|
|
metrics.AdmissionTestsTotal.WithLabelValues(c.Platform, "passed").Inc()
|
|
log.Printf("[admission-runtime] candidate=%s PASSED", c.CandidateID)
|
|
} else {
|
|
metrics.AdmissionTestsTotal.WithLabelValues(c.Platform, "failed").Inc()
|
|
log.Printf("[admission-runtime] candidate=%s FAILED code=%s", c.CandidateID, result.FailureCode)
|
|
}
|
|
}
|
|
}
|