chore(storage): hook recluster avoid scan all segments (#16498)
* chore: recluster avoid segments full scan

* recluster the small blocks

* recluster small segments

* fix

* fix test

* Update src/query/service/src/interpreters/hook/compact_hook.rs

Co-authored-by: dantengsky <dantengsky@gmail.com>

* fix test

---------

Co-authored-by: dantengsky <dantengsky@gmail.com>
zhyass and dantengsky authored Sep 25, 2024
1 parent fdbb1af commit e665183
Showing 10 changed files with 210 additions and 131 deletions.
4 changes: 2 additions & 2 deletions src/query/expression/src/utils/block_thresholds.rs
@@ -62,8 +62,8 @@ impl BlockThresholds {
}

#[inline]
pub fn check_for_recluster(&self, total_rows: usize, total_bytes: usize) -> bool {
total_rows <= self.max_rows_per_block && total_bytes <= self.max_bytes_per_block
pub fn check_too_small(&self, row_count: usize, block_size: usize) -> bool {
row_count < self.min_rows_per_block / 2 && block_size < self.max_bytes_per_block / 2
}

#[inline]
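Note: the new `check_too_small` predicate only treats a block as a recluster candidate when both its row count and its byte size are well under the configured thresholds. A minimal, self-contained sketch of that behaviour (the struct here is a stand-in for the real `BlockThresholds`, and the threshold values are illustrative assumptions, not the engine defaults):

```rust
// Simplified stand-in for BlockThresholds; the field names follow the diff
// above, but the values used below are illustrative assumptions only.
struct BlockThresholds {
    min_rows_per_block: usize,
    max_bytes_per_block: usize,
}

impl BlockThresholds {
    // Mirrors the new predicate: a block counts as "too small" only when BOTH
    // its row count and its byte size fall below half of the thresholds.
    fn check_too_small(&self, row_count: usize, block_size: usize) -> bool {
        row_count < self.min_rows_per_block / 2 && block_size < self.max_bytes_per_block / 2
    }
}

fn main() {
    let thresholds = BlockThresholds {
        min_rows_per_block: 800_000,
        max_bytes_per_block: 100 * 1024 * 1024,
    };
    // A block with few rows and few bytes is a candidate for re-clustering.
    assert!(thresholds.check_too_small(10_000, 1024 * 1024));
    // A block that is large on either axis is left alone.
    assert!(!thresholds.check_too_small(900_000, 1024 * 1024));
}
```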
102 changes: 53 additions & 49 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -90,16 +90,13 @@ async fn do_hook_compact(
block_limit: Some(compaction_num_block_hint as usize),
}
}
_ =>
// for mutations other than Insertions, we use an empirical value of 3 segments as the
// limit for compaction. to be refined later.
{
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
CompactionLimits {
segment_limit: Some(auto_compaction_segments_limit as usize),
block_limit: None,
}
_ => {
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
CompactionLimits {
segment_limit: Some(auto_compaction_segments_limit as usize),
block_limit: None,
}
}
};

let op_name = &trace_ctx.operation_name;
@@ -141,7 +138,12 @@ async fn compact_table(
&compact_target.table,
)
.await?;
let settings = ctx.get_settings();
// keep the original progress value
let progress_value = ctx.get_write_progress_value();

let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();
let do_compact = compaction_limits.block_limit.is_some() || !do_recluster;

// evict the table from cache
ctx.evict_table_from_cache(
@@ -150,56 +152,58 @@
&compact_target.table,
)?;

let mut build_res = if do_recluster {
if do_compact {
let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
catalog: compact_target.catalog.clone(),
database: compact_target.database.clone(),
table: compact_target.table.clone(),
limit: compaction_limits.clone(),
});
let s_expr = SExpr::create_leaf(Arc::new(compact_block));
let compact_interpreter = OptimizeCompactBlockInterpreter::try_create(
ctx.clone(),
s_expr,
lock_opt.clone(),
false,
)?;
let mut build_res = compact_interpreter.execute2().await?;
// execute the compact pipeline
if build_res.main_pipeline.is_complete_pipeline()? {
build_res.set_max_threads(settings.get_max_threads()? as usize);
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;

let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;

// Clears previously generated segment locations to avoid duplicate data in the refresh phase
ctx.clear_segment_locations()?;
ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;
drop(complete_executor);
}
}

if do_recluster {
let recluster = RelOperator::Recluster(Recluster {
catalog: compact_target.catalog,
database: compact_target.database,
table: compact_target.table,
filters: None,
limit: compaction_limits.segment_limit,
limit: Some(settings.get_auto_compaction_segments_limit()? as usize),
});
let s_expr = SExpr::create_leaf(Arc::new(recluster));
let recluster_interpreter =
ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
recluster_interpreter.execute2().await?
} else {
let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
catalog: compact_target.catalog,
database: compact_target.database,
table: compact_target.table,
limit: compaction_limits,
});
let s_expr = SExpr::create_leaf(Arc::new(compact_block));
let compact_interpreter =
OptimizeCompactBlockInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
compact_interpreter.execute2().await?
};

if build_res.main_pipeline.is_empty() {
return Ok(());
// Recluster will be done in `ReclusterTableInterpreter::execute2` directly,
// we do not need to use `PipelineCompleteExecutor` to execute it.
let build_res = recluster_interpreter.execute2().await?;
assert!(build_res.main_pipeline.is_empty());
}

// execute the compact pipeline (for table with cluster keys, re-cluster will also be executed)
let settings = ctx.get_settings();
build_res.set_max_threads(settings.get_max_threads()? as usize);
let settings = ExecutorSettings::try_create(ctx.clone())?;

if build_res.main_pipeline.is_complete_pipeline()? {
let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;

// keep the original progress value
let progress_value = ctx.get_write_progress_value();
// Clears previously generated segment locations to avoid duplicate data in the refresh phase
ctx.clear_segment_locations()?;
ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;
drop(complete_executor);

// reset the progress value
ctx.get_write_progress().set(&progress_value);
}
// reset the progress value
ctx.get_write_progress().set(&progress_value);
Ok(())
}
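For readers following the hook change, here is a heavily simplified, illustrative sketch of the new `compact_table` decision flow: compact first when a block limit was hinted or the table has no cluster keys, then recluster capped by `auto_compaction_segments_limit` instead of scanning all segments. The `run_*` helpers and the trimmed-down `CompactionLimits` are placeholders of this sketch, not the real interpreter APIs:

```rust
// Illustrative-only sketch of the new hook flow; `CompactionLimits` is trimmed
// down and the two `run_*` helpers stand in for building and driving the
// OptimizeCompactBlock / Recluster pipelines shown in the diff.
struct CompactionLimits {
    block_limit: Option<usize>,
}

fn run_compact_pipeline(limits: &CompactionLimits) {
    println!("compact blocks, block_limit = {:?}", limits.block_limit);
}

fn run_recluster(segment_limit: usize) {
    println!("recluster at most {segment_limit} segments (no full segment scan)");
}

fn compact_table_sketch(
    has_cluster_keys: bool,
    limits: CompactionLimits,
    auto_compaction_segments_limit: usize,
) {
    // New behaviour: recluster whenever the table defines cluster keys ...
    let do_recluster = has_cluster_keys;
    // ... and still compact first when a block limit was hinted (e.g. after an
    // insertion), or when recluster does not apply at all.
    let do_compact = limits.block_limit.is_some() || !do_recluster;

    if do_compact {
        // Runs through a complete pipeline executor in the real code.
        run_compact_pipeline(&limits);
    }
    if do_recluster {
        // Capped by the auto_compaction_segments_limit setting instead of
        // scanning all segments; ReclusterTableInterpreter::execute2 drives
        // its own pipelines, so the returned main pipeline is empty.
        run_recluster(auto_compaction_segments_limit);
    }
}

fn main() {
    // A clustered table after an insertion that hinted 10 blocks to compact.
    compact_table_sketch(true, CompactionLimits { block_limit: Some(10) }, 8);
    // A non-clustered table: compaction only.
    compact_table_sketch(false, CompactionLimits { block_limit: None }, 8);
}
```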
@@ -176,6 +176,7 @@ impl ReclusterTableInterpreter {

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
self.ctx.clear_segment_locations()?;
self.ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;
// make sure the executor is dropped before the next loop.
@@ -136,7 +136,7 @@ impl PipelineBuilder {

self.ctx.set_enable_sort_spill(false);
let sort_pipeline_builder =
SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs))
SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs))?
.with_block_size_hit(sort_block_size)
.remove_order_col_at_last();
sort_pipeline_builder.build_merge_sort_pipeline(&mut self.main_pipeline, false)?;
30 changes: 12 additions & 18 deletions src/query/service/src/pipelines/builders/builder_sort.rs
@@ -106,8 +106,8 @@ impl PipelineBuilder {
self.main_pipeline.try_resize(max_threads)?;
}

let builder =
SortPipelineBuilder::create(self.ctx.clone(), plan_schema, sort_desc).with_limit(limit);
let builder = SortPipelineBuilder::create(self.ctx.clone(), plan_schema, sort_desc)?
.with_limit(limit);

match after_exchange {
Some(true) => {
@@ -147,7 +147,7 @@ pub struct SortPipelineBuilder {
schema: DataSchemaRef,
sort_desc: Arc<Vec<SortColumnDescription>>,
limit: Option<usize>,
block_size: Option<usize>,
block_size: usize,
remove_order_col_at_last: bool,
}

@@ -156,15 +156,16 @@ impl SortPipelineBuilder {
ctx: Arc<QueryContext>,
schema: DataSchemaRef,
sort_desc: Arc<Vec<SortColumnDescription>>,
) -> Self {
Self {
) -> Result<Self> {
let block_size = ctx.get_settings().get_max_block_size()? as usize;
Ok(Self {
ctx,
schema,
sort_desc,
limit: None,
block_size: None,
block_size,
remove_order_col_at_last: false,
}
})
}

pub fn with_limit(mut self, limit: Option<usize>) -> Self {
@@ -174,7 +175,7 @@

// The expected output block size, the actual output block size will be equal to or less than the given value.
pub fn with_block_size_hit(mut self, block_size: usize) -> Self {
self.block_size = Some(block_size);
self.block_size = self.block_size.min(block_size);
self
}

Expand Down Expand Up @@ -259,15 +260,14 @@ impl SortPipelineBuilder {

let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
let spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?;
let block_size = settings.get_max_block_size()? as usize;

pipeline.add_transform(|input, output| {
let builder = TransformSortMergeBuilder::create(
input,
output,
sort_merge_output_schema.clone(),
self.sort_desc.clone(),
block_size,
self.block_size,
)
.with_limit(self.limit)
.with_order_col_generated(order_col_generated)
@@ -312,20 +312,14 @@ impl SortPipelineBuilder {
pub fn build_multi_merge(self, pipeline: &mut Pipeline) -> Result<()> {
// Multi-pipelines merge sort
let settings = self.ctx.get_settings();

let block_size = match self.block_size {
Some(block_size) => block_size.min(settings.get_max_block_size()? as usize),
None => settings.get_max_block_size()? as usize,
};

let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
let max_threads = settings.get_max_threads()? as usize;
if settings.get_enable_parallel_multi_merge_sort()? {
add_k_way_merge_sort(
pipeline,
self.schema.clone(),
max_threads,
block_size,
self.block_size,
self.limit,
self.sort_desc,
self.remove_order_col_at_last,
Expand All @@ -335,7 +329,7 @@ impl SortPipelineBuilder {
try_add_multi_sort_merge(
pipeline,
self.schema.clone(),
block_size,
self.block_size,
self.limit,
self.sort_desc,
self.remove_order_col_at_last,
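A small self-contained sketch of the `SortPipelineBuilder` change above: `create` now reads the block size up front (and is therefore fallible), and `with_block_size_hit` can only lower it. The struct name, the way the setting is passed in, and the values are assumptions of this sketch, not the real API:

```rust
use std::cmp;

// Toy version of SortPipelineBuilder that keeps only the block_size handling
// changed in this commit; the ctx/schema/sort_desc plumbing is omitted.
struct SortPipelineBuilderSketch {
    block_size: usize,
}

impl SortPipelineBuilderSketch {
    // `create` is now fallible because it reads max_block_size from the query
    // settings; here the setting is passed in directly to stay self-contained.
    fn create(max_block_size_setting: usize) -> Result<Self, String> {
        Ok(Self {
            block_size: max_block_size_setting,
        })
    }

    // The hint can only shrink the block size, never grow it past the setting.
    fn with_block_size_hit(mut self, block_size: usize) -> Self {
        self.block_size = cmp::min(self.block_size, block_size);
        self
    }
}

fn main() -> Result<(), String> {
    // Assume max_block_size = 65536 rows (an illustrative value).
    let builder = SortPipelineBuilderSketch::create(65_536)?.with_block_size_hit(10_000);
    assert_eq!(builder.block_size, 10_000);

    // A hint larger than the setting is clamped back down to it.
    let builder = SortPipelineBuilderSketch::create(65_536)?.with_block_size_hit(1_000_000);
    assert_eq!(builder.block_size, 65_536);
    Ok(())
}
```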