diff --git a/Cargo.lock b/Cargo.lock index 1752bbb99..66519545b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1748,19 +1748,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "dashmap" -version = "5.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" -dependencies = [ - "cfg-if 1.0.0", - "hashbrown 0.14.2", - "lock_api", - "once_cell", - "parking_lot_core 0.9.9", -] - [[package]] name = "data-encoding" version = "2.4.0" @@ -2149,16 +2136,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "forwarded-header-value" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9" -dependencies = [ - "nonempty", - "thiserror", -] - [[package]] name = "fqdn" version = "0.3.2" @@ -2279,12 +2256,6 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" -[[package]] -name = "futures-timer" -version = "3.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" - [[package]] name = "futures-util" version = "0.3.29" @@ -3148,24 +3119,6 @@ dependencies = [ "regex", ] -[[package]] -name = "governor" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4" -dependencies = [ - "cfg-if 1.0.0", - "dashmap", - "futures", - "futures-timer", - "no-std-compat", - "nonzero_ext", - "parking_lot 0.12.1", - "quanta", - "rand 0.8.5", - "smallvec", -] - [[package]] name = "guppy-workspace-hack" version = "0.1.0" @@ -3893,15 +3846,6 @@ dependencies = [ "libc", ] -[[package]] -name = "mach2" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" -dependencies = [ - "libc", -] - [[package]] name = "malloc_buf" version = "0.0.6" @@ -4130,12 +4074,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "no-std-compat" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" - [[package]] name = "nom" version = "7.1.3" @@ -4146,18 +4084,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "nonempty" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" - -[[package]] -name = "nonzero_ext" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" - [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -4798,22 +4724,6 @@ dependencies = [ "unicase", ] -[[package]] -name = "quanta" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" -dependencies = [ - "crossbeam-utils", - "libc", - "mach2", - "once_cell", - "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi", -] - [[package]] name = "queues" version = "1.1.0" @@ -4921,15 +4831,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "raw-cpuid" -version = "10.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "raw-window-handle" version = "0.5.2" @@ -6014,8 +5915,6 @@ dependencies = [ "chrono", "clap", "ctor", - "futures", - "http", "once_cell", "portpicker", "pretty_assertions", @@ -6029,8 +5928,6 @@ dependencies = [ "tokio", "tokio-stream", "tonic 0.10.2", - "tower", - "tower_governor", "tracing", "tracing-subscriber", "uuid", @@ -7199,26 +7096,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" -[[package]] -name = "tower_governor" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db81d9313372d714152194f3f2b66badda23a783fb6a97462e35f632814f4cff" -dependencies = [ - "axum", - "forwarded-header-value", - "futures", - "futures-core", - "governor", - "http", - "pin-project", - "thiserror", - "tokio", - "tower", - "tower-layer", - "tracing", -] - [[package]] name = "tracing" version = "0.1.40" diff --git a/common/src/models/error.rs b/common/src/models/error.rs index 45f2aac7b..e328bddc0 100644 --- a/common/src/models/error.rs +++ b/common/src/models/error.rs @@ -60,7 +60,6 @@ pub enum ErrorKind { NotReady, ServiceUnavailable, DeleteProjectFailed, - RateLimited(String), } impl From for ApiError { @@ -127,12 +126,6 @@ impl From for ApiError { ErrorKind::Forbidden => (StatusCode::FORBIDDEN, "Forbidden"), ErrorKind::NotReady => (StatusCode::INTERNAL_SERVER_ERROR, "Service not ready"), ErrorKind::DeleteProjectFailed => (StatusCode::INTERNAL_SERVER_ERROR, "Deleting project failed"), - ErrorKind::RateLimited(message) => { - return Self { - message, - status_code: StatusCode::TOO_MANY_REQUESTS.as_u16(), - } - }, }; Self { message: error_message.to_string(), diff --git a/deployer/src/handlers/error.rs b/deployer/src/handlers/error.rs index a58e59de0..292e86728 100644 --- a/deployer/src/handlers/error.rs +++ b/deployer/src/handlers/error.rs @@ -26,8 +26,6 @@ pub enum Error { Internal(#[from] anyhow::Error), #[error("Missing header: {0}")] MissingHeader(String), - #[error("{0}. Retry the request in a few minutes")] - RateLimited(String), } impl Serialize for Error { @@ -49,7 +47,6 @@ impl IntoResponse for Error { let code = match self { Error::NotFound(_) => StatusCode::NOT_FOUND, - Error::RateLimited(_) => StatusCode::TOO_MANY_REQUESTS, _ => StatusCode::INTERNAL_SERVER_ERROR, }; diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 4aa78816a..fa9faae8c 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -33,7 +33,7 @@ use shuttle_common::{ claims::{Claim, Scope}, models::{ deployment::{DeploymentRequest, CREATE_SERVICE_BODY_LIMIT, GIT_STRINGS_MAX_LENGTH}, - error::{axum::CustomErrorPath, ApiError, ErrorKind}, + error::axum::CustomErrorPath, project::ProjectName, }, request_span, LogItem, @@ -663,16 +663,8 @@ pub async fn get_logs( .collect(), )), Err(error) => { - if error.code() == tonic::Code::Unavailable - && error.metadata().get("x-ratelimit-limit").is_some() - { - Err(Error::RateLimited( - "your application is producing too many logs. Interactions with the shuttle logger service will be rate limited" - .to_string(), - )) - } else { - Err(anyhow!("failed to retrieve logs for deployment").into()) - } + error!(error = %error, "failed to retrieve logs for deployment"); + Err(anyhow!("failed to retrieve logs for deployment").into()) } } } @@ -723,24 +715,10 @@ async fn logs_websocket_handler( "failed to get backlog of logs" ); - if error.code() == tonic::Code::Unavailable - && error.metadata().get("x-ratelimit-limit").is_some() - { - let message = serde_json::to_string( - &ApiError::from( - ErrorKind::RateLimited( - "your application is producing too many logs. Interactions with the shuttle logger service will be rate limited" - .to_string() - ) - )) - .expect("to convert error to json"); + let _ = s + .send(ws::Message::Text("failed to get logs".to_string())) + .await; - let _ = s.send(ws::Message::Text(message)).await; - } else { - let _ = s - .send(ws::Message::Text("failed to get logs".to_string())) - .await; - } let _ = s.close().await; return; } diff --git a/logger/Cargo.toml b/logger/Cargo.toml index 07326a41d..0218b6336 100644 --- a/logger/Cargo.toml +++ b/logger/Cargo.toml @@ -12,7 +12,6 @@ shuttle-proto = { workspace = true, features = ["logger"] } async-trait = { workspace = true } chrono = { workspace = true } clap = { workspace = true } -http = { workspace = true } prost-types = { workspace = true } serde_json = { workspace = true } sqlx = { workspace = true, features = [ @@ -25,8 +24,6 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } tokio-stream = { workspace = true } tonic = { workspace = true } -tower = { workspace = true } -tower_governor = { version= "0.1.0", features = ["tracing"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["default"] } @@ -38,5 +35,3 @@ serde_json = { workspace = true } shuttle-common-tests = { workspace = true } uuid = { workspace = true } ctor = { workspace = true } -futures = { workspace = true } -tower = { workspace = true, features = ["util"] } diff --git a/logger/src/lib.rs b/logger/src/lib.rs index 3340f8c8d..98b2418fc 100644 --- a/logger/src/lib.rs +++ b/logger/src/lib.rs @@ -15,7 +15,6 @@ use tracing::{debug, error, field, Span}; pub mod args; mod dal; -pub mod rate_limiting; pub use dal::Postgres; diff --git a/logger/src/main.rs b/logger/src/main.rs index 8e04193ed..1ae2a876b 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -8,15 +8,9 @@ use shuttle_common::{ }, log::Backend, }; -use shuttle_logger::{ - args::Args, - rate_limiting::{tonic_error, TonicPeerIpKeyExtractor, BURST_SIZE, REFRESH_INTERVAL}, - Postgres, Service, -}; +use shuttle_logger::{args::Args, Postgres, Service}; use shuttle_proto::logger::logger_server::LoggerServer; use tonic::transport::Server; -use tower::ServiceBuilder; -use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tracing::trace; #[tokio::main] @@ -27,33 +21,12 @@ async fn main() { trace!(args = ?args, "parsed args"); - let governor_config = GovernorConfigBuilder::default() - // Regenerate capacity at a rate of 2 requests per second, meaning the maximum capacity - // for sustained traffic is 2 RPS per peer address. - .per_millisecond(REFRESH_INTERVAL) - // Allow bursts of up to 6 requests, when any burst capacity is used, it will regenerate - // one element at a time at the rate set above. - .burst_size(BURST_SIZE) - .use_headers() - .key_extractor(TonicPeerIpKeyExtractor) - .finish() - .unwrap(); - let mut server_builder = Server::builder() .http2_keepalive_interval(Some(Duration::from_secs(60))) .layer(JwtAuthenticationLayer::new(AuthPublicKey::new( args.auth_uri, ))) - .layer(ExtractPropagationLayer) - .layer( - ServiceBuilder::new() - // This middleware goes above `GovernorLayer` because it will receive errors returned by - // `GovernorLayer`. - .map_err(tonic_error) - .layer(GovernorLayer { - config: &governor_config, - }), - ); + .layer(ExtractPropagationLayer); let postgres = Postgres::new(&args.db_connection_uri).await; diff --git a/logger/src/rate_limiting.rs b/logger/src/rate_limiting.rs deleted file mode 100644 index d14f3ddca..000000000 --- a/logger/src/rate_limiting.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::net::IpAddr; - -use tonic::{ - metadata::{KeyAndValueRef, MetadataMap}, - transport::server::TcpConnectInfo, - Status, -}; -use tower::BoxError; -use tower_governor::{key_extractor::KeyExtractor, GovernorError}; - -/// The interval at which the rate limiter refreshes one slot in milliseconds. -pub const REFRESH_INTERVAL: u64 = 500; -/// The quota of requests that can be received before rate limiting is applied. -pub const BURST_SIZE: u32 = 6; - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct TonicPeerIpKeyExtractor; - -impl KeyExtractor for TonicPeerIpKeyExtractor { - type Key = IpAddr; - - fn name(&self) -> &'static str { - "peer IP" - } - - fn extract(&self, req: &http::Request) -> Result { - req.extensions() - .get::() - .and_then(|info| info.remote_addr()) - .map(|addr| addr.ip()) - .ok_or(GovernorError::UnableToExtractKey) - } - - fn key_name(&self, key: &Self::Key) -> Option { - Some(key.to_string()) - } -} - -/// Convert errors from the Governor rate limiter layer to tonic statuses. -pub fn tonic_error(e: BoxError) -> tonic::Status { - if let Some(error) = e.downcast_ref::() { - match error.to_owned() { - GovernorError::TooManyRequests { wait_time, headers } => { - let mut response = Status::unavailable(format!( - "received too many requests, wait for {wait_time}ms" - )); - - // Add rate limiting headers: x-ratelimit-remaining, x-ratelimit-after, x-ratelimit-limit. - if let Some(headers) = headers { - let metadata = MetadataMap::from_headers(headers); - - for header in metadata.iter() { - if let KeyAndValueRef::Ascii(key, value) = header { - response.metadata_mut().insert(key, value.clone()); - } - } - } - - response - } - GovernorError::UnableToExtractKey => { - Status::unavailable("unable to extract peer address") - } - GovernorError::Other { headers, .. } => { - let mut response = Status::internal("unexpected error in rate limiter"); - - if let Some(headers) = headers { - let metadata = MetadataMap::from_headers(headers); - - for header in metadata.iter() { - if let KeyAndValueRef::Ascii(key, value) = header { - response.metadata_mut().insert(key, value.clone()); - } - } - } - - response - } - } - } else { - Status::internal("unexpected error in rate limiter") - } -} diff --git a/logger/tests/integration_tests.rs b/logger/tests/integration_tests.rs index d36dfa2d2..81e5b2120 100644 --- a/logger/tests/integration_tests.rs +++ b/logger/tests/integration_tests.rs @@ -36,13 +36,7 @@ fn cleanup() { } mod needs_docker { use super::*; - use futures::future::join_all; use pretty_assertions::assert_eq; - use shuttle_logger::rate_limiting::{ - tonic_error, TonicPeerIpKeyExtractor, BURST_SIZE, REFRESH_INTERVAL, - }; - use tower::ServiceBuilder; - use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; #[tokio::test] async fn store_and_get_logs() { @@ -197,104 +191,6 @@ mod needs_docker { } } - #[tokio::test] - async fn store_and_get_logs_rate_limited() { - let logger_port = pick_unused_port().unwrap(); - let deployment_id = "runtime-fetch-logs-deployment-id"; - - // Create a unique database name so we have a new database for each test. - let db_name = Uuid::new_v4().to_string(); - - let server = spawn_server(logger_port, db_name); - - let test_future = tokio::spawn(async move { - // Ensure the DB has been created and server has started. - tokio::time::sleep(Duration::from_millis(300)).await; - - let dst = format!("http://localhost:{logger_port}"); - let mut client = LoggerClient::connect(dst).await.unwrap(); - - let store_logs = || async { - client - .clone() - .store_logs(Request::new(StoreLogsRequest { - logs: vec![LogItem { - deployment_id: deployment_id.to_string(), - log_line: Some(LogLine { - service_name: SHUTTLE_SERVICE.to_string(), - tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)), - data: ("log example").as_bytes().to_vec(), - }), - }], - })) - .await - }; - - // Six concurrent requests succeeds when rate limiter is fresh. - let futures = (0..6).map(|_| store_logs()); - let result = join_all(futures).await; - - assert!(result.iter().all(|response| response.is_ok())); - - // Allow rate limiter time to regenerate two requests. - tokio::time::sleep(Duration::from_millis(1000)).await; - - let futures = (0..2).map(|_| store_logs()); - let result = join_all(futures).await; - - assert!(result.iter().all(|response| response.is_ok())); - - // Allow rate limiter time to regenerate two requests. - tokio::time::sleep(Duration::from_millis(1000)).await; - - // Send three requests when the capacity is two. - let futures = (0..3).map(|_| store_logs()); - let result = join_all(futures).await; - - assert_eq!(result.iter().filter(|response| response.is_ok()).count(), 2); - - // Check that the error has the expected status and rate limiting headers. - result - .iter() - .filter(|response| response.is_err()) - .for_each(|err| { - let err = err.as_ref().unwrap_err(); - - assert_eq!(err.code(), tonic::Code::Unavailable); - assert!(err.message().contains("too many requests")); - - let expected = [ - "x-ratelimit-remaining", - "x-ratelimit-after", - "x-ratelimit-limit", - ]; - - let headers = err.metadata(); - assert!(expected.into_iter().all(|key| headers.contains_key(key))); - }); - - // Allow rate limiter to regenerate a slot for the get_logs request. - tokio::time::sleep(Duration::from_millis(500)).await; - - // Verify that all the logs that weren't rate limited were persisted in the logger. - let logs = client - .get_logs(Request::new(LogsRequest { - deployment_id: deployment_id.into(), - })) - .await - .unwrap() - .into_inner() - .log_items; - - assert_eq!(logs.len(), 10); - }); - - tokio::select! { - _ = server => panic!("server stopped first"), - result = test_future => result.expect("test should succeed") - } - } - fn spawn_server(port: u16, db_name: String) -> JoinHandle<()> { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); @@ -303,27 +199,10 @@ mod needs_docker { exec_psql(&format!(r#"CREATE DATABASE "{}";"#, &db_name)); - let governor_config = GovernorConfigBuilder::default() - .per_millisecond(REFRESH_INTERVAL) - .burst_size(BURST_SIZE) - .use_headers() - .key_extractor(TonicPeerIpKeyExtractor) - .finish() - .unwrap(); - tokio::task::spawn(async move { let pg = Postgres::new(&pg_uri).await; Server::builder() .layer(JwtScopesLayer::new(vec![Scope::Logs])) - .layer( - ServiceBuilder::new() - // This middleware goes above `GovernorLayer` because it will receive errors returned by - // `GovernorLayer`. - .map_err(tonic_error) - .layer(GovernorLayer { - config: &governor_config, - }), - ) .add_service(LoggerServer::new(Service::new(pg.get_sender(), pg))) .serve(addr) .await diff --git a/proto/src/lib.rs b/proto/src/lib.rs index f1c0cfa2f..2e0fa77cc 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -256,79 +256,14 @@ pub mod logger { type Item = LogItem; async fn receive(&mut self, items: Vec) { - // A log vector should never be received without any items. We clone the first item - // here so we can use IDs to generate a rate limiting logline, which we will send to - // the logger as a warning to the user that they are being rate limited. - let Some(item) = items.first().cloned() else { - error!("received log vector without any items"); - - return; - }; - if let Err(error) = self .store_logs(Request::new(StoreLogsRequest { logs: items })) .await { - match error.code() { - tonic::Code::Unavailable => { - if error.metadata().get("x-ratelimit-limit").is_some() { - let LogItem { - deployment_id, - log_line, - } = item; - - let LogLine { service_name, .. } = log_line.unwrap(); - - let timestamp = Utc::now(); - - let new_item = LogItem { - deployment_id, - log_line: Some(LogLine { - tx_timestamp: Some(prost_types::Timestamp { - seconds: timestamp.timestamp(), - nanos: timestamp.timestamp_subsec_nanos() as i32, - }), - service_name: Backend::Runtime(service_name.clone()) - .to_string(), - data: "your application is producing too many logs, log recording is being rate limited".into(), - }), - }; - - // Give the rate limiter time to refresh, the duration we need to sleep - // for here is determined by the refresh rate of the rate limiter in - // the logger server. It is currently set to refresh one slot per 500ms, - // so we could get away with a duration of 500ms here, but we give it - // some extra time in case this rate limiting was caused by a short - // burst of activity. - tokio::time::sleep(Duration::from_millis(1500)).await; - - if let Err(error) = self - // NOTE: the request to send this rate limiting log to the logger will - // also expend a slot in the logger rate limiter. - .store_logs(Request::new(StoreLogsRequest { - logs: vec![new_item], - })) - .await - { - error!( - error = &error as &dyn std::error::Error, - "failed to send rate limiting warning to logger service" - ); - }; - } else { - error!( - error = &error as &dyn std::error::Error, - "failed to send batch logs to logger" - ); - } - } - _ => { - error!( - error = &error as &dyn std::error::Error, - "failed to send batch logs to logger" - ); - } - }; + error!( + error = &error as &dyn std::error::Error, + "failed to send batch logs to logger" + ); } } }