Skip to content

Commit

Permalink
Fix a SEGFAULT that was hidden by our waitpid() calls.
Browse files Browse the repository at this point in the history
When a subprocess terminates with a successful return code, it might still
have been terminated by a signal, one signal would be SIGSEGV. Arrange our
code to report when that happens.

This happened in initialisation of the streaming module when trying to call
setvbuf on a un-assigned file descriptor. This is fixed in follow.c when
preparing the call.
  • Loading branch information
dimitri committed Aug 9, 2023
1 parent 988dfec commit 6d1b8d2
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 53 deletions.
17 changes: 9 additions & 8 deletions src/bin/pgcopydb/cli_clone_follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -634,20 +634,20 @@ start_follow_process(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs,
* been updated from a previous run of the command, and we might have
* nothing to catch-up to when e.g. the endpos was reached already.
*/
CopyDBSentinel sentinel = { 0 };
CopyDBSentinel *sentinel = &(streamSpecs->sentinel);

if (!follow_init_sentinel(streamSpecs, &sentinel))
if (!follow_init_sentinel(streamSpecs, sentinel))
{
/* errors have already been logged */
log_error("Failed to initialise sentinel, see above for details");
return false;
}

if (sentinel.endpos != InvalidXLogRecPtr &&
sentinel.endpos <= sentinel.replay_lsn)
if (sentinel->endpos != InvalidXLogRecPtr &&
sentinel->endpos <= sentinel->replay_lsn)
{
log_info("Current endpos %X/%X was previously reached at %X/%X",
LSN_FORMAT_ARGS(sentinel.endpos),
LSN_FORMAT_ARGS(sentinel.replay_lsn));
LSN_FORMAT_ARGS(sentinel->endpos),
LSN_FORMAT_ARGS(sentinel->replay_lsn));

return true;
}
Expand Down Expand Up @@ -698,6 +698,7 @@ cli_clone_follow_wait_subprocess(const char *name, pid_t pid)
{
bool exited = false;
int returnCode = -1;
int sig = 0;

if (pid < 0)
{
Expand All @@ -707,7 +708,7 @@ cli_clone_follow_wait_subprocess(const char *name, pid_t pid)

while (!exited)
{
if (!follow_wait_pid(pid, &exited, &returnCode))
if (!follow_wait_pid(pid, &exited, &returnCode, &sig))
{
/* errors have already been logged */
return false;
Expand Down
135 changes: 110 additions & 25 deletions src/bin/pgcopydb/follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
LSN_FORMAT_ARGS(sentinel->replay_lsn),
LSN_FORMAT_ARGS(sentinel->endpos));
}
else
else if (sentinel->replay_lsn != InvalidXLogRecPtr)
{
log_info("Current sentinel replay_lsn is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn));
Expand Down Expand Up @@ -285,7 +285,8 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
{
if (!followDB(copySpecs, streamSpecs))
{
/* errors have already been logged */
log_error("Failed to follow changes from source, "
"see above for details");
return false;
}

Expand All @@ -305,6 +306,11 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)

if (done)
{
log_info("Follow mode is now done, "
"reached replay_lsn %X/%X with endpos %X/%X",
LSN_FORMAT_ARGS(streamSpecs->sentinel.endpos),
LSN_FORMAT_ARGS(streamSpecs->sentinel.replay_lsn));

return true;
}

Expand Down Expand Up @@ -345,13 +351,20 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)

if (done)
{
log_info("Follow mode is now done, "
"reached replay_lsn %X/%X with endpos %X/%X",
LSN_FORMAT_ARGS(streamSpecs->sentinel.endpos),
LSN_FORMAT_ARGS(streamSpecs->sentinel.replay_lsn));

return true;
}

log_info("Restarting logical decoding follower in %s mode",
LogicalStreamModeToString(currentMode));
}

/* keep compiler happy */
log_warn("BUG: follow_main_loop reached out of loop");
return true;
}

Expand All @@ -362,23 +375,23 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
bool
follow_reached_endpos(StreamSpecs *streamSpecs, bool *done)
{
CopyDBSentinel sentinel = { 0 };
CopyDBSentinel *sentinel = &(streamSpecs->sentinel);

if (!follow_get_sentinel(streamSpecs, &sentinel))
if (!follow_get_sentinel(streamSpecs, sentinel))
{
/* errors have already been logged */
log_error("Failed to get sentinel values");
return false;
}

if (sentinel.endpos != InvalidXLogRecPtr &&
sentinel.endpos <= sentinel.replay_lsn)
if (sentinel->endpos != InvalidXLogRecPtr &&
sentinel->endpos <= sentinel->replay_lsn)
{
/* follow_get_sentinel logs replay_lsn and endpos already */
*done = true;

log_info("Current endpos %X/%X has been reached at %X/%X",
LSN_FORMAT_ARGS(sentinel.endpos),
LSN_FORMAT_ARGS(sentinel.replay_lsn));
LSN_FORMAT_ARGS(sentinel->endpos),
LSN_FORMAT_ARGS(sentinel->replay_lsn));
}

return true;
Expand Down Expand Up @@ -432,7 +445,8 @@ follow_prepare_mode_switch(StreamSpecs *streamSpecs,

if (!stream_transform_from_queue(streamSpecs))
{
/* errors have already been logged */
log_error("Failed to process messages from the transform queue, "
"see above for details");
return false;
}
}
Expand All @@ -441,7 +455,8 @@ follow_prepare_mode_switch(StreamSpecs *streamSpecs,
/* then catch-up with what's been stream and transformed already */
if (!stream_apply_catchup(streamSpecs))
{
/* errors have already been logged */
log_error("Failed to catchup with on-disk files, "
"see above for details");
return false;
}

Expand All @@ -468,6 +483,19 @@ followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
return false;
}

/*
* Before starting sub-processes, clean-up intermediate files from previous
* round. Here that's the stream context with WAL segment size and timeline
* history, which are fetched from the source server to compute WAL file
* names. The current timeline can only change at a server restart or a
* failover, both with trigger a reconnect.
*/
if (!stream_cleanup_context(streamSpecs))
{
/* errors have already been logged */
return false;
}

/*
* Prepare the sub-process communication mechanisms, when needed:
*
Expand Down Expand Up @@ -568,25 +596,35 @@ follow_start_prefetch(StreamSpecs *specs)
if (specs->mode == STREAM_MODE_REPLAY)
{
/* arrange to write to the receive-transform pipe */
specs->stdOut = true;
specs->out = fdopen(specs->pipe_rt[1], "a");

/* close pipe ends we're not using */
close_fd_or_exit(specs->pipe_rt[0]);
close_fd_or_exit(specs->pipe_ta[0]);
close_fd_or_exit(specs->pipe_ta[1]);
}

if (!stream_cleanup_context(specs))
{
/* errors have already been logged */
return false;
}
/* switch out stream from block buffered to line buffered mode */
if (setvbuf(specs->out, NULL, _IOLBF, 0) != 0)
{
log_error("Failed to set out stream to line buffered mode: %m");
return false;
}

bool success = startLogicalStreaming(specs);
bool success = startLogicalStreaming(specs);

close_fd_or_exit(specs->pipe_rt[1]);
close_fd_or_exit(specs->pipe_rt[1]);

return success;
return success;
}
else
{
specs->stdOut = false;

return startLogicalStreaming(specs);
}

return true;
}


Expand All @@ -609,13 +647,23 @@ follow_start_transform(StreamSpecs *specs)
* Arrange to read from receive-transform pipe and write to the
* transform-apply pipe.
*/
specs->stdIn = true;
specs->stdOut = true;

specs->in = fdopen(specs->pipe_rt[0], "r");
specs->out = fdopen(specs->pipe_ta[1], "a");

/* close pipe ends we're not using */
close_fd_or_exit(specs->pipe_rt[1]);
close_fd_or_exit(specs->pipe_ta[0]);

/* switch out stream from block buffered to line buffered mode */
if (setvbuf(specs->out, NULL, _IOLBF, 0) != 0)
{
log_error("Failed to set out stream to line buffered mode: %m");
return false;
}

bool success = stream_transform_stream(specs);

close_fd_or_exit(specs->pipe_rt[0]);
Expand All @@ -631,6 +679,9 @@ follow_start_transform(StreamSpecs *specs)
* internal message queue and batch processes one file at a
* time.
*/
specs->stdIn = false;
specs->stdOut = false;

return stream_transform_worker(specs);
}

Expand All @@ -651,6 +702,7 @@ follow_start_catchup(StreamSpecs *specs)
if (specs->mode == STREAM_MODE_REPLAY)
{
/* arrange to read from the transform-apply pipe */
specs->stdIn = true;
specs->in = fdopen(specs->pipe_ta[0], "r");

/* close pipe ends we're not using */
Expand All @@ -672,6 +724,8 @@ follow_start_catchup(StreamSpecs *specs)
* current LSN on the target database origin tracking system to
* open the right SQL file and apply statements from there.
*/
specs->stdIn = false;

return stream_apply_catchup(specs);
}

Expand Down Expand Up @@ -800,7 +854,8 @@ follow_wait_subprocesses(StreamSpecs *specs)
/* follow_wait_pid is non-blocking: uses WNOHANG */
if (!follow_wait_pid(processArray[i]->pid,
&(processArray[i]->exited),
&(processArray[i]->returnCode)))
&(processArray[i]->returnCode),
&(processArray[i]->sig)))
{
/* errors have already been logged */
return false;
Expand All @@ -815,13 +870,33 @@ follow_wait_subprocesses(StreamSpecs *specs)

if (processArray[i]->returnCode == 0)
{
sformat(details, sizeof(details), "successfully");
if (processArray[i]->sig == 0)
{
sformat(details, sizeof(details), "successfully");
}
else
{
sformat(details, sizeof(details),
"successfully after signal %s",
signal_to_string(processArray[i]->sig));
}
}
else
{
logLevel = LOG_ERROR;
sformat(details, sizeof(details), "with error code %d",
processArray[i]->returnCode);

if (processArray[i]->sig == 0)
{
sformat(details, sizeof(details), "with error code %d",
processArray[i]->returnCode);
}
else
{
sformat(details, sizeof(details),
"with error code %d and signal %s",
processArray[i]->returnCode,
signal_to_string(processArray[i]->sig));
}
}

log_level(logLevel,
Expand Down Expand Up @@ -913,7 +988,7 @@ follow_terminate_subprocesses(StreamSpecs *specs)
* follow_wait_pid waits for a given known sub-process.
*/
bool
follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode)
follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode, int *sig)
{
int status = 0;

Expand All @@ -932,7 +1007,9 @@ follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode)
if (errno == ECHILD)
{
/* no more childrens */
*sig = 0;
*exited = true;
*returnCode = -1;
return true;
}
else
Expand All @@ -950,7 +1027,9 @@ follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode)
* We're using WNOHANG, 0 means there are no stopped or
* exited children, it's all good.
*/
*sig = 0;
*exited = false;
*returnCode = -1;
break;
}

Expand All @@ -963,9 +1042,15 @@ follow_wait_pid(pid_t subprocess, bool *exited, int *returnCode)
return false;
}

*sig = 0;
*exited = true;
*returnCode = WEXITSTATUS(status);

if (WIFSIGNALED(status))
{
*sig = WTERMSIG(status);
}

break;
}
}
Expand Down
12 changes: 1 addition & 11 deletions src/bin/pgcopydb/ld_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,16 +404,6 @@ startLogicalStreaming(StreamSpecs *specs)
StreamContext *privateContext = &(specs->private);
context.private = (void *) privateContext;

if (specs->stdOut)
{
/* switch stdout from block buffered to line buffered mode */
if (setvbuf(specs->out, NULL, _IOLBF, 0) != 0)
{
log_error("Failed to set stdout to line buffered mode: %m");
return false;
}
}

log_notice("Connecting to logical decoding replication stream");

/*
Expand Down Expand Up @@ -2474,7 +2464,7 @@ stream_create_sentinel(CopyDataSpec *copySpecs,
if (!pgsql_execute(pgsql, sql[i]))
{
/* errors have already been logged */
exit(EXIT_CODE_SOURCE);
return false;
}
}

Expand Down
Loading

0 comments on commit 6d1b8d2

Please sign in to comment.