Skip to content

Commit

Permalink
Restart scheduler on error
Browse files Browse the repository at this point in the history
If the scheduler receives an error, it will never restart again since
`bgw_restart_time` is set to `BGW_NEVER_RESTART`, which will prevent
all jobs from executing.

This commit adds the GUC `timescaledb.bgw_scheduler_restart_time` that
can be set to the restart time for the scheduler. It defaults
to 60 seconds, which is the default restart interval for background
workers defined by PostgreSQL.

It also adds `timescaledb.debug_bgw_scheduler_exit_status` to be able
to shutdown the scheduler with a non-zero exit status, which allows the
restart functionality to be tested.

It also ensures that `backend_type` is explicitly set up rather than
copied from `application_name` and add some more information to
`application_name`. It also updates the tests to use `backend_type`
where applicable.

To avoid exhausting slots when the launcher restarts, it will kill all
existing schedulers and start new ones.
  • Loading branch information
mkindahl committed Dec 17, 2024
1 parent 12f262c commit c167a6b
Show file tree
Hide file tree
Showing 14 changed files with 334 additions and 47 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7527
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes: #7527 Restart scheduler on error
1 change: 1 addition & 0 deletions src/bgw/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,7 @@ ts_bgw_scheduler_process(int32 run_for_interval_ms,
wait_for_all_jobs_to_shutdown();
check_for_stopped_and_timed_out_jobs();
scheduled_jobs = NIL;
proc_exit(ts_debug_bgw_scheduler_exit_status);
}

static void
Expand Down
6 changes: 6 additions & 0 deletions src/compat/compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,12 @@ RelationGetSmgr(Relation rel)
GenerationContextCreate(parent, name, blockSize)
#endif

#if PG16_GE
#define pgstat_get_local_beentry_by_index_compat(idx) pgstat_get_local_beentry_by_index(idx)
#else
#define pgstat_get_local_beentry_by_index_compat(idx) pgstat_fetch_stat_local_beentry(idx)
#endif

/*
* PG16 adds a new parameter to DefineIndex, total_parts, that takes
* in the total number of direct and indirect partitions of the relation.
Expand Down
27 changes: 27 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,20 @@ bool ts_guc_debug_require_batch_sorted_merge = false;

bool ts_guc_debug_allow_cagg_with_deprecated_funcs = false;

/*
* Exit code for the scheduler.
*
* Normally it exits with a zero which means that it will not restart. If an
* error is raised, it exits with error code 1, which will trigger a
* restart.
*
* This variable exists to be able to trigger a restart for a normal exit,
* which is useful when debugging.
*
* See backend/postmaster/bgworker.c
*/
int ts_debug_bgw_scheduler_exit_status = 0;

#ifdef TS_DEBUG
bool ts_shutdown_bgw = false;
char *ts_current_timestamp_mock = NULL;
Expand Down Expand Up @@ -1067,6 +1081,19 @@ _guc_init(void)
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

DefineCustomIntVariable(/* name= */ MAKE_EXTOPTION("debug_bgw_scheduler_exit_status"),
/* short_desc= */ "exit status to use when shutting down the scheduler",
/* long_desc= */ "this is for debugging purposes",
/* valueAddr= */ &ts_debug_bgw_scheduler_exit_status,
/* bootValue= */ 0,
/* minValue= */ 0,
/* maxValue= */ 255,
/* context= */ PGC_SIGHUP,
/* flags= */ 0,
/* check_hook= */ NULL,
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

DefineCustomStringVariable(/* name= */ MAKE_EXTOPTION("current_timestamp_mock"),
/* short_desc= */ "set the current timestamp",
/* long_desc= */ "this is for debugging purposes",
Expand Down
8 changes: 8 additions & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ extern TSDLLEXPORT bool ts_guc_auto_sparse_indexes;
extern TSDLLEXPORT bool ts_guc_enable_columnarscan;
extern TSDLLEXPORT int ts_guc_bgw_log_level;

/*
* Exit code to use when scheduler exits.
*
* Mostly used for debugging, but defined also for non-debug builds since that
* simplifies the code (and also simplifies debugging non-debug builds).
*/
extern TSDLLEXPORT int ts_debug_bgw_scheduler_exit_status;

#ifdef TS_DEBUG
extern bool ts_shutdown_bgw;
extern char *ts_current_timestamp_mock;
Expand Down
129 changes: 119 additions & 10 deletions src/loader/bgw_launcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

/* BGW includes below */
/* These are always necessary for a bgworker */
#include <catalog/pg_type_d.h>
#include <miscadmin.h>
#include <postmaster/bgworker.h>
#include <storage/ipc.h>
Expand All @@ -23,6 +24,8 @@
#include <access/htup_details.h>
#include <access/xact.h>
#include <catalog/pg_database.h>
#include <executor/spi.h>
#include <utils/guc.h>
#include <utils/snapmgr.h>

/* and checking db list for whether we're in a template*/
Expand All @@ -40,11 +43,15 @@
/* for allocating the htab storage */
#include <utils/memutils.h>

#include <lib/ilist.h>
#include <postmaster/bgworker_internals.h>

/* for getting settings correct before loading the versioned scheduler */
#include "catalog/pg_db_role_setting.h"

#include "../compat/compat.h"
#include "../extension_constants.h"
#include "../utils.h"
#include "bgw_counter.h"
#include "bgw_launcher.h"
#include "bgw_message_queue.h"
Expand Down Expand Up @@ -84,6 +91,8 @@ typedef enum SchedulerState

static volatile sig_atomic_t got_SIGHUP = false;

int ts_guc_bgw_scheduler_restart_time_sec = BGW_DEFAULT_RESTART_INTERVAL;

static void
launcher_sighup(SIGNAL_ARGS)
{
Expand Down Expand Up @@ -124,6 +133,7 @@ typedef struct DbHashEntry
} DbHashEntry;

static void scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry);
static void scheduler_modify_state(DbHashEntry *entry, SchedulerState new_state);

static void
bgw_on_postmaster_death(void)
Expand Down Expand Up @@ -238,13 +248,27 @@ terminate_background_worker(BackgroundWorkerHandle *handle)
}

extern void
ts_bgw_cluster_launcher_register(void)
ts_bgw_cluster_launcher_init(void)
{
BackgroundWorker worker;

DefineCustomIntVariable(/* name= */ MAKE_EXTOPTION("bgw_scheduler_restart_time"),
/* short_desc= */ "Restart time for scheduler in seconds",
/* long_desc= */
"The number of seconds until the scheduler restart on failure.",
/* valueAddr= */ &ts_guc_bgw_scheduler_restart_time_sec,
/* bootValue= */ BGW_DEFAULT_RESTART_INTERVAL,
/* minValue= */ 1,
/* maxValue= */ 3600,
/* context= */ PGC_SIGHUP,
/* flags= */ GUC_UNIT_S,
/* check_hook= */ NULL,
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

memset(&worker, 0, sizeof(worker));
/* set up worker settings for our main worker */
snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Launcher");
snprintf(worker.bgw_name, BGW_MAXLEN, TS_BGW_TYPE_LAUNCHER);
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_restart_time = BGW_LAUNCHER_RESTART_TIME_S;

Expand Down Expand Up @@ -274,9 +298,10 @@ register_entrypoint_for_db(Oid db_id, VirtualTransactionId vxid, BackgroundWorke
BackgroundWorker worker;

memset(&worker, 0, sizeof(worker));
snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Scheduler");
snprintf(worker.bgw_type, BGW_MAXLEN, TS_BGW_TYPE_SCHEDULER);
snprintf(worker.bgw_name, BGW_MAXLEN, "%s for database %d", TS_BGW_TYPE_SCHEDULER, db_id);
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_restart_time = ts_guc_bgw_scheduler_restart_time_sec,
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(worker.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
snprintf(worker.bgw_function_name, BGW_MAXLEN, BGW_ENTRYPOINT_FUNCNAME);
Expand Down Expand Up @@ -332,15 +357,89 @@ db_hash_entry_create_if_not_exists(HTAB *db_htab, Oid db_oid)
return db_he;
}

/*
* Result from signalling a backend.
*
* Error codes are non-zero, and success is zero.
*/
enum SignalBackendResult
{
SIGNAL_BACKEND_SUCCESS = 0,
SIGNAL_BACKEND_ERROR,
SIGNAL_BACKEND_NOPERMISSION,
SIGNAL_BACKEND_NOSUPERUSER,
};

/*
* Terminate a background worker.
*
* This is copied from pg_signal_backend() in
* src/backend/storage/ipc/signalfuncs.c but tweaked to not require a database
* connection since the launcher does not have one.
*/
static enum SignalBackendResult
ts_signal_backend(int pid, int sig)

Check warning on line 381 in src/loader/bgw_launcher.c

View check run for this annotation

Codecov / codecov/patch

src/loader/bgw_launcher.c#L381

Added line #L381 was not covered by tests
{
PGPROC *proc = BackendPidGetProc(pid);

Check warning on line 383 in src/loader/bgw_launcher.c

View check run for this annotation

Codecov / codecov/patch

src/loader/bgw_launcher.c#L383

Added line #L383 was not covered by tests

if (unlikely(proc == NULL))
{
ereport(WARNING, (errmsg("PID %d is not a PostgreSQL backend process", pid)));
return SIGNAL_BACKEND_ERROR;

Check warning on line 388 in src/loader/bgw_launcher.c

View check run for this annotation

Codecov / codecov/patch

src/loader/bgw_launcher.c#L388

Added line #L388 was not covered by tests
}

if (unlikely(kill(pid, sig)))
{
/* Again, just a warning to allow loops */
ereport(WARNING, (errmsg("could not send signal to process %d: %m", pid)));
return SIGNAL_BACKEND_ERROR;

Check warning on line 395 in src/loader/bgw_launcher.c

View check run for this annotation

Codecov / codecov/patch

src/loader/bgw_launcher.c#L395

Added line #L395 was not covered by tests
}

return SIGNAL_BACKEND_SUCCESS;

Check warning on line 398 in src/loader/bgw_launcher.c

View check run for this annotation

Codecov / codecov/patch

src/loader/bgw_launcher.c#L398

Added line #L398 was not covered by tests
}

/*
* Terminate backends by backend type.
*
* We iterate through all backends and mark those that match the given backend
* type as terminated.
*
* Note that there is potentially a delay between marking backends as
* terminated and their actual termination, so the backends have to be able to
* run even if there are multiple instances accessing the same data.
*
* Parts of this code is taken from pg_stat_get_activity() in
* src/backend/utils/adt/pgstatfuncs.c.
*/
static void
terminate_backends_by_backend_type(const char *backend_type)
{
Assert(backend_type);

const int num_backends = pgstat_fetch_stat_numbackends();
for (int curr_backend = 1; curr_backend <= num_backends; ++curr_backend)
{
const LocalPgBackendStatus *local_beentry =
pgstat_get_local_beentry_by_index_compat(curr_backend);
const PgBackendStatus *beentry = &local_beentry->backendStatus;
const char *bgw_type = GetBackgroundWorkerTypeByPid(beentry->st_procpid);
if (bgw_type && strcmp(backend_type, bgw_type) == 0)
{
int error = ts_signal_backend(beentry->st_procpid, SIGTERM);

Check warning on line 428 in src/loader/bgw_launcher.c

View check run for this annotation

Codecov / codecov/patch

src/loader/bgw_launcher.c#L428

Added line #L428 was not covered by tests
if (error)
elog(LOG, "failed to terminate backend with pid %d", beentry->st_procpid);
}
}
}

/*
* Model this on autovacuum.c -> get_database_list.
*
* Note that we are not doing
* all the things around memory context that they do, because the hashtable
* we're using to store db entries is automatically created in its own memory
* context (a child of TopMemoryContext) This can get called at two different
* times 1) when the cluster launcher starts and is looking for dbs and 2) if
* it restarts due to a postmaster signal.
* Note that we are not doing all the things around memory context that they
* do, because the hashtable we're using to store db entries is automatically
* created in its own memory context (a child of TopMemoryContext) This can
* get called at two different times 1) when the cluster launcher starts and
* is looking for dbs and 2) if it restarts due to a postmaster signal.
*/
static void
populate_database_htab(HTAB *db_htab)
Expand Down Expand Up @@ -757,6 +856,16 @@ ts_bgw_cluster_launcher_main(PG_FUNCTION_ARGS)
db_htab = init_database_htab();
*htab_storage = db_htab;

/*
* If the launcher was restarted and discovers old schedulers, these has
* to be terminated to avoid exhausting the worker slots.
*
* We cannot easily pick up the old schedulers since we do not have access
* to the slots array in PostgreSQL, so instead we scan for something that
* looks like schedulers for databases, and kill them. New ones will then
* be spawned below.
*/
terminate_backends_by_backend_type(TS_BGW_TYPE_SCHEDULER);
populate_database_htab(db_htab);

while (true)
Expand Down
7 changes: 6 additions & 1 deletion src/loader/bgw_launcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
#include <postgres.h>
#include <fmgr.h>

extern void ts_bgw_cluster_launcher_register(void);
#define TS_BGW_TYPE_LAUNCHER "TimescaleDB Background Worker Launcher"
#define TS_BGW_TYPE_SCHEDULER "TimescaleDB Background Worker Scheduler"

extern int ts_guc_bgw_scheduler_restart_time_sec;

extern void ts_bgw_cluster_launcher_init(void);

/*called by postmaster at launcher bgw startup*/
TSDLLEXPORT extern Datum ts_bgw_cluster_launcher_main(PG_FUNCTION_ARGS);
Expand Down
2 changes: 1 addition & 1 deletion src/loader/loader.c
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ _PG_init(void)
timescaledb_shmem_request_hook();
#endif

ts_bgw_cluster_launcher_register();
ts_bgw_cluster_launcher_init();
ts_bgw_counter_setup_gucs();
ts_bgw_interface_register_api_version();

Expand Down
Loading

0 comments on commit c167a6b

Please sign in to comment.