diff --git a/query/src/interpreters/interpreter_table_optimize.rs b/query/src/interpreters/interpreter_table_optimize.rs index db058e92e4f3..9d31b326ccb0 100644 --- a/query/src/interpreters/interpreter_table_optimize.rs +++ b/query/src/interpreters/interpreter_table_optimize.rs @@ -19,13 +19,10 @@ use common_planners::OptimizeTableAction; use common_planners::OptimizeTablePlan; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; -use futures::StreamExt; use crate::interpreters::Interpreter; -use crate::interpreters::InterpreterFactory; use crate::interpreters::InterpreterPtr; use crate::sessions::QueryContext; -use crate::sql::PlanParser; pub struct OptimizeTableInterpreter { ctx: Arc, @@ -65,15 +62,8 @@ impl Interpreter for OptimizeTableInterpreter { ); if do_compact { - // it is a "simple and violent" strategy, to be optimized later - let obj_name = format!("{}.{}", &plan.database, &plan.table); - let rewritten_query = - format!("INSERT OVERWRITE {} SELECT * FROM {}", obj_name, obj_name); - let rewritten_plan = - PlanParser::parse(self.ctx.clone(), rewritten_query.as_str()).await?; - let interpreter = InterpreterFactory::get(self.ctx.clone(), rewritten_plan)?; - let mut stream = interpreter.execute(None).await?; - while let Some(Ok(_)) = stream.next().await {} + // TODO(zhyass): clustering key. 
+ table.compact(self.ctx.clone(), self.plan.clone()).await?; if do_purge { // currently, context caches the table, we have to "refresh" // the table by using the catalog API directly diff --git a/query/src/storages/fuse/fuse_table.rs b/query/src/storages/fuse/fuse_table.rs index a5d3a72a337c..795f360d9c69 100644 --- a/query/src/storages/fuse/fuse_table.rs +++ b/query/src/storages/fuse/fuse_table.rs @@ -28,6 +28,7 @@ use common_meta_types::MatchSeq; use common_planners::DeletePlan; use common_planners::Expression; use common_planners::Extras; +use common_planners::OptimizeTablePlan; use common_planners::Partitions; use common_planners::ReadDataSourcePlan; use common_planners::Statistics; @@ -43,6 +44,7 @@ use crate::sql::PlanParser; use crate::sql::OPT_KEY_DATABASE_ID; use crate::sql::OPT_KEY_LEGACY_SNAPSHOT_LOC; use crate::sql::OPT_KEY_SNAPSHOT_LOCATION; +use crate::storages::fuse::io::write_meta; use crate::storages::fuse::io::MetaReaders; use crate::storages::fuse::io::TableMetaLocationGenerator; use crate::storages::fuse::meta::ClusterKey; @@ -170,7 +172,7 @@ impl FuseTable { } } - async fn update_table_meta( + pub async fn update_table_meta( &self, ctx: &QueryContext, catalog_name: &str, @@ -181,9 +183,8 @@ impl FuseTable { let snapshot_loc = self .meta_location_generator() .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?; - let bytes = serde_json::to_vec(snapshot)?; let operator = ctx.get_storage_operator()?; - operator.object(&snapshot_loc).write(bytes).await?; + write_meta(&operator, &snapshot_loc, snapshot).await?; // set new snapshot location meta.options @@ -449,4 +450,9 @@ impl Table for FuseTable { async fn delete(&self, ctx: Arc, delete_plan: DeletePlan) -> Result<()> { self.do_delete(ctx, &delete_plan).await } + + #[tracing::instrument(level = "debug", name = "fuse_table_compact", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))] + async fn compact(&self, ctx: Arc, plan: OptimizeTablePlan) -> Result<()> { + self.do_compact(ctx, 
&plan).await + } } diff --git a/query/src/storages/fuse/operations/append.rs b/query/src/storages/fuse/operations/append.rs index 8bb15ce100f8..b156f86ac50d 100644 --- a/query/src/storages/fuse/operations/append.rs +++ b/query/src/storages/fuse/operations/append.rs @@ -223,7 +223,7 @@ impl FuseTable { Ok(()) } - fn get_option(&self, opt_key: &str, default: T) -> T { + pub fn get_option(&self, opt_key: &str, default: T) -> T { self.table_info .options() .get(opt_key) diff --git a/query/src/storages/fuse/operations/commit.rs b/query/src/storages/fuse/operations/commit.rs index ba5ebd3adb43..6e82f71f0be4 100644 --- a/query/src/storages/fuse/operations/commit.rs +++ b/query/src/storages/fuse/operations/commit.rs @@ -20,7 +20,6 @@ use std::time::Instant; use backoff::backoff::Backoff; use backoff::ExponentialBackoffBuilder; use common_base::base::ProgressValues; -use common_cache::Cache; use common_datavalues::DataSchema; use common_exception::ErrorCode; use common_exception::Result; @@ -37,7 +36,6 @@ use uuid::Uuid; use crate::sessions::QueryContext; use crate::sql::OPT_KEY_LEGACY_SNAPSHOT_LOC; use crate::sql::OPT_KEY_SNAPSHOT_LOCATION; -use crate::storages::fuse::io::write_meta; use crate::storages::fuse::meta::ClusterKey; use crate::storages::fuse::meta::Location; use crate::storages::fuse::meta::SegmentInfo; @@ -199,39 +197,17 @@ impl FuseTable { )? 
}; - let uuid = new_snapshot.snapshot_id; - let snapshot_loc = self - .meta_location_generator() - .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?; - let operator = ctx.get_storage_operator()?; - write_meta(&operator, &snapshot_loc, &new_snapshot).await?; - - let result = Self::commit_to_meta_server( - ctx, - catalog_name, - self.get_table_info(), - snapshot_loc.clone(), - &new_snapshot.summary, - ) - .await; - - match result { - Ok(_) => { - if let Some(snapshot_cache) = - ctx.get_storage_cache_manager().get_table_snapshot_cache() - { - let cache = &mut snapshot_cache.write().await; - cache.put(snapshot_loc, Arc::new(new_snapshot)); - } - Ok(()) - } - Err(e) => { - // commit snapshot to meta server failed, try to delete it. - // "major GC" will collect this, if deletion failure (even after DAL retried) - let _ = operator.object(&snapshot_loc).delete().await; - Err(e) - } - } + let mut new_table_meta = self.get_table_info().meta.clone(); + // update statistics + new_table_meta.statistics = TableStatistics { + number_of_rows: new_snapshot.summary.row_count, + data_bytes: new_snapshot.summary.uncompressed_byte_size, + compressed_data_bytes: new_snapshot.summary.compressed_byte_size, + index_data_bytes: 0, // TODO we do not have it yet + }; + + self.update_table_meta(ctx, catalog_name, &new_snapshot, &mut new_table_meta) + .await } fn merge_table_operations( diff --git a/query/src/storages/fuse/operations/compact.rs b/query/src/storages/fuse/operations/compact.rs new file mode 100644 index 000000000000..c64058eb7540 --- /dev/null +++ b/query/src/storages/fuse/operations/compact.rs @@ -0,0 +1,73 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use std::sync::Arc; + +use common_exception::Result; +use common_meta_app::schema::TableStatistics; +use common_planners::OptimizeTablePlan; + +use super::mutation::CompactMutator; +use crate::sessions::QueryContext; +use crate::storages::fuse::FuseTable; +use crate::storages::fuse::DEFAULT_BLOCK_PER_SEGMENT; +use crate::storages::fuse::DEFAULT_ROW_PER_BLOCK; +use crate::storages::fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; +use crate::storages::fuse::FUSE_OPT_KEY_ROW_PER_BLOCK; +use crate::storages::storage_table::Table; + +impl FuseTable { + pub async fn do_compact(&self, ctx: Arc, plan: &OptimizeTablePlan) -> Result<()> { + let snapshot_opt = self.read_table_snapshot(ctx.as_ref()).await?; + let snapshot = if let Some(val) = snapshot_opt { + val + } else { + // no snapshot, no compaction. 
+ return Ok(()); + }; + + if snapshot.summary.block_count <= 1 { + return Ok(()); + } + + let row_per_block = self.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_ROW_PER_BLOCK); + let block_per_seg = + self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT); + + let mut mutator = CompactMutator::try_create( + &ctx, + &self.meta_location_generator, + &snapshot, + row_per_block, + block_per_seg, + )?; + + let new_snapshot = mutator.compact(self).await?; + let mut new_table_meta = self.get_table_info().meta.clone(); // update statistics + new_table_meta.statistics = TableStatistics { + number_of_rows: new_snapshot.summary.row_count, + data_bytes: new_snapshot.summary.uncompressed_byte_size, + compressed_data_bytes: new_snapshot.summary.compressed_byte_size, + index_data_bytes: 0, // TODO we do not have it yet + }; + self.update_table_meta( + ctx.as_ref(), + &plan.catalog, + &new_snapshot, + &mut new_table_meta, + ) + .await + } +} diff --git a/query/src/storages/fuse/operations/mod.rs b/query/src/storages/fuse/operations/mod.rs index 372a303f1479..d05ef6cfcfdf 100644 --- a/query/src/storages/fuse/operations/mod.rs +++ b/query/src/storages/fuse/operations/mod.rs @@ -14,6 +14,7 @@ mod append; mod commit; +mod compact; mod delete; mod fuse_sink; mod gc; diff --git a/query/src/storages/fuse/operations/mutation/block_filter.rs b/query/src/storages/fuse/operations/mutation/block_filter.rs index ef070aa5545a..1a1e09db3442 100644 --- a/query/src/storages/fuse/operations/mutation/block_filter.rs +++ b/query/src/storages/fuse/operations/mutation/block_filter.rs @@ -115,7 +115,7 @@ pub async fn delete_from_block( Ok(res) } -fn all_the_columns_ids(table: &FuseTable) -> Vec { +pub fn all_the_columns_ids(table: &FuseTable) -> Vec { (0..table.table_info.schema().fields().len()) .into_iter() .collect::>() diff --git a/query/src/storages/fuse/operations/mutation/compact_mutator.rs b/query/src/storages/fuse/operations/mutation/compact_mutator.rs new file mode 100644 
index 000000000000..d1812d8fe100 --- /dev/null +++ b/query/src/storages/fuse/operations/mutation/compact_mutator.rs @@ -0,0 +1,152 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_exception::Result; +use opendal::Operator; + +use super::block_filter::all_the_columns_ids; +use crate::sessions::QueryContext; +use crate::storages::fuse::io::BlockCompactor; +use crate::storages::fuse::io::BlockWriter; +use crate::storages::fuse::io::MetaReaders; +use crate::storages::fuse::io::SegmentWriter; +use crate::storages::fuse::io::TableMetaLocationGenerator; +use crate::storages::fuse::meta::BlockMeta; +use crate::storages::fuse::meta::SegmentInfo; +use crate::storages::fuse::meta::TableSnapshot; +use crate::storages::fuse::statistics::reducers::reduce_block_metas; +use crate::storages::fuse::statistics::reducers::reduce_statistics; +use crate::storages::fuse::FuseTable; + +pub struct CompactMutator<'a> { + ctx: &'a Arc, + location_generator: &'a TableMetaLocationGenerator, + base_snapshot: &'a TableSnapshot, + data_accessor: Operator, + row_per_block: usize, + block_per_seg: usize, +} + +impl<'a> CompactMutator<'a> { + pub fn try_create( + ctx: &'a Arc, + location_generator: &'a TableMetaLocationGenerator, + base_snapshot: &'a TableSnapshot, + row_per_block: usize, + block_per_seg: usize, + ) -> Result { + let data_accessor = ctx.get_storage_operator()?; + 
Ok(Self { + ctx, + location_generator, + base_snapshot, + data_accessor, + row_per_block, + block_per_seg, + }) + } + + pub async fn compact(&mut self, table: &FuseTable) -> Result { + let snapshot = self.base_snapshot; + // Blocks that need to be reorganized into new segments. + let mut remain_blocks = Vec::new(); + // Blocks that need to be compacted. + let mut merged_blocks = Vec::new(); + // The new segments. + let mut segments = Vec::new(); + let mut summarys = Vec::new(); + let reader = MetaReaders::segment_info_reader(self.ctx); + for segment_location in &snapshot.segments { + let (x, ver) = (segment_location.0.clone(), segment_location.1); + let mut need_merge = false; + let mut remains = Vec::new(); + let segment = reader.read(x, None, ver).await?; + segment.blocks.iter().for_each(|b| { + if b.row_count != self.row_per_block as u64 { + merged_blocks.push(b.clone()); + need_merge = true; + } else { + remains.push(b.clone()); + } + }); + + // If the number of blocks of segment meets block_per_seg, and the blocks in segments do not need to be compacted, + // then record the segment information. + if !need_merge && segment.blocks.len() == self.block_per_seg { + segments.push(segment_location.clone()); + summarys.push(segment.summary.clone()); + continue; + } + + remain_blocks.append(&mut remains); + } + + // Compact the blocks. + let col_ids = all_the_columns_ids(table); + let mut compactor = BlockCompactor::new(self.row_per_block); + let block_writer = BlockWriter::new(&self.data_accessor, self.location_generator); + for block_meta in &merged_blocks { + let block_reader = table.create_block_reader(self.ctx, col_ids.clone())?; + let data_block = block_reader.read_with_block_meta(block_meta).await?; + + let res = compactor.compact(data_block)?; + Self::write_block(&block_writer, res, &mut remain_blocks).await?; + } + let remains = compactor.finish()?; + Self::write_block(&block_writer, remains, &mut remain_blocks).await?; + + // Create new segments.
+ let segment_info_cache = self + .ctx + .get_storage_cache_manager() + .get_table_segment_cache(); + let seg_writer = SegmentWriter::new( + &self.data_accessor, + self.location_generator, + &segment_info_cache, + ); + let chunks = remain_blocks.chunks(self.block_per_seg); + for chunk in chunks { + let new_summary = reduce_block_metas(chunk)?; + let new_segment = SegmentInfo::new(chunk.to_vec(), new_summary.clone()); + let new_segment_location = seg_writer.write_segment(new_segment).await?; + segments.push(new_segment_location); + summarys.push(new_summary); + } + + let mut new_snapshot = TableSnapshot::from_previous(snapshot); + new_snapshot.segments = segments; + // update the summary of new snapshot + let new_summary = reduce_statistics(&summarys)?; + new_snapshot.summary = new_summary; + Ok(new_snapshot) + } + + async fn write_block( + writer: &BlockWriter<'_>, + blocks: Option>, + metas: &mut Vec, + ) -> Result<()> { + if let Some(blocks) = blocks { + for block in blocks { + let new_block_meta = writer.write(block).await?; + metas.push(new_block_meta); + } + } + Ok(()) + } +} diff --git a/query/src/storages/fuse/operations/mutation/mod.rs b/query/src/storages/fuse/operations/mutation/mod.rs index d36ac3e51d49..5b91d349124c 100644 --- a/query/src/storages/fuse/operations/mutation/mod.rs +++ b/query/src/storages/fuse/operations/mutation/mod.rs @@ -13,6 +13,8 @@ // limitations under the License. 
pub mod block_filter; +pub mod compact_mutator; pub mod mutations_collector; pub use block_filter::delete_from_block; +pub use compact_mutator::CompactMutator; diff --git a/query/src/storages/storage_table.rs b/query/src/storages/storage_table.rs index 94d10f48eca0..bf2918e28e25 100644 --- a/query/src/storages/storage_table.rs +++ b/query/src/storages/storage_table.rs @@ -27,6 +27,7 @@ use common_meta_types::MetaId; use common_planners::DeletePlan; use common_planners::Expression; use common_planners::Extras; +use common_planners::OptimizeTablePlan; use common_planners::Partitions; use common_planners::ReadDataSourcePlan; use common_planners::Statistics; @@ -202,6 +203,14 @@ pub trait Table: Sync + Send { self.get_table_info().engine(), ))) } + + async fn compact(&self, _ctx: Arc, _plan: OptimizeTablePlan) -> Result<()> { + Err(ErrorCode::UnImplement(format!( + "table {}, of engine type {}, does not support compact", + self.name(), + self.get_table_info().engine(), + ))) + } } #[derive(Debug)]