-
Notifications
You must be signed in to change notification settings - Fork 5
/
services.go
89 lines (75 loc) · 2.47 KB
/
services.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main
import (
"reflect"
"strings"
"github.com/golang/glog"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/unversioned"
v1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/tools/cache"
)
// svcAPIResource is the APIResource descriptor declared for the namespaced
// "services" resource.
var svcAPIResource = unversioned.APIResource{
	Name:       "services",
	Namespaced: true,
	Kind:       "service",
}
// newServicesListWatchController allocates an lwController and gives it a
// freshly made, unbuffered stop channel. No other field is populated here.
func newServicesListWatchController() *lwController {
	lwc := new(lwController)
	lwc.stopCh = make(chan struct{})
	return lwc
}
// newServicesListWatchControllerForClientset builds an lwController for
// services using lbex's clientset.
//
// It creates a list/watch over "services" in all namespaces with no field
// restriction, wires the add/delete/update handlers defined in this file, and
// stores the informer's results: the store goes to lbex.servicesStore and the
// controller goes to the returned lwController.
func newServicesListWatchControllerForClientset(lbex *lbExController) *lwController {
	result := newServicesListWatchController()

	// List/watch source covering every namespace and every field value.
	restClient := lbex.clientset.Core().RESTClient()
	source := cache.NewListWatchFromClient(restClient, "services", api.NamespaceAll, fields.Everything())

	// Callbacks invoked by the informer when the watched set changes.
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    serviceCreatedFunc(lbex),
		DeleteFunc: serviceDeletedFunc(lbex),
		UpdateFunc: serviceUpdatedFunc(lbex),
	}

	store, ctrl := cache.NewInformer(source, &v1.Service{}, resyncPeriod, handlers)
	lbex.servicesStore = store
	result.controller = ctrl
	return result
}
// filterObject reports whether obj should be skipped by the event handlers.
//
// It returns true in two cases:
//   - cache.DeletionHandlingMetaNamespaceKeyFunc returns an error for obj, or
//   - the resulting key starts with "kube-system/", which is not handled.
//
// Otherwise it returns false. Both outcomes are logged at verbosity 5.
func filterObject(obj interface{}) bool {
	const systemPrefix = "kube-system/"

	nsKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if keyErr != nil {
		// No usable key: treat the object as filtered.
		glog.V(5).Infof("filterObject: DeletionHandlingMetaNamespaceKeyFunc(): err: %v", keyErr)
		return true
	}

	glog.V(5).Infof("filterObject: return %s has prefix 'kube-system/'", nsKey)
	inSystemNamespace := strings.HasPrefix(nsKey, systemPrefix)
	return inSystemNamespace
}
// serviceCreatedFunc returns the add-event callback for services.
//
// The callback enqueues the object on lbex.servicesQueue unless filterObject
// rejects it, in which case it only logs and drops the event.
func serviceCreatedFunc(lbex *lbExController) func(obj interface{}) {
	onAdd := func(added interface{}) {
		if !filterObject(added) {
			glog.V(5).Infof("AddFunc: enqueuing service object")
			lbex.servicesQueue.Enqueue(added)
			return
		}
		glog.V(5).Infof("AddFunc: filtering out service object")
	}
	return onAdd
}
// serviceDeletedFunc returns the delete-event callback for services.
//
// The callback passes the deleted object to lbex.servicesQueue.Enqueue unless
// filterObject reports that it should be skipped.
func serviceDeletedFunc(lbex *lbExController) func(obj interface{}) {
	onDelete := func(removed interface{}) {
		skip := filterObject(removed)
		if skip {
			glog.V(5).Infof("DeleteFunc: filtering out service object")
			return
		}

		glog.V(5).Infof("DeleteFunc: enqueuing service object")
		lbex.servicesQueue.Enqueue(removed)
	}
	return onDelete
}
// serviceUpdatedFunc returns the update-event callback for services.
//
// The callback applies filterObject to the previous object; if that passes
// and the previous and new objects are not deeply equal, the new object is
// enqueued on lbex.servicesQueue. Updates with no difference are ignored
// without logging.
func serviceUpdatedFunc(lbex *lbExController) func(obj, newObj interface{}) {
	onUpdate := func(prev, next interface{}) {
		// The filter decision is made on the old object.
		if filterObject(prev) {
			glog.V(5).Infof("UpdateFunc: filtering out service object")
			return
		}

		// Nothing changed: no work to queue.
		if reflect.DeepEqual(prev, next) {
			return
		}

		glog.V(5).Infof("UpdateFunc: enqueuing unequal service object")
		lbex.servicesQueue.Enqueue(next)
	}
	return onUpdate
}