diff --git a/go.mod b/go.mod index 028e3b3..fa92287 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect + github.com/google/go-cmp v0.5.5 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/imdario/mergo v0.3.11 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/go.sum b/go.sum index 6d601db..2084692 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -168,6 +169,7 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -227,6 +229,7 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/main.go b/main.go index d5d632f..eb01450 100644 --- a/main.go +++ b/main.go @@ -11,12 +11,15 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/pflag" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" + watchtools "k8s.io/client-go/tools/watch" ) type options struct { @@ -64,8 +67,10 @@ func main() { opt.kubeconfig = os.Getenv("KUBECONFIG") } + var labelSelector labels.Selector if opt.labels != "" { - if _, err := labels.Parse(opt.labels); err != nil { + var err error + if labelSelector, err = labels.Parse(opt.labels); err != nil { log.Fatalf("Invalid label selector: %v", err) } } @@ -122,13 +127,51 @@ func main() { log.Debug("Starting to watch pods…") - wi, err := resourceInterface.Watch(rootCtx, metav1.ListOptions{ - LabelSelector: opt.labels, + // to use the retrywatcher, we need a start revision; setting this to empty or "0" + // is not supported, so we need a real revision; to achieve this we simply create + // a "standard" watcher, takes the first event and its resourceVersion as the + // starting point for the second, longlived retrying watcher + initialPods, resourceVersion, err := getStartPods(rootCtx, clientset, opt.labels) + if err != nil { + log.Fatalf("Failed to determine initial resourceVersion: %v", err) + } + + wi, err := watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{ + ctx: rootCtx, + ri: resourceInterface, }) if err != nil { log.Fatalf("Failed to create watch for pods: %v", err) } - w := watcher.NewWatcher(clientset, c, log, opt.namespaces, args, opt.containerNames, opt.live) + watcherOpts := watcher.Options{ + LabelSelector: labelSelector, + Namespaces: opt.namespaces, + ResourceNames: args, + ContainerNames: opt.containerNames, + RunningOnly: opt.live, + } + + w := watcher.NewWatcher(clientset, c, log, initialPods, watcherOpts) w.Watch(rootCtx, wi) } + +func getStartPods(ctx context.Context, cs *kubernetes.Clientset, labelSelector string) ([]corev1.Pod, string, error) { + pods, err := cs.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, "", fmt.Errorf("failed to perform list on Pods: %w", err) + } + + return pods.Items, pods.ResourceVersion, nil +} + +type watchContextInjector struct { + ctx context.Context + ri dynamic.ResourceInterface +} + +func (cw *watchContextInjector) Watch(options metav1.ListOptions) (watch.Interface, error) { + return cw.ri.Watch(cw.ctx, options) +} diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 5f47d02..8a9f282 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" @@ -21,27 +22,43 @@ type Watcher struct { clientset *kubernetes.Clientset log logrus.FieldLogger collector collector.Collector - namespaces []string - resourceNames []string - containerNames []string + initialPods []corev1.Pod + opt Options seenContainers sets.String - runningOnly bool } -func NewWatcher(clientset *kubernetes.Clientset, c collector.Collector, log logrus.FieldLogger, namespaces, resourceNames, containerNames []string, runningOnly bool) *Watcher { +type Options struct { + LabelSelector labels.Selector + Namespaces []string + ResourceNames []string + ContainerNames []string + RunningOnly bool +} + +func NewWatcher( + clientset *kubernetes.Clientset, + c collector.Collector, + log logrus.FieldLogger, + initialPods []corev1.Pod, + opt Options, +) *Watcher { return &Watcher{ clientset: clientset, log: log, collector: c, - namespaces: namespaces, - resourceNames: resourceNames, - containerNames: containerNames, + initialPods: initialPods, + opt: opt, seenContainers: sets.NewString(), - runningOnly: runningOnly, } } func (w *Watcher) Watch(ctx context.Context, wi watch.Interface) { + for i := range w.initialPods { + if w.podMatchesCriteria(&w.initialPods[i]) { + w.startLogCollectors(ctx, &w.initialPods[i]) + } + } + for event := range wi.ResultChan() { obj, ok := event.Object.(*unstructured.Unstructured) if !ok { @@ -92,7 +109,7 @@ func (w *Watcher) startLogCollectorsForContainers(ctx context.Context, pod *core } // container sttaus not what we want - if w.runningOnly { + if w.opt.RunningOnly { if status.State.Running == nil { containerLog.Debug("Container is not running.") continue @@ -147,11 +164,11 @@ func (w *Watcher) getPodLog(pod *corev1.Pod) logrus.FieldLogger { func (w *Watcher) podMatchesCriteria(pod *corev1.Pod) bool { podLog := w.getPodLog(pod) - return w.resourceNameMatches(podLog, pod) && w.resourceNamespaceMatches(podLog, pod) + return w.resourceNameMatches(podLog, pod) && w.resourceNamespaceMatches(podLog, pod) && w.resourceLabelsMatches(podLog, pod) } func (w *Watcher) resourceNameMatches(log logrus.FieldLogger, pod *corev1.Pod) bool { - if needleMatchesPatterns(pod.GetName(), w.resourceNames) { + if needleMatchesPatterns(pod.GetName(), w.opt.ResourceNames) { return true } @@ -161,7 +178,7 @@ func (w *Watcher) resourceNameMatches(log logrus.FieldLogger, pod *corev1.Pod) b } func (w *Watcher) resourceNamespaceMatches(log logrus.FieldLogger, pod *corev1.Pod) bool { - if needleMatchesPatterns(pod.GetNamespace(), w.namespaces) { + if needleMatchesPatterns(pod.GetNamespace(), w.opt.Namespaces) { return true } @@ -170,8 +187,18 @@ func (w *Watcher) resourceNamespaceMatches(log logrus.FieldLogger, pod *corev1.P return false } +func (w *Watcher) resourceLabelsMatches(log logrus.FieldLogger, pod *corev1.Pod) bool { + if w.opt.LabelSelector == nil || w.opt.LabelSelector.Matches(labels.Set(pod.Labels)) { + return true + } + + log.Debug("Pod labels do not match.") + + return false +} + func (w *Watcher) containerNameMatches(containerName string) bool { - return needleMatchesPatterns(containerName, w.containerNames) + return needleMatchesPatterns(containerName, w.opt.ContainerNames) } func nameMatches(name string, pattern string) bool {