Skip to content

Commit

Permalink
Fix #46 - add locks to pod and namespace update operations
Browse files Browse the repository at this point in the history
  • Loading branch information
jtblin committed Jun 5, 2017
1 parent d91c0d4 commit e815c20
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 24 deletions.
34 changes: 21 additions & 13 deletions namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kube2iam

import (
"encoding/json"
"sync"

log "github.com/Sirupsen/logrus"
"k8s.io/client-go/pkg/api/v1"
Expand All @@ -11,6 +12,7 @@ import (

// NamespaceHandler represents a namespace handler.
type NamespaceHandler struct {
mutex sync.RWMutex
storage *store.Store
}

Expand All @@ -28,32 +30,38 @@ func (h *NamespaceHandler) OnAdd(obj interface{}) {
return
}

log.WithFields(h.namespaceFields(ns)).Debug("Namespace OnAdd")
logger := log.WithFields(h.namespaceFields(ns))
logger.Debug("Namespace OnAdd")

roles := h.getRoleAnnotation(ns)
for _, role := range roles {
log.WithFields(h.namespaceFields(ns)).WithField("ns.role", role).Debug("Namespace OnAdd - Role")
logger.WithField("ns.role", role).Info("Add role to namespace")
h.storage.AddRoleToNamespace(ns.GetName(), role)
}

}

// OnUpdate called with a namespace is updated inside k8s.
func (h *NamespaceHandler) OnUpdate(oldObj, newObj interface{}) {
//ons, ok := oldObj.(*v1.Namespace)
ons, ok := oldObj.(*v1.Namespace)
nns, ok := newObj.(*v1.Namespace)
if !ok {
log.Errorf("Expected Namespace but OnUpdate handler received %+v", newObj)
return
}
log.WithFields(h.namespaceFields(nns)).Debug("Namespace OnUpdate")
logger := log.WithFields(h.namespaceFields(nns))
logger.Debug("Namespace OnUpdate")

roles := h.getRoleAnnotation(nns)
nsname := nns.GetName()
h.storage.DeleteNamespace(nsname)
for _, role := range roles {
log.WithFields(h.namespaceFields(nns)).WithField("ns.role", role).Debug("Namespace OnUpdate - Role")
h.storage.AddRoleToNamespace(nsname, role)
if annotationDiffers(ons.GetAnnotations(), nns.GetAnnotations(), h.storage.NamespaceKey) {
roles := h.getRoleAnnotation(nns)
nsName := nns.GetName()
h.mutex.Lock()
defer h.mutex.Unlock()
logger.Info("Deleting namespace from store (OnUpdate)")
h.storage.DeleteNamespace(nsName)
for _, role := range roles {
logger.WithField("ns.role", role).Info("Add role namespace (OnUpdate)")
h.storage.AddRoleToNamespace(nsName, role)
}
}
}

Expand All @@ -64,14 +72,14 @@ func (h *NamespaceHandler) OnDelete(obj interface{}) {
log.Errorf("Expected Namespace but OnDelete handler received %+v", obj)
return
}
log.WithFields(h.namespaceFields(ns)).Debug("Namespace OnDelete")
log.WithFields(h.namespaceFields(ns)).Info("Deleting namespace from store (OnDelete)")
h.storage.DeleteNamespace(ns.GetName())
}

// getRoleAnnotations reads the "iam.amazonaws.com/allowed-roles" annotation off a namespace
// and splits them as a JSON list (["role1", "role2", "role3"])
func (h *NamespaceHandler) getRoleAnnotation(ns *v1.Namespace) []string {
rolesString := ns.Annotations[h.storage.NamespaceKey]
rolesString := ns.GetAnnotations()[h.storage.NamespaceKey]
if rolesString != "" {
var decoded []string
if err := json.Unmarshal([]byte(rolesString), &decoded); err != nil {
Expand Down
29 changes: 18 additions & 11 deletions pod.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kube2iam

import (
"sync"

log "github.com/Sirupsen/logrus"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -10,6 +12,7 @@ import (

// PodHandler represents a pod handler.
type PodHandler struct {
mutex sync.RWMutex
storage *store.Store
}

Expand All @@ -29,17 +32,24 @@ func (p *PodHandler) OnAdd(obj interface{}) {
log.Errorf("Expected Pod but OnAdd handler received %+v", obj)
return
}
log.WithFields(p.podFields(pod)).Debug("Pod OnAdd")
logger := log.WithFields(p.podFields(pod))
logger.Debug("Pod OnAdd")

p.storage.AddNamespaceToIP(pod)

if pod.Status.PodIP != "" {
if role, ok := pod.GetAnnotations()[p.storage.IamRoleKey]; ok {
logger.Info("Adding pod to store")
p.storage.AddRoleToIP(pod, role)
}
}
}

func (p *PodHandler) shouldUpdate(oldPod, newPod *v1.Pod) bool {
return oldPod.Status.PodIP != newPod.Status.PodIP ||
annotationDiffers(oldPod.GetAnnotations(), newPod.GetAnnotations(), p.storage.IamRoleKey)
}

// OnUpdate is called when a pod is modified.
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
oldPod, ok1 := oldObj.(*v1.Pod)
Expand All @@ -51,17 +61,12 @@ func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
logger := log.WithFields(p.podFields(newPod))
logger.Debug("Pod OnUpdate")

if oldPod.Status.PodIP != newPod.Status.PodIP {
if p.shouldUpdate(oldPod, newPod) {
logger.Info("Updating pod due to added/updated annotation value or different pod IP")
p.mutex.Lock()
defer p.mutex.Unlock()
p.OnDelete(oldPod)
p.OnAdd(newPod)
return
}

if annotationDiffers(oldPod.Annotations, newPod.Annotations, p.storage.IamRoleKey) {
logger.Debug("Updating pod due to added/updated annotation value")
p.OnDelete(oldPod)
p.OnAdd(newPod)
return
}
}

Expand All @@ -80,9 +85,11 @@ func (p *PodHandler) OnDelete(obj interface{}) {
return
}

log.WithFields(p.podFields(pod)).Debug("Pod OnDelete")
logger := log.WithFields(p.podFields(pod))
logger.Debug("Pod OnDelete")

if pod.Status.PodIP != "" {
logger.Info("Removing pod from store")
p.storage.DeleteIP(pod.Status.PodIP)
}
}
Expand Down

0 comments on commit e815c20

Please sign in to comment.