diff --git a/worker.c b/worker.c index 8cfc10b..554ec5f 100644 --- a/worker.c +++ b/worker.c @@ -173,7 +173,6 @@ static void drop_replication_slots(void); static void cleanup_after_server_start(void); static void cleanup_repl_origins(void); static void cleanup_repl_slots(void); -static void start_cleanup_worker(MemoryContext task_cxt); static Snapshot build_historic_snapshot(SnapBuild *builder); static void process_task_internal(MemoryContext task_cxt); @@ -941,20 +940,18 @@ squeeze_worker_main(Datum main_arg) * replication slots and/or origins that other workers could not remove * due to server crash. Do that while holding the exclusive lock - that * also ensures that the other workers wait for the cleanup to finish - * instead of complaining about the existing slots / origins. + * before they create new slots / origins, which we might then drop + * accidentally. + * + * If no "standalone" squeeze worker performed the cleanup yet, the + * scheduler must do it now because it'll also create replication slots / + * origins. Those could be dropped by one of the new workers if that + * worker were to perform the cleanup. */ - if (!am_i_scheduler && !workerData->cleanup_done) + if (!workerData->cleanup_done) { cleanup_after_server_start(); workerData->cleanup_done = true; - - /* Are we assigned a "cleanup-only" task? */ - if (!OidIsValid(MyWorkerTask->dbid)) - { - LWLockRelease(workerData->lock); - ereport(DEBUG1, (errmsg("cleanup-only task completed"))); - goto done; - } } for (i = 0; i < workerData->nslots; i++) @@ -1073,27 +1070,12 @@ scheduler_worker_loop(void) long delay = 0L; int i; MemoryContext sched_cxt, old_cxt; - bool cleanup_done; /* Context for allocations which cannot be freed too early. */ sched_cxt = AllocSetContextCreate(TopMemoryContext, "pg_squeeze scheduler context", ALLOCSET_DEFAULT_SIZES); - /* - * This lock does not eliminate all the possible race conditions: e.g. 
if - * multiple schedulers (one per database) are launched at the same time, - * multiple clean-up workers can be launched. Nevertheless, it makes sense - * as the worker also uses this lock to examine and set the field. - */ - LWLockAcquire(workerData->lock, LW_EXCLUSIVE); - cleanup_done = workerData->cleanup_done; - LWLockRelease(workerData->lock); - - /* Do we need to do cleanup first? */ - if (!cleanup_done) - start_cleanup_worker(sched_cxt); - while (!got_sigterm) { StringInfoData query; @@ -1863,78 +1845,6 @@ cleanup_repl_slots(void) } } -static void -start_cleanup_worker(MemoryContext task_cxt) -{ - WorkerTask *task; - bool task_exists; - int task_idx; - NameData dummy_name; - SqueezeWorker *worker; - MemoryContext old_cxt; - bool registered; - - /* - * Create a worker to perform the initial cleanup. - * - * We must be sure that the cleanup has finished before we start to create - * replication slots for other workers, otherwise the "cleanup worker" - * could drop them too. - */ - squeezeWorkerCount = 1; - squeezeWorkers = (SqueezeWorker *) MemoryContextAllocZero(task_cxt, - squeezeWorkerCount * - sizeof(SqueezeWorker)); - task = get_unused_task(InvalidOid, NULL, NULL, &task_idx, &task_exists); - Assert(!task_exists); - if (task == NULL) - /* - * This is unlikely to happen, but possible if too many "standalone" - * workers have been started after our check of the 'cleanup_done' - * flag. - */ - ereport(ERROR, - (errmsg("the task queue is currently full"))); - - /* - * No specific information needed here. Setting dummy values explicitly - * seem a good practice though. - */ - NameStr(dummy_name)[0] = '\0'; - initialize_worker_task(task, -1, &dummy_name, &dummy_name, NULL, - false, false, 0); - - worker = squeezeWorkers; - StartTransactionCommand(); - /* - * The handle (and possibly other allocations) must survive the current - * transaction. 
- */ - old_cxt = MemoryContextSwitchTo(task_cxt); - registered = start_worker_internal(false, task_idx, &worker->handle); - MemoryContextSwitchTo(old_cxt); - if (!registered) - { - /* - * The worker could not even get registered, so it won't set its - * status to WTS_UNUSED. Make sure the task does not leak. - */ - release_task(worker->task); - - ereport(ERROR, - (errmsg("squeeze worker could not start")), - (errhint("consider increasing \"max_worker_processes\" or decreasing \"squeeze.workers_per_database\""))); - - } - CommitTransactionCommand(); - - /* Wait until the cleanup is done. */ - cleanup_workers_and_tasks(false); - - if (!workerData->cleanup_done) - ereport(ERROR, (errmsg("failed to perform the initial cleanup"))); -} - /* * Wrapper for SnapBuildInitialSnapshot(). *