-
Notifications
You must be signed in to change notification settings - Fork 0
/
handlers.go
166 lines (137 loc) · 3.9 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package kubenvoy
import (
"sort"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// This is a bit bad, maybe use Watch.interface ?
func handleEventHandlerPanic(f func()) {
defer func() {
if r := recover(); r != nil {
glog.Errorf("Panic in handling events: %v", r)
}
}()
f()
}
// ResourcesHandler handles resources
type ResourcesHandler interface {
Handle(resources []interface{})
}
// SharedInformerResourceEventHandlerWrapper implments cache.ResourceEventHandler,
// add triggers handle function seeing any event. It's capable of grouping
// results togethers with debounced function
type SharedInformerResourceEventHandlerWrapper struct {
informer cache.SharedInformer
handler ResourcesHandler
debouncedTrigger func()
}
func NewSharedInformerResourceEventHandlerWrapper(informer cache.SharedInformer, handler ResourcesHandler, debounce bool) *SharedInformerResourceEventHandlerWrapper {
wrapper := &SharedInformerResourceEventHandlerWrapper{
informer: informer,
handler: handler,
}
if debounce {
wrapper.debouncedTrigger = debounced(1000*time.Millisecond, wrapper.trigger)
}
return wrapper
}
func debounced(interval time.Duration, f func()) func() {
in := make(chan func())
out := make(chan func())
go func() {
var f func() = func() {}
for {
select {
case f = <-in:
case <-time.After(interval):
out <- f
<-in
// new interval
}
}
}()
go func() {
for {
select {
case f := <-out:
f()
}
}
}()
return func() {
in <- f
}
}
func (w *SharedInformerResourceEventHandlerWrapper) trigger() {
handleEventHandlerPanic(func() {
w.handler.Handle(w.informer.GetStore().List())
})
}
func (w *SharedInformerResourceEventHandlerWrapper) onEvent() {
if w.debouncedTrigger != nil {
w.debouncedTrigger()
} else {
w.trigger()
}
}
// OnAdd implements cache.ResourceEventHandler
func (w *SharedInformerResourceEventHandlerWrapper) OnAdd(obj interface{}) {
glog.V(3).Infof("OnAdd %v [%v] [%v]", obj, w.informer.LastSyncResourceVersion())
w.onEvent()
}
// OnUpdate implements cache.ResourceEventHandler
func (w *SharedInformerResourceEventHandlerWrapper) OnUpdate(oldObj, newObj interface{}) {
glog.V(3).Infof("OnUpdate %v [%v] [%v]", newObj, w.informer.LastSyncResourceVersion())
w.onEvent()
}
// OnDelete implements cache.ResourceEventHandler
func (w *SharedInformerResourceEventHandlerWrapper) OnDelete(obj interface{}) {
glog.V(3).Infof("OnDelete %v [%v] [%v] ", obj, w.informer.LastSyncResourceVersion())
w.onEvent()
}
// EndpointsHandler handles one kubernetes endpoints result, it implements cache.ResourcesHandler
type EndpointsHandler func(endpoints *v1.Endpoints)
func (h EndpointsHandler) EndpointsHandlerFuncs() *cache.ResourceEventHandlerFuncs {
return &cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
h(obj.(*v1.Endpoints))
},
UpdateFunc: func(obj, newObj interface{}) {
h(obj.(*v1.Endpoints))
},
DeleteFunc: func(obj interface{}) {
h(nil)
},
}
}
// ServicesHandler handles kubernetes services results, it implements ResourcesHandler
type ServicesHandler func(service []*v1.Service)
func (h ServicesHandler) Handle(resources []interface{}) {
services := make(ServiceSorter, len(resources))
for i, r := range resources {
services[i] = r.(*v1.Service)
}
sort.Sort(services)
h(services)
}
type ServiceSorter []*v1.Service
// Len is part of sort.Interface.
func (s ServiceSorter) Len() int {
return len(s)
}
// Swap is part of sort.Interface.
func (s ServiceSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Less is part of sort.Interface. It is implemented by calling the "by" closure in the sorter.
func (s ServiceSorter) Less(i, j int) bool {
namespaceCompare := strings.Compare(s[i].Namespace, s[j].Namespace)
if namespaceCompare == 0 {
nameCompare := strings.Compare(s[i].Name, s[j].Name)
return nameCompare < 0
}
return namespaceCompare < 0
}