Skip to content

Commit

Permalink
Merge eb9ecaf into d71e875
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi authored Mar 20, 2023
2 parents d71e875 + eb9ecaf commit df6fb63
Show file tree
Hide file tree
Showing 20 changed files with 517 additions and 188 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ object_store = { path = "components/object_store" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "32.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
profile = { path = "components/profile" }
prometheus = "0.12"
prometheus-static-metric = "0.5"
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ xorfilter-rs = { workspace = true }
common_types = { workspace = true, features = ["test"] }
common_util = { workspace = true, features = ["test"] }
env_logger = { workspace = true }
pin-project-lite = { workspace = true }
rand = { workspace = true }
wal = { workspace = true, features = ["test"] }
20 changes: 19 additions & 1 deletion analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::{
instance::{
flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore,
},
sst::factory::SstWriteOptions,
table::data::TableDataRef,
TableOptions,
};
Expand Down Expand Up @@ -287,6 +288,7 @@ impl SchedulerImpl {
space_store: Arc<SpaceStore>,
runtime: Arc<Runtime>,
config: SchedulerConfig,
write_sst_max_buffer_size: usize,
) -> Self {
let (tx, rx) = mpsc::channel(config.schedule_channel_len);
let running = Arc::new(AtomicBool::new(true));
Expand All @@ -300,6 +302,7 @@ impl SchedulerImpl {
picker_manager: PickerManager::default(),
max_ongoing_tasks: config.max_ongoing_tasks,
max_unflushed_duration: config.max_unflushed_duration.0,
write_sst_max_buffer_size,
limit: Arc::new(OngoingTaskLimit {
ongoing_tasks: AtomicUsize::new(0),
request_buf: RwLock::new(RequestQueue::default()),
Expand Down Expand Up @@ -367,6 +370,7 @@ struct ScheduleWorker {
max_unflushed_duration: Duration,
picker_manager: PickerManager,
max_ongoing_tasks: usize,
write_sst_max_buffer_size: usize,
limit: Arc<OngoingTaskLimit>,
running: Arc<AtomicBool>,
memory_limit: MemoryLimit,
Expand Down Expand Up @@ -462,13 +466,27 @@ impl ScheduleWorker {

let sender = self.sender.clone();
let request_id = RequestId::next_id();
let storage_format_hint = table_data.table_options().storage_format_hint;
let sst_write_options = SstWriteOptions {
storage_format_hint,
num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
};

// Do actual costly compact job in background.
self.runtime.spawn(async move {
// Release the token after compaction finished.
let _token = token;

let res = space_store
.compact_table(runtime, &table_data, request_id, &compaction_task)
.compact_table(
runtime,
&table_data,
request_id,
&compaction_task,
&sst_write_options,
)
.await;

if let Err(e) = &res {
Expand Down
15 changes: 7 additions & 8 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ impl Instance {
storage_format_hint: table_data.table_options().storage_format_hint,
num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
};

for time_range in &time_ranges {
Expand Down Expand Up @@ -674,6 +675,7 @@ impl Instance {
storage_format_hint,
num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
};
let mut writer = self
.space_store
Expand Down Expand Up @@ -729,6 +731,7 @@ impl SpaceStore {
table_data: &TableData,
request_id: RequestId,
task: &CompactionTask,
sst_write_options: &SstWriteOptions,
) -> Result<()> {
debug!(
"Begin compact table, table_name:{}, id:{}, task:{:?}",
Expand Down Expand Up @@ -766,6 +769,7 @@ impl SpaceStore {
table_data,
request_id,
input,
sst_write_options,
&mut edit_meta,
)
.await?;
Expand Down Expand Up @@ -796,6 +800,7 @@ impl SpaceStore {
table_data: &TableData,
request_id: RequestId,
input: &CompactionInputFiles,
sst_write_options: &SstWriteOptions,
edit_meta: &mut VersionEditMeta,
) -> Result<()> {
debug!(
Expand Down Expand Up @@ -907,18 +912,12 @@ impl SpaceStore {
let file_id = table_data.alloc_file_id();
let sst_file_path = table_data.set_sst_file_path(file_id);

let storage_format_hint = table_data.table_options().storage_format_hint;
let sst_write_options = SstWriteOptions {
storage_format_hint,
num_rows_per_row_group: table_options.num_rows_per_row_group,
compression: table_options.compression,
};
let mut sst_writer = self
.sst_factory
.create_writer(&sst_write_options, &sst_file_path, self.store_picker())
.create_writer(sst_write_options, &sst_file_path, self.store_picker())
.await
.context(CreateSstWriter {
storage_format_hint,
storage_format_hint: sst_write_options.storage_format_hint,
})?;

let sst_info = sst_writer
Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ pub struct Instance {
pub(crate) space_write_buffer_size: usize,
/// Replay wal batch size
pub(crate) replay_batch_size: usize,
/// Write sst max buffer size
pub(crate) write_sst_max_buffer_size: usize,
/// Options for scanning sst
pub(crate) iter_options: IterOptions,
pub(crate) remote_engine: Option<RemoteEngineRef>,
Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl Instance {
space_store.clone(),
bg_runtime.clone(),
scheduler_config,
ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
));

let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone());
Expand All @@ -92,6 +93,7 @@ impl Instance {
db_write_buffer_size: ctx.config.db_write_buffer_size,
space_write_buffer_size: ctx.config.space_write_buffer_size,
replay_batch_size: ctx.config.replay_batch_size,
write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
iter_options,
remote_engine: remote_engine_ref,
});
Expand Down
5 changes: 4 additions & 1 deletion analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod table_options;
#[cfg(any(test, feature = "test"))]
pub mod tests;

use common_util::config::ReadableDuration;
use common_util::config::{ReadableDuration, ReadableSize};
use manifest::details::Options as ManifestOptions;
use message_queue::kafka::config::Config as KafkaConfig;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -75,6 +75,8 @@ pub struct Config {
pub scan_batch_size: usize,
/// Sst background reading parallelism
pub sst_background_read_parallelism: usize,
/// Max buffer size for writing sst
pub write_sst_max_buffer_size: ReadableSize,

/// Wal storage config
///
Expand Down Expand Up @@ -108,6 +110,7 @@ impl Default for Config {
db_write_buffer_size: 0,
scan_batch_size: 500,
sst_background_read_parallelism: 8,
write_sst_max_buffer_size: ReadableSize::mb(10),
wal: WalStorageConfig::RocksDB(Box::default()),
remote_engine_client: remote_engine_client::config::Config::default(),
}
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/sst/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub struct SstWriteOptions {
pub storage_format_hint: StorageFormatHint,
pub num_rows_per_row_group: usize,
pub compression: Compression,
pub max_buffer_size: usize,
}

#[derive(Debug, Default)]
Expand Down
13 changes: 7 additions & 6 deletions analytic_engine/src/sst/meta_data/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use std::{

use lru::LruCache;
use parquet::file::metadata::FileMetaData;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};

use crate::sst::{
meta_data::{
DecodeCustomMetaData, EmptyCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result,
},
meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
parquet::encoding,
};

Expand Down Expand Up @@ -41,11 +39,14 @@ impl MetaData {
let kv_metas = file_meta_data
.key_value_metadata()
.context(KvMetaDataNotFound)?;
ensure!(!kv_metas.is_empty(), EmptyCustomMetaData);
let kv_meta = kv_metas
.iter()
.find(|kv| kv.key == encoding::META_KEY)
.context(KvMetaDataNotFound)?;

let custom = {
let mut sst_meta =
encoding::decode_sst_meta_data(&kv_metas[0]).context(DecodeCustomMetaData)?;
encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?;
if ignore_sst_filter {
sst_meta.parquet_filter = None;
}
Expand Down
Loading

0 comments on commit df6fb63

Please sign in to comment.