Skip to content

Commit

Permalink
make dcron running locally, update cron test, add GetJobs / GetJob fu…
Browse files Browse the repository at this point in the history
…nction for dcron. (#89)
  • Loading branch information
dxyinme authored Feb 6, 2024
1 parent 0ff369b commit c9eca56
Show file tree
Hide file tree
Showing 9 changed files with 425 additions and 70 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Redis Server in GitHub Actions
uses: supercharge/redis-github-action@1.7.0
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go_version }}

- name: Test
run: go test -v -timeout 30m -coverprofile=coverage.txt -covermode=atomic ./...

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

4 changes: 3 additions & 1 deletion cron/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# cron

fork from [robfig/cron](github.com/robfig/cron)
fork from [robfig/cron](github.com/robfig/cron)

maintain by Dcron opensource team.
40 changes: 26 additions & 14 deletions cron/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,37 +116,45 @@ func TestChainDelayIfStillRunning(t *testing.T) {
t.Run("second run immediate if first done", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
<-time.After(time.Millisecond)
go wrappedJob.Run()
wg.Done()
}()
time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
wg.Wait()
<-time.After(3 * time.Millisecond)
if c := j.Done(); c != 2 {
t.Errorf("expected job run twice, immediately, got %d", c)
}
})

t.Run("second run delayed if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
j.delay = 100 * time.Millisecond
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
<-time.After(10 * time.Millisecond)
go wrappedJob.Run()
wg.Done()
}()

// After 5ms, the first job is still in progress, and the second job was
wg.Wait()
// After 50ms, the first job is still in progress, and the second job was
// run but should be waiting for it to finish.
time.Sleep(5 * time.Millisecond)
<-time.After(50 * time.Millisecond)
started, done := j.Started(), j.Done()
if started != 1 || done != 0 {
t.Error("expected first job started, but not finished, got", started, done)
}

// Verify that the second job completes.
time.Sleep(25 * time.Millisecond)
<-time.After(220 * time.Millisecond)
started, done = j.Started(), j.Done()
if started != 2 || done != 2 {
t.Error("expected both jobs done, got", started, done)
Expand All @@ -169,37 +177,41 @@ func TestChainSkipIfStillRunning(t *testing.T) {
t.Run("second run immediate if first done", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
<-time.After(time.Millisecond)
go wrappedJob.Run()
wg.Done()
}()
time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
wg.Wait()
<-time.After(3 * time.Millisecond) // Give both jobs 3ms to complete.
if c := j.Done(); c != 2 {
t.Errorf("expected job run twice, immediately, got %d", c)
}
})

t.Run("second run skipped if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
j.delay = 100 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
<-time.After(10 * time.Millisecond)
go wrappedJob.Run()
}()

// After 5ms, the first job is still in progress, and the second job was
// After 50ms, the first job is still in progress, and the second job was
// aleady skipped.
time.Sleep(5 * time.Millisecond)
<-time.After(50 * time.Millisecond)
started, done := j.Started(), j.Done()
if started != 1 || done != 0 {
t.Error("expected first job started, but not finished, got", started, done)
}

// Verify that the first job completes and second does not run.
time.Sleep(25 * time.Millisecond)
<-time.After(220 * time.Millisecond)
started, done = j.Started(), j.Done()
if started != 1 || done != 1 {
t.Error("expected second job skipped, got", started, done)
Expand Down
106 changes: 93 additions & 13 deletions dcron.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ const (
dcronStateUpgrade = "dcronStateUpgrade"
)

var (
ErrJobExist = errors.New("jobName already exist")
ErrJobNotExist = errors.New("jobName not exist")
ErrJobWrongNode = errors.New("job is not running in this node")
)

type RecoverFuncType func(d *Dcron)

// Dcron is main struct
type Dcron struct {
jobs map[string]*JobWarpper
jobsRWMut sync.Mutex
jobsRWMut sync.RWMutex

ServerName string
nodePool INodePool
Expand All @@ -48,6 +54,8 @@ type Dcron struct {

recentJobs IRecentJobPacker
state atomic.Value

runningLocally bool
}

// NewDcron create a Dcron
Expand All @@ -68,7 +76,9 @@ func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ...
}

dcron.cr = cron.New(dcron.crOptions...)
dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger)
if !dcron.runningLocally {
dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger)
}
return dcron
}

Expand Down Expand Up @@ -110,7 +120,7 @@ func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) {
d.jobsRWMut.Lock()
defer d.jobsRWMut.Unlock()
if _, ok := d.jobs[jobName]; ok {
return errors.New("jobName already exist")
return ErrJobExist
}
innerJob := JobWarpper{
Name: jobName,
Expand All @@ -127,7 +137,7 @@ func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) {
return nil
}

// Remove Job
// Remove Job by jobName
func (d *Dcron) Remove(jobName string) {
d.jobsRWMut.Lock()
defer d.jobsRWMut.Unlock()
Expand All @@ -138,7 +148,71 @@ func (d *Dcron) Remove(jobName string) {
}
}

// Get job by jobName
// if this jobName not exist, will return error.
//
// if `thisNodeOnly` is true
// if this job is not available in this node, will return error.
// otherwise return the struct of JobWarpper whose name is jobName.
func (d *Dcron) GetJob(jobName string, thisNodeOnly bool) (*JobWarpper, error) {
d.jobsRWMut.RLock()
defer d.jobsRWMut.RUnlock()

job, ok := d.jobs[jobName]
if !ok {
d.logger.Warnf("job: %s, not exist", jobName)
return nil, ErrJobNotExist
}
if !thisNodeOnly {
return job, nil
}
isRunningHere, err := d.nodePool.CheckJobAvailable(jobName)
if err != nil {
return nil, err
}
if isRunningHere {
return nil, ErrJobWrongNode
}
return job, nil
}

// Get job list.
//
// if `thisNodeOnly` is true
// return all jobs available in this node.
// otherwise return all jobs added to dcron.
//
// we never return nil. If there is no job.
// this func will return an empty slice.
func (d *Dcron) GetJobs(thisNodeOnly bool) []*JobWarpper {
d.jobsRWMut.RLock()
defer d.jobsRWMut.RUnlock()

ret := make([]*JobWarpper, 0)
for _, v := range d.jobs {
var (
isRunningHere bool
ok bool = true
err error
)
if thisNodeOnly {
isRunningHere, err = d.nodePool.CheckJobAvailable(v.Name)
if err != nil {
continue
}
ok = isRunningHere
}
if ok {
ret = append(ret, v)
}
}
return ret
}

func (d *Dcron) allowThisNodeRun(jobName string) (ok bool) {
if d.runningLocally {
return true
}
ok, err := d.nodePool.CheckJobAvailable(jobName)
if err != nil {
d.logger.Errorf("allow this node run error, err=%v", err)
Expand All @@ -165,12 +239,14 @@ func (d *Dcron) Start() {
d.RecoverFunc(d)
}
if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
if !d.runningLocally {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
}
d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID())
}
d.cr.Start()
d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID())
} else {
d.logger.Infof("dcron have started")
}
Expand All @@ -183,11 +259,13 @@ func (d *Dcron) Run() {
d.RecoverFunc(d)
}
if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
if !d.runningLocally {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
}
d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID())
}
d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID())
d.cr.Run()
} else {
d.logger.Infof("dcron already running")
Expand All @@ -205,7 +283,9 @@ func (d *Dcron) startNodePool() error {
// Stop job
func (d *Dcron) Stop() {
tick := time.NewTicker(time.Millisecond)
d.nodePool.Stop(context.Background())
if !d.runningLocally {
d.nodePool.Stop(context.Background())
}
for range tick.C {
if atomic.CompareAndSwapInt32(&d.running, dcronRunning, dcronStopped) {
d.cr.Stop()
Expand Down
51 changes: 51 additions & 0 deletions dcron_locally_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dcron_test

import (
"testing"
"time"

"github.com/libi/dcron"
"github.com/libi/dcron/cron"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type DcronLocallyTestSuite struct {
suite.Suite
}

func (s *DcronLocallyTestSuite) TestNormal() {
dcr := dcron.NewDcronWithOption(
"not a necessary servername",
nil,
dcron.RunningLocally(),
dcron.CronOptionSeconds(),
dcron.CronOptionChain(
cron.Recover(
cron.DefaultLogger,
)))

s.Assert().NotNil(dcr)
runningTime := 60 * time.Second
t := s.T()
var err error
err = dcr.AddFunc("job1", "*/5 * * * * *", func() {
t.Log(time.Now())
})
require.Nil(t, err)
err = dcr.AddFunc("job2", "*/8 * * * * *", func() {
panic("test panic")
})
require.Nil(t, err)
err = dcr.AddFunc("job3", "*/2 * * * * *", func() {
t.Log("job3:", time.Now())
})
require.Nil(t, err)
dcr.Start()
<-time.After(runningTime)
dcr.Stop()
}

func TestDcronLocallyTestSuite(t *testing.T) {
suite.Run(t, &DcronLocallyTestSuite{})
}
Loading

0 comments on commit c9eca56

Please sign in to comment.