Skip to content

Commit

Permalink
use RetryWatcher to survive apiserver restarts a bit better
Browse files Browse the repository at this point in the history
  • Loading branch information
xrstf committed May 24, 2022
1 parent 6e8ba05 commit f5f53e3
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 18 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/imdario/mergo v0.3.11 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down Expand Up @@ -168,6 +169,7 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down Expand Up @@ -227,6 +229,7 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
51 changes: 47 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (

"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
watchtools "k8s.io/client-go/tools/watch"
)

type options struct {
Expand Down Expand Up @@ -64,8 +67,10 @@ func main() {
opt.kubeconfig = os.Getenv("KUBECONFIG")
}

var labelSelector labels.Selector
if opt.labels != "" {
if _, err := labels.Parse(opt.labels); err != nil {
var err error
if labelSelector, err = labels.Parse(opt.labels); err != nil {
log.Fatalf("Invalid label selector: %v", err)
}
}
Expand Down Expand Up @@ -122,13 +127,51 @@ func main() {

log.Debug("Starting to watch pods…")

wi, err := resourceInterface.Watch(rootCtx, metav1.ListOptions{
LabelSelector: opt.labels,
// to use the retrywatcher, we need a start revision; setting this to empty or "0"
// is not supported, so we need a real revision; to achieve this we simply list the
// pods once and use the resourceVersion of that list as the starting point for the
// long-lived retrying watcher
initialPods, resourceVersion, err := getStartPods(rootCtx, clientset, opt.labels)
if err != nil {
log.Fatalf("Failed to determine initial resourceVersion: %v", err)
}

wi, err := watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{
ctx: rootCtx,
ri: resourceInterface,
})
if err != nil {
log.Fatalf("Failed to create watch for pods: %v", err)
}

w := watcher.NewWatcher(clientset, c, log, opt.namespaces, args, opt.containerNames, opt.live)
watcherOpts := watcher.Options{
LabelSelector: labelSelector,
Namespaces: opt.namespaces,
ResourceNames: args,
ContainerNames: opt.containerNames,
RunningOnly: opt.live,
}

w := watcher.NewWatcher(clientset, c, log, initialPods, watcherOpts)
w.Watch(rootCtx, wi)
}

// getStartPods lists the pods of all namespaces that match labelSelector. It
// returns the listed pods together with the resourceVersion of the list itself.
// On failure the list error is wrapped and no pods are returned.
func getStartPods(ctx context.Context, cs *kubernetes.Clientset, labelSelector string) ([]corev1.Pod, string, error) {
	listOpts := metav1.ListOptions{LabelSelector: labelSelector}

	// metav1.NamespaceAll is the empty string, i.e. the list is not
	// restricted to a single namespace.
	podList, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(ctx, listOpts)
	if err != nil {
		return nil, "", fmt.Errorf("failed to perform list on Pods: %w", err)
	}

	return podList.Items, podList.ResourceVersion, nil
}

// watchContextInjector pairs a dynamic.ResourceInterface with a fixed context,
// so that its Watch method can be called with nothing but list options; the
// stored context is supplied to the wrapped interface on every call.
type watchContextInjector struct {
// ctx is passed to every Watch call made on ri.
ctx context.Context
// ri is the resource interface the watches are started on.
ri dynamic.ResourceInterface
}

// Watch starts a watch on the wrapped resource interface, forwarding options
// unchanged and supplying the context stored in the injector.
func (inj *watchContextInjector) Watch(options metav1.ListOptions) (watch.Interface, error) {
	target, ctx := inj.ri, inj.ctx

	return target.Watch(ctx, options)
}
55 changes: 41 additions & 14 deletions pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -21,27 +22,43 @@ type Watcher struct {
clientset *kubernetes.Clientset
log logrus.FieldLogger
collector collector.Collector
namespaces []string
resourceNames []string
containerNames []string
initialPods []corev1.Pod
opt Options
seenContainers sets.String
runningOnly bool
}

func NewWatcher(clientset *kubernetes.Clientset, c collector.Collector, log logrus.FieldLogger, namespaces, resourceNames, containerNames []string, runningOnly bool) *Watcher {
// Options bundles the filter criteria a Watcher applies to pods and containers.
type Options struct {
// LabelSelector is matched against a pod's labels; a nil selector matches every pod.
LabelSelector labels.Selector
// Namespaces holds the patterns a pod's namespace is matched against.
Namespaces []string
// ResourceNames holds the patterns a pod's name is matched against.
ResourceNames []string
// ContainerNames holds the patterns a container's name is matched against.
ContainerNames []string
// RunningOnly makes the watcher skip containers that are not in the running state.
RunningOnly bool
}

func NewWatcher(
clientset *kubernetes.Clientset,
c collector.Collector,
log logrus.FieldLogger,
initialPods []corev1.Pod,
opt Options,
) *Watcher {
return &Watcher{
clientset: clientset,
log: log,
collector: c,
namespaces: namespaces,
resourceNames: resourceNames,
containerNames: containerNames,
initialPods: initialPods,
opt: opt,
seenContainers: sets.NewString(),
runningOnly: runningOnly,
}
}

func (w *Watcher) Watch(ctx context.Context, wi watch.Interface) {
for i := range w.initialPods {
if w.podMatchesCriteria(&w.initialPods[i]) {
w.startLogCollectors(ctx, &w.initialPods[i])
}
}

for event := range wi.ResultChan() {
obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
Expand Down Expand Up @@ -92,7 +109,7 @@ func (w *Watcher) startLogCollectorsForContainers(ctx context.Context, pod *core
}

// container status is not what we want
if w.runningOnly {
if w.opt.RunningOnly {
if status.State.Running == nil {
containerLog.Debug("Container is not running.")
continue
Expand Down Expand Up @@ -147,11 +164,11 @@ func (w *Watcher) getPodLog(pod *corev1.Pod) logrus.FieldLogger {
func (w *Watcher) podMatchesCriteria(pod *corev1.Pod) bool {
podLog := w.getPodLog(pod)

return w.resourceNameMatches(podLog, pod) && w.resourceNamespaceMatches(podLog, pod)
return w.resourceNameMatches(podLog, pod) && w.resourceNamespaceMatches(podLog, pod) && w.resourceLabelsMatches(podLog, pod)
}

func (w *Watcher) resourceNameMatches(log logrus.FieldLogger, pod *corev1.Pod) bool {
if needleMatchesPatterns(pod.GetName(), w.resourceNames) {
if needleMatchesPatterns(pod.GetName(), w.opt.ResourceNames) {
return true
}

Expand All @@ -161,7 +178,7 @@ func (w *Watcher) resourceNameMatches(log logrus.FieldLogger, pod *corev1.Pod) b
}

func (w *Watcher) resourceNamespaceMatches(log logrus.FieldLogger, pod *corev1.Pod) bool {
if needleMatchesPatterns(pod.GetNamespace(), w.namespaces) {
if needleMatchesPatterns(pod.GetNamespace(), w.opt.Namespaces) {
return true
}

Expand All @@ -170,8 +187,18 @@ func (w *Watcher) resourceNamespaceMatches(log logrus.FieldLogger, pod *corev1.P
return false
}

// resourceLabelsMatches reports whether the pod's labels satisfy the configured
// label selector. Without a selector every pod matches. A mismatch is noted on
// the given logger at debug level.
func (w *Watcher) resourceLabelsMatches(log logrus.FieldLogger, pod *corev1.Pod) bool {
	selector := w.opt.LabelSelector

	// no selector configured: nothing to filter on
	if selector == nil {
		return true
	}

	if selector.Matches(labels.Set(pod.Labels)) {
		return true
	}

	log.Debug("Pod labels do not match.")

	return false
}

func (w *Watcher) containerNameMatches(containerName string) bool {
return needleMatchesPatterns(containerName, w.containerNames)
return needleMatchesPatterns(containerName, w.opt.ContainerNames)
}

func nameMatches(name string, pattern string) bool {
Expand Down

0 comments on commit f5f53e3

Please sign in to comment.