-
Notifications
You must be signed in to change notification settings - Fork 0
/
clocks.go
157 lines (140 loc) · 3.66 KB
/
clocks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Package gronos provides a concurrent application management system.
package gronos
import (
"sync"
"sync/atomic"
"time"
)
// Ticker is the callback contract driven by a Clock: any value with a Tick
// method can be registered through Clock.Add.
type Ticker interface {
// Tick is called when the ticker is triggered. In this file it is invoked by
// Clock.executeTick, which dispatchTicks launches in a new goroutine for every
// due subscriber. The subscriber's last-execution time is only updated after
// Tick returns, so a Tick that runs longer than the interval may overlap with
// its next invocation.
Tick()
}
// ExecutionMode defines how a ticker should be executed.
//
// NOTE(review): in this file the mode is only recorded on the TickerSubscriber
// by Clock.Add. dispatchTicks does not read it and launches every due
// subscriber in its own goroutine, so the per-mode behavior described below is
// the stated intent rather than something this file enforces.
type ExecutionMode int
const (
// NonBlocking mode executes the ticker without waiting for completion.
// It is the zero value of ExecutionMode.
NonBlocking ExecutionMode = iota
// ManagedTimeline mode ensures tickers are executed in order, potentially delaying subsequent ticks.
ManagedTimeline
// BestEffort mode attempts to execute tickers on time but may skip ticks if the system is overloaded.
BestEffort
)
// TickerSubscriber represents a subscriber to the clock's ticks. Clock.Add
// builds one per registered Ticker and stores it in the clock's subscriber map.
type TickerSubscriber struct {
// Ticker is the callback target; executeTick calls its Tick method.
Ticker Ticker
// Mode is the execution mode passed to Clock.Add.
Mode ExecutionMode
// lastExecTime holds a time.Time: the registration time stored by Clock.Add,
// then the dispatch timestamp stored by executeTick after each Tick returns.
lastExecTime atomic.Value
// DynamicInterval returns the spacing dispatchTicks requires between two
// executions of this subscriber (a 1ms tolerance is subtracted by the caller).
// dispatchTicks calls it with the clock's interval on every tick; the default
// installed by Clock.Add ignores its argument and returns the clock's interval.
DynamicInterval func(lastInterval time.Duration) time.Duration
}
// Clock represents a clock that can manage multiple tickers with different execution modes.
type Clock struct {
// name is an optional label set through WithName; nothing in this file reads it.
name string
// interval is the base tick period used by dispatchTicks; NewClock defaults it
// to 100ms and WithInterval overrides it.
interval time.Duration
// ticker is created by NewClock from interval.
// NOTE(review): no code in this file receives from it or stops it.
ticker *time.Ticker
// stopCh is closed by Stop.
// NOTE(review): no code in this file receives from it.
stopCh chan struct{}
// subs maps each registered Ticker (the key) to its *TickerSubscriber.
subs sync.Map
// ticking is the run flag polled by dispatchTicks; Start sets it and Stop clears it.
ticking atomic.Bool
// started guards Start against running twice without an intervening Stop.
started atomic.Bool
}
// NewClock creates a new Clock instance with the given options.
//
// The clock starts from its defaults (a 100ms interval and an empty name) and
// each option is then applied in argument order. Nil options are skipped. If
// the interval is not positive once all options have run, it is reset to the
// default: time.NewTicker panics on a non-positive duration, and a zero
// interval would also make the dispatch loop spin without sleeping.
//
// The returned clock is idle; call Start to begin ticking.
//
// Example usage:
//
//	clock := NewClock(
//		WithName("MyClock"),
//		WithInterval(time.Second),
//	)
func NewClock(opts ...ClockOption) *Clock {
	const defaultInterval = 100 * time.Millisecond

	c := &Clock{
		interval: defaultInterval,
		stopCh:   make(chan struct{}),
	}
	for _, opt := range opts {
		if opt == nil {
			// Calling a nil func would panic; treat it as "no option".
			continue
		}
		opt(c)
	}
	if c.interval <= 0 {
		// Guard the time.NewTicker call below against a bad option value.
		c.interval = defaultInterval
	}
	c.ticker = time.NewTicker(c.interval)
	return c
}
// ClockOption is a function type for configuring a Clock instance. NewClock
// calls each option, in argument order, on the Clock it is constructing.
type ClockOption func(*Clock)
// WithName returns a ClockOption that stores name as the Clock's name when the
// option is applied.
func WithName(name string) ClockOption {
	assignName := func(clock *Clock) {
		clock.name = name
	}
	return assignName
}
// WithInterval returns a ClockOption that stores interval as the Clock's tick
// interval when the option is applied. The value is stored as given, without
// validation; callers should pass a positive duration.
func WithInterval(interval time.Duration) ClockOption {
	assignInterval := func(clock *Clock) {
		clock.interval = interval
	}
	return assignInterval
}
// Add subscribes a Ticker to the Clock with the specified ExecutionMode.
//
// The ticker itself is the subscription key, so adding the same ticker again
// replaces its previous subscription. The new subscriber's last-execution time
// is set to the moment of registration, and its DynamicInterval defaults to a
// function that returns the clock's interval.
//
// A nil ticker is ignored. Storing it would make a later dispatch goroutine
// call Tick on a nil interface, which panics and takes the whole process down.
//
// Example usage:
//
//	clock.Add(&MyTicker{}, NonBlocking)
func (c *Clock) Add(ticker Ticker, mode ExecutionMode) {
	if ticker == nil {
		return
	}
	sub := &TickerSubscriber{
		Ticker: ticker,
		Mode:   mode,
		// Default spacing: always the clock's own interval, whatever was passed in.
		DynamicInterval: func(lastInterval time.Duration) time.Duration {
			return c.interval
		},
	}
	sub.lastExecTime.Store(time.Now())
	c.subs.Store(ticker, sub)
}
// Start begins the Clock's ticking process.
//
// The first call flips the started flag, raises the ticking flag and launches
// dispatchTicks in a new goroutine. Further calls do nothing until Stop has
// reset the started flag.
//
// Example usage:
//
//	clock.Start()
func (c *Clock) Start() {
	if c.started.CompareAndSwap(false, true) {
		c.ticking.Store(true)
		go c.dispatchTicks()
	}
}
// Stop halts the Clock's ticking process.
//
// It clears the ticking flag (which dispatchTicks polls), closes stopCh and
// resets the started flag so that Start may be called again. Calling Stop on a
// clock that is not ticking is a no-op.
//
// A channel can only be closed once, so stopCh is closed by the first
// effective Stop and left closed afterwards. Without the guard below, a
// Start, Stop, Start, Stop sequence panics with "close of closed channel".
// The check-then-close is safe because the compare-and-swap on ticking lets
// only one Stop per run get this far, and the next run cannot begin before
// started is reset at the end of this call.
//
// Example usage:
//
//	clock.Stop()
func (c *Clock) Stop() {
	if !c.ticking.CompareAndSwap(true, false) {
		return
	}
	select {
	case <-c.stopCh:
		// Already closed by a Stop that ended an earlier run.
	default:
		close(c.stopCh)
	}
	c.started.Store(false)
}
// dispatchTicks is the scheduling loop run in the goroutine that Start launches.
//
// It keeps a deadline that advances by the clock's interval after every
// dispatch round. While the ticking flag is set, it sleeps until the deadline,
// then walks all subscribers and launches executeTick in a new goroutine for
// each one whose time since last execution has reached its DynamicInterval
// minus a 1ms tolerance. The flag is only re-checked at the top of the loop, so
// the loop exits at most one sleep after Stop clears it.
func (c *Clock) dispatchTicks() {
	deadline := time.Now().Add(c.interval)
	for c.ticking.Load() {
		now := time.Now()
		if wait := deadline.Sub(now); wait > 0 {
			// Not due yet: sleep the remainder, then re-check the run flag.
			time.Sleep(wait)
			continue
		}

		fireIfDue := func(key, value any) bool {
			sub := value.(*TickerSubscriber)
			sinceLast := now.Sub(sub.lastExecTime.Load().(time.Time))
			threshold := sub.DynamicInterval(c.interval) - time.Millisecond
			if sinceLast >= threshold {
				go c.executeTick(key, sub, now)
			}
			return true // always visit every subscriber
		}
		c.subs.Range(fireIfDue)

		deadline = deadline.Add(c.interval)
	}
}
// executeTick performs the actual execution of a subscriber's Tick method.
//
// It calls sub.Ticker.Tick synchronously and then records now (the dispatch
// timestamp chosen by dispatchTicks) as the subscriber's last-execution time.
//
// The subscriber is deliberately not written back into c.subs. The map already
// holds this same pointer, so the timestamp update is visible through it. A
// write-back would overwrite a subscription that was replaced through Add while
// Tick was running, and would re-insert one that had been removed in the
// meantime.
//
// key is the subscriber's key in c.subs; it is kept for signature
// compatibility and is not used.
func (c *Clock) executeTick(key any, sub *TickerSubscriber, now time.Time) {
	sub.Ticker.Tick()
	sub.lastExecTime.Store(now)
}