package poller

import (
	"context"
	"sync"
	"time"
)

// RuntimeStatus is a point-in-time snapshot of a Runtime as reported by
// Runtime.Status. LastPollAt and LastError are omitted from JSON when unset.
type RuntimeStatus struct {
	Started    bool       `json:"started"`
	Paused     bool       `json:"paused"`
	Cursor     string     `json:"cursor"`
	LastPollAt *time.Time `json:"last_poll_at,omitempty"`
	LastError  string     `json:"last_error,omitempty"`
}

// Runtime drives a GatewayPackagePoller on a fixed interval in a background
// goroutine and records the outcome of the most recent poll.
//
// A Runtime must not be copied after first use because it contains mutexes
// and a WaitGroup.
type Runtime struct {
	poller   *GatewayPackagePoller
	interval time.Duration

	// lifecycle serializes Start and Stop so that at most one polling
	// goroutine exists and wg.Add never overlaps wg.Wait. It is never taken
	// by the polling goroutine, so Stop may hold it while waiting.
	lifecycle sync.Mutex
	wg        sync.WaitGroup

	// mu guards the fields below. cancel is additionally only written while
	// lifecycle is held.
	mu         sync.RWMutex
	cancel     context.CancelFunc
	paused     bool
	lastPollAt *time.Time
	lastError  string
}

// NewRuntime returns a Runtime that polls with the given poller every
// interval. A non-positive interval is replaced by one second.
func NewRuntime(poller *GatewayPackagePoller, interval time.Duration) *Runtime {
	if interval <= 0 {
		interval = time.Second
	}
	return &Runtime{poller: poller, interval: interval}
}

// Start launches the polling goroutine as a child of parent. It polls once
// immediately (unless paused) and then once per interval tick until the
// context is cancelled or Stop is called.
//
// Start reports false, and does nothing, when the receiver or its poller is
// nil or when the Runtime is already started. It is safe to call concurrently
// with the other methods.
func (r *Runtime) Start(parent context.Context) bool {
	if r == nil || r.poller == nil {
		return false
	}

	r.lifecycle.Lock()
	defer r.lifecycle.Unlock()

	// cancel is only written with lifecycle held, so reading it here without
	// mu cannot race with a writer.
	if r.cancel != nil {
		return false
	}

	ctx, cancel := context.WithCancel(parent)
	r.mu.Lock()
	r.cancel = cancel
	r.mu.Unlock()

	r.wg.Add(1)
	go func() {
		defer r.wg.Done()

		ticker := time.NewTicker(r.interval)
		defer ticker.Stop()

		for {
			r.mu.RLock()
			paused := r.paused
			r.mu.RUnlock()

			if !paused {
				now := time.Now().UTC()
				// Only the error is recorded; the poll result itself is
				// not used by the runtime.
				_, err := r.poller.PollOnce(ctx)

				r.mu.Lock()
				r.lastPollAt = &now
				if err != nil {
					r.lastError = err.Error()
				} else {
					r.lastError = ""
				}
				r.mu.Unlock()
			}

			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// select chooses randomly when both channels are ready;
				// re-check so a cancelled runtime never issues another poll.
				if ctx.Err() != nil {
					return
				}
			}
		}
	}()
	return true
}

// Pause makes the polling goroutine skip polls until Resume is called. The
// goroutine itself keeps running. Pause reports false only for a nil receiver.
func (r *Runtime) Pause() bool {
	if r == nil {
		return false
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.paused = true
	return true
}

// Resume clears the paused flag so polling continues on the next tick.
// Resume reports false only for a nil receiver.
func (r *Runtime) Resume() bool {
	if r == nil {
		return false
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.paused = false
	return true
}

// Status returns a snapshot of the runtime state. A nil receiver yields the
// zero RuntimeStatus. LastPollAt in the result is a copy, so callers may
// modify it freely.
func (r *Runtime) Status() RuntimeStatus {
	if r == nil {
		return RuntimeStatus{}
	}
	r.mu.RLock()
	defer r.mu.RUnlock()

	status := RuntimeStatus{
		Started:   r.cancel != nil,
		Paused:    r.paused,
		LastError: r.lastError,
	}
	if r.poller != nil {
		status.Cursor = r.poller.Cursor()
	}
	if r.lastPollAt != nil {
		t := *r.lastPollAt
		status.LastPollAt = &t
	}
	return status
}

// Stop cancels the polling goroutine and blocks until it has exited, after
// which the Runtime may be started again. It is a no-op for a nil receiver or
// a Runtime that is not started, and is safe to call concurrently with the
// other methods.
func (r *Runtime) Stop() {
	if r == nil {
		return
	}

	r.lifecycle.Lock()
	defer r.lifecycle.Unlock()

	if r.cancel == nil {
		return
	}
	r.cancel()
	// mu is deliberately not held here: the goroutine needs it to finish its
	// current iteration.
	r.wg.Wait()

	r.mu.Lock()
	r.cancel = nil
	r.mu.Unlock()
}