Skip to content

Commit

Permalink
Merge pull request #8350 from dantengsky/refactor-code-dedup
Browse files Browse the repository at this point in the history
refactor: use commit_mutation in segment compaction
  • Loading branch information
mergify[bot] authored Oct 20, 2022
2 parents 04ac812 + f8b7dc0 commit c15ca52
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 309 deletions.
4 changes: 2 additions & 2 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ pub trait Table: Sync + Send {
ctx: Arc<dyn TableContext>,
target: CompactTarget,
pipeline: &mut Pipeline,
) -> Result<Option<Arc<dyn TableMutator>>> {
) -> Result<Option<Box<dyn TableMutator>>> {
let (_, _, _) = (ctx, target, pipeline);

Err(ErrorCode::UnImplement(format!(
Expand All @@ -249,7 +249,7 @@ pub trait Table: Sync + Send {
ctx: Arc<dyn TableContext>,
pipeline: &mut Pipeline,
push_downs: Option<Extras>,
) -> Result<Option<Arc<dyn TableMutator>>> {
) -> Result<Option<Box<dyn TableMutator>>> {
let (_, _, _) = (ctx, pipeline, push_downs);

Err(ErrorCode::UnImplement(format!(
Expand Down
2 changes: 1 addition & 1 deletion src/query/catalog/src/table_mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ use crate::table::Table;
#[async_trait::async_trait]
pub trait TableMutator: Send + Sync {
async fn target_select(&mut self) -> Result<bool>;
async fn try_commit(&self, table: Arc<dyn Table>) -> Result<()>;
async fn try_commit(self: Box<Self>, table: Arc<dyn Table>) -> Result<()>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async fn test_compact_unresolved_conflict() -> Result<()> {
async fn build_mutator(
ctx: Arc<QueryContext>,
table: Arc<dyn Table>,
) -> Result<Arc<dyn TableMutator>> {
) -> Result<Box<dyn TableMutator>> {
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let settings = ctx.get_settings();
settings.set_max_threads(1)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod compact_mutator;
mod compact_segments_mutator;
mod deletion_mutator;
mod full_compact_mutator;
mod recluster_mutator;
mod segments_compact_mutator;
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use common_base::base::tokio;
use common_catalog::table::CompactTarget;
use common_catalog::table::Table;
use common_catalog::table::TableExt;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -123,6 +124,18 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> {
let count_block = "select block_count as count from fuse_snapshot('default', 't') limit 1";
let stream = execute_query(fixture.ctx(), count_block).await?;
assert_eq!(num_inserts as u64 * 2, check_count(stream).await?);

// check table statistics

let latest = table.refresh(ctx.as_ref()).await?;
let latest_fuse_table = FuseTable::try_from_table(latest.as_ref())?;
let table_statistics = latest_fuse_table
.table_statistics(ctx.clone())
.await?
.unwrap();

assert_eq!(table_statistics.num_rows.unwrap() as usize, num_inserts * 2);

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl Table for FuseTable {
ctx: Arc<dyn TableContext>,
target: CompactTarget,
pipeline: &mut Pipeline,
) -> Result<Option<Arc<dyn TableMutator>>> {
) -> Result<Option<Box<dyn TableMutator>>> {
self.do_compact(ctx, target, pipeline).await
}

Expand All @@ -466,7 +466,7 @@ impl Table for FuseTable {
ctx: Arc<dyn TableContext>,
pipeline: &mut Pipeline,
push_downs: Option<Extras>,
) -> Result<Option<Arc<dyn TableMutator>>> {
) -> Result<Option<Box<dyn TableMutator>>> {
self.do_recluster(ctx, pipeline, push_downs).await
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/query/storages/fuse/src/io/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,19 @@ impl Files {
}

#[tracing::instrument(level = "debug", skip_all)]
pub async fn remove_file_in_batch(&self, file_locations: &[String]) -> Result<()> {
pub async fn remove_file_in_batch(
&self,
file_locations: impl IntoIterator<Item = impl AsRef<str>>,
) -> Result<()> {
let ctx = self.ctx.clone();
let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;

// 1.1 combine all the tasks.
let mut iter = file_locations.iter();
let mut iter = file_locations.into_iter();
let tasks = std::iter::from_fn(move || {
if let Some(location) = iter.next() {
let location = location.clone();
let location = location.as_ref().to_owned();
Some(
Self::remove_file(self.operator.clone(), location)
.instrument(tracing::debug_span!("remove_file")),
Expand Down
124 changes: 82 additions & 42 deletions src/query/storages/fuse/src/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ impl FuseTable {
}
None => {
info!("aborting operations");
let _ =
self::utils::abort_operations(self.get_operator(), operation_log).await;
let _ = utils::abort_operations(self.get_operator(), operation_log).await;
break Err(ErrorCode::OCCRetryFailure(format!(
"can not fulfill the tx after retries({} times, {} ms), aborted. table name {}, identity {}",
retry_times,
Expand Down Expand Up @@ -238,7 +237,7 @@ impl FuseTable {
// 1. merge stats with previous snapshot, if any
let stats = if let Some(snapshot) = &previous {
let summary = &snapshot.summary;
statistics::merge_statistics(&statistics, summary)?
merge_statistics(&statistics, summary)?
} else {
statistics
};
Expand Down Expand Up @@ -284,7 +283,7 @@ impl FuseTable {
snapshot_location.clone(),
);
// remove legacy options
self::utils::remove_legacy_options(&mut new_table_meta.options);
utils::remove_legacy_options(&mut new_table_meta.options);

// 2.2 setup table statistics
let stats = &snapshot.summary;
Expand Down Expand Up @@ -413,47 +412,59 @@ impl FuseTable {
&self,
ctx: &Arc<dyn TableContext>,
base_snapshot: Arc<TableSnapshot>,
mut segments: Vec<Location>,
mut summary: Statistics,
base_segments: Vec<Location>,
base_summary: Statistics,
abort_operation: AbortOperation,
) -> Result<()> {
let mut table = self;
let mut latest: Arc<dyn Table>;
let mut retries = 0;

let mut latest_snapshot = base_snapshot.clone();
let mut current_table_info = &self.table_info;
let mut latest_table_info = &self.table_info;

// holding the reference of latest table during retries
let mut latest_table_ref: Arc<dyn Table>;

// potentially concurrently appended segments, init it to empty
let mut concurrently_appended_segment_locations: &[Location] = &[];

while retries < MAX_RETRIES {
let mut snapshot_tobe_committed =
TableSnapshot::from_previous(latest_snapshot.as_ref());
snapshot_tobe_committed.segments = segments.clone();
snapshot_tobe_committed.summary = summary.clone();

let (segments_tobe_committed, statistics_tobe_committed) = Self::merge_with_base(
ctx.clone(),
self.operator.clone(),
&base_segments,
&base_summary,
concurrently_appended_segment_locations,
)
.await?;
snapshot_tobe_committed.segments = segments_tobe_committed;
snapshot_tobe_committed.summary = statistics_tobe_committed;

match Self::commit_to_meta_server(
ctx.as_ref(),
current_table_info,
latest_table_info,
&self.meta_location_generator,
snapshot_tobe_committed,
&self.operator,
)
.await
{
Err(e) if e.code() == ErrorCode::table_version_mismatched_code() => {
latest = table.refresh(ctx.as_ref()).await?;
table = FuseTable::try_from_table(latest.as_ref())?;

latest_snapshot =
table
.read_table_snapshot(ctx.clone())
.await?
.ok_or_else(|| {
ErrorCode::LogicalError(
"mutation meets empty snapshot during conflict reconciliation",
)
})?;
current_table_info = &table.table_info;
latest_table_ref = self.refresh(ctx.as_ref()).await?;
let latest_fuse_table = FuseTable::try_from_table(latest_table_ref.as_ref())?;
latest_snapshot = latest_fuse_table
.read_table_snapshot(ctx.clone())
.await?
.ok_or_else(|| {
ErrorCode::LogicalError(
"mutation meets empty snapshot during conflict reconciliation",
)
})?;
latest_table_info = &latest_fuse_table.table_info;

// Check if there is only insertion during the operation.
let mut new_segments = latest_snapshot.segments.clone();
match MutatorConflictDetector::detect_conflicts(
base_snapshot.as_ref(),
latest_snapshot.as_ref(),
Expand All @@ -462,30 +473,29 @@ impl FuseTable {
abort_operation
.abort(ctx.clone(), self.operator.clone())
.await?;
metrics::counter!("fuse_commit_mutation_unresolvable_conflict", 1);
return Err(ErrorCode::StorageOther(
"mutation conflicts, concurrent mutation detected while committing segment compaction operation",
));
}
Conflict::ResolvableAppend(range_of_newly_append) => {
// for a resolvable append-only operation, the returned range always begins at 0
new_segments.truncate(range_of_newly_append.len());
info!("resolvable conflicts detected");
metrics::counter!("fuse_commit_mutation_resolvable_conflict", 1);
concurrently_appended_segment_locations =
&latest_snapshot.segments[range_of_newly_append];
}
}

info!("resolvable conflicts detected");
// Read all segments information in parallel.
let fuse_segment_io = SegmentsIO::create(ctx.clone(), table.operator.clone());
let results = fuse_segment_io.read_segments(&new_segments).await?;
for result in results.iter() {
let segment = result.clone()?;
summary = merge_statistics(&summary, &segment.summary)?;
}
new_segments.extend(segments.clone());
segments = new_segments;
retries += 1;
metrics::counter!("fuse_commit_mutation_retry", 1);
}
Err(e) => return Err(e),
Ok(_) => return Ok(()),
Ok(_) => {
return {
metrics::counter!("fuse_commit_mutation_success", 1);
Ok(())
};
}
}
}

Expand All @@ -497,6 +507,38 @@ impl FuseTable {
retries
)))
}

/// Builds the segment list and summary statistics to be committed, given the
/// base (mutated) state and any segments appended concurrently by others.
///
/// When `concurrently_appended_segment_locations` is empty, copies of
/// `base_segments` and `base_summary` are returned unchanged. Otherwise the
/// concurrently appended segments are read, their summaries are merged into
/// `base_summary`, and their locations are placed ahead of `base_segments`.
///
/// # Errors
///
/// Propagates any failure from reading the appended segments or from
/// merging their statistics.
async fn merge_with_base(
    ctx: Arc<dyn TableContext>,
    operator: Operator,
    base_segments: &[Location],
    base_summary: &Statistics,
    concurrently_appended_segment_locations: &[Location],
) -> Result<(Vec<Location>, Statistics)> {
    // no concurrent appends: the base state is committed as it is
    if concurrently_appended_segment_locations.is_empty() {
        return Ok((base_segments.to_owned(), base_summary.clone()));
    }

    // the concurrently appended segments go to the head of the segment list
    let merged_segments: Vec<Location> =
        [concurrently_appended_segment_locations, base_segments].concat();

    // load the appended segments, their summaries are needed for the merge
    let segment_reader = SegmentsIO::create(ctx, operator);
    let appended_segment_results = segment_reader
        .read_segments(concurrently_appended_segment_locations)
        .await?;

    // fold each appended segment's summary into the base summary
    let mut merged_summary = base_summary.clone();
    for segment_result in appended_segment_results {
        let appended_segment = segment_result?;
        merged_summary = merge_statistics(&merged_summary, &appended_segment.summary)?;
    }

    Ok((merged_segments, merged_summary))
}
}

pub enum Conflict {
Expand All @@ -522,9 +564,6 @@ impl MutatorConflictDetector {
&& base_segments[0..base_segments_len]
== latest_segments[(latest_segments_len - base_segments_len)..latest_segments_len]
{
// note: `latest_segments_len == base_segments_len` indicates that the latest
// snapshot was generated by a non-data-related operation, like `alter table add column`,
// which can be merged
Conflict::ResolvableAppend(0..(latest_segments_len - base_segments_len))
} else {
Conflict::Unresolvable
Expand All @@ -541,6 +580,7 @@ mod utils {
operator: Operator,
operation_log: TableOperationLog,
) -> Result<()> {
metrics::counter!("fuse_commit_mutation_aborts", 1);
for entry in operation_log {
for block in &entry.segment_info.blocks {
let block_location = &block.location.0;
Expand Down
10 changes: 5 additions & 5 deletions src/query/storages/fuse/src/operations/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl FuseTable {
ctx: Arc<dyn TableContext>,
target: CompactTarget,
pipeline: &mut Pipeline,
) -> Result<Option<Arc<dyn TableMutator>>> {
) -> Result<Option<Box<dyn TableMutator>>> {
let snapshot_opt = self.read_table_snapshot(ctx.clone()).await?;
let base_snapshot = if let Some(val) = snapshot_opt {
val
Expand Down Expand Up @@ -76,7 +76,7 @@ impl FuseTable {
ctx: Arc<dyn TableContext>,
_pipeline: &mut Pipeline,
options: CompactOptions,
) -> Result<Option<Arc<dyn TableMutator>>> {
) -> Result<Option<Box<dyn TableMutator>>> {
let mut segment_mutator = SegmentCompactMutator::try_create(
ctx.clone(),
options.base_snapshot,
Expand All @@ -86,7 +86,7 @@ impl FuseTable {
)?;

if segment_mutator.target_select().await? {
Ok(Some(Arc::new(segment_mutator)))
Ok(Some(Box::new(segment_mutator)))
} else {
Ok(None)
}
Expand All @@ -97,7 +97,7 @@ impl FuseTable {
ctx: Arc<dyn TableContext>,
pipeline: &mut Pipeline,
options: CompactOptions,
) -> Result<Option<Arc<dyn TableMutator>>> {
) -> Result<Option<Box<dyn TableMutator>>> {
let block_compactor = self.get_block_compactor();

let block_per_seg = options.block_per_seg;
Expand Down Expand Up @@ -162,6 +162,6 @@ impl FuseTable {
)
})?;

Ok(Some(Arc::new(mutator)))
Ok(Some(Box::new(mutator)))
}
}
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl FuseTable {

self.commit_mutation(
&ctx,
del_holder.base_snapshot().clone(),
del_holder.base_snapshot(),
segments,
summary,
abort_operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ impl AbortOperation {

pub async fn abort(self, ctx: Arc<dyn TableContext>, operator: Operator) -> Result<()> {
let fuse_file = Files::create(ctx, operator);
let locations = vec![self.blocks, self.bloom_filter_indexes, self.segments].concat();
fuse_file.remove_file_in_batch(&locations).await
let locations = self
.blocks
.into_iter()
.chain(self.bloom_filter_indexes.into_iter())
.chain(self.segments.into_iter());
fuse_file.remove_file_in_batch(locations).await
}
}
Loading

1 comment on commit c15ca52

@vercel
Copy link

@vercel vercel bot commented on c15ca52 Oct 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.vercel.app
databend.rs

Please sign in to comment.