Skip to content

Commit

Permalink
The scheduler detects the following three types of job failures:
Browse files Browse the repository at this point in the history
1. Jobs that fail to launch (due to shortage of background workers)
2. Jobs that throw a runtime error
3. Jobs that crash due to a process crashing
In cases 2 and 3, additive backoff was applied in calculating the next
start time of a failed job. In case 1, no backoff was added and the
scheduler immediately re-attempted to launch the job.

This commit converts the calculation of backoff from additive to
exponential, doubling the wait time between consecutive failures. The
maximum backoff for cases 2,3 is SCHEDULE_INTERVAL.
It also adds exponential backoff for case 1, starting with a delay of
2 seconds and doubling that up to a ceiling value of 1 minute.
For jobs that launch and complete successfully, there is no backoff
and the next_start field is calculated simply by adding the
schedule_interval to the finish time, as before.

Fixes #4562
  • Loading branch information
konskov committed Aug 23, 2022
1 parent 3acfbd0 commit b02e65e
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 58 deletions.
74 changes: 57 additions & 17 deletions src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "timer.h"
#include "utils.h"

#define MAX_INTERVALS_BACKOFF 5
#define MAX_INTERVALS_BACKOFF 1
#define MAX_FAILURES_MULTIPLIER 20
#define MIN_WAIT_AFTER_CRASH_MS (5 * 60 * 1000)

Expand Down Expand Up @@ -184,9 +184,10 @@ calculate_jitter_percent()
return ldexp((double) (16 - (int) (percent % 32)), -7);
}

/* For failures we have additive backoff based on consecutive failures
* along with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF
* We also limit the additive backoff in case of consecutive failures as we don't
/* For failures we have exponential backoff based on consecutive failures
* along with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF / 1 minute
* for jobs failing at runtime / for jobs failing to launch.
* We also limit the exponential backoff in case of consecutive failures as we don't
* want to pass in input that leads to out of range timestamps and don't want to
* put off the next start time for the job indefinitely
*/
Expand All @@ -198,8 +199,8 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
TimestampTz res = 0;
volatile bool res_set = false;
TimestampTz last_finish = finish_time;
float8 multiplier = (consecutive_failures > MAX_FAILURES_MULTIPLIER ? MAX_FAILURES_MULTIPLIER :
consecutive_failures);
float8 exponent = (consecutive_failures > MAX_FAILURES_MULTIPLIER ? MAX_FAILURES_MULTIPLIER :
consecutive_failures);
MemoryContext oldctx;

if (!IS_VALID_TIMESTAMP(finish_time))
Expand All @@ -211,15 +212,37 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
BeginInternalSubTransaction("next start on failure");
PG_TRY();
{
/* ival = retry_period * (consecutive_failures) */
Datum ival = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.retry_period),
Float8GetDatum(multiplier));
int i;
Datum ival;
/* NULL job means the failure was a launch failure */
if (job)
ival = IntervalPGetDatum(&job->fd.retry_period);
else
{
// retry every 2 seconds
Interval retry_ival = { .time = 2000000 };
ival = IntervalPGetDatum(&retry_ival);
}

/* ival = retry_period ^ (consecutive_failures - 1) */
/* arbitrarily choose 2 for exponential growth */
for (i = 0; i < exponent - 1; i++)
{
ival = DirectFunctionCall2(interval_mul, ival, Float8GetDatum(2));
}

/* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */
Datum ival_max = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.schedule_interval),
Float8GetDatum(MAX_INTERVALS_BACKOFF));
Datum ival_max;
if (job)
ival_max = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.schedule_interval),
Float8GetDatum(MAX_INTERVALS_BACKOFF));
else
{
// max wait time to launch job is 1 minute
Interval interval_max = { .time = 60000000 };
ival_max = IntervalPGetDatum(&interval_max);
}

if (DatumGetInt32(DirectFunctionCall2(interval_cmp, ival, ival_max)) > 0)
ival = ival_max;
Expand Down Expand Up @@ -251,13 +274,27 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
TimestampTz nowt;
/* job->fd_retry_period is a valid non-null value */
nowt = ts_timer_get_current_timestamp();
res = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(nowt),
IntervalPGetDatum(&job->fd.retry_period)));
if (job)
res =
DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(nowt),
IntervalPGetDatum(&job->fd.retry_period)));
else
res = DT_NOBEGIN;
}
return res;
}

static TimestampTz
calculate_next_start_on_failed_launch(int consecutive_failed_launches)
{
TimestampTz now = ts_timer_get_current_timestamp();
TimestampTz failure_calc =
calculate_next_start_on_failure(now, consecutive_failed_launches, NULL);

return failure_calc;
}

/* For crashes, the logic is the similar as for failures except we also have
* a minimum wait after a crash that we wait, so that if an operator needs to disable the job,
* there will be enough time before another crash.
Expand Down Expand Up @@ -523,8 +560,11 @@ ts_bgw_job_stat_should_execute(BgwJobStat *jobstat, BgwJob *job)
}

TimestampTz
ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job)
ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job, int32 consecutive_failed_launches)
{
/* give the system some room to breathe, wait before trying to launch again */
if (consecutive_failed_launches > 0)
return calculate_next_start_on_failed_launch(consecutive_failed_launches);
if (jobstat == NULL)
/* Never previously run - run right away */
return DT_NOBEGIN;
Expand Down
3 changes: 2 additions & 1 deletion src/bgw/job_stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern TSDLLEXPORT void ts_bgw_job_stat_upsert_next_start(int32 bgw_job_id, Time

extern bool ts_bgw_job_stat_should_execute(BgwJobStat *jobstat, BgwJob *job);

extern TimestampTz ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job);
extern TimestampTz ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job,
int32 consecutive_failed_starts);

#endif /* BGW_JOB_STAT_H */
8 changes: 7 additions & 1 deletion src/bgw/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ typedef struct ScheduledBgwJob
* perform the mark_end
*/
bool may_need_mark_end;
int32 consecutive_failed_launches;
} ScheduledBgwJob;

static void on_failure_to_start_job(ScheduledBgwJob *sjob);
Expand Down Expand Up @@ -156,6 +157,7 @@ static void
mark_job_as_started(ScheduledBgwJob *sjob)
{
Assert(!sjob->may_need_mark_end);
sjob->consecutive_failed_launches = 0;
ts_bgw_job_stat_mark_start(sjob->job.fd.id);
sjob->may_need_mark_end = true;
}
Expand Down Expand Up @@ -260,7 +262,10 @@ scheduled_bgw_job_transition_state_to(ScheduledBgwJob *sjob, JobState new_state)
job_stat = ts_bgw_job_stat_find(sjob->job.fd.id);

Assert(!sjob->reserved_worker);
sjob->next_start = ts_bgw_job_stat_next_start(job_stat, &sjob->job);
// if sjob->consecutive_failed_launches > 0, give the system some time to breathe,
// do not attempt to immediately re-run the job.
sjob->next_start =
ts_bgw_job_stat_next_start(job_stat, &sjob->job, sjob->consecutive_failed_launches);
break;
case JOB_STATE_STARTED:
Assert(prev_state == JOB_STATE_SCHEDULED);
Expand Down Expand Up @@ -288,6 +293,7 @@ scheduled_bgw_job_transition_state_to(ScheduledBgwJob *sjob, JobState new_state)
"failed to launch job %d \"%s\": out of background workers",
sjob->job.fd.id,
NameStr(sjob->job.fd.application_name));
sjob->consecutive_failed_launches++;
scheduled_bgw_job_transition_state_to(sjob, JOB_STATE_SCHEDULED);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
Expand Down
Loading

0 comments on commit b02e65e

Please sign in to comment.