-
Notifications
You must be signed in to change notification settings - Fork 5
/
queue.go
146 lines (125 loc) · 3.27 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Package timerqueue implements a priority queue for objects scheduled at a
// particular time.
package timerqueue
import (
"container/heap"
"errors"
"time"
)
// Timer is an interface that types implement to schedule and receive OnTimer
// callbacks.
//
// Queue uses Timer values as map keys, so the dynamic type of an
// implementation must be comparable (a pointer type is the usual choice).
type Timer interface {
// OnTimer is invoked by Queue.Advance when the timer comes due. The
// argument is the time the timer was scheduled for.
OnTimer(t time.Time)
}
// Queue is a time-sorted collection of Timer objects.
//
// Each scheduled timer is tracked in two places: a heap that yields the
// earliest entry first, and a table that finds a timer's entry directly.
type Queue struct {
// heap holds the scheduled entries ordered by their time.
heap timerHeap
// table maps each scheduled Timer to its entry in heap.
table map[Timer]*timerData
}
// timerData is the queue's bookkeeping record for one scheduled Timer.
type timerData struct {
// timer is the scheduled Timer.
timer Timer
// time is when the timer is scheduled to run.
time time.Time
// index is the entry's current position in the heap slice. timerHeap
// updates it on Push and Swap and sets it to -1 on Pop.
index int
}
// New returns an empty timer priority queue with its lookup table allocated.
func New() *Queue {
	q := new(Queue)
	q.table = make(map[Timer]*timerData)
	return q
}
// Len reports how many timers are currently scheduled in the queue.
func (q *Queue) Len() int {
	return q.heap.Len()
}
// Schedule schedules a timer for execution at time tm. If the timer was
// already scheduled, it is rescheduled to tm instead of being added twice.
//
// Schedule may be called on a zero-value Queue: the lookup table is allocated
// on first use. Because timers are used as map keys, the dynamic type of t
// must be comparable.
func (q *Queue) Schedule(t Timer, tm time.Time) {
	if data, ok := q.table[t]; ok {
		// Already queued: update the time and restore heap order around
		// the entry's current position.
		data.time = tm
		heap.Fix(&q.heap, data.index)
		return
	}

	// Reading a nil map is fine, but writing to one panics, so make sure the
	// table exists before inserting.
	if q.table == nil {
		q.table = make(map[Timer]*timerData)
	}

	// index is assigned by timerHeap.Push.
	data := &timerData{timer: t, time: tm}
	heap.Push(&q.heap, data)
	q.table[t] = data
}
// Unschedule removes a timer from the queue so that it will not be executed.
// It does nothing if the timer is not currently scheduled.
func (q *Queue) Unschedule(t Timer) {
	entry, found := q.table[t]
	if !found {
		return
	}
	heap.Remove(&q.heap, entry.index)
	delete(q.table, t)
}
// ErrNotScheduled is the error returned by GetTime when the requested timer
// is not currently in the queue. Test for it with errors.Is.
var ErrNotScheduled = errors.New("timerqueue: timer not scheduled")

// GetTime returns the time at which the timer is scheduled.
// If the timer isn't currently scheduled, it returns the zero time.Time and
// ErrNotScheduled.
func (q *Queue) GetTime(t Timer) (tm time.Time, err error) {
	data, ok := q.table[t]
	if !ok {
		return time.Time{}, ErrNotScheduled
	}
	return data.time, nil
}
// IsScheduled reports whether the timer is present in the queue.
func (q *Queue) IsScheduled(t Timer) bool {
	_, scheduled := q.table[t]
	return scheduled
}
// Clear empties the queue: the heap is dropped and the lookup table is
// replaced with a fresh, empty map.
func (q *Queue) Clear() {
	q.heap = nil
	q.table = make(map[Timer]*timerData)
}
// PopFirst removes the earliest-scheduled timer from the queue and returns it
// along with its scheduled time. If the queue is empty it returns a nil Timer
// and the zero time.Time.
func (q *Queue) PopFirst() (t Timer, tm time.Time) {
	if len(q.heap) == 0 {
		return nil, time.Time{}
	}
	first := heap.Pop(&q.heap).(*timerData)
	delete(q.table, first.timer)
	return first.timer, first.time
}
// PeekFirst reports the earliest-scheduled timer and its scheduled time
// without removing it; the queue is left unchanged. If the queue is empty it
// returns a nil Timer and the zero time.Time.
func (q *Queue) PeekFirst() (t Timer, tm time.Time) {
	if len(q.heap) == 0 {
		return nil, time.Time{}
	}
	first := q.heap[0]
	return first.timer, first.time
}
// Advance executes OnTimer callbacks for all timers scheduled to run at or
// before the time tm, earliest first. Each timer is removed from the queue
// before its callback is invoked, and the callback receives the time the
// timer was scheduled for rather than tm.
//
// Callbacks may schedule or unschedule timers on this queue. The head of the
// queue is re-examined after every callback, so a timer that a callback
// schedules at or before tm is executed during the same call to Advance.
func (q *Queue) Advance(tm time.Time) {
	// !tm.Before(x) is x <= tm: timers due exactly at tm are included.
	for len(q.heap) > 0 && !tm.Before(q.heap[0].time) {
		data := heap.Pop(&q.heap).(*timerData)
		delete(q.table, data.timer)
		data.timer.OnTimer(data.time)
	}
}
/*
 * timerHeap
 */

// timerHeap is a heap of timer entries ordered by scheduled time, earliest
// first. It implements heap.Interface from container/heap and keeps each
// entry's index field in sync with the entry's position in the slice.
type timerHeap []*timerData
// Len implements heap.Interface; it is the number of entries in the heap.
func (h timerHeap) Len() int { return len(h) }
// Less implements heap.Interface. Entry i sorts ahead of entry j when it is
// scheduled strictly earlier.
func (h timerHeap) Less(i, j int) bool {
	a, b := h[i], h[j]
	return a.time.Before(b.time)
}
// Swap implements heap.Interface. It exchanges the entries at positions i and
// j and records each entry's new position in its index field.
func (h timerHeap) Swap(i, j int) {
	a, b := h[i], h[j]
	h[i], h[j] = b, a
	b.index, a.index = i, j
}
// Push implements heap.Interface. It appends x, which must be a *timerData,
// to the end of the slice and records that position in the entry's index.
func (h *timerHeap) Push(x interface{}) {
	item := x.(*timerData)
	slot := len(*h)
	*h = append(*h, item)
	item.index = slot
}
// Pop implements heap.Interface. It detaches and returns the last entry of
// the slice; container/heap moves the entry being removed into that position
// before calling Pop. The returned entry's index is set to -1 to mark it as
// no longer in the heap.
func (h *timerHeap) Pop() interface{} {
	old := *h
	last := len(old) - 1
	data := old[last]
	// Clear the vacated slot: truncating alone would leave the pointer in the
	// backing array and keep the entry (and its Timer) reachable.
	old[last] = nil
	*h = old[:last]
	data.index = -1
	return data
}