From e815c209584c36efd5de5176848448b443f40d73 Mon Sep 17 00:00:00 2001 From: Jerome Touffe-Blin Date: Mon, 5 Jun 2017 20:41:25 +1000 Subject: [PATCH] Fix #46 - add locks to pod and namespace update operations --- namespace.go | 34 +++++++++++++++++++++------------- pod.go | 29 ++++++++++++++++++----------- 2 files changed, 39 insertions(+), 24 deletions(-) diff --git a/namespace.go b/namespace.go index df4e5813..4a99df07 100644 --- a/namespace.go +++ b/namespace.go @@ -2,6 +2,7 @@ package kube2iam import ( "encoding/json" + "sync" log "github.com/Sirupsen/logrus" "k8s.io/client-go/pkg/api/v1" @@ -11,6 +12,7 @@ import ( // NamespaceHandler represents a namespace handler. type NamespaceHandler struct { + mutex sync.RWMutex storage *store.Store } @@ -28,32 +30,38 @@ func (h *NamespaceHandler) OnAdd(obj interface{}) { return } - log.WithFields(h.namespaceFields(ns)).Debug("Namespace OnAdd") + logger := log.WithFields(h.namespaceFields(ns)) + logger.Debug("Namespace OnAdd") roles := h.getRoleAnnotation(ns) for _, role := range roles { - log.WithFields(h.namespaceFields(ns)).WithField("ns.role", role).Debug("Namespace OnAdd - Role") + logger.WithField("ns.role", role).Info("Add role to namespace") h.storage.AddRoleToNamespace(ns.GetName(), role) } - } // OnUpdate called with a namespace is updated inside k8s. func (h *NamespaceHandler) OnUpdate(oldObj, newObj interface{}) { - //ons, ok := oldObj.(*v1.Namespace) + ons, ok := oldObj.(*v1.Namespace) nns, ok := newObj.(*v1.Namespace) if !ok { log.Errorf("Expected Namespace but OnUpdate handler received %+v", newObj) return } - log.WithFields(h.namespaceFields(nns)).Debug("Namespace OnUpdate") + logger := log.WithFields(h.namespaceFields(nns)) + logger.Debug("Namespace OnUpdate") - roles := h.getRoleAnnotation(nns) - nsname := nns.GetName() - h.storage.DeleteNamespace(nsname) - for _, role := range roles { - log.WithFields(h.namespaceFields(nns)).WithField("ns.role", role).Debug("Namespace OnUpdate - Role") - h.storage.AddRoleToNamespace(nsname, role) + if annotationDiffers(ons.GetAnnotations(), nns.GetAnnotations(), h.storage.NamespaceKey) { + roles := h.getRoleAnnotation(nns) + nsName := nns.GetName() + h.mutex.Lock() + defer h.mutex.Unlock() + logger.Info("Deleting namespace from store (OnUpdate)") + h.storage.DeleteNamespace(nsName) + for _, role := range roles { + logger.WithField("ns.role", role).Info("Add role namespace (OnUpdate)") + h.storage.AddRoleToNamespace(nsName, role) + } } } @@ -64,14 +72,14 @@ func (h *NamespaceHandler) OnDelete(obj interface{}) { log.Errorf("Expected Namespace but OnDelete handler received %+v", obj) return } - log.WithFields(h.namespaceFields(ns)).Debug("Namespace OnDelete") + log.WithFields(h.namespaceFields(ns)).Info("Deleting namespace from store (OnDelete)") h.storage.DeleteNamespace(ns.GetName()) } // getRoleAnnotations reads the "iam.amazonaws.com/allowed-roles" annotation off a namespace // and splits them as a JSON list (["role1", "role2", "role3"]) func (h *NamespaceHandler) getRoleAnnotation(ns *v1.Namespace) []string { - rolesString := ns.Annotations[h.storage.NamespaceKey] + rolesString := ns.GetAnnotations()[h.storage.NamespaceKey] if rolesString != "" { var decoded []string if err := json.Unmarshal([]byte(rolesString), &decoded); err != nil { diff --git a/pod.go b/pod.go index 43f2945f..fc0b2991 100644 --- a/pod.go +++ b/pod.go @@ -1,6 +1,8 @@ package kube2iam import ( + "sync" + log "github.com/Sirupsen/logrus" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" @@ -10,6 +12,7 @@ import ( // PodHandler represents a pod handler. type PodHandler struct { + mutex sync.RWMutex storage *store.Store } @@ -29,17 +32,24 @@ func (p *PodHandler) OnAdd(obj interface{}) { log.Errorf("Expected Pod but OnAdd handler received %+v", obj) return } - log.WithFields(p.podFields(pod)).Debug("Pod OnAdd") + logger := log.WithFields(p.podFields(pod)) + logger.Debug("Pod OnAdd") p.storage.AddNamespaceToIP(pod) if pod.Status.PodIP != "" { if role, ok := pod.GetAnnotations()[p.storage.IamRoleKey]; ok { + logger.Info("Adding pod to store") p.storage.AddRoleToIP(pod, role) } } } +func (p *PodHandler) shouldUpdate(oldPod, newPod *v1.Pod) bool { + return oldPod.Status.PodIP != newPod.Status.PodIP || + annotationDiffers(oldPod.GetAnnotations(), newPod.GetAnnotations(), p.storage.IamRoleKey) +} + // OnUpdate is called when a pod is modified. func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) { oldPod, ok1 := oldObj.(*v1.Pod) @@ -51,17 +61,12 @@ func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) { logger := log.WithFields(p.podFields(newPod)) logger.Debug("Pod OnUpdate") - if oldPod.Status.PodIP != newPod.Status.PodIP { + if p.shouldUpdate(oldPod, newPod) { + logger.Info("Updating pod due to added/updated annotation value or different pod IP") + p.mutex.Lock() + defer p.mutex.Unlock() p.OnDelete(oldPod) p.OnAdd(newPod) - return - } - - if annotationDiffers(oldPod.Annotations, newPod.Annotations, p.storage.IamRoleKey) { - logger.Debug("Updating pod due to added/updated annotation value") - p.OnDelete(oldPod) - p.OnAdd(newPod) - return } } @@ -80,9 +85,11 @@ func (p *PodHandler) OnDelete(obj interface{}) { return } - log.WithFields(p.podFields(pod)).Debug("Pod OnDelete") + logger := log.WithFields(p.podFields(pod)) + logger.Debug("Pod OnDelete") if pod.Status.PodIP != "" { + logger.Info("Removing pod from store") p.storage.DeleteIP(pod.Status.PodIP) } }