From 39d5f2ef3d5decde1ec083cd1865b9a9df2f965a Mon Sep 17 00:00:00 2001
From: zhyass <34016424+zhyass@users.noreply.github.com>
Date: Mon, 27 Jun 2022 17:18:51 +0800
Subject: [PATCH 1/6] table compact

---
 query/src/storages/fuse/operations/append.rs  |   2 +-
 query/src/storages/fuse/operations/merge.rs   | 149 ++++++++++++++++++
 query/src/storages/fuse/operations/mod.rs     |   1 +
 query/src/storages/storage_table_read_plan.rs |   2 +-
 4 files changed, 152 insertions(+), 2 deletions(-)
 create mode 100644 query/src/storages/fuse/operations/merge.rs

diff --git a/query/src/storages/fuse/operations/append.rs b/query/src/storages/fuse/operations/append.rs
index 8bb15ce100f8..b156f86ac50d 100644
--- a/query/src/storages/fuse/operations/append.rs
+++ b/query/src/storages/fuse/operations/append.rs
@@ -223,7 +223,7 @@ impl FuseTable {
         Ok(())
     }
 
-    fn get_option<T: FromStr>(&self, opt_key: &str, default: T) -> T {
+    pub fn get_option<T: FromStr>(&self, opt_key: &str, default: T) -> T {
         self.table_info
             .options()
             .get(opt_key)
diff --git a/query/src/storages/fuse/operations/merge.rs b/query/src/storages/fuse/operations/merge.rs
new file mode 100644
index 000000000000..e3dc298f041e
--- /dev/null
+++ b/query/src/storages/fuse/operations/merge.rs
@@ -0,0 +1,149 @@
+// Copyright 2021 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use std::sync::Arc;
+
+use common_datablocks::DataBlock;
+use common_exception::Result;
+use common_planners::Partitions;
+use common_planners::ReadDataSourcePlan;
+use common_planners::SourceInfo;
+use common_planners::Statistics;
+
+use super::AppendOperationLogEntry;
+use crate::pipelines::new::NewPipeline;
+use crate::sessions::QueryContext;
+use crate::storages::fuse::io::MetaReaders;
+use crate::storages::fuse::meta::BlockMeta;
+use crate::storages::fuse::meta::SegmentInfo;
+use crate::storages::fuse::FuseTable;
+use crate::storages::fuse::DEFAULT_BLOCK_PER_SEGMENT;
+use crate::storages::fuse::DEFAULT_ROW_PER_BLOCK;
+use crate::storages::fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
+use crate::storages::fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
+use crate::storages::storage_table::Table;
+use crate::storages::storage_table_read_plan::get_description;
+
+pub struct MergeWriter {
+    plan: ReadDataSourcePlan,
+    remain_blocks: Vec<BlockMeta>,
+}
+
+// prepare: merge the blocks and write the segments.
+// exec: write the blocks
+// generate the segments
+// finish
+
+impl FuseTable {
+    pub async fn select_blocks_to_merge(
+        &self,
+        ctx: &Arc<QueryContext>,
+    ) -> Result<(Statistics, Partitions)> {
+        let snapshot = self.read_table_snapshot(ctx.as_ref()).await?;
+        let max_row_per_block =
+            self.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_ROW_PER_BLOCK) as u64;
+        let min_rows_per_block = (max_row_per_block as f64 * 0.8) as u64;
+        let block_per_seg =
+            self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
+
+        match snapshot {
+            Some(snapshot) => {
+                let mut block_metas = Vec::new();
+                let mut remain_blocks = Vec::new();
+                let reader = MetaReaders::segment_info_reader(ctx.as_ref());
+                for (x, ver) in &snapshot.segments {
+                    let mut need_merge = false;
+                    let mut remains = Vec::new();
+                    let res = reader.read(x, None, *ver).await?;
+                    res.blocks.iter().for_each(|b| {
+                        if b.row_count > max_row_per_block || b.row_count < min_rows_per_block {
+                            block_metas.push(b.clone());
+                            need_merge = true;
+                        } else {
+                            remains.push(b.clone());
+                        }
+                    });
+
+                    if !need_merge && res.blocks.len() == block_per_seg {
+                        let log_entry = AppendOperationLogEntry::new(x.to_string(), res);
+                        ctx.push_precommit_block(DataBlock::try_from(log_entry)?);
+                        continue;
+                    }
+
+                    remain_blocks.append(&mut remains);
+                }
+
+                let partitions_scanned = block_metas.len();
+                if partitions_scanned == 0 {
+                    return Ok((Statistics::default(), vec![]));
+                }
+                let partitions_total = snapshot.summary.block_count as usize;
+
+                let (mut statistics, parts) = Self::to_partitions(&block_metas, None);
+                // Update planner statistics.
+                statistics.partitions_total = partitions_total;
+                statistics.partitions_scanned = partitions_scanned;
+
+                // Update context statistics.
+                ctx.get_dal_context()
+                    .get_metrics()
+                    .inc_partitions_total(partitions_total as u64);
+                ctx.get_dal_context()
+                    .get_metrics()
+                    .inc_partitions_scanned(partitions_scanned as u64);
+
+                Ok((statistics, parts))
+            }
+            None => Ok((Statistics::default(), vec![])),
+        }
+    }
+
+    async fn build_read_datasource_plan(
+        &self,
+        ctx: &Arc<QueryContext>,
+        catalog: String,
+    ) -> Result<ReadDataSourcePlan> {
+        let (statistics, parts) = self.select_blocks_to_merge(ctx).await?;
+        let table_info = self.get_table_info();
+        let description = get_description(table_info, &statistics);
+
+        Ok(ReadDataSourcePlan {
+            catalog,
+            source_info: SourceInfo::TableSource(table_info.clone()),
+            scan_fields: None,
+            parts,
+            statistics,
+            description,
+            tbl_args: self.table_args(),
+            push_downs: None,
+        })
+    }
+
+    async fn merge(&self, ctx: &Arc<QueryContext>, plan: &ReadDataSourcePlan) -> Result<()> {
+        let mut pipeline = NewPipeline::create();
+        let read_source_plan = plan.clone();
+        ctx.try_set_partitions(plan.parts.clone())?;
+        let res = self.do_read2(ctx.clone(), &read_source_plan, &mut pipeline);
+        if let Err(e) = res {
+            return Err(e);
+        }
+
+        todo!()
+    }
+}
+
+impl MergeWriter {
+    async fn prepare(&mut self) {}
+}
diff --git a/query/src/storages/fuse/operations/mod.rs b/query/src/storages/fuse/operations/mod.rs
index 372a303f1479..4f499efe6287 100644
--- a/query/src/storages/fuse/operations/mod.rs
+++ b/query/src/storages/fuse/operations/mod.rs
@@ -18,6 +18,7 @@ mod delete;
 mod fuse_sink;
 mod gc;
 mod mutation;
+mod merge;
 mod navigate;
 mod operation_log;
 mod read;
diff --git a/query/src/storages/storage_table_read_plan.rs b/query/src/storages/storage_table_read_plan.rs
index 0bfbe5301222..0ab863fb1089 100644
--- a/query/src/storages/storage_table_read_plan.rs
+++ b/query/src/storages/storage_table_read_plan.rs
@@ -87,7 +87,7 @@ impl ToReadDataSourcePlan for dyn Table {
     }
 }
 
-fn get_description(table_info: &TableInfo, statistics: &Statistics) -> String {
+pub fn get_description(table_info: &TableInfo, statistics: &Statistics) -> String {
     if statistics.read_rows > 0 {
         format!(
             "(Read from {} table, {} Read Rows:{}, Read Bytes:{}, Partitions Scanned:{}, Partitions Total:{})",

From ba6eedc868678393c95229cbc5b3a3d4b19b17c7 Mon Sep 17 00:00:00 2001
From: zhyass <34016424+zhyass@users.noreply.github.com>
Date: Fri, 1 Jul 2022 00:51:32 +0800
Subject: [PATCH 2/6] improve optimize table compact

---
 .../interpreter_table_optimize.rs             |  14 +-
 query/src/storages/fuse/fuse_table.rs         |   6 +
 query/src/storages/fuse/operations/compact.rs |  88 ++++++++++
 query/src/storages/fuse/operations/merge.rs   | 149 ----------------
 query/src/storages/fuse/operations/mod.rs     |   2 +-
 .../fuse/operations/mutation/block_filter.rs  |   2 +-
 .../operations/mutation/compact_mutator.rs    | 159 ++++++++++++++++++
 .../storages/fuse/operations/mutation/mod.rs  |   2 +
 query/src/storages/storage_table.rs           |   9 +
 9 files changed, 268 insertions(+), 163 deletions(-)
 create mode 100644 query/src/storages/fuse/operations/compact.rs
 delete mode 100644 query/src/storages/fuse/operations/merge.rs
 create mode 100644 query/src/storages/fuse/operations/mutation/compact_mutator.rs

diff --git a/query/src/interpreters/interpreter_table_optimize.rs b/query/src/interpreters/interpreter_table_optimize.rs
index db058e92e4f3..9d31b326ccb0 100644
--- a/query/src/interpreters/interpreter_table_optimize.rs
+++ b/query/src/interpreters/interpreter_table_optimize.rs
@@ -19,13 +19,10 @@ use common_planners::OptimizeTableAction;
 use common_planners::OptimizeTablePlan;
 use common_streams::DataBlockStream;
 use common_streams::SendableDataBlockStream;
-use futures::StreamExt;
 
 use crate::interpreters::Interpreter;
-use crate::interpreters::InterpreterFactory;
 use crate::interpreters::InterpreterPtr;
 use crate::sessions::QueryContext;
-use crate::sql::PlanParser;
 
 pub struct OptimizeTableInterpreter {
     ctx: Arc<QueryContext>,
@@ -65,15 +62,8 @@ impl Interpreter for OptimizeTableInterpreter {
         );
 
         if do_compact {
-            // it is a "simple and violent" strategy, to be optimized later
-            let obj_name = format!("{}.{}", &plan.database, &plan.table);
-            let rewritten_query =
-                format!("INSERT OVERWRITE {} SELECT * FROM {}", obj_name, obj_name);
-            let rewritten_plan =
-                PlanParser::parse(self.ctx.clone(), rewritten_query.as_str()).await?;
-            let interpreter = InterpreterFactory::get(self.ctx.clone(), rewritten_plan)?;
-            let mut stream = interpreter.execute(None).await?;
-            while let Some(Ok(_)) = stream.next().await {}
+            // TODO(zhyass): clustering key.
+            table.compact(self.ctx.clone(), self.plan.clone()).await?;
             if do_purge {
                 // currently, context caches the table, we have to "refresh"
                 // the table by using the catalog API directly
diff --git a/query/src/storages/fuse/fuse_table.rs b/query/src/storages/fuse/fuse_table.rs
index a5d3a72a337c..b9f8a5bbc1ca 100644
--- a/query/src/storages/fuse/fuse_table.rs
+++ b/query/src/storages/fuse/fuse_table.rs
@@ -28,6 +28,7 @@ use common_meta_types::MatchSeq;
 use common_planners::DeletePlan;
 use common_planners::Expression;
 use common_planners::Extras;
+use common_planners::OptimizeTablePlan;
 use common_planners::Partitions;
 use common_planners::ReadDataSourcePlan;
 use common_planners::Statistics;
@@ -449,4 +450,9 @@ impl Table for FuseTable {
     async fn delete(&self, ctx: Arc<QueryContext>, delete_plan: DeletePlan) -> Result<()> {
         self.do_delete(ctx, &delete_plan).await
     }
+
+    #[tracing::instrument(level = "debug", name = "fuse_table_compact", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))]
+    async fn compact(&self, ctx: Arc<QueryContext>, plan: OptimizeTablePlan) -> Result<()> {
+        self.do_compact(ctx, &plan).await
+    }
 }
diff --git a/query/src/storages/fuse/operations/compact.rs b/query/src/storages/fuse/operations/compact.rs
new file mode 100644
index 000000000000..8f046c1ff52e
--- /dev/null
+++ b/query/src/storages/fuse/operations/compact.rs
@@ -0,0 +1,88 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use std::sync::Arc;
+
+use common_cache::Cache;
+use common_exception::Result;
+use common_planners::OptimizeTablePlan;
+
+use super::mutation::CompactMutator;
+use crate::sessions::QueryContext;
+use crate::storages::fuse::FuseTable;
+use crate::storages::fuse::DEFAULT_BLOCK_PER_SEGMENT;
+use crate::storages::fuse::DEFAULT_ROW_PER_BLOCK;
+use crate::storages::fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
+use crate::storages::fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
+use crate::storages::storage_table::Table;
+
+impl FuseTable {
+    pub async fn do_compact(&self, ctx: Arc<QueryContext>, plan: &OptimizeTablePlan) -> Result<()> {
+        let snapshot_opt = self.read_table_snapshot(ctx.as_ref()).await?;
+        let snapshot = if let Some(val) = snapshot_opt {
+            val
+        } else {
+            // no snapshot, no compaction.
+            return Ok(());
+        };
+
+        if snapshot.summary.block_count <= 1 {
+            return Ok(());
+        }
+
+        let row_per_block = self.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_ROW_PER_BLOCK);
+        let block_per_seg =
+            self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
+
+        let mut mutator = CompactMutator::try_create(
+            &ctx,
+            &self.meta_location_generator,
+            &snapshot,
+            row_per_block,
+            block_per_seg,
+        )?;
+
+        mutator.compact(self).await?;
+        let (new_snapshot, loc) = mutator.into_new_snapshot().await?;
+
+        let operator = ctx.get_storage_operator()?;
+        let result = Self::commit_to_meta_server(
+            ctx.as_ref(),
+            &plan.catalog,
+            self.get_table_info(),
+            loc.clone(),
+            &new_snapshot.summary,
+        )
+        .await;
+
+        match result {
+            Ok(_) => {
+                if let Some(snapshot_cache) =
+                    ctx.get_storage_cache_manager().get_table_snapshot_cache()
+                {
+                    let cache = &mut snapshot_cache.write().await;
+                    cache.put(loc, Arc::new(new_snapshot));
+                }
+                Ok(())
+            }
+            Err(e) => {
+                // commit snapshot to meta server failed, try to delete it.
+                // "major GC" will collect this, if deletion failure (even after DAL retried)
+                let _ = operator.object(&loc).delete().await;
+                Err(e)
+            }
+        }
+    }
+}
diff --git a/query/src/storages/fuse/operations/merge.rs b/query/src/storages/fuse/operations/merge.rs
deleted file mode 100644
index e3dc298f041e..000000000000
--- a/query/src/storages/fuse/operations/merge.rs
+++ /dev/null
@@ -1,149 +0,0 @@
-// Copyright 2021 Datafuse Labs.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-use std::sync::Arc;
-
-use common_datablocks::DataBlock;
-use common_exception::Result;
-use common_planners::Partitions;
-use common_planners::ReadDataSourcePlan;
-use common_planners::SourceInfo;
-use common_planners::Statistics;
-
-use super::AppendOperationLogEntry;
-use crate::pipelines::new::NewPipeline;
-use crate::sessions::QueryContext;
-use crate::storages::fuse::io::MetaReaders;
-use crate::storages::fuse::meta::BlockMeta;
-use crate::storages::fuse::meta::SegmentInfo;
-use crate::storages::fuse::FuseTable;
-use crate::storages::fuse::DEFAULT_BLOCK_PER_SEGMENT;
-use crate::storages::fuse::DEFAULT_ROW_PER_BLOCK;
-use crate::storages::fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
-use crate::storages::fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
-use crate::storages::storage_table::Table;
-use crate::storages::storage_table_read_plan::get_description;
-
-pub struct MergeWriter {
-    plan: ReadDataSourcePlan,
-    remain_blocks: Vec<BlockMeta>,
-}
-
-// prepare: merge the blocks and write the segments.
-// exec: write the blocks
-// generate the segments
-// finish
-
-impl FuseTable {
-    pub async fn select_blocks_to_merge(
-        &self,
-        ctx: &Arc<QueryContext>,
-    ) -> Result<(Statistics, Partitions)> {
-        let snapshot = self.read_table_snapshot(ctx.as_ref()).await?;
-        let max_row_per_block =
-            self.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_ROW_PER_BLOCK) as u64;
-        let min_rows_per_block = (max_row_per_block as f64 * 0.8) as u64;
-        let block_per_seg =
-            self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
-
-        match snapshot {
-            Some(snapshot) => {
-                let mut block_metas = Vec::new();
-                let mut remain_blocks = Vec::new();
-                let reader = MetaReaders::segment_info_reader(ctx.as_ref());
-                for (x, ver) in &snapshot.segments {
-                    let mut need_merge = false;
-                    let mut remains = Vec::new();
-                    let res = reader.read(x, None, *ver).await?;
-                    res.blocks.iter().for_each(|b| {
-                        if b.row_count > max_row_per_block || b.row_count < min_rows_per_block {
-                            block_metas.push(b.clone());
-                            need_merge = true;
-                        } else {
-                            remains.push(b.clone());
-                        }
-                    });
-
-                    if !need_merge && res.blocks.len() == block_per_seg {
-                        let log_entry = AppendOperationLogEntry::new(x.to_string(), res);
-                        ctx.push_precommit_block(DataBlock::try_from(log_entry)?);
-                        continue;
-                    }
-
-                    remain_blocks.append(&mut remains);
-                }
-
-                let partitions_scanned = block_metas.len();
-                if partitions_scanned == 0 {
-                    return Ok((Statistics::default(), vec![]));
-                }
-                let partitions_total = snapshot.summary.block_count as usize;
-
-                let (mut statistics, parts) = Self::to_partitions(&block_metas, None);
-                // Update planner statistics.
-                statistics.partitions_total = partitions_total;
-                statistics.partitions_scanned = partitions_scanned;
-
-                // Update context statistics.
-                ctx.get_dal_context()
-                    .get_metrics()
-                    .inc_partitions_total(partitions_total as u64);
-                ctx.get_dal_context()
-                    .get_metrics()
-                    .inc_partitions_scanned(partitions_scanned as u64);
-
-                Ok((statistics, parts))
-            }
-            None => Ok((Statistics::default(), vec![])),
-        }
-    }
-
-    async fn build_read_datasource_plan(
-        &self,
-        ctx: &Arc<QueryContext>,
-        catalog: String,
-    ) -> Result<ReadDataSourcePlan> {
-        let (statistics, parts) = self.select_blocks_to_merge(ctx).await?;
-        let table_info = self.get_table_info();
-        let description = get_description(table_info, &statistics);
-
-        Ok(ReadDataSourcePlan {
-            catalog,
-            source_info: SourceInfo::TableSource(table_info.clone()),
-            scan_fields: None,
-            parts,
-            statistics,
-            description,
-            tbl_args: self.table_args(),
-            push_downs: None,
-        })
-    }
-
-    async fn merge(&self, ctx: &Arc<QueryContext>, plan: &ReadDataSourcePlan) -> Result<()> {
-        let mut pipeline = NewPipeline::create();
-        let read_source_plan = plan.clone();
-        ctx.try_set_partitions(plan.parts.clone())?;
-        let res = self.do_read2(ctx.clone(), &read_source_plan, &mut pipeline);
-        if let Err(e) = res {
-            return Err(e);
-        }
-
-        todo!()
-    }
-}
-
-impl MergeWriter {
-    async fn prepare(&mut self) {}
-}
diff --git a/query/src/storages/fuse/operations/mod.rs b/query/src/storages/fuse/operations/mod.rs
index 4f499efe6287..d05ef6cfcfdf 100644
--- a/query/src/storages/fuse/operations/mod.rs
+++ b/query/src/storages/fuse/operations/mod.rs
@@ -14,11 +14,11 @@
 
 mod append;
 mod commit;
+mod compact;
 mod delete;
 mod fuse_sink;
 mod gc;
 mod mutation;
-mod merge;
 mod navigate;
 mod operation_log;
 mod read;
diff --git a/query/src/storages/fuse/operations/mutation/block_filter.rs b/query/src/storages/fuse/operations/mutation/block_filter.rs
index ef070aa5545a..1a1e09db3442 100644
--- a/query/src/storages/fuse/operations/mutation/block_filter.rs
+++ b/query/src/storages/fuse/operations/mutation/block_filter.rs
@@ -115,7 +115,7 @@ pub async fn delete_from_block(
     Ok(res)
 }
 
-fn all_the_columns_ids(table: &FuseTable) -> Vec<usize> {
+pub fn all_the_columns_ids(table: &FuseTable) -> Vec<usize> {
     (0..table.table_info.schema().fields().len())
         .into_iter()
         .collect::<Vec<usize>>()
diff --git a/query/src/storages/fuse/operations/mutation/compact_mutator.rs b/query/src/storages/fuse/operations/mutation/compact_mutator.rs
new file mode 100644
index 000000000000..a731bc7feebe
--- /dev/null
+++ b/query/src/storages/fuse/operations/mutation/compact_mutator.rs
@@ -0,0 +1,159 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_exception::Result;
+use opendal::Operator;
+
+use super::block_filter::all_the_columns_ids;
+use crate::sessions::QueryContext;
+use crate::storages::fuse::io::BlockCompactor;
+use crate::storages::fuse::io::BlockWriter;
+use crate::storages::fuse::io::MetaReaders;
+use crate::storages::fuse::io::SegmentWriter;
+use crate::storages::fuse::io::TableMetaLocationGenerator;
+use crate::storages::fuse::meta::Location;
+use crate::storages::fuse::meta::SegmentInfo;
+use crate::storages::fuse::meta::Statistics;
+use crate::storages::fuse::meta::TableSnapshot;
+use crate::storages::fuse::statistics::reducers::reduce_block_metas;
+use crate::storages::fuse::statistics::reducers::reduce_statistics;
+use crate::storages::fuse::FuseTable;
+
+pub struct CompactMutator<'a> {
+    ctx: &'a Arc<QueryContext>,
+    location_generator: &'a TableMetaLocationGenerator,
+    base_snapshot: &'a TableSnapshot,
+    data_accessor: Operator,
+    row_per_block: usize,
+    block_per_seg: usize,
+    segments: Vec<Location>,
+    summarys: Vec<Statistics>,
+}
+
+impl<'a> CompactMutator<'a> {
+    pub fn try_create(
+        ctx: &'a Arc<QueryContext>,
+        location_generator: &'a TableMetaLocationGenerator,
+        base_snapshot: &'a TableSnapshot,
+        row_per_block: usize,
+        block_per_seg: usize,
+    ) -> Result<Self> {
+        let data_accessor = ctx.get_storage_operator()?;
+        Ok(Self {
+            ctx,
+            location_generator,
+            base_snapshot,
+            data_accessor,
+            row_per_block,
+            block_per_seg,
+            segments: Vec::new(),
+            summarys: Vec::new(),
+        })
+    }
+
+    pub async fn into_new_snapshot(self) -> Result<(TableSnapshot, String)> {
+        let snapshot = self.base_snapshot;
+        let mut new_snapshot = TableSnapshot::from_previous(snapshot);
+        new_snapshot.segments = self.segments.clone();
+        // update the summary of new snapshot
+        let new_summary = reduce_statistics(&self.summarys)?;
+        new_snapshot.summary = new_summary;
+
+        // write the new segment out (and keep it in undo log)
+        let snapshot_loc = self.location_generator.snapshot_location_from_uuid(
+            &new_snapshot.snapshot_id,
+            new_snapshot.format_version(),
+        )?;
+        let bytes = serde_json::to_vec(&new_snapshot)?;
+        self.data_accessor
+            .object(&snapshot_loc)
+            .write(bytes)
+            .await?;
+        Ok((new_snapshot, snapshot_loc))
+    }
+
+    pub async fn compact(&mut self, table: &FuseTable) -> Result<()> {
+        let mut remain_blocks = Vec::new();
+        let mut merged_blocks = Vec::new();
+        let reader = MetaReaders::segment_info_reader(self.ctx);
+        for segment_location in &self.base_snapshot.segments {
+            let (x, ver) = (segment_location.0.clone(), segment_location.1);
+            let mut need_merge = false;
+            let mut remains = Vec::new();
+            let segment = reader.read(x, None, ver).await?;
+            segment.blocks.iter().for_each(|b| {
+                if b.row_count != self.row_per_block as u64 {
+                    merged_blocks.push(b.clone());
+                    need_merge = true;
+                } else {
+                    remains.push(b.clone());
+                }
+            });
+
+            if !need_merge && segment.blocks.len() == self.block_per_seg {
+                self.segments.push(segment_location.clone());
+                self.summarys.push(segment.summary.clone());
+                continue;
+            }
+
+            remain_blocks.append(&mut remains);
+        }
+
+        let col_ids = all_the_columns_ids(table);
+        let mut compactor = BlockCompactor::new(self.row_per_block);
+        let block_writer = BlockWriter::new(&self.data_accessor, self.location_generator);
+        for block_meta in &merged_blocks {
+            let block_reader = table.create_block_reader(self.ctx, col_ids.clone())?;
+            let data_block = block_reader.read_with_block_meta(block_meta).await?;
+
+            let res = compactor.compact(data_block)?;
+            if let Some(blocks) = res {
+                for block in blocks {
+                    let new_block_meta = block_writer.write(block).await?;
+                    remain_blocks.push(new_block_meta);
+                }
+            }
+        }
+        let remains = compactor.finish()?;
+        if let Some(blocks) = remains {
+            for block in blocks {
+                let new_block_meta = block_writer.write(block).await?;
+                remain_blocks.push(new_block_meta);
+            }
+        }
+
+        let segment_info_cache = self
+            .ctx
+            .get_storage_cache_manager()
+            .get_table_segment_cache();
+        let seg_writer = SegmentWriter::new(
+            &self.data_accessor,
+            self.location_generator,
+            &segment_info_cache,
+        );
+
+        let chunks = remain_blocks.chunks(self.block_per_seg);
+        for chunk in chunks {
+            let new_summary = reduce_block_metas(chunk)?;
+            let new_segment = SegmentInfo::new(chunk.to_vec(), new_summary.clone());
+            let new_segment_location = seg_writer.write_segment(new_segment).await?;
+            self.segments.push(new_segment_location);
+            self.summarys.push(new_summary);
+        }
+
+        Ok(())
+    }
+}
diff --git a/query/src/storages/fuse/operations/mutation/mod.rs b/query/src/storages/fuse/operations/mutation/mod.rs
index d36ac3e51d49..5b91d349124c 100644
--- a/query/src/storages/fuse/operations/mutation/mod.rs
+++ b/query/src/storages/fuse/operations/mutation/mod.rs
@@ -13,6 +13,8 @@
 // limitations under the License.
 
 pub mod block_filter;
+pub mod compact_mutator;
 pub mod mutations_collector;
 
 pub use block_filter::delete_from_block;
+pub use compact_mutator::CompactMutator;
diff --git a/query/src/storages/storage_table.rs b/query/src/storages/storage_table.rs
index 94d10f48eca0..a2f9e77cb93f 100644
--- a/query/src/storages/storage_table.rs
+++ b/query/src/storages/storage_table.rs
@@ -27,6 +27,7 @@ use common_meta_types::MetaId;
 use common_planners::DeletePlan;
 use common_planners::Expression;
 use common_planners::Extras;
+use common_planners::OptimizeTablePlan;
 use common_planners::Partitions;
 use common_planners::ReadDataSourcePlan;
 use common_planners::Statistics;
@@ -202,6 +203,14 @@ pub trait Table: Sync + Send {
             self.get_table_info().engine(),
         )))
     }
+
+    async fn compact(&self, _ctx: Arc<QueryContext>, _plan: OptimizeTablePlan) -> Result<()> {
+        Err(ErrorCode::UnImplement(format!(
+            "table {}, of engine type {}, does not support Optimize",
+            self.name(),
+            self.get_table_info().engine(),
+        )))
+    }
 }
 
 #[derive(Debug)]

From 528b5c0ab17fa7cd74aca982aaea85c17e3017e4 Mon Sep 17 00:00:00 2001
From: zhyass <34016424+zhyass@users.noreply.github.com>
Date: Fri, 1 Jul 2022 00:57:28 +0800
Subject: [PATCH 3/6] remove unused code

---
 query/src/storages/storage_table_read_plan.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/query/src/storages/storage_table_read_plan.rs b/query/src/storages/storage_table_read_plan.rs
index 0ab863fb1089..0bfbe5301222 100644
--- a/query/src/storages/storage_table_read_plan.rs
+++ b/query/src/storages/storage_table_read_plan.rs
@@ -87,7 +87,7 @@ impl ToReadDataSourcePlan for dyn Table {
     }
 }
 
-pub fn get_description(table_info: &TableInfo, statistics: &Statistics) -> String {
+fn get_description(table_info: &TableInfo, statistics: &Statistics) -> String {
     if statistics.read_rows > 0 {
         format!(
             "(Read from {} table, {} Read Rows:{}, Read Bytes:{}, Partitions Scanned:{}, Partitions Total:{})",

From 41f0f2f7d40907f719fa9cb77b5de574f54467bf Mon Sep 17 00:00:00 2001
From: zhyass <34016424+zhyass@users.noreply.github.com>
Date: Fri, 1 Jul 2022 10:09:58 +0800
Subject: [PATCH 4/6] Add some comments

---
 .../storages/fuse/operations/mutation/compact_mutator.rs   | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/query/src/storages/fuse/operations/mutation/compact_mutator.rs b/query/src/storages/fuse/operations/mutation/compact_mutator.rs
index a731bc7feebe..6465d516e639 100644
--- a/query/src/storages/fuse/operations/mutation/compact_mutator.rs
+++ b/query/src/storages/fuse/operations/mutation/compact_mutator.rs
@@ -86,7 +86,9 @@ impl<'a> CompactMutator<'a> {
     }
 
     pub async fn compact(&mut self, table: &FuseTable) -> Result<()> {
+        // Blocks that need to be reorganized into new segments.
         let mut remain_blocks = Vec::new();
+        // Blocks that need to be compacted.
         let mut merged_blocks = Vec::new();
         let reader = MetaReaders::segment_info_reader(self.ctx);
         for segment_location in &self.base_snapshot.segments {
@@ -103,6 +105,8 @@ impl<'a> CompactMutator<'a> {
                 }
             });
 
+            // If the number of blocks in the segment reaches block_per_seg, and the blocks do not need to be compacted,
+            // then record the segment information.
             if !need_merge && segment.blocks.len() == self.block_per_seg {
                 self.segments.push(segment_location.clone());
                 self.summarys.push(segment.summary.clone());
@@ -112,6 +116,7 @@ impl<'a> CompactMutator<'a> {
             remain_blocks.append(&mut remains);
         }
 
+        // Compact the blocks.
         let col_ids = all_the_columns_ids(table);
         let mut compactor = BlockCompactor::new(self.row_per_block);
         let block_writer = BlockWriter::new(&self.data_accessor, self.location_generator);
@@ -135,6 +140,7 @@ impl<'a> CompactMutator<'a> {
             }
         }
 
+        // Create new segments.
         let segment_info_cache = self
             .ctx
             .get_storage_cache_manager()
             .get_table_segment_cache();
         let seg_writer = SegmentWriter::new(
             &self.data_accessor,
             self.location_generator,
             &segment_info_cache,
         );
-
         let chunks = remain_blocks.chunks(self.block_per_seg);
         for chunk in chunks {
             let new_summary = reduce_block_metas(chunk)?;

From e64ae8cd26de57fb854f991bb710e3b955b1a869 Mon Sep 17 00:00:00 2001
From: zhyass <34016424+zhyass@users.noreply.github.com>
Date: Fri, 1 Jul 2022 16:55:31 +0800
Subject: [PATCH 5/6] fix review comment

---
 query/src/storages/fuse/fuse_table.rs         |  6 +-
 query/src/storages/fuse/operations/commit.rs  | 45 +++--------
 query/src/storages/fuse/operations/compact.rs | 41 +++-------
 .../operations/mutation/compact_mutator.rs    | 80 ++++++-----------
 query/src/storages/storage_table.rs           |  2 +-
 5 files changed, 62 insertions(+), 112 deletions(-)

diff --git a/query/src/storages/fuse/fuse_table.rs b/query/src/storages/fuse/fuse_table.rs
index b9f8a5bbc1ca..795f360d9c69 100644
--- a/query/src/storages/fuse/fuse_table.rs
+++ b/query/src/storages/fuse/fuse_table.rs
@@ -44,6 +44,7 @@ use crate::sql::PlanParser;
 use crate::sql::OPT_KEY_DATABASE_ID;
 use crate::sql::OPT_KEY_LEGACY_SNAPSHOT_LOC;
 use crate::sql::OPT_KEY_SNAPSHOT_LOCATION;
+use crate::storages::fuse::io::write_meta;
 use crate::storages::fuse::io::MetaReaders;
 use crate::storages::fuse::io::TableMetaLocationGenerator;
 use crate::storages::fuse::meta::ClusterKey;
@@ -171,7 +172,7 @@ impl FuseTable {
         }
     }
 
-    async fn update_table_meta(
+    pub async fn update_table_meta(
         &self,
         ctx: &QueryContext,
         catalog_name: &str,
@@ -182,9 +183,8 @@ impl FuseTable {
         let snapshot_loc = self
             .meta_location_generator()
             .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?;
-        let bytes = serde_json::to_vec(snapshot)?;
         let operator = ctx.get_storage_operator()?;
-        operator.object(&snapshot_loc).write(bytes).await?;
+        write_meta(&operator, &snapshot_loc, snapshot).await?;
 
         // set new snapshot location
         meta.options
diff --git a/query/src/storages/fuse/operations/commit.rs b/query/src/storages/fuse/operations/commit.rs
index ba5ebd3adb43..14aa7b7672c8 100644
--- a/query/src/storages/fuse/operations/commit.rs
+++ b/query/src/storages/fuse/operations/commit.rs
@@ -37,7 +37,6 @@ use uuid::Uuid;
 use crate::sessions::QueryContext;
 use crate::sql::OPT_KEY_LEGACY_SNAPSHOT_LOC;
 use crate::sql::OPT_KEY_SNAPSHOT_LOCATION;
-use crate::storages::fuse::io::write_meta;
 use crate::storages::fuse::meta::ClusterKey;
 use crate::storages::fuse::meta::Location;
 use crate::storages::fuse::meta::SegmentInfo;
@@ -199,39 +198,17 @@ impl FuseTable {
             )?
         };
 
-        let uuid = new_snapshot.snapshot_id;
-        let snapshot_loc = self
-            .meta_location_generator()
-            .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?;
-        let operator = ctx.get_storage_operator()?;
-        write_meta(&operator, &snapshot_loc, &new_snapshot).await?;
-
-        let result = Self::commit_to_meta_server(
-            ctx,
-            catalog_name,
-            self.get_table_info(),
-            snapshot_loc.clone(),
-            &new_snapshot.summary,
-        )
-        .await;
-
-        match result {
-            Ok(_) => {
-                if let Some(snapshot_cache) =
-                    ctx.get_storage_cache_manager().get_table_snapshot_cache()
-                {
-                    let cache = &mut snapshot_cache.write().await;
-                    cache.put(snapshot_loc, Arc::new(new_snapshot));
-                }
-                Ok(())
-            }
-            Err(e) => {
-                // commit snapshot to meta server failed, try to delete it.
-                // "major GC" will collect this, if deletion failure (even after DAL retried)
-                let _ = operator.object(&snapshot_loc).delete().await;
-                Err(e)
-            }
-        }
+        let mut new_table_meta = self.get_table_info().meta.clone();
+        // update statistics
+        new_table_meta.statistics = TableStatistics {
+            number_of_rows: new_snapshot.summary.row_count,
+            data_bytes: new_snapshot.summary.uncompressed_byte_size,
+            compressed_data_bytes: new_snapshot.summary.compressed_byte_size,
+            index_data_bytes: 0, // TODO we do not have it yet
+        };
+
+        self.update_table_meta(ctx, catalog_name, &new_snapshot, &mut new_table_meta)
+            .await
     }
 
     fn merge_table_operations(
diff --git a/query/src/storages/fuse/operations/compact.rs b/query/src/storages/fuse/operations/compact.rs
index 8f046c1ff52e..c64058eb7540 100644
--- a/query/src/storages/fuse/operations/compact.rs
+++ b/query/src/storages/fuse/operations/compact.rs
@@ -15,8 +15,8 @@
 
 use std::sync::Arc;
 
-use common_cache::Cache;
 use common_exception::Result;
+use common_meta_app::schema::TableStatistics;
 use common_planners::OptimizeTablePlan;
 
 use super::mutation::CompactMutator;
@@ -54,35 +54,20 @@ impl FuseTable {
             block_per_seg,
         )?;
 
-        mutator.compact(self).await?;
-        let (new_snapshot, loc) = mutator.into_new_snapshot().await?;
-
-        let operator = ctx.get_storage_operator()?;
-        let result = Self::commit_to_meta_server(
+        let new_snapshot = mutator.compact(self).await?;
+        let mut new_table_meta = self.get_table_info().meta.clone();
+        // update statistics
+        new_table_meta.statistics = TableStatistics {
+            number_of_rows: new_snapshot.summary.row_count,
+            data_bytes: new_snapshot.summary.uncompressed_byte_size,
+            compressed_data_bytes: new_snapshot.summary.compressed_byte_size,
+            index_data_bytes: 0, // TODO we do not have it yet
+        };
+        self.update_table_meta(
             ctx.as_ref(),
             &plan.catalog,
-            self.get_table_info(),
-            loc.clone(),
-            &new_snapshot.summary,
+            &new_snapshot,
+            &mut new_table_meta,
         )
-        .await;
-
-        match result {
-            Ok(_) => {
-                if let Some(snapshot_cache) =
-                    ctx.get_storage_cache_manager().get_table_snapshot_cache()
-                {
-                    let cache = &mut snapshot_cache.write().await;
-                    cache.put(loc, Arc::new(new_snapshot));
-                }
-                Ok(())
-            }
-            Err(e) => {
-                // commit snapshot to meta server failed, try to delete it.
-                // "major GC" will collect this, if deletion failure (even after DAL retried)
-                let _ = operator.object(&loc).delete().await;
-                Err(e)
-            }
-        }
+        .await
     }
 }
diff --git a/query/src/storages/fuse/operations/mutation/compact_mutator.rs b/query/src/storages/fuse/operations/mutation/compact_mutator.rs
index 6465d516e639..d1812d8fe100 100644
--- a/query/src/storages/fuse/operations/mutation/compact_mutator.rs
+++ b/query/src/storages/fuse/operations/mutation/compact_mutator.rs
@@ -14,6 +14,7 @@
 
 use std::sync::Arc;
 
+use common_datablocks::DataBlock;
 use common_exception::Result;
 use opendal::Operator;
 
@@ -24,9 +25,8 @@ use crate::storages::fuse::io::BlockWriter;
 use crate::storages::fuse::io::MetaReaders;
 use crate::storages::fuse::io::SegmentWriter;
 use crate::storages::fuse::io::TableMetaLocationGenerator;
-use crate::storages::fuse::meta::Location;
+use crate::storages::fuse::meta::BlockMeta;
 use crate::storages::fuse::meta::SegmentInfo;
-use crate::storages::fuse::meta::Statistics;
 use crate::storages::fuse::meta::TableSnapshot;
 use crate::storages::fuse::statistics::reducers::reduce_block_metas;
 use crate::storages::fuse::statistics::reducers::reduce_statistics;
@@ -39,8 +39,6 @@ pub struct CompactMutator<'a> {
     data_accessor: Operator,
     row_per_block: usize,
     block_per_seg: usize,
-    segments: Vec<Location>,
-    summarys: Vec<Statistics>,
 }
 
 impl<'a> CompactMutator<'a> {
@@ -59,39 +57,20 @@ impl<'a> CompactMutator<'a> {
             data_accessor,
             row_per_block,
             block_per_seg,
-            segments: Vec::new(),
-            summarys: Vec::new(),
         })
     }
 
-    pub async fn into_new_snapshot(self) -> Result<(TableSnapshot, String)> {
+    pub async fn compact(&mut self, table: &FuseTable) -> Result<TableSnapshot> {
         let snapshot = self.base_snapshot;
-        let mut new_snapshot = TableSnapshot::from_previous(snapshot);
-        new_snapshot.segments = self.segments.clone();
-        // update the summary of new snapshot
-        let new_summary = reduce_statistics(&self.summarys)?;
-        new_snapshot.summary = new_summary;
-
-        // write the new segment out (and keep it in undo log)
-        let snapshot_loc = self.location_generator.snapshot_location_from_uuid(
-            &new_snapshot.snapshot_id,
-            new_snapshot.format_version(),
-        )?;
-        let bytes = serde_json::to_vec(&new_snapshot)?;
-        self.data_accessor
-            .object(&snapshot_loc)
-            .write(bytes)
-            .await?;
-        Ok((new_snapshot, snapshot_loc))
-    }
-
-    pub async fn compact(&mut self, table: &FuseTable) -> Result<()> {
-        // Blocks that need to be reorganized into new segments.
+        // Blocks that need to be reorganized into new segments.
         let mut remain_blocks = Vec::new();
         // Blocks that need to be compacted.
         let mut merged_blocks = Vec::new();
+        // The new segments.
+        let mut segments = Vec::new();
+        let mut summarys = Vec::new();
         let reader = MetaReaders::segment_info_reader(self.ctx);
-        for segment_location in &self.base_snapshot.segments {
+        for segment_location in &snapshot.segments {
             let (x, ver) = (segment_location.0.clone(), segment_location.1);
             let mut need_merge = false;
             let mut remains = Vec::new();
             let segment = reader.read(x, None, ver).await?;
             segment.blocks.iter().for_each(|b| {
                 if b.row_count != self.row_per_block as u64 {
                     merged_blocks.push(b.clone());
                     need_merge = true;
                 } else {
                     remains.push(b.clone());
                 }
             });
 
             // If the number of blocks in the segment reaches block_per_seg, and the blocks do not need to be compacted,
             // then record the segment information.
             if !need_merge && segment.blocks.len() == self.block_per_seg {
-                self.segments.push(segment_location.clone());
-                self.summarys.push(segment.summary.clone());
+                segments.push(segment_location.clone());
+                summarys.push(segment.summary.clone());
                 continue;
             }
 
             remain_blocks.append(&mut remains);
         }
 
         // Compact the blocks.
         let col_ids = all_the_columns_ids(table);
         let mut compactor = BlockCompactor::new(self.row_per_block);
         let block_writer = BlockWriter::new(&self.data_accessor, self.location_generator);
         for block_meta in &merged_blocks {
             let block_reader = table.create_block_reader(self.ctx, col_ids.clone())?;
             let data_block = block_reader.read_with_block_meta(block_meta).await?;
 
             let res = compactor.compact(data_block)?;
-            if let Some(blocks) = res {
-                for block in blocks {
-                    let new_block_meta = block_writer.write(block).await?;
-                    remain_blocks.push(new_block_meta);
-                }
-            }
+            Self::write_block(&block_writer, res, &mut remain_blocks).await?;
         }
         let remains = compactor.finish()?;
-        if let Some(blocks) = remains {
-            for block in blocks {
-                let new_block_meta = block_writer.write(block).await?;
-                remain_blocks.push(new_block_meta);
-            }
-        }
+        Self::write_block(&block_writer, remains, &mut remain_blocks).await?;
 
         // Create new segments.
         let segment_info_cache = self
             .ctx
             .get_storage_cache_manager()
             .get_table_segment_cache();
         let seg_writer = SegmentWriter::new(
             &self.data_accessor,
             self.location_generator,
             &segment_info_cache,
         );
         let chunks = remain_blocks.chunks(self.block_per_seg);
         for chunk in chunks {
             let new_summary = reduce_block_metas(chunk)?;
             let new_segment = SegmentInfo::new(chunk.to_vec(), new_summary.clone());
             let new_segment_location = seg_writer.write_segment(new_segment).await?;
-            self.segments.push(new_segment_location);
-            self.summarys.push(new_summary);
+            segments.push(new_segment_location);
+            summarys.push(new_summary);
         }
 
+        let mut new_snapshot = TableSnapshot::from_previous(snapshot);
+        new_snapshot.segments = segments;
+        // update the summary of new snapshot
+        let new_summary = reduce_statistics(&summarys)?;
+        new_snapshot.summary = new_summary;
+        Ok(new_snapshot)
+    }
+
+    async fn write_block(
+        writer: &BlockWriter<'_>,
+        blocks: Option<Vec<DataBlock>>,
+        metas: &mut Vec<BlockMeta>,
+    ) -> Result<()> {
+        if let Some(blocks) = blocks {
+            for block in blocks {
+                let new_block_meta = writer.write(block).await?;
+                metas.push(new_block_meta);
+            }
+        }
         Ok(())
     }
 }
diff --git a/query/src/storages/storage_table.rs b/query/src/storages/storage_table.rs
index a2f9e77cb93f..bf2918e28e25 100644
--- a/query/src/storages/storage_table.rs
+++ b/query/src/storages/storage_table.rs
@@ -206,7 +206,7 @@ pub trait Table: Sync + Send {
 
     async fn compact(&self, _ctx: Arc<QueryContext>, _plan: OptimizeTablePlan) -> Result<()> {
         Err(ErrorCode::UnImplement(format!(
-            "table {}, of engine type {}, does not support Optimize",
+            "table {}, of engine type {}, does not support compact",
             self.name(),
             self.get_table_info().engine(),
         )))

From 1ba97796f82533e650480f7947d9decf25dbd7df Mon Sep 17 00:00:00 2001
From: zhyass <34016424+zhyass@users.noreply.github.com>
Date: Fri, 1 Jul 2022 16:59:37 +0800
Subject: [PATCH 6/6] remove unused code

---
 query/src/storages/fuse/operations/commit.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/query/src/storages/fuse/operations/commit.rs b/query/src/storages/fuse/operations/commit.rs
index 14aa7b7672c8..6e82f71f0be4 100644
--- a/query/src/storages/fuse/operations/commit.rs
+++ b/query/src/storages/fuse/operations/commit.rs
@@ -20,7 +20,6 @@ use std::time::Instant;
 use backoff::backoff::Backoff;
 use backoff::ExponentialBackoffBuilder;
 use common_base::base::ProgressValues;
-use common_cache::Cache;
 use common_datavalues::DataSchema;
 use common_exception::ErrorCode;
 use common_exception::Result;
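
A minimal SQL sketch of how the series can be exercised end to end. This is illustrative only, not part of the patches: the table name is hypothetical, and the `row_per_block` / `block_per_segment` option spellings and the `OPTIMIZE TABLE ... COMPACT` form are assumed from the FUSE_OPT_KEY_ROW_PER_BLOCK / FUSE_OPT_KEY_BLOCK_PER_SEGMENT constants and the OptimizeTableAction handling above.

    -- Small thresholds so fragmentation is easy to produce.
    CREATE TABLE t(c INT) row_per_block=3 block_per_segment=2;

    -- Each INSERT lands as its own 2-row block; since 2 != row_per_block,
    -- CompactMutator will select all of them for compaction.
    INSERT INTO t VALUES (1), (2);
    INSERT INTO t VALUES (3), (4);
    INSERT INTO t VALUES (5), (6);

    -- Drives FuseTable::do_compact: rewrites the six rows into two 3-row
    -- blocks, regroups them into one segment of two blocks, and commits a
    -- new snapshot through update_table_meta.
    OPTIMIZE TABLE t COMPACT;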