From 8444607797414fb6f02a6647eaca9433d49c0b81 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 30 Mar 2023 16:48:16 +0300 Subject: [PATCH 1/3] feat(cubestore): Allow to configure max_message_size/max_frame_size for WS connection --- rust/cubestore/cubestore/src/config/mod.rs | 19 +++++++++++++++++++ rust/cubestore/cubestore/src/http/mod.rs | 12 ++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 0d942d27d16ff..ae2860231d031 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -446,6 +446,9 @@ pub trait ConfigObj: DIService { fn max_disk_space_per_worker(&self) -> u64; fn disk_space_cache_duration_secs(&self) -> u64; + + fn transport_max_message_size(&self) -> usize; + fn transport_max_frame_size(&self) -> usize; } #[derive(Debug, Clone)] @@ -508,6 +511,8 @@ pub struct ConfigObjImpl { pub max_disk_space: u64, pub max_disk_space_per_worker: u64, pub disk_space_cache_duration_secs: u64, + pub transport_max_message_size: usize, + pub transport_max_frame_size: usize, } crate::di_service!(ConfigObjImpl, [ConfigObj]); @@ -731,6 +736,14 @@ impl ConfigObj for ConfigObjImpl { fn disk_space_cache_duration_secs(&self) -> u64 { self.disk_space_cache_duration_secs } + + fn transport_max_message_size(&self) -> usize { + self.transport_max_message_size + } + + fn transport_max_frame_size(&self) -> usize { + self.transport_max_frame_size + } } lazy_static! { @@ -953,6 +966,8 @@ impl Config { * 1024 * 1024, disk_space_cache_duration_secs: 300, + transport_max_message_size: env_parse("TRANSPORT_MAX_MESSAGE_SIZE", 64 << 20), + transport_max_frame_size: env_parse("TRANSPORT_MAX_FRAME_SIZE", 16 << 20), }), } } @@ -1027,6 +1042,8 @@ impl Config { max_disk_space: 0, max_disk_space_per_worker: 0, disk_space_cache_duration_secs: 0, + transport_max_message_size: 64 << 20, + transport_max_frame_size: 16 << 20, }), } } @@ -1596,6 +1613,8 @@ impl Config { Duration::from_secs(config.check_ws_orphaned_messages_interval_secs()), Duration::from_secs(config.drop_ws_processing_messages_after_secs()), Duration::from_secs(config.drop_ws_complete_messages_after_secs()), + config.transport_max_message_size(), + config.transport_max_frame_size(), ) }) .await; diff --git a/rust/cubestore/cubestore/src/http/mod.rs b/rust/cubestore/cubestore/src/http/mod.rs index 054d58246ceaf..5fb4d9f74f934 100644 --- a/rust/cubestore/cubestore/src/http/mod.rs +++ b/rust/cubestore/cubestore/src/http/mod.rs @@ -50,6 +50,8 @@ pub struct HttpServer { worker_loop: WorkerLoop, drop_orphaned_messages_loop: WorkerLoop, cancel_token: CancellationToken, + max_message_size: usize, + max_frame_size: usize, } crate::di_service!(HttpServer, []); @@ -81,6 +83,8 @@ impl HttpServer { check_orphaned_messages_interval: Duration, drop_processing_messages_after: Duration, drop_complete_messages_after: Duration, + max_message_size: usize, + max_frame_size: usize, ) -> Arc { Arc::new(Self { bind_address, @@ -89,6 +93,8 @@ impl HttpServer { check_orphaned_messages_interval, drop_processing_messages_after, drop_complete_messages_after, + max_message_size, + max_frame_size, worker_loop: WorkerLoop::new("HttpServer message processing"), drop_orphaned_messages_loop: WorkerLoop::new("HttpServer drop orphaned messages"), cancel_token: CancellationToken::new(), @@ -121,14 +127,16 @@ impl HttpServer { let context_filter = tx_to_move_filter.and(auth_filter.clone()); let context_filter_to_move = context_filter.clone(); + let max_frame_size = self.max_frame_size.clone(); + let max_message_size = self.max_message_size.clone(); let query_route = warp::path!("ws") .and(context_filter_to_move) .and(warp::ws::ws()) - .and_then(|tx: mpsc::Sender<(mpsc::Sender>, SqlQueryContext, HttpMessage)>, sql_query_context: SqlQueryContext, ws: Ws| async move { + .and_then(move |tx: mpsc::Sender<(mpsc::Sender>, SqlQueryContext, HttpMessage)>, sql_query_context: SqlQueryContext, ws: Ws| async move { let tx_to_move = tx.clone(); let sql_query_context = sql_query_context.clone(); - Result::<_, Rejection>::Ok(ws.on_upgrade(async move |mut web_socket| { + Result::<_, Rejection>::Ok(ws.max_frame_size(max_frame_size).max_message_size(max_message_size).on_upgrade(async move |mut web_socket| { let (response_tx, mut response_rx) = mpsc::channel::>(10000); loop { tokio::select! { From 700308334332360b000c6cda3d15342ea16ee54e Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 30 Mar 2023 16:52:02 +0300 Subject: [PATCH 2/3] chore: pass test --- rust/cubestore/cubestore/src/http/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/cubestore/cubestore/src/http/mod.rs b/rust/cubestore/cubestore/src/http/mod.rs index 5fb4d9f74f934..61429ae177974 100644 --- a/rust/cubestore/cubestore/src/http/mod.rs +++ b/rust/cubestore/cubestore/src/http/mod.rs @@ -1033,6 +1033,8 @@ mod tests { Duration::from_millis(100), Duration::from_millis(10000), Duration::from_millis(1000), + 64 << 20, + 16 << 20, )); { let http_server = http_server.clone(); From 6df1e5eb3ad2826b6559b0b9a55852fad19d74d1 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 30 Mar 2023 17:02:11 +0300 Subject: [PATCH 3/3] chore: use test config in test --- rust/cubestore/cubestore/src/http/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rust/cubestore/cubestore/src/http/mod.rs b/rust/cubestore/cubestore/src/http/mod.rs index 61429ae177974..dcd21472f591b 100644 --- a/rust/cubestore/cubestore/src/http/mod.rs +++ b/rust/cubestore/cubestore/src/http/mod.rs @@ -826,7 +826,7 @@ impl HttpMessage { #[cfg(test)] mod tests { use crate::codegen::{HttpMessageArgs, HttpQuery, HttpQueryArgs, HttpTable, HttpTableArgs}; - use crate::config::init_test_logger; + use crate::config::{init_test_logger, Config}; use crate::http::{HttpCommand, HttpMessage, HttpServer}; use crate::metastore::{Column, ColumnType}; use crate::mysql::MockSqlAuthService; @@ -1026,6 +1026,9 @@ mod tests { }; let mut auth = MockSqlAuthService::new(); auth.expect_authenticate().return_const(Ok(None)); + + let config = Config::test("ws_test").config_obj(); + let http_server = Arc::new(HttpServer::new( "127.0.0.1:53031".to_string(), Arc::new(auth), @@ -1033,8 +1036,8 @@ mod tests { Duration::from_millis(100), Duration::from_millis(10000), Duration::from_millis(1000), - 64 << 20, - 16 << 20, + config.transport_max_message_size(), + config.transport_max_frame_size(), )); { let http_server = http_server.clone();