Skip to content

Commit

Permalink
feat: gateway to start last deploy from idle project (#1121)
Browse files Browse the repository at this point in the history
* refactor: stop cycling resources

* feat: gateway start idle deploys

* refactor: comment deploy scopes

* refactor: better name

* bug: use current time

* bug: skip newest not oldest

* refactor: reverse order from query

---------

Co-authored-by: Iulian Barbu <14218860+iulianbarbu@users.noreply.github.com>
  • Loading branch information
chesedo and iulianbarbu authored Aug 2, 2023
1 parent f770f8a commit 4dfd65c
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 79 deletions.
5 changes: 4 additions & 1 deletion auth/src/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,10 @@ mod tests {
fn deployer_machine() {
let scopes: Vec<Scope> = AccountTier::Deployer.into();

assert_eq!(scopes, vec![Scope::DeploymentPush, Scope::Resources]);
assert_eq!(
scopes,
vec![Scope::DeploymentPush, Scope::Resources, Scope::Service]
);
}
}
}
8 changes: 6 additions & 2 deletions common/src/claims.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ impl ScopeBuilder {

/// Extend the current scopes with those needed by a deployer machine / user.
pub fn with_deploy_rights(mut self) -> Self {
self.0.extend(vec![Scope::DeploymentPush, Scope::Resources]);
self.0.extend(vec![
Scope::DeploymentPush, // To start an idle deploy
Scope::Resources, // To get past resources for an idle deploy
Scope::Service, // To get the running deploy for a service
]);
self
}

Expand All @@ -146,7 +150,7 @@ impl Default for ScopeBuilder {
}
}

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct Claim {
/// Expiration time (as UTC timestamp).
pub exp: usize,
Expand Down
6 changes: 3 additions & 3 deletions deployer/src/deployment/deploy_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ mod tests {
service_id: Uuid::new_v4(),
tracing_context: Default::default(),
is_next: false,
claim: None,
claim: Default::default(),
})
.await;

Expand Down Expand Up @@ -872,7 +872,7 @@ mod tests {
data: Bytes::from("violets are red").to_vec(),
will_run_tests: false,
tracing_context: Default::default(),
claim: None,
claim: Default::default(),
})
.await;

Expand Down Expand Up @@ -931,7 +931,7 @@ mod tests {
data: bytes,
will_run_tests: false,
tracing_context: Default::default(),
claim: None,
claim: Default::default(),
}
}
}
2 changes: 1 addition & 1 deletion deployer/src/deployment/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub struct Queued {
pub data: Vec<u8>,
pub will_run_tests: bool,
pub tracing_context: HashMap<String, String>,
pub claim: Option<Claim>,
pub claim: Claim,
}

impl Queued {
Expand Down
31 changes: 12 additions & 19 deletions deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ pub struct Built {
pub service_id: Uuid,
pub tracing_context: HashMap<String, String>,
pub is_next: bool,
pub claim: Option<Claim>,
pub claim: Claim,
}

impl Built {
Expand Down Expand Up @@ -286,7 +286,7 @@ async fn load(
secret_getter: impl SecretGetter,
resource_manager: impl ResourceManager,
mut runtime_client: RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
claim: Option<Claim>,
claim: Claim,
) -> Result<()> {
info!(
"loading project from: {}",
Expand All @@ -297,19 +297,14 @@ async fn load(
.unwrap_or_default()
);

// Get resources from cache when a claim is not set (ie an idl project is started)
let resources = if claim.is_none() {
resource_manager
.get_resources(&service_id)
.await
.unwrap()
.into_iter()
.map(resource::Response::from)
.map(resource::Response::into_bytes)
.collect()
} else {
Default::default()
};
let resources = resource_manager
.get_resources(&service_id)
.await
.unwrap()
.into_iter()
.map(resource::Response::from)
.map(resource::Response::into_bytes)
.collect();

let secrets = secret_getter
.get_secrets(&service_id)
Expand All @@ -329,9 +324,7 @@ async fn load(
secrets,
});

if let Some(claim) = claim {
load_request.extensions_mut().insert(claim);
}
load_request.extensions_mut().insert(claim);

debug!(service_name = %service_name, "loading service");
let response = runtime_client.load(load_request).await;
Expand Down Expand Up @@ -749,7 +742,7 @@ mod tests {
service_id: Uuid::new_v4(),
tracing_context: Default::default(),
is_next: false,
claim: None,
claim: Default::default(),
},
storage_manager,
)
Expand Down
4 changes: 2 additions & 2 deletions deployer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ pub async fn create_service(
data: deployment_req.data,
will_run_tests: !deployment_req.no_test,
tracing_context: Default::default(),
claim: Some(claim),
claim,
};

deployment_manager.queue_push(queued).await;
Expand Down Expand Up @@ -521,7 +521,7 @@ pub async fn start_deployment(
service_id: deployment.service_id,
tracing_context: Default::default(),
is_next: deployment.is_next,
claim: Some(claim),
claim,
};
deployment_manager.run_push(built).await;

Expand Down
21 changes: 9 additions & 12 deletions deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{convert::Infallible, net::SocketAddr, sync::Arc};

pub use args::Args;
pub use deployment::deploy_layer::DeployLayer;
use deployment::{Built, DeploymentManager};
use deployment::DeploymentManager;
use fqdn::FQDN;
use hyper::{
server::conn::AddrStream,
Expand Down Expand Up @@ -45,17 +45,14 @@ pub async fn start(
persistence.cleanup_invalid_states().await.unwrap();

let runnable_deployments = persistence.get_all_runnable_deployments().await.unwrap();
info!(count = %runnable_deployments.len(), "enqueuing runnable deployments");
for existing_deployment in runnable_deployments {
let built = Built {
id: existing_deployment.id,
service_name: existing_deployment.service_name,
service_id: existing_deployment.service_id,
tracing_context: Default::default(),
is_next: existing_deployment.is_next,
claim: None, // This will cause us to read the resource info from past provisions
};
deployment_manager.run_push(built).await;
info!(count = %runnable_deployments.len(), "stopping all but last running deploy");

// Make sure we don't stop the last running deploy. This works because they are returned in descending order.
for existing_deployment in runnable_deployments.into_iter().skip(1) {
persistence
.stop_running_deployment(existing_deployment)
.await
.unwrap();
}

let mut builder = handlers::RouterBuilder::new(
Expand Down
18 changes: 15 additions & 3 deletions deployer/src/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl Persistence {
FROM deployments AS d
JOIN services AS s ON s.id = d.service_id
WHERE state = ?
ORDER BY last_update"#,
ORDER BY last_update DESC"#,
)
.bind(State::Running)
.fetch_all(&self.pool)
Expand Down Expand Up @@ -328,6 +328,18 @@ impl Persistence {
pub fn get_log_sender(&self) -> crossbeam_channel::Sender<deploy_layer::Log> {
self.log_send.clone()
}

pub async fn stop_running_deployment(&self, deployable: DeploymentRunnable) -> Result<()> {
update_deployment(
&self.pool,
DeploymentState {
id: deployable.id,
last_update: Utc::now(),
state: State::Stopped,
},
)
.await
}
}

async fn update_deployment(pool: &SqlitePool, state: impl Into<DeploymentState>) -> Result<()> {
Expand Down Expand Up @@ -935,7 +947,7 @@ mod tests {
runnable,
[
DeploymentRunnable {
id: id_1,
id: id_3,
service_name: "foo".to_string(),
service_id: foo_id,
is_next: false,
Expand All @@ -947,7 +959,7 @@ mod tests {
is_next: true,
},
DeploymentRunnable {
id: id_3,
id: id_1,
service_name: "foo".to_string(),
service_id: foo_id,
is_next: false,
Expand Down
8 changes: 3 additions & 5 deletions gateway/src/api/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ async fn create_project(
service
.new_task()
.project(project.clone())
.and_then(task::run_until_done())
.and_then(task::start_idle_deploys())
.send(&sender)
.await?;

Expand Down Expand Up @@ -291,11 +293,7 @@ async fn get_status(

// Compute auth status.
let auth_status = {
let response = AUTH_CLIENT
.get_or_init(reqwest::Client::new)
.get(service.auth_uri().to_string())
.send()
.await;
let response = AUTH_CLIENT.get(service.auth_uri().clone()).await;
match response {
Ok(response) if response.status() == 200 => StatusResponse::healthy(),
Ok(_) | Err(_) => StatusResponse::unhealthy(),
Expand Down
6 changes: 4 additions & 2 deletions gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use std::fmt::Formatter;
use std::io;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::OnceLock;

use acme::AcmeClientError;
use axum::response::{IntoResponse, Response};
use axum::Json;
use bollard::Docker;
use futures::prelude::*;
use hyper::client::HttpConnector;
use hyper::Client;
use once_cell::sync::Lazy;
use serde::{Deserialize, Deserializer, Serialize};
use service::ContainerSettings;
use shuttle_common::models::error::{ApiError, ErrorKind};
Expand All @@ -31,7 +33,7 @@ pub mod task;
pub mod tls;
pub mod worker;

static AUTH_CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
static AUTH_CLIENT: Lazy<Client<HttpConnector>> = Lazy::new(Client::new);

/// Server-side errors that do not have to do with the user runtime
/// should be [`Error`]s.
Expand Down
78 changes: 70 additions & 8 deletions gateway/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@ use bollard::network::{ConnectNetworkOptions, DisconnectNetworkOptions};
use bollard::system::EventsOptions;
use fqdn::FQDN;
use futures::prelude::*;
use http::header::AUTHORIZATION;
use http::uri::InvalidUri;
use http::Uri;
use http::{Method, Request, Uri};
use hyper::client::HttpConnector;
use hyper::Client;
use hyper::{Body, Client};
use once_cell::sync::Lazy;
use rand::distributions::{Alphanumeric, DistString};
use serde::{Deserialize, Serialize};
use shuttle_common::backends::headers::{X_SHUTTLE_ACCOUNT_NAME, X_SHUTTLE_ADMIN_SECRET};
use shuttle_common::models::project::{idle_minutes, IDLE_MINUTES};
use shuttle_common::models::service;
use tokio::time::{sleep, timeout};
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, info, instrument, trace};
use uuid::Uuid;

use crate::service::ContainerSettings;
use crate::{
Expand Down Expand Up @@ -72,7 +76,7 @@ const MAX_REBOOTS: usize = 3;
// Client used for health checks
static CLIENT: Lazy<Client<HttpConnector>> = Lazy::new(Client::new);
// Health check must succeed within 10 seconds
static IS_HEALTHY_TIMEOUT: Duration = Duration::from_secs(10);
pub static IS_HEALTHY_TIMEOUT: Duration = Duration::from_secs(10);

#[async_trait]
impl<Ctx> Refresh<Ctx> for ContainerInspectResponse
Expand Down Expand Up @@ -1178,8 +1182,10 @@ impl ProjectReady {
self.service.is_healthy().await
}

pub async fn start_last_deploy(&mut self, api_key: String) {
self.service.start_last_deploy(api_key).await
pub async fn start_last_deploy(&mut self, jwt: String, admin_secret: String) {
if let Err(error) = self.service.start_last_deploy(jwt, admin_secret).await {
error!(error, "failed to start last running deploy");
};
}
}

Expand Down Expand Up @@ -1239,8 +1245,64 @@ impl Service {
is_healthy
}

pub async fn start_last_deploy(&mut self, _api_key: String) {
// TODO: convert the key to a JWT, get last deployment and start it (ENG-816)
pub async fn start_last_deploy(
&mut self,
jwt: String,
admin_secret: String,
) -> Result<(), Box<dyn std::error::Error>> {
trace!(jwt, "getting last deploy");

let running_id = self.get_running_deploy(&jwt, &admin_secret).await?;

trace!(?running_id, "starting deploy");

if let Some(running_id) = running_id {
// Start this deployment
let uri = self.uri(format!(
"/projects/{}/deployments/{}",
self.name, running_id
))?;

let req = Request::builder()
.method(Method::PUT)
.uri(uri)
.header(AUTHORIZATION, format!("Bearer {}", jwt))
.header(X_SHUTTLE_ACCOUNT_NAME.clone(), "gateway")
.header(X_SHUTTLE_ADMIN_SECRET.clone(), admin_secret)
.body(Body::empty())?;

let _ = timeout(IS_HEALTHY_TIMEOUT, CLIENT.request(req)).await;
}

Ok(())
}

/// Get the last running deployment
async fn get_running_deploy(
&self,
jwt: &str,
admin_secret: &str,
) -> Result<Option<Uuid>, Box<dyn std::error::Error>> {
let uri = self.uri(format!("/projects/{}/services/{}", self.name, self.name))?;

let req = Request::builder()
.uri(uri)
.header(AUTHORIZATION, format!("Bearer {}", jwt))
.header(X_SHUTTLE_ACCOUNT_NAME.clone(), "gateway")
.header(X_SHUTTLE_ADMIN_SECRET.clone(), admin_secret)
.body(Body::empty())?;

let resp = timeout(IS_HEALTHY_TIMEOUT, CLIENT.request(req)).await??;

let body = hyper::body::to_bytes(resp.into_body()).await?;

let service: service::Summary = serde_json::from_slice(&body)?;

if let Some(deployment) = service.deployment {
Ok(Some(deployment.id))
} else {
Ok(None)
}
}
}

Expand Down
Loading

0 comments on commit 4dfd65c

Please sign in to comment.