From 6d1b8d2f3542e681a2c244ef693b459d664615d1 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 9 Aug 2023 16:56:13 +0200 Subject: [PATCH] Fix a SEGFAULT that was hidden by our waitpid() calls. When a subprocess terminates with a successful return code, it might still have been terminated by a signal, one signal would be SIGSEGV. Arrange our code to report when that happens. This happened in initialisation of the streaming module when trying to call setvbuf on a un-assigned file descriptor. This is fixed in follow.c when preparing the call. --- src/bin/pgcopydb/cli_clone_follow.c | 17 ++-- src/bin/pgcopydb/follow.c | 135 ++++++++++++++++++++++------ src/bin/pgcopydb/ld_stream.c | 12 +-- src/bin/pgcopydb/ld_stream.h | 4 +- src/bin/pgcopydb/ld_transform.c | 7 -- src/bin/pgcopydb/signals.c | 5 +- 6 files changed, 127 insertions(+), 53 deletions(-) diff --git a/src/bin/pgcopydb/cli_clone_follow.c b/src/bin/pgcopydb/cli_clone_follow.c index 97a77dc00..1e3a3f9ec 100644 --- a/src/bin/pgcopydb/cli_clone_follow.c +++ b/src/bin/pgcopydb/cli_clone_follow.c @@ -634,20 +634,20 @@ start_follow_process(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs, * been updated from a previous run of the command, and we might have * nothing to catch-up to when e.g. the endpos was reached already. */ - CopyDBSentinel sentinel = { 0 }; + CopyDBSentinel *sentinel = &(streamSpecs->sentinel); - if (!follow_init_sentinel(streamSpecs, &sentinel)) + if (!follow_init_sentinel(streamSpecs, sentinel)) { - /* errors have already been logged */ + log_error("Failed to initialise sentinel, see above for details"); return false; } - if (sentinel.endpos != InvalidXLogRecPtr && - sentinel.endpos <= sentinel.replay_lsn) + if (sentinel->endpos != InvalidXLogRecPtr && + sentinel->endpos <= sentinel->replay_lsn) { log_info("Current endpos %X/%X was previously reached at %X/%X", - LSN_FORMAT_ARGS(sentinel.endpos), - LSN_FORMAT_ARGS(sentinel.replay_lsn)); + LSN_FORMAT_ARGS(sentinel->endpos), + LSN_FORMAT_ARGS(sentinel->replay_lsn)); return true; } @@ -698,6 +698,7 @@ cli_clone_follow_wait_subprocess(const char *name, pid_t pid) { bool exited = false; int returnCode = -1; + int sig = 0; if (pid < 0) { @@ -707,7 +708,7 @@ cli_clone_follow_wait_subprocess(const char *name, pid_t pid) while (!exited) { - if (!follow_wait_pid(pid, &exited, &returnCode)) + if (!follow_wait_pid(pid, &exited, &returnCode, &sig)) { /* errors have already been logged */ return false; diff --git a/src/bin/pgcopydb/follow.c b/src/bin/pgcopydb/follow.c index 68ed851f5..b616e1a1a 100644 --- a/src/bin/pgcopydb/follow.c +++ b/src/bin/pgcopydb/follow.c @@ -222,7 +222,7 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel) LSN_FORMAT_ARGS(sentinel->replay_lsn), LSN_FORMAT_ARGS(sentinel->endpos)); } - else + else if (sentinel->replay_lsn != InvalidXLogRecPtr) { log_info("Current sentinel replay_lsn is %X/%X", LSN_FORMAT_ARGS(sentinel->replay_lsn)); @@ -285,7 +285,8 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) { if (!followDB(copySpecs, streamSpecs)) { - /* errors have already been logged */ + log_error("Failed to follow changes from source, " + "see above for details"); return false; } @@ -305,6 +306,11 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) if (done) { + log_info("Follow mode is now done, " + "reached replay_lsn %X/%X with endpos %X/%X", + LSN_FORMAT_ARGS(streamSpecs->sentinel.endpos), + LSN_FORMAT_ARGS(streamSpecs->sentinel.replay_lsn)); + return true; } @@ -345,6 +351,11 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) if (done) { + log_info("Follow mode is now done, " + "reached replay_lsn %X/%X with endpos %X/%X", + LSN_FORMAT_ARGS(streamSpecs->sentinel.endpos), + LSN_FORMAT_ARGS(streamSpecs->sentinel.replay_lsn)); + return true; } @@ -352,6 +363,8 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) LogicalStreamModeToString(currentMode)); } + /* keep compiler happy */ + log_warn("BUG: follow_main_loop reached out of loop"); return true; } @@ -362,23 +375,23 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) bool follow_reached_endpos(StreamSpecs *streamSpecs, bool *done) { - CopyDBSentinel sentinel = { 0 }; + CopyDBSentinel *sentinel = &(streamSpecs->sentinel); - if (!follow_get_sentinel(streamSpecs, &sentinel)) + if (!follow_get_sentinel(streamSpecs, sentinel)) { - /* errors have already been logged */ + log_error("Failed to get sentinel values"); return false; } - if (sentinel.endpos != InvalidXLogRecPtr && - sentinel.endpos <= sentinel.replay_lsn) + if (sentinel->endpos != InvalidXLogRecPtr && + sentinel->endpos <= sentinel->replay_lsn) { /* follow_get_sentinel logs replay_lsn and endpos already */ *done = true; log_info("Current endpos %X/%X has been reached at %X/%X", - LSN_FORMAT_ARGS(sentinel.endpos), - LSN_FORMAT_ARGS(sentinel.replay_lsn)); + LSN_FORMAT_ARGS(sentinel->endpos), + LSN_FORMAT_ARGS(sentinel->replay_lsn)); } return true; @@ -432,7 +445,8 @@ follow_prepare_mode_switch(StreamSpecs *streamSpecs, if (!stream_transform_from_queue(streamSpecs)) { - /* errors have already been logged */ + log_error("Failed to process messages from the transform queue, " + "see above for details"); return false; } } @@ -441,7 +455,8 @@ follow_prepare_mode_switch(StreamSpecs *streamSpecs, /* then catch-up with what's been stream and transformed already */ if (!stream_apply_catchup(streamSpecs)) { - /* errors have already been logged */ + log_error("Failed to catchup with on-disk files, " + "see above for details"); return false; } @@ -468,6 +483,19 @@ followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) return false; } + /* + * Before starting sub-processes, clean-up intermediate files from previous + * round. Here that's the stream context with WAL segment size and timeline + * history, which are fetched from the source server to compute WAL file + * names. The current timeline can only change at a server restart or a + * failover, both with trigger a reconnect. + */ + if (!stream_cleanup_context(streamSpecs)) + { + /* errors have already been logged */ + return false; + } + /* * Prepare the sub-process communication mechanisms, when needed: * @@ -568,25 +596,35 @@ follow_start_prefetch(StreamSpecs *specs) if (specs->mode == STREAM_MODE_REPLAY) { /* arrange to write to the receive-transform pipe */ + specs->stdOut = true; specs->out = fdopen(specs->pipe_rt[1], "a"); /* close pipe ends we're not using */ close_fd_or_exit(specs->pipe_rt[0]); close_fd_or_exit(specs->pipe_ta[0]); close_fd_or_exit(specs->pipe_ta[1]); - } - if (!stream_cleanup_context(specs)) - { - /* errors have already been logged */ - return false; - } + /* switch out stream from block buffered to line buffered mode */ + if (setvbuf(specs->out, NULL, _IOLBF, 0) != 0) + { + log_error("Failed to set out stream to line buffered mode: %m"); + return false; + } - bool success = startLogicalStreaming(specs); + bool success = startLogicalStreaming(specs); - close_fd_or_exit(specs->pipe_rt[1]); + close_fd_or_exit(specs->pipe_rt[1]); - return success; + return success; + } + else + { + specs->stdOut = false; + + return startLogicalStreaming(specs); + } + + return true; } @@ -609,6 +647,9 @@ follow_start_transform(StreamSpecs *specs) * Arrange to read from receive-transform pipe and write to the * transform-apply pipe. */ + specs->stdIn = true; + specs->stdOut = true; + specs->in = fdopen(specs->pipe_rt[0], "r"); specs->out = fdopen(specs->pipe_ta[1], "a"); @@ -616,6 +657,13 @@ follow_start_transform(StreamSpecs *specs) close_fd_or_exit(specs->pipe_rt[1]); close_fd_or_exit(specs->pipe_ta[0]); + /* switch out stream from block buffered to line buffered mode */ + if (setvbuf(specs->out, NULL, _IOLBF, 0) != 0) + { + log_error("Failed to set out stream to line buffered mode: %m"); + return false; + } + bool success = stream_transform_stream(specs); close_fd_or_exit(specs->pipe_rt[0]); @@ -631,6 +679,9 @@ follow_start_transform(StreamSpecs *specs) * internal message queue and batch processes one file at a * time. */ + specs->stdIn = false; + specs->stdOut = false; + return stream_transform_worker(specs); } @@ -651,6 +702,7 @@ follow_start_catchup(StreamSpecs *specs) if (specs->mode == STREAM_MODE_REPLAY) { /* arrange to read from the transform-apply pipe */ + specs->stdIn = true; specs->in = fdopen(specs->pipe_ta[0], "r"); /* close pipe ends we're not using */ @@ -672,6 +724,8 @@ follow_start_catchup(StreamSpecs *specs) * current LSN on the target database origin tracking system to * open the right SQL file and apply statements from there. */ + specs->stdIn = false; + return stream_apply_catchup(specs); } @@ -800,7 +854,8 @@ follow_wait_subprocesses(StreamSpecs *specs) /* follow_wait_pid is non-blocking: uses WNOHANG */ if (!follow_wait_pid(processArray[i]->pid, &(processArray[i]->exited), - &(processArray[i]->returnCode))) + &(processArray[i]->returnCode), + &(processArray[i]->sig))) { /* errors have already been logged */ return false; @@ -815,13 +870,33 @@ follow_wait_subprocesses(StreamSpecs *specs) if (processArray[i]->returnCode == 0) { - sformat(details, sizeof(details), "successfully"); + if (processArray[i]->sig == 0) + { + sformat(details, sizeof(details), "successfully"); + } + else + { + sformat(details, sizeof(details), + "successfully after signal %s", + signal_to_string(processArray[i]->sig)); + } } else { logLevel = LOG_ERROR; - sformat(details, sizeof(details), "with error code %d", - processArray[i]->returnCode); + + if (processArray[i]->sig == 0) + { + sformat(details, sizeof(details), "with error code %d", + processArray[i]->returnCode); + } + else + { + sformat(details, sizeof(details), + "with error code %d and signal %s", + processArray[i]->returnCode, + signal_to_string(processArray[i]->sig)); + } } log_level(logLevel, @@ -913,7 +988,7 @@ follow_terminate_subprocesses(StreamSpecs *specs) * follow_wait_pid waits for a given known sub-process. */ bool -follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode) +follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode, int *sig) { int status = 0; @@ -932,7 +1007,9 @@ follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode) if (errno == ECHILD) { /* no more childrens */ + *sig = 0; *exited = true; + *returnCode = -1; return true; } else @@ -950,7 +1027,9 @@ follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode) * We're using WNOHANG, 0 means there are no stopped or * exited children, it's all good. */ + *sig = 0; *exited = false; + *returnCode = -1; break; } @@ -963,9 +1042,15 @@ follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode) return false; } + *sig = 0; *exited = true; *returnCode = WEXITSTATUS(status); + if (WIFSIGNALED(status)) + { + *sig = WTERMSIG(status); + } + break; } } diff --git a/src/bin/pgcopydb/ld_stream.c b/src/bin/pgcopydb/ld_stream.c index de82fb0d3..8c3f47895 100644 --- a/src/bin/pgcopydb/ld_stream.c +++ b/src/bin/pgcopydb/ld_stream.c @@ -404,16 +404,6 @@ startLogicalStreaming(StreamSpecs *specs) StreamContext *privateContext = &(specs->private); context.private = (void *) privateContext; - if (specs->stdOut) - { - /* switch stdout from block buffered to line buffered mode */ - if (setvbuf(specs->out, NULL, _IOLBF, 0) != 0) - { - log_error("Failed to set stdout to line buffered mode: %m"); - return false; - } - } - log_notice("Connecting to logical decoding replication stream"); /* @@ -2474,7 +2464,7 @@ stream_create_sentinel(CopyDataSpec *copySpecs, if (!pgsql_execute(pgsql, sql[i])) { /* errors have already been logged */ - exit(EXIT_CODE_SOURCE); + return false; } } diff --git a/src/bin/pgcopydb/ld_stream.h b/src/bin/pgcopydb/ld_stream.h index 6c03176c9..8711f9372 100644 --- a/src/bin/pgcopydb/ld_stream.h +++ b/src/bin/pgcopydb/ld_stream.h @@ -395,6 +395,7 @@ typedef struct FollowSubProcess pid_t pid; bool exited; int returnCode; + int sig; } FollowSubProcess; @@ -418,6 +419,7 @@ struct StreamSpecs uint64_t startpos; uint64_t endpos; + CopyDBSentinel sentinel; bool startposComputedFromJSON; StreamAction startposActionFromJSON; @@ -682,6 +684,6 @@ void follow_exit_early(StreamSpecs *specs); bool follow_wait_subprocesses(StreamSpecs *specs); bool follow_terminate_subprocesses(StreamSpecs *specs); -bool follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode); +bool follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode, int *sig); #endif /* LD_STREAM_H */ diff --git a/src/bin/pgcopydb/ld_transform.c b/src/bin/pgcopydb/ld_transform.c index b4bcdf379..3980530ea 100644 --- a/src/bin/pgcopydb/ld_transform.c +++ b/src/bin/pgcopydb/ld_transform.c @@ -95,13 +95,6 @@ stream_transform_stream(StreamSpecs *specs) .ctx = &ctx }; - /* switch out stream from block buffered to line buffered mode */ - if (setvbuf(privateContext->out, NULL, _IOLBF, 0) != 0) - { - log_error("Failed to set stdout to line buffered mode: %m"); - exit(EXIT_CODE_INTERNAL_ERROR); - } - if (!read_from_stream(privateContext->in, &context)) { log_error("Failed to transform JSON messages from input stream, " diff --git a/src/bin/pgcopydb/signals.c b/src/bin/pgcopydb/signals.c index 2a4ec9a14..654ec4b7c 100644 --- a/src/bin/pgcopydb/signals.c +++ b/src/bin/pgcopydb/signals.c @@ -165,6 +165,7 @@ void catch_quit_and_exit(int sig) { /* default signal handler disposition is to core dump, we don't */ + log_warn("SIGQUIT"); exit(EXIT_CODE_QUIT); } @@ -265,6 +266,8 @@ signal_to_string(int signal) } default: - return "unknown signal"; + { + return strsignal(signal); + } } }