diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware.rs index a9e4ecdce9c9..43e2ac3047cf 100644 --- a/src/query/service/src/servers/http/middleware.rs +++ b/src/query/service/src/servers/http/middleware.rs @@ -27,8 +27,8 @@ use poem::Addr; use poem::Endpoint; use poem::Middleware; use poem::Request; +use tracing::error; use tracing::info; -use tracing::warn; use super::v1::HttpQueryContext; use crate::auth::AuthMgr; @@ -161,7 +161,7 @@ impl Endpoint for HTTPSessionEndpoint { )), }; if let Err(ref err) = res { - warn!("http request error: {}", err); + error!("http request error: {}", err); }; res } diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index ed0252422076..198b5fb73393 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -137,11 +137,17 @@ impl FuseTable { // Adjust the max io request. fn adjust_max_io_requests( ctx: Arc, - _plan: &ReadDataSourcePlan, + plan: &ReadDataSourcePlan, ) -> Result { - // TODO(bohu): change to max_storage_io_requests when pipeline resize is ok. - let max_threads = ctx.get_settings().get_max_threads()? as usize; - Ok(std::cmp::max(1, max_threads)) + let parts_len = plan.parts.len(); + let max_storage_io = ctx.get_settings().get_max_storage_io_requests()? as usize; + let max_io_requests = if parts_len > max_storage_io { + max_storage_io + } else { + parts_len + }; + + Ok(std::cmp::max(1, max_io_requests)) } #[inline] @@ -214,7 +220,16 @@ impl FuseTable { ) }, max_io_requests, - ) + )?; + + // Resize pipeline to max threads. + let max_threads = ctx.get_settings().get_max_threads()? as usize; + let resize_to = std::cmp::min(max_threads, max_io_requests); + info!( + "read block pipeline resize from:{} to:{}", + max_io_requests, resize_to + ); + pipeline.resize(resize_to) } }