Skip to content

Commit

Permalink
fix(controller): Only patch status.active in cron workflows when sync…
Browse files Browse the repository at this point in the history
…ing (#4659)

Signed-off-by: Simon Behar <simbeh7@gmail.com>
  • Loading branch information
simster7 authored Dec 9, 2020
1 parent 6b68b1f commit f47fc22
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 3 deletions.
10 changes: 10 additions & 0 deletions pkg/apis/workflow/v1alpha1/cron_workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1alpha1
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

// CronWorkflow is the definition of a scheduled workflow resource
Expand Down Expand Up @@ -66,6 +67,15 @@ type CronWorkflowStatus struct {
Conditions Conditions `json:"conditions" protobuf:"bytes,3,rep,name=conditions"`
}

func (c *CronWorkflowStatus) HasActiveUID(uid types.UID) bool {
for _, ref := range c.Active {
if uid == ref.UID {
return true
}
}
return false
}

const (
// ConditionTypeSubmissionError signifies that there was an error when submitting the CronWorkflow as a Workflow
ConditionTypeSubmissionError ConditionType = "SubmissionError"
Expand Down
17 changes: 17 additions & 0 deletions pkg/apis/workflow/v1alpha1/cron_workflow_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
)

func TestCronWorkflowStatus_HasActiveUID(t *testing.T) {
cwfStatus := CronWorkflowStatus{
Active: []v1.ObjectReference{{UID: "123"}},
}

assert.True(t, cwfStatus.HasActiveUID("123"))
assert.False(t, cwfStatus.HasActiveUID("foo"))
}
1 change: 0 additions & 1 deletion workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func (cc *Controller) syncCronWorkflow(cronWf *v1alpha1.CronWorkflow, workflows
return err
}

cwoc.persistUpdate()
return nil
}

Expand Down
27 changes: 25 additions & 2 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned"
typed "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
argoerr "github.com/argoproj/argo/util/errors"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/metrics"
"github.com/argoproj/argo/workflow/templateresolution"
Expand Down Expand Up @@ -108,14 +109,25 @@ func getWorkflowObjectReference(wf *v1alpha1.Workflow, runWf *v1alpha1.Workflow)
}

func (woc *cronWfOperationCtx) persistUpdate() {
data, err := json.Marshal(map[string]interface{}{"status": woc.cronWf.Status})
woc.patch(map[string]interface{}{"status": woc.cronWf.Status})
}

func (woc *cronWfOperationCtx) persistUpdateActiveWorkflows() {
woc.patch(map[string]interface{}{"status": map[string]interface{}{"active": woc.cronWf.Status.Active}})
}

func (woc *cronWfOperationCtx) patch(patch map[string]interface{}) {
data, err := json.Marshal(patch)
if err != nil {
woc.log.WithError(err).Error("failed to marshall cron workflow status data")
woc.log.WithError(err).Error("failed to marshall cron workflow status.active data")
return
}
err = wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
cronWf, err := woc.cronWfIf.Patch(woc.cronWf.Name, types.MergePatchType, data)
if err != nil {
if argoerr.IsTransientErr(err) {
return false, nil
}
return false, err
}
woc.cronWf = cronWf
Expand Down Expand Up @@ -227,17 +239,28 @@ func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, error) {
}

func (woc *cronWfOperationCtx) reconcileActiveWfs(workflows []v1alpha1.Workflow) error {
updated := false
currentWfsFulfilled := make(map[types.UID]bool)
for _, wf := range workflows {
currentWfsFulfilled[wf.UID] = wf.Status.Fulfilled()

if !woc.cronWf.Status.HasActiveUID(wf.UID) && !wf.Status.Fulfilled() {
updated = true
woc.cronWf.Status.Active = append(woc.cronWf.Status.Active, getWorkflowObjectReference(&wf, &wf))
}
}

for _, objectRef := range woc.cronWf.Status.Active {
if fulfilled, found := currentWfsFulfilled[objectRef.UID]; !found || fulfilled {
updated = true
woc.removeFromActiveList(objectRef.UID)
}
}

if updated {
woc.persistUpdateActiveWorkflows()
}

return nil
}

Expand Down

0 comments on commit f47fc22

Please sign in to comment.