From 77d87da3be49ee344090f3ee99498853fdb30ba2 Mon Sep 17 00:00:00 2001
From: Alex Collins
Date: Mon, 6 Dec 2021 09:59:00 -0800
Subject: [PATCH] fix: Use and enforce structured logging. Fixes #7243 (#7324)

Signed-off-by: Alex Collins
---
 .github/workflows/ci-build.yaml   |  4 ++++
 Makefile                          |  3 ++-
 hack/check-logging.sh             | 28 +++++++++++++++++++++++++++
 test/e2e/agent_test.go            | 13 +++++++------
 workflow/controller/agent.go      | 16 +++++++++-------
 workflow/controller/cache_gc.go   |  2 +-
 workflow/controller/controller.go |  6 +++---
 workflow/controller/operator.go   | 31 +++++++++++++++++++-----------
 workflow/controller/taskset.go    | 14 ++++++--------
 workflow/executor/agent.go        | 32 ++++++++++++++++++-------------
 10 files changed, 99 insertions(+), 50 deletions(-)
 create mode 100755 hack/check-logging.sh

diff --git a/.github/workflows/ci-build.yaml b/.github/workflows/ci-build.yaml
index c44fc65f4a66..d54bebb52dce 100644
--- a/.github/workflows/ci-build.yaml
+++ b/.github/workflows/ci-build.yaml
@@ -205,10 +205,14 @@ jobs:
       GOPATH: /home/runner/go
     steps:
       - uses: actions/checkout@v2
+        with:
+          fetch-depth: 0
       - run: cp server/static/files.go.stub server/static/files.go
+      - run: ./hack/check-logging.sh
       - uses: golangci/golangci-lint-action@v2
         with:
           version: v1.42.0
+      - run: git diff --exit-code

   ui:
     name: UI
diff --git a/Makefile b/Makefile
index 5ba4f89ffe7f..e30d92706e8e 100644
--- a/Makefile
+++ b/Makefile
@@ -377,7 +377,8 @@ lint: server/static/files.go $(GOPATH)/bin/golangci-lint
 	rm -Rf v3 vendor
 	# Tidy Go modules
 	go mod tidy
-
+	# Lint logging statements
+	./hack/check-logging.sh
 	# Lint Go files
 	$(GOPATH)/bin/golangci-lint run --fix --verbose

diff --git a/hack/check-logging.sh b/hack/check-logging.sh
new file mode 100755
index 000000000000..c17fc25b1efc
--- /dev/null
+++ b/hack/check-logging.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env sh
+# This script will return an error if the current branch introduces unstructured logging statements, such as:
+#
+# Errorf/Warningf/Warnf/Infof/Debugf
+#
+# Unstructured logging is not machine-readable, so it is not possible to build reports from it, or efficiently query on it.
+#
+# Most production systems will not be logging at debug level, so why ban Debugf?
+#
+# * You may change debug logging to info.
+# * To encourage best practice.
+#
+# What to do if you really must format your log messages.
+#
+# I've tried hard to think of any occasion when I'd prefer unstructured logging. I can't think of any, but there
+# might be some edge cases.
+#
+# As a last resort, use `log.Info(fmt.Sprintf(""))`.
+
+set -eu
+
+from=$(git merge-base --fork-point origin/master)
+count=$(git diff "$from" -- '*.go' | grep '^+' | grep -v '\(fmt\|errors\).Errorf' | grep -c '\(Debug\|Info\|Warn\|Warning\|Error\)f' || true)
+
+if [ $count -gt 0 ]; then
+  echo 'Errorf/Warningf/Warnf/Infof/Debugf are banned. Use structured logging, e.g. log.WithError(err).Error() or log.WithField().Info().' >&2
+  exit 1
+fi
diff --git a/test/e2e/agent_test.go b/test/e2e/agent_test.go
index dc29f5bf22ce..2ba5de436f60 100644
--- a/test/e2e/agent_test.go
+++ b/test/e2e/agent_test.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -21,13 +22,9 @@ type AgentSuite struct {
 
 func (s *AgentSuite) TestParallel() {
 	s.Given().
-		Workflow(`apiVersion: argoproj.io/v1alpha1
-kind: Workflow
+		Workflow(`
 metadata:
-  name: http-template-par
-  workflowMetadata:
-    labels:
-      workflows.argoproj.io/test: "true"
+  generateName: agent-
 spec:
   entrypoint: main
   templates:
@@ -83,3 +80,7 @@ spec:
 		}
 	})
 }
+
+func TestAgentSuite(t *testing.T) {
+	suite.Run(t, new(AgentSuite))
+}
diff --git a/workflow/controller/agent.go b/workflow/controller/agent.go
index 4709cf143ea2..f7c09707c36a 100644
--- a/workflow/controller/agent.go
+++ b/workflow/controller/agent.go
@@ -43,7 +43,7 @@ func (woc *wfOperationCtx) reconcileAgentPod(ctx context.Context) error {
 }
 
 func (woc *wfOperationCtx) updateAgentPodStatus(ctx context.Context, pod *apiv1.Pod) {
-	woc.log.Infof("updateAgentPodStatus")
+	woc.log.Info("updateAgentPodStatus")
 	newPhase, message := assessAgentPodStatus(pod)
 	if newPhase == wfv1.WorkflowFailed || newPhase == wfv1.WorkflowError {
 		woc.markWorkflowError(ctx, fmt.Errorf("agent pod failed with reason %s", message))
@@ -53,7 +53,9 @@ func (woc *wfOperationCtx) updateAgentPodStatus(ctx context.Context, pod *apiv1.
 func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) {
 	var newPhase wfv1.WorkflowPhase
 	var message string
-	log.Infof("assessAgentPodStatus")
+	log.WithField("namespace", pod.Namespace).
+		WithField("podName", pod.Name).
+		Info("assessAgentPodStatus")
 	switch pod.Status.Phase {
 	case apiv1.PodSucceeded, apiv1.PodRunning, apiv1.PodPending:
 		return "", ""
@@ -69,6 +71,7 @@ func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) {
 
 func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, error) {
 	podName := woc.getAgentPodName()
+	log := woc.log.WithField("podName", podName)
 
 	obj, exists, err := woc.controller.podInformer.GetStore().Get(cache.ExplicitKey(woc.wf.Namespace + "/" + podName))
 	if err != nil {
@@ -77,7 +80,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
 	if exists {
 		existing, ok := obj.(*apiv1.Pod)
 		if ok {
-			woc.log.WithField("podPhase", existing.Status.Phase).Debugf("Skipped pod %s creation: already exists", podName)
+			log.WithField("podPhase", existing.Status.Phase).Debug("Skipped pod creation: already exists")
 			return existing, nil
 		}
 	}
@@ -130,17 +133,16 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
 		pod.Spec.ServiceAccountName = woc.wf.Spec.ServiceAccountName
 	}
 
-	woc.log.Debugf("Creating Agent Pod: %s", podName)
+	log.Debug("Creating Agent pod")
 
 	created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(ctx, pod, metav1.CreateOptions{})
 	if err != nil {
+		log.WithError(err).Info("Failed to create Agent pod")
 		if apierr.IsAlreadyExists(err) {
-			woc.log.Infof("Agent Pod %s creation: already exists", podName)
 			return created, nil
 		}
-		woc.log.Infof("Failed to create Agent pod %s: %v", podName, err)
 		return nil, errors.InternalWrapError(fmt.Errorf("failed to create Agent pod. Reason: %v", err))
 	}
-	woc.log.Infof("Created Agent pod: %s", created.Name)
+	log.Info("Created Agent pod")
 	return created, nil
 }
diff --git a/workflow/controller/cache_gc.go b/workflow/controller/cache_gc.go
index 47d71be5e87a..5d641c7407ec 100644
--- a/workflow/controller/cache_gc.go
+++ b/workflow/controller/cache_gc.go
@@ -20,7 +20,7 @@ import (
 var gcAfterNotHitDuration = env.LookupEnvDurationOr("CACHE_GC_AFTER_NOT_HIT_DURATION", 30*time.Second)
 
 func init() {
-	log.WithField("gcAfterNotHitDuration", gcAfterNotHitDuration).Infof("Memoization caches will be garbage-collected if they have not been hit after %s", gcAfterNotHitDuration)
+	log.WithField("gcAfterNotHitDuration", gcAfterNotHitDuration).Info("Memoization caches will be garbage-collected if they have not been hit after this duration")
 }
 
 // syncAllCacheForGC syncs all cache for GC
diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index 3d660f2b3238..8b436c8f6a3b 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -134,7 +134,7 @@
 var cacheGCPeriod = env.LookupEnvDurationOr("CACHE_GC_PERIOD", 0)
 
 func init() {
 	if cacheGCPeriod != 0 {
-		log.WithField("cacheGCPeriod", cacheGCPeriod).Infof("GC for memoization caches will be performed every %s", cacheGCPeriod)
+		log.WithField("cacheGCPeriod", cacheGCPeriod).Info("GC for memoization caches will be performed periodically")
 	}
 }
@@ -230,7 +230,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
 		WithField("workflowTtl", workflowTTLWorkers).
 		WithField("pod", podWorkers).
 		WithField("podCleanup", podCleanupWorkers).
-		Infof("Current Worker Numbers")
+		Info("Current Worker Numbers")
 
 	wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListOptions, indexers)
 	wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)
@@ -597,7 +597,7 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{})
 	defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
 
 	periodicity := env.LookupEnvDurationOr("WORKFLOW_GC_PERIOD", 5*time.Minute)
-	log.Infof("Performing periodic GC every %v", periodicity)
+	log.WithField("periodicity", periodicity).Info("Performing periodic GC")
 	ticker := time.NewTicker(periodicity)
 	for {
 		select {
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index a5d5a465a057..6b80aba2b1de 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -200,7 +200,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
 		}
 	}()
 
-	woc.log.Infof("Processing workflow")
+	woc.log.Info("Processing workflow")
 
 	// Set the Execute workflow spec for execution
 	// ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault
@@ -302,7 +302,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
 	}
 
 	if woc.ShouldSuspend() {
-		woc.log.Infof("workflow suspended")
+		woc.log.Info("workflow suspended")
 		return
 	}
 	if woc.execWf.Spec.Parallelism != nil {
@@ -603,7 +603,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
 			woc.log.Info("Re-applying updates on latest version and retrying update")
 			wf, err := woc.reapplyUpdate(ctx, wfClient, nodes)
 			if err != nil {
-				woc.log.Infof("Failed to re-apply update: %+v", err)
+				woc.log.WithError(err).Info("Failed to re-apply update")
 				return
 			}
 			woc.wf = wf
@@ -1104,10 +1104,19 @@ func (woc *wfOperationCtx) getAllWorkflowPods() ([]*apiv1.Pod, error) {
 
 func printPodSpecLog(pod *apiv1.Pod, wfName string) {
 	podSpecByte, err := json.Marshal(pod)
+	log := log.WithField("workflow", wfName).
+		WithField("podName", pod.Name).
+		WithField("nodeID", pod.Annotations[common.AnnotationKeyNodeID]).
+		WithField("namespace", pod.Namespace)
 	if err != nil {
-		log.WithField("workflow", wfName).WithField("nodename", pod.Name).WithField("namespace", pod.Namespace).Warnf("Unable to mashal pod spec. %v", err)
+		log.
+			WithError(err).
+			Warn("Unable to marshal pod spec.")
+	} else {
+		log.
+			WithField("spec", string(podSpecByte)).
+			Info("Pod Spec")
 	}
-	log.WithField("workflow", wfName).WithField("nodename", pod.Name).WithField("namespace", pod.Namespace).Infof("Pod Spec: %s", string(podSpecByte))
 }
 
 // assessNodeStatus compares the current state of a pod with its corresponding node
@@ -1971,7 +1980,7 @@ func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.Wor
 	case wfv1.WorkflowSucceeded, wfv1.WorkflowFailed, wfv1.WorkflowError:
 		// wait for all daemon nodes to get terminated before marking workflow completed
 		if markCompleted && !woc.hasDaemonNodes() {
-			woc.log.Infof("Marking workflow completed")
+			woc.log.Info("Marking workflow completed")
 			woc.wf.Status.FinishedAt = metav1.Time{Time: time.Now().UTC()}
 			woc.globalParams[common.GlobalVarWorkflowDuration] = fmt.Sprintf("%f", woc.wf.Status.FinishedAt.Sub(woc.wf.Status.StartedAt.Time).Seconds())
 			if woc.wf.ObjectMeta.Labels == nil {
@@ -1988,10 +1997,10 @@ func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.Wor
 			}
 			if woc.controller.wfArchive.IsEnabled() {
 				if woc.controller.isArchivable(woc.wf) {
-					woc.log.Infof("Marking workflow as pending archiving")
+					woc.log.Info("Marking workflow as pending archiving")
 					woc.wf.Labels[common.LabelKeyWorkflowArchivingStatus] = "Pending"
 				} else {
-					woc.log.Infof("Doesn't match with archive label selector. Skipping Archive")
+					woc.log.Info("Doesn't match with archive label selector. Skipping Archive")
 				}
 			}
 			woc.updated = true
@@ -2225,7 +2234,7 @@ func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
 	if eventConfig.SendAsPod {
 		pod, err := woc.getPodByNode(node)
 		if err != nil {
-			woc.log.Infof("Error getting pod from workflow node: %s", err)
+			woc.log.WithError(err).Info("Error getting pod from workflow node")
 		}
 		if pod != nil {
 			involvedObject = pod
@@ -3283,7 +3292,7 @@ func (woc *wfOperationCtx) createPDBResource(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	woc.log.Infof("Created PDB resource for workflow.")
+	woc.log.Info("Created PDB resource for workflow.")
 	woc.updated = true
 	return nil
 }
@@ -3303,7 +3312,7 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error {
 		woc.log.WithField("err", err).Error("Unable to delete PDB resource for workflow.")
 		return err
 	}
-	woc.log.Infof("Deleted PDB resource for workflow.")
+	woc.log.Info("Deleted PDB resource for workflow.")
 	return nil
 }
diff --git a/workflow/controller/taskset.go b/workflow/controller/taskset.go
index 18a7d52fa736..83bea738e0fb 100644
--- a/workflow/controller/taskset.go
+++ b/workflow/controller/taskset.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 
-	log "github.com/sirupsen/logrus"
 	apierr "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -85,7 +84,7 @@ func (woc *wfOperationCtx) reconcileTaskSet(ctx context.Context) error {
 		return err
 	}
 
-	woc.log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Infof("TaskSet Reconciliation")
+	woc.log.Info("TaskSet Reconciliation")
 	if workflowTaskSet != nil && len(workflowTaskSet.Status.Nodes) > 0 {
 		for nodeID, taskResult := range workflowTaskSet.Status.Nodes {
 			node := woc.wf.Status.Nodes[nodeID]
@@ -107,8 +106,7 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error {
 		return nil
 	}
 
-	key := fmt.Sprintf("%s/%s", woc.wf.Namespace, woc.wf.Name)
-	log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).WithField("TaskSet", key).Infof("Creating TaskSet")
+	woc.log.Info("Creating TaskSet")
 	taskSet := wfv1.WorkflowTaskSet{
 		TypeMeta: metav1.TypeMeta{
 			Kind:       workflow.WorkflowTaskSetKind,
@@ -130,23 +128,23 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error {
 			Tasks: woc.taskSet,
 		},
 	}
-	log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).WithField("TaskSet", key).Debug("creating new taskset")
+	woc.log.Debug("creating new taskset")
 	_, err := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(woc.wf.Namespace).Create(ctx, &taskSet, metav1.CreateOptions{})
 	if apierr.IsConflict(err) || apierr.IsAlreadyExists(err) {
-		log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).WithField("TaskSet", woc.taskSet).Debug("patching the exiting taskset")
+		woc.log.Debug("patching the existing taskset")
 		spec := map[string]interface{}{
 			"spec": wfv1.WorkflowTaskSetSpec{Tasks: woc.taskSet},
 		}
 		// patch the new templates into taskset
 		err = woc.patchTaskSet(ctx, spec, types.MergePatchType)
 		if err != nil {
-			log.WithError(err).WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Error("Failed to patch WorkflowTaskSet")
+			woc.log.WithError(err).Error("Failed to patch WorkflowTaskSet")
 			return fmt.Errorf("failed to patch TaskSet. %v", err)
 		}
 	} else if err != nil {
-		log.WithError(err).WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Error("Failed to create WorkflowTaskSet")
+		woc.log.WithError(err).Error("Failed to create WorkflowTaskSet")
 		return err
 	}
 	return nil
diff --git a/workflow/executor/agent.go b/workflow/executor/agent.go
index 8c6b538d157b..34deb5c4a138 100644
--- a/workflow/executor/agent.go
+++ b/workflow/executor/agent.go
@@ -29,6 +29,7 @@ import (
 )
 
 type AgentExecutor struct {
+	log               *log.Entry
 	WorkflowName      string
 	ClientSet         kubernetes.Interface
 	WorkflowInterface workflow.Interface
@@ -39,6 +40,7 @@ type AgentExecutor struct {
 
 func NewAgentExecutor(clientSet kubernetes.Interface, restClient rest.Interface, config *rest.Config, namespace, workflowName string) *AgentExecutor {
 	return &AgentExecutor{
+		log:          log.WithField("workflow", workflowName),
 		ClientSet:    clientSet,
 		RESTClient:   restClient,
 		Namespace:    namespace,
@@ -63,7 +65,7 @@ func (ae *AgentExecutor) Agent(ctx context.Context) error {
 	taskWorkers := env.LookupEnvIntOr(common.EnvAgentTaskWorkers, 16)
 	requeueTime := env.LookupEnvDurationOr(common.EnvAgentPatchRate, 10*time.Second)
 
-	log.WithFields(log.Fields{"taskWorkers": taskWorkers, "requeueTime": requeueTime}).Info("Starting Agent")
+	ae.log.WithFields(log.Fields{"taskWorkers": taskWorkers, "requeueTime": requeueTime}).Info("Starting Agent")
 
 	taskQueue := make(chan task)
 	responseQueue := make(chan response)
@@ -81,7 +83,7 @@ func (ae *AgentExecutor) Agent(ctx context.Context) error {
 	}
 
 	for event := range wfWatch.ResultChan() {
-		log.WithFields(log.Fields{"workflow": ae.WorkflowName, "event_type": event.Type}).Infof("TaskSet Event")
+		ae.log.WithField("event_type", event.Type).Info("TaskSet Event")
 
 		if event.Type == watch.Deleted {
 			// We're done if the task set is deleted
@@ -93,7 +95,7 @@ func (ae *AgentExecutor) Agent(ctx context.Context) error {
 			return apierr.FromObject(event.Object)
 		}
 		if IsWorkflowCompleted(taskSet) {
-			log.WithField("workflow", ae.WorkflowName).Info("Workflow completed... stopping agent")
+			ae.log.Info("Workflow completed... stopping agent")
 			return nil
 		}
 
@@ -107,25 +109,27 @@ func (ae *AgentExecutor) taskWorker(ctx context.Context, taskQueue chan task, responseQueue chan response) {
 	for task := range taskQueue {
 		nodeID, tmpl := task.NodeId, task.Template
-		log.WithFields(log.Fields{"nodeID": nodeID}).Info("Attempting task")
+		log := log.WithField("nodeID", nodeID)
 
 		// Do not work on tasks that have already been considered once, to prevent calling an endpoint more
 		// than once unintentionally.
 		if _, ok := ae.consideredTasks[nodeID]; ok {
-			log.WithFields(log.Fields{"nodeID": nodeID}).Info("Task is already considered")
+			log.Info("Task is already considered")
 			continue
 		}
 		ae.consideredTasks[nodeID] = true
 
-		log.WithFields(log.Fields{"nodeID": nodeID}).Info("Processing task")
+		log.Info("Processing task")
 		result, err := ae.processTask(ctx, tmpl)
 		if err != nil {
-			log.WithFields(log.Fields{"error": err, "nodeID": nodeID}).Error("Error in agent task")
+			log.WithError(err).Error("Error in agent task")
 			return
 		}
 
-		log.WithFields(log.Fields{"nodeID": nodeID}).Info("Sending result")
+		log.WithField("phase", result.Phase).
+			WithField("message", result.Message).
+			Info("Sending result")
 		responseQueue <- response{NodeId: nodeID, Result: result}
 	}
 }
@@ -144,16 +148,18 @@ func (ae *AgentExecutor) patchWorker(ctx context.Context, taskSetInterface v1alp
 			patch, err := json.Marshal(map[string]interface{}{"status": wfv1.WorkflowTaskSetStatus{Nodes: nodeResults}})
 			if err != nil {
-				log.WithError(err).Error("Generating Patch Failed")
+				ae.log.WithError(err).Error("Generating Patch Failed")
 				continue
 			}
 
-			log.WithFields(log.Fields{"workflow": ae.WorkflowName}).Info("Processing Patch")
+			ae.log.Info("Processing Patch")
 
-			obj, err := taskSetInterface.Patch(ctx, ae.WorkflowName, types.MergePatchType, patch, metav1.PatchOptions{})
+			_, err = taskSetInterface.Patch(ctx, ae.WorkflowName, types.MergePatchType, patch, metav1.PatchOptions{})
 			if err != nil {
 				isTransientErr := errors.IsTransientErr(err)
-				log.WithError(err).WithFields(log.Fields{"taskset": obj, "is_transient_error": isTransientErr}).Errorf("TaskSet Patch Failed")
+				ae.log.WithError(err).
+					WithField("is_transient_error", isTransientErr).
+					Error("TaskSet Patch Failed")
 
 				// If this is not a transient error, then it's likely that the contents of the patch have caused the error.
 				// To avoid a deadlock with the workflow overall, or an infinite loop, fail and propagate the error messages
@@ -173,7 +179,7 @@ func (ae *AgentExecutor) patchWorker(ctx context.Context, taskSetInterface v1alp
 
 			// Patch was successful, clear nodeResults for next iteration
 			nodeResults = map[string]wfv1.NodeResult{}
-			log.WithField("taskset", obj).Infof("Patched TaskSet")
+			ae.log.Info("Patched TaskSet")
 		}
 	}
 }