Skip to content

Commit

Permalink
Merge pull request #2692 from networkservicemesh/fix-interdomain-logs
Browse files Browse the repository at this point in the history
qfix: properly handle interdomain logging
  • Loading branch information
edwarnicke authored Apr 24, 2023
2 parents 0900877 + 5804754 commit 8386f28
Showing 1 changed file with 60 additions and 42 deletions.
102 changes: 60 additions & 42 deletions extensions/logs/logs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -44,28 +44,28 @@ import (
const (
defaultQPS = 5 // this is default value for QPS of kubeconfig. See at documentation.
fromAllNamespaces = ""
kubeconfigEnv = "KUBECONFIG"
)

var (
once sync.Once
config Config
jobsCh chan func()
ctx context.Context
kubeClient kubernetes.Interface
matchRegex *regexp.Regexp
once sync.Once
config Config
jobsCh chan func()
ctx context.Context
kubeClients []kubernetes.Interface
kubeConfigs []string
matchRegex *regexp.Regexp
)

// Config is env config to setup log collecting.
type Config struct {
KubeConfig string `default:"" desc:".kube config file path" envconfig:"KUBECONFIG"`
ArtifactsDir string `default:"logs" desc:"Directory for storing container logs" envconfig:"ARTIFACTS_DIR"`
Timeout time.Duration `default:"5s" desc:"Context timeout for kubernetes queries" split_words:"true"`
WorkerCount int `default:"8" desc:"Number of log collector workers" split_words:"true"`
MaxKubeConfigs int `default:"3" desc:"Number of used kubeconfigs" split_words:"true"`
AllowedNamespaces string `default:"(ns-.*)|(nsm-system)|(spire)|(observability)" desc:"Regex of allowed namespaces" split_words:"true"`
}

func savePodLogs(ctx context.Context, pod *corev1.Pod, opts *corev1.PodLogOptions, fromInitContainers bool, dir string) {
func savePodLogs(ctx context.Context, kubeClient kubernetes.Interface, pod *corev1.Pod, opts *corev1.PodLogOptions, fromInitContainers bool, dir string) {
containers := pod.Spec.Containers
if fromInitContainers {
containers = pod.Spec.InitContainers
Expand Down Expand Up @@ -104,7 +104,7 @@ func savePodLogs(ctx context.Context, pod *corev1.Pod, opts *corev1.PodLogOption
}
}

func captureLogs(from time.Time, dir string) {
func captureLogs(kubeClient kubernetes.Interface, from time.Time, dir string) {
operationCtx, cancel := context.WithTimeout(ctx, config.Timeout)
defer cancel()
resp, err := kubeClient.CoreV1().Pods(fromAllNamespaces).List(operationCtx, metav1.ListOptions{})
Expand All @@ -123,8 +123,8 @@ func captureLogs(from time.Time, dir string) {
opts := &corev1.PodLogOptions{
SinceTime: &metav1.Time{Time: from},
}
savePodLogs(operationCtx, pod, opts, false, dir)
savePodLogs(operationCtx, pod, opts, true, dir)
savePodLogs(operationCtx, kubeClient, pod, opts, false, dir)
savePodLogs(operationCtx, kubeClient, pod, opts, true, dir)

wg.Done()
}
Expand Down Expand Up @@ -153,21 +153,36 @@ func initialize() {

jobsCh = make(chan func(), config.WorkerCount)

if config.KubeConfig == "" {
config.KubeConfig = filepath.Join(os.Getenv("HOME"), ".kube", "config")
var kubeConfig = os.Getenv("KUBECONFIG")

if kubeConfig == "" {
kubeConfig = filepath.Join(os.Getenv("HOME"), ".kube", "config")
}

kubeconfig, err := clientcmd.BuildConfigFromFlags("", config.KubeConfig)
if err != nil {
logrus.Fatal(err.Error())
kubeConfigs = []string{kubeConfig}

for i := 1; i <= config.MaxKubeConfigs; i++ {
kubeConfig = os.Getenv("KUBECONFIG" + fmt.Sprint(i))
if kubeConfig != "" {
kubeConfigs = append(kubeConfigs, kubeConfig)
}
}

kubeconfig.QPS = float32(config.WorkerCount) * defaultQPS
kubeconfig.Burst = int(kubeconfig.QPS) * 2
for _, cfg := range kubeConfigs {
kubeconfig, err := clientcmd.BuildConfigFromFlags("", cfg)
if err != nil {
logrus.Fatal(err.Error())
}

kubeClient, err = kubernetes.NewForConfig(kubeconfig)
if err != nil {
logrus.Fatal(err.Error())
kubeconfig.QPS = float32(config.WorkerCount) * defaultQPS
kubeconfig.Burst = int(kubeconfig.QPS) * 2

kubeClient, err := kubernetes.NewForConfig(kubeconfig)
if err != nil {
logrus.Fatal(err.Error())
}

kubeClients = append(kubeClients, kubeClient)
}

var cancel context.CancelFunc
Expand All @@ -194,7 +209,7 @@ func initialize() {
}()
}

func capture(name string) context.CancelFunc {
func capture(kubeClient kubernetes.Interface, name string) context.CancelFunc {
once.Do(initialize)

now := time.Now()
Expand All @@ -203,11 +218,12 @@ func capture(name string) context.CancelFunc {
_ = os.MkdirAll(dir, os.ModePerm)

return func() {
captureLogs(now, dir)
captureLogs(kubeClient, now, dir)
}
}

func describePods(name string) {
// TODO: do not use bash runner to get describe info. Use kubernetes API instead.
func describePods(kubeClient kubernetes.Interface, kubeConfig, name string) {
getCtx, cancel := context.WithTimeout(ctx, config.Timeout)
defer cancel()

Expand All @@ -222,7 +238,8 @@ func describePods(name string) {
}

for _, ns := range filterNamespaces(nsList) {
_, _, exitCode, err := runner.Run("kubectl describe pods -n " + ns + ">" + filepath.Join(config.ArtifactsDir, name, "describe-"+ns+".log"))
p := filepath.Join(config.ArtifactsDir, name, "describe-"+ns+".log")
_, _, exitCode, err := runner.Run(fmt.Sprintf("kubectl --kubeconfig %v describe pods -n %v > %v", kubeConfig, ns, p))
if exitCode != 0 || err != nil {
logrus.Errorf("An error while retrieving describe for namespace: %v", ns)
}
Expand All @@ -243,23 +260,24 @@ func filterNamespaces(nsList *corev1.NamespaceList) []string {

// Capture returns a function that saves logs since Capture function has been called.
func Capture(name string) context.CancelFunc {
c := capture(name)

return func() {
describePods(name)
var pushArtifacts = func() {}

kubeconfigValue := os.Getenv(kubeconfigEnv)
c()
for i := 0; ; i++ {
val := os.Getenv(kubeconfigEnv + fmt.Sprint(i))
for i, client := range kubeClients {
var clusterPrefix = filepath.Join(fmt.Sprintf("cluster%v", i+1), name)
var prevPushFn = pushArtifacts
var nextPushFn = capture(client, clusterPrefix)

if val == "" {
break
}
pushArtifacts = func() {
prevPushFn()
nextPushFn()
}
}
return func() {
for i, client := range kubeClients {
var clusterPrefix = filepath.Join(fmt.Sprintf("cluster%v", i+1), name)

_ = os.Setenv(kubeconfigEnv, val)
c()
describePods(client, kubeConfigs[i], clusterPrefix)
}
_ = os.Setenv(kubeconfigEnv, kubeconfigValue)
pushArtifacts()
}
}

0 comments on commit 8386f28

Please sign in to comment.