Skip to content

Commit

Permalink
fix: Use and enforce structured logging. Fixes #7243 (#7324)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Dec 6, 2021
1 parent 3e727fa commit 77d87da
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 50 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,14 @@ jobs:
GOPATH: /home/runner/go
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- run: cp server/static/files.go.stub server/static/files.go
- run: ./hack/check-logging.sh
- uses: golangci/golangci-lint-action@v2
with:
version: v1.42.0
- run: git diff --exit-code

ui:
name: UI
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ lint: server/static/files.go $(GOPATH)/bin/golangci-lint
rm -Rf v3 vendor
# Tidy Go modules
go mod tidy

# Lint logging statements
./hack/check-logging.sh
# Lint Go files
$(GOPATH)/bin/golangci-lint run --fix --verbose

Expand Down
28 changes: 28 additions & 0 deletions hack/check-logging.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env sh
# This script will return an error if the current branch introduces unstructured logging statements, such as:
#
# Errorf/Warningf/Warnf/Infof/Debugf
#
# Unstructured logging is not machine readable, so it is not possible to build reports from it, or efficiently query on it.
#
# Most production system will not be logging at debug level, so why ban Debugf?
#
# * You may change debug logging to info.
# * To encourage best practice.
#
# What to do if really must format your log messages.
#
# I've tried hard to think of any occasion when I'd prefer unstructured logging. I can't think of any times, but here
# might be some edge cases.
#
# As a last resort, use `log.Info(fmt.Sprintf(""))`.

set -eu

from=$(git merge-base --fork-point origin/master)
count=$(git diff "$from" -- '*.go' | grep '^+' | grep -v '\(fmt\|errors\).Errorf' | grep -c '\(Debug\|Info\|Warn\|Warning\|Error\)f' || true)

if [ $count -gt 0 ]; then
echo 'Errorf/Warningf/Warnf/Infof/Debugf are banned. Use structured logging, e.g. log.WithError(err).Error() or log.WithField().Info().' >&2
exit 1
fi
13 changes: 7 additions & 6 deletions test/e2e/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand All @@ -21,13 +22,9 @@ type AgentSuite struct {

func (s *AgentSuite) TestParallel() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
Workflow(`
metadata:
name: http-template-par
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
generateName: agent-
spec:
entrypoint: main
templates:
Expand Down Expand Up @@ -83,3 +80,7 @@ spec:
}
})
}

func TestAgentSuite(t *testing.T) {
suite.Run(t, new(AgentSuite))
}
16 changes: 9 additions & 7 deletions workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (woc *wfOperationCtx) reconcileAgentPod(ctx context.Context) error {
}

func (woc *wfOperationCtx) updateAgentPodStatus(ctx context.Context, pod *apiv1.Pod) {
woc.log.Infof("updateAgentPodStatus")
woc.log.Info("updateAgentPodStatus")
newPhase, message := assessAgentPodStatus(pod)
if newPhase == wfv1.WorkflowFailed || newPhase == wfv1.WorkflowError {
woc.markWorkflowError(ctx, fmt.Errorf("agent pod failed with reason %s", message))
Expand All @@ -53,7 +53,9 @@ func (woc *wfOperationCtx) updateAgentPodStatus(ctx context.Context, pod *apiv1.
func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) {
var newPhase wfv1.WorkflowPhase
var message string
log.Infof("assessAgentPodStatus")
log.WithField("namespace", pod.Namespace).
WithField("podName", pod.Name).
Info("assessAgentPodStatus")
switch pod.Status.Phase {
case apiv1.PodSucceeded, apiv1.PodRunning, apiv1.PodPending:
return "", ""
Expand All @@ -69,6 +71,7 @@ func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) {

func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, error) {
podName := woc.getAgentPodName()
log := woc.log.WithField("podName", podName)

obj, exists, err := woc.controller.podInformer.GetStore().Get(cache.ExplicitKey(woc.wf.Namespace + "/" + podName))
if err != nil {
Expand All @@ -77,7 +80,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
if exists {
existing, ok := obj.(*apiv1.Pod)
if ok {
woc.log.WithField("podPhase", existing.Status.Phase).Debugf("Skipped pod %s creation: already exists", podName)
log.WithField("podPhase", existing.Status.Phase).Debug("Skipped pod creation: already exists")
return existing, nil
}
}
Expand Down Expand Up @@ -130,17 +133,16 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
pod.Spec.ServiceAccountName = woc.wf.Spec.ServiceAccountName
}

woc.log.Debugf("Creating Agent Pod: %s", podName)
log.Debug("Creating Agent pod")

created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
log.WithError(err).Info("Failed to create Agent pod")
if apierr.IsAlreadyExists(err) {
woc.log.Infof("Agent Pod %s creation: already exists", podName)
return created, nil
}
woc.log.Infof("Failed to create Agent pod %s: %v", podName, err)
return nil, errors.InternalWrapError(fmt.Errorf("failed to create Agent pod. Reason: %v", err))
}
woc.log.Infof("Created Agent pod: %s", created.Name)
log.Info("Created Agent pod")
return created, nil
}
2 changes: 1 addition & 1 deletion workflow/controller/cache_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var gcAfterNotHitDuration = env.LookupEnvDurationOr("CACHE_GC_AFTER_NOT_HIT_DURATION", 30*time.Second)

func init() {
log.WithField("gcAfterNotHitDuration", gcAfterNotHitDuration).Infof("Memoization caches will be garbage-collected if they have not been hit after %s", gcAfterNotHitDuration)
log.WithField("gcAfterNotHitDuration", gcAfterNotHitDuration).Info("Memoization caches will be garbage-collected if they have not been hit after")
}

// syncAllCacheForGC syncs all cache for GC
Expand Down
6 changes: 3 additions & 3 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ var cacheGCPeriod = env.LookupEnvDurationOr("CACHE_GC_PERIOD", 0)

func init() {
if cacheGCPeriod != 0 {
log.WithField("cacheGCPeriod", cacheGCPeriod).Infof("GC for memoization caches will be performed every %s", cacheGCPeriod)
log.WithField("cacheGCPeriod", cacheGCPeriod).Info("GC for memoization caches will be performed every")
}
}

Expand Down Expand Up @@ -230,7 +230,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
WithField("workflowTtl", workflowTTLWorkers).
WithField("pod", podWorkers).
WithField("podCleanup", podCleanupWorkers).
Infof("Current Worker Numbers")
Info("Current Worker Numbers")

wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListOptions, indexers)
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)
Expand Down Expand Up @@ -597,7 +597,7 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{})
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

periodicity := env.LookupEnvDurationOr("WORKFLOW_GC_PERIOD", 5*time.Minute)
log.Infof("Performing periodic GC every %v", periodicity)
log.WithField("periodicity", periodicity).Info("Performing periodic GC")
ticker := time.NewTicker(periodicity)
for {
select {
Expand Down
31 changes: 20 additions & 11 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
}()

woc.log.Infof("Processing workflow")
woc.log.Info("Processing workflow")

// Set the Execute workflow spec for execution
// ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault
Expand Down Expand Up @@ -302,7 +302,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}

if woc.ShouldSuspend() {
woc.log.Infof("workflow suspended")
woc.log.Info("workflow suspended")
return
}
if woc.execWf.Spec.Parallelism != nil {
Expand Down Expand Up @@ -603,7 +603,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.log.Info("Re-applying updates on latest version and retrying update")
wf, err := woc.reapplyUpdate(ctx, wfClient, nodes)
if err != nil {
woc.log.Infof("Failed to re-apply update: %+v", err)
woc.log.WithError(err).Info("Failed to re-apply update")
return
}
woc.wf = wf
Expand Down Expand Up @@ -1104,10 +1104,19 @@ func (woc *wfOperationCtx) getAllWorkflowPods() ([]*apiv1.Pod, error) {

func printPodSpecLog(pod *apiv1.Pod, wfName string) {
podSpecByte, err := json.Marshal(pod)
log := log.WithField("workflow", wfName).
WithField("podName", pod.Name).
WithField("nodeID", pod.Annotations[common.AnnotationKeyNodeID]).
WithField("namespace", pod.Namespace)
if err != nil {
log.WithField("workflow", wfName).WithField("nodename", pod.Name).WithField("namespace", pod.Namespace).Warnf("Unable to mashal pod spec. %v", err)
log.
WithError(err).
Warn("Unable to marshal pod spec.")
} else {
log.
WithField("spec", string(podSpecByte)).
Info("Pod Spec")
}
log.WithField("workflow", wfName).WithField("nodename", pod.Name).WithField("namespace", pod.Namespace).Infof("Pod Spec: %s", string(podSpecByte))
}

// assessNodeStatus compares the current state of a pod with its corresponding node
Expand Down Expand Up @@ -1971,7 +1980,7 @@ func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.Wor
case wfv1.WorkflowSucceeded, wfv1.WorkflowFailed, wfv1.WorkflowError:
// wait for all daemon nodes to get terminated before marking workflow completed
if markCompleted && !woc.hasDaemonNodes() {
woc.log.Infof("Marking workflow completed")
woc.log.Info("Marking workflow completed")
woc.wf.Status.FinishedAt = metav1.Time{Time: time.Now().UTC()}
woc.globalParams[common.GlobalVarWorkflowDuration] = fmt.Sprintf("%f", woc.wf.Status.FinishedAt.Sub(woc.wf.Status.StartedAt.Time).Seconds())
if woc.wf.ObjectMeta.Labels == nil {
Expand All @@ -1988,10 +1997,10 @@ func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.Wor
}
if woc.controller.wfArchive.IsEnabled() {
if woc.controller.isArchivable(woc.wf) {
woc.log.Infof("Marking workflow as pending archiving")
woc.log.Info("Marking workflow as pending archiving")
woc.wf.Labels[common.LabelKeyWorkflowArchivingStatus] = "Pending"
} else {
woc.log.Infof("Doesn't match with archive label selector. Skipping Archive")
woc.log.Info("Doesn't match with archive label selector. Skipping Archive")
}
}
woc.updated = true
Expand Down Expand Up @@ -2225,7 +2234,7 @@ func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
if eventConfig.SendAsPod {
pod, err := woc.getPodByNode(node)
if err != nil {
woc.log.Infof("Error getting pod from workflow node: %s", err)
woc.log.WithError(err).Info("Error getting pod from workflow node")
}
if pod != nil {
involvedObject = pod
Expand Down Expand Up @@ -3283,7 +3292,7 @@ func (woc *wfOperationCtx) createPDBResource(ctx context.Context) error {
if err != nil {
return err
}
woc.log.Infof("Created PDB resource for workflow.")
woc.log.Info("Created PDB resource for workflow.")
woc.updated = true
return nil
}
Expand All @@ -3303,7 +3312,7 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error {
woc.log.WithField("err", err).Error("Unable to delete PDB resource for workflow.")
return err
}
woc.log.Infof("Deleted PDB resource for workflow.")
woc.log.Info("Deleted PDB resource for workflow.")
return nil
}

Expand Down
14 changes: 6 additions & 8 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"

log "github.com/sirupsen/logrus"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -85,7 +84,7 @@ func (woc *wfOperationCtx) reconcileTaskSet(ctx context.Context) error {
return err
}

woc.log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Infof("TaskSet Reconciliation")
woc.log.Info("TaskSet Reconciliation")
if workflowTaskSet != nil && len(workflowTaskSet.Status.Nodes) > 0 {
for nodeID, taskResult := range workflowTaskSet.Status.Nodes {
node := woc.wf.Status.Nodes[nodeID]
Expand All @@ -107,8 +106,7 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error {
return nil
}

key := fmt.Sprintf("%s/%s", woc.wf.Namespace, woc.wf.Name)
log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).WithField("TaskSet", key).Infof("Creating TaskSet")
woc.log.Info("Creating TaskSet")
taskSet := wfv1.WorkflowTaskSet{
TypeMeta: metav1.TypeMeta{
Kind: workflow.WorkflowTaskSetKind,
Expand All @@ -130,23 +128,23 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error {
Tasks: woc.taskSet,
},
}
log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).WithField("TaskSet", key).Debug("creating new taskset")
woc.log.Debug("creating new taskset")

_, err := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(woc.wf.Namespace).Create(ctx, &taskSet, metav1.CreateOptions{})

if apierr.IsConflict(err) || apierr.IsAlreadyExists(err) {
log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).WithField("TaskSet", woc.taskSet).Debug("patching the exiting taskset")
woc.log.Debug("patching the exiting taskset")
spec := map[string]interface{}{
"spec": wfv1.WorkflowTaskSetSpec{Tasks: woc.taskSet},
}
// patch the new templates into taskset
err = woc.patchTaskSet(ctx, spec, types.MergePatchType)
if err != nil {
log.WithError(err).WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Error("Failed to patch WorkflowTaskSet")
woc.log.WithError(err).Error("Failed to patch WorkflowTaskSet")
return fmt.Errorf("failed to patch TaskSet. %v", err)
}
} else if err != nil {
log.WithError(err).WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Error("Failed to create WorkflowTaskSet")
woc.log.WithError(err).Error("Failed to create WorkflowTaskSet")
return err
}
return nil
Expand Down
Loading

0 comments on commit 77d87da

Please sign in to comment.