From 2a0551af26e12b454a0b8f030630ddba34982fa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E4=B8=B0?= Date: Thu, 11 Jan 2024 15:36:28 +0800 Subject: [PATCH] Update task: reduce the expiration of coordinator key. (#16) --- task/task.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/task/task.go b/task/task.go index a5f2105..257a9c7 100644 --- a/task/task.go +++ b/task/task.go @@ -353,7 +353,7 @@ func (coordinator *Coordinator) coordinate(jobName string, scheduledTime time.Ti if scheduledInterval == 0 { redisKeyDuration = 5 * time.Second } else { - redisKeyDuration = scheduledInterval + redisKeyDuration = coordinator.getCoordinatorKeyExpiration(scheduledInterval) } if err != nil { if err == redis.Nil { @@ -404,6 +404,13 @@ func (coordinator *Coordinator) coordinate(jobName string, scheduledTime time.Ti return stat, err } +func (coordinator *Coordinator) getCoordinatorKeyExpiration(scheduledInterval time.Duration) time.Duration { + if scheduledInterval <= time.Second { + return scheduledInterval + } + return scheduledInterval - time.Second +} + func (coordinator *Coordinator) isJobSchedulable(name string, scheduledTime time.Time, scheduledInterval time.Duration) (bool, error) { key := coordinator.getCoordinatorKey(name) scheduledTime = scheduledTime.Truncate(time.Second) @@ -462,7 +469,7 @@ func (coordinator *Coordinator) getScheduledTime(jobName string) (time.Time, err value, err := client.Get(contextTODO, key).Result() if err != nil { if err == redis.Nil { - err = nil + return time.Time{}, nil } return time.Time{}, newCoordinateError(err) }