Restart scheduler on error #7527

Open
wants to merge 1 commit into base: main

13 changes: 11 additions & 2 deletions .github/gh_matrix_builder.py
@@ -53,6 +53,13 @@
"memoize",
}

# Tests that we do not run as part of the flaky tests
flaky_exclude_tests = {
# Not executed as a flake test since it easily exhausts available
# background worker slots.
"bgw_launcher",
}


# helper functions to generate matrix entries
# the release and apache config inherit from the
@@ -309,11 +316,13 @@ def macos_config(overrides):
sys.exit(1)

if tests:
flake_tests = [t for t in tests if t not in flaky_exclude_tests]
installcheck_args = f'TESTS="{" ".join(flake_tests * 20)}"'
m["include"].append(
build_debug_config(
{
"coverage": False,
"installcheck_args": f'TESTS="{" ".join(list(tests) * 20)}"',
"installcheck_args": installcheck_args,
"name": "Flaky Check Debug",
"pg": PG16_LATEST,
"pginstallcheck": False,
@@ -324,7 +333,7 @@ def macos_config(overrides):
build_debug_config(
{
"coverage": False,
"installcheck_args": f'TESTS="{" ".join(list(tests) * 20)}"',
"installcheck_args": installcheck_args,
"name": "Flaky Check Debug",
"pg": PG17_LATEST,
"pginstallcheck": False,
1 change: 1 addition & 0 deletions .unreleased/pr_7527
@@ -0,0 +1 @@
Fixes: #7527 Restart scheduler on error
14 changes: 10 additions & 4 deletions src/bgw/scheduler.c
@@ -795,9 +795,11 @@ ts_bgw_scheduler_process(int32 run_for_interval_ms,
* exit. */
if (ts_guc_restoring || IsBinaryUpgrade)
{
elog(LOG,
"scheduler for database %u exiting since the database is restoring or upgrading",
MyDatabaseId);
ereport(LOG,
errmsg("scheduler for database %u exiting with exit status %d",
MyDatabaseId,
ts_debug_bgw_scheduler_exit_status),
errdetail("The database is restoring or upgrading."));
terminate_all_jobs_and_release_workers();
goto scheduler_exit;
}
@@ -866,7 +868,10 @@ ts_bgw_scheduler_process(int32 run_for_interval_ms,
MemoryContextReset(scratch_mctx);
}

elog(DEBUG1, "database scheduler for database %u exiting", MyDatabaseId);
elog(DEBUG1,
"scheduler for database %u exiting with exit status %d",
MyDatabaseId,
ts_debug_bgw_scheduler_exit_status);

#ifdef TS_DEBUG
if (ts_shutdown_bgw)
@@ -879,6 +884,7 @@ ts_bgw_scheduler_process(int32 run_for_interval_ms,
wait_for_all_jobs_to_shutdown();
check_for_stopped_and_timed_out_jobs();
scheduled_jobs = NIL;
proc_exit(ts_debug_bgw_scheduler_exit_status);
}

static void
6 changes: 6 additions & 0 deletions src/compat/compat.h
@@ -655,6 +655,12 @@ RelationGetSmgr(Relation rel)
GenerationContextCreate(parent, name, blockSize)
#endif

#if PG16_GE
#define pgstat_get_local_beentry_by_index_compat(idx) pgstat_get_local_beentry_by_index(idx)
#else
#define pgstat_get_local_beentry_by_index_compat(idx) pgstat_fetch_stat_local_beentry(idx)
#endif

/*
* PG16 adds a new parameter to DefineIndex, total_parts, that takes
* in the total number of direct and indirect partitions of the relation.
27 changes: 27 additions & 0 deletions src/guc.c
@@ -183,6 +183,20 @@ bool ts_guc_debug_require_batch_sorted_merge = false;

bool ts_guc_debug_allow_cagg_with_deprecated_funcs = false;

/*
* Exit code for the scheduler.
*
* Normally it exits with zero, which means that it will not be restarted. If
* an error is raised, it exits with exit code 1, which will trigger a
* restart.
*
* This variable makes it possible to trigger a restart even on a normal exit,
* which is useful when debugging.
*
* See backend/postmaster/bgworker.c
*/
int ts_debug_bgw_scheduler_exit_status = 0;

#ifdef TS_DEBUG
bool ts_shutdown_bgw = false;
char *ts_current_timestamp_mock = NULL;
@@ -1067,6 +1081,19 @@ _guc_init(void)
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

DefineCustomIntVariable(/* name= */ MAKE_EXTOPTION("debug_bgw_scheduler_exit_status"),
/* short_desc= */ "exit status to use when shutting down the scheduler",
/* long_desc= */ "this is for debugging purposes",
/* valueAddr= */ &ts_debug_bgw_scheduler_exit_status,
/* bootValue= */ 0,
/* minValue= */ 0,
/* maxValue= */ 255,
/* context= */ PGC_SIGHUP,
/* flags= */ 0,
/* check_hook= */ NULL,
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

DefineCustomStringVariable(/* name= */ MAKE_EXTOPTION("current_timestamp_mock"),
/* short_desc= */ "set the current timestamp",
/* long_desc= */ "this is for debugging purposes",
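Note on the restart mechanics (not part of the diff): PostgreSQL's postmaster unregisters a background worker that exits with status 0, but restarts a worker that exits with any other status (or crashes) once its bgw_restart_time has elapsed. That is the contract the new proc_exit(ts_debug_bgw_scheduler_exit_status) call and the debug GUC above rely on. A minimal, hedged sketch with made-up worker names:

/*
 * Hedged sketch, not part of this PR: the background-worker restart contract
 * that proc_exit(ts_debug_bgw_scheduler_exit_status) relies on. The worker
 * name, library, and entry point below are made up for illustration.
 */
#include <postgres.h>
#include <postmaster/bgworker.h>

static void
register_example_worker(void)
{
    BackgroundWorker worker;

    memset(&worker, 0, sizeof(worker));
    snprintf(worker.bgw_name, BGW_MAXLEN, "example worker");
    worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
    worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
    worker.bgw_restart_time = 10;   /* seconds; BGW_NEVER_RESTART disables restarts */
    snprintf(worker.bgw_library_name, BGW_MAXLEN, "example_lib");
    snprintf(worker.bgw_function_name, BGW_MAXLEN, "example_worker_main");

    /* Would normally be called from _PG_init() of a shared_preload_libraries module */
    RegisterBackgroundWorker(&worker);

    /*
     * In example_worker_main(): proc_exit(0) makes the postmaster unregister
     * the worker for good, while any non-zero exit status (or a crash) makes
     * the postmaster restart it after bgw_restart_time seconds.
     */
}

Setting timescaledb.debug_bgw_scheduler_exit_status to a non-zero value therefore forces a restart even on an otherwise clean exit, which is what makes the restart path testable.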
8 changes: 8 additions & 0 deletions src/guc.h
@@ -92,6 +92,14 @@ extern TSDLLEXPORT bool ts_guc_auto_sparse_indexes;
extern TSDLLEXPORT bool ts_guc_enable_columnarscan;
extern TSDLLEXPORT int ts_guc_bgw_log_level;

/*
* Exit code to use when scheduler exits.
*
* Mostly used for debugging, but defined also for non-debug builds since that
* simplifies the code (and also simplifies debugging non-debug builds).
*/
extern TSDLLEXPORT int ts_debug_bgw_scheduler_exit_status;

#ifdef TS_DEBUG
extern bool ts_shutdown_bgw;
extern char *ts_current_timestamp_mock;
121 changes: 111 additions & 10 deletions src/loader/bgw_launcher.c
@@ -84,6 +84,8 @@

static volatile sig_atomic_t got_SIGHUP = false;

int ts_guc_bgw_scheduler_restart_time_sec = BGW_DEFAULT_RESTART_INTERVAL;

static void
launcher_sighup(SIGNAL_ARGS)
{
@@ -238,13 +240,27 @@
}

extern void
ts_bgw_cluster_launcher_register(void)
ts_bgw_cluster_launcher_init(void)
{
BackgroundWorker worker;

DefineCustomIntVariable(/* name= */ MAKE_EXTOPTION("bgw_scheduler_restart_time"),
/* short_desc= */ "Restart time for scheduler in seconds",
/* long_desc= */
"The number of seconds until the scheduler restart on failure.",
/* valueAddr= */ &ts_guc_bgw_scheduler_restart_time_sec,
/* bootValue= */ BGW_DEFAULT_RESTART_INTERVAL,
/* minValue= */ 1,
/* maxValue= */ 3600,
/* context= */ PGC_SIGHUP,
/* flags= */ GUC_UNIT_S,
/* check_hook= */ NULL,
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

memset(&worker, 0, sizeof(worker));
/* set up worker settings for our main worker */
snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Launcher");
snprintf(worker.bgw_name, BGW_MAXLEN, TS_BGW_TYPE_LAUNCHER);
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_restart_time = BGW_LAUNCHER_RESTART_TIME_S;

@@ -274,9 +290,10 @@
BackgroundWorker worker;

memset(&worker, 0, sizeof(worker));
snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Scheduler");
snprintf(worker.bgw_type, BGW_MAXLEN, TS_BGW_TYPE_SCHEDULER);
snprintf(worker.bgw_name, BGW_MAXLEN, "%s for database %d", TS_BGW_TYPE_SCHEDULER, db_id);
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_restart_time = ts_guc_bgw_scheduler_restart_time_sec;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(worker.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
snprintf(worker.bgw_function_name, BGW_MAXLEN, BGW_ENTRYPOINT_FUNCNAME);
@@ -332,15 +349,89 @@
return db_he;
}

/*
* Result from signalling a backend.
*
* Error codes are non-zero, and success is zero.
*/
enum SignalBackendResult
{
SIGNAL_BACKEND_SUCCESS = 0,
SIGNAL_BACKEND_ERROR,
SIGNAL_BACKEND_NOPERMISSION,
SIGNAL_BACKEND_NOSUPERUSER,
};

/*
* Terminate a background worker.
*
* This is copied from pg_signal_backend() in
* src/backend/storage/ipc/signalfuncs.c but tweaked to not require a database
* connection since the launcher does not have one.
*/
static enum SignalBackendResult
ts_signal_backend(int pid, int sig)
{
PGPROC *proc = BackendPidGetProc(pid);

if (unlikely(proc == NULL))
{
ereport(WARNING, (errmsg("PID %d is not a PostgreSQL backend process", pid)));
return SIGNAL_BACKEND_ERROR;
}

if (unlikely(kill(pid, sig)))
{
/* Again, just a warning to allow loops */
ereport(WARNING, (errmsg("could not send signal to process %d: %m", pid)));
return SIGNAL_BACKEND_ERROR;
}

return SIGNAL_BACKEND_SUCCESS;
}

/*
* Terminate backends by backend type.
*
* We iterate through all backends and mark those that match the given backend
* type as terminated.
*
* Note that there is potentially a delay between marking backends as
* terminated and their actual termination, so the backends have to be able to
* run even if there are multiple instances accessing the same data.
*
* Parts of this code are taken from pg_stat_get_activity() in
* src/backend/utils/adt/pgstatfuncs.c.
*/
static void
terminate_backends_by_backend_type(const char *backend_type)
{
Assert(backend_type);

const int num_backends = pgstat_fetch_stat_numbackends();
for (int curr_backend = 1; curr_backend <= num_backends; ++curr_backend)
{
const LocalPgBackendStatus *local_beentry =
pgstat_get_local_beentry_by_index_compat(curr_backend);
const PgBackendStatus *beentry = &local_beentry->backendStatus;
const char *bgw_type = GetBackgroundWorkerTypeByPid(beentry->st_procpid);
if (bgw_type && strcmp(backend_type, bgw_type) == 0)
{
int error = ts_signal_backend(beentry->st_procpid, SIGTERM);

if (error)
elog(LOG, "failed to terminate backend with pid %d", beentry->st_procpid);
}
}
}

/*
* Model this on autovacuum.c -> get_database_list.
*
* Note that we are not doing
* all the things around memory context that they do, because the hashtable
* we're using to store db entries is automatically created in its own memory
* context (a child of TopMemoryContext) This can get called at two different
* times 1) when the cluster launcher starts and is looking for dbs and 2) if
* it restarts due to a postmaster signal.
* Note that we are not doing all the things around memory context that they
* do, because the hashtable we're using to store db entries is automatically
* created in its own memory context (a child of TopMemoryContext). This can
* get called at two different times: 1) when the cluster launcher starts and
* is looking for dbs, and 2) if it restarts due to a postmaster signal.
*/
static void
populate_database_htab(HTAB *db_htab)
@@ -757,6 +848,16 @@
db_htab = init_database_htab();
*htab_storage = db_htab;

/*
* If the launcher was restarted and discovers old schedulers, these have
* to be terminated to avoid exhausting the worker slots.
*
* We cannot easily pick up the old schedulers since we do not have access
* to the slots array in PostgreSQL, so instead we scan for processes that
* look like database schedulers and kill them. New ones will then
* be spawned below.
*/
terminate_backends_by_backend_type(TS_BGW_TYPE_SCHEDULER);
populate_database_htab(db_htab);

while (true)
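Aside (not part of the diff): the cleanup scan above can only identify old schedulers because the registration code now fills in worker.bgw_type, and GetBackgroundWorkerTypeByPid() reports exactly that registered string for a running worker. A condensed, hedged sketch of the mechanism, with an illustrative helper name and no error handling:

/*
 * Hedged sketch, not part of this PR: GetBackgroundWorkerTypeByPid() returns
 * the bgw_type string that was set at registration time, which is what lets
 * the launcher identify scheduler processes by PID alone. The function name
 * below is illustrative.
 */
#include <postgres.h>
#include <signal.h>
#include <postmaster/bgworker.h>

static void
request_scheduler_shutdown_example(pid_t pid)
{
    /* Returns the registered bgw_type, or NULL if pid is not a background worker */
    const char *type = GetBackgroundWorkerTypeByPid(pid);

    if (type != NULL && strcmp(type, "TimescaleDB Background Worker Scheduler") == 0)
        (void) kill(pid, SIGTERM);
}

The PR's terminate_backends_by_backend_type() performs the same comparison while iterating the local backend-status array, and goes through ts_signal_backend() so that failures are logged as warnings instead of raising errors.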
7 changes: 6 additions & 1 deletion src/loader/bgw_launcher.h
@@ -8,7 +8,12 @@
#include <postgres.h>
#include <fmgr.h>

extern void ts_bgw_cluster_launcher_register(void);
#define TS_BGW_TYPE_LAUNCHER "TimescaleDB Background Worker Launcher"
#define TS_BGW_TYPE_SCHEDULER "TimescaleDB Background Worker Scheduler"

extern int ts_guc_bgw_scheduler_restart_time_sec;

extern void ts_bgw_cluster_launcher_init(void);

/*called by postmaster at launcher bgw startup*/
TSDLLEXPORT extern Datum ts_bgw_cluster_launcher_main(PG_FUNCTION_ARGS);
2 changes: 1 addition & 1 deletion src/loader/loader.c
@@ -591,7 +591,7 @@ _PG_init(void)
timescaledb_shmem_request_hook();
#endif

ts_bgw_cluster_launcher_register();
ts_bgw_cluster_launcher_init();
ts_bgw_counter_setup_gucs();
ts_bgw_interface_register_api_version();
