Merge pull request #6373 from zhyass/feature_cluster_table
feat(storage): Improve optimize table compact
BohuTANG authored Jul 1, 2022
2 parents 4837f1a + 1ba9779 commit e3d3f09
Showing 10 changed files with 261 additions and 52 deletions.
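For context, the code paths below are driven by Databend's OPTIMIZE TABLE statement; the interpreter's do_compact / do_purge flags correspond to its action keywords. A rough usage sketch (exact syntax per the docs of this era):

    OPTIMIZE TABLE mydb.mytable COMPACT;  -- rewrite under-filled blocks (the path this PR changes)
    OPTIMIZE TABLE mydb.mytable PURGE;    -- drop historical snapshots and unreferenced data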
14 changes: 2 additions & 12 deletions query/src/interpreters/interpreter_table_optimize.rs
@@ -19,13 +19,10 @@ use common_planners::OptimizeTableAction;
 use common_planners::OptimizeTablePlan;
 use common_streams::DataBlockStream;
 use common_streams::SendableDataBlockStream;
-use futures::StreamExt;
 
 use crate::interpreters::Interpreter;
-use crate::interpreters::InterpreterFactory;
 use crate::interpreters::InterpreterPtr;
 use crate::sessions::QueryContext;
-use crate::sql::PlanParser;
 
 pub struct OptimizeTableInterpreter {
     ctx: Arc<QueryContext>,
@@ -65,15 +62,8 @@ impl Interpreter for OptimizeTableInterpreter {
         );
 
         if do_compact {
-            // it is a "simple and violent" strategy, to be optimized later
-            let obj_name = format!("{}.{}", &plan.database, &plan.table);
-            let rewritten_query =
-                format!("INSERT OVERWRITE {} SELECT * FROM {}", obj_name, obj_name);
-            let rewritten_plan =
-                PlanParser::parse(self.ctx.clone(), rewritten_query.as_str()).await?;
-            let interpreter = InterpreterFactory::get(self.ctx.clone(), rewritten_plan)?;
-            let mut stream = interpreter.execute(None).await?;
-            while let Some(Ok(_)) = stream.next().await {}
+            // TODO(zhyass): clustering key.
+            table.compact(self.ctx.clone(), self.plan.clone()).await?;
             if do_purge {
                 // currently, context caches the table, we have to "refresh"
                 // the table by using the catalog API directly
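Concretely, for a table db1.t1 the deleted branch would have parsed and executed a full self-overwrite generated by the format! call above, i.e.:

    INSERT OVERWRITE db1.t1 SELECT * FROM db1.t1;

The new branch instead calls table.compact(...), which rewrites only the under-filled blocks (see compact.rs below).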
12 changes: 9 additions & 3 deletions query/src/storages/fuse/fuse_table.rs
@@ -28,6 +28,7 @@ use common_meta_types::MatchSeq;
 use common_planners::DeletePlan;
 use common_planners::Expression;
 use common_planners::Extras;
+use common_planners::OptimizeTablePlan;
 use common_planners::Partitions;
 use common_planners::ReadDataSourcePlan;
 use common_planners::Statistics;
@@ -43,6 +44,7 @@ use crate::sql::PlanParser;
 use crate::sql::OPT_KEY_DATABASE_ID;
 use crate::sql::OPT_KEY_LEGACY_SNAPSHOT_LOC;
 use crate::sql::OPT_KEY_SNAPSHOT_LOCATION;
+use crate::storages::fuse::io::write_meta;
 use crate::storages::fuse::io::MetaReaders;
 use crate::storages::fuse::io::TableMetaLocationGenerator;
 use crate::storages::fuse::meta::ClusterKey;
@@ -170,7 +172,7 @@ impl FuseTable {
         }
     }
 
-    async fn update_table_meta(
+    pub async fn update_table_meta(
         &self,
         ctx: &QueryContext,
         catalog_name: &str,
@@ -181,9 +183,8 @@ impl FuseTable {
         let snapshot_loc = self
             .meta_location_generator()
             .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?;
-        let bytes = serde_json::to_vec(snapshot)?;
         let operator = ctx.get_storage_operator()?;
-        operator.object(&snapshot_loc).write(bytes).await?;
+        write_meta(&operator, &snapshot_loc, snapshot).await?;
 
         // set new snapshot location
         meta.options
@@ -449,4 +450,9 @@ impl Table for FuseTable {
     async fn delete(&self, ctx: Arc<QueryContext>, delete_plan: DeletePlan) -> Result<()> {
         self.do_delete(ctx, &delete_plan).await
     }
+
+    #[tracing::instrument(level = "debug", name = "fuse_table_compact", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))]
+    async fn compact(&self, ctx: Arc<QueryContext>, plan: OptimizeTablePlan) -> Result<()> {
+        self.do_compact(ctx, &plan).await
+    }
 }
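The write_meta helper imported above replaces the inline serialize-then-write pair deleted from update_table_meta. Judging only from the two lines it supersedes, its shape is presumably close to the following sketch (a hypothetical reconstruction, not the helper's actual source):

    // Hypothetical shape of write_meta, inferred from the removed lines:
    // serialize the snapshot as JSON and write it at the given location.
    pub async fn write_meta<T: serde::Serialize>(
        operator: &opendal::Operator,
        location: &str,
        meta: &T,
    ) -> common_exception::Result<()> {
        let bytes = serde_json::to_vec(meta)?; // same conversion the caller used to do inline
        operator.object(location).write(bytes).await?;
        Ok(())
    }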
2 changes: 1 addition & 1 deletion query/src/storages/fuse/operations/append.rs
@@ -223,7 +223,7 @@ impl FuseTable {
         Ok(())
     }
 
-    fn get_option<T: FromStr>(&self, opt_key: &str, default: T) -> T {
+    pub fn get_option<T: FromStr>(&self, opt_key: &str, default: T) -> T {
         self.table_info
             .options()
             .get(opt_key)
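The now-public get_option follows a common pattern: look up a string-valued table option and fall back to a default when the key is absent or unparsable. A minimal standalone sketch of that pattern (the HashMap stand-in and option names here are illustrative, not the actual FuseTable internals):

    use std::collections::HashMap;
    use std::str::FromStr;

    fn get_option<T: FromStr>(options: &HashMap<String, String>, opt_key: &str, default: T) -> T {
        options
            .get(opt_key)
            .and_then(|s| s.parse::<T>().ok()) // fall back on parse failure too
            .unwrap_or(default)
    }

    fn main() {
        let mut opts = HashMap::new();
        opts.insert("row_per_block".to_string(), "1000000".to_string());
        // present and parsable: returns the stored value
        assert_eq!(get_option(&opts, "row_per_block", 10000usize), 1000000);
        // absent: returns the default
        assert_eq!(get_option(&opts, "block_per_segment", 1000usize), 1000);
    }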
46 changes: 11 additions & 35 deletions query/src/storages/fuse/operations/commit.rs
@@ -20,7 +20,6 @@ use std::time::Instant;
 use backoff::backoff::Backoff;
 use backoff::ExponentialBackoffBuilder;
 use common_base::base::ProgressValues;
-use common_cache::Cache;
 use common_datavalues::DataSchema;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -37,7 +36,6 @@ use uuid::Uuid;
 use crate::sessions::QueryContext;
 use crate::sql::OPT_KEY_LEGACY_SNAPSHOT_LOC;
 use crate::sql::OPT_KEY_SNAPSHOT_LOCATION;
-use crate::storages::fuse::io::write_meta;
 use crate::storages::fuse::meta::ClusterKey;
 use crate::storages::fuse::meta::Location;
 use crate::storages::fuse::meta::SegmentInfo;
@@ -199,39 +197,17 @@ impl FuseTable {
             )?
         };
 
-        let uuid = new_snapshot.snapshot_id;
-        let snapshot_loc = self
-            .meta_location_generator()
-            .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?;
-        let operator = ctx.get_storage_operator()?;
-        write_meta(&operator, &snapshot_loc, &new_snapshot).await?;
-
-        let result = Self::commit_to_meta_server(
-            ctx,
-            catalog_name,
-            self.get_table_info(),
-            snapshot_loc.clone(),
-            &new_snapshot.summary,
-        )
-        .await;
-
-        match result {
-            Ok(_) => {
-                if let Some(snapshot_cache) =
-                    ctx.get_storage_cache_manager().get_table_snapshot_cache()
-                {
-                    let cache = &mut snapshot_cache.write().await;
-                    cache.put(snapshot_loc, Arc::new(new_snapshot));
-                }
-                Ok(())
-            }
-            Err(e) => {
-                // commit snapshot to meta server failed, try to delete it.
-                // "major GC" will collect this, if deletion failure (even after DAL retried)
-                let _ = operator.object(&snapshot_loc).delete().await;
-                Err(e)
-            }
-        }
+        let mut new_table_meta = self.get_table_info().meta.clone();
+        // update statistics
+        new_table_meta.statistics = TableStatistics {
+            number_of_rows: new_snapshot.summary.row_count,
+            data_bytes: new_snapshot.summary.uncompressed_byte_size,
+            compressed_data_bytes: new_snapshot.summary.compressed_byte_size,
+            index_data_bytes: 0, // TODO we do not have it yet
+        };
+
+        self.update_table_meta(ctx, catalog_name, &new_snapshot, &mut new_table_meta)
+            .await
     }
 
     fn merge_table_operations(
73 changes: 73 additions & 0 deletions query/src/storages/fuse/operations/compact.rs
@@ -0,0 +1,73 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use std::sync::Arc;
+
+use common_exception::Result;
+use common_meta_app::schema::TableStatistics;
+use common_planners::OptimizeTablePlan;
+
+use super::mutation::CompactMutator;
+use crate::sessions::QueryContext;
+use crate::storages::fuse::FuseTable;
+use crate::storages::fuse::DEFAULT_BLOCK_PER_SEGMENT;
+use crate::storages::fuse::DEFAULT_ROW_PER_BLOCK;
+use crate::storages::fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
+use crate::storages::fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
+use crate::storages::storage_table::Table;
+
+impl FuseTable {
+    pub async fn do_compact(&self, ctx: Arc<QueryContext>, plan: &OptimizeTablePlan) -> Result<()> {
+        let snapshot_opt = self.read_table_snapshot(ctx.as_ref()).await?;
+        let snapshot = if let Some(val) = snapshot_opt {
+            val
+        } else {
+            // no snapshot, no compaction.
+            return Ok(());
+        };
+
+        if snapshot.summary.block_count <= 1 {
+            return Ok(());
+        }
+
+        let row_per_block = self.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_ROW_PER_BLOCK);
+        let block_per_seg =
+            self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
+
+        let mut mutator = CompactMutator::try_create(
+            &ctx,
+            &self.meta_location_generator,
+            &snapshot,
+            row_per_block,
+            block_per_seg,
+        )?;
+
+        let new_snapshot = mutator.compact(self).await?;
+        let mut new_table_meta = self.get_table_info().meta.clone(); // update statistics
+        new_table_meta.statistics = TableStatistics {
+            number_of_rows: new_snapshot.summary.row_count,
+            data_bytes: new_snapshot.summary.uncompressed_byte_size,
+            compressed_data_bytes: new_snapshot.summary.compressed_byte_size,
+            index_data_bytes: 0, // TODO we do not have it yet
+        };
+        self.update_table_meta(
+            ctx.as_ref(),
+            &plan.catalog,
+            &new_snapshot,
+            &mut new_table_meta,
+        )
+        .await
+    }
+}
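do_compact sizes its output using two per-table options, falling back to engine defaults when they are unset. Assuming the FUSE_OPT_KEY_* constants resolve to the table options row_per_block and block_per_segment (an assumption; the key strings are not shown in this diff), tuning them would look roughly like:

    -- option names below are assumed from the FUSE_OPT_KEY_* constants, not confirmed by this diff
    CREATE TABLE t (c INT) ENGINE = FUSE ROW_PER_BLOCK = 100000 BLOCK_PER_SEGMENT = 500;
    OPTIMIZE TABLE t COMPACT;

Note the early returns: a table with no snapshot or only one block is already as compact as it can get, so no mutation is attempted.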
1 change: 1 addition & 0 deletions query/src/storages/fuse/operations/mod.rs
@@ -14,6 +14,7 @@
 
 mod append;
 mod commit;
+mod compact;
 mod delete;
 mod fuse_sink;
 mod gc;
2 changes: 1 addition & 1 deletion query/src/storages/fuse/operations/mutation/block_filter.rs
@@ -115,7 +115,7 @@ pub async fn delete_from_block(
     Ok(res)
 }
 
-fn all_the_columns_ids(table: &FuseTable) -> Vec<usize> {
+pub fn all_the_columns_ids(table: &FuseTable) -> Vec<usize> {
     (0..table.table_info.schema().fields().len())
         .into_iter()
         .collect::<Vec<usize>>()
152 changes: 152 additions & 0 deletions query/src/storages/fuse/operations/mutation/compact_mutator.rs
@@ -0,0 +1,152 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_datablocks::DataBlock;
+use common_exception::Result;
+use opendal::Operator;
+
+use super::block_filter::all_the_columns_ids;
+use crate::sessions::QueryContext;
+use crate::storages::fuse::io::BlockCompactor;
+use crate::storages::fuse::io::BlockWriter;
+use crate::storages::fuse::io::MetaReaders;
+use crate::storages::fuse::io::SegmentWriter;
+use crate::storages::fuse::io::TableMetaLocationGenerator;
+use crate::storages::fuse::meta::BlockMeta;
+use crate::storages::fuse::meta::SegmentInfo;
+use crate::storages::fuse::meta::TableSnapshot;
+use crate::storages::fuse::statistics::reducers::reduce_block_metas;
+use crate::storages::fuse::statistics::reducers::reduce_statistics;
+use crate::storages::fuse::FuseTable;
+
+pub struct CompactMutator<'a> {
+    ctx: &'a Arc<QueryContext>,
+    location_generator: &'a TableMetaLocationGenerator,
+    base_snapshot: &'a TableSnapshot,
+    data_accessor: Operator,
+    row_per_block: usize,
+    block_per_seg: usize,
+}
+
+impl<'a> CompactMutator<'a> {
+    pub fn try_create(
+        ctx: &'a Arc<QueryContext>,
+        location_generator: &'a TableMetaLocationGenerator,
+        base_snapshot: &'a TableSnapshot,
+        row_per_block: usize,
+        block_per_seg: usize,
+    ) -> Result<Self> {
+        let data_accessor = ctx.get_storage_operator()?;
+        Ok(Self {
+            ctx,
+            location_generator,
+            base_snapshot,
+            data_accessor,
+            row_per_block,
+            block_per_seg,
+        })
+    }
+
+    pub async fn compact(&mut self, table: &FuseTable) -> Result<TableSnapshot> {
+        let snapshot = self.base_snapshot;
+        // Blocks that need to be reorganized into new segments.
+        let mut remain_blocks = Vec::new();
+        // Blocks that need to be compacted.
+        let mut merged_blocks = Vec::new();
+        // The new segments.
+        let mut segments = Vec::new();
+        let mut summarys = Vec::new();
+        let reader = MetaReaders::segment_info_reader(self.ctx);
+        for segment_location in &snapshot.segments {
+            let (x, ver) = (segment_location.0.clone(), segment_location.1);
+            let mut need_merge = false;
+            let mut remains = Vec::new();
+            let segment = reader.read(x, None, ver).await?;
+            segment.blocks.iter().for_each(|b| {
+                if b.row_count != self.row_per_block as u64 {
+                    merged_blocks.push(b.clone());
+                    need_merge = true;
+                } else {
+                    remains.push(b.clone());
+                }
+            });
+
+            // If the segment holds exactly block_per_seg blocks and none of them need to be
+            // compacted, keep the segment as is and record its information.
+            if !need_merge && segment.blocks.len() == self.block_per_seg {
+                segments.push(segment_location.clone());
+                summarys.push(segment.summary.clone());
+                continue;
+            }
+
+            remain_blocks.append(&mut remains);
+        }
+
+        // Compact the blocks.
+        let col_ids = all_the_columns_ids(table);
+        let mut compactor = BlockCompactor::new(self.row_per_block);
+        let block_writer = BlockWriter::new(&self.data_accessor, self.location_generator);
+        for block_meta in &merged_blocks {
+            let block_reader = table.create_block_reader(self.ctx, col_ids.clone())?;
+            let data_block = block_reader.read_with_block_meta(block_meta).await?;
+
+            let res = compactor.compact(data_block)?;
+            Self::write_block(&block_writer, res, &mut remain_blocks).await?;
+        }
+        let remains = compactor.finish()?;
+        Self::write_block(&block_writer, remains, &mut remain_blocks).await?;
+
+        // Create new segments.
+        let segment_info_cache = self
+            .ctx
+            .get_storage_cache_manager()
+            .get_table_segment_cache();
+        let seg_writer = SegmentWriter::new(
+            &self.data_accessor,
+            self.location_generator,
+            &segment_info_cache,
+        );
+        let chunks = remain_blocks.chunks(self.block_per_seg);
+        for chunk in chunks {
+            let new_summary = reduce_block_metas(chunk)?;
+            let new_segment = SegmentInfo::new(chunk.to_vec(), new_summary.clone());
+            let new_segment_location = seg_writer.write_segment(new_segment).await?;
+            segments.push(new_segment_location);
+            summarys.push(new_summary);
+        }
+
+        let mut new_snapshot = TableSnapshot::from_previous(snapshot);
+        new_snapshot.segments = segments;
+        // update the summary of new snapshot
+        let new_summary = reduce_statistics(&summarys)?;
+        new_snapshot.summary = new_summary;
+        Ok(new_snapshot)
+    }
+
+    async fn write_block(
+        writer: &BlockWriter<'_>,
+        blocks: Option<Vec<DataBlock>>,
+        metas: &mut Vec<BlockMeta>,
+    ) -> Result<()> {
+        if let Some(blocks) = blocks {
+            for block in blocks {
+                let new_block_meta = writer.write(block).await?;
+                metas.push(new_block_meta);
+            }
+        }
+        Ok(())
+    }
+}
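The selection rule above is the heart of the mutator: a block is "perfect" when its row count equals row_per_block, and a segment is reused untouched only when it holds exactly block_per_seg perfect blocks. A self-contained sketch of that classification (simplified stand-in types, not the actual BlockMeta/SegmentInfo):

    // Simplified stand-ins for BlockMeta / SegmentInfo (illustrative only).
    struct Block { row_count: u64 }
    struct Segment { blocks: Vec<Block> }

    /// Split segments into (kept segment indexes, blocks to re-segment, blocks to re-compact),
    /// mirroring the loop in CompactMutator::compact.
    fn classify(segments: &[Segment], row_per_block: u64, block_per_seg: usize)
        -> (Vec<usize>, Vec<u64>, Vec<u64>) {
        let (mut kept, mut remain, mut merged) = (Vec::new(), Vec::new(), Vec::new());
        for (i, seg) in segments.iter().enumerate() {
            let need_merge = seg.blocks.iter().any(|b| b.row_count != row_per_block);
            if !need_merge && seg.blocks.len() == block_per_seg {
                kept.push(i); // perfect segment: reuse as-is
                continue;
            }
            for b in &seg.blocks {
                if b.row_count != row_per_block {
                    merged.push(b.row_count); // under/over-sized: rewrite via BlockCompactor
                } else {
                    remain.push(b.row_count); // full block: only regroup into new segments
                }
            }
        }
        (kept, remain, merged)
    }

    fn main() {
        let segs = vec![
            Segment { blocks: vec![Block { row_count: 1000 }, Block { row_count: 1000 }] },
            Segment { blocks: vec![Block { row_count: 1000 }, Block { row_count: 10 }] },
        ];
        let (kept, remain, merged) = classify(&segs, 1000, 2);
        assert_eq!((kept.len(), remain.len(), merged.len()), (1, 1, 1));
    }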
(2 more changed files not shown.)
