From a838d5a36cffaebb216a44c11c9433b4efeaeb8e Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Fri, 27 Oct 2023 14:09:48 +0200 Subject: [PATCH 01/17] feat(logger): rate limit based on peer address --- Cargo.lock | 110 ++++++++++++++++++++++++++++++ logger/Cargo.toml | 5 ++ logger/src/lib.rs | 1 + logger/src/main.rs | 28 +++++++- logger/src/rate_limiting.rs | 72 +++++++++++++++++++ logger/tests/integration_tests.rs | 108 +++++++++++++++++++++++++++++ 6 files changed, 322 insertions(+), 2 deletions(-) create mode 100644 logger/src/rate_limiting.rs diff --git a/Cargo.lock b/Cargo.lock index b6bc4fc57..4f5f86b1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2159,6 +2159,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "forwarded-header-value" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9" +dependencies = [ + "nonempty", + "thiserror", +] + [[package]] name = "fqdn" version = "0.2.3" @@ -2279,6 +2289,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" @@ -3142,6 +3158,24 @@ dependencies = [ "regex", ] +[[package]] +name = "governor" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4" +dependencies = [ + "cfg-if 1.0.0", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.1", + "quanta", + "rand 0.8.5", + "smallvec", +] + [[package]] name = "guppy-workspace-hack" version = "0.1.0" @@ -3864,6 +3898,15 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "malloc_buf" version = "0.0.6" @@ -4098,6 +4141,12 @@ dependencies = [ "pin-utils", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -4108,6 +4157,18 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonempty" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" + +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -4748,6 +4809,22 @@ dependencies = [ "unicase", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + 
[[package]] name = "queues" version = "1.1.0" @@ -4855,6 +4932,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "raw-window-handle" version = "0.5.2" @@ -5950,9 +6036,11 @@ name = "shuttle-logger" version = "0.30.1" dependencies = [ "async-trait", + "axum", "chrono", "clap", "ctor", + "futures", "once_cell", "portpicker", "pretty_assertions", @@ -5966,6 +6054,8 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tower", + "tower_governor", "tracing", "tracing-subscriber", "uuid", @@ -7096,6 +7186,26 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +[[package]] +name = "tower_governor" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db81d9313372d714152194f3f2b66badda23a783fb6a97462e35f632814f4cff" +dependencies = [ + "axum", + "forwarded-header-value", + "futures", + "futures-core", + "governor", + "http", + "pin-project", + "thiserror", + "tokio", + "tower", + "tower-layer", + "tracing", +] + [[package]] name = "tracing" version = "0.1.40" diff --git a/logger/Cargo.toml b/logger/Cargo.toml index 14489b3cb..d6d2f018c 100644 --- a/logger/Cargo.toml +++ b/logger/Cargo.toml @@ -7,6 +7,7 @@ repository.workspace = true [dependencies] async-trait = { workspace = true } +axum = { workspace = true } chrono = { workspace = true } clap = { workspace = true } prost-types = { workspace = true } @@ -22,6 +23,8 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } tokio-stream = { workspace = true } tonic = { workspace = true } +tower = { workspace = true } +tower_governor = { version= "0.1.0", features = ["tracing"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["default"] } @@ -40,3 +43,5 @@ serde_json = { workspace = true } shuttle-common-tests = { workspace = true } uuid = { workspace = true } ctor = { workspace = true } +futures = { workspace = true } +tower = { workspace = true, features = ["util"] } diff --git a/logger/src/lib.rs b/logger/src/lib.rs index 98b2418fc..3340f8c8d 100644 --- a/logger/src/lib.rs +++ b/logger/src/lib.rs @@ -15,6 +15,7 @@ use tracing::{debug, error, field, Span}; pub mod args; mod dal; +pub mod rate_limiting; pub use dal::Postgres; diff --git a/logger/src/main.rs b/logger/src/main.rs index 1ae2a876b..c1359875f 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use axum::error_handling::HandleErrorLayer; use clap::Parser; use shuttle_common::{ backends::{ @@ -8,9 +9,15 @@ use shuttle_common::{ }, log::Backend, }; -use shuttle_logger::{args::Args, Postgres, Service}; +use shuttle_logger::{ + args::Args, + rate_limiting::{tonic_error, TonicPeerIpKeyExtractor}, + Postgres, Service, +}; use shuttle_proto::logger::logger_server::LoggerServer; use tonic::transport::Server; +use tower::BoxError; +use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tracing::trace; #[tokio::main] @@ -21,12 +28,29 @@ async fn main() { trace!(args = ?args, "parsed args"); + // The server can receive no more than 6 requests per peer address per second. 
+ let governor_config = GovernorConfigBuilder::default() + .per_second(1) + .burst_size(6) + .use_headers() + .key_extractor(TonicPeerIpKeyExtractor) + .finish() + .unwrap(); + let mut server_builder = Server::builder() .http2_keepalive_interval(Some(Duration::from_secs(60))) .layer(JwtAuthenticationLayer::new(AuthPublicKey::new( args.auth_uri, ))) - .layer(ExtractPropagationLayer); + .layer(ExtractPropagationLayer) + // This middleware goes above `GovernorLayer` because it will receive errors returned by + // `GovernorLayer`. + .layer(HandleErrorLayer::new(|e: BoxError| async move { + tonic_error(e) + })) + .layer(GovernorLayer { + config: &governor_config, + }); let postgres = Postgres::new(&args.db_connection_uri).await; diff --git a/logger/src/rate_limiting.rs b/logger/src/rate_limiting.rs new file mode 100644 index 000000000..9dea4fbc0 --- /dev/null +++ b/logger/src/rate_limiting.rs @@ -0,0 +1,72 @@ +use std::net::IpAddr; + +use axum::http::{request, Response}; +use tonic::{body::BoxBody, transport::server::TcpConnectInfo, Status}; +use tower::BoxError; +use tower_governor::{key_extractor::KeyExtractor, GovernorError}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TonicPeerIpKeyExtractor; + +impl KeyExtractor for TonicPeerIpKeyExtractor { + type Key = IpAddr; + + fn name(&self) -> &'static str { + "peer IP" + } + + fn extract(&self, req: &request::Request) -> Result { + req.extensions() + .get::() + .and_then(|info| info.remote_addr()) + .map(|addr| addr.ip()) + .ok_or(GovernorError::UnableToExtractKey) + } + + fn key_name(&self, key: &Self::Key) -> Option { + Some(key.to_string()) + } +} + +/// Convert errors from the Governor rate limiter layer to tonic statuses. The errors are +/// captured by an axum error handling layer, so we convert the tonic statuses to http for +/// compatibility with that layer. +pub fn tonic_error(e: BoxError) -> Response { + if e.is::() { + // It shouldn't be possible for this to panic, since we already know it's a GovernorError + let error = e.downcast_ref::().unwrap().to_owned(); + match error { + GovernorError::TooManyRequests { wait_time, headers } => { + // TODO: after upgrading tonic, use tonic types trait extensions to enrich status, + // see example: https://github.com/hyperium/tonic/blob/master/examples/src/richer-error/server.rs. + // We can for example add wait time as: https://docs.rs/tonic-types/latest/tonic_types/struct.RetryInfo.html + + let mut response = Status::unavailable(format!( + "received too many requests, wait for {wait_time}s" + )) + .to_http(); + + // Add rate limiting headers: x-ratelimit-remaining, x-ratelimit-after, x-ratelimit-limit. + if let Some(headers) = headers { + response.headers_mut().extend(headers); + } + + response + } + GovernorError::UnableToExtractKey => { + Status::unavailable("unable to extract client address").to_http() + } + GovernorError::Other { headers, .. 
} => { + let mut response = Status::internal("unexpected error in rate limiter").to_http(); + + if let Some(headers) = headers { + response.headers_mut().extend(headers); + } + + response + } + } + } else { + Status::internal("unexpected error in rate limiter").to_http() + } +} diff --git a/logger/tests/integration_tests.rs b/logger/tests/integration_tests.rs index 81e5b2120..db6a9775c 100644 --- a/logger/tests/integration_tests.rs +++ b/logger/tests/integration_tests.rs @@ -36,7 +36,11 @@ fn cleanup() { } mod needs_docker { use super::*; + use axum::{error_handling::HandleErrorLayer, BoxError}; + use futures::future::join_all; use pretty_assertions::assert_eq; + use shuttle_logger::rate_limiting::{tonic_error, TonicPeerIpKeyExtractor}; + use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; #[tokio::test] async fn store_and_get_logs() { @@ -191,6 +195,96 @@ mod needs_docker { } } + #[tokio::test] + async fn store_and_get_logs_rate_limited() { + let logger_port = pick_unused_port().unwrap(); + let deployment_id = "runtime-fetch-logs-deployment-id"; + + // Create a unique database name so we have a new database for each test. + let db_name = Uuid::new_v4().to_string(); + + let server = spawn_server(logger_port, db_name); + + let test_future = tokio::spawn(async move { + // Ensure the DB has been created and server has started. + tokio::time::sleep(Duration::from_millis(300)).await; + + let dst = format!("http://localhost:{logger_port}"); + let mut client = LoggerClient::connect(dst).await.unwrap(); + + let store_logs = || async { + client + .clone() + .store_logs(Request::new(StoreLogsRequest { + logs: vec![LogItem { + deployment_id: deployment_id.to_string(), + log_line: Some(LogLine { + service_name: SHUTTLE_SERVICE.to_string(), + tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)), + data: ("log example").as_bytes().to_vec(), + }), + }], + })) + .await + }; + + // Two concurrent requests succeeds. + let futures = (0..2).map(|_| store_logs()); + let result = join_all(futures).await; + + assert!(result.iter().all(|response| response.is_ok())); + + // Allow rate limiter time to regenerate. + tokio::time::sleep(Duration::from_millis(1000)).await; + + // If we send 6 concurrent requests, 5 will succeed. + let futures = (0..6).map(|_| store_logs()); + let result = join_all(futures).await; + + assert_eq!(result.iter().filter(|response| response.is_ok()).count(), 5); + + // Check that the error has the expected status and rate limiting headers. + result + .iter() + .filter(|response| response.is_err()) + .for_each(|err| { + let err = err.as_ref().unwrap_err(); + + assert_eq!(err.code(), tonic::Code::Unavailable); + assert!(err.message().contains("too many requests")); + + let expected = [ + "x-ratelimit-remaining", + "x-ratelimit-after", + "x-ratelimit-limit", + ]; + + let headers = err.metadata(); + assert!(expected.into_iter().all(|key| headers.contains_key(key))); + }); + + // Allow rate limiter to regenerate. + tokio::time::sleep(Duration::from_millis(1000)).await; + + // Verify that all the logs that weren't rate limited were persisted in the logger. + let logs = client + .get_logs(Request::new(LogsRequest { + deployment_id: deployment_id.into(), + })) + .await + .unwrap() + .into_inner() + .log_items; + + assert_eq!(logs.len(), 7); + }); + + tokio::select! 
{ + _ = server => panic!("server stopped first"), + result = test_future => result.expect("test should succeed") + } + } + fn spawn_server(port: u16, db_name: String) -> JoinHandle<()> { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); @@ -199,10 +293,24 @@ mod needs_docker { exec_psql(&format!(r#"CREATE DATABASE "{}";"#, &db_name)); + let governor_config = GovernorConfigBuilder::default() + .per_second(1) + .burst_size(6) + .use_headers() + .key_extractor(TonicPeerIpKeyExtractor) + .finish() + .unwrap(); + tokio::task::spawn(async move { let pg = Postgres::new(&pg_uri).await; Server::builder() .layer(JwtScopesLayer::new(vec![Scope::Logs])) + .layer(HandleErrorLayer::new(|e: BoxError| async move { + tonic_error(e) + })) + .layer(GovernorLayer { + config: &governor_config, + }) .add_service(LoggerServer::new(Service::new(pg.get_sender(), pg))) .serve(addr) .await From a8a97326effbd0359baec27d38998d2fdaa91ed5 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Fri, 27 Oct 2023 17:24:15 +0200 Subject: [PATCH 02/17] feat(proto): reduce log batch size --- logger/src/main.rs | 2 +- proto/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/logger/src/main.rs b/logger/src/main.rs index c1359875f..c2c11a906 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -31,7 +31,7 @@ async fn main() { // The server can receive no more than 6 requests per peer address per second. let governor_config = GovernorConfigBuilder::default() .per_second(1) - .burst_size(6) + .burst_size(3) .use_headers() .key_extractor(TonicPeerIpKeyExtractor) .finish() diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 8aaf6f5bd..9d03a4f9e 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -377,7 +377,7 @@ pub mod logger { /// Create a batcher around inner. It will send a batch of items to inner if a capacity of 2048 is reached /// or if an interval of 1 second is reached. 
pub fn wrap(inner: I) -> Self { - Self::new(inner, 2048, Duration::from_secs(1)) + Self::new(inner, 128, Duration::from_secs(1)) } /// Send a single item into this batcher From 0f2d55077490178dc235ef295654ff19c48daede Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Sat, 28 Oct 2023 13:05:10 +0200 Subject: [PATCH 03/17] feat(logger): refactor out axum layer and dependency --- Cargo.lock | 2 +- logger/Cargo.toml | 2 +- logger/src/main.rs | 20 ++++++++-------- logger/src/rate_limiting.rs | 40 ++++++++++++++++++++----------- logger/tests/integration_tests.rs | 17 +++++++------ 5 files changed, 48 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f5f86b1e..ea221e6fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6036,11 +6036,11 @@ name = "shuttle-logger" version = "0.30.1" dependencies = [ "async-trait", - "axum", "chrono", "clap", "ctor", "futures", + "http", "once_cell", "portpicker", "pretty_assertions", diff --git a/logger/Cargo.toml b/logger/Cargo.toml index d6d2f018c..be52b7910 100644 --- a/logger/Cargo.toml +++ b/logger/Cargo.toml @@ -7,9 +7,9 @@ repository.workspace = true [dependencies] async-trait = { workspace = true } -axum = { workspace = true } chrono = { workspace = true } clap = { workspace = true } +http = { workspace = true } prost-types = { workspace = true } serde_json = { workspace = true } sqlx = { workspace = true, features = [ diff --git a/logger/src/main.rs b/logger/src/main.rs index c2c11a906..b486469f0 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use axum::error_handling::HandleErrorLayer; use clap::Parser; use shuttle_common::{ backends::{ @@ -16,7 +15,7 @@ use shuttle_logger::{ }; use shuttle_proto::logger::logger_server::LoggerServer; use tonic::transport::Server; -use tower::BoxError; +use tower::ServiceBuilder; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tracing::trace; @@ -43,14 +42,15 @@ async fn main() { args.auth_uri, ))) .layer(ExtractPropagationLayer) - // This middleware goes above `GovernorLayer` because it will receive errors returned by - // `GovernorLayer`. - .layer(HandleErrorLayer::new(|e: BoxError| async move { - tonic_error(e) - })) - .layer(GovernorLayer { - config: &governor_config, - }); + .layer( + ServiceBuilder::new() + // This middleware goes above `GovernorLayer` because it will receive errors returned by + // `GovernorLayer`. + .map_err(tonic_error) + .layer(GovernorLayer { + config: &governor_config, + }), // .map_err(tonic_error), + ); let postgres = Postgres::new(&args.db_connection_uri).await; diff --git a/logger/src/rate_limiting.rs b/logger/src/rate_limiting.rs index 9dea4fbc0..44a73d1ed 100644 --- a/logger/src/rate_limiting.rs +++ b/logger/src/rate_limiting.rs @@ -1,7 +1,10 @@ use std::net::IpAddr; -use axum::http::{request, Response}; -use tonic::{body::BoxBody, transport::server::TcpConnectInfo, Status}; +use tonic::{ + metadata::{KeyAndValueRef, MetadataMap}, + transport::server::TcpConnectInfo, + Status, +}; use tower::BoxError; use tower_governor::{key_extractor::KeyExtractor, GovernorError}; @@ -15,7 +18,7 @@ impl KeyExtractor for TonicPeerIpKeyExtractor { "peer IP" } - fn extract(&self, req: &request::Request) -> Result { + fn extract(&self, req: &http::Request) -> Result { req.extensions() .get::() .and_then(|info| info.remote_addr()) @@ -28,10 +31,8 @@ impl KeyExtractor for TonicPeerIpKeyExtractor { } } -/// Convert errors from the Governor rate limiter layer to tonic statuses. 
The errors are -/// captured by an axum error handling layer, so we convert the tonic statuses to http for -/// compatibility with that layer. -pub fn tonic_error(e: BoxError) -> Response { +/// Convert errors from the Governor rate limiter layer to tonic statuses. +pub fn tonic_error(e: BoxError) -> tonic::Status { if e.is::() { // It shouldn't be possible for this to panic, since we already know it's a GovernorError let error = e.downcast_ref::().unwrap().to_owned(); @@ -43,30 +44,41 @@ pub fn tonic_error(e: BoxError) -> Response { let mut response = Status::unavailable(format!( "received too many requests, wait for {wait_time}s" - )) - .to_http(); + )); // Add rate limiting headers: x-ratelimit-remaining, x-ratelimit-after, x-ratelimit-limit. if let Some(headers) = headers { - response.headers_mut().extend(headers); + let metadata = MetadataMap::from_headers(headers); + + for header in metadata.iter() { + if let KeyAndValueRef::Ascii(key, value) = header { + response.metadata_mut().insert(key, value.clone()); + } + } } response } GovernorError::UnableToExtractKey => { - Status::unavailable("unable to extract client address").to_http() + Status::unavailable("unable to extract client address") } GovernorError::Other { headers, .. } => { - let mut response = Status::internal("unexpected error in rate limiter").to_http(); + let mut response = Status::internal("unexpected error in rate limiter"); if let Some(headers) = headers { - response.headers_mut().extend(headers); + let metadata = MetadataMap::from_headers(headers); + + for header in metadata.iter() { + if let KeyAndValueRef::Ascii(key, value) = header { + response.metadata_mut().insert(key, value.clone()); + } + } } response } } } else { - Status::internal("unexpected error in rate limiter").to_http() + Status::internal("unexpected error in rate limiter") } } diff --git a/logger/tests/integration_tests.rs b/logger/tests/integration_tests.rs index db6a9775c..d3e33f991 100644 --- a/logger/tests/integration_tests.rs +++ b/logger/tests/integration_tests.rs @@ -36,10 +36,10 @@ fn cleanup() { } mod needs_docker { use super::*; - use axum::{error_handling::HandleErrorLayer, BoxError}; use futures::future::join_all; use pretty_assertions::assert_eq; use shuttle_logger::rate_limiting::{tonic_error, TonicPeerIpKeyExtractor}; + use tower::ServiceBuilder; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; #[tokio::test] @@ -305,12 +305,15 @@ mod needs_docker { let pg = Postgres::new(&pg_uri).await; Server::builder() .layer(JwtScopesLayer::new(vec![Scope::Logs])) - .layer(HandleErrorLayer::new(|e: BoxError| async move { - tonic_error(e) - })) - .layer(GovernorLayer { - config: &governor_config, - }) + .layer( + ServiceBuilder::new() + // This middleware goes above `GovernorLayer` because it will receive errors returned by + // `GovernorLayer`. 
+ .map_err(tonic_error) + .layer(GovernorLayer { + config: &governor_config, + }), // .map_err(tonic_error), + ) .add_service(LoggerServer::new(Service::new(pg.get_sender(), pg))) .serve(addr) .await From 2d42218172064a8becc9aee9009acceee27f8b34 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 1 Nov 2023 17:27:31 +0100 Subject: [PATCH 04/17] feat(deployer): handle rate limiting error for get_logs --- cargo-shuttle/src/lib.rs | 19 +++++++++++++--- deployer/src/handlers/error.rs | 3 +++ deployer/src/handlers/mod.rs | 40 +++++++++++++++++++++++++++------- 3 files changed, 51 insertions(+), 11 deletions(-) diff --git a/cargo-shuttle/src/lib.rs b/cargo-shuttle/src/lib.rs index 5305a783e..b921a29e9 100644 --- a/cargo-shuttle/src/lib.rs +++ b/cargo-shuttle/src/lib.rs @@ -707,9 +707,22 @@ impl Shuttle { while let Some(Ok(msg)) = stream.next().await { if let tokio_tungstenite::tungstenite::Message::Text(line) = msg { - let log_item: shuttle_common::LogItem = serde_json::from_str(&line) - .context("Failed parsing logs. Is your cargo-shuttle outdated?")?; - println!("{log_item}") + match serde_json::from_str::(&line) { + Ok(log_item) => { + println!("{log_item}") + } + Err(err) => { + debug!(error = %err, "failed to parse logs"); + + let message = if line.contains("rate limit") { + line + } else { + "failed parsing logs, is your cargo-shuttle outdated?".to_string() + }; + + bail!(message); + } + } } } } else { diff --git a/deployer/src/handlers/error.rs b/deployer/src/handlers/error.rs index 531712f58..2982bd6cb 100644 --- a/deployer/src/handlers/error.rs +++ b/deployer/src/handlers/error.rs @@ -27,6 +27,8 @@ pub enum Error { Internal(#[from] anyhow::Error), #[error("Missing header: {0}")] MissingHeader(String), + #[error("{0}. 
Retry the command in a few minutes")] + RateLimited(String), } impl Serialize for Error { @@ -48,6 +50,7 @@ impl IntoResponse for Error { let code = match self { Error::NotFound(_) => StatusCode::NOT_FOUND, + Error::RateLimited(_) => StatusCode::TOO_MANY_REQUESTS, _ => StatusCode::INTERNAL_SERVER_ERROR, }; diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 14d1a7b6b..191999b92 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -658,16 +658,27 @@ pub async fn get_logs( logs_request.extensions_mut().insert(claim); let mut client = deployment_manager.logs_fetcher().clone(); - if let Ok(logs) = client.get_logs(logs_request).await { - Ok(Json( + + match client.get_logs(logs_request).await { + Ok(logs) => Ok(Json( logs.into_inner() .log_items .into_iter() .map(|l| l.to_log_item_with_id(deployment_id)) .collect(), - )) - } else { - Err(Error::NotFound("deployment not found".to_string())) + )), + Err(error) => { + if error.code() == tonic::Code::Unavailable + && error.metadata().get("x-ratelimit-after").is_some() + { + Err(Error::RateLimited( + "your application is producing too many logs, any interaction with the shuttle logger service will be rate limited" + .to_string(), + )) + } else { + Err(anyhow!("failed to retrieve logs for deployment").into()) + } + } } } @@ -715,9 +726,22 @@ async fn logs_websocket_handler( "failed to get backlog of logs" ); - let _ = s - .send(ws::Message::Text("failed to get logs".to_string())) - .await; + if error.code() == tonic::Code::Unavailable + && error.metadata().get("x-ratelimit-limit").is_some() + { + let _ = s + .send(ws::Message::Text( + Error::RateLimited( + "your application is producing too many logs, any interaction with the shuttle logger service will be rate limited" + .to_string(), + ).to_string() + )) + .await; + } else { + let _ = s + .send(ws::Message::Text("failed to get logs".to_string())) + .await; + } let _ = s.close().await; return; } From f179eae5c5297a1ed2cdc28b0c6a9ae79f93996c Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 1 Nov 2023 17:50:07 +0100 Subject: [PATCH 05/17] refactor: increase batch & burst size --- logger/src/main.rs | 4 ++-- proto/src/lib.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/logger/src/main.rs b/logger/src/main.rs index b486469f0..0e3c892a6 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -30,7 +30,7 @@ async fn main() { // The server can receive no more than 6 requests per peer address per second. let governor_config = GovernorConfigBuilder::default() .per_second(1) - .burst_size(3) + .burst_size(6) .use_headers() .key_extractor(TonicPeerIpKeyExtractor) .finish() @@ -49,7 +49,7 @@ async fn main() { .map_err(tonic_error) .layer(GovernorLayer { config: &governor_config, - }), // .map_err(tonic_error), + }), ); let postgres = Postgres::new(&args.db_connection_uri).await; diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 9d03a4f9e..20de1e8e0 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -377,7 +377,7 @@ pub mod logger { /// Create a batcher around inner. It will send a batch of items to inner if a capacity of 2048 is reached /// or if an interval of 1 second is reached. 
pub fn wrap(inner: I) -> Self { - Self::new(inner, 128, Duration::from_secs(1)) + Self::new(inner, 256, Duration::from_secs(1)) } /// Send a single item into this batcher From 34a67e08fbbd3025d05aad6a76ab9b557b643742 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 1 Nov 2023 18:33:06 +0100 Subject: [PATCH 06/17] feat(logger): send warning to logger when rate limited --- proto/src/lib.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 20de1e8e0..e2c80c919 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -342,14 +342,71 @@ pub mod logger { type Item = LogItem; async fn receive(&mut self, items: Vec) { + // A log vector should never be received without any items. We clone the first item + // here so we can use IDs to generate a rate limiting logline. + let Some(item) = items.first().cloned() else { + error!("received log vector without any items"); + + return; + }; + if let Err(error) = self .store_logs(Request::new(StoreLogsRequest { logs: items })) .await { - error!( - error = &error as &dyn std::error::Error, - "failed to send batch logs to logger" - ); + match error.code() { + tonic::Code::Unavailable => { + if let Some(_) = error.metadata().get("x-ratelimit-limit") { + let LogItem { + deployment_id, + log_line, + } = item; + + let LogLine { service_name, .. } = log_line.unwrap(); + + let timestamp = Utc::now(); + + let new_item = LogItem { + deployment_id, + log_line: Some(LogLine { + tx_timestamp: Some(prost_types::Timestamp { + seconds: timestamp.timestamp(), + nanos: timestamp.timestamp_subsec_nanos() as i32, + }), + service_name: Backend::Runtime(service_name.clone()) + .to_string(), + data: "your application is producing too many logs, log recording is being rate limited".into(), + }), + }; + + // Give the rate limiter time to refresh. + tokio::time::sleep(Duration::from_millis(1500)).await; + + if let Err(error) = self + .store_logs(Request::new(StoreLogsRequest { + logs: vec![new_item], + })) + .await + { + error!( + error = &error as &dyn std::error::Error, + "failed to send rate limiting warning to logger service" + ); + }; + } else { + error!( + error = &error as &dyn std::error::Error, + "failed to send batch logs to logger" + ); + } + } + _ => { + error!( + error = &error as &dyn std::error::Error, + "failed to send batch logs to logger" + ); + } + }; } } } From ce320015fb9f3afb2e4ef6c01964cbb6c81132d0 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 1 Nov 2023 18:33:56 +0100 Subject: [PATCH 07/17] feat(logger): increase refresh rate of rate limiter to 2 RPS --- logger/src/main.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/logger/src/main.rs b/logger/src/main.rs index 0e3c892a6..fabf8e30c 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -27,9 +27,12 @@ async fn main() { trace!(args = ?args, "parsed args"); - // The server can receive no more than 6 requests per peer address per second. let governor_config = GovernorConfigBuilder::default() - .per_second(1) + // Regenerate capacity at a rate of 2 requests per second, meaning the maximum capacity + // for sustained traffic is 2 RPS per peer address. + .per_millisecond(500) + // Allow bursts of up to 6 requests, when any burst capacity is used, it will regenerate + // one element at a time at the rate set above. 
.burst_size(6) .use_headers() .key_extractor(TonicPeerIpKeyExtractor) From 5841342ae0232c5282c445c122a903edd75a5ba8 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 2 Nov 2023 09:20:48 +0100 Subject: [PATCH 08/17] fix(proto): clippy --- proto/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto/src/lib.rs b/proto/src/lib.rs index e2c80c919..711452330 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -356,7 +356,7 @@ pub mod logger { { match error.code() { tonic::Code::Unavailable => { - if let Some(_) = error.metadata().get("x-ratelimit-limit") { + if error.metadata().get("x-ratelimit-limit").is_some() { let LogItem { deployment_id, log_line, From 0668981403f3dbcd87eec1d8187a4974764cc3ef Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 2 Nov 2023 09:24:35 +0100 Subject: [PATCH 09/17] tests(logger): refactor rate limiter to 2 RPS --- logger/tests/integration_tests.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/logger/tests/integration_tests.rs b/logger/tests/integration_tests.rs index d3e33f991..fc30e9d39 100644 --- a/logger/tests/integration_tests.rs +++ b/logger/tests/integration_tests.rs @@ -228,20 +228,28 @@ mod needs_docker { .await }; - // Two concurrent requests succeeds. + // Six concurrent requests succeeds when rate limiter is fresh. + let futures = (0..6).map(|_| store_logs()); + let result = join_all(futures).await; + + assert!(result.iter().all(|response| response.is_ok())); + + // Allow rate limiter time to regenerate two requests. + tokio::time::sleep(Duration::from_millis(1000)).await; + let futures = (0..2).map(|_| store_logs()); let result = join_all(futures).await; assert!(result.iter().all(|response| response.is_ok())); - // Allow rate limiter time to regenerate. + // Allow rate limiter time to regenerate two requests. tokio::time::sleep(Duration::from_millis(1000)).await; - // If we send 6 concurrent requests, 5 will succeed. - let futures = (0..6).map(|_| store_logs()); + // Send three requests when the capacity is two. + let futures = (0..3).map(|_| store_logs()); let result = join_all(futures).await; - assert_eq!(result.iter().filter(|response| response.is_ok()).count(), 5); + assert_eq!(result.iter().filter(|response| response.is_ok()).count(), 2); // Check that the error has the expected status and rate limiting headers. result @@ -263,8 +271,8 @@ mod needs_docker { assert!(expected.into_iter().all(|key| headers.contains_key(key))); }); - // Allow rate limiter to regenerate. - tokio::time::sleep(Duration::from_millis(1000)).await; + // Allow rate limiter to regenerate a slot for the get_logs request. + tokio::time::sleep(Duration::from_millis(500)).await; // Verify that all the logs that weren't rate limited were persisted in the logger. let logs = client @@ -276,7 +284,7 @@ mod needs_docker { .into_inner() .log_items; - assert_eq!(logs.len(), 7); + assert_eq!(logs.len(), 10); }); tokio::select! 
{ @@ -294,7 +302,7 @@ mod needs_docker { exec_psql(&format!(r#"CREATE DATABASE "{}";"#, &db_name)); let governor_config = GovernorConfigBuilder::default() - .per_second(1) + .per_millisecond(500) .burst_size(6) .use_headers() .key_extractor(TonicPeerIpKeyExtractor) From 6311872ee21eb095915ee1b897ab9b535d86cbce Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 2 Nov 2023 09:30:09 +0100 Subject: [PATCH 10/17] refactor(deployer): match more common x-ratelimit-limit header --- deployer/src/handlers/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 191999b92..3f24f141c 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -669,7 +669,7 @@ pub async fn get_logs( )), Err(error) => { if error.code() == tonic::Code::Unavailable - && error.metadata().get("x-ratelimit-after").is_some() + && error.metadata().get("x-ratelimit-limit").is_some() { Err(Error::RateLimited( "your application is producing too many logs, any interaction with the shuttle logger service will be rate limited" From 2404f5ab6269efd3f2979c99e0cf4381a5d11777 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 2 Nov 2023 09:38:17 +0100 Subject: [PATCH 11/17] misc(logger): cleanups and comments --- logger/src/rate_limiting.rs | 4 ++-- logger/tests/integration_tests.rs | 2 +- proto/src/lib.rs | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/logger/src/rate_limiting.rs b/logger/src/rate_limiting.rs index 44a73d1ed..a1f2d5ed3 100644 --- a/logger/src/rate_limiting.rs +++ b/logger/src/rate_limiting.rs @@ -43,7 +43,7 @@ pub fn tonic_error(e: BoxError) -> tonic::Status { // We can for example add wait time as: https://docs.rs/tonic-types/latest/tonic_types/struct.RetryInfo.html let mut response = Status::unavailable(format!( - "received too many requests, wait for {wait_time}s" + "received too many requests, wait for {wait_time}ms" )); // Add rate limiting headers: x-ratelimit-remaining, x-ratelimit-after, x-ratelimit-limit. @@ -60,7 +60,7 @@ pub fn tonic_error(e: BoxError) -> tonic::Status { response } GovernorError::UnableToExtractKey => { - Status::unavailable("unable to extract client address") + Status::unavailable("unable to extract peer address") } GovernorError::Other { headers, .. } => { let mut response = Status::internal("unexpected error in rate limiter"); diff --git a/logger/tests/integration_tests.rs b/logger/tests/integration_tests.rs index fc30e9d39..a4a52f72b 100644 --- a/logger/tests/integration_tests.rs +++ b/logger/tests/integration_tests.rs @@ -320,7 +320,7 @@ mod needs_docker { .map_err(tonic_error) .layer(GovernorLayer { config: &governor_config, - }), // .map_err(tonic_error), + }), ) .add_service(LoggerServer::new(Service::new(pg.get_sender(), pg))) .serve(addr) diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 711452330..5f746608b 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -343,7 +343,8 @@ pub mod logger { async fn receive(&mut self, items: Vec) { // A log vector should never be received without any items. We clone the first item - // here so we can use IDs to generate a rate limiting logline. + // here so we can use IDs to generate a rate limiting logline, which we will send to + // the logger as a warning to the user that they are being rate limited. 
let Some(item) = items.first().cloned() else { error!("received log vector without any items"); From 58ec21dc028ae7171714558d98684f6095ac9656 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Fri, 10 Nov 2023 12:19:23 +0100 Subject: [PATCH 12/17] chore: update lockfile --- Cargo.lock | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index a1a759587..134b38e2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1748,6 +1748,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if 1.0.0", + "hashbrown 0.14.2", + "lock_api", + "once_cell", + "parking_lot_core 0.9.9", +] + [[package]] name = "data-encoding" version = "2.4.0" From c1df918b21087be459daf7eacfd562ac46450f45 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 14 Nov 2023 09:54:08 +0100 Subject: [PATCH 13/17] misc: rephrase errors, comments, constants --- deployer/src/handlers/error.rs | 2 +- deployer/src/handlers/mod.rs | 4 ++-- logger/src/main.rs | 6 +++--- logger/src/rate_limiting.rs | 5 +++++ logger/tests/integration_tests.rs | 8 +++++--- proto/src/lib.rs | 11 +++++++++-- 6 files changed, 25 insertions(+), 11 deletions(-) diff --git a/deployer/src/handlers/error.rs b/deployer/src/handlers/error.rs index 77568d799..a58e59de0 100644 --- a/deployer/src/handlers/error.rs +++ b/deployer/src/handlers/error.rs @@ -26,7 +26,7 @@ pub enum Error { Internal(#[from] anyhow::Error), #[error("Missing header: {0}")] MissingHeader(String), - #[error("{0}. Retry the command in a few minutes")] + #[error("{0}. Retry the request in a few minutes")] RateLimited(String), } diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 60d90d795..58bb2e8e4 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -674,7 +674,7 @@ pub async fn get_logs( && error.metadata().get("x-ratelimit-limit").is_some() { Err(Error::RateLimited( - "your application is producing too many logs, any interaction with the shuttle logger service will be rate limited" + "your application is producing too many logs. Interactions with the shuttle logger service will be rate limited for a while" .to_string(), )) } else { @@ -736,7 +736,7 @@ async fn logs_websocket_handler( let _ = s .send(ws::Message::Text( Error::RateLimited( - "your application is producing too many logs, any interaction with the shuttle logger service will be rate limited" + "your application is producing too many logs. Interactions with the shuttle logger service will be rate limited for a while" .to_string(), ).to_string() )) diff --git a/logger/src/main.rs b/logger/src/main.rs index fabf8e30c..8e04193ed 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -10,7 +10,7 @@ use shuttle_common::{ }; use shuttle_logger::{ args::Args, - rate_limiting::{tonic_error, TonicPeerIpKeyExtractor}, + rate_limiting::{tonic_error, TonicPeerIpKeyExtractor, BURST_SIZE, REFRESH_INTERVAL}, Postgres, Service, }; use shuttle_proto::logger::logger_server::LoggerServer; @@ -30,10 +30,10 @@ async fn main() { let governor_config = GovernorConfigBuilder::default() // Regenerate capacity at a rate of 2 requests per second, meaning the maximum capacity // for sustained traffic is 2 RPS per peer address. 
- .per_millisecond(500) + .per_millisecond(REFRESH_INTERVAL) // Allow bursts of up to 6 requests, when any burst capacity is used, it will regenerate // one element at a time at the rate set above. - .burst_size(6) + .burst_size(BURST_SIZE) .use_headers() .key_extractor(TonicPeerIpKeyExtractor) .finish() diff --git a/logger/src/rate_limiting.rs b/logger/src/rate_limiting.rs index a1f2d5ed3..1b56cef75 100644 --- a/logger/src/rate_limiting.rs +++ b/logger/src/rate_limiting.rs @@ -8,6 +8,11 @@ use tonic::{ use tower::BoxError; use tower_governor::{key_extractor::KeyExtractor, GovernorError}; +/// The interval at which the rate limiter refreshes one slot in milliseconds. +pub const REFRESH_INTERVAL: u64 = 500; +/// The quota of requests that can be received before rate limiting is applied. +pub const BURST_SIZE: u32 = 6; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct TonicPeerIpKeyExtractor; diff --git a/logger/tests/integration_tests.rs b/logger/tests/integration_tests.rs index a4a52f72b..d36dfa2d2 100644 --- a/logger/tests/integration_tests.rs +++ b/logger/tests/integration_tests.rs @@ -38,7 +38,9 @@ mod needs_docker { use super::*; use futures::future::join_all; use pretty_assertions::assert_eq; - use shuttle_logger::rate_limiting::{tonic_error, TonicPeerIpKeyExtractor}; + use shuttle_logger::rate_limiting::{ + tonic_error, TonicPeerIpKeyExtractor, BURST_SIZE, REFRESH_INTERVAL, + }; use tower::ServiceBuilder; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; @@ -302,8 +304,8 @@ mod needs_docker { exec_psql(&format!(r#"CREATE DATABASE "{}";"#, &db_name)); let governor_config = GovernorConfigBuilder::default() - .per_millisecond(500) - .burst_size(6) + .per_millisecond(REFRESH_INTERVAL) + .burst_size(BURST_SIZE) .use_headers() .key_extractor(TonicPeerIpKeyExtractor) .finish() diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 0385cf358..336df66ce 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -382,10 +382,17 @@ pub mod logger { }), }; - // Give the rate limiter time to refresh. + // Give the rate limiter time to refresh, the duration we need to sleep + // for here is determined by the refresh rate of the rate limiter in + // the logger server. It is currently set to refresh one slot per 500ms, + // so we could get away with a duration of 500ms here, but we give it + // some extra time in case this rate limiting was caused by a short + // burst of activity. tokio::time::sleep(Duration::from_millis(1500)).await; if let Err(error) = self + // NOTE: the request to send this rate limiting log to the logger will + // also expend a slot in the logger rate limiter. .store_logs(Request::new(StoreLogsRequest { logs: vec![new_item], })) @@ -434,7 +441,7 @@ pub mod logger { Self { tx } } - /// Create a batcher around inner. It will send a batch of items to inner if a capacity of 2048 is reached + /// Create a batcher around inner. It will send a batch of items to inner if a capacity of 256 is reached /// or if an interval of 1 second is reached. 
pub fn wrap(inner: I) -> Self { Self::new(inner, 256, Duration::from_secs(1)) From eaa5e23bb273d6ddef9cb970f373888873a930bb Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Mon, 20 Nov 2023 13:36:34 +0100 Subject: [PATCH 14/17] refactor: send deserializable error for log stream rate limit --- cargo-shuttle/src/lib.rs | 8 ++++---- deployer/src/handlers/mod.rs | 14 +++++--------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/cargo-shuttle/src/lib.rs b/cargo-shuttle/src/lib.rs index 328d13e5b..999f46d9e 100644 --- a/cargo-shuttle/src/lib.rs +++ b/cargo-shuttle/src/lib.rs @@ -720,12 +720,12 @@ impl Shuttle { println!("{log_item}") } Err(err) => { - debug!(error = %err, "failed to parse logs"); + debug!(error = %err, "failed to parse message into log item"); - let message = if line.contains("rate limit") { - line + let message = if let Ok(err) = serde_json::from_str::(&line) { + err.to_string() } else { - "failed parsing logs, is your cargo-shuttle outdated?".to_string() + "failed to parse logs, is your cargo-shuttle outdated?".to_string() }; bail!(message); diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 58bb2e8e4..eb365f265 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -33,7 +33,7 @@ use shuttle_common::{ claims::{Claim, Scope}, models::{ deployment::{DeploymentRequest, CREATE_SERVICE_BODY_LIMIT, GIT_STRINGS_MAX_LENGTH}, - error::axum::CustomErrorPath, + error::{axum::CustomErrorPath, ApiError, ErrorKind}, project::ProjectName, secret, }, @@ -733,14 +733,10 @@ async fn logs_websocket_handler( if error.code() == tonic::Code::Unavailable && error.metadata().get("x-ratelimit-limit").is_some() { - let _ = s - .send(ws::Message::Text( - Error::RateLimited( - "your application is producing too many logs. 
Interactions with the shuttle logger service will be rate limited for a while" - .to_string(), - ).to_string() - )) - .await; + let message = serde_json::to_string(&ApiError::from(ErrorKind::ServiceUnavailable)) + .expect("to convert error to json"); + + let _ = s.send(ws::Message::Text(message)).await; } else { let _ = s .send(ws::Message::Text("failed to get logs".to_string())) From 6968f8c0aaebc29ce0ebcaae32f1493b43ac98be Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Mon, 20 Nov 2023 15:56:51 +0100 Subject: [PATCH 15/17] feat: add ratelimited apierror variant --- common/src/models/error.rs | 7 +++++++ deployer/src/handlers/mod.rs | 10 ++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/common/src/models/error.rs b/common/src/models/error.rs index 8be5d9d09..ad94f5f54 100644 --- a/common/src/models/error.rs +++ b/common/src/models/error.rs @@ -59,6 +59,7 @@ pub enum ErrorKind { NotReady, ServiceUnavailable, DeleteProjectFailed, + RateLimited(String), } impl From for ApiError { @@ -121,6 +122,12 @@ impl From for ApiError { ErrorKind::Forbidden => (StatusCode::FORBIDDEN, "forbidden"), ErrorKind::NotReady => (StatusCode::INTERNAL_SERVER_ERROR, "service not ready"), ErrorKind::DeleteProjectFailed => (StatusCode::INTERNAL_SERVER_ERROR, "deleting project failed"), + ErrorKind::RateLimited(message) => { + return Self { + message, + status_code: StatusCode::TOO_MANY_REQUESTS.as_u16(), + } + }, }; Self { message: error_message.to_string(), diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index eb365f265..2412c857f 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -674,7 +674,7 @@ pub async fn get_logs( && error.metadata().get("x-ratelimit-limit").is_some() { Err(Error::RateLimited( - "your application is producing too many logs. Interactions with the shuttle logger service will be rate limited for a while" + "your application is producing too many logs. Interactions with the shuttle logger service will be rate limited" .to_string(), )) } else { @@ -733,7 +733,13 @@ async fn logs_websocket_handler( if error.code() == tonic::Code::Unavailable && error.metadata().get("x-ratelimit-limit").is_some() { - let message = serde_json::to_string(&ApiError::from(ErrorKind::ServiceUnavailable)) + let message = serde_json::to_string( + &ApiError::from( + ErrorKind::RateLimited( + "your application is producing too many logs. Interactions with the shuttle logger service will be rate limited" + .to_string() + ) + )) .expect("to convert error to json"); let _ = s.send(ws::Message::Text(message)).await; From b750e6e0c4a21a2ca7b410cf556f94043f59301c Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 21 Nov 2023 08:36:47 +0100 Subject: [PATCH 16/17] refactor(logger): use downcast_ref in if clause --- logger/src/rate_limiting.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/logger/src/rate_limiting.rs b/logger/src/rate_limiting.rs index 1b56cef75..9696c6f1e 100644 --- a/logger/src/rate_limiting.rs +++ b/logger/src/rate_limiting.rs @@ -38,10 +38,8 @@ impl KeyExtractor for TonicPeerIpKeyExtractor { /// Convert errors from the Governor rate limiter layer to tonic statuses. 
pub fn tonic_error(e: BoxError) -> tonic::Status { - if e.is::() { - // It shouldn't be possible for this to panic, since we already know it's a GovernorError - let error = e.downcast_ref::().unwrap().to_owned(); - match error { + if let Some(error) = e.downcast_ref::() { + match error.to_owned() { GovernorError::TooManyRequests { wait_time, headers } => { // TODO: after upgrading tonic, use tonic types trait extensions to enrich status, // see example: https://github.com/hyperium/tonic/blob/master/examples/src/richer-error/server.rs. From b5705a9e2f6abc66ac9041b413854cd152f8a87e Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 21 Nov 2023 08:37:20 +0100 Subject: [PATCH 17/17] docs(logger): remove todo --- logger/src/rate_limiting.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/logger/src/rate_limiting.rs b/logger/src/rate_limiting.rs index 9696c6f1e..d14f3ddca 100644 --- a/logger/src/rate_limiting.rs +++ b/logger/src/rate_limiting.rs @@ -41,10 +41,6 @@ pub fn tonic_error(e: BoxError) -> tonic::Status { if let Some(error) = e.downcast_ref::() { match error.to_owned() { GovernorError::TooManyRequests { wait_time, headers } => { - // TODO: after upgrading tonic, use tonic types trait extensions to enrich status, - // see example: https://github.com/hyperium/tonic/blob/master/examples/src/richer-error/server.rs. - // We can for example add wait time as: https://docs.rs/tonic-types/latest/tonic_types/struct.RetryInfo.html - let mut response = Status::unavailable(format!( "received too many requests, wait for {wait_time}ms" ));
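
A minimal sketch of the behaviour configured above, for reference. With REFRESH_INTERVAL = 500 ms and BURST_SIZE = 6, a fresh peer can burst six requests, after which one slot regenerates every 500 ms, i.e. 2 RPS sustained per peer address. The integer-only token bucket below is a hypothetical, simplified model (governor itself implements GCRA, and the PeerBucket type does not exist in the codebase), but it reproduces the counts asserted in store_and_get_logs_rate_limited: six requests admitted from a fresh bucket, then two of three admitted one second later.

/// Hypothetical illustration only: a deterministic token-bucket model of the limits
/// set with `per_millisecond(REFRESH_INTERVAL)` and `burst_size(BURST_SIZE)`.
struct PeerBucket {
    /// Admitting one request costs this many milliseconds of credit (the refresh interval).
    cost_ms: u64,
    /// Maximum credit, i.e. the burst size times the refresh interval.
    capacity_ms: u64,
    /// Credit currently available to this peer.
    credit_ms: u64,
    /// Time of the previous call, in milliseconds.
    last_ms: u64,
}

impl PeerBucket {
    fn new(burst_size: u64, refresh_interval_ms: u64) -> Self {
        Self {
            cost_ms: refresh_interval_ms,
            capacity_ms: burst_size * refresh_interval_ms,
            credit_ms: burst_size * refresh_interval_ms,
            last_ms: 0,
        }
    }

    /// Admit a request arriving at `now_ms`, or reject it if the peer has run out of credit.
    fn try_admit(&mut self, now_ms: u64) -> bool {
        // Credit regenerates one millisecond per elapsed millisecond, capped at the burst capacity.
        let elapsed = now_ms.saturating_sub(self.last_ms);
        self.last_ms = now_ms;
        self.credit_ms = (self.credit_ms + elapsed).min(self.capacity_ms);
        if self.credit_ms >= self.cost_ms {
            self.credit_ms -= self.cost_ms;
            true
        } else {
            false
        }
    }
}

fn main() {
    // burst_size = 6, refresh interval = 500 ms, matching the logger configuration.
    let mut bucket = PeerBucket::new(6, 500);

    // A fresh peer can burst six requests at t = 0; the seventh is rejected.
    assert!((0..6).all(|_| bucket.try_admit(0)));
    assert!(!bucket.try_admit(0));

    // One second later two slots have regenerated, so two of three requests pass,
    // matching the expectations in the integration test.
    let admitted = (0..3).filter(|_| bucket.try_admit(1000)).count();
    assert_eq!(admitted, 2);
}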