-
Notifications
You must be signed in to change notification settings - Fork 8.3k
/
Copy pathqueue.go
193 lines (163 loc) · 4.97 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package task
import (
"fmt"
"time"
"k8s.io/klog/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
// Queue manages a time work queue through an independent worker that invokes the
// given sync function for every work item inserted.
// The queue uses an internal timestamp that allows the removal of certain elements
// which timestamp is older than the last successful get operation.
type Queue struct {
// queue is the work queue the worker polls
queue workqueue.TypedRateLimitingInterface[any]
// sync is called for each item in the queue
sync func(interface{}) error
// workerDone is closed when the worker exits
workerDone chan bool
// fn makes a key for an API object
fn func(obj interface{}) (interface{}, error)
// lastSync is the Unix epoch time of the last execution of 'sync'
lastSync int64
}
// Element represents one item of the queue
type Element struct {
Key interface{}
Timestamp int64
IsSkippable bool
}
// Run starts processing elements in the queue
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
wait.Until(t.worker, period, stopCh)
}
// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
t.enqueue(obj, false)
}
// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
t.enqueue(obj, true)
}
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
if t.IsShuttingDown() {
klog.ErrorS(nil, "queue has been shutdown, failed to enqueue", "key", obj)
return
}
ts := time.Now().UnixNano()
if !skippable {
// make sure the timestamp is bigger than lastSync
ts = time.Now().Add(24 * time.Hour).UnixNano()
}
klog.V(3).InfoS("queuing", "item", obj)
key, err := t.fn(obj)
if err != nil {
klog.ErrorS(err, "creating object key", "item", obj)
return
}
t.queue.Add(Element{
Key: key,
Timestamp: ts,
})
}
func (t *Queue) defaultKeyFunc(obj interface{}) (interface{}, error) {
key, err := keyFunc(obj)
if err != nil {
return "", fmt.Errorf("could not get key for object %+v: %v", obj, err)
}
return key, nil
}
// worker processes work in the queue through sync.
func (t *Queue) worker() {
for {
key, quit := t.queue.Get()
if quit {
if !isClosed(t.workerDone) {
close(t.workerDone)
}
return
}
ts := time.Now().UnixNano()
item, ok := key.(Element)
if !ok {
klog.ErrorS(nil, "invalid item type", "key", key)
}
if item.Timestamp != 0 && t.lastSync > item.Timestamp {
klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
t.queue.Forget(key)
t.queue.Done(key)
continue
}
klog.V(3).InfoS("syncing", "key", item.Key)
if err := t.sync(key); err != nil {
klog.ErrorS(err, "requeuing", "key", item.Key)
t.queue.AddRateLimited(Element{
Key: item.Key,
Timestamp: 0,
})
} else {
t.queue.Forget(key)
t.lastSync = ts
}
t.queue.Done(key)
}
}
func isClosed(ch <-chan bool) bool {
select {
case <-ch:
return true
default:
}
return false
}
// Shutdown shuts down the work queue and waits for the worker to ACK
func (t *Queue) Shutdown() {
t.queue.ShutDown()
<-t.workerDone
}
// IsShuttingDown returns if the method Shutdown was invoked
func (t *Queue) IsShuttingDown() bool {
return t.queue.ShuttingDown()
}
// NewTaskQueue creates a new task queue with the given sync function.
// The sync function is called for every element inserted into the queue.
func NewTaskQueue(syncFn func(interface{}) error) *Queue {
return NewCustomTaskQueue(syncFn, nil)
}
// NewCustomTaskQueue creates a new custom task queue with the given sync function.
func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
q := &Queue{
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()),
sync: syncFn,
workerDone: make(chan bool),
fn: fn,
}
if fn == nil {
q.fn = q.defaultKeyFunc
}
return q
}
// GetDummyObject returns a valid object that can be used in the Queue
func GetDummyObject(name string) *metav1.ObjectMeta {
return &metav1.ObjectMeta{
Name: name,
}
}