diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs
index 0bc854accd3b..5e3e1e878b3d 100644
--- a/src/query/storages/fuse/src/operations/mod.rs
+++ b/src/query/storages/fuse/src/operations/mod.rs
@@ -21,7 +21,7 @@ mod gc;
 mod mutation;
 mod navigate;
 mod operation_log;
-mod read;
+mod read_data;
 mod read_partitions;
 mod recluster;
 mod truncate;
diff --git a/src/query/storages/fuse/src/operations/read.rs b/src/query/storages/fuse/src/operations/read_data.rs
similarity index 82%
rename from src/query/storages/fuse/src/operations/read.rs
rename to src/query/storages/fuse/src/operations/read_data.rs
index 76b9f4c18d57..e4994258a0cd 100644
--- a/src/query/storages/fuse/src/operations/read.rs
+++ b/src/query/storages/fuse/src/operations/read_data.rs
@@ -36,10 +36,11 @@ use common_pipeline_core::processors::Processor;
 use common_pipeline_core::Pipeline;
 use common_pipeline_core::SourcePipeBuilder;
 use common_pipeline_transforms::processors::ExpressionExecutor;
+use tracing::info;
 
 use crate::fuse_lazy_part::FuseLazyPartInfo;
 use crate::io::BlockReader;
-use crate::operations::read::State::Generated;
+use crate::operations::read_data::State::Generated;
 use crate::FuseTable;
 
 impl FuseTable {
@@ -63,7 +64,7 @@ impl FuseTable {
         }
     }
 
-    pub fn prewhere_of_push_downs(&self, push_downs: &Option<Extras>) -> Option<PrewhereInfo> {
+    fn prewhere_of_push_downs(&self, push_downs: &Option<Extras>) -> Option<PrewhereInfo> {
         if let Some(Extras { prewhere, .. }) = push_downs {
             prewhere.clone()
         } else {
@@ -71,6 +72,79 @@ impl FuseTable {
         }
     }
 
+    // Build the block reader.
+    fn build_block_reader(&self, plan: &ReadDataSourcePlan) -> Result<Arc<BlockReader>> {
+        match self.prewhere_of_push_downs(&plan.push_downs) {
+            None => {
+                let projection = self.projection_of_push_downs(&plan.push_downs);
+                self.create_block_reader(projection)
+            }
+            Some(v) => self.create_block_reader(v.output_columns),
+        }
+    }
+
+    // Build the prewhere reader.
+    fn build_prewhere_reader(&self, plan: &ReadDataSourcePlan) -> Result<Arc<BlockReader>> {
+        match self.prewhere_of_push_downs(&plan.push_downs) {
+            None => {
+                let projection = self.projection_of_push_downs(&plan.push_downs);
+                self.create_block_reader(projection)
+            }
+            Some(v) => self.create_block_reader(v.prewhere_columns),
+        }
+    }
+
+    // Build the prewhere filter executor.
+    fn build_prewhere_filter_executor(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        plan: &ReadDataSourcePlan,
+    ) -> Result<Arc<Option<ExpressionExecutor>>> {
+        Ok(match self.prewhere_of_push_downs(&plan.push_downs) {
+            None => Arc::new(None),
+            Some(v) => {
+                let table_schema = self.table_info.schema();
+                let prewhere_schema = Arc::new(v.prewhere_columns.project_schema(&table_schema));
+                let expr_field = v.filter.to_data_field(&prewhere_schema)?;
+                let expr_schema = DataSchemaRefExt::create(vec![expr_field]);
+
+                let executor = ExpressionExecutor::try_create(
+                    ctx,
+                    "filter expression executor (prewhere) ",
+                    prewhere_schema,
+                    expr_schema,
+                    vec![v.filter],
+                    false,
+                )?;
+                Arc::new(Some(executor))
+            }
+        })
+    }
+
+    // Build the remain reader.
+    fn build_remain_reader(&self, plan: &ReadDataSourcePlan) -> Result<Arc<Option<BlockReader>>> {
+        Ok(match self.prewhere_of_push_downs(&plan.push_downs) {
+            None => Arc::new(None),
+            Some(v) => {
+                if v.remain_columns.is_empty() {
+                    Arc::new(None)
+                } else {
+                    Arc::new(Some((*self.create_block_reader(v.remain_columns)?).clone()))
+                }
+            }
+        })
+    }
+
+    // Adjust the max io request.
+    fn adjust_max_io_requests(
+        ctx: Arc<dyn TableContext>,
+        _plan: &ReadDataSourcePlan,
+    ) -> Result<usize> {
+        // TODO(bohu): change to max_storage_io_requests when pipeline resize is ok.
+        let max_threads = ctx.get_settings().get_max_threads()? as usize;
+        Ok(std::cmp::max(1, max_threads))
+    }
+
     #[inline]
     pub fn do_read_data(
         &self,
@@ -120,66 +194,29 @@ impl FuseTable {
             });
         }
-        let table_schema = self.table_info.schema();
-        let projection = self.projection_of_push_downs(&plan.push_downs);
-        let output_reader = self.create_block_reader(projection)?; // for deserialize output blocks
-
-        let (output_reader, prewhere_reader, prewhere_filter, remain_reader) =
-            if let Some(prewhere) = self.prewhere_of_push_downs(&plan.push_downs) {
-                let prewhere_schema = prewhere.prewhere_columns.project_schema(&table_schema);
-                let prewhere_schema = Arc::new(prewhere_schema);
-                let expr_field = prewhere.filter.to_data_field(&prewhere_schema)?;
-                let expr_schema = DataSchemaRefExt::create(vec![expr_field]);
-
-                let executor = ExpressionExecutor::try_create(
-                    ctx.clone(),
-                    "filter expression executor (prewhere) ",
-                    prewhere_schema,
-                    expr_schema,
-                    vec![prewhere.filter.clone()],
-                    false,
-                )?;
-                let output_reader = self.create_block_reader(prewhere.output_columns.clone())?;
-                let prewhere_reader =
-                    self.create_block_reader(prewhere.prewhere_columns.clone())?;
-                let remain_reader = if prewhere.remain_columns.is_empty() {
-                    None
-                } else {
-                    Some((*self.create_block_reader(prewhere.remain_columns)?).clone())
-                };
-
-                (
-                    output_reader,
-                    prewhere_reader,
-                    Some(executor),
-                    remain_reader,
-                )
-            } else {
-                (output_reader.clone(), output_reader, None, None)
-            };
-
-        let prewhere_filter = Arc::new(prewhere_filter);
-        let remain_reader = Arc::new(remain_reader);
+        let block_reader = self.build_block_reader(plan)?;
+        let prewhere_reader = self.build_prewhere_reader(plan)?;
+        let prewhere_filter = self.build_prewhere_filter_executor(ctx.clone(), plan)?;
+        let remain_reader = self.build_remain_reader(plan)?;
 
-        let max_threads = ctx.get_settings().get_max_threads()? as usize;
+        let max_io_requests = Self::adjust_max_io_requests(ctx.clone(), plan)?;
+        info!("read block data adjust max io requests:{}", max_io_requests);
 
         let mut source_builder = SourcePipeBuilder::create();
-
-        for _index in 0..std::cmp::max(1, max_threads) {
+        for _index in 0..max_io_requests {
             let output = OutputPort::create();
             source_builder.add_source(
                 output.clone(),
                 FuseTableSource::create(
                     ctx.clone(),
                     output,
-                    output_reader.clone(),
+                    block_reader.clone(),
                     prewhere_reader.clone(),
                     prewhere_filter.clone(),
                     remain_reader.clone(),
                 )?,
             );
         }
-
         pipeline.add_pipe(source_builder.finalize());
 
         Ok(())
     }
diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs
index 1b68cdc1fd37..08037877177a 100644
--- a/src/query/storages/fuse/src/operations/read_partitions.rs
+++ b/src/query/storages/fuse/src/operations/read_partitions.rs
@@ -30,6 +30,7 @@ use common_legacy_planners::Statistics;
 use common_meta_app::schema::TableInfo;
 use opendal::Operator;
 use tracing::debug;
+use tracing::info;
 
 use crate::fuse_lazy_part::FuseLazyPartInfo;
 use crate::fuse_part::ColumnLeaves;
@@ -39,7 +40,7 @@ use crate::pruning::BlockPruner;
 use crate::FuseTable;
 
 impl FuseTable {
-    #[inline]
+    #[tracing::instrument(level = "debug", name = "do_read_partitions", skip_all, fields(ctx.id = ctx.get_id().as_str()))]
     pub async fn do_read_partitions(
         &self,
         ctx: Arc<dyn TableContext>,
@@ -91,6 +92,7 @@ impl FuseTable {
         }
     }
 
+    #[tracing::instrument(level = "debug", name = "prune_snapshot_blocks", skip_all, fields(ctx.id = ctx.get_id().as_str()))]
     pub async fn prune_snapshot_blocks(
         ctx: Arc<dyn TableContext>,
         dal: Operator,
@@ -100,7 +102,7 @@ impl FuseTable {
         summary: usize,
     ) -> Result<(Statistics, Partitions)> {
         let start = Instant::now();
-        debug!(
+        info!(
             "prune snapshot block start, segment numbers:{}",
             segments_location.len()
         );
@@ -117,7 +119,7 @@ impl FuseTable {
             .map(|(_, v)| v)
             .collect::<Vec<_>>();
 
-        debug!(
+        info!(
             "prune snapshot block end, final block numbers:{}, cost:{}",
             block_metas.len(),
             start.elapsed().as_secs()