ISSUE-3121: simple chunk strategy in insertion #3122

Merged 18 commits on Nov 30, 2021
query/src/storages/fuse/constants.rs
@@ -14,6 +14,11 @@
//

pub const TBL_OPT_KEY_SNAPSHOT_LOC: &str = "SNAPSHOT_LOC";
+pub const TBL_OPT_KEY_CHUNK_BLOCK_NUM: &str = "CHUNK_BLOCK_NUM";
+pub const TBL_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD: &str = "BLOCK_SIZE_THRESHOLD";
pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
pub const FUSE_TBL_SEGMENT_PREFIX: &str = "_sg";
pub const FUSE_TBL_SNAPSHOT_PREFIX: &str = "_ss";
+
+pub const DEFAULT_CHUNK_BLOCK_NUM: usize = 1000;
+pub const DEFAULT_BLOCK_SIZE_IN_MEM_SIZE_THRESHOLD: usize = 100 * 1024 * 1024;
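Aside for readers of this diff: the two `TBL_OPT_KEY_*` options let a table override the `DEFAULT_*` values above per table (the min_max_test change below sets `CHUNK_BLOCK_NUM` to "1" this way). Here is a minimal sketch of that lookup-with-fallback pattern; the helper name and wiring are illustrative, not the PR's actual code:

```rust
use std::collections::HashMap;

// Illustrative only: resolve the chunk size from table options,
// falling back to the default when the option is absent or unparsable.
fn chunk_block_num(options: &HashMap<String, String>) -> usize {
    options
        .get("CHUNK_BLOCK_NUM") // TBL_OPT_KEY_CHUNK_BLOCK_NUM
        .and_then(|v| v.parse().ok())
        .unwrap_or(1000) // DEFAULT_CHUNK_BLOCK_NUM
}

fn main() {
    let mut opts = HashMap::new();
    assert_eq!(chunk_block_num(&opts), 1000);
    // setting the option to "1" keeps every block in its own chunk,
    // i.e. blocks will not be merged -- the trick min_max_test.rs uses
    opts.insert("CHUNK_BLOCK_NUM".to_owned(), "1".to_owned());
    assert_eq!(chunk_block_num(&opts), 1);
}
```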
6 changes: 4 additions & 2 deletions query/src/storages/fuse/index/min_max_test.rs
Expand Up @@ -31,8 +31,9 @@ use futures::TryStreamExt;

use crate::catalogs::Catalog;
use crate::storages::fuse::index::range_filter;
-use crate::storages::fuse::io::TBL_OPT_KEY_SNAPSHOT_LOC;
use crate::storages::fuse::table_test_fixture::TestFixture;
+use crate::storages::fuse::TBL_OPT_KEY_CHUNK_BLOCK_NUM;
+use crate::storages::fuse::TBL_OPT_KEY_SNAPSHOT_LOC;

#[tokio::test]
async fn test_min_max_index() -> Result<()> {
@@ -53,7 +54,8 @@ async fn test_min_max_index() -> Result<()> {
table_meta: TableMeta {
schema: test_schema.clone(),
engine: "FUSE".to_string(),
-options: Default::default(),
+// make sure blocks will not be merged
+options: [(TBL_OPT_KEY_CHUNK_BLOCK_NUM.to_owned(), "1".to_owned())].into(),
},
};

83 changes: 63 additions & 20 deletions query/src/storages/fuse/io/block_appender.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
+use std::cmp::Reverse;
use std::sync::Arc;

use common_arrow::arrow::datatypes::Schema as ArrowSchema;
@@ -24,7 +25,9 @@ use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;
use common_streams::SendableDataBlockStream;
+use futures::stream::TryChunksError;
use futures::StreamExt;
+use futures::TryStreamExt;

use crate::storages::fuse::io;
use crate::storages::fuse::meta::SegmentInfo;
@@ -33,39 +36,81 @@ use crate::storages::fuse::statistics;
use crate::storages::fuse::statistics::BlockMetaAccumulator;
use crate::storages::fuse::statistics::StatisticsAccumulator;

/// dummy struct, namespace placeholder
pub struct BlockAppender;

impl BlockAppender {
// A simple strategy for merging small blocks into larger ones:
// for each run of successive data blocks in `blocks`, once the sum of their `memory_size`
// reaches `block_size_threshold`, they are merged into one larger block.
// NOTE:
// - a block produced by merging will not exceed 2 * block_size_threshold
// - blocks that are already larger than `block_size_threshold` will NOT be split
// TODO handle splitting in table compact/optimize
pub(crate) fn reshape_blocks(
mut blocks: Vec<DataBlock>,
block_size_threshold: usize,
) -> Result<Vec<DataBlock>> {
// sort by memory_size DESC
blocks.sort_unstable_by_key(|r| Reverse(r.memory_size()));

let mut result = vec![];

let mut block_size_acc = 0;
let mut block_acc = vec![];

for block in blocks {
block_size_acc += block.memory_size();
block_acc.push(block);
if block_size_acc >= block_size_threshold {
result.push(DataBlock::concat_blocks(&block_acc)?);
block_acc.clear();
block_size_acc = 0;
}
}

if !block_acc.is_empty() {
result.push(DataBlock::concat_blocks(&block_acc)?)
}

Ok(result)
}

+// TODO should return a stream of SegmentInfo (batch blocks into segments)
pub async fn append_blocks(
data_accessor: Arc<dyn DataAccessor>,
-mut stream: SendableDataBlockStream,
+stream: SendableDataBlockStream,
data_schema: &DataSchema,
-) -> Result<Option<SegmentInfo>> {
-let mut stats_acc = StatisticsAccumulator::new();
-let mut block_meta_acc = BlockMetaAccumulator::new();
+chunk_block_num: usize,
+block_size_threshold: usize,
+) -> Result<Vec<SegmentInfo>> {
+// filter out empty blocks
+let stream = stream.try_filter(|block| std::future::ready(block.num_rows() > 0));

-let mut block_nums = 0;
+// chunks by chunk_block_num
+let mut stream = stream.try_chunks(chunk_block_num);
+
+let mut segments = vec![];
// accumulate the stats and save the blocks
-while let Some(block) = stream.next().await {
-let block = block?;
-if block.num_rows() != 0 {
+while let Some(item) = stream.next().await {
+let blocks = item.map_err(|TryChunksError(_, e)| e)?;
+// re-shape the blocks
+let blocks = Self::reshape_blocks(blocks, block_size_threshold)?;
+let mut stats_acc = StatisticsAccumulator::new();
+let mut block_meta_acc = BlockMetaAccumulator::new();
+
+for block in blocks.into_iter() {
stats_acc.acc(&block)?;
let schema = block.schema().to_arrow();
let location = io::gen_block_location();
let file_size = Self::save_block(&schema, block, &data_accessor, &location).await?;
block_meta_acc.acc(file_size, location, &mut stats_acc);
-block_nums += 1;
}
-}

-let segment = if block_nums > 0 {
// summary and give back a segment_info
// we need to send back a stream of segment latter
let block_metas = block_meta_acc.blocks_metas;
let summary = statistics::reduce_block_stats(&stats_acc.blocks_stats, data_schema)?;
-Some(SegmentInfo {
+let seg = SegmentInfo {
blocks: block_metas,
summary: Stats {
row_count: stats_acc.summary_row_count,
@@ -74,12 +119,10 @@ impl BlockAppender {
compressed_byte_size: stats_acc.file_size,
col_stats: summary,
},
-})
-} else {
-None
-};
-
-Ok(segment)
+};
+segments.push(seg)
+}
+Ok(segments)
}

pub(super) async fn save_block(
@@ -112,7 +155,7 @@

use bytes::BufMut;
// we need a configuration of block size threshold here
-let mut writer = Vec::with_capacity(10 * 1024 * 1024).writer();
+let mut writer = Vec::with_capacity(100 * 1024 * 1024).writer();

let len = common_arrow::parquet::write::write_file(
&mut writer,
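The merge strategy in `reshape_blocks` above is easiest to see with plain numbers. The following is a self-contained model, not the PR's code: `usize` sizes stand in for `DataBlock::memory_size()`, a `Vec` of sizes stands in for a merged block, and only the greedy grouping mirrors the real logic.

```rust
use std::cmp::Reverse;

// Model of reshape_blocks: group sizes greedily after a descending sort;
// a group is flushed as soon as its accumulated size reaches the threshold.
fn reshape_sizes(mut sizes: Vec<usize>, threshold: usize) -> Vec<Vec<usize>> {
    sizes.sort_unstable_by_key(|s| Reverse(*s)); // sort by size DESC

    let mut result = vec![];
    let mut acc = vec![];
    let mut acc_size = 0;

    for size in sizes {
        acc_size += size;
        acc.push(size);
        if acc_size >= threshold {
            // the real code calls DataBlock::concat_blocks(&acc)? here
            result.push(std::mem::take(&mut acc));
            acc_size = 0;
        }
    }
    if !acc.is_empty() {
        result.push(acc); // tail remainder, possibly under the threshold
    }
    result
}

fn main() {
    // ten 30-byte blocks, 100-byte threshold: groups of 120, 120 and 60 bytes,
    // so every merged block stays under 2 * threshold
    let groups = reshape_sizes(vec![30; 10], 100);
    assert_eq!(groups.len(), 3);

    // a 500-byte block already exceeds the threshold: it is flushed alone,
    // never split
    let groups = reshape_sizes(vec![500, 30], 100);
    assert_eq!(groups[0], vec![500]);
}
```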
120 changes: 112 additions & 8 deletions query/src/storages/fuse/io/block_appender_test.rs
@@ -25,23 +25,127 @@ use common_datavalues::DataType;
use tempfile::TempDir;

use crate::storages::fuse::io::BlockAppender;
use crate::storages::fuse::DEFAULT_CHUNK_BLOCK_NUM;

#[tokio::test]
async fn test_fuse_table_block_appender() {
let tmp_dir = TempDir::new().unwrap();
let local_fs = common_dal::Local::with_path(tmp_dir.path().to_owned());
let local_fs = Arc::new(local_fs);
// some blocks
let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);

// single segments
let block = DataBlock::create_by_array(schema.clone(), vec![Series::new(vec![1, 2, 3])]);
let block_stream = futures::stream::iter(vec![Ok(block)]);
-let r = BlockAppender::append_blocks(local_fs.clone(), Box::pin(block_stream), schema.as_ref())
-.await;
-assert!(r.is_ok());
+let r = BlockAppender::append_blocks(
+local_fs.clone(),
+Box::pin(block_stream),
+schema.as_ref(),
+DEFAULT_CHUNK_BLOCK_NUM,
+0,
+)
+.await;
+assert!(r.is_ok(), "oops, unexpected result: {:?}", r);
+let r = r.unwrap();
+assert_eq!(r.len(), 1);

// multiple segments
let number_of_blocks = 30;
let chunk_size = 10;
let block = DataBlock::create_by_array(schema.clone(), vec![Series::new(vec![1, 2, 3])]);
let blocks = std::iter::repeat(Ok(block)).take(number_of_blocks);
let block_stream = futures::stream::iter(blocks);
let r = BlockAppender::append_blocks(
local_fs.clone(),
Box::pin(block_stream),
schema.as_ref(),
chunk_size,
0,
)
.await;
assert!(r.is_ok(), "oops, unexpected result: {:?}", r);
let r = r.unwrap();
assert_eq!(r.len(), number_of_blocks / chunk_size);

-// non blocks
+// empty blocks
let block_stream = futures::stream::iter(vec![]);
-let r = BlockAppender::append_blocks(local_fs, Box::pin(block_stream), schema.as_ref()).await;
-assert!(r.is_ok());
-assert!(r.unwrap().is_none())
+let r = BlockAppender::append_blocks(
+local_fs,
+Box::pin(block_stream),
+schema.as_ref(),
+DEFAULT_CHUNK_BLOCK_NUM,
+0,
+)
+.await;
+assert!(r.is_ok(), "oops, unexpected result: {:?}", r);
+assert!(r.unwrap().is_empty())
}

#[test]
fn test_fuse_table_block_appender_reshape() -> common_exception::Result<()> {
let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);
let sample_block = DataBlock::create_by_array(schema, vec![Series::new(vec![1, 2, 3])]);
let sample_block_size = sample_block.memory_size();

// 1. empty blocks
// 1.1 empty block, zero block_size_threshold
let blocks = vec![];
let r = BlockAppender::reshape_blocks(blocks, 0);
assert!(r.is_ok(), "oops, unexpected result: {:?}", r);
let r = r.unwrap();
assert_eq!(r.len(), 0);

// 1.2 empty block, arbitrary block_size_threshold
let blocks = vec![];
let r = BlockAppender::reshape_blocks(blocks, 100);
assert!(r.is_ok(), "oops, unexpected result: {:?}", r);
let r = r.unwrap();
assert_eq!(r.len(), 0);

// 2. merge
// 2.1 several blocks into exactly one block
let block_num = 10;
let (blocks, block_size_threshold) = gen_blocks(&sample_block, block_num);
let r = BlockAppender::reshape_blocks(blocks.collect(), block_size_threshold)?;
assert_eq!(r.len(), 1);
assert_eq!(r[0].memory_size(), block_size_threshold);

// 2.2 with remainders
// 2.2.1 remainders at tail
let block_num = 10;
let (blocks, block_size_threshold) = gen_blocks(&sample_block, block_num);
// push back an extra block
let blocks = blocks.chain(std::iter::once(sample_block.clone()));
let r = BlockAppender::reshape_blocks(blocks.collect(), block_size_threshold)?;
assert_eq!(r.len(), 2);
assert_eq!(r[0].memory_size(), block_size_threshold);
assert_eq!(r[1].memory_size(), sample_block_size);

// 2.2.2 large blocks will not be split
let block_num = 10;
let (blocks, block_size_threshold) = gen_blocks(&sample_block, block_num);

// generate a large block
let (tmp_blocks, tmp_block_size_threshold) = gen_blocks(&sample_block, block_num * 2);
assert!(tmp_block_size_threshold > block_size_threshold);
let large_block = DataBlock::concat_blocks(&tmp_blocks.collect::<Vec<_>>())?;
let large_block_size = large_block.memory_size();
// push back the large block
let blocks = blocks.chain(std::iter::once(large_block));

let r = BlockAppender::reshape_blocks(blocks.collect(), block_size_threshold)?;
assert_eq!(r.len(), 2);
// blocks are sorted (DESC by size) during reshape, thus we get the large_block at head
assert_eq!(r[0].memory_size(), large_block_size);
assert_eq!(r[1].memory_size(), block_size_threshold);

Ok(())
}

fn gen_blocks(sample_block: &DataBlock, num: usize) -> (impl Iterator<Item = DataBlock>, usize) {
let block_size = sample_block.memory_size();
let block = sample_block.clone();
let blocks = std::iter::repeat(block).take(num);
let ideal_threshold = block_size * num;
(blocks, ideal_threshold)
}
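The chunking that the "multiple segments" test above exercises comes from the `try_filter` + `try_chunks` pipeline in `append_blocks`. Here is a standalone sketch of that pipeline; the stream combinators and the `TryChunksError` destructuring are the real `futures`-crate APIs the PR uses, while the integer items (with `0` playing the role of an empty block) are stand-ins:

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt, TryChunksError, TryStreamExt};

fn main() -> Result<(), &'static str> {
    block_on(async {
        let items: Vec<Result<u32, &'static str>> =
            vec![Ok(1), Ok(0), Ok(2), Ok(3), Ok(0), Ok(4), Ok(5)];

        // drop "empty" items first, as append_blocks drops empty blocks ...
        let stream = stream::iter(items).try_filter(|n| std::future::ready(*n > 0));
        // ... then batch the survivors into chunks of at most two items
        let mut stream = stream.try_chunks(2);

        while let Some(item) = stream.next().await {
            // try_chunks pairs an error with the partially filled chunk;
            // unwrap it the same way the PR does
            let chunk = item.map_err(|TryChunksError(_, e)| e)?;
            println!("chunk: {:?}", chunk); // prints [1, 2], [3, 4], [5]
        }
        Ok(())
    })
}
```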
6 changes: 3 additions & 3 deletions query/src/storages/fuse/io/location_gen.rs
@@ -15,9 +15,9 @@

use uuid::Uuid;

-use super::constants::FUSE_TBL_BLOCK_PREFIX;
-use super::constants::FUSE_TBL_SEGMENT_PREFIX;
-use super::constants::FUSE_TBL_SNAPSHOT_PREFIX;
+use crate::storages::fuse::constants::FUSE_TBL_BLOCK_PREFIX;
+use crate::storages::fuse::constants::FUSE_TBL_SEGMENT_PREFIX;
+use crate::storages::fuse::constants::FUSE_TBL_SNAPSHOT_PREFIX;

pub fn gen_block_location() -> String {
let part_uuid = Uuid::new_v4().to_simple().to_string() + ".parquet";
16 changes: 7 additions & 9 deletions query/src/storages/fuse/io/mod.rs
@@ -13,17 +13,15 @@
// limitations under the License.
//

-pub use block_appender::BlockAppender;
-pub use col_encoding::col_encoding;
-pub use constants::TBL_OPT_KEY_SNAPSHOT_LOC;
-pub use location_gen::gen_block_location;
-pub use location_gen::gen_segment_info_location;
-pub use location_gen::snapshot_location;
+#[cfg(test)]
+mod block_appender_test;

mod block_appender;
mod col_encoding;
-mod constants;
mod location_gen;

-#[cfg(test)]
-mod block_appender_test;
+pub use block_appender::BlockAppender;
+pub use col_encoding::col_encoding;
+pub use location_gen::gen_block_location;
+pub use location_gen::gen_segment_info_location;
+pub use location_gen::snapshot_location;
8 changes: 5 additions & 3 deletions query/src/storages/fuse/mod.rs
@@ -18,14 +18,16 @@ mod table_test;
#[cfg(test)]
mod table_test_fixture;

+mod constants;
mod index;
-mod io;
mod meta;
-mod operations;
mod statistics;
mod table;

+pub mod io;
+pub mod operations;
+
+pub use constants::*;
pub use meta::ColumnId;
pub use meta::ColumnStats;
pub use statistics::BlockStats;
pub use table::FuseTable;