Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kill the store #32

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 50 additions & 15 deletions cmd/k8s.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,70 @@
package cmd

import (
"errors"
"fmt"
"time"

"k8s.io/kubernetes/pkg/api"
kcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
selector "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/util/wait"
)

const (
// Resync period for the kube controller loop.
resyncPeriod = 30 * time.Minute
resyncPeriod = 30 * time.Minute
podIPIndexName = "byPodIP"
)

var errPodNotFound = errors.New("Pod with specified IP not found")

type k8s struct {
*client.Client
podStore kcache.Indexer
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, it's an Indexer not a store. Indexers contain store, so let's rename that to podIndexer.

podController *kcache.Controller
}

func podIPIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("obj not pod: %+v", obj)
}
if pod.Status.PodIP != "" && controller.IsPodActive(pod) {
return []string{pod.Status.PodIP}, nil
}
return nil, nil
}

// Returns a cache.ListWatch that gets all changes to pods.
func (k8s *k8s) createPodLW() *kcache.ListWatch {
return kcache.NewListWatchFromClient(k8s, "pods", api.NamespaceAll, selector.Everything())
}

func (k8s *k8s) watchForPods(podManager kcache.ResourceEventHandler) kcache.Store {
podStore, podController := kcache.NewInformer(
k8s.createPodLW(),
&api.Pod{},
resyncPeriod,
kcache.ResourceEventHandlerFuncs{
AddFunc: podManager.OnAdd,
DeleteFunc: podManager.OnDelete,
UpdateFunc: podManager.OnUpdate,
},
)
go podController.Run(wait.NeverStop)
return podStore
func (k8s *k8s) PodByIP(IP string) (*api.Pod, error) {
pods, err := k8s.podStore.ByIndex(podIPIndexName, IP)
if err != nil {
return nil, err
}

if len(pods) == 0 {
return nil, errPodNotFound
}
if len(pods) == 1 {
return pods[0].(*api.Pod), nil
}
podNames := make([]string, len(pods))
for i, pod := range pods {
podNames[i] = pod.(*api.Pod).ObjectMeta.Name
}
return nil, fmt.Errorf("%d pods (%v) with the ip %s indexed", len(pods), podNames, IP)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how indexer works exactly, but the code doesn't seem to do anything when a pod is removed so wouldn't there be a case old pods have been indexed at a specific IP and the IP has been recycled for a new pod? In that case, the code would return an error here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indexer takes care of calling this on every add/update (on both new & old pods)/remove and keeping the internal mapping up-to-date.

}

func (k8s *k8s) Run() {
k8s.podController.Run(wait.NeverStop)
}

func newK8s(host, token string, insecure bool) (*k8s, error) {
Expand All @@ -56,5 +83,13 @@ func newK8s(host, token string, insecure bool) (*k8s, error) {
if err != nil {
return nil, err
}
return &k8s{c}, nil
k8s := &k8s{c, nil, nil}
k8s.podStore, k8s.podController = kcache.NewIndexerInformer(
k8s.createPodLW(),
&api.Pod{},
resyncPeriod,
kcache.ResourceEventHandlerFuncs{},
kcache.Indexers{podIPIndexName: podIPIndexFunc},
)
return k8s, nil
}
29 changes: 19 additions & 10 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httputil"
"strings"
"time"

log "github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
Expand All @@ -30,7 +31,6 @@ type Server struct {
Version bool
iam *iam
k8s *k8s
store *store
}

type appHandler func(http.ResponseWriter, *http.Request)
Expand All @@ -57,18 +57,28 @@ func parseRemoteAddr(addr string) string {

func (s *Server) getRole(IP string) (string, error) {
var role string
var err error
operation := func() error {
role, err = s.store.Get(IP)
return err
pod, err := s.k8s.PodByIP(IP)
if err != nil {
if err != errPodNotFound {
log.Warnf("Role request from IP %s resulted in error: %s", IP, err.Error())
}
return err
}
role = pod.Annotations[s.IAMRoleKey]
return nil
}

err = backoff.Retry(operation, backoff.NewExponentialBackOff())
if err != nil {
return "", err
exponential := backoff.NewExponentialBackOff()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea 👍

Wonder if we want to try for more than one second and if we should set the number of retries as well though.

exponential.MaxElapsedTime = 1 * time.Second
exponential.MaxInterval = 30 * time.Millisecond

err := backoff.Retry(operation, exponential)
if role == "" && err == nil {
err = errPodNotFound
}

return role, nil
return role, err
}

func (s *Server) securityCredentialsHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -128,8 +138,7 @@ func (s *Server) Run(host, token string, insecure bool) error {
return err
}
s.k8s = k8s
s.store = newStore(s.IAMRoleKey, s.DefaultIAMRole)
s.k8s.watchForPods(s.store)
go k8s.Run()
s.iam = newIAM(s.BaseRoleARN)
r := mux.NewRouter()
r.Handle("/{version}/meta-data/iam/security-credentials/", appHandler(s.securityCredentialsHandler))
Expand Down
94 changes: 0 additions & 94 deletions cmd/store.go

This file was deleted.