diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 668f1d98dc9c7..39a1cfdf08fb4 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -69,6 +69,8 @@ pub trait TableContext: Send + Sync { fn try_get_part(&self) -> Option; // Update the context partition pool from the pipeline builder. fn try_set_partitions(&self, partitions: Partitions) -> Result<()>; + // Get the partition queue size. + fn get_partition_queue_size(&self) -> usize; fn attach_query_str(&self, kind: String, query: &str); fn get_fragment_id(&self) -> usize; fn get_catalog(&self, catalog_name: &str) -> Result>; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 831e423db01f0..b71046b860d79 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -243,6 +243,11 @@ impl TableContext for QueryContext { } Ok(()) } + + fn get_partition_queue_size(&self) -> usize { + self.partition_queue.read().len() + } + fn attach_query_str(&self, kind: String, query: &str) { self.shared.attach_query_str(kind, query); } diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 198b5fb73393f..9480dadd4e1c0 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -135,16 +135,13 @@ impl FuseTable { } // Adjust the max io request. - fn adjust_max_io_requests( - ctx: Arc, - plan: &ReadDataSourcePlan, - ) -> Result { - let parts_len = plan.parts.len(); + fn adjust_max_io_requests(ctx: Arc) -> Result { + let parts_size = ctx.get_partition_queue_size(); let max_storage_io = ctx.get_settings().get_max_storage_io_requests()? as usize; - let max_io_requests = if parts_len > max_storage_io { + let max_io_requests = if parts_size > max_storage_io { max_storage_io } else { - parts_len + parts_size }; Ok(std::cmp::max(1, max_io_requests)) @@ -204,7 +201,7 @@ impl FuseTable { let prewhere_filter = self.build_prewhere_filter_executor(ctx.clone(), plan)?; let remain_reader = self.build_remain_reader(plan)?; - let max_io_requests = Self::adjust_max_io_requests(ctx.clone(), plan)?; + let max_io_requests = Self::adjust_max_io_requests(ctx.clone())?; info!("read block data adjust max io requests:{}", max_io_requests); // Add source pipe.