diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 49dbb45b0fbb..46e08e6feb86 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -234,7 +234,7 @@ pub trait Table: Sync + Send { ctx: Arc, target: CompactTarget, pipeline: &mut Pipeline, - ) -> Result>> { + ) -> Result>> { let (_, _, _) = (ctx, target, pipeline); Err(ErrorCode::UnImplement(format!( @@ -249,7 +249,7 @@ pub trait Table: Sync + Send { ctx: Arc, pipeline: &mut Pipeline, push_downs: Option, - ) -> Result>> { + ) -> Result>> { let (_, _, _) = (ctx, pipeline, push_downs); Err(ErrorCode::UnImplement(format!( diff --git a/src/query/catalog/src/table_mutator.rs b/src/query/catalog/src/table_mutator.rs index 7fd5b670af80..47ad847af438 100644 --- a/src/query/catalog/src/table_mutator.rs +++ b/src/query/catalog/src/table_mutator.rs @@ -21,5 +21,5 @@ use crate::table::Table; #[async_trait::async_trait] pub trait TableMutator: Send + Sync { async fn target_select(&mut self) -> Result; - async fn try_commit(&self, table: Arc) -> Result<()>; + async fn try_commit(self: Box, table: Arc) -> Result<()>; } diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/full_compact_mutator.rs similarity index 99% rename from src/query/service/tests/it/storages/fuse/operations/mutation/compact_mutator.rs rename to src/query/service/tests/it/storages/fuse/operations/mutation/full_compact_mutator.rs index 3cdb906fa77e..e6fd81c44d22 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/full_compact_mutator.rs @@ -170,7 +170,7 @@ async fn test_compact_unresolved_conflict() -> Result<()> { async fn build_mutator( ctx: Arc, table: Arc, -) -> Result> { +) -> Result> { let fuse_table = FuseTable::try_from_table(table.as_ref())?; let settings = ctx.get_settings(); settings.set_max_threads(1)?; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs index 0e918ffe9fcf..2898801f4922 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod compact_mutator; -mod compact_segments_mutator; mod deletion_mutator; +mod full_compact_mutator; mod recluster_mutator; +mod segments_compact_mutator; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/compact_segments_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs similarity index 94% rename from src/query/service/tests/it/storages/fuse/operations/mutation/compact_segments_mutator.rs rename to src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index 8a386eb505fb..d0bfa9fee550 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/compact_segments_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_base::base::tokio; use common_catalog::table::CompactTarget; use common_catalog::table::Table; +use common_catalog::table::TableExt; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; @@ -123,6 +124,18 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> { let count_block = "select block_count as count from fuse_snapshot('default', 't') limit 1"; let stream = execute_query(fixture.ctx(), count_block).await?; assert_eq!(num_inserts as u64 * 2, check_count(stream).await?); + + // check table statistics + + let latest = table.refresh(ctx.as_ref()).await?; + let latest_fuse_table = FuseTable::try_from_table(latest.as_ref())?; + let table_statistics = latest_fuse_table + .table_statistics(ctx.clone()) + .await? + .unwrap(); + + assert_eq!(table_statistics.num_rows.unwrap() as usize, num_inserts * 2); + Ok(()) } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 3340800b78f0..06dd8ebd41e6 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -457,7 +457,7 @@ impl Table for FuseTable { ctx: Arc, target: CompactTarget, pipeline: &mut Pipeline, - ) -> Result>> { + ) -> Result>> { self.do_compact(ctx, target, pipeline).await } @@ -466,7 +466,7 @@ impl Table for FuseTable { ctx: Arc, pipeline: &mut Pipeline, push_downs: Option, - ) -> Result>> { + ) -> Result>> { self.do_recluster(ctx, pipeline, push_downs).await } } diff --git a/src/query/storages/fuse/src/io/files.rs b/src/query/storages/fuse/src/io/files.rs index b01550290bd3..df606c5c2afd 100644 --- a/src/query/storages/fuse/src/io/files.rs +++ b/src/query/storages/fuse/src/io/files.rs @@ -36,16 +36,19 @@ impl Files { } #[tracing::instrument(level = "debug", skip_all)] - pub async fn remove_file_in_batch(&self, file_locations: &[String]) -> Result<()> { + pub async fn remove_file_in_batch( + &self, + file_locations: impl IntoIterator>, + ) -> Result<()> { let ctx = self.ctx.clone(); let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize; let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; // 1.1 combine all the tasks. - let mut iter = file_locations.iter(); + let mut iter = file_locations.into_iter(); let tasks = std::iter::from_fn(move || { if let Some(location) = iter.next() { - let location = location.clone(); + let location = location.as_ref().to_owned(); Some( Self::remove_file(self.operator.clone(), location) .instrument(tracing::debug_span!("remove_file")), diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 7968bd028872..970e74497b26 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -145,8 +145,7 @@ impl FuseTable { } None => { info!("aborting operations"); - let _ = - self::utils::abort_operations(self.get_operator(), operation_log).await; + let _ = utils::abort_operations(self.get_operator(), operation_log).await; break Err(ErrorCode::OCCRetryFailure(format!( "can not fulfill the tx after retries({} times, {} ms), aborted. table name {}, identity {}", retry_times, @@ -238,7 +237,7 @@ impl FuseTable { // 1. merge stats with previous snapshot, if any let stats = if let Some(snapshot) = &previous { let summary = &snapshot.summary; - statistics::merge_statistics(&statistics, summary)? + merge_statistics(&statistics, summary)? } else { statistics }; @@ -284,7 +283,7 @@ impl FuseTable { snapshot_location.clone(), ); // remove legacy options - self::utils::remove_legacy_options(&mut new_table_meta.options); + utils::remove_legacy_options(&mut new_table_meta.options); // 2.2 setup table statistics let stats = &snapshot.summary; @@ -413,24 +412,39 @@ impl FuseTable { &self, ctx: &Arc, base_snapshot: Arc, - mut segments: Vec, - mut summary: Statistics, + base_segments: Vec, + base_summary: Statistics, abort_operation: AbortOperation, ) -> Result<()> { - let mut table = self; - let mut latest: Arc; let mut retries = 0; + let mut latest_snapshot = base_snapshot.clone(); - let mut current_table_info = &self.table_info; + let mut latest_table_info = &self.table_info; + + // holding the reference of latest table during retries + let mut latest_table_ref: Arc; + + // potentially concurrently appended segments, init it to empty + let mut concurrently_appended_segment_locations: &[Location] = &[]; while retries < MAX_RETRIES { let mut snapshot_tobe_committed = TableSnapshot::from_previous(latest_snapshot.as_ref()); - snapshot_tobe_committed.segments = segments.clone(); - snapshot_tobe_committed.summary = summary.clone(); + + let (segments_tobe_committed, statistics_tobe_committed) = Self::merge_with_base( + ctx.clone(), + self.operator.clone(), + &base_segments, + &base_summary, + concurrently_appended_segment_locations, + ) + .await?; + snapshot_tobe_committed.segments = segments_tobe_committed; + snapshot_tobe_committed.summary = statistics_tobe_committed; + match Self::commit_to_meta_server( ctx.as_ref(), - current_table_info, + latest_table_info, &self.meta_location_generator, snapshot_tobe_committed, &self.operator, @@ -438,22 +452,19 @@ impl FuseTable { .await { Err(e) if e.code() == ErrorCode::table_version_mismatched_code() => { - latest = table.refresh(ctx.as_ref()).await?; - table = FuseTable::try_from_table(latest.as_ref())?; - - latest_snapshot = - table - .read_table_snapshot(ctx.clone()) - .await? - .ok_or_else(|| { - ErrorCode::LogicalError( - "mutation meets empty snapshot during conflict reconciliation", - ) - })?; - current_table_info = &table.table_info; + latest_table_ref = self.refresh(ctx.as_ref()).await?; + let latest_fuse_table = FuseTable::try_from_table(latest_table_ref.as_ref())?; + latest_snapshot = latest_fuse_table + .read_table_snapshot(ctx.clone()) + .await? + .ok_or_else(|| { + ErrorCode::LogicalError( + "mutation meets empty snapshot during conflict reconciliation", + ) + })?; + latest_table_info = &latest_fuse_table.table_info; // Check if there is only insertion during the operation. - let mut new_segments = latest_snapshot.segments.clone(); match MutatorConflictDetector::detect_conflicts( base_snapshot.as_ref(), latest_snapshot.as_ref(), @@ -462,30 +473,29 @@ impl FuseTable { abort_operation .abort(ctx.clone(), self.operator.clone()) .await?; + metrics::counter!("fuse_commit_mutation_unresolvable_conflict", 1); return Err(ErrorCode::StorageOther( "mutation conflicts, concurrent mutation detected while committing segment compaction operation", )); } Conflict::ResolvableAppend(range_of_newly_append) => { - // for resolvable append only operation, the range returned always begin with 0 - new_segments.truncate(range_of_newly_append.len()); + info!("resolvable conflicts detected"); + metrics::counter!("fuse_commit_mutation_resolvable_conflict", 1); + concurrently_appended_segment_locations = + &latest_snapshot.segments[range_of_newly_append]; } } - info!("resolvable conflicts detected"); - // Read all segments information in parallel. - let fuse_segment_io = SegmentsIO::create(ctx.clone(), table.operator.clone()); - let results = fuse_segment_io.read_segments(&new_segments).await?; - for result in results.iter() { - let segment = result.clone()?; - summary = merge_statistics(&summary, &segment.summary)?; - } - new_segments.extend(segments.clone()); - segments = new_segments; retries += 1; + metrics::counter!("fuse_commit_mutation_retry", 1); } Err(e) => return Err(e), - Ok(_) => return Ok(()), + Ok(_) => { + return { + metrics::counter!("fuse_commit_mutation_success", 1); + Ok(()) + }; + } } } @@ -497,6 +507,38 @@ impl FuseTable { retries ))) } + + async fn merge_with_base( + ctx: Arc, + operator: Operator, + base_segments: &[Location], + base_summary: &Statistics, + concurrently_appended_segment_locations: &[Location], + ) -> Result<(Vec, Statistics)> { + if concurrently_appended_segment_locations.is_empty() { + Ok((base_segments.to_owned(), base_summary.clone())) + } else { + // place the concurrently appended segments at the head of segment list + let new_segments = concurrently_appended_segment_locations + .iter() + .chain(base_segments.iter()) + .cloned() + .collect(); + + let fuse_segment_io = SegmentsIO::create(ctx, operator); + let concurrent_appended_segment_infos = fuse_segment_io + .read_segments(concurrently_appended_segment_locations) + .await?; + + let mut new_statistics = base_summary.clone(); + for result in concurrent_appended_segment_infos.into_iter() { + let concurrent_appended_segment = result?; + new_statistics = + merge_statistics(&new_statistics, &concurrent_appended_segment.summary)?; + } + Ok((new_segments, new_statistics)) + } + } } pub enum Conflict { @@ -522,9 +564,6 @@ impl MutatorConflictDetector { && base_segments[0..base_segments_len] == latest_segments[(latest_segments_len - base_segments_len)..latest_segments_len] { - // note (latest_segments_len == base_segments_len indicate) indicates that the latest - // snapshot is generated by non-data related operations, like `alter table add column` - // which can be merged Conflict::ResolvableAppend(0..(latest_segments_len - base_segments_len)) } else { Conflict::Unresolvable @@ -541,6 +580,7 @@ mod utils { operator: Operator, operation_log: TableOperationLog, ) -> Result<()> { + metrics::counter!("fuse_commit_mutation_aborts", 1); for entry in operation_log { for block in &entry.segment_info.blocks { let block_location = &block.location.0; diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index 3c046ed26a08..2c28c27155a0 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -44,7 +44,7 @@ impl FuseTable { ctx: Arc, target: CompactTarget, pipeline: &mut Pipeline, - ) -> Result>> { + ) -> Result>> { let snapshot_opt = self.read_table_snapshot(ctx.clone()).await?; let base_snapshot = if let Some(val) = snapshot_opt { val @@ -76,7 +76,7 @@ impl FuseTable { ctx: Arc, _pipeline: &mut Pipeline, options: CompactOptions, - ) -> Result>> { + ) -> Result>> { let mut segment_mutator = SegmentCompactMutator::try_create( ctx.clone(), options.base_snapshot, @@ -86,7 +86,7 @@ impl FuseTable { )?; if segment_mutator.target_select().await? { - Ok(Some(Arc::new(segment_mutator))) + Ok(Some(Box::new(segment_mutator))) } else { Ok(None) } @@ -97,7 +97,7 @@ impl FuseTable { ctx: Arc, pipeline: &mut Pipeline, options: CompactOptions, - ) -> Result>> { + ) -> Result>> { let block_compactor = self.get_block_compactor(); let block_per_seg = options.block_per_seg; @@ -162,6 +162,6 @@ impl FuseTable { ) })?; - Ok(Some(Arc::new(mutator))) + Ok(Some(Box::new(mutator))) } } diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index 696cb9f23537..0d793703a579 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -141,7 +141,7 @@ impl FuseTable { self.commit_mutation( &ctx, - del_holder.base_snapshot().clone(), + del_holder.base_snapshot(), segments, summary, abort_operation, diff --git a/src/query/storages/fuse/src/operations/mutation/abort_operation.rs b/src/query/storages/fuse/src/operations/mutation/abort_operation.rs index dc32b3351219..0c5052ea5057 100644 --- a/src/query/storages/fuse/src/operations/mutation/abort_operation.rs +++ b/src/query/storages/fuse/src/operations/mutation/abort_operation.rs @@ -45,7 +45,11 @@ impl AbortOperation { pub async fn abort(self, ctx: Arc, operator: Operator) -> Result<()> { let fuse_file = Files::create(ctx, operator); - let locations = vec![self.blocks, self.bloom_filter_indexes, self.segments].concat(); - fuse_file.remove_file_in_batch(&locations).await + let locations = self + .blocks + .into_iter() + .chain(self.bloom_filter_indexes.into_iter()) + .chain(self.segments.into_iter()); + fuse_file.remove_file_in_batch(locations).await } } diff --git a/src/query/storages/fuse/src/operations/mutation/compact_mutator/full_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact_mutator/full_compact_mutator.rs index ef2049239a62..ea03da7a809f 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact_mutator/full_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact_mutator/full_compact_mutator.rs @@ -144,10 +144,10 @@ impl TableMutator for FullCompactMutator { Ok(true) } - async fn try_commit(&self, table: Arc) -> Result<()> { + async fn try_commit(self: Box, table: Arc) -> Result<()> { let ctx = self.ctx.clone(); - let mut segments = self.segments.clone(); - let mut summary = self.summary.clone(); + let mut segments = self.segments; + let mut summary = self.summary; let mut abort_operation = AbortOperation::default(); // Create new segments. @@ -193,13 +193,7 @@ impl TableMutator for FullCompactMutator { let table = FuseTable::try_from_table(table.as_ref())?; table - .commit_mutation( - &ctx, - self.base_snapshot.clone(), - segments, - summary, - abort_operation, - ) + .commit_mutation(&ctx, self.base_snapshot, segments, summary, abort_operation) .await } } diff --git a/src/query/storages/fuse/src/operations/mutation/compact_mutator/segment_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact_mutator/segment_compact_mutator.rs index 791121037750..202d78cce04b 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact_mutator/segment_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact_mutator/segment_compact_mutator.rs @@ -16,8 +16,6 @@ use std::sync::Arc; use std::time::Instant; use common_catalog::table::Table; -use common_catalog::table::TableExt; -use common_exception::ErrorCode; use common_exception::Result; use common_fuse_meta::caches::CacheManager; use common_fuse_meta::meta::BlockMeta; @@ -25,26 +23,52 @@ use common_fuse_meta::meta::Location; use common_fuse_meta::meta::SegmentInfo; use common_fuse_meta::meta::Statistics; use common_fuse_meta::meta::TableSnapshot; -use metrics::counter; use metrics::gauge; use opendal::Operator; -use tracing::info; -use tracing::warn; -use crate::io::Files; use crate::io::SegmentWriter; use crate::io::SegmentsIO; use crate::io::TableMetaLocationGenerator; -use crate::operations::commit::Conflict; -use crate::operations::commit::MutatorConflictDetector; +use crate::operations::mutation::AbortOperation; use crate::statistics::merge_statistics; use crate::statistics::reducers::reduce_block_metas; -use crate::statistics::reducers::reduce_statistics; use crate::FuseTable; use crate::TableContext; use crate::TableMutator; -const MAX_RETRIES: u64 = 10; +// The rough idea goes like this: +// 1. `target_select` will working and the base_snapshot: compact segments and keep the compactions +// 2. `try_commit` will try to commit the compacted snapshot to the meta store. +// in condition of reconcilable conflicts detected, compactor will try to resolve the conflict and try again. +// after MAX_RETRIES attempts, if still failed, abort the operation even if conflict resolvable. +// +// example of a merge process: +// - segments of base_snapshot: +// [a, b, c, d, e, f, g] +// suppose a and d are compacted enough, and others are not. +// - after `select_target`: +// unchanged_segments : [a, d] +// compacted_segments : [x, y], where +// [b, c] compacted into x +// [e, f, g] compacted into y +// - if there is a concurrent tx, committed BEFORE compact segments, +// and suppose the segments of the tx's snapshot is +// [a, b, c, d, e, f, g, h, i] +// compare with the base_snapshot's segments +// [a, b, c, d, e, f, g] +// we know that tx is appended(only) on the base of base_snapshot +// and the `appended_segments` is [h, i] +// +// The final merged segments should be +// [x, y, a, d, h, i] +// +// note that +// 1. the concurrently appended(and committed) segment will NOT be compacted during tx +// conflicts resolving to make the committing of compact operation agile +// 2. in the implementation, the order of segment in the vector is arranged in reversed order +// - compaction starts from the head of the snapshot's segment vector. +// - newly compacted segments will be added to the end of snapshot's segment list + pub struct SegmentCompactMutator { ctx: Arc, // the snapshot that compactor working on, it never changed during phases compaction. @@ -52,39 +76,13 @@ pub struct SegmentCompactMutator { data_accessor: Operator, location_generator: TableMetaLocationGenerator, blocks_per_seg: usize, - // chosen from the base_snapshot.segments which contains number of - // blocks lesser than `blocks_per_seg`, and then compacted them into `compacted_segments`, - // such that, at most one of them contains less than `blocks_per_seg` number of blocks, - // and each of the others(if any) contains `blocks_per_seg` number of blocks. - compacted_segment_accumulator: SegmentAccumulator, - // segments of base_snapshot that need not to be compacted, but should be included in - // the new snapshot - unchanged_segment_accumulator: SegmentAccumulator, -} - -struct SegmentAccumulator { - // location of accumulated segments - locations: Vec, - // summarised statistics of all the accumulated segment - summary: Statistics, -} - -impl SegmentAccumulator { - fn new() -> Self { - Self { - locations: vec![], - summary: Default::default(), - } - } - - fn merge_stats(&mut self, stats: &Statistics) -> Result<()> { - self.summary = merge_statistics(stats, &self.summary)?; - Ok(()) - } - fn add_location(&mut self, location: Location) { - self.locations.push(location) - } + // summarised statistics of all the accumulated segments(compacted, and unchanged) + merged_segment_statistics: Statistics, + // locations all the accumulated segments(compacted, and unchanged) + merged_segments_locations: Vec, + // paths all the newly created segments (which are compacted) + new_segment_paths: Vec, } impl SegmentCompactMutator { @@ -101,51 +99,17 @@ impl SegmentCompactMutator { data_accessor: operator, location_generator, blocks_per_seg, - compacted_segment_accumulator: SegmentAccumulator::new(), - unchanged_segment_accumulator: SegmentAccumulator::new(), + merged_segment_statistics: Statistics::default(), + merged_segments_locations: vec![], + new_segment_paths: vec![], }) } fn need_compaction(&self) -> bool { - !self.compacted_segment_accumulator.locations.is_empty() + !self.merged_segments_locations.is_empty() } } -// The rough idea goes like this: -// 1. `target_select` will working and the base_snapshot: compact segments and keep the compactions -// 2. `try_commit` will try to commit the compacted snapshot to the meta store. -// in condition of reconcilable conflicts detected, compactor will try to resolve the conflict and try again. -// after MAX_RETRIES attempts, if still failed, abort the operation even if conflict resolvable. -// -// example of a merge process: -// - segments of base_snapshot: -// [a, b, c, d, e, f, g] -// suppose a and d are compacted enough, and others are not. -// - after `select_target`: -// unchanged_segments : [a, d] -// compacted_segments : [x, y], where -// [b, c] compacted into x -// [e, f, g] compacted into y -// - if there is a concurrent tx, committed BEFORE compact segments, -// and suppose the segments of the tx's snapshot is -// [a, b, c, d, e, f, g, h, i] -// compare with the base_snapshot's segments -// [a, b, c, d, e, f, g] -// we know that tx is appended(only) on the base of base_snapshot -// and the `appended_segments` is [h, i] -// -// The final merged segments should be -// [x, y, a, d, h, i] -// -// note that -// 1. the concurrently appended(and committed) segment will NOT be compacted here, to make the commit of compact agile -// 2. in the implementation, the order of segment in the vector is arranged in reversed order -// compaction starts from the head of the snapshot's segment vector. -// newly compacted segments will be added the end of snapshot's segment list, so that later -// when compaction of segments is executed incrementally, the newly added segments will be -// checked first. -// - #[async_trait::async_trait] impl TableMutator for SegmentCompactMutator { async fn target_select(&mut self) -> Result { @@ -162,14 +126,17 @@ impl TableMutator for SegmentCompactMutator { let blocks_per_segment_threshold = self.blocks_per_seg; let mut segments_tobe_compacted = Vec::with_capacity(base_segments.len() / 2); + + let mut unchanged_segment_locations = Vec::with_capacity(base_segments.len() / 2); + + let mut unchanged_segment_statistics = Statistics::default(); for (idx, segment) in base_segments.iter().enumerate() { let number_blocks = segment.blocks.len(); if number_blocks >= blocks_per_segment_threshold { // skip if current segment is large enough, mark it as unchanged - self.unchanged_segment_accumulator - .add_location(base_segment_locations[idx].clone()); - self.unchanged_segment_accumulator - .merge_stats(&segment.summary)?; + unchanged_segment_locations.push(base_segment_locations[idx].clone()); + unchanged_segment_statistics = + merge_statistics(&unchanged_segment_statistics, &segment.summary)?; continue; } else { // if number of blocks meets the threshold, mark them down @@ -192,25 +159,41 @@ impl TableMutator for SegmentCompactMutator { // split the block metas into chunks of blocks, with chunk size set to blocks_per_seg let chunk_of_blocks = blocks_of_new_segments.chunks(self.blocks_per_seg); + // Build new segments which are compacted according to the setting of `block_per_seg` + // note that the newly segments will be persistent into storage, such that if retry + // happens during the later `try_commit` phase, they do not need to be written again. + let mut compacted_segment_statistics = Statistics::default(); + let mut compacted_segment_locations = Vec::with_capacity(base_segments.len() / 2); let segment_info_cache = CacheManager::instance().get_table_segment_cache(); let segment_writer = SegmentWriter::new( &self.data_accessor, &self.location_generator, &segment_info_cache, ); - - // Build new segments which are compacted according to the setting of `block_per_seg` - // note that the newly segments will be persistent into storage, such that if retry - // happens during `try_commit`, they do no need to be written again. for chunk in chunk_of_blocks { let stats = reduce_block_metas(chunk)?; - self.compacted_segment_accumulator.merge_stats(&stats)?; + compacted_segment_statistics = merge_statistics(&compacted_segment_statistics, &stats)?; let blocks: Vec = chunk.iter().map(|block| Clone::clone(*block)).collect(); let new_segment = SegmentInfo::new(blocks, stats); let location = segment_writer.write_segment(new_segment).await?; - self.compacted_segment_accumulator.add_location(location) + compacted_segment_locations.push(location); } + // collect all the paths of newly created segments + self.new_segment_paths = compacted_segment_locations + .iter() + .map(|(path, _version)| path.clone()) + .collect(); + + self.merged_segment_statistics = + merge_statistics(&compacted_segment_statistics, &unchanged_segment_statistics)?; + + self.merged_segments_locations = { + // place the newly generated compacted segment to the tail of the vector + unchanged_segment_locations.append(&mut compacted_segment_locations); + unchanged_segment_locations + }; + gauge!( "fuse_compact_segments_select_duration_second", select_begin.elapsed(), @@ -218,153 +201,26 @@ impl TableMutator for SegmentCompactMutator { Ok(self.need_compaction()) } - async fn try_commit(&self, table: Arc) -> Result<()> { + async fn try_commit(self: Box, table: Arc) -> Result<()> { if !self.need_compaction() { // defensive checking return Ok(()); } - let ctx = &self.ctx; - let base_snapshot = &self.base_snapshot; - let mut table = table; - let mut latest_snapshot = self.base_snapshot.clone(); - let mut retries = 0; - let mut current_table_info = table.get_table_info(); - - // newly appended segments of concurrent txs which are committed before us - // the init value of it is an empty slice - let mut concurrently_appended_segments_locations: &[Location] = &[]; - - let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), self.data_accessor.clone()); - loop { - let concurrently_append_segments_infos = fuse_segment_io - .read_segments(concurrently_appended_segments_locations) - .await? - .into_iter() - .collect::>>>()?; - - let concurrently_append_segments_stats = concurrently_append_segments_infos - .iter() - .map(|seg| &seg.summary) - .collect::>(); - - // merge all the statistics - let stats_concurrently_appended = - reduce_statistics(&concurrently_append_segments_stats)?; - - let stats_merged_with_compacted = merge_statistics( - &stats_concurrently_appended, - &self.compacted_segment_accumulator.summary, - )?; - - let stats_merged_with_unchanged = merge_statistics( - &stats_merged_with_compacted, - &self.unchanged_segment_accumulator.summary, - )?; - - // chain all the locations, places - // - the merged, potentially not compacted, concurrently appended segments at head - // - the unchanged, compacted segments in the middle - // - the compacted segments at the tail - let locations = concurrently_appended_segments_locations - .iter() - .chain(self.unchanged_segment_accumulator.locations.iter()) - .chain(self.compacted_segment_accumulator.locations.iter()); - - let mut snapshot_tobe_committed = - TableSnapshot::from_previous(latest_snapshot.as_ref()); - snapshot_tobe_committed.segments = locations.into_iter().cloned().collect::>(); - snapshot_tobe_committed.summary = stats_merged_with_unchanged; - - match FuseTable::commit_to_meta_server( - ctx.as_ref(), - current_table_info, - &self.location_generator, - snapshot_tobe_committed, - &self.data_accessor, + let abort_action = AbortOperation { + segments: self.new_segment_paths, + ..Default::default() + }; + + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + fuse_table + .commit_mutation( + &self.ctx, + self.base_snapshot, + self.merged_segments_locations, + self.merged_segment_statistics, + abort_action, ) .await - { - Err(e) => { - if e.code() == ErrorCode::table_version_mismatched_code() { - // refresh the table - table = table.refresh(ctx.as_ref()).await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - latest_snapshot = - fuse_table.read_table_snapshot(ctx.clone()).await?.ok_or_else(|| { - ErrorCode::LogicalError("compacting meets empty snapshot during conflict reconciliation") - })?; - current_table_info = &fuse_table.table_info; - - // check conflicts between the base and latest snapshots - match MutatorConflictDetector::detect_conflicts( - base_snapshot, - &latest_snapshot, - ) { - Conflict::Unresolvable => { - counter!("fuse_compact_segments_unresolvable_conflict", 1); - counter!("fuse_compact_segments_aborts", 1); - abort_segment_compaction( - self.ctx.clone(), - self.data_accessor.clone(), - &self.compacted_segment_accumulator.locations, - ) - .await; - break Err(ErrorCode::StorageOther( - "mutation conflicts, concurrent mutation detected while committing segment compaction operation", - )); - } - Conflict::ResolvableAppend(r) => { - counter!("fuse_compact_segments_resolvable_conflict", 1); - info!("resolvable conflicts detected"); - concurrently_appended_segments_locations = - &latest_snapshot.segments[r]; - } - } - - retries += 1; - - counter!("fuse_compact_segments_retires", retries); - if retries >= MAX_RETRIES { - counter!("fuse_compact_segments_aborts", 1); - abort_segment_compaction( - self.ctx.clone(), - self.data_accessor.clone(), - &self.compacted_segment_accumulator.locations, - ) - .await; - return Err(ErrorCode::StorageOther(format!( - "compact segment failed after {} retries", - retries - ))); - } - // different from retry of commit_insert, here operation retired - // without hesitation, to resolve the conflict as soon as possible - continue; - } else { - return Err(e); - } - } - Ok(_) => return Ok(()), - } - } } } - -async fn abort_segment_compaction( - ctx: Arc, - operator: Operator, - locations: &[Location], -) { - let files = Files::create(ctx, operator); - let paths = locations - .iter() - .map(|(path, _v)| path.clone()) - .collect::>(); - files.remove_file_in_batch(&paths).await.unwrap_or_else(|e| { - warn!( - "failed to delete all the segment meta files while aborting segment compact operation. {} ", - e - ) - }) -} diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs index 2f07590932b7..e2bf1258848e 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs @@ -202,9 +202,8 @@ impl TableMutator for ReclusterMutator { Ok(false) } - async fn try_commit(&self, table: Arc) -> Result<()> { - let base_mutator = self.base_mutator.clone(); - let ctx = base_mutator.ctx.clone(); + async fn try_commit(self: Box, table: Arc) -> Result<()> { + let ctx = &self.base_mutator.ctx; let (mut segments, mut summary, mut abort_operation) = self.base_mutator.generate_segments().await?; @@ -234,8 +233,8 @@ impl TableMutator for ReclusterMutator { let table = FuseTable::try_from_table(table.as_ref())?; table .commit_mutation( - &ctx, - base_mutator.base_snapshot, + ctx, + self.base_mutator.base_snapshot, segments, summary, abort_operation, diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 5d49297b6b4b..f049b1ac5d23 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -45,7 +45,7 @@ impl FuseTable { ctx: Arc, pipeline: &mut Pipeline, push_downs: Option, - ) -> Result>> { + ) -> Result>> { if self.cluster_key_meta.is_none() { return Ok(None); } @@ -194,6 +194,6 @@ impl FuseTable { None, ) })?; - Ok(Some(Arc::new(mutator))) + Ok(Some(Box::new(mutator))) } }