Skip to content

Commit

Permalink
Merge pull request #4224 from hashicorp/b-cron-parse
Browse files Browse the repository at this point in the history
Handle potential panic in cron parsing
  • Loading branch information
dadgar committed Apr 26, 2018
2 parents 64742cb + dc19a38 commit f1bd5ce
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 37 deletions.
7 changes: 4 additions & 3 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/gorhill/cronexpr"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

const (
Expand Down Expand Up @@ -537,14 +538,14 @@ func (p *PeriodicConfig) Canonicalize() {
// passed time. If no matching instance exists, the zero value of time.Time is
// returned. The `time.Location` of the returned value matches that of the
// passed time.
func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
func (p *PeriodicConfig) Next(fromTime time.Time) (time.Time, error) {
if *p.SpecType == PeriodicSpecCron {
if e, err := cronexpr.Parse(*p.Spec); err == nil {
return e.Next(fromTime)
return structs.CronParseNext(e, fromTime, *p.Spec)
}
}

return time.Time{}
return time.Time{}, nil
}

func (p *PeriodicConfig) GetLocation() (*time.Location, error) {
Expand Down
10 changes: 7 additions & 3 deletions command/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,13 @@ func (c *JobRunCommand) Run(args []string) int {
loc, err := job.Periodic.GetLocation()
if err == nil {
now := time.Now().In(loc)
next := job.Periodic.Next(now)
c.Ui.Output(fmt.Sprintf("Approximate next launch time: %s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second)))
next, err := job.Periodic.Next(now)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error determining next launch time: %v", err))
} else {
c.Ui.Output(fmt.Sprintf("Approximate next launch time: %s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second)))
}
}
} else if !paramjob {
c.Ui.Output("Evaluation ID: " + evalID)
Expand Down
10 changes: 6 additions & 4 deletions command/job_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,12 @@ func (c *JobStatusCommand) Run(args []string) int {
location, err := job.Periodic.GetLocation()
if err == nil {
now := time.Now().In(location)
next := job.Periodic.Next(now)
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
fmt.Sprintf("%s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second))))
next, err := job.Periodic.Next(now)
if err == nil {
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
fmt.Sprintf("%s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second))))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// tracking it.
if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
return fmt.Errorf("failed adding job to periodic dispatcher: %v", err)
}

// Create a watch set
Expand Down
5 changes: 4 additions & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,10 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)

// If it is a periodic job calculate the next launch
if args.Job.IsPeriodic() && args.Job.Periodic.Enabled {
reply.NextPeriodicLaunch = args.Job.Periodic.Next(time.Now().In(args.Job.Periodic.GetLocation()))
reply.NextPeriodicLaunch, err = args.Job.Periodic.Next(time.Now().In(args.Job.Periodic.GetLocation()))
if err != nil {
return fmt.Errorf("Failed to parse cron expression: %v", err)
}
}

reply.FailedTGAllocs = updatedEval.FailedTGAllocs
Expand Down
9 changes: 7 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ func (s *Server) restorePeriodicDispatcher() error {
}

if err := s.periodicDispatcher.Add(job); err != nil {
return err
s.logger.Printf("[ERR] nomad.periodic: %v", err)
continue
}

// We do not need to force run the job since it isn't active.
Expand All @@ -400,7 +401,11 @@ func (s *Server) restorePeriodicDispatcher() error {
}

// nextLaunch is the next launch that should occur.
nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
nextLaunch, err := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
if err != nil {
s.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic launch for job %s: %v", job.NamespacedID(), err)
continue
}

// We skip force launching the job if there should be no next launch
// (the zero case) or if the next launch time is in the future. If it is
Expand Down
13 changes: 9 additions & 4 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {

// Add or update the job.
p.tracked[tuple] = job
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
next, err := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if err != nil {
return fmt.Errorf("failed adding job %s: %v", job.NamespacedID(), err)
}
if tracked {
if err := p.heap.Update(job, next); err != nil {
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
Expand Down Expand Up @@ -344,9 +347,11 @@ func (p *PeriodicDispatch) run(ctx context.Context) {
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
p.l.Lock()

nextLaunch := job.Periodic.Next(launchTime)
if err := p.heap.Update(job, nextLaunch); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q (%s): %v", job.ID, job.Namespace, err)
nextLaunch, err := job.Periodic.Next(launchTime)
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to parse next periodic launch for job %s: %v", job.NamespacedID(), err)
} else if err := p.heap.Update(job, nextLaunch); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %s: %v", job.NamespacedID(), err)
}

// If the job prohibits overlapping and there are running children, we skip
Expand Down
33 changes: 27 additions & 6 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,6 +1997,14 @@ type Job struct {
JobModifyIndex uint64
}

// NamespacedID returns the namespaced id useful for logging
func (j *Job) NamespacedID() *NamespacedID {
return &NamespacedID{
ID: j.ID,
Namespace: j.Namespace,
}
}

// Canonicalize is used to canonicalize fields in the Job. This should be called
// when registering a Job. A set of warnings are returned if the job was changed
// in anyway that the user should be made aware of.
Expand Down Expand Up @@ -2649,28 +2657,41 @@ func (p *PeriodicConfig) Canonicalize() {
p.location = l
}

// CronParseNext is a helper that parses the next time for the given expression
// but captures any panic that may occur in the underlying library.
func CronParseNext(e *cronexpr.Expression, fromTime time.Time, spec string) (t time.Time, err error) {
defer func() {
if recover() != nil {
t = time.Time{}
err = fmt.Errorf("failed parsing cron expression: %q", spec)
}
}()

return e.Next(fromTime), nil
}

// Next returns the closest time instant matching the spec that is after the
// passed time. If no matching instance exists, the zero value of time.Time is
// returned. The `time.Location` of the returned value matches that of the
// passed time.
func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
func (p *PeriodicConfig) Next(fromTime time.Time) (time.Time, error) {
switch p.SpecType {
case PeriodicSpecCron:
if e, err := cronexpr.Parse(p.Spec); err == nil {
return e.Next(fromTime)
return CronParseNext(e, fromTime, p.Spec)
}
case PeriodicSpecTest:
split := strings.Split(p.Spec, ",")
if len(split) == 1 && split[0] == "" {
return time.Time{}
return time.Time{}, nil
}

// Parse the times
times := make([]time.Time, len(split))
for i, s := range split {
unix, err := strconv.Atoi(s)
if err != nil {
return time.Time{}
return time.Time{}, nil
}

times[i] = time.Unix(int64(unix), 0)
Expand All @@ -2679,12 +2700,12 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
// Find the next match
for _, next := range times {
if fromTime.Before(next) {
return next
return next, nil
}
}
}

return time.Time{}
return time.Time{}, nil
}

// GetLocation returns the location to use for determining the time zone to run
Expand Down
56 changes: 43 additions & 13 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2009,15 +2009,44 @@ func TestPeriodicConfig_ValidCron(t *testing.T) {
}

func TestPeriodicConfig_NextCron(t *testing.T) {
require := require.New(t)

type testExpectation struct {
Time time.Time
HasError bool
ErrorMsg string
}

from := time.Date(2009, time.November, 10, 23, 22, 30, 0, time.UTC)
specs := []string{"0 0 29 2 * 1980", "*/5 * * * *"}
expected := []time.Time{{}, time.Date(2009, time.November, 10, 23, 25, 0, 0, time.UTC)}
specs := []string{"0 0 29 2 * 1980",
"*/5 * * * *",
"1 15-0 * * 1-5"}
expected := []*testExpectation{
{
Time: time.Time{},
HasError: false,
},
{
Time: time.Date(2009, time.November, 10, 23, 25, 0, 0, time.UTC),
HasError: false,
},
{
Time: time.Time{},
HasError: true,
ErrorMsg: "failed parsing cron expression",
},
}

for i, spec := range specs {
p := &PeriodicConfig{Enabled: true, SpecType: PeriodicSpecCron, Spec: spec}
p.Canonicalize()
n := p.Next(from)
if expected[i] != n {
t.Fatalf("Next(%v) returned %v; want %v", from, n, expected[i])
n, err := p.Next(from)
nextExpected := expected[i]

require.Equal(nextExpected.Time, n)
require.Equal(err != nil, nextExpected.HasError)
if err != nil {
require.True(strings.Contains(err.Error(), nextExpected.ErrorMsg))
}
}
}
Expand All @@ -2034,6 +2063,8 @@ func TestPeriodicConfig_ValidTimeZone(t *testing.T) {
}

func TestPeriodicConfig_DST(t *testing.T) {
require := require.New(t)

// On Sun, Mar 12, 2:00 am 2017: +1 hour UTC
p := &PeriodicConfig{
Enabled: true,
Expand All @@ -2050,15 +2081,14 @@ func TestPeriodicConfig_DST(t *testing.T) {
e1 := time.Date(2017, time.March, 11, 10, 0, 0, 0, time.UTC)
e2 := time.Date(2017, time.March, 12, 9, 0, 0, 0, time.UTC)

n1 := p.Next(t1).UTC()
n2 := p.Next(t2).UTC()
n1, err := p.Next(t1)
require.Nil(err)

if !reflect.DeepEqual(e1, n1) {
t.Fatalf("Got %v; want %v", n1, e1)
}
if !reflect.DeepEqual(e2, n2) {
t.Fatalf("Got %v; want %v", n1, e1)
}
n2, err := p.Next(t2)
require.Nil(err)

require.Equal(e1, n1.UTC())
require.Equal(e2, n2.UTC())
}

func TestRestartPolicy_Validate(t *testing.T) {
Expand Down

0 comments on commit f1bd5ce

Please sign in to comment.