add --oneshot for when we do not want to follow the containers, just dump them once
xrstf committed May 24, 2022
1 parent f5f53e3 commit fd1242b
Showing 3 changed files with 46 additions and 26 deletions.
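
To make the new behaviour concrete, here is a minimal, self-contained sketch of the difference the flag toggles at the client-go level: with `Follow: true` the log stream stays open and tails the container, while `Follow: false` (one-shot) returns once the currently available log lines have been read. This is not the project's actual code; the kubeconfig handling and the `default`/`example-pod`/`example-container` names are assumptions made purely for illustration.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"os"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// dumpLogs streams the logs of one container. With follow=true the call
// blocks and tails the container; with follow=false (one-shot) it returns
// after the currently available log lines have been copied to out.
func dumpLogs(ctx context.Context, clientset *kubernetes.Clientset, namespace, pod, container string, follow bool, out io.Writer) error {
	req := clientset.CoreV1().Pods(namespace).GetLogs(pod, &corev1.PodLogOptions{
		Container: container,
		Follow:    follow,
	})

	stream, err := req.Stream(ctx)
	if err != nil {
		return fmt.Errorf("failed to open log stream: %w", err)
	}
	defer stream.Close()

	_, err = io.Copy(out, stream)
	return err
}

func main() {
	// assumption: kubeconfig in the default location, pod/container names are examples
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// follow=false corresponds to the new --oneshot behaviour
	if err := dumpLogs(context.Background(), clientset, "default", "example-pod", "example-container", false, os.Stdout); err != nil {
		panic(err)
	}
}
```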
1 change: 1 addition & 0 deletions README.md
@@ -28,6 +28,7 @@ Usage of loks:
-l, --labels string Label-selector as an alternative to specifying resource names
--live Only consider running pods, ignore completed/failed pods
-n, --namespace stringArray Kubernetes namespace to watch resources in (supports glob expression) (can be given multiple times)
--oneshot Dump logs, but do not tail the containers (i.e. exit after downloading the current state)
-o, --output string Directory where logs should be stored
-v, --verbose Enable more verbose output
```
18 changes: 12 additions & 6 deletions main.go
@@ -29,6 +29,7 @@ type options struct {
containerNames []string
labels string
live bool
oneShot bool
flatFiles bool
verbose bool
}
@@ -44,6 +45,7 @@ func main() {
pflag.StringVarP(&opt.directory, "output", "o", opt.directory, "Directory where logs should be stored")
pflag.BoolVarP(&opt.flatFiles, "flat", "f", opt.flatFiles, "Do not create directory per namespace, but put all logs in the same directory")
pflag.BoolVar(&opt.live, "live", opt.live, "Only consider running pods, ignore completed/failed pods")
pflag.BoolVar(&opt.oneShot, "oneshot", opt.oneShot, "Dump logs, but do not tail the containers (i.e. exit after downloading the current state)")
pflag.BoolVarP(&opt.verbose, "verbose", "v", opt.verbose, "Enable more verbose output")
pflag.Parse()

@@ -136,12 +138,15 @@ func main() {
log.Fatalf("Failed to determine initial resourceVersion: %v", err)
}

wi, err := watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{
ctx: rootCtx,
ri: resourceInterface,
})
if err != nil {
log.Fatalf("Failed to create watch for pods: %v", err)
var wi watch.Interface
if !opt.oneShot {
wi, err = watchtools.NewRetryWatcher(resourceVersion, &watchContextInjector{
ctx: rootCtx,
ri: resourceInterface,
})
if err != nil {
log.Fatalf("Failed to create watch for pods: %v", err)
}
}

watcherOpts := watcher.Options{
@@ -150,6 +155,7 @@
ResourceNames: args,
ContainerNames: opt.containerNames,
RunningOnly: opt.live,
OneShot: opt.oneShot,
}

w := watcher.NewWatcher(clientset, c, log, initialPods, watcherOpts)
53 changes: 33 additions & 20 deletions pkg/watcher/watcher.go
@@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"
"strings"
"sync"

"go.xrstf.de/loks/pkg/collector"

@@ -33,6 +34,7 @@ type Options struct {
ResourceNames []string
ContainerNames []string
RunningOnly bool
OneShot bool
}

func NewWatcher(
@@ -53,36 +55,44 @@
}

func (w *Watcher) Watch(ctx context.Context, wi watch.Interface) {
wg := sync.WaitGroup{}

for i := range w.initialPods {
if w.podMatchesCriteria(&w.initialPods[i]) {
w.startLogCollectors(ctx, &w.initialPods[i])
w.startLogCollectors(ctx, &wg, &w.initialPods[i])
}
}

for event := range wi.ResultChan() {
obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
continue
}
// wi can be nil if we do not want to actually watch, but instead
// just process the initial pods (if --oneshot is given)
if wi != nil {
for event := range wi.ResultChan() {
obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
continue
}

pod := &corev1.Pod{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod)
if err != nil {
continue
}
pod := &corev1.Pod{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod)
if err != nil {
continue
}

if w.podMatchesCriteria(pod) {
w.startLogCollectors(ctx, pod)
if w.podMatchesCriteria(pod) {
w.startLogCollectors(ctx, &wg, pod)
}
}
}

wg.Wait()
}

func (w *Watcher) startLogCollectors(ctx context.Context, pod *corev1.Pod) {
w.startLogCollectorsForContainers(ctx, pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
w.startLogCollectorsForContainers(ctx, pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
func (w *Watcher) startLogCollectors(ctx context.Context, wg *sync.WaitGroup, pod *corev1.Pod) {
w.startLogCollectorsForContainers(ctx, wg, pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
w.startLogCollectorsForContainers(ctx, wg, pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
}

func (w *Watcher) startLogCollectorsForContainers(ctx context.Context, pod *corev1.Pod, containers []corev1.Container, statuses []corev1.ContainerStatus) {
func (w *Watcher) startLogCollectorsForContainers(ctx context.Context, wg *sync.WaitGroup, pod *corev1.Pod, containers []corev1.Container, statuses []corev1.ContainerStatus) {
podLog := w.getPodLog(pod)

for _, container := range containers {
@@ -131,16 +141,19 @@ func (w *Watcher) startLogCollectorsForContainers(ctx context.Context, pod *corev1.Pod, containers []corev1.Container, statuses []corev1.ContainerStatus) {
// remember that we have seen this incarnation
w.seenContainers.Insert(ident)

go w.collectLogs(ctx, containerLog, pod, containerName, int(status.RestartCount))
wg.Add(1)
go w.collectLogs(ctx, wg, containerLog, pod, containerName, int(status.RestartCount))
}
}

func (w *Watcher) collectLogs(ctx context.Context, log logrus.FieldLogger, pod *corev1.Pod, containerName string, restartCount int) {
func (w *Watcher) collectLogs(ctx context.Context, wg *sync.WaitGroup, log logrus.FieldLogger, pod *corev1.Pod, containerName string, restartCount int) {
defer wg.Done()

log.Info("Starting to collect logs…")

request := w.clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
Container: containerName,
Follow: true,
Follow: !w.opt.OneShot,
})

stream, err := request.Stream(ctx)
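
The `sync.WaitGroup` wiring in this file follows the standard Go pattern for waiting on a dynamic set of goroutines, which is what lets `--oneshot` exit once every collector has written its current snapshot. A standalone sketch of that pattern (the container names and the sleep are stand-ins for illustration, not part of loks):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// collect simulates one log collector; in the real watcher this would
// stream a container's logs until the stream ends or the context is done.
func collect(wg *sync.WaitGroup, container string) {
	defer wg.Done() // signal completion, mirroring collectLogs above

	time.Sleep(100 * time.Millisecond) // stand-in for reading the log stream
	fmt.Printf("finished dumping logs of %s\n", container)
}

func main() {
	wg := sync.WaitGroup{}

	// hypothetical container names, one goroutine per container
	for _, container := range []string{"etcd", "apiserver", "controller-manager"} {
		wg.Add(1) // must happen before the goroutine is started
		go collect(&wg, container)
	}

	// in one-shot mode this wait is what lets the program exit once every
	// collector has finished dumping its logs
	wg.Wait()
}
```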