Skip to content

Commit

Permalink
Exponentially backoff when out of background workers
Browse files Browse the repository at this point in the history
The scheduler detects the following three types of job failures:

1.Jobs that fail to launch (due to shortage of background workers)
2.Jobs that throw a runtime error
3.Jobs that crash due to a process crashing

In cases 2 and 3, additive backoff is applied in calculating the next
start time of a failed job.
In case 1 we previously retried to launch all jobs that failed to launch
simultaneously.

This commit introduces exponential backoff in case 1,
randomly selecting a wait time in [2, 2 + 2^f] seconds at microsecond granularity.
The aim is to reduce the collision probability for jobs that compete
for a background worker. The maximum backoff value is 1 minute.
It does not change the behavior for cases 2 and 3.

Fixes #4562
  • Loading branch information
konskov committed Aug 31, 2022
1 parent 1d4f90b commit 5d22571
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 35 deletions.
59 changes: 47 additions & 12 deletions src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,10 @@ calculate_jitter_percent()
return ldexp((double) (16 - (int) (percent % 32)), -7);
}

/* For failures we have additive backoff based on consecutive failures
* along with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF
* We also limit the additive backoff in case of consecutive failures as we don't
/* For failures we have backoff based on consecutive failures
* along with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF / 1 minute
* for jobs failing at runtime / for jobs failing to launch.
* We also limit the backoff in case of consecutive failures as we don't
* want to pass in input that leads to out of range timestamps and don't want to
* put off the next start time for the job indefinitely
*/
Expand All @@ -198,9 +199,14 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
TimestampTz res = 0;
volatile bool res_set = false;
TimestampTz last_finish = finish_time;
bool launch_failure = (job == NULL);
float8 multiplier = (consecutive_failures > MAX_FAILURES_MULTIPLIER ? MAX_FAILURES_MULTIPLIER :
consecutive_failures);
MemoryContext oldctx;
Assert(consecutive_failures > 0);
int64 max_slots =
(1 << consecutive_failures) - 1; /* 2^(consecutive_failures) - 1, at most 2^20 */
int64 rand_backoff = random() % (max_slots * USECS_PER_SEC);

if (!IS_VALID_TIMESTAMP(finish_time))
{
Expand All @@ -211,15 +217,31 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
BeginInternalSubTransaction("next start on failure");
PG_TRY();
{
/* ival = retry_period * (consecutive_failures) */
Datum ival = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.retry_period),
Float8GetDatum(multiplier));

Datum ival;
/* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */
Datum ival_max = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.schedule_interval),
Float8GetDatum(MAX_INTERVALS_BACKOFF));
Datum ival_max;
// max wait time to launch job is 1 minute
Interval interval_max = { .time = 60000000 };
Interval retry_ival = { .time = 2000000 };
retry_ival.time += rand_backoff;

if (launch_failure)
{
// random backoff seconds in [2, 2 + 2^f]
ival = IntervalPGetDatum(&retry_ival);
ival_max = IntervalPGetDatum(&interval_max);
}
else
{
/* ival = retry_period * (consecutive_failures) */
ival = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.retry_period),
Float8GetDatum(multiplier));
/* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */
ival_max = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.schedule_interval),
Float8GetDatum(MAX_INTERVALS_BACKOFF));
}

if (DatumGetInt32(DirectFunctionCall2(interval_cmp, ival, ival_max)) > 0)
ival = ival_max;
Expand Down Expand Up @@ -258,6 +280,16 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
return res;
}

static TimestampTz
calculate_next_start_on_failed_launch(int consecutive_failed_launches)
{
TimestampTz now = ts_timer_get_current_timestamp();
TimestampTz failure_calc =
calculate_next_start_on_failure(now, consecutive_failed_launches, NULL);

return failure_calc;
}

/* For crashes, the logic is the similar as for failures except we also have
* a minimum wait after a crash that we wait, so that if an operator needs to disable the job,
* there will be enough time before another crash.
Expand Down Expand Up @@ -523,8 +555,11 @@ ts_bgw_job_stat_should_execute(BgwJobStat *jobstat, BgwJob *job)
}

TimestampTz
ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job)
ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job, int32 consecutive_failed_launches)
{
/* give the system some room to breathe, wait before trying to launch again */
if (consecutive_failed_launches > 0)
return calculate_next_start_on_failed_launch(consecutive_failed_launches);
if (jobstat == NULL)
/* Never previously run - run right away */
return DT_NOBEGIN;
Expand Down
3 changes: 2 additions & 1 deletion src/bgw/job_stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern TSDLLEXPORT void ts_bgw_job_stat_upsert_next_start(int32 bgw_job_id, Time

extern bool ts_bgw_job_stat_should_execute(BgwJobStat *jobstat, BgwJob *job);

extern TimestampTz ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job);
extern TimestampTz ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job,
int32 consecutive_failed_starts);

#endif /* BGW_JOB_STAT_H */
8 changes: 7 additions & 1 deletion src/bgw/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ typedef struct ScheduledBgwJob
* perform the mark_end
*/
bool may_need_mark_end;
int32 consecutive_failed_launches;
} ScheduledBgwJob;

static void on_failure_to_start_job(ScheduledBgwJob *sjob);
Expand Down Expand Up @@ -156,6 +157,7 @@ static void
mark_job_as_started(ScheduledBgwJob *sjob)
{
Assert(!sjob->may_need_mark_end);
sjob->consecutive_failed_launches = 0;
ts_bgw_job_stat_mark_start(sjob->job.fd.id);
sjob->may_need_mark_end = true;
}
Expand Down Expand Up @@ -260,7 +262,10 @@ scheduled_bgw_job_transition_state_to(ScheduledBgwJob *sjob, JobState new_state)
job_stat = ts_bgw_job_stat_find(sjob->job.fd.id);

Assert(!sjob->reserved_worker);
sjob->next_start = ts_bgw_job_stat_next_start(job_stat, &sjob->job);
// if sjob->consecutive_failed_launches > 0, give the system some time to breathe,
// do not attempt to immediately re-run the job.
sjob->next_start =
ts_bgw_job_stat_next_start(job_stat, &sjob->job, sjob->consecutive_failed_launches);
break;
case JOB_STATE_STARTED:
Assert(prev_state == JOB_STATE_SCHEDULED);
Expand Down Expand Up @@ -288,6 +293,7 @@ scheduled_bgw_job_transition_state_to(ScheduledBgwJob *sjob, JobState new_state)
"failed to launch job %d \"%s\": out of background workers",
sjob->job.fd.id,
NameStr(sjob->job.fd.application_name));
sjob->consecutive_failed_launches++;
scheduled_bgw_job_transition_state_to(sjob, JOB_STATE_SCHEDULED);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
Expand Down
35 changes: 18 additions & 17 deletions tsl/test/expected/bgw_db_scheduler.out
Original file line number Diff line number Diff line change
Expand Up @@ -1077,14 +1077,15 @@ ORDER BY job_id;
1012 | t | 1 | 1 | 0 | 0 | 0
(7 rows)

--but after the first batch finishes and 1 second (START_RETRY_MS) passes, the last job will run.
SELECT ts_bgw_params_reset_time(1000000, true); --set to second 1
--but after the first batch finishes and 1 second (START_RETRY_MS) plus 2 seconds
-- backoff pass, the last job will run.
SELECT ts_bgw_params_reset_time(3000000, true); --set to second 3
ts_bgw_params_reset_time
--------------------------

(1 row)

SELECT wait_for_timer_to_run(1000000);
SELECT wait_for_timer_to_run(3000000);
wait_for_timer_to_run
-----------------------
t
Expand Down Expand Up @@ -1124,20 +1125,20 @@ ORDER BY job_id;
1013 | t | 1 | 1 | 0 | 0 | 0
(8 rows)

SELECT * FROM bgw_log WHERE application_name = 'DB Scheduler' ORDER BY mock_time, application_name, msg_no;
msg_no | mock_time | application_name | msg
--------+-----------+------------------+--------------------------------------------------------------------------
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
1 | 0 | DB Scheduler | [TESTING] Registered new background worker
2 | 0 | DB Scheduler | [TESTING] Registered new background worker
3 | 0 | DB Scheduler | [TESTING] Registered new background worker
4 | 0 | DB Scheduler | [TESTING] Registered new background worker
5 | 0 | DB Scheduler | [TESTING] Registered new background worker
6 | 0 | DB Scheduler | [TESTING] Registered new background worker
7 | 0 | DB Scheduler | failed to launch job 1013 "test_job_3_long_8": out of background workers
8 | 0 | DB Scheduler | [TESTING] Wait until 1000000, started at 0
9 | 1000000 | DB Scheduler | [TESTING] Registered new background worker
10 | 1000000 | DB Scheduler | [TESTING] Wait until 5000000, started at 1000000
SELECT * FROM sorted_bgw_log WHERE application_name = 'DB Scheduler' ORDER BY application_name, msg_no;
msg_no | application_name | msg
--------+------------------+--------------------------------------------------------------------------
0 | DB Scheduler | [TESTING] Registered new background worker
1 | DB Scheduler | [TESTING] Registered new background worker
2 | DB Scheduler | [TESTING] Registered new background worker
3 | DB Scheduler | [TESTING] Registered new background worker
4 | DB Scheduler | [TESTING] Registered new background worker
5 | DB Scheduler | [TESTING] Registered new background worker
6 | DB Scheduler | [TESTING] Registered new background worker
7 | DB Scheduler | failed to launch job 1013 "test_job_3_long_8": out of background workers
8 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
9 | DB Scheduler | [TESTING] Registered new background worker
10 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
(11 rows)

SELECT ts_bgw_params_destroy();
Expand Down
9 changes: 5 additions & 4 deletions tsl/test/sql/bgw_db_scheduler.sql
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,10 @@ SELECT job_id, last_run_success, total_runs, total_successes, total_failures, to
FROM _timescaledb_internal.bgw_job_stat
ORDER BY job_id;

--but after the first batch finishes and 1 second (START_RETRY_MS) passes, the last job will run.
SELECT ts_bgw_params_reset_time(1000000, true); --set to second 1
SELECT wait_for_timer_to_run(1000000);
--but after the first batch finishes and 1 second (START_RETRY_MS) plus 2 seconds
-- backoff pass, the last job will run.
SELECT ts_bgw_params_reset_time(3000000, true); --set to second 3
SELECT wait_for_timer_to_run(3000000);
SELECT wait_for_job_3_to_finish(8);

SELECT ts_bgw_params_reset_time(30000000, true); --set to second 30, which causes a quit.
Expand All @@ -469,7 +470,7 @@ SELECT job_id, last_run_success, total_runs, total_successes, total_failures, to
FROM _timescaledb_internal.bgw_job_stat
ORDER BY job_id;

SELECT * FROM bgw_log WHERE application_name = 'DB Scheduler' ORDER BY mock_time, application_name, msg_no;
SELECT * FROM sorted_bgw_log WHERE application_name = 'DB Scheduler' ORDER BY application_name, msg_no;

SELECT ts_bgw_params_destroy();

Expand Down

0 comments on commit 5d22571

Please sign in to comment.