Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refine the fuse read codes #8302

Merged
merged 1 commit into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod gc;
mod mutation;
mod navigate;
mod operation_log;
mod read;
mod read_data;
mod read_partitions;
mod recluster;
mod truncate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ use common_pipeline_core::processors::Processor;
use common_pipeline_core::Pipeline;
use common_pipeline_core::SourcePipeBuilder;
use common_pipeline_transforms::processors::ExpressionExecutor;
use tracing::info;

use crate::fuse_lazy_part::FuseLazyPartInfo;
use crate::io::BlockReader;
use crate::operations::read::State::Generated;
use crate::operations::read_data::State::Generated;
use crate::FuseTable;

impl FuseTable {
Expand All @@ -63,14 +64,87 @@ impl FuseTable {
}
}

pub fn prewhere_of_push_downs(&self, push_downs: &Option<Extras>) -> Option<PrewhereInfo> {
fn prewhere_of_push_downs(&self, push_downs: &Option<Extras>) -> Option<PrewhereInfo> {
if let Some(Extras { prewhere, .. }) = push_downs {
prewhere.clone()
} else {
None
}
}

// Build the block reader used to deserialize the output blocks.
//
// With a prewhere, only the prewhere's output columns are read;
// otherwise the plan's pushed-down projection is used.
fn build_block_reader(&self, plan: &ReadDataSourcePlan) -> Result<Arc<BlockReader>> {
    let projection = match self.prewhere_of_push_downs(&plan.push_downs) {
        Some(prewhere) => prewhere.output_columns,
        None => self.projection_of_push_downs(&plan.push_downs),
    };
    self.create_block_reader(projection)
}

// Build the reader for the columns referenced by the prewhere filter.
//
// Falls back to the plan's pushed-down projection when no prewhere
// is present, so the reader always exists.
fn build_prewhere_reader(&self, plan: &ReadDataSourcePlan) -> Result<Arc<BlockReader>> {
    let projection = match self.prewhere_of_push_downs(&plan.push_downs) {
        Some(prewhere) => prewhere.prewhere_columns,
        None => self.projection_of_push_downs(&plan.push_downs),
    };
    self.create_block_reader(projection)
}

// Build the expression executor that evaluates the prewhere filter.
//
// Returns `Arc::new(None)` when the plan carries no prewhere; otherwise
// projects the table schema down to the prewhere columns and compiles
// the filter expression against that projected schema.
fn build_prewhere_filter_executor(
    &self,
    ctx: Arc<dyn TableContext>,
    plan: &ReadDataSourcePlan,
) -> Result<Arc<Option<ExpressionExecutor>>> {
    let prewhere = match self.prewhere_of_push_downs(&plan.push_downs) {
        Some(prewhere) => prewhere,
        None => return Ok(Arc::new(None)),
    };

    let table_schema = self.table_info.schema();
    let prewhere_schema = Arc::new(prewhere.prewhere_columns.project_schema(&table_schema));
    // The executor's output schema is the single field produced by the filter.
    let expr_field = prewhere.filter.to_data_field(&prewhere_schema)?;
    let expr_schema = DataSchemaRefExt::create(vec![expr_field]);

    let executor = ExpressionExecutor::try_create(
        ctx,
        "filter expression executor (prewhere) ",
        prewhere_schema,
        expr_schema,
        vec![prewhere.filter],
        false,
    )?;
    Ok(Arc::new(Some(executor)))
}

// Build the reader for the columns NOT touched by the prewhere filter
// (the "remain" columns), read only for rows that pass the filter.
//
// Yields `Arc::new(None)` when there is no prewhere or when the
// prewhere leaves no remaining columns to fetch.
fn build_remain_reader(&self, plan: &ReadDataSourcePlan) -> Result<Arc<Option<BlockReader>>> {
    let remain_reader = match self.prewhere_of_push_downs(&plan.push_downs) {
        Some(prewhere) if !prewhere.remain_columns.is_empty() => {
            let reader = self.create_block_reader(prewhere.remain_columns)?;
            // Clone the inner reader out of its Arc so it can be stored
            // directly inside the shared Option.
            Some(reader.as_ref().clone())
        }
        _ => None,
    };
    Ok(Arc::new(remain_reader))
}

// Decide how many concurrent IO request sources to create, clamped to
// at least one.
fn adjust_max_io_requests(
    ctx: Arc<dyn TableContext>,
    _plan: &ReadDataSourcePlan,
) -> Result<usize> {
    // TODO(bohu): change to max_storage_io_requests when pipeline resize is ok.
    let max_threads = ctx.get_settings().get_max_threads()? as usize;
    Ok(max_threads.max(1))
}

#[inline]
pub fn do_read_data(
&self,
Expand Down Expand Up @@ -120,66 +194,29 @@ impl FuseTable {
});
}

let table_schema = self.table_info.schema();
let projection = self.projection_of_push_downs(&plan.push_downs);
let output_reader = self.create_block_reader(projection)?; // for deserialize output blocks

let (output_reader, prewhere_reader, prewhere_filter, remain_reader) =
if let Some(prewhere) = self.prewhere_of_push_downs(&plan.push_downs) {
let prewhere_schema = prewhere.prewhere_columns.project_schema(&table_schema);
let prewhere_schema = Arc::new(prewhere_schema);
let expr_field = prewhere.filter.to_data_field(&prewhere_schema)?;
let expr_schema = DataSchemaRefExt::create(vec![expr_field]);

let executor = ExpressionExecutor::try_create(
ctx.clone(),
"filter expression executor (prewhere) ",
prewhere_schema,
expr_schema,
vec![prewhere.filter.clone()],
false,
)?;
let output_reader = self.create_block_reader(prewhere.output_columns.clone())?;
let prewhere_reader =
self.create_block_reader(prewhere.prewhere_columns.clone())?;
let remain_reader = if prewhere.remain_columns.is_empty() {
None
} else {
Some((*self.create_block_reader(prewhere.remain_columns)?).clone())
};

(
output_reader,
prewhere_reader,
Some(executor),
remain_reader,
)
} else {
(output_reader.clone(), output_reader, None, None)
};

let prewhere_filter = Arc::new(prewhere_filter);
let remain_reader = Arc::new(remain_reader);
let block_reader = self.build_block_reader(plan)?;
let prewhere_reader = self.build_prewhere_reader(plan)?;
let prewhere_filter = self.build_prewhere_filter_executor(ctx.clone(), plan)?;
let remain_reader = self.build_remain_reader(plan)?;

let max_threads = ctx.get_settings().get_max_threads()? as usize;
let max_io_requests = Self::adjust_max_io_requests(ctx.clone(), plan)?;
info!("read block data adjust max io requests:{}", max_io_requests);

let mut source_builder = SourcePipeBuilder::create();

for _index in 0..std::cmp::max(1, max_threads) {
for _index in 0..max_io_requests {
let output = OutputPort::create();
source_builder.add_source(
output.clone(),
FuseTableSource::create(
ctx.clone(),
output,
output_reader.clone(),
block_reader.clone(),
prewhere_reader.clone(),
prewhere_filter.clone(),
remain_reader.clone(),
)?,
);
}

pipeline.add_pipe(source_builder.finalize());
Ok(())
}
Expand Down
8 changes: 5 additions & 3 deletions src/query/storages/fuse/src/operations/read_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_legacy_planners::Statistics;
use common_meta_app::schema::TableInfo;
use opendal::Operator;
use tracing::debug;
use tracing::info;

use crate::fuse_lazy_part::FuseLazyPartInfo;
use crate::fuse_part::ColumnLeaves;
Expand All @@ -39,7 +40,7 @@ use crate::pruning::BlockPruner;
use crate::FuseTable;

impl FuseTable {
#[inline]
#[tracing::instrument(level = "debug", name = "do_read_partitions", skip_all, fields(ctx.id = ctx.get_id().as_str()))]
pub async fn do_read_partitions(
&self,
ctx: Arc<dyn TableContext>,
Expand Down Expand Up @@ -91,6 +92,7 @@ impl FuseTable {
}
}

#[tracing::instrument(level = "debug", name = "prune_snapshot_blocks", skip_all, fields(ctx.id = ctx.get_id().as_str()))]
pub async fn prune_snapshot_blocks(
ctx: Arc<dyn TableContext>,
dal: Operator,
Expand All @@ -100,7 +102,7 @@ impl FuseTable {
summary: usize,
) -> Result<(Statistics, Partitions)> {
let start = Instant::now();
debug!(
info!(
"prune snapshot block start, segment numbers:{}",
segments_location.len()
);
Expand All @@ -117,7 +119,7 @@ impl FuseTable {
.map(|(_, v)| v)
.collect::<Vec<_>>();

debug!(
info!(
"prune snapshot block end, final block numbers:{}, cost:{}",
block_metas.len(),
start.elapsed().as_secs()
Expand Down