Skip to content

Commit

Permalink
compute_ctl: Break up main() into discrete phases
Browse files Browse the repository at this point in the history
This commit is intentionally designed to have as small a diff as
possible. To that end, the basic idea is that each distinct "chunk" of
the previous main() has been wrapped in its own function, with the
return values from each function being passed directly into the next.

The structure of main() is now visible from its contents:

  1. init()
  2. process_cli()
  3. wait_spec()
  4. start_postgres()
  5. wait_postgres()
  6. cleanup_and_exit()

There's a lot of other work that can / should(?) be done beyond this,
but I figure that's more opinionated, and this should be a solid start.
  • Loading branch information
sharnoff committed May 1, 2024
1 parent f9c2945 commit c947243
Showing 1 changed file with 132 additions and 2 deletions.
134 changes: 132 additions & 2 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tracing::{error, info};
use url::Url;

use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeSpec;

use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
Expand All @@ -68,6 +69,20 @@ use compute_tools::spec::*;
const BUILD_TAG_DEFAULT: &str = "latest";

fn main() -> Result<()> {
let (build_tag, clap_args) = init()?;

let (startup_context_guard, cli_result) = process_cli(&clap_args)?;

let wait_spec_result = wait_spec(build_tag, cli_result)?;

let (pg_handle, start_pg_result) = start_postgres(&clap_args, wait_spec_result)?;

let wait_pg_result = wait_postgres(pg_handle, startup_context_guard)?;

cleanup_and_exit(start_pg_result, wait_pg_result)
}

fn init() -> Result<(String, clap::ArgMatches)> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;

let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
Expand All @@ -82,7 +97,12 @@ fn main() -> Result<()> {
.to_string();
info!("build_tag: {build_tag}");

let matches = cli().get_matches();
Ok((build_tag, cli().get_matches()))
}

fn process_cli(
matches: &clap::ArgMatches,
) -> Result<(Option<opentelemetry::ContextGuard>, ProcessCliResult)> {
let pgbin_default = "postgres";
let pgbin = matches
.get_one::<String>("pgbin")
Expand Down Expand Up @@ -202,6 +222,47 @@ fn main() -> Result<()> {
}
};

let result = ProcessCliResult {
// directly from CLI:
connstr,
pgdata,
pgbin,
ext_remote_storage,
http_port,
// others:
spec,
live_config_allowed,
};

// TODO: Move startup_context_guard out of this function. It's here right now only because
// that's where it was before, and moving it would've made the diff big.
Ok((startup_context_guard, result))
}

struct ProcessCliResult<'clap> {
connstr: &'clap str,
pgdata: &'clap str,
pgbin: &'clap str,
ext_remote_storage: Option<&'clap str>,
http_port: u16,

/// If a spec was provided via CLI or file, the [`ComputeSpec`]
spec: Option<ComputeSpec>,
live_config_allowed: bool,
}

fn wait_spec(
build_tag: String,
ProcessCliResult {
connstr,
pgdata,
pgbin,
ext_remote_storage,
http_port,
spec,
live_config_allowed,
}: ProcessCliResult,
) -> Result<WaitSpecResult> {
let mut new_state = ComputeState::new();
let spec_set;

Expand Down Expand Up @@ -256,6 +317,19 @@ fn main() -> Result<()> {
}
}

Ok(WaitSpecResult { compute, http_port })
}

struct WaitSpecResult {
compute: Arc<ComputeNode>,
// passed through from ProcessCliResult
http_port: u16,
}

fn start_postgres(
matches: &clap::ArgMatches,
WaitSpecResult { compute, http_port }: WaitSpecResult,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();

Expand Down Expand Up @@ -336,7 +410,7 @@ fn main() -> Result<()> {
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();

let vm_monitor = &rt.as_ref().map(|rt| {
let vm_monitor = rt.as_ref().map(|rt| {
rt.spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: cgroup.cloned(),
Expand All @@ -349,11 +423,47 @@ fn main() -> Result<()> {
}
}

Ok((
pg,
StartPostgresResult {
delay_exit,
compute,
#[cfg(target_os = "linux")]
rt,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
vm_monitor,
},
))
}

type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>);

struct StartPostgresResult {
delay_exit: bool,
// passed through from WaitSpecResult
compute: Arc<ComputeNode>,

#[cfg(target_os = "linux")]
rt: Option<tokio::runtime::Runtime>,
#[cfg(target_os = "linux")]
token: tokio_util::sync::CancellationToken,
#[cfg(target_os = "linux")]
vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
}

fn wait_postgres(
pg: Option<PostgresHandle>,
startup_context_guard: Option<opentelemetry::ContextGuard>,
) -> Result<WaitPostgresResult> {
// Wait for the child Postgres process forever. In this state Ctrl+C will
// propagate to Postgres and it will be shut down as well.
let mut exit_code = None;
if let Some((mut pg, logs_handle)) = pg {
// Startup is finished, exit the startup tracing span
// TODO: Probably easier to drop startup_context_guard outside this function. It's here
// right now because keeping it here reduced the size of the diff.
drop(startup_context_guard);

let ecode = pg
Expand All @@ -370,6 +480,26 @@ fn main() -> Result<()> {
exit_code = ecode.code()
}

Ok(WaitPostgresResult { exit_code })
}

struct WaitPostgresResult {
exit_code: Option<i32>,
}

fn cleanup_and_exit(
StartPostgresResult {
mut delay_exit,
compute,
#[cfg(target_os = "linux")]
vm_monitor,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
rt,
}: StartPostgresResult,
WaitPostgresResult { exit_code }: WaitPostgresResult,
) -> Result<()> {
// Terminate the vm_monitor so it releases the file watcher on
// /sys/fs/cgroup/neon-postgres.
// Note: the vm-monitor only runs on linux because it requires cgroups.
Expand Down

0 comments on commit c947243

Please sign in to comment.