Skip to content

Commit

Permalink
boot: replay in subprocess (#654)
Browse files Browse the repository at this point in the history
This PR:
1. Adds the `--serf-bin` option for specifying a serf executable
2. Sets `u3_Host.wrk_c` to `--serf-bin`'s value if specified, self
otherwise
3. Performs boot's replay in a subprocess that spawns our own serf (by
default) or the specified serf if provided
4. Spawns a thread in `_cw_play` that exits if it reads `EOF` from
`STDIN` (thereby ensuring the child process for replaying events exits
whenever the parent is killed)

These changes allow Ares (or any other runtime) to implement its own
`play` subcommand and have Vere use it for the boot process's replay
functionality.

Tests:
- [x] `linux-x86_64` subcommand, replay via boot, killing king via INT,
KILL, TERM
- [x] `macos-aarch64` subcommand, replay via boot, killing king via INT,
KILL, TERM
  • Loading branch information
pkova authored Jun 24, 2024
2 parents 2080cfb + a819801 commit 81b31ca
Showing 1 changed file with 130 additions and 8 deletions.
138 changes: 130 additions & 8 deletions pkg/vere/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "db/lmdb.h"
#include "getopt.h"
#include "libgen.h"
#include "pthread.h"
#include "spawn.h"

#include "ca_bundle.h"
#include "pace.h"
Expand Down Expand Up @@ -299,12 +301,13 @@ _main_getopt(c3_i argc, c3_c** argv)
{ "prop-url", required_argument, NULL, 2 },
{ "prop-name", required_argument, NULL, 3 },
//
{ "urth-loom", required_argument, NULL, 5 },
{ "no-demand", no_argument, NULL, 6 },
{ "swap", no_argument, NULL, 7 },
{ "swap-to", required_argument, NULL, 8 },
{ "toss", required_argument, NULL, 9 },
{ "urth-loom", required_argument, NULL, 5 },
{ "no-demand", no_argument, NULL, 6 },
{ "swap", no_argument, NULL, 7 },
{ "swap-to", required_argument, NULL, 8 },
{ "toss", required_argument, NULL, 9 },
{ "behn-allow-blocked", no_argument, NULL, 10 },
{ "serf-bin", required_argument, NULL, 11 },
{ "lmdb-map-size", required_argument, NULL, 12 },
//
{ NULL, 0, NULL, 0 },
Expand Down Expand Up @@ -349,10 +352,15 @@ _main_getopt(c3_i argc, c3_c** argv)
u3_Host.ops_u.beb = c3y;
break;
}
case 11: { // serf-bin
u3_Host.wrk_c = strdup(optarg);
break;
}
case 12: { // lmdb-map-size
if ( 1 != sscanf(optarg, "%" SCNuMAX, &u3_Host.ops_u.siz_i) ) {
return c3n;
}

break;
}
// special args
Expand Down Expand Up @@ -2370,6 +2378,113 @@ _cw_play_impl(c3_d eve_d, c3_d sap_d, c3_o mel_o, c3_o sof_o, c3_o ful_o)
return pay_d;
}

/* _cw_play_fork_heed(): wait for EOF on STDIN or until canceled.
*/
void* _cw_play_fork_heed(void* arg) {
c3_c buf[1];
c3_zs red;

do {
pthread_testcancel();
red = read(STDIN_FILENO, buf, sizeof(buf));
if ( 0 == red ) {
fprintf(stderr, "play: god save the king! committing sudoku...\r\n");
exit(1);
}
} while ( 0 < red );

return NULL;
}

/* _cw_play_fork(): spawn a subprocess for event replay.
*/
static c3_i
_cw_play_fork(c3_d eve_d, c3_d sap_d, c3_o mel_o, c3_o sof_o, c3_o ful_o)
{
// prepare args
//
c3_c eve_c[21], sap_c[21] = { 0 };
if ( 0 > sprintf(eve_c, "%" PRIu64, eve_d) ||
0 > sprintf(sap_c, "%" PRIu64, sap_d) )
{
fprintf(stderr, "play: error parsing args\r\n");
return 1;
}

c3_c *argv[11] = {
u3_Host.wrk_c,
"play",
u3_Host.dir_c,
"--replay-to",
eve_c,
"--snap-at",
sap_c,
};

c3_z i = 7;
if _(mel_o) {
argv[i++] = "--auto-meld";
}
if _(sof_o) {
argv[i++] = "--soft-mugs";
}
if _(ful_o) {
argv[i++] = "--full";
}
argv[i] = NULL;

// prepare a pipe for ipc with the subprocess
//
c3_i pipefd[2];
if ( 0 != pipe(pipefd) ) {
fprintf(stderr, "play: failed to open pipe\r\n");
return 1;
}

// set the child process' stdin to read from the pipe
//
posix_spawn_file_actions_t action;
posix_spawn_file_actions_init(&action);
posix_spawn_file_actions_addclose(&action, pipefd[1]);
posix_spawn_file_actions_adddup2(&action, pipefd[0], STDIN_FILENO);

// spawn a new serf process and call its play subcommand
//
pid_t pid;
if ( 0 != posix_spawn(&pid, u3_Host.wrk_c, &action, 0, argv, 0) ) {
fprintf(stderr, "play: posix_spawn: %d\r\n", errno);
return 1;
}

// close the read end of the pipe in the parent
//
close(pipefd[0]);

// wait for the child to exit
//
c3_i sat_i;
if ( -1 == waitpid(pid, &sat_i, 0) ) {
fprintf(stderr, "play: waitpid: %d\r\n", errno);
return 1;
}

if ( WIFEXITED(sat_i) ) {
c3_i ret_i = WEXITSTATUS(sat_i);
if ( 0 != ret_i ) {
fprintf(stderr, "play: exited with %d\r\n", ret_i);
}
return ret_i;
}
else if ( WIFSIGNALED(sat_i) ) {
fprintf(stderr, "play: terminated by signal %d\r\n", WTERMSIG(sat_i));
return 1;
}
else {
fprintf(stderr, "play: strange termination\r\n");
return 1;
}
}

/* _cw_play(): replay events, but better.
*/
static void
Expand Down Expand Up @@ -2462,9 +2577,14 @@ _cw_play(c3_i argc, c3_c* argv[])
exit(1);
}

pthread_t ted;
pthread_create(&ted, NULL, _cw_play_fork_heed, NULL);

if ( !_cw_play_impl(eve_d, sap_d, mel_o, sof_o, ful_o) ) {
fprintf(stderr, "mars: nothing to do!\r\n");
}

pthread_cancel(ted);
}

/* _cw_prep(): prepare for upgrade
Expand Down Expand Up @@ -3037,8 +3157,6 @@ main(c3_i argc,

_main_self_path();

// XX add argument
//
if ( !u3_Host.wrk_c ) {
u3_Host.wrk_c = bin_c;
}
Expand Down Expand Up @@ -3181,7 +3299,11 @@ main(c3_i argc,
// we need the current snapshot's latest event number to
// validate whether we can execute disk migration
if ( u3_Host.ops_u.nuu == c3n ) {
_cw_play_impl(0, 0, c3n, c3n, c3n);
c3_i sat_i = _cw_play_fork(0, 0, c3n, c3n, c3n);
if ( sat_i ) {
fprintf(stderr, "play: replay failed: %d\r\n", sat_i);
exit(sat_i);
}
signal(SIGTSTP, _stop_exit);
// XX unmap loom, else parts of the snapshot could be left in memory
}
Expand Down

0 comments on commit 81b31ca

Please sign in to comment.