diff --git a/src/query/catalog/src/table_mutator.rs b/src/query/catalog/src/table_mutator.rs index 01f9aa665067..7dc73093342e 100644 --- a/src/query/catalog/src/table_mutator.rs +++ b/src/query/catalog/src/table_mutator.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use common_exception::Result; -use common_meta_app::schema::TableInfo; + +use crate::table::Table; #[async_trait::async_trait] pub trait TableMutator: Send + Sync { async fn blocks_select(&mut self) -> Result; - async fn try_commit(&self, table_info: &TableInfo) -> Result<()>; + async fn try_commit(&self, table: Arc) -> Result<()>; } diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index 06d1c31867e3..2f632bebe70b 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -75,7 +75,7 @@ impl Interpreter for OptimizeTableInterpreter { executor.execute()?; drop(executor); - mutator.try_commit(table.get_table_info()).await?; + mutator.try_commit(table.clone()).await?; } if do_purge { diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index b5d059afeb36..a43abee89de0 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -91,7 +91,7 @@ impl Interpreter for ReclusterTableInterpreter { executor.execute()?; drop(executor); - mutator.try_commit(table.get_table_info()).await?; + mutator.try_commit(table).await?; if !plan.is_final { break; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/compact_mutator.rs new file mode 100644 index 000000000000..296006c8fcf6 --- /dev/null +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/compact_mutator.rs @@ -0,0 +1,187 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_base::base::tokio; +use common_catalog::table::Table; +use common_catalog::table_mutator::TableMutator; +use common_exception::ErrorCode; +use common_exception::Result; +use common_storages_fuse::FuseTable; +use databend_query::pipelines::executor::ExecutorSettings; +use databend_query::pipelines::executor::PipelineCompleteExecutor; +use databend_query::sessions::QueryContext; +use databend_query::sessions::TableContext; +use databend_query::sql::plans::Plan; +use databend_query::sql::Planner; + +use crate::storages::fuse::table_test_fixture::execute_command; +use crate::storages::fuse::table_test_fixture::execute_query; +use crate::storages::fuse::table_test_fixture::expects_ok; +use crate::storages::fuse::table_test_fixture::TestFixture; + +#[tokio::test] +async fn test_compact() -> Result<()> { + let fixture = TestFixture::new().await; + let ctx = fixture.ctx(); + let tbl_name = fixture.default_table_name(); + let db_name = fixture.default_db_name(); + + fixture.create_normal_table().await?; + + // insert + for i in 0..9 { + let qry = format!("insert into {}.{}(id) values({})", db_name, tbl_name, i); + execute_command(ctx.clone(), qry.as_str()).await?; + } + + // compact + let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?; + let table = catalog + .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name) + .await?; + let mutator = build_mutator(ctx.clone(), table.clone()).await?; + + // compact commit + mutator.try_commit(table).await?; + + // check count + let expected = vec![ + "+---------------+-------+", + "| segment_count | count |", + "+---------------+-------+", + "| 1 | 1 |", + "+---------------+-------+", + ]; + let qry = format!( + "select segment_count, block_count as count from fuse_snapshot('{}', '{}') limit 1", + db_name, tbl_name + ); + expects_ok( + "check segment and block count", + execute_query(fixture.ctx(), qry.as_str()).await, + expected, + ) + .await?; + Ok(()) +} + +#[tokio::test] +async fn test_compact_resolved_conflict() -> Result<()> { + let fixture = TestFixture::new().await; + let ctx = fixture.ctx(); + let tbl_name = fixture.default_table_name(); + let db_name = fixture.default_db_name(); + + fixture.create_normal_table().await?; + + // insert + for i in 0..9 { + let qry = format!("insert into {}.{}(id) values({})", db_name, tbl_name, i); + execute_command(ctx.clone(), qry.as_str()).await?; + } + + // compact + let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?; + let table = catalog + .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name) + .await?; + let mutator = build_mutator(ctx.clone(), table.clone()).await?; + + // insert + let qry = format!("insert into {}.{}(id) values(10)", db_name, tbl_name); + execute_command(ctx.clone(), qry.as_str()).await?; + + // compact commit + mutator.try_commit(table).await?; + + // check count + let expected = vec![ + "+---------------+-------+", + "| segment_count | count |", + "+---------------+-------+", + "| 2 | 2 |", + "+---------------+-------+", + ]; + let qry = format!( + "select segment_count, block_count as count from fuse_snapshot('{}', '{}') limit 1", + db_name, tbl_name + ); + expects_ok( + "check segment and block count", + execute_query(fixture.ctx(), qry.as_str()).await, + expected, + ) + .await?; + Ok(()) +} + +#[tokio::test] +async fn test_compact_unresolved_conflict() -> Result<()> { + let fixture = TestFixture::new().await; + let ctx = fixture.ctx(); + let tbl_name = fixture.default_table_name(); + let db_name = fixture.default_db_name(); + + fixture.create_normal_table().await?; + + // insert + for i in 0..9 { + let qry = format!("insert into {}.{}(id) values({})", db_name, tbl_name, i); + execute_command(ctx.clone(), qry.as_str()).await?; + } + + // compact + let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?; + let table = catalog + .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name) + .await?; + let mutator = build_mutator(ctx.clone(), table.clone()).await?; + + // delete + let query = format!("delete from {}.{} where id=1", db_name, tbl_name); + let mut planner = Planner::new(ctx.clone()); + let (plan, _, _) = planner.plan_sql(&query).await?; + if let Plan::Delete(delete) = plan { + table.delete(ctx.clone(), *delete.clone()).await?; + } + + // compact commit + let r = mutator.try_commit(table).await; + assert!(r.is_err()); + assert_eq!(r.err().unwrap().code(), ErrorCode::storage_other_code()); + + Ok(()) +} + +async fn build_mutator( + ctx: Arc, + table: Arc, +) -> Result> { + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let settings = ctx.get_settings(); + settings.set_max_threads(1)?; + let mut pipeline = common_pipeline_core::Pipeline::create(); + let mutator = fuse_table.compact(ctx.clone(), &mut pipeline).await?; + assert!(mutator.is_some()); + let mutator = mutator.unwrap(); + pipeline.set_max_threads(1); + let executor_settings = ExecutorSettings::try_create(&settings)?; + let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?; + ctx.set_executor(Arc::downgrade(&executor.get_inner())); + executor.execute()?; + drop(executor); + Ok(mutator) +} diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs index fdd07370aef8..90c4ceb9f512 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs @@ -111,13 +111,13 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> { } } - let new_snapshot = mutator.into_new_snapshot().await?; + let (segments, _, _) = mutator.generate_segments().await?; // half segments left after deletion - assert_eq!(new_snapshot.segments.len(), 50); + assert_eq!(segments.len(), 50); // new_segments should be a subset of test_segments in our case (no partial deletion of segment) - let new_segments = HashSet::<_, RandomState>::from_iter(new_snapshot.segments.into_iter()); + let new_segments = HashSet::<_, RandomState>::from_iter(segments.into_iter()); let test_segments = HashSet::from_iter(test_segment_locations.into_iter()); assert!(new_segments.is_subset(&test_segments)); diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs index 86121db4fb44..7171c435d8ad 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod compact_mutator; mod deletion_mutator; mod recluster_mutator; diff --git a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs index aa26e5b3bbed..ba7bf06c5077 100644 --- a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs +++ b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs @@ -26,8 +26,6 @@ use common_fuse_meta::meta::ColumnStatistics; use common_legacy_planners::Extras; use common_legacy_planners::Projection; use common_storages_fuse::ColumnLeaves; -use databend_query::interpreters::CreateTableInterpreterV2; -use databend_query::interpreters::Interpreter; use databend_query::storages::fuse::ColumnLeaf; use databend_query::storages::fuse::FuseTable; use futures::TryStreamExt; @@ -146,9 +144,7 @@ async fn test_fuse_table_exact_statistic() -> Result<()> { let fixture = TestFixture::new().await; let ctx = fixture.ctx(); - let create_table_plan = fixture.default_crate_table_plan(); - let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?; - interpreter.execute(ctx.clone()).await?; + fixture.create_default_table().await?; let mut table = fixture.latest_default_table().await?; diff --git a/src/query/service/tests/it/storages/fuse/table.rs b/src/query/service/tests/it/storages/fuse/table.rs index 8f3248467812..9be8cc3f45b0 100644 --- a/src/query/service/tests/it/storages/fuse/table.rs +++ b/src/query/service/tests/it/storages/fuse/table.rs @@ -45,9 +45,7 @@ async fn test_fuse_table_normal_case() -> Result<()> { let fixture = TestFixture::new().await; let ctx = fixture.ctx(); - let create_table_plan = fixture.default_crate_table_plan(); - let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?; - interpreter.execute(ctx.clone()).await?; + fixture.create_default_table().await?; let mut table = fixture.latest_default_table().await?; @@ -172,9 +170,7 @@ async fn test_fuse_table_truncate() -> Result<()> { let fixture = TestFixture::new().await; let ctx = fixture.ctx(); - let create_table_plan = fixture.default_crate_table_plan(); - let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?; - interpreter.execute(ctx.clone()).await?; + fixture.create_default_table().await?; let table = fixture.latest_default_table().await?; @@ -235,15 +231,10 @@ async fn test_fuse_table_truncate() -> Result<()> { async fn test_fuse_table_optimize() -> Result<()> { let fixture = TestFixture::new().await; let ctx = fixture.ctx(); - let mut planner = Planner::new(ctx.clone()); + let tbl_name = fixture.default_table_name(); + let db_name = fixture.default_db_name(); - let create_table_plan = fixture.create_normal_table_plan(); - - // create test table - let tbl_name = create_table_plan.table.clone(); - let db_name = create_table_plan.database.clone(); - let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?; - interpreter.execute(ctx.clone()).await?; + fixture.create_normal_table().await?; // insert 5 times let n = 5; @@ -266,6 +257,7 @@ async fn test_fuse_table_optimize() -> Result<()> { // do compact let query = format!("optimize table {}.{} compact", db_name, tbl_name); + let mut planner = Planner::new(ctx.clone()); let (plan, _, _) = planner.plan_sql(&query).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; diff --git a/src/query/service/tests/it/storages/fuse/table_functions/clustering_information_table.rs b/src/query/service/tests/it/storages/fuse/table_functions/clustering_information_table.rs index 673e4cb7d484..d807726670ff 100644 --- a/src/query/service/tests/it/storages/fuse/table_functions/clustering_information_table.rs +++ b/src/query/service/tests/it/storages/fuse/table_functions/clustering_information_table.rs @@ -18,8 +18,6 @@ use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; use common_legacy_expression::*; -use databend_query::interpreters::CreateTableInterpreterV2; -use databend_query::interpreters::Interpreter; use tokio_stream::StreamExt; use crate::storages::fuse::table_test_fixture::*; @@ -32,9 +30,7 @@ async fn test_clustering_information_table_read() -> Result<()> { let ctx = fixture.ctx(); // test db & table - let create_table_plan = fixture.default_crate_table_plan(); - let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?; - interpreter.execute(ctx.clone()).await?; + fixture.create_default_table().await?; // func args let arg_db = LegacyExpression::create_literal(DataValue::String(db.as_bytes().to_vec())); diff --git a/src/query/service/tests/it/storages/fuse/table_functions/fuse_block_table.rs b/src/query/service/tests/it/storages/fuse/table_functions/fuse_block_table.rs index 10e2aef27e00..a01497c2a846 100644 --- a/src/query/service/tests/it/storages/fuse/table_functions/fuse_block_table.rs +++ b/src/query/service/tests/it/storages/fuse/table_functions/fuse_block_table.rs @@ -16,8 +16,6 @@ use common_base::base::tokio; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; -use databend_query::interpreters::CreateTableInterpreterV2; -use databend_query::interpreters::Interpreter; use tokio_stream::StreamExt; use crate::storages::fuse::table_test_fixture::*; @@ -30,9 +28,7 @@ async fn test_fuse_block_table() -> Result<()> { let ctx = fixture.ctx(); // test db & table - let create_table_plan = fixture.default_crate_table_plan(); - let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?; - interpreter.execute(ctx.clone()).await?; + fixture.create_default_table().await?; { let expected = vec![ diff --git a/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs b/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs index 9ac35e0251f9..469526f2556c 100644 --- a/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs +++ b/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs @@ -18,8 +18,6 @@ use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; use common_legacy_expression::*; -use databend_query::interpreters::CreateTableInterpreterV2; -use databend_query::interpreters::Interpreter; use tokio_stream::StreamExt; use crate::storages::fuse::table_test_fixture::*; @@ -82,9 +80,7 @@ async fn test_fuse_snapshot_table_read() -> Result<()> { let ctx = fixture.ctx(); // test db & table - let create_table_plan = fixture.default_crate_table_plan(); - let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?; - interpreter.execute(ctx.clone()).await?; + fixture.create_default_table().await?; { let expected = vec![ diff --git a/src/query/service/tests/it/storages/fuse/table_test_fixture.rs b/src/query/service/tests/it/storages/fuse/table_test_fixture.rs index 7d468f8b79d2..e6db04233a5e 100644 --- a/src/query/service/tests/it/storages/fuse/table_test_fixture.rs +++ b/src/query/service/tests/it/storages/fuse/table_test_fixture.rs @@ -164,7 +164,7 @@ impl TestFixture { } // create a normal table without cluster key. - pub fn create_normal_table_plan(&self) -> CreateTablePlanV2 { + pub fn normal_create_table_plan(&self) -> CreateTablePlanV2 { CreateTablePlanV2 { if_not_exists: false, tenant: self.default_tenant(), @@ -194,6 +194,14 @@ impl TestFixture { Ok(()) } + pub async fn create_normal_table(&self) -> Result<()> { + let create_table_plan = self.normal_create_table_plan(); + let interpreter = + CreateTableInterpreterV2::try_create(self.ctx.clone(), create_table_plan)?; + interpreter.execute(self.ctx.clone()).await?; + Ok(()) + } + pub fn gen_sample_blocks(num: usize, start: i32) -> Vec> { Self::gen_sample_blocks_ex(num, 3, start) } diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index a7626d38ab6f..21ee8c6b88b9 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -46,10 +46,13 @@ use tracing::warn; use uuid::Uuid; use crate::io::write_meta; +use crate::io::SegmentsIO; use crate::io::TableMetaLocationGenerator; +use crate::operations::mutation::AbortOperation; use crate::operations::AppendOperationLogEntry; use crate::operations::TableOperationLog; use crate::statistics; +use crate::statistics::merge_statistics; use crate::FuseTable; use crate::OPT_KEY_LEGACY_SNAPSHOT_LOC; use crate::OPT_KEY_SNAPSHOT_LOCATION; @@ -57,6 +60,7 @@ use crate::OPT_KEY_SNAPSHOT_LOCATION; const OCC_DEFAULT_BACKOFF_INIT_DELAY_MS: Duration = Duration::from_millis(5); const OCC_DEFAULT_BACKOFF_MAX_DELAY_MS: Duration = Duration::from_millis(20 * 1000); const OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS: Duration = Duration::from_millis(120 * 1000); +const MAX_RETRIES: u64 = 10; impl FuseTable { pub async fn do_commit( @@ -393,6 +397,98 @@ impl FuseTable { warn!("write last snapshot hint failure. {}", e); }) } + + pub async fn commit_mutation( + &self, + ctx: Arc, + snapshot: Arc, + mut segments: Vec, + mut summary: Statistics, + abort_operation: AbortOperation, + ) -> Result<()> { + let mut table = self; + let mut latest: Arc; + let mut retries = 0; + let mut latest_snapshot; + let mut base_snapshot = snapshot; + let mut current_table_info = &self.table_info; + + while retries < MAX_RETRIES { + let mut snapshot_tobe_committed = TableSnapshot::from_previous(base_snapshot.as_ref()); + snapshot_tobe_committed.segments = segments.clone(); + snapshot_tobe_committed.summary = summary.clone(); + match Self::commit_to_meta_server( + ctx.as_ref(), + current_table_info, + &self.meta_location_generator, + snapshot_tobe_committed, + &self.operator, + ) + .await + { + Err(e) if e.code() == ErrorCode::table_version_mismatched_code() => { + latest = table.refresh(ctx.as_ref()).await?; + table = FuseTable::try_from_table(latest.as_ref())?; + + latest_snapshot = + table + .read_table_snapshot(ctx.clone()) + .await? + .ok_or_else(|| { + ErrorCode::LogicalError( + "mutation meets empty snapshot during conflict reconciliation", + ) + })?; + current_table_info = &table.table_info; + + if latest_snapshot.segments.len() < base_snapshot.segments.len() { + abort_operation + .abort(ctx.clone(), self.operator.clone()) + .await?; + return Err(ErrorCode::StorageOther( + "mutation conflicts, concurrent mutation detected while committing segment compaction operation", + )); + } + + // Check if there is only insertion during the operation. + let mut new_segments = latest_snapshot.segments.clone(); + let suffix = new_segments + .split_off(latest_snapshot.segments.len() - base_snapshot.segments.len()); + if suffix.ne(&base_snapshot.segments) { + abort_operation + .abort(ctx.clone(), self.operator.clone()) + .await?; + return Err(ErrorCode::StorageOther( + "mutation conflicts, concurrent mutation detected while committing segment compaction operation", + )); + } + + info!("resolvable conflicts detected"); + // Read all segments information in parallel. + let fuse_segment_io = SegmentsIO::create(ctx.clone(), table.operator.clone()); + let results = fuse_segment_io.read_segments(&new_segments).await?; + for result in results.iter() { + let segment = result.clone()?; + summary = merge_statistics(&summary, &segment.summary)?; + } + new_segments.extend(segments.clone()); + segments = new_segments; + base_snapshot = latest_snapshot; + retries += 1; + } + Err(e) => return Err(e), + Ok(_) => return Ok(()), + } + } + + abort_operation + .abort(ctx.clone(), self.operator.clone()) + .await?; + Err(ErrorCode::StorageOther(format!( + "commit mutation failed after {} retries", + retries + ))) + } } mod utils { @@ -410,6 +506,9 @@ mod utils { // if deletion operation failed (after DAL retried) // we just left them there, and let the "major GC" collect them let _ = operator.object(block_location).delete().await; + if let Some(index) = &block.bloom_filter_index_location { + let _ = operator.object(&index.0).delete().await; + } } let _ = operator.object(&entry.segment_location).delete().await; } diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index c953c010a3fe..de8979a75734 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use common_catalog::table::Table; use common_catalog::table_context::TableContext; use common_datavalues::DataSchemaRefExt; use common_exception::ErrorCode; @@ -129,25 +128,26 @@ impl FuseTable { } } } - self.commit_deletion(ctx.as_ref(), deletion_collector).await + + self.commit_deletion(ctx, deletion_collector).await } async fn commit_deletion( &self, - ctx: &dyn TableContext, + ctx: Arc, del_holder: DeletionMutator, ) -> Result<()> { - let new_snapshot = del_holder.into_new_snapshot().await?; - Self::commit_to_meta_server( - ctx, - self.get_table_info(), - &self.meta_location_generator, - new_snapshot, - &self.operator, - ) - .await?; + let (segments, summary, abort_operation) = del_holder.generate_segments().await?; + // TODO check if error is recoverable, and try to resolve the conflict - Ok(()) + self.commit_mutation( + ctx.clone(), + del_holder.base_snapshot().clone(), + segments, + summary, + abort_operation, + ) + .await } fn cluster_stats_gen(&self, ctx: Arc) -> Result { diff --git a/src/query/storages/fuse/src/operations/mutation/abort_operation.rs b/src/query/storages/fuse/src/operations/mutation/abort_operation.rs new file mode 100644 index 000000000000..dc32b3351219 --- /dev/null +++ b/src/query/storages/fuse/src/operations/mutation/abort_operation.rs @@ -0,0 +1,51 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_catalog::table_context::TableContext; +use common_exception::Result; +use common_fuse_meta::meta::BlockMeta; +use opendal::Operator; + +use crate::io::Files; + +#[derive(Default, Clone, Debug)] +pub struct AbortOperation { + pub segments: Vec, + pub blocks: Vec, + pub bloom_filter_indexes: Vec, +} + +impl AbortOperation { + pub fn add_block(mut self, block: &BlockMeta) -> Self { + let block_location = block.location.clone(); + self.blocks.push(block_location.0); + if let Some(index) = block.bloom_filter_index_location.clone() { + self.bloom_filter_indexes.push(index.0); + } + self + } + + pub fn add_segment(mut self, segment: String) -> Self { + self.segments.push(segment); + self + } + + pub async fn abort(self, ctx: Arc, operator: Operator) -> Result<()> { + let fuse_file = Files::create(ctx, operator); + let locations = vec![self.blocks, self.bloom_filter_indexes, self.segments].concat(); + fuse_file.remove_file_in_batch(&locations).await + } +} diff --git a/src/query/storages/fuse/src/operations/mutation/base_mutator.rs b/src/query/storages/fuse/src/operations/mutation/base_mutator.rs index a3f1c79ff1c6..d28f03070914 100644 --- a/src/query/storages/fuse/src/operations/mutation/base_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/base_mutator.rs @@ -26,6 +26,7 @@ use common_fuse_meta::meta::Statistics; use common_fuse_meta::meta::TableSnapshot; use opendal::Operator; +use super::AbortOperation; use crate::io::MetaReaders; use crate::io::SegmentWriter; use crate::io::TableMetaLocationGenerator; @@ -81,19 +82,8 @@ impl BaseMutator { }); } - pub async fn into_new_snapshot( - self, - segments: Vec, - summary: Statistics, - ) -> Result { - let snapshot = self.base_snapshot; - let mut new_snapshot = TableSnapshot::from_previous(&snapshot); - new_snapshot.segments = segments; - new_snapshot.summary = summary; - Ok(new_snapshot) - } - - pub async fn generate_segments(&self) -> Result<(Vec, Statistics)> { + pub async fn generate_segments(&self) -> Result<(Vec, Statistics, AbortOperation)> { + let mut abort_operation = AbortOperation::default(); let segments = self.base_snapshot.segments.clone(); let mut segments_editor = HashMap::<_, _, RandomState>::from_iter(segments.clone().into_iter().enumerate()); @@ -143,6 +133,7 @@ impl BaseMutator { )) })?; if let Some(block_meta) = replacement.new_block_meta { + abort_operation = abort_operation.add_block(&block_meta); block_editor.insert(*position, block_meta); } else { block_editor.remove(position); @@ -160,7 +151,8 @@ impl BaseMutator { new_segment.summary = new_summary; // write down new segment let new_segment_location = seg_writer.write_segment(new_segment).await?; - segments_editor.insert(seg_idx, new_segment_location); + segments_editor.insert(seg_idx, new_segment_location.clone()); + abort_operation = abort_operation.add_segment(new_segment_location.0); } } @@ -175,6 +167,6 @@ impl BaseMutator { // update the summary of new snapshot let new_summary = reduce_statistics(&new_segment_summaries)?; - Ok((new_segments, new_summary)) + Ok((new_segments, new_summary, abort_operation)) } } diff --git a/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs index 0630174039d3..36c0e5e702ea 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs @@ -22,18 +22,19 @@ use common_fuse_meta::meta::SegmentInfo; use common_fuse_meta::meta::Statistics; use common_fuse_meta::meta::TableSnapshot; use common_fuse_meta::meta::Versioned; -use common_meta_app::schema::TableInfo; use opendal::Operator; use crate::io::BlockCompactor; use crate::io::SegmentWriter; use crate::io::SegmentsIO; use crate::io::TableMetaLocationGenerator; +use crate::operations::mutation::AbortOperation; use crate::operations::AppendOperationLogEntry; use crate::statistics::merge_statistics; use crate::statistics::reducers::reduce_block_metas; use crate::statistics::reducers::reduce_statistics; use crate::FuseTable; +use crate::Table; use crate::TableContext; use crate::TableMutator; @@ -45,6 +46,8 @@ pub struct CompactMutator { block_compactor: BlockCompactor, location_generator: TableMetaLocationGenerator, selected_blocks: Vec, + // Blocks that need to be reorganized into new segments. + remain_blocks: Vec, segments: Vec, summary: Statistics, block_per_seg: usize, @@ -69,6 +72,7 @@ impl CompactMutator { block_compactor, location_generator, selected_blocks: Vec::new(), + remain_blocks: Vec::new(), segments: Vec::new(), summary: Statistics::default(), block_per_seg, @@ -94,8 +98,6 @@ impl TableMutator for CompactMutator { async fn blocks_select(&mut self) -> Result { let snapshot = self.base_snapshot.clone(); let segment_locations = &snapshot.segments; - // Blocks that need to be reorganized into new segments. - let mut remain_blocks = Vec::new(); let mut summarys = Vec::new(); // Read all segments information in parallel. @@ -128,15 +130,26 @@ impl TableMutator for CompactMutator { continue; } - remain_blocks.append(&mut remains); + self.remain_blocks.append(&mut remains); } if self.selected_blocks.is_empty() - && (remain_blocks.is_empty() || snapshot.segments.len() <= self.segments.len() + 1) + && (self.remain_blocks.is_empty() || snapshot.segments.len() <= self.segments.len() + 1) { return Ok(false); } + // update the summary of new snapshot + self.summary = reduce_statistics(&summarys)?; + Ok(true) + } + + async fn try_commit(&self, table: Arc) -> Result<()> { + let ctx = self.ctx.clone(); + let mut segments = self.segments.clone(); + let mut summary = self.summary.clone(); + let mut abort_operation = AbortOperation::default(); + // Create new segments. let segment_info_cache = CacheManager::instance().get_table_segment_cache(); let seg_writer = SegmentWriter::new( @@ -144,25 +157,16 @@ impl TableMutator for CompactMutator { &self.location_generator, &segment_info_cache, ); - let chunks = remain_blocks.chunks(self.block_per_seg); + let chunks = self.remain_blocks.chunks(self.block_per_seg); for chunk in chunks { let new_summary = reduce_block_metas(chunk)?; let new_segment = SegmentInfo::new(chunk.to_vec(), new_summary.clone()); let new_segment_location = seg_writer.write_segment(new_segment).await?; - self.segments.push(new_segment_location); - summarys.push(new_summary); - } + segments.push(new_segment_location.clone()); + summary = merge_statistics(&summary, &new_summary)?; - // update the summary of new snapshot - self.summary = reduce_statistics(&summarys)?; - Ok(true) - } - - async fn try_commit(&self, table_info: &TableInfo) -> Result<()> { - let ctx = self.ctx.clone(); - let snapshot = self.base_snapshot.clone(); - let mut new_snapshot = TableSnapshot::from_previous(&snapshot); - new_snapshot.segments = self.segments.clone(); + abort_operation = abort_operation.add_segment(new_segment_location.0); + } let append_entries = ctx.consume_precommit_blocks(); let append_log_entries = append_entries @@ -173,20 +177,29 @@ impl TableMutator for CompactMutator { let (merged_segments, merged_summary) = FuseTable::merge_append_operations(&append_log_entries)?; - let mut merged_segments = merged_segments - .into_iter() - .map(|loc| (loc, SegmentInfo::VERSION)) - .collect(); - new_snapshot.segments.append(&mut merged_segments); - new_snapshot.summary = merge_statistics(&self.summary, &merged_summary)?; + for entry in append_log_entries { + for block in &entry.segment_info.blocks { + abort_operation = abort_operation.add_block(block); + } + abort_operation = abort_operation.add_segment(entry.segment_location); + } - FuseTable::commit_to_meta_server( - ctx.as_ref(), - table_info, - &self.location_generator, - new_snapshot, - &self.data_accessor, - ) - .await + segments.extend( + merged_segments + .into_iter() + .map(|loc| (loc, SegmentInfo::VERSION)), + ); + summary = merge_statistics(&summary, &merged_summary)?; + + let table = FuseTable::try_from_table(table.as_ref())?; + table + .commit_mutation( + ctx.clone(), + self.base_snapshot.clone(), + segments, + summary, + abort_operation, + ) + .await } } diff --git a/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs b/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs index 0a08818abee7..34d6426870d2 100644 --- a/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs @@ -19,9 +19,11 @@ use common_datablocks::DataBlock; use common_exception::Result; use common_fuse_meta::meta::ClusterStatistics; use common_fuse_meta::meta::Location; +use common_fuse_meta::meta::Statistics; use common_fuse_meta::meta::TableSnapshot; use opendal::Operator; +use super::AbortOperation; use crate::io::BlockWriter; use crate::io::TableMetaLocationGenerator; use crate::operations::mutation::BaseMutator; @@ -52,9 +54,12 @@ impl DeletionMutator { }) } - pub async fn into_new_snapshot(self) -> Result { - let (segments, summary) = self.base_mutator.generate_segments().await?; - self.base_mutator.into_new_snapshot(segments, summary).await + pub fn base_snapshot(self) -> Arc { + self.base_mutator.base_snapshot + } + + pub async fn generate_segments(&self) -> Result<(Vec, Statistics, AbortOperation)> { + self.base_mutator.generate_segments().await } /// Records the replacements: diff --git a/src/query/storages/fuse/src/operations/mutation/mod.rs b/src/query/storages/fuse/src/operations/mutation/mod.rs index 2d3170eff00d..4e3e440156f8 100644 --- a/src/query/storages/fuse/src/operations/mutation/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/mod.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod abort_operation; pub mod base_mutator; pub mod block_filter; pub mod compact_mutator; pub mod deletion_mutator; pub mod recluster_mutator; +pub use abort_operation::AbortOperation; pub use base_mutator::BaseMutator; pub use block_filter::delete_from_block; pub use compact_mutator::CompactMutator; diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs index fe03d8d8bb48..6a9286b9def7 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs @@ -24,7 +24,6 @@ use common_fuse_meta::meta::BlockMeta; use common_fuse_meta::meta::SegmentInfo; use common_fuse_meta::meta::TableSnapshot; use common_fuse_meta::meta::Versioned; -use common_meta_app::schema::TableInfo; use opendal::Operator; use crate::io::BlockCompactor; @@ -34,6 +33,7 @@ use crate::operations::AppendOperationLogEntry; use crate::sessions::TableContext; use crate::statistics::merge_statistics; use crate::FuseTable; +use crate::Table; use crate::TableMutator; static MAX_BLOCK_COUNT: usize = 50; @@ -46,7 +46,6 @@ pub struct ReclusterMutator { level: i32, block_compactor: BlockCompactor, threshold: f64, - data_accessor: Operator, } impl ReclusterMutator { @@ -59,12 +58,8 @@ impl ReclusterMutator { blocks_map: BTreeMap>, data_accessor: Operator, ) -> Result { - let base_mutator = BaseMutator::try_create( - ctx, - data_accessor.clone(), - location_generator, - base_snapshot, - )?; + let base_mutator = + BaseMutator::try_create(ctx, data_accessor, location_generator, base_snapshot)?; Ok(Self { base_mutator, blocks_map, @@ -72,7 +67,6 @@ impl ReclusterMutator { level: 0, block_compactor, threshold, - data_accessor, }) } @@ -208,10 +202,11 @@ impl TableMutator for ReclusterMutator { Ok(false) } - async fn try_commit(&self, table_info: &TableInfo) -> Result<()> { + async fn try_commit(&self, table: Arc) -> Result<()> { let base_mutator = self.base_mutator.clone(); let ctx = base_mutator.ctx.clone(); - let (mut segments, mut summary) = self.base_mutator.generate_segments().await?; + let (mut segments, mut summary, mut abort_operation) = + self.base_mutator.generate_segments().await?; let append_entries = ctx.consume_precommit_blocks(); let append_log_entries = append_entries @@ -222,24 +217,29 @@ impl TableMutator for ReclusterMutator { let (merged_segments, merged_summary) = FuseTable::merge_append_operations(&append_log_entries)?; - let mut merged_segments = merged_segments - .into_iter() - .map(|loc| (loc, SegmentInfo::VERSION)) - .collect(); + for entry in append_log_entries { + for block in &entry.segment_info.blocks { + abort_operation = abort_operation.add_block(block); + } + abort_operation = abort_operation.add_segment(entry.segment_location); + } - segments.append(&mut merged_segments); + segments.extend( + merged_segments + .into_iter() + .map(|loc| (loc, SegmentInfo::VERSION)), + ); summary = merge_statistics(&summary, &merged_summary)?; - let new_snapshot = base_mutator.into_new_snapshot(segments, summary).await?; - - FuseTable::commit_to_meta_server( - ctx.as_ref(), - table_info, - &self.base_mutator.location_generator, - new_snapshot, - &self.data_accessor, - ) - .await?; - Ok(()) + let table = FuseTable::try_from_table(table.as_ref())?; + table + .commit_mutation( + ctx.clone(), + base_mutator.base_snapshot.clone(), + segments, + summary, + abort_operation, + ) + .await } }