From e7ed5657beea60659506b5f9b2c74bbd2788f848 Mon Sep 17 00:00:00 2001 From: Felix Seidel Date: Sat, 18 Jun 2022 17:24:58 +0200 Subject: [PATCH] :recycle: Add ConcurrencyLimitLayer middleware --- Cargo.lock | 14 +++++++------- Cargo.toml | 3 ++- src/app/config.rs | 14 ++++++++++++++ src/app/web/router.rs | 7 +++++-- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9cf6bac..eeed280 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1018,9 +1018,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", "http", @@ -1356,7 +1356,7 @@ dependencies = [ "tokio", "tokio-util 0.7.0", "tower", - "tower-http 0.3.3", + "tower-http 0.3.4", "tracing", ] @@ -2753,14 +2753,13 @@ dependencies = [ "pin-project-lite", "tower-layer", "tower-service", - "tracing", ] [[package]] name = "tower-http" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d342c6d58709c0a6d48d48dabbb62d4ef955cf5f0f3bbfd845838e7ae88dbae" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" dependencies = [ "base64", "bitflags", @@ -3153,7 +3152,8 @@ dependencies = [ "serde_json", "serde_yaml", "tokio", - "tower-http 0.2.3", + "tower", + "tower-http 0.3.4", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 474b8cd..7456de9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,8 @@ async-trait = "0.1.53" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" serde_yaml = "0.8.24" -tower-http = { version = "0.2.3", features = ["trace", "set-header"] } +tower = { version= "0.4.12", features = ["limit"] } +tower-http = { version = "0.3.4", features = ["trace", "set-header"] } clap = { version = "3.1.0", features = ["derive", "env"] } chrono = "0.4.19" diff --git a/src/app/config.rs b/src/app/config.rs index 3740b3a..b47db1e 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -58,6 +58,9 @@ pub struct Config { #[clap(long = "distributed-wait-duration", default_value_t = 300)] pub distributed_wait_duration: u16, + + #[clap(long = "num-concurrent-requests")] + pub num_concurrent_requests: Option, } impl Config { @@ -86,6 +89,17 @@ impl Config { } Ok(None) } + + pub fn num_concurrent_requests(&self) -> u16 { + if let Some(n) = self.num_concurrent_requests { + n + } else { + match self.mode() { + Mode::Local => 5, + Mode::Distributed => 100, + } + } + } } #[derive(Clone, Debug)] diff --git a/src/app/web/router.rs b/src/app/web/router.rs index a80a1a9..2828e99 100644 --- a/src/app/web/router.rs +++ b/src/app/web/router.rs @@ -3,18 +3,21 @@ use crate::app::web::handler; use axum::http::{header, HeaderValue}; use axum::routing::{get, post}; use axum::{AddExtensionLayer, Router}; +use tower::limit::concurrency::ConcurrencyLimitLayer; use tower_http::set_header::SetRequestHeaderLayer; use tower_http::trace::TraceLayer; pub fn routes(deps: DynDependencyProvider) -> axum::Router { + let num_concurrent_requests = deps.get_config().num_concurrent_requests(); let template_execute = post(handler::execute_template) - .layer(TraceLayer::new_for_http()) // TODO remove SetRequestHeaderLayer once Argo sends correct Content-Type header + .layer(AddExtensionLayer::new(deps)) .layer(SetRequestHeaderLayer::if_not_present( header::CONTENT_TYPE, HeaderValue::from_static("application/json"), )) - .layer(AddExtensionLayer::new(deps)); + .layer(ConcurrencyLimitLayer::new(num_concurrent_requests.into())) + .layer(TraceLayer::new_for_http()); Router::new() .route("/healthz", get(|| async { "ok\n" }))