Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cargo-shuttle: separate local runs per target family #823

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 121 additions & 114 deletions cargo-shuttle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use shuttle_service::builder::{build_workspace, BuiltService};
use std::fmt::Write;
use strum::IntoEnumIterator;
use tar::Builder;
use tracing::{error, info, trace, warn};
use tracing::{error, trace, warn};
use uuid::Uuid;

use crate::args::{DeploymentCommand, ProjectCommand, ResourceCommand};
Expand Down Expand Up @@ -652,7 +652,7 @@ impl Shuttle {
runtime_client: &mut RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
) -> Result<(), Status> {
let stop_request = StopRequest {};
info!(?stop_request, "stopping service");
trace!(?stop_request, "stopping service");
let response = runtime_client
.stop(tonic::Request::new(stop_request))
.or_else(|err| async {
Expand All @@ -662,7 +662,7 @@ impl Shuttle {
})
.await?
.into_inner();
info!(response = ?response, "client stop response: ");
trace!(response = ?response, "client stop response: ");
Ok(())
}

Expand Down Expand Up @@ -702,7 +702,7 @@ impl Shuttle {
Ok(())
}

async fn local_run(&self, run_args: RunArgs) -> Result<()> {
async fn pre_local_run(&self, run_args: &RunArgs) -> Result<Vec<BuiltService>> {
trace!("starting a local run for a service: {run_args:?}");

let (tx, rx): (crossbeam_channel::Sender<Message>, _) = crossbeam_channel::bounded(0);
Expand All @@ -729,6 +729,12 @@ impl Shuttle {
working_directory.display()
);

// Compile all the alpha or shuttle-next services in the workspace.
build_workspace(working_directory, run_args.release, tx).await
}

async fn setup_local_provisioner(
) -> Result<(JoinHandle<Result<(), tonic::transport::Error>>, u16)> {
let provisioner = LocalProvisioner::new()?;
let provisioner_port =
portpicker::pick_unused_port().expect("unable to find available port");
Expand All @@ -737,23 +743,19 @@ impl Shuttle {
provisioner_port,
));

// Compile all the alpha or shuttle-next services in the workspace.
let services = build_workspace(working_directory, run_args.release, tx).await?;

let (mut sigterm_notif, mut sigint_notif) = if cfg!(target_family = "unix") {
(
Some(
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Can not get the SIGTERM signal receptor"),
),
Some(
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.expect("Can not get the SIGINT signal receptor"),
),
)
} else {
(None, None)
};
Ok((provisioner_server, provisioner_port))
}

#[cfg(target_family = "unix")]
async fn local_run(&self, run_args: RunArgs) -> Result<()> {
let services = Shuttle::pre_local_run(self, &run_args).await?;
let (provisioner_server, provisioner_port) = Shuttle::setup_local_provisioner().await?;
let mut sigterm_notif =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Can not get the SIGTERM signal receptor");
let mut sigint_notif =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.expect("Can not get the SIGINT signal receptor");

// Start all the services.
let mut runtimes: Vec<(
Expand All @@ -763,46 +765,28 @@ impl Shuttle {
let mut signal_received = false;
for (i, service) in services.iter().enumerate() {
// We must cover the case of starting multiple workspace services and receiving a signal in parallel.
// This must stop all the existing runtimes and stop creating new ones.
if cfg!(target_family = "unix") {
let sigterm = sigterm_notif.as_mut().expect("SIGTERM reactor failure");
let sigint = sigint_notif.as_mut().expect("SIGINT reactor failure");
signal_received = tokio::select! {
res = Shuttle::spin_local_runtime(&run_args, service, &provisioner_server, i as u16, provisioner_port) => {
Shuttle::add_runtime_info(res.unwrap(), &mut runtimes, &provisioner_server).await?;
false
},
_ = sigterm.recv() => {
println!(
"cargo-shuttle received SIGTERM. Killing all the runtimes..."
);
true
},
_ = sigint.recv() => {
println!(
"cargo-shuttle received SIGINT. Killing all the runtimes..."
);
true
}
};

if signal_received {
break;
// This must stop all the existing runtimes and creating new ones.
signal_received = tokio::select! {
res = Shuttle::spin_local_runtime(&run_args, service, &provisioner_server, i as u16, provisioner_port) => {
Shuttle::add_runtime_info(res.unwrap(), &mut runtimes, &provisioner_server).await?;
false
},
_ = sigterm_notif.recv() => {
println!(
"cargo-shuttle received SIGTERM. Killing all the runtimes..."
);
true
},
_ = sigint_notif.recv() => {
println!(
"cargo-shuttle received SIGINT. Killing all the runtimes..."
);
true
}
} else {
Shuttle::add_runtime_info(
Shuttle::spin_local_runtime(
&run_args,
service,
&provisioner_server,
i as u16,
provisioner_port,
)
.await?,
&mut runtimes,
&provisioner_server,
)
.await?;
};

if signal_received {
break;
}
}

Expand All @@ -814,69 +798,92 @@ impl Shuttle {
Shuttle::stop_runtime(&mut rt, &mut rt_client)
.await
.unwrap_or_else(|err| {
info!(status = ?err, "stopping the runtime errored out");
trace!(status = ?err, "stopping the runtime errored out");
});
}
return Ok(());
}

// If no signal was received during runtimes initialization, then we must handle each runtime until
// completion and handle the signals during this time.
if cfg!(target_family = "unix") {
let sigterm = sigterm_notif.as_mut().expect("SIGTERM reactor failure");
let sigint = sigint_notif.as_mut().expect("SIGINT reactor failure");
for (mut rt, mut rt_client) in runtimes {
// If we received a signal while waiting for any runtime we must stop the rest and exit
// the waiting loop.
if signal_received {
Shuttle::stop_runtime(&mut rt, &mut rt_client)
.await
.unwrap_or_else(|err| {
info!(status = ?err, "stopping the runtime errored out");
});
continue;
for (mut rt, mut rt_client) in runtimes {
// If we received a signal while waiting for any runtime we must stop the rest and exit
// the waiting loop.
if signal_received {
Shuttle::stop_runtime(&mut rt, &mut rt_client)
.await
.unwrap_or_else(|err| {
trace!(status = ?err, "stopping the runtime errored out");
});
continue;
}

// Receiving a signal will stop the current runtime we're waiting for.
signal_received = tokio::select! {
res = rt.wait() => {
println!(
"a service future completed with exit status: {:?}",
res.unwrap().code()
);
false
},
_ = sigterm_notif.recv() => {
println!(
"cargo-shuttle received SIGTERM. Killing all the runtimes..."
);
provisioner_server.abort();
Shuttle::stop_runtime(&mut rt, &mut rt_client).await.unwrap_or_else(|err| {
trace!(status = ?err, "stopping the runtime errored out");
});
true
},
_ = sigint_notif.recv() => {
println!(
"cargo-shuttle received SIGINT. Killing all the runtimes..."
);
provisioner_server.abort();
Shuttle::stop_runtime(&mut rt, &mut rt_client).await.unwrap_or_else(|err| {
trace!(status = ?err, "stopping the runtime errored out");
});
true
}
};
}

// Receiving a signal will stop the current runtime we're waiting for.
signal_received = tokio::select! {
res = rt.wait() => {
println!(
"a service future completed with exit status: {:?}",
res.unwrap().code()
);
false
},
_ = sigterm.recv() => {
println!(
"cargo-shuttle received SIGTERM. Killing all the runtimes..."
);
provisioner_server.abort();
Shuttle::stop_runtime(&mut rt, &mut rt_client).await.unwrap_or_else(|err| {
info!(status = ?err, "stopping the runtime errored out");
});
true
},
_ = sigint.recv() => {
println!(
"cargo-shuttle received SIGINT. Killing all the runtimes..."
);
provisioner_server.abort();
Shuttle::stop_runtime(&mut rt, &mut rt_client).await.unwrap_or_else(|err| {
info!(status = ?err, "stopping the runtime errored out");
});
true
}
};
}
} else {
// In case we're not on an unix family OS, we're simply waiting for the runtimes
// to end.
for (mut rt, _) in runtimes {
println!(
"a service future completed with exit status: {:?}",
rt.wait().await?.code()
);
}
Ok(())
}

#[cfg(target_family = "windows")]
async fn local_run(&self, run_args: RunArgs) -> Result<()> {
let services = Shuttle::pre_local_run(&self, &run_args).await?;
let (provisioner_server, provisioner_port) = Shuttle::setup_local_provisioner().await?;

// Start all the services.
let mut runtimes: Vec<(
Child,
RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
)> = Vec::new();
for (i, service) in services.iter().enumerate() {
Shuttle::add_runtime_info(
Shuttle::spin_local_runtime(
&run_args,
service,
&provisioner_server,
i as u16,
provisioner_port,
)
.await?,
&mut runtimes,
&provisioner_server,
)
.await?;
}

for (mut rt, _) in runtimes {
println!(
"a service future completed with exit status: {:?}",
rt.wait().await?.code()
);
}

Ok(())
Expand Down