Skip to content

Commit

Permalink
fix: workflow.status is now set properly in metrics. Fixes #8895 (#8939)
Browse files Browse the repository at this point in the history
* fix: workflow.status is now set properly in metrics. Fixes #8895

Signed-off-by: Dillen Padhiar <dillen_padhiar@intuit.com>

* test: add runtime metrics test

Signed-off-by: Dillen Padhiar <dillen_padhiar@intuit.com>

* feat: add duration to global runtime parameters

Signed-off-by: Dillen Padhiar <dillen_padhiar@intuit.com>

* chore: rerun git tests

Signed-off-by: Dillen Padhiar <dillen_padhiar@intuit.com>
  • Loading branch information
dpadhiar committed Jun 23, 2022
1 parent 2aa32ae commit 89f3433
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 8 deletions.
26 changes: 18 additions & 8 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,6 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
}

// Update workflow duration variable
if woc.wf.Status.StartedAt.IsZero() {
woc.globalParams[common.GlobalVarWorkflowDuration] = fmt.Sprintf("%f", time.Duration(0).Seconds())
} else {
woc.globalParams[common.GlobalVarWorkflowDuration] = fmt.Sprintf("%f", time.Since(woc.wf.Status.StartedAt.Time).Seconds())
}

// Populate the phase of all the nodes prior to execution
for _, node := range woc.wf.Status.Nodes {
woc.preExecutionNodePhases[node.ID] = node.Phase
Expand Down Expand Up @@ -538,7 +531,7 @@ func (woc *wfOperationCtx) setGlobalParameters(executionParameters wfv1.Argument
woc.globalParams[common.GlobalVarWorkflowCronScheduleTime] = val
}
}
woc.globalParams[common.GlobalVarWorkflowStatus] = string(woc.wf.Status.Phase)

if woc.execWf.Spec.Priority != nil {
woc.globalParams[common.GlobalVarWorkflowPriority] = strconv.Itoa(int(*woc.execWf.Spec.Priority))
}
Expand Down Expand Up @@ -3539,9 +3532,24 @@ func (woc *wfOperationCtx) setExecWorkflow(ctx context.Context) error {
if err != nil {
return err
}

// runtime value will be set after the substitution, otherwise will not be reflected from stored wf spec
woc.setGlobalRuntimeParameters()

return nil
}

func (woc *wfOperationCtx) setGlobalRuntimeParameters() {
woc.globalParams[common.GlobalVarWorkflowStatus] = string(woc.wf.Status.Phase)

// Update workflow duration variable
if woc.wf.Status.StartedAt.IsZero() {
woc.globalParams[common.GlobalVarWorkflowDuration] = fmt.Sprintf("%f", time.Duration(0).Seconds())
} else {
woc.globalParams[common.GlobalVarWorkflowDuration] = fmt.Sprintf("%f", time.Since(woc.wf.Status.StartedAt.Time).Seconds())
}
}

func (woc *wfOperationCtx) addFinalizers() {
woc.addArtifactGCFinalizer()
}
Expand Down Expand Up @@ -3651,6 +3659,7 @@ func (woc *wfOperationCtx) substituteGlobalVariables() error {
if err != nil {
return err
}

resolveSpec, err := template.Replace(string(wfSpec), woc.globalParams, true)
if err != nil {
return err
Expand All @@ -3659,6 +3668,7 @@ func (woc *wfOperationCtx) substituteGlobalVariables() error {
if err != nil {
return err
}

return nil
}

Expand Down
53 changes: 53 additions & 0 deletions workflow/controller/operator_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,3 +809,56 @@ func TestControllerRestartWithRunningWorkflow(t *testing.T) {
assert.NoError(t, err)
assert.Contains(t, metricString, `model_a`)
}

var runtimeWfMetrics = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-task-
spec:
entrypoint: dag-task
metrics: # Custom metric workflow level
prometheus:
- name: playground_workflow_new
help: "Count of workflow execution by result status - workflow level"
labels:
- key: "playground_id_workflow_counter"
value: "test"
- key: status
value: "{{workflow.status}}"
counter:
value: "1"
templates:
- name: dag-task
dag:
tasks:
- name: TEST-ONE
template: echo
- name: echo
container:
image: alpine:3.7
command: [echo, "hello"]
`

func TestRuntimeMetrics(t *testing.T) {
cancel, controller := newController()
defer cancel()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
wf := v1alpha1.MustUnmarshalWorkflow(runtimeWfMetrics)
ctx := context.Background()
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx) // create step node

makePodsPhase(ctx, woc, apiv1.PodSucceeded) // pod is successful - manually workflow is succeeded
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx) // node status of previous context

metricDesc := woc.wf.Spec.Metrics.Prometheus[0].GetDesc()
metric := controller.metrics.GetCustomMetric(metricDesc)
assert.NotNil(t, metric)
metricString, err := getMetricStringValue(metric)
assert.NoError(t, err)
assert.Contains(t, metricString, `Succeeded`)
}

0 comments on commit 89f3433

Please sign in to comment.