Skip to content

Commit

Permalink
feat: initial changes for compaction_time_window field support (#1083)
Browse files Browse the repository at this point in the history
* feat(compaction_time_window): initial changes for compaction_time_window field support

* feat(compaction_time_window): move PickerContext creation

* feat(compaction_time_window): update region descriptor, fix formatting

* feat(compaction_time_window): add minor enhancements

* feat(compaction_time_window): fix failing test

* feat(compaction_time_window):  return an error instead silently skip for the user provided compaction_time_window

* feat(compaction_time_window): add TODO reminder
  • Loading branch information
etolbakov authored Apr 6, 2023
1 parent a6932c6 commit 59f7630
Show file tree
Hide file tree
Showing 19 changed files with 140 additions and 33 deletions.
3 changes: 3 additions & 0 deletions src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.id(region_id)
.name(&region_name)
.row_key(row_key.clone())
.compaction_time_window(request.table_options.compaction_time_window)
.default_cf(default_cf.clone())
.build()
.context(BuildRegionDescriptorSnafu {
Expand All @@ -406,6 +407,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.write_buffer_size
.map(|size| size.0 as usize),
ttl: request.table_options.ttl,
compaction_time_window: request.table_options.compaction_time_window,
};

let region = self
Expand Down Expand Up @@ -512,6 +514,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.write_buffer_size
.map(|s| s.0 as usize),
ttl: table_info.meta.options.ttl,
compaction_time_window: table_info.meta.options.compaction_time_window,
};

debug!(
Expand Down
4 changes: 4 additions & 0 deletions src/mito/src/engine/procedure/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,18 @@ impl<S: StorageEngine> CreateMitoTable<S> {
let table_options = &self.data.request.table_options;
let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize);
let ttl = table_options.ttl;
let compaction_time_window = table_options.compaction_time_window;
let open_opts = OpenOptions {
parent_dir: table_dir.clone(),
write_buffer_size,
ttl,
compaction_time_window,
};
let create_opts = CreateOptions {
parent_dir: table_dir,
write_buffer_size,
ttl,
compaction_time_window,
};

let primary_key_indices = &self.data.request.primary_key_indices;
Expand Down Expand Up @@ -216,6 +219,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
.name(region_name.clone())
.row_key(row_key.clone())
.default_cf(default_cf.clone())
.compaction_time_window(compaction_time_window)
.build()
.context(BuildRegionDescriptorSnafu {
table_name: &self.data.request.table_name,
Expand Down
1 change: 1 addition & 0 deletions src/storage/benches/memtable/util/regiondesc_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl RegionDescBuilder {
row_key: self.key_builder.build().unwrap(),
default_cf: self.default_cf_builder.build().unwrap(),
extra_cfs: Vec::new(),
compaction_time_window: None,
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/compaction/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::marker::PhantomData;

use store_api::storage::RegionId;

use crate::compaction::{CompactionTask, Picker, PickerContext};
use crate::compaction::{CompactionTask, Picker};
use crate::error::Result;
use crate::scheduler::{Request, Scheduler};

Expand Down Expand Up @@ -49,7 +49,7 @@ impl Picker for NoopCompactionPicker {
type Request = NoopCompactionRequest;
type Task = NoopCompactionTask;

fn pick(&self, _ctx: &PickerContext, _req: &Self::Request) -> Result<Option<Self::Task>> {
fn pick(&self, _req: &Self::Request) -> Result<Option<Self::Task>> {
Ok(None)
}
}
Expand Down
27 changes: 19 additions & 8 deletions src/storage/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,24 @@ pub trait Picker: Send + 'static {
type Request: Request;
type Task: CompactionTask;

fn pick(
&self,
ctx: &PickerContext,
req: &Self::Request,
) -> crate::error::Result<Option<Self::Task>>;
fn pick(&self, req: &Self::Request) -> crate::error::Result<Option<Self::Task>>;
}

pub struct PickerContext {}
pub struct PickerContext {
compaction_time_window: Option<i64>,
}

impl PickerContext {
pub fn with(compaction_time_window: Option<i64>) -> Self {
Self {
compaction_time_window,
}
}

pub fn compaction_time_window(&self) -> Option<i64> {
self.compaction_time_window
}
}

/// L0 -> L1 compaction based on time windows.
pub struct SimplePicker<S> {
Expand Down Expand Up @@ -89,7 +99,6 @@ impl<S: LogStore> Picker for SimplePicker<S> {

fn pick(
&self,
ctx: &PickerContext,
req: &CompactionRequestImpl<S>,
) -> crate::error::Result<Option<CompactionTaskImpl<S>>> {
let levels = &req.levels();
Expand All @@ -110,9 +119,10 @@ impl<S: LogStore> Picker for SimplePicker<S> {
expired_ssts.iter().for_each(|f| f.mark_compacting(true));
}

let ctx = &PickerContext::with(req.compaction_time_window);
for level_num in 0..levels.level_num() {
let level = levels.level(level_num as u8);
let outputs = self.strategy.pick(ctx, level);
let (compaction_time_window, outputs) = self.strategy.pick(ctx, level);

if outputs.is_empty() {
debug!("No SST file can be compacted at level {}", level_num);
Expand All @@ -133,6 +143,7 @@ impl<S: LogStore> Picker for SimplePicker<S> {
manifest: req.manifest.clone(),
expired_ssts,
sst_write_buffer_size: req.sst_write_buffer_size,
compaction_time_window,
}));
}

Expand Down
5 changes: 3 additions & 2 deletions src/storage/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;
use tokio::sync::Notify;

use crate::compaction::picker::{Picker, PickerContext};
use crate::compaction::picker::Picker;
use crate::compaction::task::CompactionTask;
use crate::error::Result;
use crate::manifest::region::RegionManifest;
Expand Down Expand Up @@ -59,6 +59,7 @@ pub struct CompactionRequestImpl<S: LogStore> {
pub manifest: RegionManifest,
pub wal: Wal<S>,
pub ttl: Option<Duration>,
pub compaction_time_window: Option<i64>,
/// Compaction result sender.
pub sender: Option<Sender<Result<()>>>,

Expand Down Expand Up @@ -101,7 +102,7 @@ where
finish_notifier: Arc<Notify>,
) -> Result<()> {
let region_id = req.key();
let Some(task) = self.picker.pick(&PickerContext {}, &req)? else {
let Some(task) = self.picker.pick(&req)? else {
info!("No file needs compaction in region: {:?}", region_id);
req.complete(Ok(()));
return Ok(());
Expand Down
34 changes: 19 additions & 15 deletions src/storage/src/compaction/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::sst::{FileHandle, LevelMeta};

/// Compaction strategy that defines which SSTs need to be compacted at given level.
pub trait Strategy {
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> Vec<CompactionOutput>;
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option<i64>, Vec<CompactionOutput>);
}

pub type StrategyRef = Arc<dyn Strategy + Send + Sync>;
Expand All @@ -37,29 +37,33 @@ pub type StrategyRef = Arc<dyn Strategy + Send + Sync>;
pub struct SimpleTimeWindowStrategy {}

impl Strategy for SimpleTimeWindowStrategy {
fn pick(&self, _ctx: &PickerContext, level: &LevelMeta) -> Vec<CompactionOutput> {
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option<i64>, Vec<CompactionOutput>) {
// SimpleTimeWindowStrategy only handles level 0 to level 1 compaction.
if level.level() != 0 {
return vec![];
return (None, vec![]);
}
let files = find_compactable_files(level);
debug!("Compactable files found: {:?}", files);
if files.is_empty() {
return vec![];
return (None, vec![]);
}

let time_bucket = infer_time_bucket(&files);
let time_bucket = ctx
.compaction_time_window()
.unwrap_or_else(|| infer_time_bucket(&files));
let buckets = calculate_time_buckets(time_bucket, &files);
debug!("File bucket:{}, file groups: {:?}", time_bucket, buckets);
buckets
.into_iter()
.map(|(bound, files)| CompactionOutput {
output_level: 1,
bucket_bound: bound,
bucket: time_bucket,
inputs: files,
})
.collect()
(
Some(time_bucket),
buckets
.into_iter()
.map(|(bound, files)| CompactionOutput {
output_level: 1,
bucket_bound: bound,
bucket: time_bucket,
inputs: files,
})
.collect(),
)
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/compaction/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct CompactionTaskImpl<S: LogStore> {
pub manifest: RegionManifest,
pub expired_ssts: Vec<FileHandle>,
pub sst_write_buffer_size: ReadableSize,
pub compaction_time_window: Option<i64>,
}

impl<S: LogStore> Debug for CompactionTaskImpl<S> {
Expand Down Expand Up @@ -101,6 +102,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
}

/// Writes updated SST info into manifest.
// TODO(etolbakov): we are not persisting inferred compaction_time_window (#1083)[https://github.com/GreptimeTeam/greptimedb/pull/1083]
async fn write_manifest_and_apply(
&self,
output: HashSet<FileMeta>,
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ impl<S: LogStore> EngineInner<S> {
name,
&self.config,
opts.ttl,
opts.compaction_time_window,
)
.await?;

Expand Down Expand Up @@ -340,6 +341,7 @@ impl<S: LogStore> EngineInner<S> {
&region_name,
&self.config,
opts.ttl,
opts.compaction_time_window,
)
.await?;

Expand Down Expand Up @@ -368,6 +370,7 @@ impl<S: LogStore> EngineInner<S> {
region_name: &str,
config: &EngineConfig,
ttl: Option<Duration>,
compaction_time_window: Option<i64>,
) -> Result<StoreConfig<S>> {
let parent_dir = util::normalize_dir(parent_dir);

Expand Down Expand Up @@ -397,6 +400,7 @@ impl<S: LogStore> EngineInner<S> {
engine_config: self.config.clone(),
file_purger: self.file_purger.clone(),
ttl,
compaction_time_window,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/manifest/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct RawRegionMetadata {
pub columns: RawColumnsMetadata,
pub column_families: RawColumnFamiliesMetadata,
pub version: VersionNumber,
/// Time window for compaction
pub compaction_time_window: Option<i64>,
}

/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata).
Expand Down
22 changes: 21 additions & 1 deletion src/storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ pub struct RegionMetadata {
pub columns: ColumnsMetadataRef,
column_families: ColumnFamiliesMetadata,
version: VersionNumber,
/// Time window for compaction
compaction_time_window: Option<i64>,
}

impl RegionMetadata {
Expand All @@ -216,6 +218,11 @@ impl RegionMetadata {
&self.schema
}

#[inline]
pub fn compaction_time_window(&self) -> Option<i64> {
self.compaction_time_window
}

#[inline]
pub fn user_schema(&self) -> &SchemaRef {
self.schema.user_schema()
Expand Down Expand Up @@ -317,7 +324,8 @@ impl RegionMetadata {
let mut builder = RegionDescriptorBuilder::default()
.id(self.id)
.name(&self.name)
.row_key(row_key);
.row_key(row_key)
.compaction_time_window(self.compaction_time_window);

for (cf_id, cf) in &self.column_families.id_to_cfs {
let mut cf_builder = ColumnFamilyDescriptorBuilder::default()
Expand Down Expand Up @@ -350,6 +358,7 @@ impl From<&RegionMetadata> for RawRegionMetadata {
columns: RawColumnsMetadata::from(&*data.columns),
column_families: RawColumnFamiliesMetadata::from(&data.column_families),
version: data.version,
compaction_time_window: data.compaction_time_window,
}
}
}
Expand All @@ -368,6 +377,7 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {
columns,
column_families: raw.column_families.into(),
version: raw.version,
compaction_time_window: raw.compaction_time_window,
})
}
}
Expand Down Expand Up @@ -635,6 +645,7 @@ impl TryFrom<RegionDescriptor> for RegionMetadataBuilder {
.name(desc.name)
.id(desc.id)
.row_key(desc.row_key)?
.compaction_time_window(desc.compaction_time_window)
.add_column_family(desc.default_cf)?;
for cf in desc.extra_cfs {
builder = builder.add_column_family(cf)?;
Expand Down Expand Up @@ -791,6 +802,7 @@ struct RegionMetadataBuilder {
columns_meta_builder: ColumnsMetadataBuilder,
cfs_meta_builder: ColumnFamiliesMetadataBuilder,
version: VersionNumber,
compaction_time_window: Option<i64>,
}

impl Default for RegionMetadataBuilder {
Expand All @@ -807,6 +819,7 @@ impl RegionMetadataBuilder {
columns_meta_builder: ColumnsMetadataBuilder::default(),
cfs_meta_builder: ColumnFamiliesMetadataBuilder::default(),
version: Schema::INITIAL_VERSION,
compaction_time_window: None,
}
}

Expand All @@ -831,6 +844,11 @@ impl RegionMetadataBuilder {
Ok(self)
}

fn compaction_time_window(mut self, compaction_time_window: Option<i64>) -> Self {
self.compaction_time_window = compaction_time_window;
self
}

fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result<Self> {
let column_index_start = self.columns_meta_builder.columns.len();
let column_index_end = column_index_start + cf.columns.len();
Expand Down Expand Up @@ -861,6 +879,7 @@ impl RegionMetadataBuilder {
columns,
column_families: self.cfs_meta_builder.build(),
version: self.version,
compaction_time_window: self.compaction_time_window,
})
}
}
Expand Down Expand Up @@ -1065,6 +1084,7 @@ mod tests {
.unwrap();
RegionMetadataBuilder::new()
.name(TEST_REGION)
.compaction_time_window(None)
.row_key(row_key)
.unwrap()
.add_column_family(cf)
Expand Down
Loading

0 comments on commit 59f7630

Please sign in to comment.