diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 863748a..b77ec4d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,22 +12,28 @@ jobs: name: Test on go ${{ matrix.go_version }} strategy: matrix: - go_version: ["1.19", "1.20", "1.21"] + go_version: [ + "1.19", + "1.20", + "1.21", + ] runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - + - name: Redis Server in GitHub Actions - uses: supercharge/redis-github-action@1.4.0 + uses: supercharge/redis-github-action@1.7.0 - name: Set up Go uses: actions/setup-go@v3 with: go-version: ${{ matrix.go_version }} - name: Test - run: go test -coverprofile=coverage.txt -covermode=atomic -v ./... + run: go test -v -timeout 30m -coverprofile=coverage.txt -covermode=atomic ./... + - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v3 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + diff --git a/.gitignore b/.gitignore index 6ad0309..12bcf8c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .idea .idea/* -bin/ \ No newline at end of file +bin/ +coverage.txt +coverage.html \ No newline at end of file diff --git a/cron/LICENSE b/cron/LICENSE new file mode 100644 index 0000000..3a0f627 --- /dev/null +++ b/cron/LICENSE @@ -0,0 +1,21 @@ +Copyright (C) 2012 Rob Figueiredo +All Rights Reserved. + +MIT LICENSE + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/cron/README.md b/cron/README.md new file mode 100644 index 0000000..f3ddd13 --- /dev/null +++ b/cron/README.md @@ -0,0 +1,3 @@ +# cron + +fork from [robfig/cron](github.com/robfig/cron) \ No newline at end of file diff --git a/cron/chain.go b/cron/chain.go new file mode 100644 index 0000000..fdec8c9 --- /dev/null +++ b/cron/chain.go @@ -0,0 +1,97 @@ +package cron + +import ( + "fmt" + "runtime" + "sync" + "time" + + "github.com/libi/dcron/dlog" +) + +// JobWrapper decorates the given Job with some behavior. +type JobWrapper func(Job) Job + +// Chain is a sequence of JobWrappers that decorates submitted jobs with +// cross-cutting behaviors like logging or synchronization. +type Chain struct { + wrappers []JobWrapper +} + +// NewChain returns a Chain consisting of the given JobWrappers. +func NewChain(c ...JobWrapper) Chain { + return Chain{c} +} + +// Then decorates the given job with all JobWrappers in the chain. +// +// This: +// +// NewChain(m1, m2, m3).Then(job) +// +// is equivalent to: +// +// m1(m2(m3(job))) +func (c Chain) Then(j Job) Job { + for i := range c.wrappers { + j = c.wrappers[len(c.wrappers)-i-1](j) + } + return j +} + +// Recover panics in wrapped jobs and log them with the provided logger. 
+func Recover(logger dlog.Logger) JobWrapper { + return func(j Job) Job { + return FuncJob(func() { + defer func() { + if r := recover(); r != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + err, ok := r.(error) + if !ok { + err = fmt.Errorf("%v", r) + } + logger.Errorf("panic: stack %v\n%v\n", err, string(buf)) + } + }() + j.Run() + }) + } +} + +// DelayIfStillRunning serializes jobs, delaying subsequent runs until the +// previous one is complete. Jobs running after a delay of more than a minute +// have the delay logged at Info. +func DelayIfStillRunning(logger dlog.Logger) JobWrapper { + return func(j Job) Job { + var mu sync.Mutex + return FuncJob(func() { + start := time.Now() + mu.Lock() + defer mu.Unlock() + if dur := time.Since(start); dur > time.Minute { + logger.Infof("delay duration=%v", dur) + } + j.Run() + }) + } +} + +// SkipIfStillRunning skips an invocation of the Job if a previous invocation is +// still running. It logs skips to the given logger at Info level. 
+func SkipIfStillRunning(logger dlog.Logger) JobWrapper { + return func(j Job) Job { + var ch = make(chan struct{}, 1) + ch <- struct{}{} + return FuncJob(func() { + select { + case v := <-ch: + defer func() { ch <- v }() + j.Run() + default: + logger.Infof("skip") + } + }) + } +} diff --git a/cron/chain_test.go b/cron/chain_test.go new file mode 100644 index 0000000..240deb7 --- /dev/null +++ b/cron/chain_test.go @@ -0,0 +1,244 @@ +package cron + +import ( + "io" + "log" + "reflect" + "sync" + "testing" + "time" + + "github.com/libi/dcron/dlog" +) + +func appendingJob(slice *[]int, value int) Job { + var m sync.Mutex + return FuncJob(func() { + m.Lock() + *slice = append(*slice, value) + m.Unlock() + }) +} + +func appendingWrapper(slice *[]int, value int) JobWrapper { + return func(j Job) Job { + return FuncJob(func() { + appendingJob(slice, value).Run() + j.Run() + }) + } +} + +func TestChain(t *testing.T) { + var nums []int + var ( + append1 = appendingWrapper(&nums, 1) + append2 = appendingWrapper(&nums, 2) + append3 = appendingWrapper(&nums, 3) + append4 = appendingJob(&nums, 4) + ) + NewChain(append1, append2, append3).Then(append4).Run() + if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) { + t.Error("unexpected order of calls:", nums) + } +} + +func TestChainRecover(t *testing.T) { + panickingJob := FuncJob(func() { + panic("panickingJob panics") + }) + + t.Run("panic exits job by default", + func(t *testing.T) { + defer func() { + if err := recover(); err == nil { + t.Errorf("panic expected, but none received") + } + }() + NewChain().Then(panickingJob). + Run() + }) + + t.Run("Recovering JobWrapper recovers", func(_ *testing.T) { + NewChain(Recover(dlog.DefaultPrintfLogger(log.New(io.Discard, "", 0)))). + Then(panickingJob). + Run() + }) + + t.Run("composed with the *IfStillRunning wrappers", func(_ *testing.T) { + NewChain(Recover(dlog.DefaultPrintfLogger(log.New(io.Discard, "", 0)))). + Then(panickingJob). 
+ Run() + }) +} + +type countJob struct { + m sync.Mutex + started int + done int + delay time.Duration +} + +func (j *countJob) Run() { + j.m.Lock() + j.started++ + j.m.Unlock() + time.Sleep(j.delay) + j.m.Lock() + j.done++ + j.m.Unlock() +} + +func (j *countJob) Started() int { + defer j.m.Unlock() + j.m.Lock() + return j.started +} + +func (j *countJob) Done() int { + defer j.m.Unlock() + j.m.Lock() + return j.done +} + +func TestChainDelayIfStillRunning(t *testing.T) { + + t.Run("runs immediately", func(t *testing.T) { + var j countJob + wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + go wrappedJob.Run() + time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete. + if c := j.Done(); c != 1 { + t.Errorf("expected job run once, immediately, got %d", c) + } + }) + + t.Run("second run immediate if first done", func(t *testing.T) { + var j countJob + wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + go func() { + go wrappedJob.Run() + time.Sleep(time.Millisecond) + go wrappedJob.Run() + }() + time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + if c := j.Done(); c != 2 { + t.Errorf("expected job run twice, immediately, got %d", c) + } + }) + + t.Run("second run delayed if first not done", func(t *testing.T) { + var j countJob + j.delay = 10 * time.Millisecond + wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + go func() { + go wrappedJob.Run() + time.Sleep(time.Millisecond) + go wrappedJob.Run() + }() + + // After 5ms, the first job is still in progress, and the second job was + // run but should be waiting for it to finish. + time.Sleep(5 * time.Millisecond) + started, done := j.Started(), j.Done() + if started != 1 || done != 0 { + t.Error("expected first job started, but not finished, got", started, done) + } + + // Verify that the second job completes. 
+ time.Sleep(25 * time.Millisecond) + started, done = j.Started(), j.Done() + if started != 2 || done != 2 { + t.Error("expected both jobs done, got", started, done) + } + }) +} + +func TestChainSkipIfStillRunning(t *testing.T) { + + t.Run("runs immediately", func(t *testing.T) { + var j countJob + wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + go wrappedJob.Run() + time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete. + if c := j.Done(); c != 1 { + t.Errorf("expected job run once, immediately, got %d", c) + } + }) + + t.Run("second run immediate if first done", func(t *testing.T) { + var j countJob + wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + go func() { + go wrappedJob.Run() + time.Sleep(time.Millisecond) + go wrappedJob.Run() + }() + time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + if c := j.Done(); c != 2 { + t.Errorf("expected job run twice, immediately, got %d", c) + } + }) + + t.Run("second run skipped if first not done", func(t *testing.T) { + var j countJob + j.delay = 10 * time.Millisecond + wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + go func() { + go wrappedJob.Run() + time.Sleep(time.Millisecond) + go wrappedJob.Run() + }() + + // After 5ms, the first job is still in progress, and the second job was + already skipped. + time.Sleep(5 * time.Millisecond) + started, done := j.Started(), j.Done() + if started != 1 || done != 0 { + t.Error("expected first job started, but not finished, got", started, done) + } + + // Verify that the first job completes and second does not run. 
+ time.Sleep(25 * time.Millisecond) + started, done = j.Started(), j.Done() + if started != 1 || done != 1 { + t.Error("expected second job skipped, got", started, done) + } + }) + + t.Run("skip 10 jobs on rapid fire", func(t *testing.T) { + var j countJob + j.delay = 10 * time.Millisecond + wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + for i := 0; i < 11; i++ { + go wrappedJob.Run() + } + time.Sleep(200 * time.Millisecond) + done := j.Done() + if done != 1 { + t.Error("expected 1 jobs executed, 10 jobs dropped, got", done) + } + }) + + t.Run("different jobs independent", func(t *testing.T) { + var j1, j2 countJob + j1.delay = 10 * time.Millisecond + j2.delay = 10 * time.Millisecond + chain := NewChain(SkipIfStillRunning(DiscardLogger)) + wrappedJob1 := chain.Then(&j1) + wrappedJob2 := chain.Then(&j2) + for i := 0; i < 11; i++ { + go wrappedJob1.Run() + go wrappedJob2.Run() + } + time.Sleep(100 * time.Millisecond) + var ( + done1 = j1.Done() + done2 = j2.Done() + ) + if done1 != 1 || done2 != 1 { + t.Error("expected both jobs executed once, got", done1, "and", done2) + } + }) + +} diff --git a/cron/constantdelay.go b/cron/constantdelay.go new file mode 100644 index 0000000..cd6e7b1 --- /dev/null +++ b/cron/constantdelay.go @@ -0,0 +1,27 @@ +package cron + +import "time" + +// ConstantDelaySchedule represents a simple recurring duty cycle, e.g. "Every 5 minutes". +// It does not support jobs more frequent than once a second. +type ConstantDelaySchedule struct { + Delay time.Duration +} + +// Every returns a crontab Schedule that activates once every duration. +// Delays of less than a second are not supported (will round up to 1 second). +// Any fields less than a Second are truncated. 
+func Every(duration time.Duration) ConstantDelaySchedule { + if duration < time.Second { + duration = time.Second + } + return ConstantDelaySchedule{ + Delay: duration - time.Duration(duration.Nanoseconds())%time.Second, + } +} + +// Next returns the next time this should be run. +// This rounds so that the next activation time will be on the second. +func (schedule ConstantDelaySchedule) Next(t time.Time) time.Time { + return t.Add(schedule.Delay - time.Duration(t.Nanosecond())*time.Nanosecond) +} diff --git a/cron/constantdelay_test.go b/cron/constantdelay_test.go new file mode 100644 index 0000000..f43a58a --- /dev/null +++ b/cron/constantdelay_test.go @@ -0,0 +1,54 @@ +package cron + +import ( + "testing" + "time" +) + +func TestConstantDelayNext(t *testing.T) { + tests := []struct { + time string + delay time.Duration + expected string + }{ + // Simple cases + {"Mon Jul 9 14:45 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, + {"Mon Jul 9 14:59 2012", 15 * time.Minute, "Mon Jul 9 15:14 2012"}, + {"Mon Jul 9 14:59:59 2012", 15 * time.Minute, "Mon Jul 9 15:14:59 2012"}, + + // Wrap around hours + {"Mon Jul 9 15:45 2012", 35 * time.Minute, "Mon Jul 9 16:20 2012"}, + + // Wrap around days + {"Mon Jul 9 23:46 2012", 14 * time.Minute, "Tue Jul 10 00:00 2012"}, + {"Mon Jul 9 23:45 2012", 35 * time.Minute, "Tue Jul 10 00:20 2012"}, + {"Mon Jul 9 23:35:51 2012", 44*time.Minute + 24*time.Second, "Tue Jul 10 00:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", 25*time.Hour + 44*time.Minute + 24*time.Second, "Thu Jul 11 01:20:15 2012"}, + + // Wrap around months + {"Mon Jul 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Oct 9 00:00 2012"}, + + // Wrap around minute, hour, day, month, and year + {"Mon Dec 31 23:59:45 2012", 15 * time.Second, "Tue Jan 1 00:00:00 2013"}, + + // Round to nearest second on the delay + {"Mon Jul 9 14:45 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, + + // Round up to 1 second if the duration is 
less. + {"Mon Jul 9 14:45:00 2012", 15 * time.Millisecond, "Mon Jul 9 14:45:01 2012"}, + + // Round to nearest second when calculating the next time. + {"Mon Jul 9 14:45:00.005 2012", 15 * time.Minute, "Mon Jul 9 15:00 2012"}, + + // Round to nearest second for both. + {"Mon Jul 9 14:45:00.005 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, + } + + for _, c := range tests { + actual := Every(c.delay).Next(getTime(c.time)) + expected := getTime(c.expected) + if actual != expected { + t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.delay, expected, actual) + } + } +} diff --git a/cron/cron.go b/cron/cron.go new file mode 100644 index 0000000..17bcbfb --- /dev/null +++ b/cron/cron.go @@ -0,0 +1,357 @@ +package cron + +import ( + "context" + "sort" + "sync" + "time" + + "github.com/libi/dcron/dlog" +) + +// Cron keeps track of any number of entries, invoking the associated func as +// specified by the schedule. It may be started, stopped, and the entries may +// be inspected while running. +type Cron struct { + entries []*Entry + chain Chain + stop chan struct{} + add chan *Entry + remove chan EntryID + snapshot chan chan []Entry + running bool + logger dlog.Logger + runningMu sync.Mutex + location *time.Location + parser ScheduleParser + nextID EntryID + jobWaiter sync.WaitGroup +} + +// ScheduleParser is an interface for schedule spec parsers that return a Schedule +type ScheduleParser interface { + Parse(spec string) (Schedule, error) +} + +// Job is an interface for submitted cron jobs. +type Job interface { + Run() +} + +// Schedule describes a job's duty cycle. +type Schedule interface { + // Next returns the next activation time, later than the given time. + // Next is invoked initially, and then each time the job is run. + Next(time.Time) time.Time +} + +// EntryID identifies an entry within a Cron instance +type EntryID int + +// Entry consists of a schedule and the func to execute on that schedule. 
+type Entry struct { + // ID is the cron-assigned ID of this entry, which may be used to look up a + // snapshot or remove it. + ID EntryID + + // Schedule on which this job should be run. + Schedule Schedule + + // Next time the job will run, or the zero time if Cron has not been + // started or this entry's schedule is unsatisfiable + Next time.Time + + // Prev is the last time this job was run, or the zero time if never. + Prev time.Time + + // WrappedJob is the thing to run when the Schedule is activated. + WrappedJob Job + + // Job is the thing that was submitted to cron. + // It is kept around so that user code that needs to get at the job later, + // e.g. via Entries() can do so. + Job Job +} + +// Valid returns true if this is not the zero entry. +func (e Entry) Valid() bool { return e.ID != 0 } + +// byTime is a wrapper for sorting the entry array by time +// (with zero time at the end). +type byTime []*Entry + +func (s byTime) Len() int { return len(s) } +func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s byTime) Less(i, j int) bool { + // Two zero times should return false. + // Otherwise, zero is "greater" than any other time. + // (To sort it at the end of the list.) + if s[i].Next.IsZero() { + return false + } + if s[j].Next.IsZero() { + return true + } + return s[i].Next.Before(s[j].Next) +} + +// New returns a new Cron job runner, modified by the given options. +// +// Available Settings +// +// Time Zone +// Description: The time zone in which schedules are interpreted +// Default: time.Local +// +// Parser +// Description: Parser converts cron spec strings into cron.Schedules. +// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron +// +// Chain +// Description: Wrap submitted jobs to customize behavior. +// Default: A chain that recovers panics and logs them to stderr. +// +// See "cron.With*" to modify the default behavior. 
+func New(opts ...Option) *Cron { + c := &Cron{ + entries: nil, + chain: NewChain(), + add: make(chan *Entry), + stop: make(chan struct{}), + snapshot: make(chan chan []Entry), + remove: make(chan EntryID), + running: false, + runningMu: sync.Mutex{}, + logger: DefaultLogger, + location: time.Local, + parser: standardParser, + } + for _, opt := range opts { + opt(c) + } + return c +} + +// FuncJob is a wrapper that turns a func() into a cron.Job +type FuncJob func() + +func (f FuncJob) Run() { f() } + +// AddFunc adds a func to the Cron to be run on the given schedule. +// The spec is parsed using the time zone of this Cron instance as the default. +// An opaque ID is returned that can be used to later remove it. +func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { + return c.AddJob(spec, FuncJob(cmd)) +} + +// AddJob adds a Job to the Cron to be run on the given schedule. +// The spec is parsed using the time zone of this Cron instance as the default. +// An opaque ID is returned that can be used to later remove it. +func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { + schedule, err := c.parser.Parse(spec) + if err != nil { + return 0, err + } + return c.Schedule(schedule, cmd), nil +} + +// Schedule adds a Job to the Cron to be run on the given schedule. +// The job is wrapped with the configured Chain. +func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { + c.runningMu.Lock() + defer c.runningMu.Unlock() + c.nextID++ + entry := &Entry{ + ID: c.nextID, + Schedule: schedule, + WrappedJob: c.chain.Then(cmd), + Job: cmd, + } + if !c.running { + c.entries = append(c.entries, entry) + } else { + c.add <- entry + } + return entry.ID +} + +// Entries returns a snapshot of the cron entries. 
+func (c *Cron) Entries() []Entry { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + replyChan := make(chan []Entry, 1) + c.snapshot <- replyChan + return <-replyChan + } + return c.entrySnapshot() +} + +// Location gets the time zone location +func (c *Cron) Location() *time.Location { + return c.location +} + +// Entry returns a snapshot of the given entry, or nil if it couldn't be found. +func (c *Cron) Entry(id EntryID) Entry { + for _, entry := range c.Entries() { + if id == entry.ID { + return entry + } + } + return Entry{} +} + +// Remove an entry from being run in the future. +func (c *Cron) Remove(id EntryID) { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + c.remove <- id + } else { + c.removeEntry(id) + } +} + +// Start the cron scheduler in its own goroutine, or no-op if already started. +func (c *Cron) Start() { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + return + } + c.running = true + go c.run() +} + +// Run the cron scheduler, or no-op if already running. +func (c *Cron) Run() { + c.runningMu.Lock() + if c.running { + c.runningMu.Unlock() + return + } + c.running = true + c.runningMu.Unlock() + c.run() +} + +// run the scheduler.. this is private just due to the need to synchronize +// access to the 'running' state variable. +func (c *Cron) run() { + c.logger.Infof("start") + + // Figure out the next activation times for each entry. + now := c.now() + for _, entry := range c.entries { + entry.Next = entry.Schedule.Next(now) + c.logger.Infof("schedule|now=%v, entry=%v, next=%v", now, entry.ID, entry.Next) + } + + for { + // Determine the next entry to run. + sort.Sort(byTime(c.entries)) + + var timer *time.Timer + if len(c.entries) == 0 || c.entries[0].Next.IsZero() { + // If there are no entries yet, just sleep - it still handles new entries + // and stop requests. 
+ timer = time.NewTimer(100000 * time.Hour) + } else { + timer = time.NewTimer(c.entries[0].Next.Sub(now)) + } + + for { + select { + case now = <-timer.C: + now = now.In(c.location) + c.logger.Infof("wake|now=%v", now) + + // Run every entry whose next time was less than now + for _, e := range c.entries { + if e.Next.After(now) || e.Next.IsZero() { + break + } + c.startJob(e.WrappedJob) + e.Prev = e.Next + e.Next = e.Schedule.Next(now) + c.logger.Infof("run|now=%v, entry=%v, next=%v", now, e.ID, e.Next) + } + + case newEntry := <-c.add: + timer.Stop() + now = c.now() + newEntry.Next = newEntry.Schedule.Next(now) + c.entries = append(c.entries, newEntry) + c.logger.Infof("added|now=%v, entry=%v, next=%v", now, newEntry.ID, newEntry.Next) + + case replyChan := <-c.snapshot: + replyChan <- c.entrySnapshot() + continue + + case <-c.stop: + timer.Stop() + c.logger.Infof("stop") + return + + case id := <-c.remove: + timer.Stop() + now = c.now() + c.removeEntry(id) + c.logger.Infof("removed|entry=%v", id) + } + + break + } + } +} + +// startJob runs the given job in a new goroutine. +func (c *Cron) startJob(j Job) { + c.jobWaiter.Add(1) + go func() { + defer c.jobWaiter.Done() + j.Run() + }() +} + +// now returns current time in c location +func (c *Cron) now() time.Time { + return time.Now().In(c.location) +} + +// Stop stops the cron scheduler if it is running; otherwise it does nothing. +// A context is returned so the caller can wait for running jobs to complete. +func (c *Cron) Stop() context.Context { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + c.stop <- struct{}{} + c.running = false + } + ctx, cancel := context.WithCancel(context.Background()) + go func() { + c.jobWaiter.Wait() + cancel() + }() + return ctx +} + +// entrySnapshot returns a copy of the current cron entry list. 
+func (c *Cron) entrySnapshot() []Entry { + var entries = make([]Entry, len(c.entries)) + for i, e := range c.entries { + entries[i] = *e + } + return entries +} + +func (c *Cron) removeEntry(id EntryID) { + var entries []*Entry + for _, e := range c.entries { + if e.ID != id { + entries = append(entries, e) + } + } + c.entries = entries +} diff --git a/cron/cron_test.go b/cron/cron_test.go new file mode 100644 index 0000000..696eea2 --- /dev/null +++ b/cron/cron_test.go @@ -0,0 +1,696 @@ +package cron + +import ( + "bytes" + "fmt" + "log" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/libi/dcron/dlog" +) + +// Many tests schedule a job for every second, and then wait at most a second +// for it to run. This amount is just slightly larger than 1 second to +// compensate for a few milliseconds of runtime. +const OneSecond = 1*time.Second + 50*time.Millisecond + +type syncWriter struct { + wr bytes.Buffer + m sync.Mutex +} + +func (sw *syncWriter) Write(data []byte) (n int, err error) { + sw.m.Lock() + n, err = sw.wr.Write(data) + sw.m.Unlock() + return +} + +func (sw *syncWriter) String() string { + sw.m.Lock() + defer sw.m.Unlock() + return sw.wr.String() +} + +func newBufLogger(sw *syncWriter) dlog.Logger { + return dlog.DefaultPrintfLogger(log.New(sw, "", log.LstdFlags)) +} + +func TestFuncPanicRecovery(t *testing.T) { + var buf syncWriter + cron := New(WithParser(secondParser), + WithChain(Recover(newBufLogger(&buf)))) + cron.Start() + defer cron.Stop() + cron.AddFunc("* * * * * ?", func() { + panic("YOLO") + }) + + time.Sleep(OneSecond) + if !strings.Contains(buf.String(), "YOLO") { + t.Error("expected a panic to be logged, got none") + } +} + +type DummyJob struct{} + +func (d DummyJob) Run() { + panic("YOLO") +} + +func TestJobPanicRecovery(t *testing.T) { + var job DummyJob + + var buf syncWriter + cron := New(WithParser(secondParser), + WithChain(Recover(newBufLogger(&buf)))) + cron.Start() + defer cron.Stop() + cron.AddJob("* * * 
* * ?", job) + + time.Sleep(OneSecond) + if !strings.Contains(buf.String(), "YOLO") { + t.Error("expected a panic to be logged, got none") + } +} + +// Start and stop cron with no entries. +func TestNoEntries(t *testing.T) { + cron := newWithSeconds() + cron.Start() + + select { + case <-time.After(OneSecond): + t.Fatal("expected cron will be stopped immediately") + case <-stop(cron): + } +} + +// Start, stop, then add an entry. Verify entry doesn't run. +func TestStopCausesJobsToNotRun(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := newWithSeconds() + cron.Start() + cron.Stop() + cron.AddFunc("* * * * * ?", func() { wg.Done() }) + + select { + case <-time.After(OneSecond): + // No job ran! + case <-wait(wg): + t.Fatal("expected stopped cron does not run any job") + } +} + +// Add a job, start cron, expect it runs. +func TestAddBeforeRunning(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := newWithSeconds() + cron.AddFunc("* * * * * ?", func() { wg.Done() }) + cron.Start() + defer cron.Stop() + + // Give cron 2 seconds to run our job (which is always activated). + select { + case <-time.After(OneSecond): + t.Fatal("expected job runs") + case <-wait(wg): + } +} + +// Start cron, add a job, expect it runs. +func TestAddWhileRunning(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := newWithSeconds() + cron.Start() + defer cron.Stop() + cron.AddFunc("* * * * * ?", func() { wg.Done() }) + + select { + case <-time.After(OneSecond): + t.Fatal("expected job runs") + case <-wait(wg): + } +} + +// Test for #34. 
Adding a job after calling start results in multiple job invocations +func TestAddWhileRunningWithDelay(t *testing.T) { + cron := newWithSeconds() + cron.Start() + defer cron.Stop() + time.Sleep(5 * time.Second) + var calls int64 + cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) }) + + <-time.After(OneSecond) + if atomic.LoadInt64(&calls) != 1 { + t.Errorf("called %d times, expected 1\n", calls) + } +} + +// Add a job, remove a job, start cron, expect nothing runs. +func TestRemoveBeforeRunning(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := newWithSeconds() + id, _ := cron.AddFunc("* * * * * ?", func() { wg.Done() }) + cron.Remove(id) + cron.Start() + defer cron.Stop() + + select { + case <-time.After(OneSecond): + // Success, shouldn't run + case <-wait(wg): + t.FailNow() + } +} + +// Start cron, add a job, remove it, expect it doesn't run. +func TestRemoveWhileRunning(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := newWithSeconds() + cron.Start() + defer cron.Stop() + id, _ := cron.AddFunc("* * * * * ?", func() { wg.Done() }) + cron.Remove(id) + + select { + case <-time.After(OneSecond): + case <-wait(wg): + t.FailNow() + } +} + +// Test timing with Entries. +func TestSnapshotEntries(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := New() + cron.AddFunc("@every 2s", func() { wg.Done() }) + cron.Start() + defer cron.Stop() + + // Cron should fire in 2 seconds. After 1 second, call Entries. + time.Sleep(OneSecond) + cron.Entries() + + // Even though Entries was called, the cron should fire at the 2 second mark. + select { + case <-time.After(OneSecond): + t.Error("expected job runs at 2 second mark") + case <-wait(wg): + } +} + +// Test that the entries are correctly sorted. +// Add a bunch of long-in-the-future entries, and an immediate entry, and ensure +// that the immediate entry runs immediately. +// Also: Test that multiple jobs run in the same instant. 
+func TestMultipleEntries(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(2) + + cron := newWithSeconds() + cron.AddFunc("0 0 0 1 1 ?", func() {}) + cron.AddFunc("* * * * * ?", func() { wg.Done() }) + id1, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() }) + id2, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() }) + cron.AddFunc("0 0 0 31 12 ?", func() {}) + cron.AddFunc("* * * * * ?", func() { wg.Done() }) + + cron.Remove(id1) + cron.Start() + cron.Remove(id2) + defer cron.Stop() + + select { + case <-time.After(OneSecond): + t.Error("expected job run in proper order") + case <-wait(wg): + } +} + +// Test running the same job twice. +func TestRunningJobTwice(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(2) + + cron := newWithSeconds() + cron.AddFunc("0 0 0 1 1 ?", func() {}) + cron.AddFunc("0 0 0 31 12 ?", func() {}) + cron.AddFunc("* * * * * ?", func() { wg.Done() }) + + cron.Start() + defer cron.Stop() + + select { + case <-time.After(2 * OneSecond): + t.Error("expected job fires 2 times") + case <-wait(wg): + } +} + +func TestRunningMultipleSchedules(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(2) + + cron := newWithSeconds() + cron.AddFunc("0 0 0 1 1 ?", func() {}) + cron.AddFunc("0 0 0 31 12 ?", func() {}) + cron.AddFunc("* * * * * ?", func() { wg.Done() }) + cron.Schedule(Every(time.Minute), FuncJob(func() {})) + cron.Schedule(Every(time.Second), FuncJob(func() { wg.Done() })) + cron.Schedule(Every(time.Hour), FuncJob(func() {})) + + cron.Start() + defer cron.Stop() + + select { + case <-time.After(2 * OneSecond): + t.Error("expected job fires 2 times") + case <-wait(wg): + } +} + +// Test that the cron is run in the local time zone (as opposed to UTC). +func TestLocalTimezone(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(2) + + now := time.Now() + // FIX: Issue #205 + // This calculation doesn't work in seconds 58 or 59. + // Take the easy way out and sleep. 
+ if now.Second() >= 58 { + time.Sleep(2 * time.Second) + now = time.Now() + } + spec := fmt.Sprintf("%d,%d %d %d %d %d ?", + now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) + + cron := newWithSeconds() + cron.AddFunc(spec, func() { wg.Done() }) + cron.Start() + defer cron.Stop() + + select { + case <-time.After(OneSecond * 2): + t.Error("expected job fires 2 times") + case <-wait(wg): + } +} + +// Test that the cron is run in the given time zone (as opposed to local). +func TestNonLocalTimezone(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(2) + + loc, err := time.LoadLocation("Atlantic/Cape_Verde") + if err != nil { + fmt.Printf("Failed to load time zone Atlantic/Cape_Verde: %+v", err) + t.Fail() + } + + now := time.Now().In(loc) + // FIX: Issue #205 + // This calculation doesn't work in seconds 58 or 59. + // Take the easy way out and sleep. + if now.Second() >= 58 { + time.Sleep(2 * time.Second) + now = time.Now().In(loc) + } + spec := fmt.Sprintf("%d,%d %d %d %d %d ?", + now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) + + cron := New(WithLocation(loc), WithParser(secondParser)) + cron.AddFunc(spec, func() { wg.Done() }) + cron.Start() + defer cron.Stop() + + select { + case <-time.After(OneSecond * 2): + t.Error("expected job fires 2 times") + case <-wait(wg): + } +} + +// Test that calling stop before start silently returns without +// blocking the stop channel. 
+func TestStopWithoutStart(t *testing.T) { + cron := New() + cron.Stop() +} + +type testJob struct { + wg *sync.WaitGroup + name string +} + +func (t testJob) Run() { + t.wg.Done() +} + +// Test that adding an invalid job spec returns an error +func TestInvalidJobSpec(t *testing.T) { + cron := New() + _, err := cron.AddJob("this will not parse", nil) + if err == nil { + t.Errorf("expected an error with invalid spec, got nil") + } +} + +// Test blocking run method behaves as Start() +func TestBlockingRun(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := newWithSeconds() + cron.AddFunc("* * * * * ?", func() { wg.Done() }) + + var unblockChan = make(chan struct{}) + + go func() { + cron.Run() + close(unblockChan) + }() + defer cron.Stop() + + select { + case <-time.After(OneSecond): + t.Error("expected job fires") + case <-unblockChan: + t.Error("expected that Run() blocks") + case <-wait(wg): + } +} + +// Test that double-running is a no-op +func TestStartNoop(t *testing.T) { + var tickChan = make(chan struct{}, 2) + + cron := newWithSeconds() + cron.AddFunc("* * * * * ?", func() { + tickChan <- struct{}{} + }) + + cron.Start() + defer cron.Stop() + + // Wait for the first firing to ensure the runner is going + <-tickChan + + cron.Start() + + <-tickChan + + // Fail if this job fires again in a short period, indicating a double-run + select { + case <-time.After(time.Millisecond): + case <-tickChan: + t.Error("expected job fires exactly twice") + } +} + +// Simple test using Runnables. 
+func TestJob(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := newWithSeconds() + cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"}) + cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"}) + job2, _ := cron.AddJob("* * * * * ?", testJob{wg, "job2"}) + cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"}) + cron.Schedule(Every(5*time.Second+5*time.Nanosecond), testJob{wg, "job4"}) + job5 := cron.Schedule(Every(5*time.Minute), testJob{wg, "job5"}) + + // Test getting an Entry pre-Start. + if actualName := cron.Entry(job2).Job.(testJob).name; actualName != "job2" { + t.Error("wrong job retrieved:", actualName) + } + if actualName := cron.Entry(job5).Job.(testJob).name; actualName != "job5" { + t.Error("wrong job retrieved:", actualName) + } + + cron.Start() + defer cron.Stop() + + select { + case <-time.After(OneSecond): + t.FailNow() + case <-wait(wg): + } + + // Ensure the entries are in the right order. + expecteds := []string{"job2", "job4", "job5", "job1", "job3", "job0"} + + var actuals []string + for _, entry := range cron.Entries() { + actuals = append(actuals, entry.Job.(testJob).name) + } + + for i, expected := range expecteds { + if actuals[i] != expected { + t.Fatalf("Jobs not in the right order. (expected) %s != %s (actual)", expecteds, actuals) + } + } + + // Test getting Entries. + if actualName := cron.Entry(job2).Job.(testJob).name; actualName != "job2" { + t.Error("wrong job retrieved:", actualName) + } + if actualName := cron.Entry(job5).Job.(testJob).name; actualName != "job5" { + t.Error("wrong job retrieved:", actualName) + } +} + +// Issue #206 +// Ensure that the next run of a job after removing an entry is accurate. +func TestScheduleAfterRemoval(t *testing.T) { + var wg1 sync.WaitGroup + var wg2 sync.WaitGroup + wg1.Add(1) + wg2.Add(1) + + // The first time this job is run, set a timer and remove the other job + // 750ms later. 
Correct behavior would be to still run the job again in + // 250ms, but the bug would cause it to run instead 1s later. + + var calls int + var mu sync.Mutex + + cron := newWithSeconds() + hourJob := cron.Schedule(Every(time.Hour), FuncJob(func() {})) + cron.Schedule(Every(time.Second), FuncJob(func() { + mu.Lock() + defer mu.Unlock() + switch calls { + case 0: + wg1.Done() + calls++ + case 1: + time.Sleep(750 * time.Millisecond) + cron.Remove(hourJob) + calls++ + case 2: + calls++ + wg2.Done() + case 3: + panic("unexpected 3rd call") + } + })) + + cron.Start() + defer cron.Stop() + + // the first run might be any length of time 0 - 1s, since the schedule + // rounds to the second. wait for the first run to true up. + wg1.Wait() + + select { + case <-time.After(2 * OneSecond): + t.Error("expected job fires 2 times") + case <-wait(&wg2): + } +} + +type ZeroSchedule struct{} + +func (*ZeroSchedule) Next(time.Time) time.Time { + return time.Time{} +} + +// Tests that job without time does not run +func TestJobWithZeroTimeDoesNotRun(t *testing.T) { + cron := newWithSeconds() + var calls int64 + cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) }) + cron.Schedule(new(ZeroSchedule), FuncJob(func() { t.Error("expected zero task will not run") })) + cron.Start() + defer cron.Stop() + <-time.After(OneSecond) + if atomic.LoadInt64(&calls) != 1 { + t.Errorf("called %d times, expected 1\n", calls) + } +} + +func TestStopAndWait(t *testing.T) { + t.Run("nothing running, returns immediately", func(t *testing.T) { + cron := newWithSeconds() + cron.Start() + ctx := cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond): + t.Error("context was not done immediately") + } + }) + + t.Run("repeated calls to Stop", func(t *testing.T) { + cron := newWithSeconds() + cron.Start() + _ = cron.Stop() + time.Sleep(time.Millisecond) + ctx := cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond): + t.Error("context was not done 
immediately") + } + }) + + t.Run("a couple fast jobs added, still returns immediately", func(t *testing.T) { + cron := newWithSeconds() + cron.AddFunc("* * * * * *", func() {}) + cron.Start() + cron.AddFunc("* * * * * *", func() {}) + cron.AddFunc("* * * * * *", func() {}) + cron.AddFunc("* * * * * *", func() {}) + time.Sleep(time.Second) + ctx := cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond): + t.Error("context was not done immediately") + } + }) + + t.Run("a couple fast jobs and a slow job added, waits for slow job", func(t *testing.T) { + cron := newWithSeconds() + cron.AddFunc("* * * * * *", func() {}) + cron.Start() + cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) }) + cron.AddFunc("* * * * * *", func() {}) + time.Sleep(time.Second) + + ctx := cron.Stop() + + // Verify that it is not done for at least 750ms + select { + case <-ctx.Done(): + t.Error("context was done too quickly immediately") + case <-time.After(750 * time.Millisecond): + // expected, because the job sleeping for 1 second is still running + } + + // Verify that it IS done in the next 500ms (giving 250ms buffer) + select { + case <-ctx.Done(): + // expected + case <-time.After(1500 * time.Millisecond): + t.Error("context not done after job should have completed") + } + }) + + t.Run("repeated calls to stop, waiting for completion and after", func(t *testing.T) { + cron := newWithSeconds() + cron.AddFunc("* * * * * *", func() {}) + cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) }) + cron.Start() + cron.AddFunc("* * * * * *", func() {}) + time.Sleep(time.Second) + ctx := cron.Stop() + ctx2 := cron.Stop() + + // Verify that it is not done for at least 1500ms + select { + case <-ctx.Done(): + t.Error("context was done too quickly immediately") + case <-ctx2.Done(): + t.Error("context2 was done too quickly immediately") + case <-time.After(1500 * time.Millisecond): + // expected, because the job sleeping for 2 seconds is still 
running + } + + // Verify that it IS done in the next 1s (giving 500ms buffer) + select { + case <-ctx.Done(): + // expected + case <-time.After(time.Second): + t.Error("context not done after job should have completed") + } + + // Verify that ctx2 is also done. + select { + case <-ctx2.Done(): + // expected + case <-time.After(time.Millisecond): + t.Error("context2 not done even though context1 is") + } + + // Verify that a new context retrieved from stop is immediately done. + ctx3 := cron.Stop() + select { + case <-ctx3.Done(): + // expected + case <-time.After(time.Millisecond): + t.Error("context not done even when cron Stop is completed") + } + + }) +} + +func TestMultiThreadedStartAndStop(t *testing.T) { + cron := New() + go cron.Run() + time.Sleep(2 * time.Millisecond) + cron.Stop() +} + +func wait(wg *sync.WaitGroup) chan bool { + ch := make(chan bool) + go func() { + wg.Wait() + ch <- true + }() + return ch +} + +func stop(cron *Cron) chan bool { + ch := make(chan bool) + go func() { + cron.Stop() + ch <- true + }() + return ch +} + +// newWithSeconds returns a Cron with the seconds field enabled. +func newWithSeconds() *Cron { + return New(WithParser(secondParser), WithChain()) +} diff --git a/cron/doc.go b/cron/doc.go new file mode 100644 index 0000000..fa5d08b --- /dev/null +++ b/cron/doc.go @@ -0,0 +1,231 @@ +/* +Package cron implements a cron spec parser and job runner. + +Installation + +To download the specific tagged release, run: + + go get github.com/robfig/cron/v3@v3.0.0 + +Import it in your program as: + + import "github.com/robfig/cron/v3" + +It requires Go 1.11 or later due to usage of Go Modules. + +Usage + +Callers may register Funcs to be invoked on a given schedule. Cron will run +them in their own goroutines. + + c := cron.New() + c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) + c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. 
in the range 3-6am, 8-11pm") }) + c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") }) + c.AddFunc("@hourly", func() { fmt.Println("Every hour, starting an hour from now") }) + c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") }) + c.Start() + .. + // Funcs are invoked in their own goroutine, asynchronously. + ... + // Funcs may also be added to a running Cron + c.AddFunc("@daily", func() { fmt.Println("Every day") }) + .. + // Inspect the cron job entries' next and previous run times. + inspect(c.Entries()) + .. + c.Stop() // Stop the scheduler (does not stop any jobs already running). + +CRON Expression Format + +A cron expression represents a set of times, using 5 space-separated fields. + + Field name | Mandatory? | Allowed values | Allowed special characters + ---------- | ---------- | -------------- | -------------------------- + Minutes | Yes | 0-59 | * / , - + Hours | Yes | 0-23 | * / , - + Day of month | Yes | 1-31 | * / , - ? + Month | Yes | 1-12 or JAN-DEC | * / , - + Day of week | Yes | 0-6 or SUN-SAT | * / , - ? + +Month and Day-of-week field values are case insensitive. "SUN", "Sun", and +"sun" are equally accepted. + +The specific interpretation of the format is based on the Cron Wikipedia page: +https://en.wikipedia.org/wiki/Cron + +Alternative Formats + +Alternative Cron expression formats support other fields like seconds. You can +implement that by creating a custom Parser as follows. 
+ + cron.New( + cron.WithParser( + cron.NewParser( + cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor))) + +Since adding Seconds is the most common modification to the standard cron spec, +cron provides a builtin function to do that, which is equivalent to the custom +parser you saw earlier, except that its seconds field is REQUIRED: + + cron.New(cron.WithSeconds()) + +That emulates Quartz, the most popular alternative Cron schedule format: +http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html + +Special Characters + +Asterisk ( * ) + +The asterisk indicates that the cron expression will match for all values of the +field; e.g., using an asterisk in the 5th field (month) would indicate every +month. + +Slash ( / ) + +Slashes are used to describe increments of ranges. For example 3-59/15 in the +1st field (minutes) would indicate the 3rd minute of the hour and every 15 +minutes thereafter. The form "*\/..." is equivalent to the form "first-last/...", +that is, an increment over the largest possible range of the field. The form +"N/..." is accepted as meaning "N-MAX/...", that is, starting at N, use the +increment until the end of that specific range. It does not wrap around. + +Comma ( , ) + +Commas are used to separate items of a list. For example, using "MON,WED,FRI" in +the 5th field (day of week) would mean Mondays, Wednesdays and Fridays. + +Hyphen ( - ) + +Hyphens are used to define ranges. For example, 9-17 would indicate every +hour between 9am and 5pm inclusive. + +Question mark ( ? ) + +Question mark may be used instead of '*' for leaving either day-of-month or +day-of-week blank. + +Predefined schedules + +You may use one of several pre-defined schedules in place of a cron expression. + + Entry | Description | Equivalent To + ----- | ----------- | ------------- + @yearly (or @annually) | Run once a year, midnight, Jan. 
1st | 0 0 1 1 * + @monthly | Run once a month, midnight, first of month | 0 0 1 * * + @weekly | Run once a week, midnight between Sat/Sun | 0 0 * * 0 + @daily (or @midnight) | Run once a day, midnight | 0 0 * * * + @hourly | Run once an hour, beginning of hour | 0 * * * * + +Intervals + +You may also schedule a job to execute at fixed intervals, starting at the time it's added +or cron is run. This is supported by formatting the cron spec like this: + + @every + +where "duration" is a string accepted by time.ParseDuration +(http://golang.org/pkg/time/#ParseDuration). + +For example, "@every 1h30m10s" would indicate a schedule that activates after +1 hour, 30 minutes, 10 seconds, and then every interval after that. + +Note: The interval does not take the job runtime into account. For example, +if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes, +it will have only 2 minutes of idle time between each run. + +Time zones + +By default, all interpretation and scheduling is done in the machine's local +time zone (time.Local). You can specify a different time zone on construction: + + cron.New( + cron.WithLocation(time.UTC)) + +Individual cron schedules may also override the time zone they are to be +interpreted in by providing an additional space-separated field at the beginning +of the cron spec, of the form "CRON_TZ=Asia/Tokyo". + +For example: + + # Runs at 6am in time.Local + cron.New().AddFunc("0 6 * * ?", ...) + + # Runs at 6am in America/New_York + nyc, _ := time.LoadLocation("America/New_York") + c := cron.New(cron.WithLocation(nyc)) + c.AddFunc("0 6 * * ?", ...) + + # Runs at 6am in Asia/Tokyo + cron.New().AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", ...) + + # Runs at 6am in Asia/Tokyo + c := cron.New(cron.WithLocation(nyc)) + c.SetLocation("America/New_York") + c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", ...) + +The prefix "TZ=(TIME ZONE)" is also supported for legacy compatibility. 
+ +Be aware that jobs scheduled during daylight-savings leap-ahead transitions will +not be run! + +Job Wrappers + +A Cron runner may be configured with a chain of job wrappers to add +cross-cutting functionality to all submitted jobs. For example, they may be used +to achieve the following effects: + + - Recover any panics from jobs (activated by default) + - Delay a job's execution if the previous run hasn't completed yet + - Skip a job's execution if the previous run hasn't completed yet + - Log each job's invocations + +Install wrappers for all jobs added to a cron using the `cron.WithChain` option: + + cron.New(cron.WithChain( + cron.SkipIfStillRunning(logger), + )) + +Install wrappers for individual jobs by explicitly wrapping them: + + job = cron.NewChain( + cron.SkipIfStillRunning(logger), + ).Then(job) + +Thread safety + +Since the Cron service runs concurrently with the calling code, some amount of +care must be taken to ensure proper synchronization. + +All cron methods are designed to be correctly synchronized as long as the caller +ensures that invocations have a clear happens-before ordering between them. + +Logging + +Cron defines a Logger interface that is a subset of the one defined in +github.com/go-logr/logr. It has two logging levels (Info and Error), and +parameters are key/value pairs. This makes it possible for cron logging to plug +into structured logging systems. An adapter, [Verbose]PrintfLogger, is provided +to wrap the standard library *log.Logger. + +For additional insight into Cron operations, verbose logging may be activated +which will record job runs, scheduling decisions, and added or removed jobs. +Activate it with a one-off logger as follows: + + cron.New( + cron.WithLogger( + cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)))) + + +Implementation + +Cron entries are stored in an array, sorted by their next activation time. Cron +sleeps until the next job is due to be run. 
+ +Upon waking: + - it runs each entry that is active on that second + - it calculates the next run times for the jobs that were run + - it re-sorts the array of entries by next activation time. + - it goes to sleep until the soonest job. +*/ +package cron diff --git a/cron/logger.go b/cron/logger.go new file mode 100644 index 0000000..8f93149 --- /dev/null +++ b/cron/logger.go @@ -0,0 +1,15 @@ +package cron + +import ( + "io" + "log" + "os" + + "github.com/libi/dcron/dlog" +) + +// DefaultLogger is used by Cron if none is specified. +var DefaultLogger dlog.Logger = dlog.DefaultPrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)) + +// DiscardLogger can be used by callers to discard all log messages. +var DiscardLogger dlog.Logger = dlog.DefaultPrintfLogger(log.New(io.Discard, "", 0)) diff --git a/cron/option.go b/cron/option.go new file mode 100644 index 0000000..4c035ce --- /dev/null +++ b/cron/option.go @@ -0,0 +1,47 @@ +package cron + +import ( + "time" + + "github.com/libi/dcron/dlog" +) + +// Option represents a modification to the default behavior of a Cron. +type Option func(*Cron) + +// WithLocation overrides the timezone of the cron instance. +func WithLocation(loc *time.Location) Option { + return func(c *Cron) { + c.location = loc + } +} + +// WithSeconds overrides the parser used for interpreting job schedules to +// include a seconds field as the first one. +func WithSeconds() Option { + return WithParser(NewParser( + Second | Minute | Hour | Dom | Month | Dow | Descriptor, + )) +} + +// WithParser overrides the parser used for interpreting job schedules. +func WithParser(p ScheduleParser) Option { + return func(c *Cron) { + c.parser = p + } +} + +// WithChain specifies Job wrappers to apply to all jobs added to this cron. +// Refer to the Chain* functions in this package for provided wrappers. +func WithChain(wrappers ...JobWrapper) Option { + return func(c *Cron) { + c.chain = NewChain(wrappers...) 
+ } +} + +// WithLogger uses the provided logger. +func WithLogger(logger dlog.Logger) Option { + return func(c *Cron) { + c.logger = logger + } +} diff --git a/cron/option_test.go b/cron/option_test.go new file mode 100644 index 0000000..1e09b03 --- /dev/null +++ b/cron/option_test.go @@ -0,0 +1,21 @@ +package cron + +import ( + "testing" + "time" +) + +func TestWithLocation(t *testing.T) { + c := New(WithLocation(time.UTC)) + if c.location != time.UTC { + t.Errorf("expected UTC, got %v", c.location) + } +} + +func TestWithParser(t *testing.T) { + var parser = NewParser(Dow) + c := New(WithParser(parser)) + if c.parser != parser { + t.Error("expected provided parser") + } +} diff --git a/cron/parser.go b/cron/parser.go new file mode 100644 index 0000000..8da6547 --- /dev/null +++ b/cron/parser.go @@ -0,0 +1,434 @@ +package cron + +import ( + "fmt" + "math" + "strconv" + "strings" + "time" +) + +// Configuration options for creating a parser. Most options specify which +// fields should be included, while others enable features. If a field is not +// included the parser will assume a default value. These options do not change +// the order fields are parse in. +type ParseOption int + +const ( + Second ParseOption = 1 << iota // Seconds field, default 0 + SecondOptional // Optional seconds field, default 0 + Minute // Minutes field, default 0 + Hour // Hours field, default 0 + Dom // Day of month field, default * + Month // Month field, default * + Dow // Day of week field, default * + DowOptional // Optional day of week field, default * + Descriptor // Allow descriptors such as @monthly, @weekly, etc. +) + +var places = []ParseOption{ + Second, + Minute, + Hour, + Dom, + Month, + Dow, +} + +var defaults = []string{ + "0", + "0", + "0", + "*", + "*", + "*", +} + +// A custom Parser that can be configured. +type Parser struct { + options ParseOption +} + +// NewParser creates a Parser with custom options. 
+// +// It panics if more than one Optional is given, since it would be impossible to +// correctly infer which optional is provided or missing in general. +// +// Examples +// +// // Standard parser without descriptors +// specParser := NewParser(Minute | Hour | Dom | Month | Dow) +// sched, err := specParser.Parse("0 0 15 */3 *") +// +// // Same as above, just excludes time fields +// specParser := NewParser(Dom | Month | Dow) +// sched, err := specParser.Parse("15 */3 *") +// +// // Same as above, just makes Dow optional +// specParser := NewParser(Dom | Month | DowOptional) +// sched, err := specParser.Parse("15 */3") +// +func NewParser(options ParseOption) Parser { + optionals := 0 + if options&DowOptional > 0 { + optionals++ + } + if options&SecondOptional > 0 { + optionals++ + } + if optionals > 1 { + panic("multiple optionals may not be configured") + } + return Parser{options} +} + +// Parse returns a new crontab schedule representing the given spec. +// It returns a descriptive error if the spec is not valid. +// It accepts crontab specs and features configured by NewParser. +func (p Parser) Parse(spec string) (Schedule, error) { + if len(spec) == 0 { + return nil, fmt.Errorf("empty spec string") + } + + // Extract timezone if present + var loc = time.Local + if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") { + var err error + i := strings.Index(spec, " ") + eq := strings.Index(spec, "=") + if loc, err = time.LoadLocation(spec[eq+1 : i]); err != nil { + return nil, fmt.Errorf("provided bad location %s: %v", spec[eq+1:i], err) + } + spec = strings.TrimSpace(spec[i:]) + } + + // Handle named schedules (descriptors), if configured + if strings.HasPrefix(spec, "@") { + if p.options&Descriptor == 0 { + return nil, fmt.Errorf("parser does not accept descriptors: %v", spec) + } + return parseDescriptor(spec, loc) + } + + // Split on whitespace. 
+ fields := strings.Fields(spec) + + // Validate & fill in any omitted or optional fields + var err error + fields, err = normalizeFields(fields, p.options) + if err != nil { + return nil, err + } + + field := func(field string, r bounds) uint64 { + if err != nil { + return 0 + } + var bits uint64 + bits, err = getField(field, r) + return bits + } + + var ( + second = field(fields[0], seconds) + minute = field(fields[1], minutes) + hour = field(fields[2], hours) + dayofmonth = field(fields[3], dom) + month = field(fields[4], months) + dayofweek = field(fields[5], dow) + ) + if err != nil { + return nil, err + } + + return &SpecSchedule{ + Second: second, + Minute: minute, + Hour: hour, + Dom: dayofmonth, + Month: month, + Dow: dayofweek, + Location: loc, + }, nil +} + +// normalizeFields takes a subset set of the time fields and returns the full set +// with defaults (zeroes) populated for unset fields. +// +// As part of performing this function, it also validates that the provided +// fields are compatible with the configured options. 
+func normalizeFields(fields []string, options ParseOption) ([]string, error) { + // Validate optionals & add their field to options + optionals := 0 + if options&SecondOptional > 0 { + options |= Second + optionals++ + } + if options&DowOptional > 0 { + options |= Dow + optionals++ + } + if optionals > 1 { + return nil, fmt.Errorf("multiple optionals may not be configured") + } + + // Figure out how many fields we need + max := 0 + for _, place := range places { + if options&place > 0 { + max++ + } + } + min := max - optionals + + // Validate number of fields + if count := len(fields); count < min || count > max { + if min == max { + return nil, fmt.Errorf("expected exactly %d fields, found %d: %s", min, count, fields) + } + return nil, fmt.Errorf("expected %d to %d fields, found %d: %s", min, max, count, fields) + } + + // Populate the optional field if not provided + if min < max && len(fields) == min { + switch { + case options&DowOptional > 0: + fields = append(fields, defaults[5]) // TODO: improve access to default + case options&SecondOptional > 0: + fields = append([]string{defaults[0]}, fields...) + default: + return nil, fmt.Errorf("unknown optional field") + } + } + + // Populate all fields not part of options with their defaults + n := 0 + expandedFields := make([]string, len(places)) + copy(expandedFields, defaults) + for i, place := range places { + if options&place > 0 { + expandedFields[i] = fields[n] + n++ + } + } + return expandedFields, nil +} + +var standardParser = NewParser( + Minute | Hour | Dom | Month | Dow | Descriptor, +) + +// ParseStandard returns a new crontab schedule representing the given +// standardSpec (https://en.wikipedia.org/wiki/Cron). It requires 5 entries +// representing: minute, hour, day of month, month and day of week, in that +// order. It returns a descriptive error if the spec is not valid. +// +// It accepts +// - Standard crontab specs, e.g. "* * * * ?" +// - Descriptors, e.g. 
"@midnight", "@every 1h30m" +func ParseStandard(standardSpec string) (Schedule, error) { + return standardParser.Parse(standardSpec) +} + +// getField returns an Int with the bits set representing all of the times that +// the field represents or error parsing field value. A "field" is a comma-separated +// list of "ranges". +func getField(field string, r bounds) (uint64, error) { + var bits uint64 + ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' }) + for _, expr := range ranges { + bit, err := getRange(expr, r) + if err != nil { + return bits, err + } + bits |= bit + } + return bits, nil +} + +// getRange returns the bits indicated by the given expression: +// number | number "-" number [ "/" number ] +// or error parsing range. +func getRange(expr string, r bounds) (uint64, error) { + var ( + start, end, step uint + rangeAndStep = strings.Split(expr, "/") + lowAndHigh = strings.Split(rangeAndStep[0], "-") + singleDigit = len(lowAndHigh) == 1 + err error + ) + + var extra uint64 + if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" { + start = r.min + end = r.max + extra = starBit + } else { + start, err = parseIntOrName(lowAndHigh[0], r.names) + if err != nil { + return 0, err + } + switch len(lowAndHigh) { + case 1: + end = start + case 2: + end, err = parseIntOrName(lowAndHigh[1], r.names) + if err != nil { + return 0, err + } + default: + return 0, fmt.Errorf("too many hyphens: %s", expr) + } + } + + switch len(rangeAndStep) { + case 1: + step = 1 + case 2: + step, err = mustParseInt(rangeAndStep[1]) + if err != nil { + return 0, err + } + + // Special handling: "N/step" means "N-max/step". 
+ if singleDigit { + end = r.max + } + if step > 1 { + extra = 0 + } + default: + return 0, fmt.Errorf("too many slashes: %s", expr) + } + + if start < r.min { + return 0, fmt.Errorf("beginning of range (%d) below minimum (%d): %s", start, r.min, expr) + } + if end > r.max { + return 0, fmt.Errorf("end of range (%d) above maximum (%d): %s", end, r.max, expr) + } + if start > end { + return 0, fmt.Errorf("beginning of range (%d) beyond end of range (%d): %s", start, end, expr) + } + if step == 0 { + return 0, fmt.Errorf("step of range should be a positive number: %s", expr) + } + + return getBits(start, end, step) | extra, nil +} + +// parseIntOrName returns the (possibly-named) integer contained in expr. +func parseIntOrName(expr string, names map[string]uint) (uint, error) { + if names != nil { + if namedInt, ok := names[strings.ToLower(expr)]; ok { + return namedInt, nil + } + } + return mustParseInt(expr) +} + +// mustParseInt parses the given expression as an int or returns an error. +func mustParseInt(expr string) (uint, error) { + num, err := strconv.Atoi(expr) + if err != nil { + return 0, fmt.Errorf("failed to parse int from %s: %s", expr, err) + } + if num < 0 { + return 0, fmt.Errorf("negative number (%d) not allowed: %s", num, expr) + } + + return uint(num), nil +} + +// getBits sets all bits in the range [min, max], modulo the given step size. +func getBits(min, max, step uint) uint64 { + var bits uint64 + + // If step is 1, use shifts. + if step == 1 { + return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min) + } + + // Else, use a simple loop. + for i := min; i <= max; i += step { + bits |= 1 << i + } + return bits +} + +// all returns all bits within the given bounds. (plus the star bit) +func all(r bounds) uint64 { + return getBits(r.min, r.max, 1) | starBit +} + +// parseDescriptor returns a predefined schedule for the expression, or error if none matches. 
+func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { + switch descriptor { + case "@yearly", "@annually": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: 1 << dom.min, + Month: 1 << months.min, + Dow: all(dow), + Location: loc, + }, nil + + case "@monthly": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: 1 << dom.min, + Month: all(months), + Dow: all(dow), + Location: loc, + }, nil + + case "@weekly": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: all(dom), + Month: all(months), + Dow: 1 << dow.min, + Location: loc, + }, nil + + case "@daily", "@midnight": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: all(dom), + Month: all(months), + Dow: all(dow), + Location: loc, + }, nil + + case "@hourly": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: all(hours), + Dom: all(dom), + Month: all(months), + Dow: all(dow), + Location: loc, + }, nil + + } + + const every = "@every " + if strings.HasPrefix(descriptor, every) { + duration, err := time.ParseDuration(descriptor[len(every):]) + if err != nil { + return nil, fmt.Errorf("failed to parse duration %s: %s", descriptor, err) + } + return Every(duration), nil + } + + return nil, fmt.Errorf("unrecognized descriptor: %s", descriptor) +} diff --git a/cron/parser_test.go b/cron/parser_test.go new file mode 100644 index 0000000..41c8c52 --- /dev/null +++ b/cron/parser_test.go @@ -0,0 +1,383 @@ +package cron + +import ( + "reflect" + "strings" + "testing" + "time" +) + +var secondParser = NewParser(Second | Minute | Hour | Dom | Month | DowOptional | Descriptor) + +func TestRange(t *testing.T) { + zero := uint64(0) + ranges := []struct { + expr string + min, max uint + expected uint64 + err string + }{ + {"5", 0, 
7, 1 << 5, ""}, + {"0", 0, 7, 1 << 0, ""}, + {"7", 0, 7, 1 << 7, ""}, + + {"5-5", 0, 7, 1 << 5, ""}, + {"5-6", 0, 7, 1<<5 | 1<<6, ""}, + {"5-7", 0, 7, 1<<5 | 1<<6 | 1<<7, ""}, + + {"5-6/2", 0, 7, 1 << 5, ""}, + {"5-7/2", 0, 7, 1<<5 | 1<<7, ""}, + {"5-7/1", 0, 7, 1<<5 | 1<<6 | 1<<7, ""}, + + {"*", 1, 3, 1<<1 | 1<<2 | 1<<3 | starBit, ""}, + {"*/2", 1, 3, 1<<1 | 1<<3, ""}, + + {"5--5", 0, 0, zero, "too many hyphens"}, + {"jan-x", 0, 0, zero, "failed to parse int from"}, + {"2-x", 1, 5, zero, "failed to parse int from"}, + {"*/-12", 0, 0, zero, "negative number"}, + {"*//2", 0, 0, zero, "too many slashes"}, + {"1", 3, 5, zero, "below minimum"}, + {"6", 3, 5, zero, "above maximum"}, + {"5-3", 3, 5, zero, "beyond end of range"}, + {"*/0", 0, 0, zero, "should be a positive number"}, + } + + for _, c := range ranges { + actual, err := getRange(c.expr, bounds{c.min, c.max, nil}) + if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { + t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) + } + if len(c.err) == 0 && err != nil { + t.Errorf("%s => unexpected error %v", c.expr, err) + } + if actual != c.expected { + t.Errorf("%s => expected %d, got %d", c.expr, c.expected, actual) + } + } +} + +func TestField(t *testing.T) { + fields := []struct { + expr string + min, max uint + expected uint64 + }{ + {"5", 1, 7, 1 << 5}, + {"5,6", 1, 7, 1<<5 | 1<<6}, + {"5,6,7", 1, 7, 1<<5 | 1<<6 | 1<<7}, + {"1,5-7/2,3", 1, 7, 1<<1 | 1<<5 | 1<<7 | 1<<3}, + } + + for _, c := range fields { + actual, _ := getField(c.expr, bounds{c.min, c.max, nil}) + if actual != c.expected { + t.Errorf("%s => expected %d, got %d", c.expr, c.expected, actual) + } + } +} + +func TestAll(t *testing.T) { + allBits := []struct { + r bounds + expected uint64 + }{ + {minutes, 0xfffffffffffffff}, // 0-59: 60 ones + {hours, 0xffffff}, // 0-23: 24 ones + {dom, 0xfffffffe}, // 1-31: 31 ones, 1 zero + {months, 0x1ffe}, // 1-12: 12 ones, 1 zero + {dow, 0x7f}, // 0-6: 7 ones + } + + for _, c := 
range allBits { + actual := all(c.r) // all() adds the starBit, so compensate for that.. + if c.expected|starBit != actual { + t.Errorf("%d-%d/%d => expected %b, got %b", + c.r.min, c.r.max, 1, c.expected|starBit, actual) + } + } +} + +func TestBits(t *testing.T) { + bits := []struct { + min, max, step uint + expected uint64 + }{ + {0, 0, 1, 0x1}, + {1, 1, 1, 0x2}, + {1, 5, 2, 0x2a}, // 101010 + {1, 4, 2, 0xa}, // 1010 + } + + for _, c := range bits { + actual := getBits(c.min, c.max, c.step) + if c.expected != actual { + t.Errorf("%d-%d/%d => expected %b, got %b", + c.min, c.max, c.step, c.expected, actual) + } + } +} + +func TestParseScheduleErrors(t *testing.T) { + var tests = []struct{ expr, err string }{ + {"* 5 j * * *", "failed to parse int from"}, + {"@every Xm", "failed to parse duration"}, + {"@unrecognized", "unrecognized descriptor"}, + {"* * * *", "expected 5 to 6 fields"}, + {"", "empty spec string"}, + } + for _, c := range tests { + actual, err := secondParser.Parse(c.expr) + if err == nil || !strings.Contains(err.Error(), c.err) { + t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) + } + if actual != nil { + t.Errorf("expected nil schedule on error, got %v", actual) + } + } +} + +func TestParseSchedule(t *testing.T) { + tokyo, _ := time.LoadLocation("Asia/Tokyo") + entries := []struct { + parser Parser + expr string + expected Schedule + }{ + {secondParser, "0 5 * * * *", every5min(time.Local)}, + {standardParser, "5 * * * *", every5min(time.Local)}, + {secondParser, "CRON_TZ=UTC 0 5 * * * *", every5min(time.UTC)}, + {standardParser, "CRON_TZ=UTC 5 * * * *", every5min(time.UTC)}, + {secondParser, "CRON_TZ=Asia/Tokyo 0 5 * * * *", every5min(tokyo)}, + {secondParser, "@every 5m", ConstantDelaySchedule{5 * time.Minute}}, + {secondParser, "@midnight", midnight(time.Local)}, + {secondParser, "TZ=UTC @midnight", midnight(time.UTC)}, + {secondParser, "TZ=Asia/Tokyo @midnight", midnight(tokyo)}, + {secondParser, "@yearly", annual(time.Local)}, + 
{secondParser, "@annually", annual(time.Local)}, + { + parser: secondParser, + expr: "* 5 * * * *", + expected: &SpecSchedule{ + Second: all(seconds), + Minute: 1 << 5, + Hour: all(hours), + Dom: all(dom), + Month: all(months), + Dow: all(dow), + Location: time.Local, + }, + }, + } + + for _, c := range entries { + actual, err := c.parser.Parse(c.expr) + if err != nil { + t.Errorf("%s => unexpected error %v", c.expr, err) + } + if !reflect.DeepEqual(actual, c.expected) { + t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual) + } + } +} + +func TestOptionalSecondSchedule(t *testing.T) { + parser := NewParser(SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor) + entries := []struct { + expr string + expected Schedule + }{ + {"0 5 * * * *", every5min(time.Local)}, + {"5 5 * * * *", every5min5s(time.Local)}, + {"5 * * * *", every5min(time.Local)}, + } + + for _, c := range entries { + actual, err := parser.Parse(c.expr) + if err != nil { + t.Errorf("%s => unexpected error %v", c.expr, err) + } + if !reflect.DeepEqual(actual, c.expected) { + t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual) + } + } +} + +func TestNormalizeFields(t *testing.T) { + tests := []struct { + name string + input []string + options ParseOption + expected []string + }{ + { + "AllFields_NoOptional", + []string{"0", "5", "*", "*", "*", "*"}, + Second | Minute | Hour | Dom | Month | Dow | Descriptor, + []string{"0", "5", "*", "*", "*", "*"}, + }, + { + "AllFields_SecondOptional_Provided", + []string{"0", "5", "*", "*", "*", "*"}, + SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor, + []string{"0", "5", "*", "*", "*", "*"}, + }, + { + "AllFields_SecondOptional_NotProvided", + []string{"5", "*", "*", "*", "*"}, + SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor, + []string{"0", "5", "*", "*", "*", "*"}, + }, + { + "SubsetFields_NoOptional", + []string{"5", "15", "*"}, + Hour | Dom | Month, + []string{"0", "0", "5", "15", "*", "*"}, 
+ }, + { + "SubsetFields_DowOptional_Provided", + []string{"5", "15", "*", "4"}, + Hour | Dom | Month | DowOptional, + []string{"0", "0", "5", "15", "*", "4"}, + }, + { + "SubsetFields_DowOptional_NotProvided", + []string{"5", "15", "*"}, + Hour | Dom | Month | DowOptional, + []string{"0", "0", "5", "15", "*", "*"}, + }, + { + "SubsetFields_SecondOptional_NotProvided", + []string{"5", "15", "*"}, + SecondOptional | Hour | Dom | Month, + []string{"0", "0", "5", "15", "*", "*"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual, err := normalizeFields(test.input, test.options) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(actual, test.expected) { + t.Errorf("expected %v, got %v", test.expected, actual) + } + }) + } +} + +func TestNormalizeFields_Errors(t *testing.T) { + tests := []struct { + name string + input []string + options ParseOption + err string + }{ + { + "TwoOptionals", + []string{"0", "5", "*", "*", "*", "*"}, + SecondOptional | Minute | Hour | Dom | Month | DowOptional, + "", + }, + { + "TooManyFields", + []string{"0", "5", "*", "*"}, + SecondOptional | Minute | Hour, + "", + }, + { + "NoFields", + []string{}, + SecondOptional | Minute | Hour, + "", + }, + { + "TooFewFields", + []string{"*"}, + SecondOptional | Minute | Hour, + "", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual, err := normalizeFields(test.input, test.options) + if err == nil { + t.Errorf("expected an error, got none. 
results: %v", actual) + } + if !strings.Contains(err.Error(), test.err) { + t.Errorf("expected error %q, got %q", test.err, err.Error()) + } + }) + } +} + +func TestStandardSpecSchedule(t *testing.T) { + entries := []struct { + expr string + expected Schedule + err string + }{ + { + expr: "5 * * * *", + expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow), time.Local}, + }, + { + expr: "@every 5m", + expected: ConstantDelaySchedule{time.Duration(5) * time.Minute}, + }, + { + expr: "5 j * * *", + err: "failed to parse int from", + }, + { + expr: "* * * *", + err: "expected exactly 5 fields", + }, + } + + for _, c := range entries { + actual, err := ParseStandard(c.expr) + if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { + t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) + } + if len(c.err) == 0 && err != nil { + t.Errorf("%s => unexpected error %v", c.expr, err) + } + if !reflect.DeepEqual(actual, c.expected) { + t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual) + } + } +} + +func TestNoDescriptorParser(t *testing.T) { + parser := NewParser(Minute | Hour) + _, err := parser.Parse("@every 1m") + if err == nil { + t.Error("expected an error, got none") + } +} + +func every5min(loc *time.Location) *SpecSchedule { + return &SpecSchedule{1 << 0, 1 << 5, all(hours), all(dom), all(months), all(dow), loc} +} + +func every5min5s(loc *time.Location) *SpecSchedule { + return &SpecSchedule{1 << 5, 1 << 5, all(hours), all(dom), all(months), all(dow), loc} +} + +func midnight(loc *time.Location) *SpecSchedule { + return &SpecSchedule{1, 1, 1, all(dom), all(months), all(dow), loc} +} + +func annual(loc *time.Location) *SpecSchedule { + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: 1 << dom.min, + Month: 1 << months.min, + Dow: all(dow), + Location: loc, + } +} diff --git a/cron/spec.go b/cron/spec.go new file mode 100644 
index 0000000..fa1e241 --- /dev/null +++ b/cron/spec.go @@ -0,0 +1,188 @@ +package cron + +import "time" + +// SpecSchedule specifies a duty cycle (to the second granularity), based on a +// traditional crontab specification. It is computed initially and stored as bit sets. +type SpecSchedule struct { + Second, Minute, Hour, Dom, Month, Dow uint64 + + // Override location for this schedule. + Location *time.Location +} + +// bounds provides a range of acceptable values (plus a map of name to value). +type bounds struct { + min, max uint + names map[string]uint +} + +// The bounds for each field. +var ( + seconds = bounds{0, 59, nil} + minutes = bounds{0, 59, nil} + hours = bounds{0, 23, nil} + dom = bounds{1, 31, nil} + months = bounds{1, 12, map[string]uint{ + "jan": 1, + "feb": 2, + "mar": 3, + "apr": 4, + "may": 5, + "jun": 6, + "jul": 7, + "aug": 8, + "sep": 9, + "oct": 10, + "nov": 11, + "dec": 12, + }} + dow = bounds{0, 6, map[string]uint{ + "sun": 0, + "mon": 1, + "tue": 2, + "wed": 3, + "thu": 4, + "fri": 5, + "sat": 6, + }} +) + +const ( + // Set the top bit if a star was included in the expression. + starBit = 1 << 63 +) + +// Next returns the next time this schedule is activated, greater than the given +// time. If no time can be found to satisfy the schedule, return the zero time. +func (s *SpecSchedule) Next(t time.Time) time.Time { + // General approach + // + // For Month, Day, Hour, Minute, Second: + // Check if the time value matches. If yes, continue to the next field. + // If the field doesn't match the schedule, then increment the field until it matches. + // While incrementing the field, a wrap-around brings it back to the beginning + // of the field list (since it is necessary to re-verify previous field + // values) + + // Convert the given time into the schedule's timezone, if one is specified. + // Save the original timezone so we can convert back after we find a time. 
+ // Note that schedules without a time zone specified (time.Local) are treated + // as local to the time provided. + origLocation := t.Location() + loc := s.Location + if loc == time.Local { + loc = t.Location() + } + if s.Location != time.Local { + t = t.In(s.Location) + } + + // Start at the earliest possible time (the upcoming second). + t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond) + + // This flag indicates whether a field has been incremented. + added := false + + // If no time is found within five years, return zero. + yearLimit := t.Year() + 5 + +WRAP: + if t.Year() > yearLimit { + return time.Time{} + } + + // Find the first applicable month. + // If it's this month, then do nothing. + for 1<<uint(t.Month())&s.Month == 0 { + // If we have to add a month, reset the other parts to 0. + if !added { + added = true + // Otherwise, set the date at the beginning (since the current time is irrelevant). + t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, loc) + } + t = t.AddDate(0, 1, 0) + + // Wrapped around. + if t.Month() == time.January { + goto WRAP + } + } + + // Now get a day in that month. + // + // NOTE: This causes issues for daylight savings regimes where midnight does + // not exist. For example: Sao Paulo has DST that transforms midnight on + // 11/3 into 1am. Handle that by noticing when the Hour ends up != 0. + for !dayMatches(s, t) { + if !added { + added = true + t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, loc) + } + t = t.AddDate(0, 0, 1) + // Notice if the hour is no longer midnight due to DST. + // Add an hour if it's 23, subtract an hour if it's 1. + if t.Hour() != 0 { + if t.Hour() > 12 { + t = t.Add(time.Duration(24-t.Hour()) * time.Hour) + } else { + t = t.Add(time.Duration(-t.Hour()) * time.Hour) + } + } + + if t.Day() == 1 { + goto WRAP + } + } + + for 1<<uint(t.Hour())&s.Hour == 0 { + if !added { + added = true + t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, loc) + } + t = t.Add(1 * time.Hour) + + if t.Hour() == 0 { + goto WRAP + } + } + + for 1<<uint(t.Minute())&s.Minute == 0 { + if !added { + added = true + t = t.Truncate(time.Minute) + } + t = t.Add(1 * time.Minute) + + if t.Minute() == 0 { + goto WRAP + } + } + + for 1<<uint(t.Second())&s.Second == 0 { + if !added { + added = true + t = t.Truncate(time.Second) + } + t = t.Add(1 * time.Second) + + if t.Second() == 0 { + goto WRAP + } + } + + return t.In(origLocation) +} + +// dayMatches returns true if the schedule's day-of-week and day-of-month +// restrictions are satisfied by the given time. +func dayMatches(s *SpecSchedule, t time.Time) bool { + var ( + domMatch bool = 1<<uint(t.Day())&s.Dom > 0 + dowMatch bool = 1<<uint(t.Weekday())&s.Dow > 0 + ) + if s.Dom&starBit > 0 || s.Dow&starBit > 0 { + return domMatch && dowMatch + } + return domMatch || dowMatch +} diff --git a/cron/spec_test.go b/cron/spec_test.go new file mode 100644 index 0000000..1b8a503 --- /dev/null +++ b/cron/spec_test.go @@ -0,0 +1,300 @@ +package cron + +import ( + "strings" + "testing" + "time" +) + +func TestActivation(t *testing.T) { + tests := []struct { + time, spec string + expected bool + }{ + // Every fifteen minutes. + {"Mon Jul 9 15:00 2012", "0/15 * * * *", true}, + {"Mon Jul 9 15:45 2012", "0/15 * * * *", true}, + {"Mon Jul 9 15:40 2012", "0/15 * * * *", false}, + + // Every fifteen minutes, starting at 5 minutes. + {"Mon Jul 9 15:05 2012", "5/15 * * * *", true}, + {"Mon Jul 9 15:20 2012", "5/15 * * * *", true}, + {"Mon Jul 9 15:50 2012", "5/15 * * * *", true}, + + // Named months + {"Sun Jul 15 15:00 2012", "0/15 * * Jul *", true}, + {"Sun Jul 15 15:00 2012", "0/15 * * Jun *", false}, + + // Everything set. + {"Sun Jul 15 08:30 2012", "30 08 ?
Jul Sun", true}, + {"Sun Jul 15 08:30 2012", "30 08 15 Jul ?", true}, + {"Mon Jul 16 08:30 2012", "30 08 ? Jul Sun", false}, + {"Mon Jul 16 08:30 2012", "30 08 15 Jul ?", false}, + + // Predefined schedules + {"Mon Jul 9 15:00 2012", "@hourly", true}, + {"Mon Jul 9 15:04 2012", "@hourly", false}, + {"Mon Jul 9 15:00 2012", "@daily", false}, + {"Mon Jul 9 00:00 2012", "@daily", true}, + {"Mon Jul 9 00:00 2012", "@weekly", false}, + {"Sun Jul 8 00:00 2012", "@weekly", true}, + {"Sun Jul 8 01:00 2012", "@weekly", false}, + {"Sun Jul 8 00:00 2012", "@monthly", false}, + {"Sun Jul 1 00:00 2012", "@monthly", true}, + + // Test interaction of DOW and DOM. + // If both are restricted, then only one needs to match. + {"Sun Jul 15 00:00 2012", "* * 1,15 * Sun", true}, + {"Fri Jun 15 00:00 2012", "* * 1,15 * Sun", true}, + {"Wed Aug 1 00:00 2012", "* * 1,15 * Sun", true}, + {"Sun Jul 15 00:00 2012", "* * */10 * Sun", true}, // verifies #70 + + // However, if one has a star, then both need to match. 
+ {"Sun Jul 15 00:00 2012", "* * * * Mon", false}, + {"Mon Jul 9 00:00 2012", "* * 1,15 * *", false}, + {"Sun Jul 15 00:00 2012", "* * 1,15 * *", true}, + {"Sun Jul 15 00:00 2012", "* * */2 * Sun", true}, + } + + for _, test := range tests { + sched, err := ParseStandard(test.spec) + if err != nil { + t.Error(err) + continue + } + actual := sched.Next(getTime(test.time).Add(-1 * time.Second)) + expected := getTime(test.time) + if test.expected && expected != actual || !test.expected && expected == actual { + t.Errorf("Fail evaluating %s on %s: (expected) %s != %s (actual)", + test.spec, test.time, expected, actual) + } + } +} + +func TestNext(t *testing.T) { + runs := []struct { + time, spec string + expected string + }{ + // Simple cases + {"Mon Jul 9 14:45 2012", "0 0/15 * * * *", "Mon Jul 9 15:00 2012"}, + {"Mon Jul 9 14:59 2012", "0 0/15 * * * *", "Mon Jul 9 15:00 2012"}, + {"Mon Jul 9 14:59:59 2012", "0 0/15 * * * *", "Mon Jul 9 15:00 2012"}, + + // Wrap around hours + {"Mon Jul 9 15:45 2012", "0 20-35/15 * * * *", "Mon Jul 9 16:20 2012"}, + + // Wrap around days + {"Mon Jul 9 23:46 2012", "0 */15 * * * *", "Tue Jul 10 00:00 2012"}, + {"Mon Jul 9 23:45 2012", "0 20-35/15 * * * *", "Tue Jul 10 00:20 2012"}, + {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * * * *", "Tue Jul 10 00:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 1/2 * * *", "Tue Jul 10 01:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 10-12 * * *", "Tue Jul 10 10:20:15 2012"}, + + {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 1/2 */2 * *", "Thu Jul 11 01:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * 9-20 * *", "Wed Jul 10 00:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * 9-20 Jul *", "Wed Jul 10 00:20:15 2012"}, + + // Wrap around months + {"Mon Jul 9 23:35 2012", "0 0 0 9 Apr-Oct ?", "Thu Aug 9 00:00 2012"}, + {"Mon Jul 9 23:35 2012", "0 0 0 */5 Apr,Aug,Oct Mon", "Tue Aug 1 00:00 2012"}, + {"Mon Jul 9 23:35 2012", "0 0 0 */5 Oct Mon", "Mon Oct 1 
00:00 2012"}, + + // Wrap around years + {"Mon Jul 9 23:35 2012", "0 0 0 * Feb Mon", "Mon Feb 4 00:00 2013"}, + {"Mon Jul 9 23:35 2012", "0 0 0 * Feb Mon/2", "Fri Feb 1 00:00 2013"}, + + // Wrap around minute, hour, day, month, and year + {"Mon Dec 31 23:59:45 2012", "0 * * * * *", "Tue Jan 1 00:00:00 2013"}, + + // Leap year + {"Mon Jul 9 23:35 2012", "0 0 0 29 Feb ?", "Mon Feb 29 00:00 2016"}, + + // Daylight savings time 2am EST (-5) -> 3am EDT (-4) + {"2012-03-11T00:00:00-0500", "TZ=America/New_York 0 30 2 11 Mar ?", "2013-03-11T02:30:00-0400"}, + + // hourly job + {"2012-03-11T00:00:00-0500", "TZ=America/New_York 0 0 * * * ?", "2012-03-11T01:00:00-0500"}, + {"2012-03-11T01:00:00-0500", "TZ=America/New_York 0 0 * * * ?", "2012-03-11T03:00:00-0400"}, + {"2012-03-11T03:00:00-0400", "TZ=America/New_York 0 0 * * * ?", "2012-03-11T04:00:00-0400"}, + {"2012-03-11T04:00:00-0400", "TZ=America/New_York 0 0 * * * ?", "2012-03-11T05:00:00-0400"}, + + // hourly job using CRON_TZ + {"2012-03-11T00:00:00-0500", "CRON_TZ=America/New_York 0 0 * * * ?", "2012-03-11T01:00:00-0500"}, + {"2012-03-11T01:00:00-0500", "CRON_TZ=America/New_York 0 0 * * * ?", "2012-03-11T03:00:00-0400"}, + {"2012-03-11T03:00:00-0400", "CRON_TZ=America/New_York 0 0 * * * ?", "2012-03-11T04:00:00-0400"}, + {"2012-03-11T04:00:00-0400", "CRON_TZ=America/New_York 0 0 * * * ?", "2012-03-11T05:00:00-0400"}, + + // 1am nightly job + {"2012-03-11T00:00:00-0500", "TZ=America/New_York 0 0 1 * * ?", "2012-03-11T01:00:00-0500"}, + {"2012-03-11T01:00:00-0500", "TZ=America/New_York 0 0 1 * * ?", "2012-03-12T01:00:00-0400"}, + + // 2am nightly job (skipped) + {"2012-03-11T00:00:00-0500", "TZ=America/New_York 0 0 2 * * ?", "2012-03-12T02:00:00-0400"}, + + // Daylight savings time 2am EDT (-4) => 1am EST (-5) + {"2012-11-04T00:00:00-0400", "TZ=America/New_York 0 30 2 04 Nov ?", "2012-11-04T02:30:00-0500"}, + {"2012-11-04T01:45:00-0400", "TZ=America/New_York 0 30 1 04 Nov ?", "2012-11-04T01:30:00-0500"}, + + // hourly 
job + {"2012-11-04T00:00:00-0400", "TZ=America/New_York 0 0 * * * ?", "2012-11-04T01:00:00-0400"}, + {"2012-11-04T01:00:00-0400", "TZ=America/New_York 0 0 * * * ?", "2012-11-04T01:00:00-0500"}, + {"2012-11-04T01:00:00-0500", "TZ=America/New_York 0 0 * * * ?", "2012-11-04T02:00:00-0500"}, + + // 1am nightly job (runs twice) + {"2012-11-04T00:00:00-0400", "TZ=America/New_York 0 0 1 * * ?", "2012-11-04T01:00:00-0400"}, + {"2012-11-04T01:00:00-0400", "TZ=America/New_York 0 0 1 * * ?", "2012-11-04T01:00:00-0500"}, + {"2012-11-04T01:00:00-0500", "TZ=America/New_York 0 0 1 * * ?", "2012-11-05T01:00:00-0500"}, + + // 2am nightly job + {"2012-11-04T00:00:00-0400", "TZ=America/New_York 0 0 2 * * ?", "2012-11-04T02:00:00-0500"}, + {"2012-11-04T02:00:00-0500", "TZ=America/New_York 0 0 2 * * ?", "2012-11-05T02:00:00-0500"}, + + // 3am nightly job + {"2012-11-04T00:00:00-0400", "TZ=America/New_York 0 0 3 * * ?", "2012-11-04T03:00:00-0500"}, + {"2012-11-04T03:00:00-0500", "TZ=America/New_York 0 0 3 * * ?", "2012-11-05T03:00:00-0500"}, + + // hourly job + {"TZ=America/New_York 2012-11-04T00:00:00-0400", "0 0 * * * ?", "2012-11-04T01:00:00-0400"}, + {"TZ=America/New_York 2012-11-04T01:00:00-0400", "0 0 * * * ?", "2012-11-04T01:00:00-0500"}, + {"TZ=America/New_York 2012-11-04T01:00:00-0500", "0 0 * * * ?", "2012-11-04T02:00:00-0500"}, + + // 1am nightly job (runs twice) + {"TZ=America/New_York 2012-11-04T00:00:00-0400", "0 0 1 * * ?", "2012-11-04T01:00:00-0400"}, + {"TZ=America/New_York 2012-11-04T01:00:00-0400", "0 0 1 * * ?", "2012-11-04T01:00:00-0500"}, + {"TZ=America/New_York 2012-11-04T01:00:00-0500", "0 0 1 * * ?", "2012-11-05T01:00:00-0500"}, + + // 2am nightly job + {"TZ=America/New_York 2012-11-04T00:00:00-0400", "0 0 2 * * ?", "2012-11-04T02:00:00-0500"}, + {"TZ=America/New_York 2012-11-04T02:00:00-0500", "0 0 2 * * ?", "2012-11-05T02:00:00-0500"}, + + // 3am nightly job + {"TZ=America/New_York 2012-11-04T00:00:00-0400", "0 0 3 * * ?", "2012-11-04T03:00:00-0500"}, + 
{"TZ=America/New_York 2012-11-04T03:00:00-0500", "0 0 3 * * ?", "2012-11-05T03:00:00-0500"}, + + // Unsatisfiable + {"Mon Jul 9 23:35 2012", "0 0 0 30 Feb ?", ""}, + {"Mon Jul 9 23:35 2012", "0 0 0 31 Apr ?", ""}, + + // Monthly job + {"TZ=America/New_York 2012-11-04T00:00:00-0400", "0 0 3 3 * ?", "2012-12-03T03:00:00-0500"}, + + // Test the scenario of DST resulting in midnight not being a valid time. + // https://github.com/robfig/cron/issues/157 + {"2018-10-17T05:00:00-0400", "TZ=America/Sao_Paulo 0 0 9 10 * ?", "2018-11-10T06:00:00-0500"}, + {"2018-02-14T05:00:00-0500", "TZ=America/Sao_Paulo 0 0 9 22 * ?", "2018-02-22T07:00:00-0500"}, + } + + for _, c := range runs { + sched, err := secondParser.Parse(c.spec) + if err != nil { + t.Error(err) + continue + } + actual := sched.Next(getTime(c.time)) + expected := getTime(c.expected) + if !actual.Equal(expected) { + t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) + } + } +} + +func TestErrors(t *testing.T) { + invalidSpecs := []string{ + "xyz", + "60 0 * * *", + "0 60 * * *", + "0 0 * * XYZ", + } + for _, spec := range invalidSpecs { + _, err := ParseStandard(spec) + if err == nil { + t.Error("expected an error parsing: ", spec) + } + } +} + +func getTime(value string) time.Time { + if value == "" { + return time.Time{} + } + + var location = time.Local + if strings.HasPrefix(value, "TZ=") { + parts := strings.Fields(value) + loc, err := time.LoadLocation(parts[0][len("TZ="):]) + if err != nil { + panic("could not parse location:" + err.Error()) + } + location = loc + value = parts[1] + } + + var layouts = []string{ + "Mon Jan 2 15:04 2006", + "Mon Jan 2 15:04:05 2006", + } + for _, layout := range layouts { + if t, err := time.ParseInLocation(layout, value, location); err == nil { + return t + } + } + if t, err := time.ParseInLocation("2006-01-02T15:04:05-0700", value, location); err == nil { + return t + } + panic("could not parse time value " + value) +} + +func 
TestNextWithTz(t *testing.T) { + runs := []struct { + time, spec string + expected string + }{ + // Failing tests + {"2016-01-03T13:09:03+0530", "14 14 * * *", "2016-01-03T14:14:00+0530"}, + {"2016-01-03T04:09:03+0530", "14 14 * * ?", "2016-01-03T14:14:00+0530"}, + + // Passing tests + {"2016-01-03T14:09:03+0530", "14 14 * * *", "2016-01-03T14:14:00+0530"}, + {"2016-01-03T14:00:00+0530", "14 14 * * ?", "2016-01-03T14:14:00+0530"}, + } + for _, c := range runs { + sched, err := ParseStandard(c.spec) + if err != nil { + t.Error(err) + continue + } + actual := sched.Next(getTimeTZ(c.time)) + expected := getTimeTZ(c.expected) + if !actual.Equal(expected) { + t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) + } + } +} + +func getTimeTZ(value string) time.Time { + if value == "" { + return time.Time{} + } + t, err := time.Parse("Mon Jan 2 15:04 2006", value) + if err != nil { + t, err = time.Parse("Mon Jan 2 15:04:05 2006", value) + if err != nil { + t, err = time.Parse("2006-01-02T15:04:05-0700", value) + if err != nil { + panic(err) + } + } + } + + return t +} + +// https://github.com/robfig/cron/issues/144 +func TestSlash0NoHang(t *testing.T) { + schedule := "TZ=America/New_York 15/0 * * * *" + _, err := ParseStandard(schedule) + if err == nil { + t.Error("expected an error on 0 increment") + } +} diff --git a/dcron.go b/dcron.go index 1862166..cd839ef 100644 --- a/dcron.go +++ b/dcron.go @@ -9,9 +9,9 @@ import ( "sync/atomic" "time" + "github.com/libi/dcron/cron" "github.com/libi/dcron/dlog" "github.com/libi/dcron/driver" - "github.com/robfig/cron/v3" ) const ( @@ -36,8 +36,7 @@ type Dcron struct { nodePool INodePool running int32 - logger dlog.Logger - logInfo bool + logger dlog.Logger nodeUpdateDuration time.Duration hashReplicas int @@ -98,14 +97,14 @@ func (d *Dcron) GetLogger() dlog.Logger { // AddJob add a job func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error) { - return d.addJob(jobName, cronStr, nil, 
job) + return d.addJob(jobName, cronStr, job) } // AddFunc add a cron func func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error) { - return d.addJob(jobName, cronStr, cmd, nil) + return d.addJob(jobName, cronStr, cron.FuncJob(cmd)) } -func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error) { +func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) { d.logger.Infof("addJob '%s' : %s", jobName, cronStr) d.jobsRWMut.Lock() @@ -116,7 +115,6 @@ func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error) innerJob := JobWarpper{ Name: jobName, CronStr: cronStr, - Func: cmd, Job: job, Dcron: d, } diff --git a/dcron_test.go b/dcron_test.go index 1707bfb..6d37119 100644 --- a/dcron_test.go +++ b/dcron_test.go @@ -1,18 +1,18 @@ package dcron_test import ( - "fmt" "log" "os" "sync" + "sync/atomic" "testing" "time" "github.com/libi/dcron" + "github.com/libi/dcron/cron" "github.com/libi/dcron/dlog" "github.com/libi/dcron/driver" "github.com/redis/go-redis/v9" - "github.com/robfig/cron/v3" "github.com/stretchr/testify/require" ) @@ -20,75 +20,111 @@ const ( DefaultRedisAddr = "127.0.0.1:6379" ) -type TestJob1 struct { +type TestJobWithWG struct { Name string + WG *sync.WaitGroup + Test *testing.T + Cnt *atomic.Int32 } -func (t TestJob1) Run() { - fmt.Println("执行 testjob ", t.Name, time.Now().Format("15:04:05")) +func (job *TestJobWithWG) Run() { + job.Test.Logf("jobName=[%s], time=%s, job rest count=%d", + job.Name, + time.Now().Format("15:04:05"), + job.Cnt.Load(), + ) + if job.Cnt.Load() == 0 { + return + } else { + job.Cnt.Store(job.Cnt.Add(-1)) + if job.Cnt.Load() == 0 { + job.WG.Done() + } + } } -var testData = make(map[string]struct{}) - func TestMultiNodes(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(3) + testJobWGs := make([]*sync.WaitGroup, 0) + testJobWGs = append(testJobWGs, &sync.WaitGroup{}) + testJobWGs = append(testJobWGs, &sync.WaitGroup{}) + testJobWGs = append(testJobWGs, 
&sync.WaitGroup{}) + testJobWGs[0].Add(1) + testJobWGs[1].Add(1) + + testJobs := make([]*TestJobWithWG, 0) + testJobs = append( + testJobs, + &TestJobWithWG{ + Name: "s1_test1", + WG: testJobWGs[0], + Test: t, + Cnt: &atomic.Int32{}, + }, + &TestJobWithWG{ + Name: "s1_test2", + WG: testJobWGs[1], + Test: t, + Cnt: &atomic.Int32{}, + }, + &TestJobWithWG{ + Name: "s1_test3", + WG: testJobWGs[2], + Test: t, + Cnt: &atomic.Int32{}, + }) + testJobs[0].Cnt.Store(5) + testJobs[1].Cnt.Store(5) + + nodeCancel := make([](chan int), 3) + nodeCancel[0] = make(chan int, 1) + nodeCancel[1] = make(chan int, 1) + nodeCancel[2] = make(chan int, 1) - go runNode(t, wg) // 间隔1秒启动测试节点刷新逻辑 - time.Sleep(time.Second) - go runNode(t, wg) - time.Sleep(time.Second) - go runNode(t, wg) + go runNode(t, wg, testJobs, nodeCancel[0]) + <-time.After(time.Second) + go runNode(t, wg, testJobs, nodeCancel[1]) + <-time.After(time.Second) + go runNode(t, wg, testJobs, nodeCancel[2]) + + testJobWGs[0].Wait() + testJobWGs[1].Wait() + + close(nodeCancel[0]) + close(nodeCancel[1]) + close(nodeCancel[2]) wg.Wait() } -func runNode(t *testing.T, wg *sync.WaitGroup) { +func runNode(t *testing.T, wg *sync.WaitGroup, testJobs []*TestJobWithWG, cancel chan int) { redisCli := redis.NewClient(&redis.Options{ Addr: DefaultRedisAddr, }) drv := driver.NewRedisDriver(redisCli) - dcron := dcron.NewDcron("server1", drv) - //添加多个任务 启动多个节点时 任务会均匀分配给各个节点 - - err := dcron.AddFunc("s1 test1", "* * * * *", func() { - // 同时启动3个节点 但是一个 job 同一时间只会执行一次 通过 map 判重 - key := "s1 test1 : " + time.Now().Format("15:04") - if _, ok := testData[key]; ok { - t.Error("job have running in other node") - } - testData[key] = struct{}{} - }) - if err != nil { - t.Error("add func error") - } - err = dcron.AddFunc("s1 test2", "* * * * *", func() { - t.Log("执行 service1 test2 任务", time.Now().Format("15:04:05")) - }) - if err != nil { - t.Error("add func error") - } + dcron := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.WithLogger( + 
dlog.DefaultPrintfLogger( + log.New(os.Stdout, "", log.LstdFlags)))) + // 添加多个任务 启动多个节点时 任务会均匀分配给各个节点 - testJob := TestJob1{"addtestjob"} - err = dcron.AddJob("addtestjob1", "* * * * *", testJob) - if err != nil { - t.Error("add func error") + var err error + for _, job := range testJobs { + if err = dcron.AddJob(job.Name, "* * * * *", job); err != nil { + t.Error("add job error") + } } - err = dcron.AddFunc("s1 test3", "* * * * *", func() { - t.Log("执行 service1 test3 任务", time.Now().Format("15:04:05")) - }) - if err != nil { - t.Error("add func error") - } dcron.Start() - //移除测试 - dcron.Remove("s1 test3") - <-time.After(120 * time.Second) - wg.Done() + dcron.Remove(testJobs[2].Name) + <-cancel dcron.Stop() + wg.Done() } func Test_SecondsJob(t *testing.T) { @@ -115,9 +151,9 @@ func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t * drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, dcron.CronOptionSeconds(), - dcron.WithLogger(&dlog.StdLogger{ - Log: log.New(os.Stdout, "["+id+"]", log.LstdFlags), - }), + dcron.WithLogger(dlog.DefaultPrintfLogger( + log.New(os.Stdout, "["+id+"]", log.LstdFlags), + )), dcron.CronOptionChain(cron.Recover( cron.DefaultLogger, )), @@ -146,15 +182,13 @@ func runSecondNodeWithLogger(id string, wg *sync.WaitGroup, runningTime time.Dur Addr: DefaultRedisAddr, }) drv := driver.NewRedisDriver(redisCli) - dcr := dcron.NewDcronWithOption(t.Name(), drv, - // must use `WithPrintLogInfo` before `WithLogger` - // because we need to set up `cron` log level, it depends - // on ths value of this configuration. 
- dcron.WithPrintLogInfo(), + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, dcron.CronOptionSeconds(), - dcron.WithLogger(&dlog.StdLogger{ - Log: log.New(os.Stdout, "["+id+"]", log.LstdFlags), - }), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.New(os.Stdout, "["+id+"]", log.LstdFlags), + )), dcron.CronOptionChain(cron.Recover( cron.DefaultLogger, )), @@ -209,9 +243,9 @@ func Test_WithClusterStableNodes(t *testing.T) { drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, dcron.CronOptionSeconds(), - dcron.WithLogger(&dlog.StdLogger{ - Log: log.New(os.Stdout, "["+id+"]", log.LstdFlags), - }), + dcron.WithLogger(dlog.DefaultPrintfLogger( + log.New(os.Stdout, "["+id+"]", log.LstdFlags)), + ), dcron.WithClusterStable(timeWindow), dcron.WithNodeUpdateDuration(timeWindow), ) diff --git a/dlog/logger.go b/dlog/logger.go index 168b61a..cefb334 100644 --- a/dlog/logger.go +++ b/dlog/logger.go @@ -5,37 +5,41 @@ import ( ) type PrintfLogger interface { - Printf(string, ...interface{}) + Printf(string, ...any) } type LogfLogger interface { - Logf(string, ...interface{}) + Logf(string, ...any) } type Logger interface { PrintfLogger - Infof(string, ...interface{}) - Warnf(string, ...interface{}) - Errorf(string, ...interface{}) + Infof(string, ...any) + Warnf(string, ...any) + Errorf(string, ...any) } type StdLogger struct { - Log PrintfLogger + Log PrintfLogger + LogVerbose bool } -func (l *StdLogger) Infof(format string, args ...interface{}) { +func (l *StdLogger) Infof(format string, args ...any) { + if !l.LogVerbose { + return + } l.Log.Printf("[INFO] "+format, args...) } -func (l *StdLogger) Warnf(format string, args ...interface{}) { +func (l *StdLogger) Warnf(format string, args ...any) { l.Log.Printf("[WARN] "+format, args...) } -func (l *StdLogger) Errorf(format string, args ...interface{}) { +func (l *StdLogger) Errorf(format string, args ...any) { l.Log.Printf("[ERROR] "+format, args...) 
} -func (l *StdLogger) Printf(format string, args ...interface{}) { +func (l *StdLogger) Printf(format string, args ...any) { l.Log.Printf(format, args...) } @@ -43,7 +47,7 @@ type PrintfLoggerFromLogfLogger struct { Log LogfLogger } -func (l *PrintfLoggerFromLogfLogger) Printf(fmt string, args ...interface{}) { +func (l *PrintfLoggerFromLogfLogger) Printf(fmt string, args ...any) { l.Log.Logf(fmt, args) } @@ -53,6 +57,22 @@ func NewPrintfLoggerFromLogfLogger(logger LogfLogger) PrintfLogger { func NewLoggerForTest(t *testing.T) Logger { return &StdLogger{ - Log: NewPrintfLoggerFromLogfLogger(t), + Log: NewPrintfLoggerFromLogfLogger(t), + LogVerbose: true, } } + +// 这个方法会打印出所有的WARN level以上的LOG +func WarnPrintfLogger(l PrintfLogger) Logger { + return &StdLogger{Log: l, LogVerbose: false} +} + +// 这个方法会打印出所有的INFO level的LOG +func VerbosePrintfLogger(l PrintfLogger) Logger { + return &StdLogger{Log: l, LogVerbose: true} +} + +// 默认的Logger构造函数,会打印出所有WARN level以上的LOG +func DefaultPrintfLogger(l PrintfLogger) Logger { + return WarnPrintfLogger(l) +} diff --git a/driver/driver.go b/driver/driver.go index 3a70a04..4706b81 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -3,7 +3,7 @@ package driver import ( "context" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -18,7 +18,13 @@ type DriverV2 interface { NodeID() string // get nodes GetNodes(ctx context.Context) (nodes []string, err error) + + // register node to remote server (like etcd/redis), + // will create a goroutine to keep the connection. + // And then continue for other work. Start(ctx context.Context) (err error) + + // stop the goroutine of keep connection. 
Stop(ctx context.Context) (err error) withOption(opt Option) (err error) diff --git a/driver/etcddriver.go b/driver/etcddriver.go index cc7dd27..7d17de3 100644 --- a/driver/etcddriver.go +++ b/driver/etcddriver.go @@ -21,11 +21,13 @@ type EtcdDriver struct { nodeID string serviceName string - cli *clientv3.Client + cli *clientv3.Client + nodes *sync.Map + logger dlog.Logger + lease int64 - nodes *sync.Map leaseID clientv3.LeaseID - logger dlog.Logger + leaseCh <-chan *clientv3.LeaseKeepAliveResponse ctx context.Context cancel context.CancelFunc @@ -91,9 +93,11 @@ func (e *EtcdDriver) watcher(serviceName string) { for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { - case mvccpb.PUT: //修改或者新增 + case mvccpb.PUT: + // 修改或者新增 e.setServiceList(string(ev.Kv.Key), string(ev.Kv.Value)) - case mvccpb.DELETE: //删除 + case mvccpb.DELETE: + // 删除 e.delServiceList(string(ev.Kv.Key)) } } @@ -138,39 +142,30 @@ func (e *EtcdDriver) revoke(ctx context.Context) { } } -func (e *EtcdDriver) heartBeat(ctx context.Context) { -label: - leaseCh, err := e.keepAlive(ctx, e.nodeID) +func (e *EtcdDriver) startHeartBeat(ctx context.Context) { + var err error + e.leaseCh, err = e.keepAlive(ctx, e.nodeID) if err != nil { e.logger.Errorf("keep alive error, %v", err) return } +} + +func (e *EtcdDriver) keepHeartBeat() { for { select { case <-e.ctx.Done(): { - e.logger.Infof("driver stopped") + e.logger.Warnf("driver stopped") return } - case _, ok := <-leaseCh: + case _, ok := <-e.leaseCh: { - // if lease timeout, goto top of - // this function to keepalive if !ok { - goto label + e.logger.Warnf("lease channel stop, driver stopped") + return } } - case <-time.After(etcdBusinessTimeout): - { - e.logger.Errorf("ectd cli keepalive timeout") - return - } - case <-time.After(time.Duration(e.lease/2) * (time.Second)): - { - // if near to nodes time, - // renew the lease - goto label - } } } } @@ -191,12 +186,13 @@ func (e *EtcdDriver) GetNodes(ctx context.Context) (nodes []string, 
err error) { func (e *EtcdDriver) Start(ctx context.Context) (err error) { // renew a global ctx when start every time e.ctx, e.cancel = context.WithCancel(context.TODO()) - go e.heartBeat(ctx) + e.startHeartBeat(ctx) err = e.watchService(ctx, e.serviceName) if err != nil { return } - return nil + go e.keepHeartBeat() + return } func (e *EtcdDriver) Stop(ctx context.Context) (err error) { diff --git a/driver/etcddriver_test.go b/driver/etcddriver_test.go index e0f8225..d5b7422 100644 --- a/driver/etcddriver_test.go +++ b/driver/etcddriver_test.go @@ -35,7 +35,7 @@ func TestEtcdDriver_GetNodes(t *testing.T) { require.Nil(t, err) drvs = append(drvs, drv) } - <-time.After(5 * time.Second) + <-time.After(15 * time.Second) for _, v := range drvs { nodes, err := v.GetNodes(context.Background()) require.Nil(t, err) @@ -64,11 +64,9 @@ func TestEtcdDriver_Stop(t *testing.T) { DialTimeout: 3 * time.Second, }) drv2.Init(t.Name(), driver.NewTimeoutOption(5*time.Second), driver.NewLoggerOption(dlog.NewLoggerForTest(t))) - err = drv2.Start(context.Background()) - require.Nil(t, err) + require.Nil(t, drv2.Start(context.Background())) + require.Nil(t, drv1.Start(context.Background())) - err = drv1.Start(context.Background()) - require.Nil(t, err) <-time.After(3 * time.Second) nodes, err = drv1.GetNodes(context.Background()) require.Nil(t, err) @@ -80,17 +78,21 @@ func TestEtcdDriver_Stop(t *testing.T) { drv1.Stop(context.Background()) - <-time.After(5 * time.Second) + <-time.After(15 * time.Second) nodes, err = drv2.GetNodes(context.Background()) require.Nil(t, err) require.Len(t, nodes, 1) err = drv1.Start(context.Background()) require.Nil(t, err) - <-time.After(5 * time.Second) + <-time.After(15 * time.Second) nodes, err = drv2.GetNodes(context.Background()) require.Nil(t, err) require.Len(t, nodes, 2) + nodes, err = drv1.GetNodes(context.Background()) + require.Nil(t, err) + require.Len(t, nodes, 2) drv2.Stop(context.Background()) + drv1.Stop(context.Background()) } diff --git 
a/driver/redisdriver.go b/driver/redisdriver.go index d618971..b7d193a 100644 --- a/driver/redisdriver.go +++ b/driver/redisdriver.go @@ -9,7 +9,7 @@ import ( "time" "github.com/libi/dcron/dlog" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" ) const ( @@ -25,7 +25,7 @@ type RedisDriver struct { started bool // this context is used to define - // the life time of this driver. + // the lifetime of this driver. runtimeCtx context.Context runtimeCancel context.CancelFunc diff --git a/driver/redisdriver_test.go b/driver/redisdriver_test.go index 4cf380f..31258e2 100644 --- a/driver/redisdriver_test.go +++ b/driver/redisdriver_test.go @@ -9,12 +9,12 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/libi/dcron/dlog" "github.com/libi/dcron/driver" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" ) func testFuncNewRedisDriver(addr string) driver.DriverV2 { - log.Println("redis=", addr) + log.Printf("redis=%s", addr) redisCli := redis.NewClient(&redis.Options{ Addr: addr, }) @@ -60,11 +60,9 @@ func TestRedisDriver_Stop(t *testing.T) { drv2.Init(t.Name(), driver.NewTimeoutOption(5*time.Second), driver.NewLoggerOption(dlog.NewLoggerForTest(t))) - err = drv2.Start(context.Background()) - require.Nil(t, err) - err = drv1.Start(context.Background()) - require.Nil(t, err) + require.Nil(t, drv2.Start(context.Background())) + require.Nil(t, drv1.Start(context.Background())) nodes, err = drv1.GetNodes(context.Background()) require.Nil(t, err) @@ -87,6 +85,10 @@ func TestRedisDriver_Stop(t *testing.T) { nodes, err = drv2.GetNodes(context.Background()) require.Nil(t, err) require.Len(t, nodes, 2) + nodes, err = drv1.GetNodes(context.Background()) + require.Nil(t, err) + require.Len(t, nodes, 2) drv2.Stop(context.Background()) + drv1.Stop(context.Background()) } diff --git a/driver/rediszsetdriver.go b/driver/rediszsetdriver.go index a1372ed..756a23c 100644 --- a/driver/rediszsetdriver.go +++ 
b/driver/rediszsetdriver.go @@ -21,7 +21,7 @@ type RedisZSetDriver struct { started bool // this context is used to define - // the life time of this driver. + // the lifetime of this driver. runtimeCtx context.Context runtimeCancel context.CancelFunc diff --git a/driver/rediszsetdriver_test.go b/driver/rediszsetdriver_test.go index 4302093..8171f2c 100644 --- a/driver/rediszsetdriver_test.go +++ b/driver/rediszsetdriver_test.go @@ -8,7 +8,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/libi/dcron/dlog" "github.com/libi/dcron/driver" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" ) @@ -58,11 +58,9 @@ func TestRedisZSetDriver_Stop(t *testing.T) { drv2.Init(t.Name(), driver.NewTimeoutOption(5*time.Second), driver.NewLoggerOption(dlog.NewLoggerForTest(t))) - err = drv2.Start(context.Background()) - require.Nil(t, err) - err = drv1.Start(context.Background()) - require.Nil(t, err) + require.Nil(t, drv2.Start(context.Background())) + require.Nil(t, drv1.Start(context.Background())) nodes, err = drv1.GetNodes(context.Background()) require.Nil(t, err) @@ -85,6 +83,10 @@ func TestRedisZSetDriver_Stop(t *testing.T) { nodes, err = drv2.GetNodes(context.Background()) require.Nil(t, err) require.Len(t, nodes, 2) + nodes, err = drv1.GetNodes(context.Background()) + require.Nil(t, err) + require.Len(t, nodes, 2) drv2.Stop(context.Background()) + drv1.Stop(context.Background()) } diff --git a/driver/util.go b/driver/util.go index dfe20e9..0a3d01f 100644 --- a/driver/util.go +++ b/driver/util.go @@ -6,7 +6,7 @@ import ( "github.com/google/uuid" ) -// GlobalKeyPrefix is global redis key preifx +// GlobalKeyPrefix is a global redis key prefix const GlobalKeyPrefix = "distributed-cron:" func GetKeyPre(serviceName string) string { diff --git a/examples/example/example.go b/examples/example/example.go index 6764026..7fae1ef 100644 --- a/examples/example/example.go +++ b/examples/example/example.go @@ -64,9 
+64,12 @@ func main() { }) driver := driver.NewRedisDriver(redisCli) logger := &dlog.StdLogger{ - Log: log.New(os.Stdout, "["+*subId+"]", log.LstdFlags), + Log: log.New(os.Stdout, "["+*subId+"]", log.LstdFlags), + LogVerbose: true, } - dcron := dcron.NewDcronWithOption(*serverName, driver, + dcron := dcron.NewDcronWithOption( + *serverName, + driver, dcron.WithLogger(logger), dcron.WithHashReplicas(10), dcron.WithNodeUpdateDuration(time.Second*10), diff --git a/go.mod b/go.mod index c791864..e0cfe1f 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/alicebob/miniredis/v2 v2.31.0 github.com/google/uuid v1.4.0 github.com/redis/go-redis/v9 v9.3.1 - github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.8.4 go.etcd.io/etcd/api/v3 v3.5.11 go.etcd.io/etcd/client/v3 v3.5.11 diff --git a/go.sum b/go.sum index a7fae8b..7febefa 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,6 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= -github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= -github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= diff --git a/inodepool_test.go b/inodepool_test.go index 7f0c6ba..ce701e3 100644 --- a/inodepool_test.go +++ b/inodepool_test.go @@ -47,6 +47,12 @@ func (ts *TestINodePoolSuite) setUpEtcd() { ts.etcdsvr = integration.NewLazyCluster() } +func (ts *TestINodePoolSuite) stopAllNodePools(nodePools 
[]dcron.INodePool) { + for _, nodePool := range nodePools { + nodePool.Stop(context.Background()) + } +} + func (ts *TestINodePoolSuite) declareRedisDrivers(clients *[]*redis.Client, drivers *[]driver.DriverV2, numberOfNodes int) { for i := 0; i < numberOfNodes; i++ { *clients = append(*clients, redis.NewClient(&redis.Options{ @@ -116,6 +122,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesRedis() { nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil)) } ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration) + ts.stopAllNodePools(nodePools) } func (ts *TestINodePoolSuite) TestMultiNodesEtcd() { @@ -134,6 +141,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesEtcd() { nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil)) } ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration) + ts.stopAllNodePools(nodePools) } func (ts *TestINodePoolSuite) TestMultiNodesRedisZSet() { @@ -152,6 +160,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesRedisZSet() { nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil)) } ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration) + ts.stopAllNodePools(nodePools) } func TestTestINodePoolSuite(t *testing.T) { diff --git a/job_warpper.go b/job_warpper.go index a79c9f9..fd78e78 100644 --- a/job_warpper.go +++ b/job_warpper.go @@ -1,6 +1,6 @@ package dcron -import "github.com/robfig/cron/v3" +import "github.com/libi/dcron/cron" // Job Interface type Job interface { @@ -17,17 +17,16 @@ type StableJob interface { UnSerialize([]byte) error } -//JobWarpper is a job warpper +// JobWarpper is a job warpper type JobWarpper struct { ID cron.EntryID Dcron *Dcron Name string CronStr string - Func func() Job Job } -//Run is run job +// Run is run job func (job JobWarpper) Run() { 
//如果该任务分配给了这个节点 则允许执行 if job.Dcron.allowThisNodeRun(job.Name) { @@ -36,9 +35,6 @@ func (job JobWarpper) Run() { } func (job JobWarpper) Execute() { - if job.Func != nil { - job.Func() - } if job.Job != nil { job.Job.Run() } diff --git a/option.go b/option.go index c61217f..af47004 100644 --- a/option.go +++ b/option.go @@ -3,8 +3,8 @@ package dcron import ( "time" + "github.com/libi/dcron/cron" "github.com/libi/dcron/dlog" - "github.com/robfig/cron/v3" ) // Option is Dcron Option @@ -15,27 +15,11 @@ func WithLogger(logger dlog.Logger) Option { return func(dcron *Dcron) { //set dcron logger dcron.logger = logger - //set cron logger - var cronLogger cron.Logger - - if dcron.logInfo { - cronLogger = cron.VerbosePrintfLogger(logger) - } else { - cronLogger = cron.PrintfLogger(logger) - } - - f := cron.WithLogger(cronLogger) + f := cron.WithLogger(logger) dcron.crOptions = append(dcron.crOptions, f) } } -// PrintLogInfo set log info level -func WithPrintLogInfo() Option { - return func(dcron *Dcron) { - dcron.logInfo = true - } -} - // WithNodeUpdateDuration set node update duration func WithNodeUpdateDuration(d time.Duration) Option { return func(dcron *Dcron) { @@ -50,7 +34,7 @@ func WithHashReplicas(d int) Option { } } -//CronOptionLocation is warp cron with location +// CronOptionLocation is warp cron with location func CronOptionLocation(loc *time.Location) Option { return func(dcron *Dcron) { f := cron.WithLocation(loc) @@ -58,7 +42,7 @@ func CronOptionLocation(loc *time.Location) Option { } } -//CronOptionSeconds is warp cron with seconds +// CronOptionSeconds is warp cron with seconds func CronOptionSeconds() Option { return func(dcron *Dcron) { f := cron.WithSeconds()