Skip to content

Commit

Permalink
Update task: reduce the expiration of coordinator key. (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoufeng1989 authored Jan 11, 2024
1 parent 89c86b0 commit 2a0551a
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (coordinator *Coordinator) coordinate(jobName string, scheduledTime time.Ti
if scheduledInterval == 0 {
redisKeyDuration = 5 * time.Second
} else {
redisKeyDuration = scheduledInterval
redisKeyDuration = coordinator.getCoordinatorKeyExpiration(scheduledInterval)
}
if err != nil {
if err == redis.Nil {
Expand Down Expand Up @@ -404,6 +404,13 @@ func (coordinator *Coordinator) coordinate(jobName string, scheduledTime time.Ti
return stat, err
}

func (coordinator *Coordinator) getCoordinatorKeyExpiration(scheduledInterval time.Duration) time.Duration {
if scheduledInterval <= time.Second {
return scheduledInterval
}
return scheduledInterval - time.Second
}

func (coordinator *Coordinator) isJobSchedulable(name string, scheduledTime time.Time, scheduledInterval time.Duration) (bool, error) {
key := coordinator.getCoordinatorKey(name)
scheduledTime = scheduledTime.Truncate(time.Second)
Expand Down Expand Up @@ -462,7 +469,7 @@ func (coordinator *Coordinator) getScheduledTime(jobName string) (time.Time, err
value, err := client.Get(contextTODO, key).Result()
if err != nil {
if err == redis.Nil {
err = nil
return time.Time{}, nil
}
return time.Time{}, newCoordinateError(err)
}
Expand Down

0 comments on commit 2a0551a

Please sign in to comment.