Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cubestore): Allow to configure max_message_size/max_frame_size f… #6369

Merged
merged 3 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,9 @@ pub trait ConfigObj: DIService {
fn max_disk_space_per_worker(&self) -> u64;

fn disk_space_cache_duration_secs(&self) -> u64;

fn transport_max_message_size(&self) -> usize;
fn transport_max_frame_size(&self) -> usize;
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -508,6 +511,8 @@ pub struct ConfigObjImpl {
pub max_disk_space: u64,
pub max_disk_space_per_worker: u64,
pub disk_space_cache_duration_secs: u64,
pub transport_max_message_size: usize,
pub transport_max_frame_size: usize,
}

crate::di_service!(ConfigObjImpl, [ConfigObj]);
Expand Down Expand Up @@ -731,6 +736,14 @@ impl ConfigObj for ConfigObjImpl {
fn disk_space_cache_duration_secs(&self) -> u64 {
self.disk_space_cache_duration_secs
}

fn transport_max_message_size(&self) -> usize {
self.transport_max_message_size
}

fn transport_max_frame_size(&self) -> usize {
self.transport_max_frame_size
}
}

lazy_static! {
Expand Down Expand Up @@ -953,6 +966,8 @@ impl Config {
* 1024
* 1024,
disk_space_cache_duration_secs: 300,
transport_max_message_size: env_parse("TRANSPORT_MAX_MESSAGE_SIZE", 64 << 20),
transport_max_frame_size: env_parse("TRANSPORT_MAX_FRAME_SIZE", 16 << 20),
}),
}
}
Expand Down Expand Up @@ -1027,6 +1042,8 @@ impl Config {
max_disk_space: 0,
max_disk_space_per_worker: 0,
disk_space_cache_duration_secs: 0,
transport_max_message_size: 64 << 20,
transport_max_frame_size: 16 << 20,
}),
}
}
Expand Down Expand Up @@ -1596,6 +1613,8 @@ impl Config {
Duration::from_secs(config.check_ws_orphaned_messages_interval_secs()),
Duration::from_secs(config.drop_ws_processing_messages_after_secs()),
Duration::from_secs(config.drop_ws_complete_messages_after_secs()),
config.transport_max_message_size(),
config.transport_max_frame_size(),
)
})
.await;
Expand Down
19 changes: 16 additions & 3 deletions rust/cubestore/cubestore/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct HttpServer {
worker_loop: WorkerLoop,
drop_orphaned_messages_loop: WorkerLoop,
cancel_token: CancellationToken,
max_message_size: usize,
max_frame_size: usize,
}

crate::di_service!(HttpServer, []);
Expand Down Expand Up @@ -81,6 +83,8 @@ impl HttpServer {
check_orphaned_messages_interval: Duration,
drop_processing_messages_after: Duration,
drop_complete_messages_after: Duration,
max_message_size: usize,
max_frame_size: usize,
) -> Arc<Self> {
Arc::new(Self {
bind_address,
Expand All @@ -89,6 +93,8 @@ impl HttpServer {
check_orphaned_messages_interval,
drop_processing_messages_after,
drop_complete_messages_after,
max_message_size,
max_frame_size,
worker_loop: WorkerLoop::new("HttpServer message processing"),
drop_orphaned_messages_loop: WorkerLoop::new("HttpServer drop orphaned messages"),
cancel_token: CancellationToken::new(),
Expand Down Expand Up @@ -121,14 +127,16 @@ impl HttpServer {
let context_filter = tx_to_move_filter.and(auth_filter.clone());

let context_filter_to_move = context_filter.clone();
let max_frame_size = self.max_frame_size.clone();
let max_message_size = self.max_message_size.clone();

let query_route = warp::path!("ws")
.and(context_filter_to_move)
.and(warp::ws::ws())
.and_then(|tx: mpsc::Sender<(mpsc::Sender<Arc<HttpMessage>>, SqlQueryContext, HttpMessage)>, sql_query_context: SqlQueryContext, ws: Ws| async move {
.and_then(move |tx: mpsc::Sender<(mpsc::Sender<Arc<HttpMessage>>, SqlQueryContext, HttpMessage)>, sql_query_context: SqlQueryContext, ws: Ws| async move {
let tx_to_move = tx.clone();
let sql_query_context = sql_query_context.clone();
Result::<_, Rejection>::Ok(ws.on_upgrade(async move |mut web_socket| {
Result::<_, Rejection>::Ok(ws.max_frame_size(max_frame_size).max_message_size(max_message_size).on_upgrade(async move |mut web_socket| {
let (response_tx, mut response_rx) = mpsc::channel::<Arc<HttpMessage>>(10000);
loop {
tokio::select! {
Expand Down Expand Up @@ -818,7 +826,7 @@ impl HttpMessage {
#[cfg(test)]
mod tests {
use crate::codegen::{HttpMessageArgs, HttpQuery, HttpQueryArgs, HttpTable, HttpTableArgs};
use crate::config::init_test_logger;
use crate::config::{init_test_logger, Config};
use crate::http::{HttpCommand, HttpMessage, HttpServer};
use crate::metastore::{Column, ColumnType};
use crate::mysql::MockSqlAuthService;
Expand Down Expand Up @@ -1018,13 +1026,18 @@ mod tests {
};
let mut auth = MockSqlAuthService::new();
auth.expect_authenticate().return_const(Ok(None));

let config = Config::test("ws_test").config_obj();

let http_server = Arc::new(HttpServer::new(
"127.0.0.1:53031".to_string(),
Arc::new(auth),
Arc::new(sql_service),
Duration::from_millis(100),
Duration::from_millis(10000),
Duration::from_millis(1000),
config.transport_max_message_size(),
config.transport_max_frame_size(),
));
{
let http_server = http_server.clone();
Expand Down