From dfb5ac1be742edac8b4cbd6ed3c56d0967c17be1 Mon Sep 17 00:00:00 2001 From: Siyu Wang Date: Wed, 7 Sep 2022 09:59:43 +0800 Subject: [PATCH] Support timezone for AdvancedCronJob (#1070) --- apis/apps/v1alpha1/advancedcronjob_types.go | 9 +- apis/apps/v1alpha1/zz_generated.deepcopy.go | 43 ++- .../apps.kruise.io_advancedcronjobs.yaml | 5 + go.mod | 2 +- go.sum | 3 +- main.go | 1 + ...advancedcronjob_broadcastjob_controller.go | 12 +- .../advancedcronjob_controller_test.go | 68 +++- .../advancedcronjob_job_controller.go | 13 +- .../advancedcronjob/advancedcronjob_utils.go | 23 +- .../advancedcronjob_create_update_handler.go | 41 +- ...ancedcronjob_create_update_handler_test.go | 101 +++++ vendor/github.com/robfig/cron/README.md | 6 - vendor/github.com/robfig/cron/cron.go | 259 ------------- .../robfig/cron/{ => v3}/.gitignore | 0 .../robfig/cron/{ => v3}/.travis.yml | 0 .../github.com/robfig/cron/{ => v3}/LICENSE | 0 vendor/github.com/robfig/cron/v3/README.md | 125 ++++++ vendor/github.com/robfig/cron/v3/chain.go | 92 +++++ .../robfig/cron/{ => v3}/constantdelay.go | 0 vendor/github.com/robfig/cron/v3/cron.go | 355 ++++++++++++++++++ vendor/github.com/robfig/cron/{ => v3}/doc.go | 132 ++++++- vendor/github.com/robfig/cron/v3/logger.go | 86 +++++ vendor/github.com/robfig/cron/v3/option.go | 45 +++ .../github.com/robfig/cron/{ => v3}/parser.go | 268 +++++++------ .../github.com/robfig/cron/{ => v3}/spec.go | 40 +- vendor/modules.txt | 6 +- 27 files changed, 1290 insertions(+), 445 deletions(-) create mode 100644 pkg/webhook/advancedcronjob/validating/advancedcronjob_create_update_handler_test.go delete mode 100644 vendor/github.com/robfig/cron/README.md delete mode 100644 vendor/github.com/robfig/cron/cron.go rename vendor/github.com/robfig/cron/{ => v3}/.gitignore (100%) rename vendor/github.com/robfig/cron/{ => v3}/.travis.yml (100%) rename vendor/github.com/robfig/cron/{ => v3}/LICENSE (100%) create mode 100644 vendor/github.com/robfig/cron/v3/README.md create mode 100644 vendor/github.com/robfig/cron/v3/chain.go rename vendor/github.com/robfig/cron/{ => v3}/constantdelay.go (100%) create mode 100644 vendor/github.com/robfig/cron/v3/cron.go rename vendor/github.com/robfig/cron/{ => v3}/doc.go (50%) create mode 100644 vendor/github.com/robfig/cron/v3/logger.go create mode 100644 vendor/github.com/robfig/cron/v3/option.go rename vendor/github.com/robfig/cron/{ => v3}/parser.go (52%) rename vendor/github.com/robfig/cron/{ => v3}/spec.go (74%) diff --git a/apis/apps/v1alpha1/advancedcronjob_types.go b/apis/apps/v1alpha1/advancedcronjob_types.go index 211a629136..f7de3d1235 100644 --- a/apis/apps/v1alpha1/advancedcronjob_types.go +++ b/apis/apps/v1alpha1/advancedcronjob_types.go @@ -17,7 +17,7 @@ limitations under the License. package v1alpha1 import ( - batchv1beta1 "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -31,6 +31,11 @@ type AdvancedCronJobSpec struct { // The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron. Schedule string `json:"schedule" protobuf:"bytes,1,opt,name=schedule"` + // The time zone name for the given schedule, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones. + // If not specified, this will default to the time zone of the kruise-controller-manager process. + // +optional + TimeZone *string `json:"timeZone,omitempty" protobuf:"bytes,8,opt,name=timeZone"` + // +kubebuilder:validation:Minimum=0 // Optional deadline in seconds for starting the job if it misses scheduled @@ -73,7 +78,7 @@ type CronJobTemplate struct { // +optional // +kubebuilder:pruning:PreserveUnknownFields // +kubebuilder:validation:Schemaless - JobTemplate *batchv1beta1.JobTemplateSpec `json:"jobTemplate,omitempty" protobuf:"bytes,1,opt,name=jobTemplate"` + JobTemplate *batchv1.JobTemplateSpec `json:"jobTemplate,omitempty" protobuf:"bytes,1,opt,name=jobTemplate"` // Specifies the broadcastjob that will be created when executing a BroadcastCronJob. // +optional diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 782e64a84a..421c2bdd88 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -24,8 +24,8 @@ package v1alpha1 import ( "github.com/openkruise/kruise/apis/apps/pub" appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" - "k8s.io/api/core/v1" + "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" @@ -113,6 +113,11 @@ func (in *AdvancedCronJobList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AdvancedCronJobSpec) DeepCopyInto(out *AdvancedCronJobSpec) { *out = *in + if in.TimeZone != nil { + in, out := &in.TimeZone, &out.TimeZone + *out = new(string) + **out = **in + } if in.StartingDeadlineSeconds != nil { in, out := &in.StartingDeadlineSeconds, &out.StartingDeadlineSeconds *out = new(int64) @@ -151,7 +156,7 @@ func (in *AdvancedCronJobStatus) DeepCopyInto(out *AdvancedCronJobStatus) { *out = *in if in.Active != nil { in, out := &in.Active, &out.Active - *out = make([]v1.ObjectReference, len(*in)) + *out = make([]corev1.ObjectReference, len(*in)) copy(*out, *in) } if in.LastScheduleTime != nil { @@ -432,7 +437,7 @@ func (in *CloneSetSpec) DeepCopyInto(out *CloneSetSpec) { in.Template.DeepCopyInto(&out.Template) if in.VolumeClaimTemplates != nil { in, out := &in.VolumeClaimTemplates, &out.VolumeClaimTemplates - *out = make([]v1.PersistentVolumeClaim, len(*in)) + *out = make([]corev1.PersistentVolumeClaim, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -612,7 +617,7 @@ func (in *ContainerRecreateRequestContainer) DeepCopyInto(out *ContainerRecreate } if in.Ports != nil { in, out := &in.Ports, &out.Ports - *out = make([]v1.ContainerPort, len(*in)) + *out = make([]corev1.ContainerPort, len(*in)) copy(*out, *in) } if in.StatusContext != nil { @@ -800,7 +805,7 @@ func (in *CronJobTemplate) DeepCopyInto(out *CronJobTemplate) { *out = *in if in.JobTemplate != nil { in, out := &in.JobTemplate, &out.JobTemplate - *out = new(v1beta1.JobTemplateSpec) + *out = new(v1.JobTemplateSpec) (*in).DeepCopyInto(*out) } if in.BroadcastJobTemplate != nil { @@ -985,7 +990,7 @@ func (in *EphemeralContainerTemplateSpec) DeepCopyInto(out *EphemeralContainerTe *out = *in if in.EphemeralContainers != nil { in, out := &in.EphemeralContainers, &out.EphemeralContainers - *out = make([]v1.EphemeralContainer, len(*in)) + *out = make([]corev1.EphemeralContainer, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -1427,7 +1432,7 @@ func (in *ImageTagSpec) DeepCopyInto(out *ImageTagSpec) { } if in.OwnerReferences != nil { in, out := &in.OwnerReferences, &out.OwnerReferences - *out = make([]v1.ObjectReference, len(*in)) + *out = make([]corev1.ObjectReference, len(*in)) copy(*out, *in) } } @@ -1784,17 +1789,17 @@ func (in *ProbeHandler) DeepCopyInto(out *ProbeHandler) { *out = *in if in.Exec != nil { in, out := &in.Exec, &out.Exec - *out = new(v1.ExecAction) + *out = new(corev1.ExecAction) (*in).DeepCopyInto(*out) } if in.HTTPGet != nil { in, out := &in.HTTPGet, &out.HTTPGet - *out = new(v1.HTTPGetAction) + *out = new(corev1.HTTPGetAction) (*in).DeepCopyInto(*out) } if in.TCPSocket != nil { in, out := &in.TCPSocket, &out.TCPSocket - *out = new(v1.TCPSocketAction) + *out = new(corev1.TCPSocketAction) **out = **in } } @@ -2306,7 +2311,7 @@ func (in *SidecarSetSpec) DeepCopyInto(out *SidecarSetSpec) { } if in.Volumes != nil { in, out := &in.Volumes, &out.Volumes - *out = make([]v1.Volume, len(*in)) + *out = make([]corev1.Volume, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -2315,7 +2320,7 @@ func (in *SidecarSetSpec) DeepCopyInto(out *SidecarSetSpec) { in.InjectionStrategy.DeepCopyInto(&out.InjectionStrategy) if in.ImagePullSecrets != nil { in, out := &in.ImagePullSecrets, &out.ImagePullSecrets - *out = make([]v1.LocalObjectReference, len(*in)) + *out = make([]corev1.LocalObjectReference, len(*in)) copy(*out, *in) } if in.RevisionHistoryLimit != nil { @@ -2402,7 +2407,7 @@ func (in *SourceContainerNameSource) DeepCopyInto(out *SourceContainerNameSource *out = *in if in.FieldRef != nil { in, out := &in.FieldRef, &out.FieldRef - *out = new(v1.ObjectFieldSelector) + *out = new(corev1.ObjectFieldSelector) **out = **in } } @@ -2492,7 +2497,7 @@ func (in *StatefulSetSpec) DeepCopyInto(out *StatefulSetSpec) { in.Template.DeepCopyInto(&out.Template) if in.VolumeClaimTemplates != nil { in, out := &in.VolumeClaimTemplates, &out.VolumeClaimTemplates - *out = make([]v1.PersistentVolumeClaim, len(*in)) + *out = make([]corev1.PersistentVolumeClaim, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -2585,7 +2590,7 @@ func (in *Subset) DeepCopyInto(out *Subset) { in.NodeSelectorTerm.DeepCopyInto(&out.NodeSelectorTerm) if in.Tolerations != nil { in, out := &in.Tolerations, &out.Tolerations - *out = make([]v1.Toleration, len(*in)) + *out = make([]corev1.Toleration, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -3097,19 +3102,19 @@ func (in *WorkloadSpreadSubset) DeepCopyInto(out *WorkloadSpreadSubset) { *out = *in if in.RequiredNodeSelectorTerm != nil { in, out := &in.RequiredNodeSelectorTerm, &out.RequiredNodeSelectorTerm - *out = new(v1.NodeSelectorTerm) + *out = new(corev1.NodeSelectorTerm) (*in).DeepCopyInto(*out) } if in.PreferredNodeSelectorTerms != nil { in, out := &in.PreferredNodeSelectorTerms, &out.PreferredNodeSelectorTerms - *out = make([]v1.PreferredSchedulingTerm, len(*in)) + *out = make([]corev1.PreferredSchedulingTerm, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.Tolerations != nil { in, out := &in.Tolerations, &out.Tolerations - *out = make([]v1.Toleration, len(*in)) + *out = make([]corev1.Toleration, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } diff --git a/config/crd/bases/apps.kruise.io_advancedcronjobs.yaml b/config/crd/bases/apps.kruise.io_advancedcronjobs.yaml index 0138464fe7..3b7d25baa9 100644 --- a/config/crd/bases/apps.kruise.io_advancedcronjobs.yaml +++ b/config/crd/bases/apps.kruise.io_advancedcronjobs.yaml @@ -184,6 +184,11 @@ spec: a CronJob. x-kubernetes-preserve-unknown-fields: true type: object + timeZone: + description: The time zone name for the given schedule, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones. + If not specified, this will default to the time zone of the kruise-controller-manager + process. + type: string required: - schedule - template diff --git a/go.mod b/go.mod index 9f97406b1a..b058b229de 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/opencontainers/image-spec v1.0.2 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 - github.com/robfig/cron v1.2.0 + github.com/robfig/cron/v3 v3.0.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 github.com/xyproto/simpleredis v0.0.0-20200201215242-1ff0da2967b4 diff --git a/go.sum b/go.sum index 62ce79a72c..0b4b47b8bb 100644 --- a/go.sum +++ b/go.sum @@ -770,8 +770,7 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/quobyte/api v0.1.8/go.mod h1:jL7lIHrmqQ7yh05OJ+eEEdHr0u/kmT1Ff9iHd+4H6VI= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/main.go b/main.go index af1dc5e96a..56aa5e0e30 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( _ "net/http/pprof" "os" "time" + _ "time/tzdata" // for AdvancedCronJob Time Zone support "github.com/openkruise/kruise/pkg/util/controllerfinder" "github.com/spf13/pflag" diff --git a/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go b/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go index 2ad48ff59c..650e822253 100644 --- a/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go +++ b/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go @@ -20,19 +20,17 @@ import ( "sort" "time" - "sigs.k8s.io/controller-runtime/pkg/controller" - - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/source" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/robfig/cron" + "github.com/robfig/cron/v3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ref "k8s.io/client-go/tools/reference" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" ) func watchBroadcastJob(c controller.Controller) error { @@ -212,7 +210,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re and the next run, so that we can know when it's time to reconcile again. */ getNextSchedule := func(cronJob *appsv1alpha1.AdvancedCronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) { - sched, err := cron.ParseStandard(cronJob.Spec.Schedule) + sched, err := cron.ParseStandard(formatSchedule(cronJob)) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err) } diff --git a/pkg/controller/advancedcronjob/advancedcronjob_controller_test.go b/pkg/controller/advancedcronjob/advancedcronjob_controller_test.go index ca1929f131..e4fe1310cb 100644 --- a/pkg/controller/advancedcronjob/advancedcronjob_controller_test.go +++ b/pkg/controller/advancedcronjob/advancedcronjob_controller_test.go @@ -19,20 +19,20 @@ package advancedcronjob import ( "flag" "testing" - - batchv1 "k8s.io/api/batch/v1" - - batchv1beta1 "k8s.io/api/batch/v1beta1" + "time" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" "golang.org/x/net/context" + batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -45,6 +45,63 @@ func init() { _ = flag.Set("v", "10") } +func TestScheduleWithTimeZone(t *testing.T) { + cases := []struct { + schedule string + timeZone *string + previousTZ string + expectedNext string + }{ + { + schedule: "0 10 * * ?", + timeZone: nil, + previousTZ: "2022-09-05T09:01:00Z", + expectedNext: "2022-09-05T10:00:00Z", + }, + { + schedule: "0 10 * * ?", + timeZone: nil, + previousTZ: "2022-09-05T11:01:00Z", + expectedNext: "2022-09-06T10:00:00Z", + }, + { + schedule: "0 10 * * ?", + timeZone: utilpointer.String("Asia/Shanghai"), + previousTZ: "2022-09-05T09:01:00Z", + expectedNext: "2022-09-06T02:00:00Z", + }, + { + schedule: "0 10 * * ?", + timeZone: utilpointer.String("Asia/Shanghai"), + previousTZ: "2022-09-06T01:01:00Z", + expectedNext: "2022-09-06T02:00:00Z", + }, + { + schedule: "TZ=Asia/Shanghai 0 10 * * ?", + timeZone: nil, + previousTZ: "2022-09-06T01:01:00Z", + expectedNext: "2022-09-06T02:00:00Z", + }, + } + + for i, tc := range cases { + acj := &appsv1alpha1.AdvancedCronJob{Spec: appsv1alpha1.AdvancedCronJobSpec{Schedule: tc.schedule, TimeZone: tc.timeZone}} + sched, err := cron.ParseStandard(formatSchedule(acj)) + if err != nil { + t.Fatal(err) + } + + previousTZ, err := time.Parse(time.RFC3339, tc.previousTZ) + if err != nil { + t.Fatal(err) + } + gotNext := sched.Next(previousTZ).Format(time.RFC3339) + if gotNext != tc.expectedNext { + t.Fatalf("case %d failed, expected next %s, got %s", i, tc.expectedNext, gotNext) + } + } +} + // Test scenario: func TestReconcileAdvancedJobCreateBroadcastJob(t *testing.T) { scheme := runtime.NewScheme() @@ -87,7 +144,6 @@ func TestReconcileAdvancedJobCreateJob(t *testing.T) { scheme := runtime.NewScheme() _ = appsv1alpha1.AddToScheme(scheme) _ = batchv1.AddToScheme(scheme) - _ = batchv1beta1.AddToScheme(scheme) _ = v1.AddToScheme(scheme) // A job @@ -185,7 +241,7 @@ func broadcastJobTemplate() appsv1alpha1.CronJobTemplate { func jobTemplate() appsv1alpha1.CronJobTemplate { return appsv1alpha1.CronJobTemplate{ - JobTemplate: &batchv1beta1.JobTemplateSpec{ + JobTemplate: &batchv1.JobTemplateSpec{ Spec: batchv1.JobSpec{ Template: v1.PodTemplateSpec{ Spec: v1.PodSpec{}, diff --git a/pkg/controller/advancedcronjob/advancedcronjob_job_controller.go b/pkg/controller/advancedcronjob/advancedcronjob_job_controller.go index fefe3851d9..32fe0278b3 100644 --- a/pkg/controller/advancedcronjob/advancedcronjob_job_controller.go +++ b/pkg/controller/advancedcronjob/advancedcronjob_job_controller.go @@ -20,21 +20,18 @@ import ( "sort" "time" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/source" - - "sigs.k8s.io/controller-runtime/pkg/controller" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/robfig/cron/v3" batchv1 "k8s.io/api/batch/v1" - - "github.com/robfig/cron" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ref "k8s.io/client-go/tools/reference" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" ) func watchJob(c controller.Controller) error { @@ -213,7 +210,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re and the next run, so that we can know when it's time to reconcile again. */ getNextSchedule := func(cronJob *appsv1alpha1.AdvancedCronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) { - sched, err := cron.ParseStandard(cronJob.Spec.Schedule) + sched, err := cron.ParseStandard(formatSchedule(cronJob)) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err) } diff --git a/pkg/controller/advancedcronjob/advancedcronjob_utils.go b/pkg/controller/advancedcronjob/advancedcronjob_utils.go index dc9f23b6af..ea21e74c1b 100644 --- a/pkg/controller/advancedcronjob/advancedcronjob_utils.go +++ b/pkg/controller/advancedcronjob/advancedcronjob_utils.go @@ -1,6 +1,13 @@ package advancedcronjob -import appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" +import ( + "fmt" + "strings" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "k8s.io/klog/v2" +) func FindTemplateKind(spec appsv1alpha1.AdvancedCronJobSpec) appsv1alpha1.TemplateKind { if spec.Template.JobTemplate != nil { @@ -9,3 +16,17 @@ func FindTemplateKind(spec appsv1alpha1.AdvancedCronJobSpec) appsv1alpha1.Templa return appsv1alpha1.BroadcastJobTemplate } + +func formatSchedule(acj *appsv1alpha1.AdvancedCronJob) string { + if strings.Contains(acj.Spec.Schedule, "TZ") { + return acj.Spec.Schedule + } + if acj.Spec.TimeZone != nil { + if _, err := time.LoadLocation(*acj.Spec.TimeZone); err != nil { + klog.Errorf("Failed to load location %s for %s/%s: %v", *acj.Spec.TimeZone, acj.Namespace, acj.Name, err) + return acj.Spec.Schedule + } + return fmt.Sprintf("TZ=%s %s", *acj.Spec.TimeZone, acj.Spec.Schedule) + } + return acj.Spec.Schedule +} diff --git a/pkg/webhook/advancedcronjob/validating/advancedcronjob_create_update_handler.go b/pkg/webhook/advancedcronjob/validating/advancedcronjob_create_update_handler.go index 958784687b..7891ca3139 100644 --- a/pkg/webhook/advancedcronjob/validating/advancedcronjob_create_update_handler.go +++ b/pkg/webhook/advancedcronjob/validating/advancedcronjob_create_update_handler.go @@ -21,12 +21,14 @@ import ( "fmt" "net/http" "regexp" + "strings" + "time" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" webhookutil "github.com/openkruise/kruise/pkg/webhook/util" - "github.com/robfig/cron" + "github.com/robfig/cron/v3" admissionv1 "k8s.io/api/admission/v1" - batchv1beta1 "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" genericvalidation "k8s.io/apimachinery/pkg/api/validation" @@ -75,6 +77,10 @@ func validateAdvancedCronJobSpec(spec *appsv1alpha1.AdvancedCronJobSpec, fldPath allErrs := field.ErrorList{} allErrs = append(allErrs, validateAdvancedCronJobSpecSchedule(spec, fldPath)...) allErrs = append(allErrs, validateAdvancedCronJobSpecTemplate(spec, fldPath)...) + if spec.StartingDeadlineSeconds != nil { + allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(*spec.StartingDeadlineSeconds, fldPath.Child("startingDeadlineSeconds"))...) + } + allErrs = append(allErrs, validateTimeZone(spec.TimeZone, fldPath.Child("timeZone"))...) return allErrs } @@ -90,6 +96,32 @@ func validateAdvancedCronJobSpecSchedule(spec *appsv1alpha1.AdvancedCronJobSpec, allErrs = append(allErrs, field.Invalid(fldPath.Child("schedule"), spec.Schedule, err.Error())) } + if strings.Contains(spec.Schedule, "TZ") && spec.TimeZone != nil { + allErrs = append(allErrs, field.Invalid(fldPath.Child("schedule"), + spec.Schedule, "cannot use both timeZone field and TZ or CRON_TZ in schedule")) + } + return allErrs +} + +func validateTimeZone(timeZone *string, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if timeZone == nil { + return allErrs + } + + if len(*timeZone) == 0 { + allErrs = append(allErrs, field.Invalid(fldPath, timeZone, "timeZone must be nil or non-empty string")) + return allErrs + } + + if strings.EqualFold(*timeZone, "Local") { + allErrs = append(allErrs, field.Invalid(fldPath, timeZone, "timeZone must be an explicit time zone as defined in https://www.iana.org/time-zones")) + } + + if _, err := time.LoadLocation(*timeZone); err != nil { + allErrs = append(allErrs, field.Invalid(fldPath, timeZone, err.Error())) + } + return allErrs } @@ -116,7 +148,7 @@ func validateAdvancedCronJobSpecTemplate(spec *appsv1alpha1.AdvancedCronJobSpec, return allErrs } -func validateJobTemplateSpec(jobSpec *batchv1beta1.JobTemplateSpec, fldPath *field.Path) field.ErrorList { +func validateJobTemplateSpec(jobSpec *batchv1.JobTemplateSpec, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} coreTemplate, err := convertPodTemplateSpec(&jobSpec.Spec.Template) if err != nil { @@ -165,8 +197,9 @@ func (h *AdvancedCronJobCreateUpdateHandler) validateAdvancedCronJobUpdate(obj, advanceCronJob.Spec.FailedJobsHistoryLimit = oldObj.Spec.FailedJobsHistoryLimit advanceCronJob.Spec.StartingDeadlineSeconds = oldObj.Spec.StartingDeadlineSeconds advanceCronJob.Spec.Paused = oldObj.Spec.Paused + advanceCronJob.Spec.TimeZone = oldObj.Spec.TimeZone if !apiequality.Semantic.DeepEqual(advanceCronJob.Spec, oldObj.Spec) { - allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to advancedcronjob spec for fields other than 'schedule', 'concurrencyPolicy', 'successfulJobsHistoryLimit', 'failedJobsHistoryLimit', 'startingDeadlineSeconds' and 'paused' are forbidden")) + allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to advancedcronjob spec for fields other than 'schedule', 'concurrencyPolicy', 'successfulJobsHistoryLimit', 'failedJobsHistoryLimit', 'startingDeadlineSeconds', 'timeZone' and 'paused' are forbidden")) } return allErrs } diff --git a/pkg/webhook/advancedcronjob/validating/advancedcronjob_create_update_handler_test.go b/pkg/webhook/advancedcronjob/validating/advancedcronjob_create_update_handler_test.go new file mode 100644 index 0000000000..d3168dc933 --- /dev/null +++ b/pkg/webhook/advancedcronjob/validating/advancedcronjob_create_update_handler_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validating + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + + batchv1 "k8s.io/api/batch/v1" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/utils/pointer" +) + +func TestValidateCronJobSpec(t *testing.T) { + validPodTemplateSpec := v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {Name: "foo", Image: "foo:latest", TerminationMessagePolicy: v1.TerminationMessageReadFile, ImagePullPolicy: v1.PullIfNotPresent}, + }, + RestartPolicy: v1.RestartPolicyAlways, + DNSPolicy: v1.DNSDefault, + }, + } + + type testCase struct { + acj *appsv1alpha1.AdvancedCronJobSpec + expectErr bool + } + + cases := map[string]testCase{ + "no validation because timeZone is nil": { + acj: &appsv1alpha1.AdvancedCronJobSpec{ + Schedule: "0 * * * *", + TimeZone: nil, + ConcurrencyPolicy: appsv1alpha1.AllowConcurrent, + Template: appsv1alpha1.CronJobTemplate{ + JobTemplate: &batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: validPodTemplateSpec, + }, + }, + }, + }, + }, + "check timeZone is valid": { + acj: &appsv1alpha1.AdvancedCronJobSpec{ + Schedule: "0 * * * *", + TimeZone: pointer.String("America/New_York"), + ConcurrencyPolicy: appsv1alpha1.AllowConcurrent, + Template: appsv1alpha1.CronJobTemplate{ + JobTemplate: &batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: validPodTemplateSpec, + }, + }, + }, + }, + }, + "check timeZone is invalid": { + acj: &appsv1alpha1.AdvancedCronJobSpec{ + Schedule: "0 * * * *", + TimeZone: pointer.String("broken"), + ConcurrencyPolicy: appsv1alpha1.AllowConcurrent, + Template: appsv1alpha1.CronJobTemplate{ + JobTemplate: &batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: validPodTemplateSpec, + }, + }, + }, + }, + expectErr: true, + }, + } + + for k, v := range cases { + errs := validateAdvancedCronJobSpec(v.acj, field.NewPath("spec")) + if len(errs) > 0 && !v.expectErr { + t.Errorf("unexpected error for %s: %v", k, errs) + } else if len(errs) == 0 && v.expectErr { + t.Errorf("expected error for %s but got nil", k) + } + } +} diff --git a/vendor/github.com/robfig/cron/README.md b/vendor/github.com/robfig/cron/README.md deleted file mode 100644 index ec40c95fcb..0000000000 --- a/vendor/github.com/robfig/cron/README.md +++ /dev/null @@ -1,6 +0,0 @@ -[![GoDoc](http://godoc.org/github.com/robfig/cron?status.png)](http://godoc.org/github.com/robfig/cron) -[![Build Status](https://travis-ci.org/robfig/cron.svg?branch=master)](https://travis-ci.org/robfig/cron) - -# cron - -Documentation here: https://godoc.org/github.com/robfig/cron diff --git a/vendor/github.com/robfig/cron/cron.go b/vendor/github.com/robfig/cron/cron.go deleted file mode 100644 index 2318aeb2e7..0000000000 --- a/vendor/github.com/robfig/cron/cron.go +++ /dev/null @@ -1,259 +0,0 @@ -package cron - -import ( - "log" - "runtime" - "sort" - "time" -) - -// Cron keeps track of any number of entries, invoking the associated func as -// specified by the schedule. It may be started, stopped, and the entries may -// be inspected while running. -type Cron struct { - entries []*Entry - stop chan struct{} - add chan *Entry - snapshot chan []*Entry - running bool - ErrorLog *log.Logger - location *time.Location -} - -// Job is an interface for submitted cron jobs. -type Job interface { - Run() -} - -// The Schedule describes a job's duty cycle. -type Schedule interface { - // Return the next activation time, later than the given time. - // Next is invoked initially, and then each time the job is run. - Next(time.Time) time.Time -} - -// Entry consists of a schedule and the func to execute on that schedule. -type Entry struct { - // The schedule on which this job should be run. - Schedule Schedule - - // The next time the job will run. This is the zero time if Cron has not been - // started or this entry's schedule is unsatisfiable - Next time.Time - - // The last time this job was run. This is the zero time if the job has never - // been run. - Prev time.Time - - // The Job to run. - Job Job -} - -// byTime is a wrapper for sorting the entry array by time -// (with zero time at the end). -type byTime []*Entry - -func (s byTime) Len() int { return len(s) } -func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s byTime) Less(i, j int) bool { - // Two zero times should return false. - // Otherwise, zero is "greater" than any other time. - // (To sort it at the end of the list.) - if s[i].Next.IsZero() { - return false - } - if s[j].Next.IsZero() { - return true - } - return s[i].Next.Before(s[j].Next) -} - -// New returns a new Cron job runner, in the Local time zone. -func New() *Cron { - return NewWithLocation(time.Now().Location()) -} - -// NewWithLocation returns a new Cron job runner. -func NewWithLocation(location *time.Location) *Cron { - return &Cron{ - entries: nil, - add: make(chan *Entry), - stop: make(chan struct{}), - snapshot: make(chan []*Entry), - running: false, - ErrorLog: nil, - location: location, - } -} - -// A wrapper that turns a func() into a cron.Job -type FuncJob func() - -func (f FuncJob) Run() { f() } - -// AddFunc adds a func to the Cron to be run on the given schedule. -func (c *Cron) AddFunc(spec string, cmd func()) error { - return c.AddJob(spec, FuncJob(cmd)) -} - -// AddJob adds a Job to the Cron to be run on the given schedule. -func (c *Cron) AddJob(spec string, cmd Job) error { - schedule, err := Parse(spec) - if err != nil { - return err - } - c.Schedule(schedule, cmd) - return nil -} - -// Schedule adds a Job to the Cron to be run on the given schedule. -func (c *Cron) Schedule(schedule Schedule, cmd Job) { - entry := &Entry{ - Schedule: schedule, - Job: cmd, - } - if !c.running { - c.entries = append(c.entries, entry) - return - } - - c.add <- entry -} - -// Entries returns a snapshot of the cron entries. -func (c *Cron) Entries() []*Entry { - if c.running { - c.snapshot <- nil - x := <-c.snapshot - return x - } - return c.entrySnapshot() -} - -// Location gets the time zone location -func (c *Cron) Location() *time.Location { - return c.location -} - -// Start the cron scheduler in its own go-routine, or no-op if already started. -func (c *Cron) Start() { - if c.running { - return - } - c.running = true - go c.run() -} - -// Run the cron scheduler, or no-op if already running. -func (c *Cron) Run() { - if c.running { - return - } - c.running = true - c.run() -} - -func (c *Cron) runWithRecovery(j Job) { - defer func() { - if r := recover(); r != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - c.logf("cron: panic running job: %v\n%s", r, buf) - } - }() - j.Run() -} - -// Run the scheduler. this is private just due to the need to synchronize -// access to the 'running' state variable. -func (c *Cron) run() { - // Figure out the next activation times for each entry. - now := c.now() - for _, entry := range c.entries { - entry.Next = entry.Schedule.Next(now) - } - - for { - // Determine the next entry to run. - sort.Sort(byTime(c.entries)) - - var timer *time.Timer - if len(c.entries) == 0 || c.entries[0].Next.IsZero() { - // If there are no entries yet, just sleep - it still handles new entries - // and stop requests. - timer = time.NewTimer(100000 * time.Hour) - } else { - timer = time.NewTimer(c.entries[0].Next.Sub(now)) - } - - for { - select { - case now = <-timer.C: - now = now.In(c.location) - // Run every entry whose next time was less than now - for _, e := range c.entries { - if e.Next.After(now) || e.Next.IsZero() { - break - } - go c.runWithRecovery(e.Job) - e.Prev = e.Next - e.Next = e.Schedule.Next(now) - } - - case newEntry := <-c.add: - timer.Stop() - now = c.now() - newEntry.Next = newEntry.Schedule.Next(now) - c.entries = append(c.entries, newEntry) - - case <-c.snapshot: - c.snapshot <- c.entrySnapshot() - continue - - case <-c.stop: - timer.Stop() - return - } - - break - } - } -} - -// Logs an error to stderr or to the configured error log -func (c *Cron) logf(format string, args ...interface{}) { - if c.ErrorLog != nil { - c.ErrorLog.Printf(format, args...) - } else { - log.Printf(format, args...) - } -} - -// Stop stops the cron scheduler if it is running; otherwise it does nothing. -func (c *Cron) Stop() { - if !c.running { - return - } - c.stop <- struct{}{} - c.running = false -} - -// entrySnapshot returns a copy of the current cron entry list. -func (c *Cron) entrySnapshot() []*Entry { - entries := []*Entry{} - for _, e := range c.entries { - entries = append(entries, &Entry{ - Schedule: e.Schedule, - Next: e.Next, - Prev: e.Prev, - Job: e.Job, - }) - } - return entries -} - -// now returns current time in c location -func (c *Cron) now() time.Time { - return time.Now().In(c.location) -} diff --git a/vendor/github.com/robfig/cron/.gitignore b/vendor/github.com/robfig/cron/v3/.gitignore similarity index 100% rename from vendor/github.com/robfig/cron/.gitignore rename to vendor/github.com/robfig/cron/v3/.gitignore diff --git a/vendor/github.com/robfig/cron/.travis.yml b/vendor/github.com/robfig/cron/v3/.travis.yml similarity index 100% rename from vendor/github.com/robfig/cron/.travis.yml rename to vendor/github.com/robfig/cron/v3/.travis.yml diff --git a/vendor/github.com/robfig/cron/LICENSE b/vendor/github.com/robfig/cron/v3/LICENSE similarity index 100% rename from vendor/github.com/robfig/cron/LICENSE rename to vendor/github.com/robfig/cron/v3/LICENSE diff --git a/vendor/github.com/robfig/cron/v3/README.md b/vendor/github.com/robfig/cron/v3/README.md new file mode 100644 index 0000000000..984c537c01 --- /dev/null +++ b/vendor/github.com/robfig/cron/v3/README.md @@ -0,0 +1,125 @@ +[![GoDoc](http://godoc.org/github.com/robfig/cron?status.png)](http://godoc.org/github.com/robfig/cron) +[![Build Status](https://travis-ci.org/robfig/cron.svg?branch=master)](https://travis-ci.org/robfig/cron) + +# cron + +Cron V3 has been released! + +To download the specific tagged release, run: + + go get github.com/robfig/cron/v3@v3.0.0 + +Import it in your program as: + + import "github.com/robfig/cron/v3" + +It requires Go 1.11 or later due to usage of Go Modules. + +Refer to the documentation here: +http://godoc.org/github.com/robfig/cron + +The rest of this document describes the the advances in v3 and a list of +breaking changes for users that wish to upgrade from an earlier version. + +## Upgrading to v3 (June 2019) + +cron v3 is a major upgrade to the library that addresses all outstanding bugs, +feature requests, and rough edges. It is based on a merge of master which +contains various fixes to issues found over the years and the v2 branch which +contains some backwards-incompatible features like the ability to remove cron +jobs. In addition, v3 adds support for Go Modules, cleans up rough edges like +the timezone support, and fixes a number of bugs. + +New features: + +- Support for Go modules. Callers must now import this library as + `github.com/robfig/cron/v3`, instead of `gopkg.in/...` + +- Fixed bugs: + - 0f01e6b parser: fix combining of Dow and Dom (#70) + - dbf3220 adjust times when rolling the clock forward to handle non-existent midnight (#157) + - eeecf15 spec_test.go: ensure an error is returned on 0 increment (#144) + - 70971dc cron.Entries(): update request for snapshot to include a reply channel (#97) + - 1cba5e6 cron: fix: removing a job causes the next scheduled job to run too late (#206) + +- Standard cron spec parsing by default (first field is "minute"), with an easy + way to opt into the seconds field (quartz-compatible). Although, note that the + year field (optional in Quartz) is not supported. + +- Extensible, key/value logging via an interface that complies with + the https://github.com/go-logr/logr project. + +- The new Chain & JobWrapper types allow you to install "interceptors" to add + cross-cutting behavior like the following: + - Recover any panics from jobs + - Delay a job's execution if the previous run hasn't completed yet + - Skip a job's execution if the previous run hasn't completed yet + - Log each job's invocations + - Notification when jobs are completed + +It is backwards incompatible with both v1 and v2. These updates are required: + +- The v1 branch accepted an optional seconds field at the beginning of the cron + spec. This is non-standard and has led to a lot of confusion. The new default + parser conforms to the standard as described by [the Cron wikipedia page]. + + UPDATING: To retain the old behavior, construct your Cron with a custom + parser: + + // Seconds field, required + cron.New(cron.WithSeconds()) + + // Seconds field, optional + cron.New( + cron.WithParser( + cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)) + +- The Cron type now accepts functional options on construction rather than the + previous ad-hoc behavior modification mechanisms (setting a field, calling a setter). + + UPDATING: Code that sets Cron.ErrorLogger or calls Cron.SetLocation must be + updated to provide those values on construction. + +- CRON_TZ is now the recommended way to specify the timezone of a single + schedule, which is sanctioned by the specification. The legacy "TZ=" prefix + will continue to be supported since it is unambiguous and easy to do so. + + UPDATING: No update is required. + +- By default, cron will no longer recover panics in jobs that it runs. + Recovering can be surprising (see issue #192) and seems to be at odds with + typical behavior of libraries. Relatedly, the `cron.WithPanicLogger` option + has been removed to accommodate the more general JobWrapper type. + + UPDATING: To opt into panic recovery and configure the panic logger: + + cron.New(cron.WithChain( + cron.Recover(logger), // or use cron.DefaultLogger + )) + +- In adding support for https://github.com/go-logr/logr, `cron.WithVerboseLogger` was + removed, since it is duplicative with the leveled logging. + + UPDATING: Callers should use `WithLogger` and specify a logger that does not + discard `Info` logs. For convenience, one is provided that wraps `*log.Logger`: + + cron.New( + cron.WithLogger(cron.VerbosePrintfLogger(logger))) + + +### Background - Cron spec format + +There are two cron spec formats in common usage: + +- The "standard" cron format, described on [the Cron wikipedia page] and used by + the cron Linux system utility. + +- The cron format used by [the Quartz Scheduler], commonly used for scheduled + jobs in Java software + +[the Cron wikipedia page]: https://en.wikipedia.org/wiki/Cron +[the Quartz Scheduler]: http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/tutorial-lesson-06.html + +The original version of this package included an optional "seconds" field, which +made it incompatible with both of these formats. Now, the "standard" format is +the default format accepted, and the Quartz format is opt-in. diff --git a/vendor/github.com/robfig/cron/v3/chain.go b/vendor/github.com/robfig/cron/v3/chain.go new file mode 100644 index 0000000000..9565b418e0 --- /dev/null +++ b/vendor/github.com/robfig/cron/v3/chain.go @@ -0,0 +1,92 @@ +package cron + +import ( + "fmt" + "runtime" + "sync" + "time" +) + +// JobWrapper decorates the given Job with some behavior. +type JobWrapper func(Job) Job + +// Chain is a sequence of JobWrappers that decorates submitted jobs with +// cross-cutting behaviors like logging or synchronization. +type Chain struct { + wrappers []JobWrapper +} + +// NewChain returns a Chain consisting of the given JobWrappers. +func NewChain(c ...JobWrapper) Chain { + return Chain{c} +} + +// Then decorates the given job with all JobWrappers in the chain. +// +// This: +// NewChain(m1, m2, m3).Then(job) +// is equivalent to: +// m1(m2(m3(job))) +func (c Chain) Then(j Job) Job { + for i := range c.wrappers { + j = c.wrappers[len(c.wrappers)-i-1](j) + } + return j +} + +// Recover panics in wrapped jobs and log them with the provided logger. +func Recover(logger Logger) JobWrapper { + return func(j Job) Job { + return FuncJob(func() { + defer func() { + if r := recover(); r != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + err, ok := r.(error) + if !ok { + err = fmt.Errorf("%v", r) + } + logger.Error(err, "panic", "stack", "...\n"+string(buf)) + } + }() + j.Run() + }) + } +} + +// DelayIfStillRunning serializes jobs, delaying subsequent runs until the +// previous one is complete. Jobs running after a delay of more than a minute +// have the delay logged at Info. +func DelayIfStillRunning(logger Logger) JobWrapper { + return func(j Job) Job { + var mu sync.Mutex + return FuncJob(func() { + start := time.Now() + mu.Lock() + defer mu.Unlock() + if dur := time.Since(start); dur > time.Minute { + logger.Info("delay", "duration", dur) + } + j.Run() + }) + } +} + +// SkipIfStillRunning skips an invocation of the Job if a previous invocation is +// still running. It logs skips to the given logger at Info level. +func SkipIfStillRunning(logger Logger) JobWrapper { + return func(j Job) Job { + var ch = make(chan struct{}, 1) + ch <- struct{}{} + return FuncJob(func() { + select { + case v := <-ch: + j.Run() + ch <- v + default: + logger.Info("skip") + } + }) + } +} diff --git a/vendor/github.com/robfig/cron/constantdelay.go b/vendor/github.com/robfig/cron/v3/constantdelay.go similarity index 100% rename from vendor/github.com/robfig/cron/constantdelay.go rename to vendor/github.com/robfig/cron/v3/constantdelay.go diff --git a/vendor/github.com/robfig/cron/v3/cron.go b/vendor/github.com/robfig/cron/v3/cron.go new file mode 100644 index 0000000000..c7e9176658 --- /dev/null +++ b/vendor/github.com/robfig/cron/v3/cron.go @@ -0,0 +1,355 @@ +package cron + +import ( + "context" + "sort" + "sync" + "time" +) + +// Cron keeps track of any number of entries, invoking the associated func as +// specified by the schedule. It may be started, stopped, and the entries may +// be inspected while running. +type Cron struct { + entries []*Entry + chain Chain + stop chan struct{} + add chan *Entry + remove chan EntryID + snapshot chan chan []Entry + running bool + logger Logger + runningMu sync.Mutex + location *time.Location + parser ScheduleParser + nextID EntryID + jobWaiter sync.WaitGroup +} + +// ScheduleParser is an interface for schedule spec parsers that return a Schedule +type ScheduleParser interface { + Parse(spec string) (Schedule, error) +} + +// Job is an interface for submitted cron jobs. +type Job interface { + Run() +} + +// Schedule describes a job's duty cycle. +type Schedule interface { + // Next returns the next activation time, later than the given time. + // Next is invoked initially, and then each time the job is run. + Next(time.Time) time.Time +} + +// EntryID identifies an entry within a Cron instance +type EntryID int + +// Entry consists of a schedule and the func to execute on that schedule. +type Entry struct { + // ID is the cron-assigned ID of this entry, which may be used to look up a + // snapshot or remove it. + ID EntryID + + // Schedule on which this job should be run. + Schedule Schedule + + // Next time the job will run, or the zero time if Cron has not been + // started or this entry's schedule is unsatisfiable + Next time.Time + + // Prev is the last time this job was run, or the zero time if never. + Prev time.Time + + // WrappedJob is the thing to run when the Schedule is activated. + WrappedJob Job + + // Job is the thing that was submitted to cron. + // It is kept around so that user code that needs to get at the job later, + // e.g. via Entries() can do so. + Job Job +} + +// Valid returns true if this is not the zero entry. +func (e Entry) Valid() bool { return e.ID != 0 } + +// byTime is a wrapper for sorting the entry array by time +// (with zero time at the end). +type byTime []*Entry + +func (s byTime) Len() int { return len(s) } +func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s byTime) Less(i, j int) bool { + // Two zero times should return false. + // Otherwise, zero is "greater" than any other time. + // (To sort it at the end of the list.) + if s[i].Next.IsZero() { + return false + } + if s[j].Next.IsZero() { + return true + } + return s[i].Next.Before(s[j].Next) +} + +// New returns a new Cron job runner, modified by the given options. +// +// Available Settings +// +// Time Zone +// Description: The time zone in which schedules are interpreted +// Default: time.Local +// +// Parser +// Description: Parser converts cron spec strings into cron.Schedules. +// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron +// +// Chain +// Description: Wrap submitted jobs to customize behavior. +// Default: A chain that recovers panics and logs them to stderr. +// +// See "cron.With*" to modify the default behavior. +func New(opts ...Option) *Cron { + c := &Cron{ + entries: nil, + chain: NewChain(), + add: make(chan *Entry), + stop: make(chan struct{}), + snapshot: make(chan chan []Entry), + remove: make(chan EntryID), + running: false, + runningMu: sync.Mutex{}, + logger: DefaultLogger, + location: time.Local, + parser: standardParser, + } + for _, opt := range opts { + opt(c) + } + return c +} + +// FuncJob is a wrapper that turns a func() into a cron.Job +type FuncJob func() + +func (f FuncJob) Run() { f() } + +// AddFunc adds a func to the Cron to be run on the given schedule. +// The spec is parsed using the time zone of this Cron instance as the default. +// An opaque ID is returned that can be used to later remove it. +func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { + return c.AddJob(spec, FuncJob(cmd)) +} + +// AddJob adds a Job to the Cron to be run on the given schedule. +// The spec is parsed using the time zone of this Cron instance as the default. +// An opaque ID is returned that can be used to later remove it. +func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { + schedule, err := c.parser.Parse(spec) + if err != nil { + return 0, err + } + return c.Schedule(schedule, cmd), nil +} + +// Schedule adds a Job to the Cron to be run on the given schedule. +// The job is wrapped with the configured Chain. +func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { + c.runningMu.Lock() + defer c.runningMu.Unlock() + c.nextID++ + entry := &Entry{ + ID: c.nextID, + Schedule: schedule, + WrappedJob: c.chain.Then(cmd), + Job: cmd, + } + if !c.running { + c.entries = append(c.entries, entry) + } else { + c.add <- entry + } + return entry.ID +} + +// Entries returns a snapshot of the cron entries. +func (c *Cron) Entries() []Entry { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + replyChan := make(chan []Entry, 1) + c.snapshot <- replyChan + return <-replyChan + } + return c.entrySnapshot() +} + +// Location gets the time zone location +func (c *Cron) Location() *time.Location { + return c.location +} + +// Entry returns a snapshot of the given entry, or nil if it couldn't be found. +func (c *Cron) Entry(id EntryID) Entry { + for _, entry := range c.Entries() { + if id == entry.ID { + return entry + } + } + return Entry{} +} + +// Remove an entry from being run in the future. +func (c *Cron) Remove(id EntryID) { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + c.remove <- id + } else { + c.removeEntry(id) + } +} + +// Start the cron scheduler in its own goroutine, or no-op if already started. +func (c *Cron) Start() { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + return + } + c.running = true + go c.run() +} + +// Run the cron scheduler, or no-op if already running. +func (c *Cron) Run() { + c.runningMu.Lock() + if c.running { + c.runningMu.Unlock() + return + } + c.running = true + c.runningMu.Unlock() + c.run() +} + +// run the scheduler.. this is private just due to the need to synchronize +// access to the 'running' state variable. +func (c *Cron) run() { + c.logger.Info("start") + + // Figure out the next activation times for each entry. + now := c.now() + for _, entry := range c.entries { + entry.Next = entry.Schedule.Next(now) + c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next) + } + + for { + // Determine the next entry to run. + sort.Sort(byTime(c.entries)) + + var timer *time.Timer + if len(c.entries) == 0 || c.entries[0].Next.IsZero() { + // If there are no entries yet, just sleep - it still handles new entries + // and stop requests. + timer = time.NewTimer(100000 * time.Hour) + } else { + timer = time.NewTimer(c.entries[0].Next.Sub(now)) + } + + for { + select { + case now = <-timer.C: + now = now.In(c.location) + c.logger.Info("wake", "now", now) + + // Run every entry whose next time was less than now + for _, e := range c.entries { + if e.Next.After(now) || e.Next.IsZero() { + break + } + c.startJob(e.WrappedJob) + e.Prev = e.Next + e.Next = e.Schedule.Next(now) + c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) + } + + case newEntry := <-c.add: + timer.Stop() + now = c.now() + newEntry.Next = newEntry.Schedule.Next(now) + c.entries = append(c.entries, newEntry) + c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) + + case replyChan := <-c.snapshot: + replyChan <- c.entrySnapshot() + continue + + case <-c.stop: + timer.Stop() + c.logger.Info("stop") + return + + case id := <-c.remove: + timer.Stop() + now = c.now() + c.removeEntry(id) + c.logger.Info("removed", "entry", id) + } + + break + } + } +} + +// startJob runs the given job in a new goroutine. +func (c *Cron) startJob(j Job) { + c.jobWaiter.Add(1) + go func() { + defer c.jobWaiter.Done() + j.Run() + }() +} + +// now returns current time in c location +func (c *Cron) now() time.Time { + return time.Now().In(c.location) +} + +// Stop stops the cron scheduler if it is running; otherwise it does nothing. +// A context is returned so the caller can wait for running jobs to complete. +func (c *Cron) Stop() context.Context { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + c.stop <- struct{}{} + c.running = false + } + ctx, cancel := context.WithCancel(context.Background()) + go func() { + c.jobWaiter.Wait() + cancel() + }() + return ctx +} + +// entrySnapshot returns a copy of the current cron entry list. +func (c *Cron) entrySnapshot() []Entry { + var entries = make([]Entry, len(c.entries)) + for i, e := range c.entries { + entries[i] = *e + } + return entries +} + +func (c *Cron) removeEntry(id EntryID) { + var entries []*Entry + for _, e := range c.entries { + if e.ID != id { + entries = append(entries, e) + } + } + c.entries = entries +} diff --git a/vendor/github.com/robfig/cron/doc.go b/vendor/github.com/robfig/cron/v3/doc.go similarity index 50% rename from vendor/github.com/robfig/cron/doc.go rename to vendor/github.com/robfig/cron/v3/doc.go index d02ec2f3b5..fa5d08b4db 100644 --- a/vendor/github.com/robfig/cron/doc.go +++ b/vendor/github.com/robfig/cron/v3/doc.go @@ -1,15 +1,29 @@ /* Package cron implements a cron spec parser and job runner. +Installation + +To download the specific tagged release, run: + + go get github.com/robfig/cron/v3@v3.0.0 + +Import it in your program as: + + import "github.com/robfig/cron/v3" + +It requires Go 1.11 or later due to usage of Go Modules. + Usage Callers may register Funcs to be invoked on a given schedule. Cron will run them in their own goroutines. c := cron.New() - c.AddFunc("0 30 * * * *", func() { fmt.Println("Every hour on the half hour") }) - c.AddFunc("@hourly", func() { fmt.Println("Every hour") }) - c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") }) + c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) + c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") }) + c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") }) + c.AddFunc("@hourly", func() { fmt.Println("Every hour, starting an hour from now") }) + c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") }) c.Start() .. // Funcs are invoked in their own goroutine, asynchronously. @@ -24,19 +38,40 @@ them in their own goroutines. CRON Expression Format -A cron expression represents a set of times, using 6 space-separated fields. +A cron expression represents a set of times, using 5 space-separated fields. Field name | Mandatory? | Allowed values | Allowed special characters ---------- | ---------- | -------------- | -------------------------- - Seconds | Yes | 0-59 | * / , - Minutes | Yes | 0-59 | * / , - Hours | Yes | 0-23 | * / , - Day of month | Yes | 1-31 | * / , - ? Month | Yes | 1-12 or JAN-DEC | * / , - Day of week | Yes | 0-6 or SUN-SAT | * / , - ? -Note: Month and Day-of-week field values are case insensitive. "SUN", "Sun", -and "sun" are equally accepted. +Month and Day-of-week field values are case insensitive. "SUN", "Sun", and +"sun" are equally accepted. + +The specific interpretation of the format is based on the Cron Wikipedia page: +https://en.wikipedia.org/wiki/Cron + +Alternative Formats + +Alternative Cron expression formats support other fields like seconds. You can +implement that by creating a custom Parser as follows. + + cron.New( + cron.WithParser( + cron.NewParser( + cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor))) + +Since adding Seconds is the most common modification to the standard cron spec, +cron provides a builtin function to do that, which is equivalent to the custom +parser you saw earlier, except that its seconds field is REQUIRED: + + cron.New(cron.WithSeconds()) + +That emulates Quartz, the most popular alternative Cron schedule format: +http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html Special Characters @@ -76,15 +111,15 @@ You may use one of several pre-defined schedules in place of a cron expression. Entry | Description | Equivalent To ----- | ----------- | ------------- - @yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 * - @monthly | Run once a month, midnight, first of month | 0 0 0 1 * * - @weekly | Run once a week, midnight between Sat/Sun | 0 0 0 * * 0 - @daily (or @midnight) | Run once a day, midnight | 0 0 0 * * * - @hourly | Run once an hour, beginning of hour | 0 0 * * * * + @yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 1 1 * + @monthly | Run once a month, midnight, first of month | 0 0 1 * * + @weekly | Run once a week, midnight between Sat/Sun | 0 0 * * 0 + @daily (or @midnight) | Run once a day, midnight | 0 0 * * * + @hourly | Run once an hour, beginning of hour | 0 * * * * Intervals -You may also schedule a job to execute at fixed intervals, starting at the time it's added +You may also schedule a job to execute at fixed intervals, starting at the time it's added or cron is run. This is supported by formatting the cron spec like this: @every @@ -101,12 +136,62 @@ it will have only 2 minutes of idle time between each run. Time zones -All interpretation and scheduling is done in the machine's local time zone (as -provided by the Go time package (http://www.golang.org/pkg/time). +By default, all interpretation and scheduling is done in the machine's local +time zone (time.Local). You can specify a different time zone on construction: + + cron.New( + cron.WithLocation(time.UTC)) + +Individual cron schedules may also override the time zone they are to be +interpreted in by providing an additional space-separated field at the beginning +of the cron spec, of the form "CRON_TZ=Asia/Tokyo". + +For example: + + # Runs at 6am in time.Local + cron.New().AddFunc("0 6 * * ?", ...) + + # Runs at 6am in America/New_York + nyc, _ := time.LoadLocation("America/New_York") + c := cron.New(cron.WithLocation(nyc)) + c.AddFunc("0 6 * * ?", ...) + + # Runs at 6am in Asia/Tokyo + cron.New().AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", ...) + + # Runs at 6am in Asia/Tokyo + c := cron.New(cron.WithLocation(nyc)) + c.SetLocation("America/New_York") + c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", ...) + +The prefix "TZ=(TIME ZONE)" is also supported for legacy compatibility. Be aware that jobs scheduled during daylight-savings leap-ahead transitions will not be run! +Job Wrappers + +A Cron runner may be configured with a chain of job wrappers to add +cross-cutting functionality to all submitted jobs. For example, they may be used +to achieve the following effects: + + - Recover any panics from jobs (activated by default) + - Delay a job's execution if the previous run hasn't completed yet + - Skip a job's execution if the previous run hasn't completed yet + - Log each job's invocations + +Install wrappers for all jobs added to a cron using the `cron.WithChain` option: + + cron.New(cron.WithChain( + cron.SkipIfStillRunning(logger), + )) + +Install wrappers for individual jobs by explicitly wrapping them: + + job = cron.NewChain( + cron.SkipIfStillRunning(logger), + ).Then(job) + Thread safety Since the Cron service runs concurrently with the calling code, some amount of @@ -115,6 +200,23 @@ care must be taken to ensure proper synchronization. All cron methods are designed to be correctly synchronized as long as the caller ensures that invocations have a clear happens-before ordering between them. +Logging + +Cron defines a Logger interface that is a subset of the one defined in +github.com/go-logr/logr. It has two logging levels (Info and Error), and +parameters are key/value pairs. This makes it possible for cron logging to plug +into structured logging systems. An adapter, [Verbose]PrintfLogger, is provided +to wrap the standard library *log.Logger. + +For additional insight into Cron operations, verbose logging may be activated +which will record job runs, scheduling decisions, and added or removed jobs. +Activate it with a one-off logger as follows: + + cron.New( + cron.WithLogger( + cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)))) + + Implementation Cron entries are stored in an array, sorted by their next activation time. Cron diff --git a/vendor/github.com/robfig/cron/v3/logger.go b/vendor/github.com/robfig/cron/v3/logger.go new file mode 100644 index 0000000000..b4efcc0535 --- /dev/null +++ b/vendor/github.com/robfig/cron/v3/logger.go @@ -0,0 +1,86 @@ +package cron + +import ( + "io/ioutil" + "log" + "os" + "strings" + "time" +) + +// DefaultLogger is used by Cron if none is specified. +var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)) + +// DiscardLogger can be used by callers to discard all log messages. +var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, "", 0)) + +// Logger is the interface used in this package for logging, so that any backend +// can be plugged in. It is a subset of the github.com/go-logr/logr interface. +type Logger interface { + // Info logs routine messages about cron's operation. + Info(msg string, keysAndValues ...interface{}) + // Error logs an error condition. + Error(err error, msg string, keysAndValues ...interface{}) +} + +// PrintfLogger wraps a Printf-based logger (such as the standard library "log") +// into an implementation of the Logger interface which logs errors only. +func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { + return printfLogger{l, false} +} + +// VerbosePrintfLogger wraps a Printf-based logger (such as the standard library +// "log") into an implementation of the Logger interface which logs everything. +func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { + return printfLogger{l, true} +} + +type printfLogger struct { + logger interface{ Printf(string, ...interface{}) } + logInfo bool +} + +func (pl printfLogger) Info(msg string, keysAndValues ...interface{}) { + if pl.logInfo { + keysAndValues = formatTimes(keysAndValues) + pl.logger.Printf( + formatString(len(keysAndValues)), + append([]interface{}{msg}, keysAndValues...)...) + } +} + +func (pl printfLogger) Error(err error, msg string, keysAndValues ...interface{}) { + keysAndValues = formatTimes(keysAndValues) + pl.logger.Printf( + formatString(len(keysAndValues)+2), + append([]interface{}{msg, "error", err}, keysAndValues...)...) +} + +// formatString returns a logfmt-like format string for the number of +// key/values. +func formatString(numKeysAndValues int) string { + var sb strings.Builder + sb.WriteString("%s") + if numKeysAndValues > 0 { + sb.WriteString(", ") + } + for i := 0; i < numKeysAndValues/2; i++ { + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString("%v=%v") + } + return sb.String() +} + +// formatTimes formats any time.Time values as RFC3339. +func formatTimes(keysAndValues []interface{}) []interface{} { + var formattedArgs []interface{} + for _, arg := range keysAndValues { + if t, ok := arg.(time.Time); ok { + arg = t.Format(time.RFC3339) + } + formattedArgs = append(formattedArgs, arg) + } + return formattedArgs +} diff --git a/vendor/github.com/robfig/cron/v3/option.go b/vendor/github.com/robfig/cron/v3/option.go new file mode 100644 index 0000000000..09e4278e77 --- /dev/null +++ b/vendor/github.com/robfig/cron/v3/option.go @@ -0,0 +1,45 @@ +package cron + +import ( + "time" +) + +// Option represents a modification to the default behavior of a Cron. +type Option func(*Cron) + +// WithLocation overrides the timezone of the cron instance. +func WithLocation(loc *time.Location) Option { + return func(c *Cron) { + c.location = loc + } +} + +// WithSeconds overrides the parser used for interpreting job schedules to +// include a seconds field as the first one. +func WithSeconds() Option { + return WithParser(NewParser( + Second | Minute | Hour | Dom | Month | Dow | Descriptor, + )) +} + +// WithParser overrides the parser used for interpreting job schedules. +func WithParser(p ScheduleParser) Option { + return func(c *Cron) { + c.parser = p + } +} + +// WithChain specifies Job wrappers to apply to all jobs added to this cron. +// Refer to the Chain* functions in this package for provided wrappers. +func WithChain(wrappers ...JobWrapper) Option { + return func(c *Cron) { + c.chain = NewChain(wrappers...) + } +} + +// WithLogger uses the provided logger. +func WithLogger(logger Logger) Option { + return func(c *Cron) { + c.logger = logger + } +} diff --git a/vendor/github.com/robfig/cron/parser.go b/vendor/github.com/robfig/cron/v3/parser.go similarity index 52% rename from vendor/github.com/robfig/cron/parser.go rename to vendor/github.com/robfig/cron/v3/parser.go index a5e83c0a8d..3cf8879f7e 100644 --- a/vendor/github.com/robfig/cron/parser.go +++ b/vendor/github.com/robfig/cron/v3/parser.go @@ -15,14 +15,15 @@ import ( type ParseOption int const ( - Second ParseOption = 1 << iota // Seconds field, default 0 - Minute // Minutes field, default 0 - Hour // Hours field, default 0 - Dom // Day of month field, default * - Month // Month field, default * - Dow // Day of week field, default * - DowOptional // Optional day of week field, default * - Descriptor // Allow descriptors such as @monthly, @weekly, etc. + Second ParseOption = 1 << iota // Seconds field, default 0 + SecondOptional // Optional seconds field, default 0 + Minute // Minutes field, default 0 + Hour // Hours field, default 0 + Dom // Day of month field, default * + Month // Month field, default * + Dow // Day of week field, default * + DowOptional // Optional day of week field, default * + Descriptor // Allow descriptors such as @monthly, @weekly, etc. ) var places = []ParseOption{ @@ -45,11 +46,15 @@ var defaults = []string{ // A custom Parser that can be configured. type Parser struct { - options ParseOption - optionals int + options ParseOption } -// Creates a custom Parser with custom options. +// NewParser creates a Parser with custom options. +// +// It panics if more than one Optional is given, since it would be impossible to +// correctly infer which optional is provided or missing in general. +// +// Examples // // // Standard parser without descriptors // specParser := NewParser(Minute | Hour | Dom | Month | Dow) @@ -66,10 +71,15 @@ type Parser struct { func NewParser(options ParseOption) Parser { optionals := 0 if options&DowOptional > 0 { - options |= Dow optionals++ } - return Parser{options, optionals} + if options&SecondOptional > 0 { + optionals++ + } + if optionals > 1 { + panic("multiple optionals may not be configured") + } + return Parser{options} } // Parse returns a new crontab schedule representing the given spec. @@ -77,36 +87,39 @@ func NewParser(options ParseOption) Parser { // It accepts crontab specs and features configured by NewParser. func (p Parser) Parse(spec string) (Schedule, error) { if len(spec) == 0 { - return nil, fmt.Errorf("Empty spec string") - } - if spec[0] == '@' && p.options&Descriptor > 0 { - return parseDescriptor(spec) + return nil, fmt.Errorf("empty spec string") } - // Figure out how many fields we need - max := 0 - for _, place := range places { - if p.options&place > 0 { - max++ + // Extract timezone if present + var loc = time.Local + if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") { + var err error + i := strings.Index(spec, " ") + eq := strings.Index(spec, "=") + if loc, err = time.LoadLocation(spec[eq+1 : i]); err != nil { + return nil, fmt.Errorf("provided bad location %s: %v", spec[eq+1:i], err) } + spec = strings.TrimSpace(spec[i:]) } - min := max - p.optionals - - // Split fields on whitespace - fields := strings.Fields(spec) - // Validate number of fields - if count := len(fields); count < min || count > max { - if min == max { - return nil, fmt.Errorf("Expected exactly %d fields, found %d: %s", min, count, spec) + // Handle named schedules (descriptors), if configured + if strings.HasPrefix(spec, "@") { + if p.options&Descriptor == 0 { + return nil, fmt.Errorf("parser does not accept descriptors: %v", spec) } - return nil, fmt.Errorf("Expected %d to %d fields, found %d: %s", min, max, count, spec) + return parseDescriptor(spec, loc) } - // Fill in missing fields - fields = expandFields(fields, p.options) + // Split on whitespace. + fields := strings.Fields(spec) + // Validate & fill in any omitted or optional fields var err error + fields, err = normalizeFields(fields, p.options) + if err != nil { + return nil, err + } + field := func(field string, r bounds) uint64 { if err != nil { return 0 @@ -129,40 +142,86 @@ func (p Parser) Parse(spec string) (Schedule, error) { } return &SpecSchedule{ - Second: second, - Minute: minute, - Hour: hour, - Dom: dayofmonth, - Month: month, - Dow: dayofweek, + Second: second, + Minute: minute, + Hour: hour, + Dom: dayofmonth, + Month: month, + Dow: dayofweek, + Location: loc, }, nil } -func expandFields(fields []string, options ParseOption) []string { +// normalizeFields takes a subset set of the time fields and returns the full set +// with defaults (zeroes) populated for unset fields. +// +// As part of performing this function, it also validates that the provided +// fields are compatible with the configured options. +func normalizeFields(fields []string, options ParseOption) ([]string, error) { + // Validate optionals & add their field to options + optionals := 0 + if options&SecondOptional > 0 { + options |= Second + optionals++ + } + if options&DowOptional > 0 { + options |= Dow + optionals++ + } + if optionals > 1 { + return nil, fmt.Errorf("multiple optionals may not be configured") + } + + // Figure out how many fields we need + max := 0 + for _, place := range places { + if options&place > 0 { + max++ + } + } + min := max - optionals + + // Validate number of fields + if count := len(fields); count < min || count > max { + if min == max { + return nil, fmt.Errorf("expected exactly %d fields, found %d: %s", min, count, fields) + } + return nil, fmt.Errorf("expected %d to %d fields, found %d: %s", min, max, count, fields) + } + + // Populate the optional field if not provided + if min < max && len(fields) == min { + switch { + case options&DowOptional > 0: + fields = append(fields, defaults[5]) // TODO: improve access to default + case options&SecondOptional > 0: + fields = append([]string{defaults[0]}, fields...) + default: + return nil, fmt.Errorf("unknown optional field") + } + } + + // Populate all fields not part of options with their defaults n := 0 - count := len(fields) - expFields := make([]string, len(places)) - copy(expFields, defaults) + expandedFields := make([]string, len(places)) + copy(expandedFields, defaults) for i, place := range places { if options&place > 0 { - expFields[i] = fields[n] + expandedFields[i] = fields[n] n++ } - if n == count { - break - } } - return expFields + return expandedFields, nil } var standardParser = NewParser( Minute | Hour | Dom | Month | Dow | Descriptor, ) -// ParseStandard returns a new crontab schedule representing the given standardSpec -// (https://en.wikipedia.org/wiki/Cron). It differs from Parse requiring to always -// pass 5 entries representing: minute, hour, day of month, month and day of week, -// in that order. It returns a descriptive error if the spec is not valid. +// ParseStandard returns a new crontab schedule representing the given +// standardSpec (https://en.wikipedia.org/wiki/Cron). It requires 5 entries +// representing: minute, hour, day of month, month and day of week, in that +// order. It returns a descriptive error if the spec is not valid. // // It accepts // - Standard crontab specs, e.g. "* * * * ?" @@ -171,20 +230,6 @@ func ParseStandard(standardSpec string) (Schedule, error) { return standardParser.Parse(standardSpec) } -var defaultParser = NewParser( - Second | Minute | Hour | Dom | Month | DowOptional | Descriptor, -) - -// Parse returns a new crontab schedule representing the given spec. -// It returns a descriptive error if the spec is not valid. -// -// It accepts -// - Full crontab specs, e.g. "* * * * * ?" -// - Descriptors, e.g. "@midnight", "@every 1h30m" -func Parse(spec string) (Schedule, error) { - return defaultParser.Parse(spec) -} - // getField returns an Int with the bits set representing all of the times that // the field represents or error parsing field value. A "field" is a comma-separated // list of "ranges". @@ -232,7 +277,7 @@ func getRange(expr string, r bounds) (uint64, error) { return 0, err } default: - return 0, fmt.Errorf("Too many hyphens: %s", expr) + return 0, fmt.Errorf("too many hyphens: %s", expr) } } @@ -249,21 +294,24 @@ func getRange(expr string, r bounds) (uint64, error) { if singleDigit { end = r.max } + if step > 1 { + extra = 0 + } default: - return 0, fmt.Errorf("Too many slashes: %s", expr) + return 0, fmt.Errorf("too many slashes: %s", expr) } if start < r.min { - return 0, fmt.Errorf("Beginning of range (%d) below minimum (%d): %s", start, r.min, expr) + return 0, fmt.Errorf("beginning of range (%d) below minimum (%d): %s", start, r.min, expr) } if end > r.max { - return 0, fmt.Errorf("End of range (%d) above maximum (%d): %s", end, r.max, expr) + return 0, fmt.Errorf("end of range (%d) above maximum (%d): %s", end, r.max, expr) } if start > end { - return 0, fmt.Errorf("Beginning of range (%d) beyond end of range (%d): %s", start, end, expr) + return 0, fmt.Errorf("beginning of range (%d) beyond end of range (%d): %s", start, end, expr) } if step == 0 { - return 0, fmt.Errorf("Step of range should be a positive number: %s", expr) + return 0, fmt.Errorf("step of range should be a positive number: %s", expr) } return getBits(start, end, step) | extra, nil @@ -283,10 +331,10 @@ func parseIntOrName(expr string, names map[string]uint) (uint, error) { func mustParseInt(expr string) (uint, error) { num, err := strconv.Atoi(expr) if err != nil { - return 0, fmt.Errorf("Failed to parse int from %s: %s", expr, err) + return 0, fmt.Errorf("failed to parse int from %s: %s", expr, err) } if num < 0 { - return 0, fmt.Errorf("Negative number (%d) not allowed: %s", num, expr) + return 0, fmt.Errorf("negative number (%d) not allowed: %s", num, expr) } return uint(num), nil @@ -314,67 +362,73 @@ func all(r bounds) uint64 { } // parseDescriptor returns a predefined schedule for the expression, or error if none matches. -func parseDescriptor(descriptor string) (Schedule, error) { +func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { switch descriptor { case "@yearly", "@annually": return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: 1 << dom.min, - Month: 1 << months.min, - Dow: all(dow), + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: 1 << dom.min, + Month: 1 << months.min, + Dow: all(dow), + Location: loc, }, nil case "@monthly": return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: 1 << dom.min, - Month: all(months), - Dow: all(dow), + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: 1 << dom.min, + Month: all(months), + Dow: all(dow), + Location: loc, }, nil case "@weekly": return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: all(dom), - Month: all(months), - Dow: 1 << dow.min, + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: all(dom), + Month: all(months), + Dow: 1 << dow.min, + Location: loc, }, nil case "@daily", "@midnight": return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: all(dom), - Month: all(months), - Dow: all(dow), + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: all(dom), + Month: all(months), + Dow: all(dow), + Location: loc, }, nil case "@hourly": return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: all(hours), - Dom: all(dom), - Month: all(months), - Dow: all(dow), + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: all(hours), + Dom: all(dom), + Month: all(months), + Dow: all(dow), + Location: loc, }, nil + } const every = "@every " if strings.HasPrefix(descriptor, every) { duration, err := time.ParseDuration(descriptor[len(every):]) if err != nil { - return nil, fmt.Errorf("Failed to parse duration %s: %s", descriptor, err) + return nil, fmt.Errorf("failed to parse duration %s: %s", descriptor, err) } return Every(duration), nil } - return nil, fmt.Errorf("Unrecognized descriptor: %s", descriptor) + return nil, fmt.Errorf("unrecognized descriptor: %s", descriptor) } diff --git a/vendor/github.com/robfig/cron/spec.go b/vendor/github.com/robfig/cron/v3/spec.go similarity index 74% rename from vendor/github.com/robfig/cron/spec.go rename to vendor/github.com/robfig/cron/v3/spec.go index aac9a60b95..fa1e241e5f 100644 --- a/vendor/github.com/robfig/cron/spec.go +++ b/vendor/github.com/robfig/cron/v3/spec.go @@ -6,6 +6,9 @@ import "time" // traditional crontab specification. It is computed initially and stored as bit sets. type SpecSchedule struct { Second, Minute, Hour, Dom, Month, Dow uint64 + + // Override location for this schedule. + Location *time.Location } // bounds provides a range of acceptable values (plus a map of name to value). @@ -53,7 +56,8 @@ const ( // Next returns the next time this schedule is activated, greater than the given // time. If no time can be found to satisfy the schedule, return the zero time. func (s *SpecSchedule) Next(t time.Time) time.Time { - // General approach: + // General approach + // // For Month, Day, Hour, Minute, Second: // Check if the time value matches. If yes, continue to the next field. // If the field doesn't match the schedule, then increment the field until it matches. @@ -61,6 +65,19 @@ func (s *SpecSchedule) Next(t time.Time) time.Time { // of the field list (since it is necessary to re-verify previous field // values) + // Convert the given time into the schedule's timezone, if one is specified. + // Save the original timezone so we can convert back after we find a time. + // Note that schedules without a time zone specified (time.Local) are treated + // as local to the time provided. + origLocation := t.Location() + loc := s.Location + if loc == time.Local { + loc = t.Location() + } + if s.Location != time.Local { + t = t.In(s.Location) + } + // Start at the earliest possible time (the upcoming second). t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond) @@ -82,7 +99,7 @@ WRAP: if !added { added = true // Otherwise, set the date at the beginning (since the current time is irrelevant). - t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, t.Location()) + t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, loc) } t = t.AddDate(0, 1, 0) @@ -93,12 +110,25 @@ WRAP: } // Now get a day in that month. + // + // NOTE: This causes issues for daylight savings regimes where midnight does + // not exist. For example: Sao Paulo has DST that transforms midnight on + // 11/3 into 1am. Handle that by noticing when the Hour ends up != 0. for !dayMatches(s, t) { if !added { added = true - t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) + t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, loc) } t = t.AddDate(0, 0, 1) + // Notice if the hour is no longer midnight due to DST. + // Add an hour if it's 23, subtract an hour if it's 1. + if t.Hour() != 0 { + if t.Hour() > 12 { + t = t.Add(time.Duration(24-t.Hour()) * time.Hour) + } else { + t = t.Add(time.Duration(-t.Hour()) * time.Hour) + } + } if t.Day() == 1 { goto WRAP @@ -108,7 +138,7 @@ WRAP: for 1<