From 7185fe270baf1fb22dcb15ab6eae1261851026c0 Mon Sep 17 00:00:00 2001 From: xrstf Date: Wed, 26 Apr 2023 14:42:51 +0200 Subject: [PATCH] allow to dump pod events (useful to catch things like ImagePullBackOffs) --- README.md | 3 ++ main.go | 61 ++++++++++++++++++++---- pkg/collector/disk.go | 98 ++++++++++++++++++++++++++++++++++---- pkg/collector/interface.go | 1 + pkg/collector/multiplex.go | 8 ++++ pkg/collector/stream.go | 4 ++ pkg/watcher/watcher.go | 76 +++++++++++++++++++++++++++-- 7 files changed, 230 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 64ceaf0..5d99666 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,8 @@ go install go.xrstf.de/protokol ``` Usage of protokol: -c, --container stringArray Container names to store logs for (supports glob expression) (can be given multiple times) + --events Dump events for each matching Pod as a human readable log file (note: label selectors are not respected) + --events-raw Dump events for each matching Pod as YAML (note: label selectors are not respected) -f, --flat Do not create directory per namespace, but put all logs in the same directory --kubeconfig string kubeconfig file to use (uses $KUBECONFIG by default) -l, --labels string Label-selector as an alternative to specifying resource names @@ -31,6 +33,7 @@ Usage of protokol: -n, --namespace stringArray Kubernetes namespace to watch resources in (supports glob expression) (can be given multiple times) --oneshot Dump logs, but do not tail the containers (i.e. exit after downloading the current state) -o, --output string Directory where logs should be stored + --prefix string Prefix pattern to put at the beginning of each streamed line (pn = Pod name, pN = Pod namespace, c = container name) (default "[%pN/%pn:%c] >>") --stream Do not just dump logs to disk, but also stream them to stdout -v, --verbose Enable more verbose output ``` diff --git a/main.go b/main.go index 241104d..8a93484 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,8 @@ type options struct { oneShot bool flatFiles bool dumpMetadata bool + dumpEvents bool + dumpRawEvents bool verbose bool } @@ -55,6 +57,8 @@ func main() { pflag.StringVar(&opt.streamPrefix, "prefix", opt.streamPrefix, "Prefix pattern to put at the beginning of each streamed line (pn = Pod name, pN = Pod namespace, c = container name)") pflag.BoolVar(&opt.oneShot, "oneshot", opt.oneShot, "Dump logs, but do not tail the containers (i.e. exit after downloading the current state)") pflag.BoolVar(&opt.dumpMetadata, "metadata", opt.dumpMetadata, "Dump Pods additionally as YAML (note that this can include secrets in environment variables)") + pflag.BoolVar(&opt.dumpEvents, "events", opt.dumpEvents, "Dump events for each matching Pod as a human readable log file (note: label selectors are not respected)") + pflag.BoolVar(&opt.dumpRawEvents, "events-raw", opt.dumpRawEvents, "Dump events for each matching Pod as YAML (note: label selectors are not respected)") pflag.BoolVarP(&opt.verbose, "verbose", "v", opt.verbose, "Enable more verbose output") pflag.Parse() @@ -103,7 +107,7 @@ func main() { log.WithField("directory", opt.directory).Info("Storing logs on disk.") - coll, err := collector.NewDiskCollector(opt.directory, opt.flatFiles) + coll, err := collector.NewDiskCollector(opt.directory, opt.flatFiles, opt.dumpEvents, opt.dumpRawEvents) if err != nil { log.Fatalf("Failed to create log collector: %v", err) } @@ -141,14 +145,23 @@ func main() { } // ////////////////////////////////////// - // start to watch pods + // start to watch pods & potentially events - resourceInterface := dynamicClient.Resource(schema.GroupVersionResource{ + podResourceInterface := dynamicClient.Resource(schema.GroupVersionResource{ Version: "v1", Resource: "pods", }) - log.Debug("Starting to watch pods…") + eventResourceInterface := dynamicClient.Resource(schema.GroupVersionResource{ + Version: "v1", + Resource: "events", + }) + + if opt.dumpEvents || opt.dumpRawEvents { + log.Debug("Starting to watch pods & events…") + } else { + log.Debug("Starting to watch pods…") + } // to use the retrywatcher, we need a start revision; setting this to empty or "0" // is not supported, so we need a real revision; to achieve this we simply create @@ -159,15 +172,35 @@ func main() { log.Fatalf("Failed to determine initial resourceVersion: %v", err) } - var wi watch.Interface + var initialEvents []corev1.Event + if opt.dumpEvents || opt.dumpRawEvents { + initialEvents, err = getStartEvents(rootCtx, clientset, opt.labels) + if err != nil { + log.Fatalf("Failed to retrieve initial events: %v", err) + } + } + + var ( + podWatcher watch.Interface + eventWatcher watch.Interface + ) + if !opt.oneShot { - wi, err = watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{ + podWatcher, err = watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{ ctx: rootCtx, - ri: resourceInterface, + ri: podResourceInterface, }) if err != nil { log.Fatalf("Failed to create watch for pods: %v", err) } + + eventWatcher, err = watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{ + ctx: rootCtx, + ri: eventResourceInterface, + }) + if err != nil { + log.Fatalf("Failed to create watch for events: %v", err) + } } watcherOpts := watcher.Options{ @@ -178,10 +211,11 @@ func main() { RunningOnly: opt.live, OneShot: opt.oneShot, DumpMetadata: opt.dumpMetadata, + DumpEvents: opt.dumpEvents || opt.dumpRawEvents, } - w := watcher.NewWatcher(clientset, coll, log, initialPods, watcherOpts) - w.Watch(rootCtx, wi) + w := watcher.NewWatcher(clientset, coll, log, initialPods, initialEvents, watcherOpts) + w.Watch(rootCtx, podWatcher, eventWatcher) } func getStartPods(ctx context.Context, cs *kubernetes.Clientset, labelSelector string) ([]corev1.Pod, string, error) { @@ -195,6 +229,15 @@ func getStartPods(ctx context.Context, cs *kubernetes.Clientset, labelSelector s return pods.Items, pods.ResourceVersion, nil } +func getStartEvents(ctx context.Context, cs *kubernetes.Clientset, labelSelector string) ([]corev1.Event, error) { + events, err := cs.CoreV1().Events("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to perform list on Events: %w", err) + } + + return events.Items, nil +} + type watchContextInjector struct { ctx context.Context ri dynamic.ResourceInterface diff --git a/pkg/collector/disk.go b/pkg/collector/disk.go index 0ce5ab8..7ec5276 100644 --- a/pkg/collector/disk.go +++ b/pkg/collector/disk.go @@ -2,10 +2,12 @@ package collector import ( "context" + "errors" "fmt" "io" "os" "path/filepath" + "time" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -13,13 +15,15 @@ import ( ) type diskCollector struct { - directory string - flatFiles bool + directory string + flatFiles bool + eventsAsText bool + rawEvents bool } var _ Collector = &diskCollector{} -func NewDiskCollector(directory string, flatFiles bool) (Collector, error) { +func NewDiskCollector(directory string, flatFiles bool, eventsAsText bool, rawEvents bool) (Collector, error) { err := os.MkdirAll(directory, 0755) if err != nil { return nil, fmt.Errorf("failed to create directory %q: %w", directory, err) @@ -31,16 +35,18 @@ func NewDiskCollector(directory string, flatFiles bool) (Collector, error) { } return &diskCollector{ - directory: abs, - flatFiles: flatFiles, + directory: abs, + flatFiles: flatFiles, + eventsAsText: eventsAsText, + rawEvents: rawEvents, }, nil } -func (c *diskCollector) getDirectory(pod *corev1.Pod) (string, error) { +func (c *diskCollector) getDirectory(namespace string) (string, error) { directory := c.directory if !c.flatFiles { - directory = filepath.Join(c.directory, pod.Namespace) + directory = filepath.Join(c.directory, namespace) } if err := os.MkdirAll(directory, 0755); err != nil { @@ -51,7 +57,7 @@ func (c *diskCollector) getDirectory(pod *corev1.Pod) (string, error) { } func (c *diskCollector) CollectPodMetadata(ctx context.Context, pod *corev1.Pod) error { - directory, err := c.getDirectory(pod) + directory, err := c.getDirectory(pod.Namespace) if err != nil { return err } @@ -74,8 +80,82 @@ func (c *diskCollector) CollectPodMetadata(ctx context.Context, pod *corev1.Pod) return os.WriteFile(filename, encoded, 0644) } +func (c *diskCollector) CollectEvent(ctx context.Context, event *corev1.Event) error { + if !c.eventsAsText && !c.rawEvents { + return errors.New("event dumping is not enabled") + } + + directory, err := c.getDirectory(event.InvolvedObject.Namespace) + if err != nil { + return err + } + + if c.eventsAsText { + if err := c.dumpEventAsText(directory, event); err != nil { + return err + } + } + + if c.rawEvents { + if err := c.dumpEventAsYAML(directory, event); err != nil { + return err + } + } + + return nil +} + +func (c *diskCollector) dumpEventAsText(directory string, event *corev1.Event) error { + filename := filepath.Join(directory, fmt.Sprintf("%s.events.log", event.InvolvedObject.Name)) + + stringified := fmt.Sprintf("%s: [%s]", event.LastTimestamp.Format(time.RFC1123), event.Type) + if event.Source.Component != "" { + stringified = fmt.Sprintf("%s [%s]", stringified, event.Source.Component) + } + stringified = fmt.Sprintf("%s %s (reason: %s) (%dx)\n", stringified, event.Message, event.Reason, event.Count) + + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return err + } + + _, err = f.WriteString(stringified) + if err1 := f.Close(); err1 != nil && err == nil { + err = err1 + } + + return err +} + +func (c *diskCollector) dumpEventAsYAML(directory string, event *corev1.Event) error { + filename := filepath.Join(directory, fmt.Sprintf("%s.events.yaml", event.InvolvedObject.Name)) + + trimmedEvent := event.DeepCopy() + trimmedEvent.ManagedFields = nil + + encoded, err := yaml.Marshal(trimmedEvent) + if err != nil { + return err + } + + encoded = append([]byte("---\n"), encoded...) + encoded = append(encoded, []byte("\n")...) + + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return err + } + + _, err = f.Write(encoded) + if err1 := f.Close(); err1 != nil && err == nil { + err = err1 + } + + return err +} + func (c *diskCollector) CollectLogs(ctx context.Context, log logrus.FieldLogger, pod *corev1.Pod, containerName string, stream io.Reader) error { - directory, err := c.getDirectory(pod) + directory, err := c.getDirectory(pod.Namespace) if err != nil { return err } diff --git a/pkg/collector/interface.go b/pkg/collector/interface.go index 382c24b..7d07181 100644 --- a/pkg/collector/interface.go +++ b/pkg/collector/interface.go @@ -10,5 +10,6 @@ import ( type Collector interface { CollectPodMetadata(ctx context.Context, pod *corev1.Pod) error + CollectEvent(ctx context.Context, event *corev1.Event) error CollectLogs(ctx context.Context, log logrus.FieldLogger, pod *corev1.Pod, containerName string, stream io.Reader) error } diff --git a/pkg/collector/multiplex.go b/pkg/collector/multiplex.go index 4cfb6ac..e3b6587 100644 --- a/pkg/collector/multiplex.go +++ b/pkg/collector/multiplex.go @@ -23,6 +23,14 @@ func NewMultiplexCollector(a, b Collector) (Collector, error) { }, nil } +func (c *multiplexCollector) CollectEvent(ctx context.Context, event *corev1.Event) error { + if err := c.a.CollectEvent(ctx, event); err != nil { + return err + } + + return c.b.CollectEvent(ctx, event) +} + func (c *multiplexCollector) CollectPodMetadata(ctx context.Context, pod *corev1.Pod) error { if err := c.a.CollectPodMetadata(ctx, pod); err != nil { return err diff --git a/pkg/collector/stream.go b/pkg/collector/stream.go index c0d8744..b16666e 100644 --- a/pkg/collector/stream.go +++ b/pkg/collector/stream.go @@ -26,6 +26,10 @@ func NewStreamCollector(prefixFormat string) (Collector, error) { }, nil } +func (c *streamCollector) CollectEvent(ctx context.Context, event *corev1.Event) error { + return nil +} + func (c *streamCollector) CollectPodMetadata(ctx context.Context, pod *corev1.Pod) error { return nil } diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 63639b7..5d255d8 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -24,6 +24,7 @@ type Watcher struct { log logrus.FieldLogger collector collector.Collector initialPods []corev1.Pod + initialEvents []corev1.Event opt Options seenContainers sets.String } @@ -36,6 +37,7 @@ type Options struct { RunningOnly bool OneShot bool DumpMetadata bool + DumpEvents bool } func NewWatcher( @@ -43,6 +45,7 @@ func NewWatcher( c collector.Collector, log logrus.FieldLogger, initialPods []corev1.Pod, + initialEvents []corev1.Event, opt Options, ) *Watcher { return &Watcher{ @@ -50,12 +53,13 @@ func NewWatcher( log: log, collector: c, initialPods: initialPods, + initialEvents: initialEvents, opt: opt, seenContainers: sets.NewString(), } } -func (w *Watcher) Watch(ctx context.Context, wi watch.Interface) { +func (w *Watcher) Watch(ctx context.Context, podWatcher watch.Interface, eventWatcher watch.Interface) { wg := sync.WaitGroup{} for i := range w.initialPods { @@ -64,10 +68,42 @@ func (w *Watcher) Watch(ctx context.Context, wi watch.Interface) { } } + for i := range w.initialEvents { + if w.eventMatchesCriteria(&w.initialEvents[i]) { + w.dumpEvent(ctx, &w.initialEvents[i]) + } + } + + // eventWatcher is nil if neither --events not --raw-events was not specified. + if eventWatcher != nil { + wg.Add(1) + + go func() { + for event := range eventWatcher.ResultChan() { + unstructuredObj, ok := event.Object.(*unstructured.Unstructured) + if !ok { + continue + } + + k8sEvent := &corev1.Event{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), k8sEvent) + if err != nil { + continue + } + + if w.eventMatchesCriteria(k8sEvent) { + w.dumpEvent(ctx, k8sEvent) + } + } + + wg.Done() + }() + } + // wi can be nil if we do not want to actually watch, but instead // just process the initial pods (if --oneshot is given) - if wi != nil { - for event := range wi.ResultChan() { + if podWatcher != nil { + for event := range podWatcher.ResultChan() { obj, ok := event.Object.(*unstructured.Unstructured) if !ok { continue @@ -94,6 +130,16 @@ func (w *Watcher) startLogCollectors(ctx context.Context, wg *sync.WaitGroup, po w.startLogCollectorsForContainers(ctx, wg, pod, pod.Spec.Containers, pod.Status.ContainerStatuses) } +func (w *Watcher) dumpEvent(ctx context.Context, event *corev1.Event) { + if !w.opt.DumpEvents { + return + } + + if err := w.collector.CollectEvent(ctx, event); err != nil { + w.getEventLog(event.InvolvedObject).WithError(err).Error("Failed to collect event.") + } +} + func (w *Watcher) dumpPodMetadata(ctx context.Context, pod *corev1.Pod) { if !w.opt.DumpMetadata { return @@ -192,6 +238,30 @@ func (w *Watcher) podMatchesCriteria(pod *corev1.Pod) bool { return w.resourceNameMatches(podLog, pod) && w.resourceNamespaceMatches(podLog, pod) && w.resourceLabelsMatches(podLog, pod) } +func (w *Watcher) getEventLog(obj corev1.ObjectReference) logrus.FieldLogger { + return w.log.WithField("pod", obj.Name).WithField("namespace", obj.Namespace) +} + +func (w *Watcher) eventMatchesCriteria(event *corev1.Event) bool { + obj := event.InvolvedObject + + if obj.Kind != "Pod" || obj.APIVersion != "v1" { + w.log.Debug("Involved object is not a Pod.") + return false + } + + eventLog := w.getEventLog(obj) + + dummyPod := &corev1.Pod{} + dummyPod.Name = obj.Name + dummyPod.Namespace = obj.Namespace + + // Without fetching the object and hoping it still exists, we cannot compare the labels, so for + // events we simply ignore the label selector :grim: + + return w.resourceNameMatches(eventLog, dummyPod) && w.resourceNamespaceMatches(eventLog, dummyPod) +} + func (w *Watcher) resourceNameMatches(log logrus.FieldLogger, pod *corev1.Pod) bool { if needleMatchesPatterns(pod.GetName(), w.opt.ResourceNames) { return true