feat: build sst in stream way (apache#747)
* feat: support building sst in a streaming way

* use AsyncWrite for the sst building procedure

* shut down when closing the async writer

* build sst in a streaming way

* find the custom metadata by key

* better names

* add a config option for the max sst write buffer size

* polish up some comments

* address CR comments

* fix license header

* use ReadableSize for write_sst_max_buffer_size
ShiKaiWi authored Mar 22, 2023
1 parent 31110b0 commit 54b72b0
Showing 16 changed files with 513 additions and 187 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


20 changes: 19 additions & 1 deletion analytic_engine/src/compaction/scheduler.rs
@@ -40,6 +40,7 @@ use crate::{
     instance::{
         flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore,
     },
+    sst::factory::SstWriteOptions,
     table::data::TableDataRef,
     TableOptions,
 };
@@ -287,6 +288,7 @@ impl SchedulerImpl {
         space_store: Arc<SpaceStore>,
         runtime: Arc<Runtime>,
         config: SchedulerConfig,
+        write_sst_max_buffer_size: usize,
     ) -> Self {
         let (tx, rx) = mpsc::channel(config.schedule_channel_len);
         let running = Arc::new(AtomicBool::new(true));
@@ -300,6 +302,7 @@
             picker_manager: PickerManager::default(),
             max_ongoing_tasks: config.max_ongoing_tasks,
             max_unflushed_duration: config.max_unflushed_duration.0,
+            write_sst_max_buffer_size,
             limit: Arc::new(OngoingTaskLimit {
                 ongoing_tasks: AtomicUsize::new(0),
                 request_buf: RwLock::new(RequestQueue::default()),
@@ -367,6 +370,7 @@ struct ScheduleWorker {
     max_unflushed_duration: Duration,
     picker_manager: PickerManager,
     max_ongoing_tasks: usize,
+    write_sst_max_buffer_size: usize,
     limit: Arc<OngoingTaskLimit>,
     running: Arc<AtomicBool>,
     memory_limit: MemoryLimit,
@@ -462,13 +466,27 @@ impl ScheduleWorker {
 
         let sender = self.sender.clone();
         let request_id = RequestId::next_id();
+        let storage_format_hint = table_data.table_options().storage_format_hint;
+        let sst_write_options = SstWriteOptions {
+            storage_format_hint,
+            num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
+            compression: table_data.table_options().compression,
+            max_buffer_size: self.write_sst_max_buffer_size,
+        };
+
         // Do actual costly compact job in background.
         self.runtime.spawn(async move {
             // Release the token after compaction finished.
             let _token = token;
 
             let res = space_store
-                .compact_table(runtime, &table_data, request_id, &compaction_task)
+                .compact_table(
+                    runtime,
+                    &table_data,
+                    request_id,
+                    &compaction_task,
+                    &sst_write_options,
+                )
                 .await;
 
             if let Err(e) = &res {
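Two details in this hunk are worth spelling out. `sst_write_options` is built from the table options before the `spawn`, so the `async move` block owns it for the task's lifetime. And `let _token = token;` is an RAII trick: the ongoing-task token is moved into the spawned task so that its `Drop` releases the compaction slot whenever the task ends, on success or failure alike. A minimal sketch of the token pattern, with a hypothetical `Token` type standing in for the engine's `OngoingTaskLimit` bookkeeping:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Hypothetical stand-in for the engine's ongoing-task token.
struct Token(Arc<AtomicUsize>);

impl Drop for Token {
    // The release lives in Drop, so moving the token into an async task
    // frees the slot when the task finishes, even on an error path.
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

fn acquire(ongoing_tasks: &Arc<AtomicUsize>) -> Token {
    ongoing_tasks.fetch_add(1, Ordering::SeqCst);
    Token(Arc::clone(ongoing_tasks))
}
```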
15 changes: 7 additions & 8 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -544,6 +544,7 @@ impl Instance {
             storage_format_hint: table_data.table_options().storage_format_hint,
             num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
             compression: table_data.table_options().compression,
+            max_buffer_size: self.write_sst_max_buffer_size,
         };
 
         for time_range in &time_ranges {
@@ -673,6 +674,7 @@ impl Instance {
             storage_format_hint,
             num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
             compression: table_data.table_options().compression,
+            max_buffer_size: self.write_sst_max_buffer_size,
         };
         let mut writer = self
             .space_store
@@ -728,6 +730,7 @@ impl SpaceStore {
         table_data: &TableData,
         request_id: RequestId,
         task: &CompactionTask,
+        sst_write_options: &SstWriteOptions,
     ) -> Result<()> {
         debug!(
             "Begin compact table, table_name:{}, id:{}, task:{:?}",
@@ -765,6 +768,7 @@
                 table_data,
                 request_id,
                 input,
+                sst_write_options,
                 &mut edit_meta,
             )
             .await?;
@@ -795,6 +799,7 @@
         table_data: &TableData,
         request_id: RequestId,
         input: &CompactionInputFiles,
+        sst_write_options: &SstWriteOptions,
         edit_meta: &mut VersionEditMeta,
     ) -> Result<()> {
         debug!(
@@ -903,18 +908,12 @@
         let file_id = table_data.alloc_file_id();
         let sst_file_path = table_data.set_sst_file_path(file_id);
 
-        let storage_format_hint = table_data.table_options().storage_format_hint;
-        let sst_write_options = SstWriteOptions {
-            storage_format_hint,
-            num_rows_per_row_group: table_options.num_rows_per_row_group,
-            compression: table_options.compression,
-        };
         let mut sst_writer = self
             .sst_factory
-            .create_writer(&sst_write_options, &sst_file_path, self.store_picker())
+            .create_writer(sst_write_options, &sst_file_path, self.store_picker())
             .await
             .context(CreateSstWriter {
-                storage_format_hint,
+                storage_format_hint: sst_write_options.storage_format_hint,
             })?;
 
         let sst_info = sst_writer
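The signature changes above thread a single `SstWriteOptions` value from the scheduler down through `compact_table` into `compact_files`, replacing the per-output-file construction deleted in the last hunk. The streaming build itself, per the commit log, encodes data into a bounded buffer and drains it through an `AsyncWrite` sink, shutting the writer down on close. A hedged sketch of that idea, with illustrative names rather than the engine's actual writer API:

```rust
use tokio::io::{AsyncWrite, AsyncWriteExt};

// Stream pre-encoded row groups to `sink`, flushing whenever the buffer
// grows past `max_buffer_size` instead of materializing the whole sst
// in memory first.
async fn write_streaming<W: AsyncWrite + Unpin>(
    sink: &mut W,
    encoded_row_groups: impl IntoIterator<Item = Vec<u8>>,
    max_buffer_size: usize,
) -> std::io::Result<()> {
    let mut buf = Vec::new();
    for encoded in encoded_row_groups {
        buf.extend_from_slice(&encoded);
        if buf.len() >= max_buffer_size {
            sink.write_all(&buf).await?;
            buf.clear();
        }
    }
    // Drain the tail, then shut the writer down, mirroring the
    // "shut down when closing the async writer" commit.
    sink.write_all(&buf).await?;
    sink.shutdown().await?;
    Ok(())
}
```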
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/mod.rs
@@ -170,6 +170,8 @@ pub struct Instance {
     pub(crate) space_write_buffer_size: usize,
     /// Replay wal batch size
     pub(crate) replay_batch_size: usize,
+    /// Write sst max buffer size
+    pub(crate) write_sst_max_buffer_size: usize,
     /// Options for scanning sst
     pub(crate) iter_options: IterOptions,
     pub(crate) remote_engine: Option<RemoteEngineRef>,
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/open.rs
@@ -69,6 +69,7 @@ impl Instance {
             space_store.clone(),
             bg_runtime.clone(),
             scheduler_config,
+            ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
         ));
 
         let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone());
@@ -92,6 +93,7 @@
             db_write_buffer_size: ctx.config.db_write_buffer_size,
             space_write_buffer_size: ctx.config.space_write_buffer_size,
             replay_batch_size: ctx.config.replay_batch_size,
+            write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
             iter_options,
             remote_engine: remote_engine_ref,
         });
5 changes: 4 additions & 1 deletion analytic_engine/src/lib.rs
@@ -22,7 +22,7 @@ pub mod table_options;
 #[cfg(any(test, feature = "test"))]
 pub mod tests;
 
-use common_util::config::ReadableDuration;
+use common_util::config::{ReadableDuration, ReadableSize};
 use manifest::details::Options as ManifestOptions;
 use message_queue::kafka::config::Config as KafkaConfig;
 use serde::{Deserialize, Serialize};
@@ -75,6 +75,8 @@ pub struct Config {
     pub scan_batch_size: usize,
     /// Sst background reading parallelism
     pub sst_background_read_parallelism: usize,
+    /// Max buffer size for writing sst
+    pub write_sst_max_buffer_size: ReadableSize,
 
     /// Wal storage config
     ///
@@ -108,6 +110,7 @@ impl Default for Config {
             db_write_buffer_size: 0,
             scan_batch_size: 500,
             sst_background_read_parallelism: 8,
+            write_sst_max_buffer_size: ReadableSize::mb(10),
             wal: WalStorageConfig::RocksDB(Box::default()),
             remote_engine_client: remote_engine_client::config::Config::default(),
         }
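`ReadableSize` lets the option be written with human-readable units in the config file while call sites consume raw bytes, as the `open.rs` hunks above do with `as_bytes() as usize`. A small usage sketch, assuming the TiKV-style semantics this wrapper is modeled on (MiB-based units):

```rust
use common_util::config::ReadableSize;

fn main() {
    // Matches the default above: a 10 MiB cap on the sst write buffer.
    let max_buffer_size = ReadableSize::mb(10);

    // Call sites convert to raw bytes before handing the cap to the
    // writer, mirroring `write_sst_max_buffer_size.as_bytes() as usize`.
    let bytes = max_buffer_size.as_bytes() as usize;
    println!("max sst write buffer: {bytes} bytes");
}
```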
1 change: 1 addition & 0 deletions analytic_engine/src/sst/factory.rs
@@ -116,6 +116,7 @@ pub struct SstWriteOptions {
     pub storage_format_hint: StorageFormatHint,
     pub num_rows_per_row_group: usize,
     pub compression: Compression,
+    pub max_buffer_size: usize,
 }
 
 #[derive(Debug, Default)]
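With the new field, the first three options still come from per-table settings while `max_buffer_size` is the engine-wide cap introduced by this PR; the scheduler and flush hunks above both wire it the same way. A condensed sketch of that wiring, using hypothetical simplified types:

```rust
// Hypothetical, simplified stand-ins; only the field wiring mirrors the
// hunks above.
#[derive(Clone, Copy)]
enum Compression { Zstd, Snappy }

struct TableOpts {
    num_rows_per_row_group: usize,
    compression: Compression,
}

struct SstWriteOptions {
    num_rows_per_row_group: usize,
    compression: Compression,
    max_buffer_size: usize,
}

fn write_options(table: &TableOpts, write_sst_max_buffer_size: usize) -> SstWriteOptions {
    SstWriteOptions {
        // Per-table knobs, read from the table's options...
        num_rows_per_row_group: table.num_rows_per_row_group,
        compression: table.compression,
        // ...plus the engine-wide streaming buffer cap from the config.
        max_buffer_size: write_sst_max_buffer_size,
    }
}
```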
13 changes: 7 additions & 6 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -7,12 +7,10 @@ use std::{
 
 use lru::LruCache;
 use parquet::file::metadata::FileMetaData;
-use snafu::{ensure, OptionExt, ResultExt};
+use snafu::{OptionExt, ResultExt};
 
 use crate::sst::{
-    meta_data::{
-        DecodeCustomMetaData, EmptyCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result,
-    },
+    meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
     parquet::encoding,
 };
 
@@ -41,11 +39,14 @@ impl MetaData {
         let kv_metas = file_meta_data
             .key_value_metadata()
             .context(KvMetaDataNotFound)?;
-        ensure!(!kv_metas.is_empty(), EmptyCustomMetaData);
+        let kv_meta = kv_metas
+            .iter()
+            .find(|kv| kv.key == encoding::META_KEY)
+            .context(KvMetaDataNotFound)?;
 
         let custom = {
             let mut sst_meta =
-                encoding::decode_sst_meta_data(&kv_metas[0]).context(DecodeCustomMetaData)?;
+                encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?;
             if ignore_sst_filter {
                 sst_meta.parquet_filter = None;
             }
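The behavioral fix here: instead of requiring a non-empty key-value list and blindly decoding `kv_metas[0]`, the cache now searches the parquet footer for the entry keyed by `encoding::META_KEY`, so footers carrying other writers' key-value pairs no longer break decoding, and a missing entry still surfaces as `KvMetaDataNotFound`. A self-contained sketch of the lookup, with an illustrative key constant:

```rust
use parquet::format::KeyValue;

// Illustrative value; the engine's real key is
// sst::parquet::encoding::META_KEY.
const META_KEY: &str = "custom-sst-meta";

// Find the custom metadata entry by key instead of taking kv_metas[0].
fn find_custom_meta(kv_metas: &[KeyValue]) -> Option<&KeyValue> {
    kv_metas.iter().find(|kv| kv.key == META_KEY)
}
```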