Skip to content

Commit

Permalink
cargo-shuttle: separated unix from windows local_run (#823)
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianbarbu authored and oddgrd committed Apr 27, 2023
1 parent e1bf7fa commit 46be7b5
Showing 1 changed file with 121 additions and 114 deletions.
235 changes: 121 additions & 114 deletions cargo-shuttle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use shuttle_service::builder::{build_workspace, BuiltService};
use std::fmt::Write;
use strum::IntoEnumIterator;
use tar::Builder;
use tracing::{error, info, trace, warn};
use tracing::{error, trace, warn};
use uuid::Uuid;

use crate::args::{DeploymentCommand, ProjectCommand, ResourceCommand};
Expand Down Expand Up @@ -652,7 +652,7 @@ impl Shuttle {
runtime_client: &mut RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
) -> Result<(), Status> {
let stop_request = StopRequest {};
info!(?stop_request, "stopping service");
trace!(?stop_request, "stopping service");
let response = runtime_client
.stop(tonic::Request::new(stop_request))
.or_else(|err| async {
Expand All @@ -662,7 +662,7 @@ impl Shuttle {
})
.await?
.into_inner();
info!(response = ?response, "client stop response: ");
trace!(response = ?response, "client stop response: ");
Ok(())
}

Expand Down Expand Up @@ -702,7 +702,7 @@ impl Shuttle {
Ok(())
}

async fn local_run(&self, run_args: RunArgs) -> Result<()> {
async fn pre_local_run(&self, run_args: &RunArgs) -> Result<Vec<BuiltService>> {
trace!("starting a local run for a service: {run_args:?}");

let (tx, rx): (crossbeam_channel::Sender<Message>, _) = crossbeam_channel::bounded(0);
Expand All @@ -729,6 +729,12 @@ impl Shuttle {
working_directory.display()
);

// Compile all the alpha or shuttle-next services in the workspace.
build_workspace(working_directory, run_args.release, tx).await
}

async fn setup_local_provisioner(
) -> Result<(JoinHandle<Result<(), tonic::transport::Error>>, u16)> {
let provisioner = LocalProvisioner::new()?;
let provisioner_port =
portpicker::pick_unused_port().expect("unable to find available port");
Expand All @@ -737,23 +743,19 @@ impl Shuttle {
provisioner_port,
));

// Compile all the alpha or shuttle-next services in the workspace.
let services = build_workspace(working_directory, run_args.release, tx).await?;

let (mut sigterm_notif, mut sigint_notif) = if cfg!(target_family = "unix") {
(
Some(
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Can not get the SIGTERM signal receptor"),
),
Some(
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.expect("Can not get the SIGINT signal receptor"),
),
)
} else {
(None, None)
};
Ok((provisioner_server, provisioner_port))
}

#[cfg(target_family = "unix")]
async fn local_run(&self, run_args: RunArgs) -> Result<()> {
let services = Shuttle::pre_local_run(self, &run_args).await?;
let (provisioner_server, provisioner_port) = Shuttle::setup_local_provisioner().await?;
let mut sigterm_notif =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Can not get the SIGTERM signal receptor");
let mut sigint_notif =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.expect("Can not get the SIGINT signal receptor");

// Start all the services.
let mut runtimes: Vec<(
Expand All @@ -763,46 +765,28 @@ impl Shuttle {
let mut signal_received = false;
for (i, service) in services.iter().enumerate() {
// We must cover the case of starting multiple workspace services and receiving a signal in parallel.
// This must stop all the existing runtimes and stop creating new ones.
if cfg!(target_family = "unix") {
let sigterm = sigterm_notif.as_mut().expect("SIGTERM reactor failure");
let sigint = sigint_notif.as_mut().expect("SIGINT reactor failure");
signal_received = tokio::select! {
res = Shuttle::spin_local_runtime(&run_args, service, &provisioner_server, i as u16, provisioner_port) => {
Shuttle::add_runtime_info(res.unwrap(), &mut runtimes, &provisioner_server).await?;
false
},
_ = sigterm.recv() => {
println!(
"cargo-shuttle received SIGTERM. Killing all the runtimes..."
);
true
},
_ = sigint.recv() => {
println!(
"cargo-shuttle received SIGINT. Killing all the runtimes..."
);
true
}
};

if signal_received {
break;
// This must stop all the existing runtimes and creating new ones.
signal_received = tokio::select! {
res = Shuttle::spin_local_runtime(&run_args, service, &provisioner_server, i as u16, provisioner_port) => {
Shuttle::add_runtime_info(res.unwrap(), &mut runtimes, &provisioner_server).await?;
false
},
_ = sigterm_notif.recv() => {
println!(
"cargo-shuttle received SIGTERM. Killing all the runtimes..."
);
true
},
_ = sigint_notif.recv() => {
println!(
"cargo-shuttle received SIGINT. Killing all the runtimes..."
);
true
}
} else {
Shuttle::add_runtime_info(
Shuttle::spin_local_runtime(
&run_args,
service,
&provisioner_server,
i as u16,
provisioner_port,
)
.await?,
&mut runtimes,
&provisioner_server,
)
.await?;
};

if signal_received {
break;
}
}

Expand All @@ -814,69 +798,92 @@ impl Shuttle {
Shuttle::stop_runtime(&mut rt, &mut rt_client)
.await
.unwrap_or_else(|err| {
info!(status = ?err, "stopping the runtime errored out");
trace!(status = ?err, "stopping the runtime errored out");
});
}
return Ok(());
}

// If no signal was received during runtimes initialization, then we must handle each runtime until
// completion and handle the signals during this time.
if cfg!(target_family = "unix") {
let sigterm = sigterm_notif.as_mut().expect("SIGTERM reactor failure");
let sigint = sigint_notif.as_mut().expect("SIGINT reactor failure");
for (mut rt, mut rt_client) in runtimes {
// If we received a signal while waiting for any runtime we must stop the rest and exit
// the waiting loop.
if signal_received {
Shuttle::stop_runtime(&mut rt, &mut rt_client)
.await
.unwrap_or_else(|err| {
info!(status = ?err, "stopping the runtime errored out");
});
continue;
for (mut rt, mut rt_client) in runtimes {
// If we received a signal while waiting for any runtime we must stop the rest and exit
// the waiting loop.
if signal_received {
Shuttle::stop_runtime(&mut rt, &mut rt_client)
.await
.unwrap_or_else(|err| {
trace!(status = ?err, "stopping the runtime errored out");
});
continue;
}

// Receiving a signal will stop the current runtime we're waiting for.
signal_received = tokio::select! {
res = rt.wait() => {
println!(
"a service future completed with exit status: {:?}",
res.unwrap().code()
);
false
},
_ = sigterm_notif.recv() => {
println!(
"cargo-shuttle received SIGTERM. Killing all the runtimes..."
);
provisioner_server.abort();
Shuttle::stop_runtime(&mut rt, &mut rt_client).await.unwrap_or_else(|err| {
trace!(status = ?err, "stopping the runtime errored out");
});
true
},
_ = sigint_notif.recv() => {
println!(
"cargo-shuttle received SIGINT. Killing all the runtimes..."
);
provisioner_server.abort();
Shuttle::stop_runtime(&mut rt, &mut rt_client).await.unwrap_or_else(|err| {
trace!(status = ?err, "stopping the runtime errored out");
});
true
}
};
}

// Receiving a signal will stop the current runtime we're waiting for.
signal_received = tokio::select! {
res = rt.wait() => {
println!(
"a service future completed with exit status: {:?}",
res.unwrap().code()
);
false
},
_ = sigterm.recv() => {
println!(
"cargo-shuttle received SIGTERM. Killing all the runtimes..."
);
provisioner_server.abort();
Shuttle::stop_runtime(&mut rt, &mut rt_client).await.unwrap_or_else(|err| {
info!(status = ?err, "stopping the runtime errored out");
});
true
},
_ = sigint.recv() => {
println!(
"cargo-shuttle received SIGINT. Killing all the runtimes..."
);
provisioner_server.abort();
Shuttle::stop_runtime(&mut rt, &mut rt_client).await.unwrap_or_else(|err| {
info!(status = ?err, "stopping the runtime errored out");
});
true
}
};
}
} else {
// In case we're not on an unix family OS, we're simply waiting for the runtimes
// to end.
for (mut rt, _) in runtimes {
println!(
"a service future completed with exit status: {:?}",
rt.wait().await?.code()
);
}
Ok(())
}

#[cfg(target_family = "windows")]
async fn local_run(&self, run_args: RunArgs) -> Result<()> {
let services = Shuttle::pre_local_run(&self, &run_args).await?;
let (provisioner_server, provisioner_port) = Shuttle::setup_local_provisioner().await?;

// Start all the services.
let mut runtimes: Vec<(
Child,
RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
)> = Vec::new();
for (i, service) in services.iter().enumerate() {
Shuttle::add_runtime_info(
Shuttle::spin_local_runtime(
&run_args,
service,
&provisioner_server,
i as u16,
provisioner_port,
)
.await?,
&mut runtimes,
&provisioner_server,
)
.await?;
}

for (mut rt, _) in runtimes {
println!(
"a service future completed with exit status: {:?}",
rt.wait().await?.code()
);
}

Ok(())
Expand Down

0 comments on commit 46be7b5

Please sign in to comment.