Skip to content

Commit

Permalink
[scaling] feat: create a resource service (#906)
Browse files Browse the repository at this point in the history
* feat: skeleton resource-recorder

* test: adding resources

* feat: add resources

* feat: get resources

* refactor: constant service id

* feat: keep track of inactive resources

* test: resource can be updated

* feat: get by project id

* feat: deleting resources

* refactor: sortable ids

* refactor: clippy suggestion

* feat: column indices

* feat: record resource route

* feat: get resources route

* feat: delete resource route

* tests: integration tests

* feat: scope routes

* refactor: verify claim

* refactor: remove resource management from deployer-alpha

* tmp: native-tls

* Apply suggestions from code review

Co-authored-by: Iulian Barbu <14218860+iulianbarbu@users.noreply.github.com>
Co-authored-by: Oddbjørn Grødem <29732646+oddgrd@users.noreply.github.com>

* refactor: make fields private

* refactor: fix file name

* tests: redundant unit tests

* refactor: add last_updated field

* refactor: error on ID errors

* refactor: comments

---------

Co-authored-by: Iulian Barbu <14218860+iulianbarbu@users.noreply.github.com>
Co-authored-by: Oddbjørn Grødem <29732646+oddgrd@users.noreply.github.com>
  • Loading branch information
3 people authored May 19, 2023
1 parent 5defdb4 commit aaeba25
Show file tree
Hide file tree
Showing 27 changed files with 1,729 additions and 366 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"gateway",
"proto",
"provisioner",
"resource-recorder",
"runtime",
"service",
"orchestrator"
Expand Down Expand Up @@ -68,6 +69,7 @@ opentelemetry-http = "0.8.0"
pin-project = "1.0.12"
pipe = "0.4.0"
portpicker = "0.1.1"
pretty_assertions = "1.3.0"
prost = "0.11.8"
prost-types = "0.11.0"
rand = "0.8.5"
Expand Down Expand Up @@ -98,4 +100,5 @@ tracing-subscriber = { version = "0.3.16", default-features = false, features =
ttl_cache = "0.5.1"
utoipa = { version = "3.2.1", features = [ "uuid", "chrono" ] }
utoipa-swagger-ui = { version = "3.1.3", features = ["axum"] }
ulid = "1.0.0"
uuid = "1.2.2"
2 changes: 1 addition & 1 deletion codegen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ quote = "1.0.21"
syn = { version = "1.0.104", features = ["full", "extra-traits"] }

[dev-dependencies]
pretty_assertions = "1.3.0"
pretty_assertions = { workspace = true }
trybuild = "1.0.72"

[features]
Expand Down
31 changes: 31 additions & 0 deletions common/src/backends/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,37 @@ where
}
}

pub trait VerifyClaim {
type Error;

fn verify(&self, required_scope: Scope) -> Result<(), Self::Error>;
}

#[cfg(feature = "tonic")]
impl<B> VerifyClaim for tonic::Request<B> {
type Error = tonic::Status;

fn verify(&self, required_scope: Scope) -> Result<(), Self::Error> {
use strum::EnumMessage;

let claim = self
.extensions()
.get::<Claim>()
.ok_or_else(|| tonic::Status::internal("could not get claim"))?;

if claim.scopes.contains(&required_scope) {
Ok(())
} else {
Err(tonic::Status::permission_denied(format!(
"don't have permisson to: {}",
required_scope
.get_documentation()
.unwrap_or("perform this operation")
)))
}
}
}

#[cfg(test)]
mod tests {
use axum::{routing::get, Extension, Router};
Expand Down
3 changes: 2 additions & 1 deletion common/src/claims.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use opentelemetry::global;
use opentelemetry_http::HeaderInjector;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use strum::EnumMessage;
use tower::{Layer, Service};
use tracing::{error, trace, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand All @@ -28,7 +29,7 @@ const ISS: &str = "shuttle";

/// The scope of operations that can be performed on shuttle
/// Every scope defaults to read and will use a suffix for updating tasks
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq, EnumMessage)]
#[serde(rename_all = "snake_case")]
pub enum Scope {
/// Read the details, such as status and address, of a deployment
Expand Down
20 changes: 4 additions & 16 deletions deployer-alpha/src/deployment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::{instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::{
persistence::{DeploymentUpdater, ResourceManager, SecretGetter, SecretRecorder, State},
persistence::{DeploymentUpdater, SecretGetter, SecretRecorder, State},
RuntimeManager,
};
use tokio::sync::{mpsc, Mutex};
Expand All @@ -23,26 +23,24 @@ use self::{deploy_layer::LogRecorder, gateway_client::BuildQueueClient};
const QUEUE_BUFFER_SIZE: usize = 100;
const RUN_BUFFER_SIZE: usize = 100;

pub struct DeploymentManagerBuilder<LR, SR, ADG, DU, SG, RM, QC> {
pub struct DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> {
build_log_recorder: Option<LR>,
secret_recorder: Option<SR>,
active_deployment_getter: Option<ADG>,
artifacts_path: Option<PathBuf>,
runtime_manager: Option<Arc<Mutex<RuntimeManager>>>,
deployment_updater: Option<DU>,
secret_getter: Option<SG>,
resource_manager: Option<RM>,
queue_client: Option<QC>,
}

impl<LR, SR, ADG, DU, SG, RM, QC> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, RM, QC>
impl<LR, SR, ADG, DU, SG, QC> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC>
where
LR: LogRecorder,
SR: SecretRecorder,
ADG: ActiveDeploymentsGetter,
DU: DeploymentUpdater,
SG: SecretGetter,
RM: ResourceManager,
QC: BuildQueueClient,
{
pub fn build_log_recorder(mut self, build_log_recorder: LR) -> Self {
Expand Down Expand Up @@ -81,12 +79,6 @@ where
self
}

pub fn resource_manager(mut self, resource_manager: RM) -> Self {
self.resource_manager = Some(resource_manager);

self
}

pub fn runtime(mut self, runtime_manager: Arc<Mutex<RuntimeManager>>) -> Self {
self.runtime_manager = Some(runtime_manager);

Expand Down Expand Up @@ -118,7 +110,6 @@ where
.deployment_updater
.expect("a deployment updater to be set");
let secret_getter = self.secret_getter.expect("a secret getter to be set");
let resource_manager = self.resource_manager.expect("a resource manager to be set");

let (queue_send, queue_recv) = mpsc::channel(QUEUE_BUFFER_SIZE);
let (run_send, run_recv) = mpsc::channel(RUN_BUFFER_SIZE);
Expand All @@ -141,7 +132,6 @@ where
deployment_updater,
active_deployment_getter,
secret_getter,
resource_manager,
storage_manager.clone(),
));

Expand Down Expand Up @@ -179,8 +169,7 @@ pub struct DeploymentManager {
impl DeploymentManager {
/// Create a new deployment manager. Manages one or more 'pipelines' for
/// processing service building, loading, and deployment.
pub fn builder<LR, SR, ADG, DU, SG, RM, QC>(
) -> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, RM, QC> {
pub fn builder<LR, SR, ADG, DU, SG, QC>() -> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> {
DeploymentManagerBuilder {
build_log_recorder: None,
secret_recorder: None,
Expand All @@ -189,7 +178,6 @@ impl DeploymentManager {
runtime_manager: None,
deployment_updater: None,
secret_getter: None,
resource_manager: None,
queue_client: None,
}
}
Expand Down
38 changes: 9 additions & 29 deletions deployer-alpha/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use opentelemetry::global;
use portpicker::pick_unused_port;
use shuttle_common::{
claims::{Claim, ClaimService, InjectPropagation},
resource,
storage_manager::ArtifactsStorageManager,
};

Expand All @@ -27,7 +26,7 @@ use uuid::Uuid;
use super::{RunReceiver, State};
use crate::{
error::{Error, Result},
persistence::{DeploymentUpdater, Resource, ResourceManager, SecretGetter},
persistence::{DeploymentUpdater, SecretGetter},
RuntimeManager,
};

Expand All @@ -39,7 +38,6 @@ pub async fn task(
deployment_updater: impl DeploymentUpdater,
active_deployment_getter: impl ActiveDeploymentsGetter,
secret_getter: impl SecretGetter,
resource_manager: impl ResourceManager,
storage_manager: ArtifactsStorageManager,
) {
info!("Run task started");
Expand All @@ -51,7 +49,6 @@ pub async fn task(

let deployment_updater = deployment_updater.clone();
let secret_getter = secret_getter.clone();
let resource_manager = resource_manager.clone();
let storage_manager = storage_manager.clone();

let old_deployments_killer = kill_old_deployments(
Expand Down Expand Up @@ -95,7 +92,6 @@ pub async fn task(
.handle(
storage_manager,
secret_getter,
resource_manager,
runtime_manager,
deployment_updater,
old_deployments_killer,
Expand Down Expand Up @@ -188,13 +184,12 @@ pub struct Built {
}

impl Built {
#[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))]
#[instrument(skip(self, storage_manager, secret_getter, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))]
#[allow(clippy::too_many_arguments)]
async fn handle(
self,
storage_manager: ArtifactsStorageManager,
secret_getter: impl SecretGetter,
resource_manager: impl ResourceManager,
runtime_manager: Arc<Mutex<RuntimeManager>>,
deployment_updater: impl DeploymentUpdater,
kill_old_deployments: impl futures::Future<Output = Result<()>>,
Expand Down Expand Up @@ -238,7 +233,6 @@ impl Built {
self.service_id,
executable_path.clone(),
secret_getter,
resource_manager,
runtime_client.clone(),
self.claim,
)
Expand All @@ -262,7 +256,6 @@ async fn load(
service_id: Uuid,
executable_path: PathBuf,
secret_getter: impl SecretGetter,
resource_manager: impl ResourceManager,
mut runtime_client: RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
claim: Option<Claim>,
) -> Result<()> {
Expand All @@ -277,14 +270,7 @@ async fn load(

// Get resources from cache when a claim is not set (ie an idl project is started)
let resources = if claim.is_none() {
resource_manager
.get_resources(&service_id)
.await
.unwrap()
.into_iter()
.map(resource::Response::from)
.map(resource::Response::into_bytes)
.collect()
Default::default()
} else {
Default::default()
};
Expand Down Expand Up @@ -322,18 +308,12 @@ async fn load(
// secrets.
info!(success = %response.success, "loading response");

for resource in response.resources {
let resource: resource::Response = serde_json::from_slice(&resource).unwrap();
let resource = Resource {
service_id,
r#type: resource.r#type.into(),
config: resource.config,
data: resource.data,
};
resource_manager
.insert_resource(&resource)
.await
.expect("to add resource to persistence");
for _resource in response.resources {
// TODO: restore in new deployer after loading runtime
// resource_manager
// .insert_resource(&resource)
// .await
// .expect("to add resource to persistence");
}

if response.success {
Expand Down
21 changes: 11 additions & 10 deletions deployer-alpha/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use utoipa_swagger_ui::SwaggerUi;
use uuid::Uuid;

use crate::deployment::{DeploymentManager, Queued};
use crate::persistence::{Deployment, Log, Persistence, ResourceManager, SecretGetter, State};
use crate::persistence::{Deployment, Log, Persistence, SecretGetter, State};

use std::collections::HashMap;

Expand Down Expand Up @@ -237,15 +237,16 @@ pub async fn get_service_resources(
Extension(persistence): Extension<Persistence>,
Path((project_name, service_name)): Path<(String, String)>,
) -> Result<Json<Vec<shuttle_common::resource::Response>>> {
if let Some(service) = persistence.get_service_by_name(&service_name).await? {
let resources = persistence
.get_resources(&service.id)
.await?
.into_iter()
.map(Into::into)
.collect();

Ok(Json(resources))
if let Some(_service) = persistence.get_service_by_name(&service_name).await? {
// TODO: restore in new gateway
// let resources = persistence
// .get_resources(&service.id)
// .await?
// .into_iter()
// .map(Into::into)
// .collect();

Ok(Json(Default::default()))
} else {
Err(Error::NotFound("service not found".to_string()))
}
Expand Down
1 change: 0 additions & 1 deletion deployer-alpha/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub async fn start(
.runtime(runtime_manager)
.deployment_updater(persistence.clone())
.secret_getter(persistence.clone())
.resource_manager(persistence.clone())
.queue_client(GatewayClient::new(args.gateway_uri))
.build();

Expand Down
Loading

0 comments on commit aaeba25

Please sign in to comment.