Skip to content

Commit

Permalink
fix: Skip missed executions if CronWorkflow schedule is changed. Fixes
Browse files Browse the repository at this point in the history
…#7182 (#7353)

Signed-off-by: Simon Behar <simbeh7@gmail.com>
  • Loading branch information
simster7 authored Dec 11, 2021
1 parent e1e7ed8 commit 9320408
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 16 deletions.
6 changes: 1 addition & 5 deletions cmd/argo/commands/cron/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ import (
// GetNextRuntime returns the next time the workflow should run in local time. It assumes the workflow-controller is in
// UTC, but nevertheless returns the time in the local timezone.
func GetNextRuntime(cwf *v1alpha1.CronWorkflow) (time.Time, error) {
cronScheduleString := cwf.Spec.Schedule
if cwf.Spec.Timezone != "" {
cronScheduleString = "CRON_TZ=" + cwf.Spec.Timezone + " " + cronScheduleString
}
cronSchedule, err := cron.ParseStandard(cronScheduleString)
cronSchedule, err := cron.ParseStandard(cwf.Spec.GetScheduleString())
if err != nil {
return time.Time{}, err
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/apis/workflow/v1alpha1/cron_workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
)

// CronWorkflow is the definition of a scheduled workflow resource
Expand Down Expand Up @@ -34,6 +36,8 @@ const (
ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

const annotationKeyLatestSchedule = workflow.CronWorkflowFullName + "/last-used-schedule"

// CronWorkflowSpec is the specification of a CronWorkflow
type CronWorkflowSpec struct {
// WorkflowSpec is the spec of the workflow to be run
Expand Down Expand Up @@ -67,6 +71,32 @@ type CronWorkflowStatus struct {
Conditions Conditions `json:"conditions" protobuf:"bytes,3,rep,name=conditions"`
}

func (c *CronWorkflow) IsUsingNewSchedule() bool {
lastUsedSchedule, exists := c.Annotations[annotationKeyLatestSchedule]
// If last-used-schedule does not exist, or if it does not match the current schedule then the CronWorkflow schedule
// was just updated
return !exists || lastUsedSchedule != c.Spec.GetScheduleString()
}

func (c *CronWorkflow) SetSchedule(schedule string) {
if c.Annotations == nil {
c.Annotations = map[string]string{}
}
c.Annotations[annotationKeyLatestSchedule] = schedule
}

func (c *CronWorkflow) GetLatestSchedule() string {
return c.Annotations[annotationKeyLatestSchedule]
}

func (c *CronWorkflowSpec) GetScheduleString() string {
scheduleString := c.Schedule
if c.Timezone != "" {
scheduleString = "CRON_TZ=" + c.Timezone + " " + scheduleString
}
return scheduleString
}

func (c *CronWorkflowStatus) HasActiveUID(uid types.UID) bool {
for _, ref := range c.Active {
if uid == ref.UID {
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/workflow/v1alpha1/cron_workflow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,15 @@ func TestCronWorkflowStatus_HasActiveUID(t *testing.T) {
assert.True(t, cwfStatus.HasActiveUID("123"))
assert.False(t, cwfStatus.HasActiveUID("foo"))
}

func TestCronWorkflowSpec_GetScheduleString(t *testing.T) {
cwfSpec := CronWorkflowSpec{
Timezone: "",
Schedule: "* * * * *",
}

assert.Equal(t, "* * * * *", cwfSpec.GetScheduleString())

cwfSpec.Timezone = "America/Los_Angeles"
assert.Equal(t, "CRON_TZ=America/Los_Angeles * * * * *", cwfSpec.GetScheduleString())
}
2 changes: 2 additions & 0 deletions test/e2e/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ spec:
Wait(1 * time.Minute).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Equal(t, cronWf.Spec.GetScheduleString(), cronWf.GetLatestSchedule())
assert.True(t, cronWf.Status.LastScheduledTime.Time.After(time.Now().Add(-1*time.Minute)))
})
})
Expand Down Expand Up @@ -109,6 +110,7 @@ spec:
Wait(1 * time.Minute).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Equal(t, cronWf.Spec.GetScheduleString(), cronWf.GetLatestSchedule())
assert.True(t, cronWf.Status.LastScheduledTime.Time.After(time.Now().Add(-1*time.Minute)))
})
})
Expand Down
7 changes: 1 addition & 6 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,7 @@ func (cc *Controller) processNextCronItem(ctx context.Context) bool {
// The job is currently scheduled, remove it and re add it.
cc.cron.Delete(key.(string))

cronSchedule := cronWf.Spec.Schedule
if cronWf.Spec.Timezone != "" {
cronSchedule = "CRON_TZ=" + cronWf.Spec.Timezone + " " + cronSchedule
}

lastScheduledTimeFunc, err := cc.cron.AddJob(key.(string), cronSchedule, cronWorkflowOperationCtx)
lastScheduledTimeFunc, err := cc.cron.AddJob(key.(string), cronWf.Spec.GetScheduleString(), cronWorkflowOperationCtx)
if err != nil {
logCtx.WithError(err).Error("could not schedule CronWorkflow")
return true
Expand Down
13 changes: 11 additions & 2 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (woc *cronWfOperationCtx) run(ctx context.Context, scheduledRuntime time.Ti

woc.log.Infof("Running %s", woc.name)

// If the cron workflow has a schedule that was just updated, update its annotation
if woc.cronWf.IsUsingNewSchedule() {
woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleString())
}

err := woc.validateCronWorkflow()
if err != nil {
return
Expand Down Expand Up @@ -129,7 +134,7 @@ func getWorkflowObjectReference(wf *v1alpha1.Workflow, runWf *v1alpha1.Workflow)
}

func (woc *cronWfOperationCtx) persistUpdate(ctx context.Context) {
woc.patch(ctx, map[string]interface{}{"status": woc.cronWf.Status})
woc.patch(ctx, map[string]interface{}{"status": woc.cronWf.Status, "metadata": map[string]interface{}{"annotations": woc.cronWf.Annotations}})
}

func (woc *cronWfOperationCtx) persistUpdateActiveWorkflows(ctx context.Context) {
Expand Down Expand Up @@ -214,6 +219,10 @@ func (woc *cronWfOperationCtx) runOutstandingWorkflows(ctx context.Context) (boo
}

func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (time.Time, error) {
// If the CronWorkflow schedule was just updated, then do not run any outstanding workflows.
if woc.cronWf.IsUsingNewSchedule() {
return time.Time{}, nil
}
// If this CronWorkflow has been run before, check if we have missed any scheduled executions
if woc.cronWf.Status.LastScheduledTime != nil {
var now time.Time
Expand All @@ -225,7 +234,7 @@ func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (time.Time, err
}
now = time.Now().In(loc)

cronScheduleString := "CRON_TZ=" + woc.cronWf.Spec.Timezone + " " + woc.cronWf.Spec.Schedule
cronScheduleString := woc.cronWf.Spec.GetScheduleString()
cronSchedule, err = cron.ParseStandard(cronScheduleString)
if err != nil {
return time.Time{}, fmt.Errorf("unable to form timezone schedule '%s': %s", cronScheduleString, err)
Expand Down
73 changes: 70 additions & 3 deletions workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ var scheduledWf = `
schedule: '* * * * *'
startingDeadlineSeconds: 30
workflowSpec:
entrypoint: whalesay
templates:
- container:
Expand Down Expand Up @@ -76,6 +75,7 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleString())
missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
// The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime
Expand All @@ -92,8 +92,14 @@ func TestRunOutstandingWorkflows(t *testing.T) {
assert.NoError(t, err)
assert.True(t, missedExecutionTime.IsZero())

// Run the same test in a different timezone
// Same test, but simulate a change to the schedule immediately prior by setting a different last-used-schedule annotation
// In this case, since a schedule change is detected, not workflow should be run
woc.cronWf.SetSchedule("0 * * * *")
missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.True(t, missedExecutionTime.IsZero())

// Run the same test in a different timezone
testTimezone := "Pacific/Niue"
testLocation, err := time.LoadLocation(testTimezone)
if err != nil {
Expand All @@ -109,6 +115,8 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
// Reset last-used-schedule as if the current schedule has been used before
woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleString())
missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
// The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime
Expand All @@ -124,6 +132,13 @@ func TestRunOutstandingWorkflows(t *testing.T) {
missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.True(t, missedExecutionTime.IsZero())

// Same test, but simulate a change to the schedule immediately prior by setting a different last-used-schedule annotation
// In this case, since a schedule change is detected, not workflow should be run
woc.cronWf.SetSchedule("0 * * * *")
missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.True(t, missedExecutionTime.IsZero())
}

type fakeLister struct{}
Expand All @@ -144,7 +159,6 @@ var invalidWf = `
schedule: '* * * * *'
startingDeadlineSeconds: 30
workflowSpec:
entrypoint: whalesay
templates:
- container:
Expand Down Expand Up @@ -263,3 +277,56 @@ func TestScheduleTimeParam(t *testing.T) {
assert.Len(t, wf.GetAnnotations(), 1)
assert.NotEmpty(t, wf.GetAnnotations()[common.AnnotationKeyCronWfScheduledTime])
}

const lastUsedSchedule = `apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: test
spec:
concurrencyPolicy: Forbid
failedJobsHistoryLimit: 1
schedule: 41 12 * * *
successfulJobsHistoryLimit: 1
timezone: America/New_York
workflowSpec:
arguments: {}
entrypoint: job
templates:
- container:
args:
- /bin/echo "hello argo"
command:
- /bin/sh
- -c
image: alpine
imagePullPolicy: Always
name: job
`

func TestLastUsedSchedule(t *testing.T) {
var cronWf v1alpha1.CronWorkflow
v1alpha1.MustUnmarshal([]byte(lastUsedSchedule), &cronWf)

cs := fake.NewSimpleClientset()
testMetrics := metrics.New(metrics.ServerConfig{}, metrics.ServerConfig{})
woc := &cronWfOperationCtx{
wfClientset: cs,
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: testMetrics,
scheduledTimeFunc: inferScheduledTime,
}

missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun()
if assert.NoError(t, err) {
assert.Equal(t, time.Time{}, missedExecutionTime)
}

woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleString())

if assert.NotNil(t, woc.cronWf.Annotations) {
assert.Equal(t, woc.cronWf.Spec.GetScheduleString(), woc.cronWf.GetLatestSchedule())
}
}

0 comments on commit 9320408

Please sign in to comment.