Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
dxyinme committed May 20, 2024
1 parent b0e3dac commit 77e6c60
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
7 changes: 4 additions & 3 deletions dcron.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,19 @@ func (d *Dcron) startNodePool() error {
}

// Stop job
func (d *Dcron) Stop() {
func (d *Dcron) Stop() context.Context {
tick := time.NewTicker(time.Millisecond)
if !d.runningLocally {
d.nodePool.Stop(context.Background())
}
for range tick.C {
if atomic.CompareAndSwapInt32(&d.running, dcronRunning, dcronStopped) {
d.cr.Stop()
d.logger.Infof("dcron stopped")
return
return d.cr.Stop()
}
}
// We ensure this function won't return nil.
return nil
}

func (d *Dcron) reRunRecentJobs(jobNames []string) {
Expand Down
8 changes: 4 additions & 4 deletions inodepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (ts *TestINodePoolSuite) declareRedisZSetDrivers(clients *[]*redis.Client,
}
}

func (ts *TestINodePoolSuite) runCheckJobAvailable(numberOfNodes int, ServiceName string, nodePools *[]dcron.INodePool, updateDuration time.Duration) {
func (ts *TestINodePoolSuite) runCheckJobAvailable(numberOfNodes int, nodePools *[]dcron.INodePool, updateDuration time.Duration) {
for i := 0; i < numberOfNodes; i++ {
err := (*nodePools)[i].Start(context.Background())
ts.Require().Nil(err)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesRedis() {
for i := 0; i < numberOfNodes; i++ {
nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil))
}
ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration)
ts.runCheckJobAvailable(numberOfNodes, &nodePools, updateDuration)
ts.stopAllNodePools(nodePools)
}

Expand All @@ -142,7 +142,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesEtcd() {
for i := 0; i < numberOfNodes; i++ {
nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil))
}
ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration)
ts.runCheckJobAvailable(numberOfNodes, &nodePools, updateDuration)
ts.stopAllNodePools(nodePools)
}

Expand All @@ -161,7 +161,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesRedisZSet() {
for i := 0; i < numberOfNodes; i++ {
nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil))
}
ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration)
ts.runCheckJobAvailable(numberOfNodes, &nodePools, updateDuration)
ts.stopAllNodePools(nodePools)
}

Expand Down

0 comments on commit 77e6c60

Please sign in to comment.