Files
2026-05-12 18:49:52 +08:00

117 lines
2.1 KiB
Go

package poller
import (
"context"
"sync"
"time"
)
// RuntimeStatus is a point-in-time snapshot of a Runtime, shaped for JSON
// serialization.
type RuntimeStatus struct {
	// Started is true while the polling loop has been started and not yet stopped.
	Started bool `json:"started"`
	// Paused is true while polling is suspended via Pause.
	Paused bool `json:"paused"`
	// Cursor is the value reported by the poller's Cursor method.
	Cursor string `json:"cursor"`
	// LastPollAt is the UTC time at which the most recent poll attempt began;
	// it is nil (and omitted from JSON) until the first attempt.
	LastPollAt *time.Time `json:"last_poll_at,omitempty"`
	// LastError is the error text of the most recent poll attempt; it is empty
	// (and omitted from JSON) when that attempt succeeded.
	LastError string `json:"last_error,omitempty"`
}
// Runtime drives a GatewayPackagePoller on a fixed interval from a background
// goroutine and remembers the outcome of the most recent poll attempt.
type Runtime struct {
	// poller performs each poll; interval is the delay between poll attempts.
	poller   *GatewayPackagePoller
	interval time.Duration

	// cancel stops the polling goroutine; it is non-nil between Start and Stop.
	cancel context.CancelFunc
	// wg lets Stop wait for the polling goroutine to exit.
	wg sync.WaitGroup

	// mu guards the fields declared below it.
	mu         sync.RWMutex
	paused     bool
	lastPollAt *time.Time
	lastError  string
}
// NewRuntime builds a Runtime that will drive poller every interval once
// started. A zero or negative interval is replaced by one second.
func NewRuntime(poller *GatewayPackagePoller, interval time.Duration) *Runtime {
	rt := &Runtime{poller: poller, interval: time.Second}
	if interval > 0 {
		rt.interval = interval
	}
	return rt
}
// Start launches the background polling loop and reports whether it did so.
//
// It returns false when the receiver or its poller is nil, or when a loop
// started earlier has not been stopped yet. The loop polls once immediately
// and then once per interval tick, skipping polls while paused, until parent
// is cancelled or Stop is called. Each attempt records its start time (UTC)
// and its error text, which Status exposes.
func (r *Runtime) Start(parent context.Context) bool {
	if r == nil || r.poller == nil {
		return false
	}

	// Check and set cancel under mu: Status reads it under the same lock, and
	// doing both steps atomically keeps concurrent Start calls from launching
	// two loops.
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.cancel != nil {
		return false
	}
	ctx, cancel := context.WithCancel(parent)
	r.cancel = cancel
	r.wg.Add(1)

	go func() {
		defer r.wg.Done()
		ticker := time.NewTicker(r.interval)
		defer ticker.Stop()
		for {
			// select picks at random when both the tick and the cancellation
			// are ready, so re-check here to avoid polling with a dead context
			// and recording a spurious cancellation error.
			if ctx.Err() != nil {
				return
			}

			r.mu.RLock()
			paused := r.paused
			r.mu.RUnlock()

			if !paused {
				now := time.Now().UTC()
				// Poll without holding mu so Status, Pause and Resume never
				// block behind a slow poll.
				_, err := r.poller.PollOnce(ctx)

				r.mu.Lock()
				r.lastPollAt = &now
				if err != nil {
					r.lastError = err.Error()
				} else {
					r.lastError = ""
				}
				r.mu.Unlock()
			}

			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
	return true
}
// Pause suspends polling: the loop keeps ticking but skips poll attempts
// until Resume is called. It reports false only for a nil receiver.
func (r *Runtime) Pause() bool {
	if r == nil {
		return false
	}

	r.mu.Lock()
	r.paused = true
	r.mu.Unlock()

	return true
}
// Resume lifts a previous Pause so the loop polls again on its next
// iteration. It reports false only for a nil receiver.
func (r *Runtime) Resume() bool {
	if r == nil {
		return false
	}

	r.mu.Lock()
	r.paused = false
	r.mu.Unlock()

	return true
}
// Status returns a snapshot of the runtime taken under the read lock. A nil
// receiver yields the zero RuntimeStatus. LastPollAt points at a copy, so
// callers cannot alter the runtime's own timestamp through it.
func (r *Runtime) Status() RuntimeStatus {
	var snapshot RuntimeStatus
	if r == nil {
		return snapshot
	}

	r.mu.RLock()
	defer r.mu.RUnlock()

	snapshot.Started = r.cancel != nil
	snapshot.Paused = r.paused
	snapshot.LastError = r.lastError
	if r.poller != nil {
		snapshot.Cursor = r.poller.Cursor()
	}
	if last := r.lastPollAt; last != nil {
		copied := *last
		snapshot.LastPollAt = &copied
	}
	return snapshot
}
// Stop cancels the polling loop and blocks until its goroutine has exited.
// It does nothing on a nil receiver or when no loop is running.
//
// cancel stays set until the goroutine is gone, so Status keeps reporting the
// runtime as started and Start keeps refusing to run for the whole shutdown.
func (r *Runtime) Stop() {
	if r == nil {
		return
	}

	// Read cancel under mu; Status inspects it under the same lock.
	r.mu.Lock()
	cancel := r.cancel
	r.mu.Unlock()
	if cancel == nil {
		return
	}

	cancel()
	// Wait without holding mu: the polling goroutine takes mu to record each
	// poll result, so holding it here could deadlock.
	r.wg.Wait()

	r.mu.Lock()
	r.cancel = nil
	r.mu.Unlock()
}