Skip to content

Commit

Permalink
allow to dump pod events (useful to catch things like ImagePullBackOffs)
Browse files Browse the repository at this point in the history
  • Loading branch information
xrstf committed Apr 26, 2023
1 parent a7d1ea6 commit 7185fe2
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 21 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ go install go.xrstf.de/protokol
```
Usage of protokol:
-c, --container stringArray Container names to store logs for (supports glob expression) (can be given multiple times)
--events Dump events for each matching Pod as a human readable log file (note: label selectors are not respected)
--events-raw Dump events for each matching Pod as YAML (note: label selectors are not respected)
-f, --flat Do not create directory per namespace, but put all logs in the same directory
--kubeconfig string kubeconfig file to use (uses $KUBECONFIG by default)
-l, --labels string Label-selector as an alternative to specifying resource names
Expand All @@ -31,6 +33,7 @@ Usage of protokol:
-n, --namespace stringArray Kubernetes namespace to watch resources in (supports glob expression) (can be given multiple times)
--oneshot Dump logs, but do not tail the containers (i.e. exit after downloading the current state)
-o, --output string Directory where logs should be stored
--prefix string Prefix pattern to put at the beginning of each streamed line (pn = Pod name, pN = Pod namespace, c = container name) (default "[%pN/%pn:%c] >>")
--stream Do not just dump logs to disk, but also stream them to stdout
-v, --verbose Enable more verbose output
```
Expand Down
61 changes: 52 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type options struct {
oneShot bool
flatFiles bool
dumpMetadata bool
dumpEvents bool
dumpRawEvents bool
verbose bool
}

Expand All @@ -55,6 +57,8 @@ func main() {
pflag.StringVar(&opt.streamPrefix, "prefix", opt.streamPrefix, "Prefix pattern to put at the beginning of each streamed line (pn = Pod name, pN = Pod namespace, c = container name)")
pflag.BoolVar(&opt.oneShot, "oneshot", opt.oneShot, "Dump logs, but do not tail the containers (i.e. exit after downloading the current state)")
pflag.BoolVar(&opt.dumpMetadata, "metadata", opt.dumpMetadata, "Dump Pods additionally as YAML (note that this can include secrets in environment variables)")
pflag.BoolVar(&opt.dumpEvents, "events", opt.dumpEvents, "Dump events for each matching Pod as a human readable log file (note: label selectors are not respected)")
pflag.BoolVar(&opt.dumpRawEvents, "events-raw", opt.dumpRawEvents, "Dump events for each matching Pod as YAML (note: label selectors are not respected)")
pflag.BoolVarP(&opt.verbose, "verbose", "v", opt.verbose, "Enable more verbose output")
pflag.Parse()

Expand Down Expand Up @@ -103,7 +107,7 @@ func main() {

log.WithField("directory", opt.directory).Info("Storing logs on disk.")

coll, err := collector.NewDiskCollector(opt.directory, opt.flatFiles)
coll, err := collector.NewDiskCollector(opt.directory, opt.flatFiles, opt.dumpEvents, opt.dumpRawEvents)
if err != nil {
log.Fatalf("Failed to create log collector: %v", err)
}
Expand Down Expand Up @@ -141,14 +145,23 @@ func main() {
}

// //////////////////////////////////////
// start to watch pods
// start to watch pods & potentially events

resourceInterface := dynamicClient.Resource(schema.GroupVersionResource{
podResourceInterface := dynamicClient.Resource(schema.GroupVersionResource{
Version: "v1",
Resource: "pods",
})

log.Debug("Starting to watch pods…")
eventResourceInterface := dynamicClient.Resource(schema.GroupVersionResource{
Version: "v1",
Resource: "events",
})

if opt.dumpEvents || opt.dumpRawEvents {
log.Debug("Starting to watch pods & events…")
} else {
log.Debug("Starting to watch pods…")
}

// to use the retrywatcher, we need a start revision; setting this to empty or "0"
// is not supported, so we need a real revision; to achieve this we simply create
Expand All @@ -159,15 +172,35 @@ func main() {
log.Fatalf("Failed to determine initial resourceVersion: %v", err)
}

var wi watch.Interface
var initialEvents []corev1.Event
if opt.dumpEvents || opt.dumpRawEvents {
initialEvents, err = getStartEvents(rootCtx, clientset, opt.labels)
if err != nil {
log.Fatalf("Failed to retrieve initial events: %v", err)
}
}

var (
podWatcher watch.Interface
eventWatcher watch.Interface
)

if !opt.oneShot {
wi, err = watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{
podWatcher, err = watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{
ctx: rootCtx,
ri: resourceInterface,
ri: podResourceInterface,
})
if err != nil {
log.Fatalf("Failed to create watch for pods: %v", err)
}

eventWatcher, err = watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{
ctx: rootCtx,
ri: eventResourceInterface,
})
if err != nil {
log.Fatalf("Failed to create watch for events: %v", err)
}
}

watcherOpts := watcher.Options{
Expand All @@ -178,10 +211,11 @@ func main() {
RunningOnly: opt.live,
OneShot: opt.oneShot,
DumpMetadata: opt.dumpMetadata,
DumpEvents: opt.dumpEvents || opt.dumpRawEvents,
}

w := watcher.NewWatcher(clientset, coll, log, initialPods, watcherOpts)
w.Watch(rootCtx, wi)
w := watcher.NewWatcher(clientset, coll, log, initialPods, initialEvents, watcherOpts)
w.Watch(rootCtx, podWatcher, eventWatcher)
}

func getStartPods(ctx context.Context, cs *kubernetes.Clientset, labelSelector string) ([]corev1.Pod, string, error) {
Expand All @@ -195,6 +229,15 @@ func getStartPods(ctx context.Context, cs *kubernetes.Clientset, labelSelector s
return pods.Items, pods.ResourceVersion, nil
}

func getStartEvents(ctx context.Context, cs *kubernetes.Clientset, labelSelector string) ([]corev1.Event, error) {
events, err := cs.CoreV1().Events("").List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to perform list on Events: %w", err)
}

return events.Items, nil
}

type watchContextInjector struct {
ctx context.Context
ri dynamic.ResourceInterface
Expand Down
98 changes: 89 additions & 9 deletions pkg/collector/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,28 @@ package collector

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/yaml"
)

type diskCollector struct {
directory string
flatFiles bool
directory string
flatFiles bool
eventsAsText bool
rawEvents bool
}

var _ Collector = &diskCollector{}

func NewDiskCollector(directory string, flatFiles bool) (Collector, error) {
func NewDiskCollector(directory string, flatFiles bool, eventsAsText bool, rawEvents bool) (Collector, error) {
err := os.MkdirAll(directory, 0755)
if err != nil {
return nil, fmt.Errorf("failed to create directory %q: %w", directory, err)
Expand All @@ -31,16 +35,18 @@ func NewDiskCollector(directory string, flatFiles bool) (Collector, error) {
}

return &diskCollector{
directory: abs,
flatFiles: flatFiles,
directory: abs,
flatFiles: flatFiles,
eventsAsText: eventsAsText,
rawEvents: rawEvents,
}, nil
}

func (c *diskCollector) getDirectory(pod *corev1.Pod) (string, error) {
func (c *diskCollector) getDirectory(namespace string) (string, error) {
directory := c.directory

if !c.flatFiles {
directory = filepath.Join(c.directory, pod.Namespace)
directory = filepath.Join(c.directory, namespace)
}

if err := os.MkdirAll(directory, 0755); err != nil {
Expand All @@ -51,7 +57,7 @@ func (c *diskCollector) getDirectory(pod *corev1.Pod) (string, error) {
}

func (c *diskCollector) CollectPodMetadata(ctx context.Context, pod *corev1.Pod) error {
directory, err := c.getDirectory(pod)
directory, err := c.getDirectory(pod.Namespace)
if err != nil {
return err
}
Expand All @@ -74,8 +80,82 @@ func (c *diskCollector) CollectPodMetadata(ctx context.Context, pod *corev1.Pod)
return os.WriteFile(filename, encoded, 0644)
}

func (c *diskCollector) CollectEvent(ctx context.Context, event *corev1.Event) error {
if !c.eventsAsText && !c.rawEvents {
return errors.New("event dumping is not enabled")
}

directory, err := c.getDirectory(event.InvolvedObject.Namespace)
if err != nil {
return err
}

if c.eventsAsText {
if err := c.dumpEventAsText(directory, event); err != nil {
return err
}
}

if c.rawEvents {
if err := c.dumpEventAsYAML(directory, event); err != nil {
return err
}
}

return nil
}

func (c *diskCollector) dumpEventAsText(directory string, event *corev1.Event) error {
filename := filepath.Join(directory, fmt.Sprintf("%s.events.log", event.InvolvedObject.Name))

stringified := fmt.Sprintf("%s: [%s]", event.LastTimestamp.Format(time.RFC1123), event.Type)
if event.Source.Component != "" {
stringified = fmt.Sprintf("%s [%s]", stringified, event.Source.Component)
}
stringified = fmt.Sprintf("%s %s (reason: %s) (%dx)\n", stringified, event.Message, event.Reason, event.Count)

f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return err
}

_, err = f.WriteString(stringified)
if err1 := f.Close(); err1 != nil && err == nil {
err = err1
}

return err
}

func (c *diskCollector) dumpEventAsYAML(directory string, event *corev1.Event) error {
filename := filepath.Join(directory, fmt.Sprintf("%s.events.yaml", event.InvolvedObject.Name))

trimmedEvent := event.DeepCopy()
trimmedEvent.ManagedFields = nil

encoded, err := yaml.Marshal(trimmedEvent)
if err != nil {
return err
}

encoded = append([]byte("---\n"), encoded...)
encoded = append(encoded, []byte("\n")...)

f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return err
}

_, err = f.Write(encoded)
if err1 := f.Close(); err1 != nil && err == nil {
err = err1
}

return err
}

func (c *diskCollector) CollectLogs(ctx context.Context, log logrus.FieldLogger, pod *corev1.Pod, containerName string, stream io.Reader) error {
directory, err := c.getDirectory(pod)
directory, err := c.getDirectory(pod.Namespace)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/collector/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ import (

type Collector interface {
CollectPodMetadata(ctx context.Context, pod *corev1.Pod) error
CollectEvent(ctx context.Context, event *corev1.Event) error
CollectLogs(ctx context.Context, log logrus.FieldLogger, pod *corev1.Pod, containerName string, stream io.Reader) error
}
8 changes: 8 additions & 0 deletions pkg/collector/multiplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ func NewMultiplexCollector(a, b Collector) (Collector, error) {
}, nil
}

func (c *multiplexCollector) CollectEvent(ctx context.Context, event *corev1.Event) error {
if err := c.a.CollectEvent(ctx, event); err != nil {
return err
}

return c.b.CollectEvent(ctx, event)
}

func (c *multiplexCollector) CollectPodMetadata(ctx context.Context, pod *corev1.Pod) error {
if err := c.a.CollectPodMetadata(ctx, pod); err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/collector/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func NewStreamCollector(prefixFormat string) (Collector, error) {
}, nil
}

func (c *streamCollector) CollectEvent(ctx context.Context, event *corev1.Event) error {
return nil
}

func (c *streamCollector) CollectPodMetadata(ctx context.Context, pod *corev1.Pod) error {
return nil
}
Expand Down
Loading

0 comments on commit 7185fe2

Please sign in to comment.