feat: compaction reader and writer (#972)
* feat: compaction reader and writer

* feat: make ParquetWriter accept both memtable iterator and chunk reader

* feat: adapt ParquetWriter to accommodate ChunkReaderImpl

* chore: rebase develop

* wip: compile

* wip: task logic

* feat: version and manifest update

* fix: remove useless as_inner from Timestamp vectors

* feat: mark file compacting

* fix: unit test

* fix: clippy warnings

* fix: CR comment

* chore: according to CR comments, remove visit_levels from LevelMetas

* fix: some CR comments

* fix: add PlainTimestampRowFilter for correctness

* fix: CR comments

* fix: some typos
v0y4g3r authored Feb 14, 2023
1 parent 8491f65 commit 374acc8
Showing 31 changed files with 1,444 additions and 290 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -48,6 +48,7 @@ license = "Apache-2.0"

[workspace.dependencies]
arrow = "29.0"
arrow-array = "29.0"
arrow-flight = "29.0"
arrow-schema = { version = "29.0", features = ["serde"] }
async-stream = "0.3"
1 change: 1 addition & 0 deletions src/common/time/src/lib.rs
@@ -1,3 +1,4 @@
#![feature(int_roundings)]
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
68 changes: 68 additions & 0 deletions src/common/time/src/timestamp.rs
@@ -85,6 +85,21 @@ impl Timestamp {
}
}

/// Converts a timestamp to the given time unit.
/// Converting from a smaller unit to a larger unit rounds the value up
/// (toward positive infinity).
/// Returns `None` if the conversion overflows.
pub fn convert_to_ceil(&self, unit: TimeUnit) -> Option<Timestamp> {
if self.unit().factor() >= unit.factor() {
let mul = self.unit().factor() / unit.factor();
let value = self.value.checked_mul(mul)?;
Some(Timestamp::new(value, unit))
} else {
let mul = unit.factor() / self.unit().factor();
Some(Timestamp::new(self.value.div_ceil(mul), unit))
}
}

/// Splits a [Timestamp] into a seconds part and a nanoseconds part.
/// Note that the seconds part is always rounded down (toward negative infinity).
fn split(&self) -> (i64, i64) {
@@ -718,4 +733,57 @@ mod tests {
Timestamp::new(i64::MAX, TimeUnit::Second).split()
);
}

#[test]
fn test_convert_to_ceil() {
assert_eq!(
Timestamp::new(1, TimeUnit::Second),
Timestamp::new(1000, TimeUnit::Millisecond)
.convert_to_ceil(TimeUnit::Second)
.unwrap()
);

// These two cases show how `Timestamp::convert_to_ceil` behaves differently
// from `Timestamp::convert_to` when converting a smaller unit to a larger unit.
assert_eq!(
Timestamp::new(1, TimeUnit::Second),
Timestamp::new(1001, TimeUnit::Millisecond)
.convert_to(TimeUnit::Second)
.unwrap()
);
assert_eq!(
Timestamp::new(2, TimeUnit::Second),
Timestamp::new(1001, TimeUnit::Millisecond)
.convert_to_ceil(TimeUnit::Second)
.unwrap()
);

assert_eq!(
Timestamp::new(-1, TimeUnit::Second),
Timestamp::new(-1, TimeUnit::Millisecond)
.convert_to(TimeUnit::Second)
.unwrap()
);
assert_eq!(
Timestamp::new(0, TimeUnit::Second),
Timestamp::new(-1, TimeUnit::Millisecond)
.convert_to_ceil(TimeUnit::Second)
.unwrap()
);

// When converting from a larger unit to a smaller unit there is no rounding error,
// so `Timestamp::convert_to_ceil` behaves just like `Timestamp::convert_to`.
assert_eq!(
Timestamp::new(-1, TimeUnit::Second).convert_to(TimeUnit::Millisecond),
Timestamp::new(-1, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond)
);
assert_eq!(
Timestamp::new(1000, TimeUnit::Second).convert_to(TimeUnit::Millisecond),
Timestamp::new(1000, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond)
);
assert_eq!(
Timestamp::new(1, TimeUnit::Second).convert_to(TimeUnit::Millisecond),
Timestamp::new(1, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond)
);
}
}
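
The ceiling division above relies on `i64::div_ceil`, which rounds the quotient toward positive infinity; that is also why `#![feature(int_roundings)]` is enabled in src/common/time/src/lib.rs, since `div_ceil` was still gated behind that nightly feature at the time. A few worked values, as a quick sketch:

    assert_eq!(1001_i64.div_ceil(1000), 2); // 1001 ms -> 2 s, rounded up
    assert_eq!(1000_i64.div_ceil(1000), 1); // exact multiple, no rounding
    assert_eq!((-1_i64).div_ceil(1000), 0); // -1 ms -> 0 s, toward +infinity
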
3 changes: 2 additions & 1 deletion src/mito/src/table.rs
@@ -171,7 +171,7 @@ impl<R: Region> Table for MitoTable<R> {
.context(table_error::TableOperationSnafu)?
.reader;

let schema = reader.schema().clone();
let schema = reader.user_schema().clone();
if let Some(first_schema) = &first_schema {
// TODO(hl): we assume all regions' schemas are the same, but undergoing table altering
// may make these schemas inconsistent.
@@ -198,6 +198,7 @@
let stream = Box::pin(async_stream::try_stream! {
for mut reader in readers {
while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? {
let chunk = reader.project_chunk(chunk);
yield RecordBatch::new(stream_schema.clone(), chunk.columns)?
}
}
6 changes: 5 additions & 1 deletion src/mito/src/table/test_util/mock_engine.rs
@@ -43,7 +43,7 @@ pub struct MockChunkReader {
impl ChunkReader for MockChunkReader {
type Error = MockError;

fn schema(&self) -> &SchemaRef {
fn user_schema(&self) -> &SchemaRef {
&self.schema
}

@@ -69,6 +69,10 @@

Ok(Some(Chunk::new(columns)))
}

fn project_chunk(&self, chunk: Chunk) -> Chunk {
chunk
}
}

pub struct MockSnapshot {
4 changes: 4 additions & 0 deletions src/storage/Cargo.toml
@@ -9,6 +9,8 @@ arc-swap = "1.0"
async-compat = "0.2"
async-stream.workspace = true
async-trait = "0.1"
arrow.workspace = true
arrow-array.workspace = true
bytes = "1.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
@@ -18,6 +20,8 @@ common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
datafusion-common.workspace = true
datafusion-expr.workspace = true
futures.workspace = true
futures-util.workspace = true
lazy_static = "1.4"
64 changes: 39 additions & 25 deletions src/storage/src/chunk.rs
@@ -24,9 +24,9 @@ use table::predicate::{Predicate, TimeRangePredicateBuilder};

use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef};
use crate::read::{BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions};

/// Chunk reader implementation.
// Now we use async-trait to implement the chunk reader, which is easier to implement than
@@ -41,7 +41,7 @@ pub struct ChunkReaderImpl {
impl ChunkReader for ChunkReaderImpl {
type Error = Error;

fn schema(&self) -> &SchemaRef {
fn user_schema(&self) -> &SchemaRef {
self.schema.projected_user_schema()
}

@@ -50,10 +50,14 @@ impl ChunkReader for ChunkReaderImpl {
Some(b) => b,
None => return Ok(None),
};
Ok(Some(Chunk::new(batch.columns)))
}

let chunk = self.schema.batch_to_chunk(&batch);

Ok(Some(chunk))
fn project_chunk(&self, chunk: Chunk) -> Chunk {
let batch = Batch {
columns: chunk.columns,
};
self.schema.batch_to_chunk(&batch)
}
}
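
With this split, `next_chunk` returns columns in the reader's internal order and the caller applies the projection explicitly, which is exactly what the mito table scan above now does. A minimal consumption sketch (error handling elided):

    // read raw chunks, then map each onto the projected user schema
    while let Some(chunk) = reader.next_chunk().await? {
        let chunk = reader.project_chunk(chunk);
        // chunk.columns now matches reader.user_schema()
    }
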

@@ -64,6 +68,11 @@ impl ChunkReaderImpl {
batch_reader,
}
}

#[inline]
pub fn projected_schema(&self) -> &ProjectedSchemaRef {
&self.schema
}
}

/// Builder to create a new [ChunkReaderImpl] from scan request.
@@ -121,14 +130,34 @@ impl ChunkReaderBuilder {
self
}

pub fn pick_ssts(mut self, ssts: &LevelMetas) -> Result<Self> {
ssts.visit_levels(&mut self)?;

/// Picks all SSTs in all levels.
pub fn pick_all_ssts(mut self, ssts: &LevelMetas) -> Result<Self> {
let files = ssts.levels().iter().flat_map(|level| level.files());
// Now we read all files, so just reserve enough space to hold all files.
self.files_to_read.reserve(files.size_hint().0);
for file in files {
// We can't invoke async functions here, so we collect all files first and
// create the batch readers later in `build()`.
self.files_to_read.push(file.clone());
}
Ok(self)
}

/// Picks given SSTs to read.
pub fn pick_ssts(mut self, ssts: &[FileHandle]) -> Self {
for file in ssts {
self.files_to_read.push(file.clone());
}
self
}

pub async fn build(mut self) -> Result<ChunkReaderImpl> {
let time_range_predicate = self.build_time_range_predicate();
debug!(
"Time range predicate for chunk reader: {:?}",
time_range_predicate
);

let schema = Arc::new(
ProjectedSchema::new(self.schema, self.projection)
.context(error::InvalidProjectionSnafu)?,
Expand All @@ -148,6 +177,7 @@ impl ChunkReaderBuilder {
batch_size: self.iter_ctx.batch_size,
projected_schema: schema.clone(),
predicate: Predicate::new(self.filters),
time_range: time_range_predicate,
};
for file in &self.files_to_read {
if !Self::file_in_range(file, time_range_predicate) {
@@ -189,19 +219,3 @@ impl ChunkReaderBuilder {
file_ts_range.intersects(&predicate)
}
}

impl Visitor for ChunkReaderBuilder {
fn visit(&mut self, _level: usize, files: &[FileHandle]) -> Result<()> {
// TODO(yingwen): Filter files by time range.

// Now we read all files, so just reserve enough space to hold all files.
self.files_to_read.reserve(files.len());
for file in files {
// We can't invoke async functions here, so we collects all files first, and
// create the batch reader later in `ChunkReaderBuilder`.
self.files_to_read.push(file.clone());
}

Ok(())
}
}
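
Replacing the `Visitor` callback with the two `pick_*` methods lets the compaction path feed an explicit file list into the same reader machinery used for scans. A hypothetical call site (the builder's constructor arguments are an assumption, not part of this diff):

    // read only the SSTs selected as compaction inputs
    let reader = ChunkReaderBuilder::new(region_schema, sst_layer)
        .pick_ssts(&input_files)
        .build()
        .await?;
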
1 change: 1 addition & 0 deletions src/storage/src/compaction.rs
@@ -18,3 +18,4 @@ mod rate_limit;
mod scheduler;
mod strategy;
mod task;
mod writer;
10 changes: 9 additions & 1 deletion src/storage/src/compaction/dedup_deque.rs
@@ -18,12 +18,20 @@ use std::fmt::{Debug, Formatter};
use std::hash::Hash;

/// Deque with key deduplication.
#[derive(Default)]
pub struct DedupDeque<K, V> {
deque: VecDeque<K>,
existing: HashMap<K, V>,
}

impl<K, V> Default for DedupDeque<K, V> {
fn default() -> Self {
Self {
deque: VecDeque::new(),
existing: HashMap::new(),
}
}
}
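
The hand-written impl replaces the earlier `#[derive(Default)]` because the derive adds `K: Default` and `V: Default` bounds to the generated impl, while an empty `VecDeque` and `HashMap` need no such bounds. A small illustration:

    struct Task; // deliberately has no Default impl

    // compiles with the manual impl; the derived version
    // would have demanded `Task: Default`
    let queue: DedupDeque<u64, Task> = DedupDeque::default();
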

impl<K: Eq + Hash + Clone, V> DedupDeque<K, V> {
/// Pushes a key value to the back of deque.
/// Returns true if the deque does not already contain value with the same key, otherwise
40 changes: 29 additions & 11 deletions src/storage/src/compaction/picker.rs
@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use common_telemetry::debug;
use store_api::logstore::LogStore;

use crate::compaction::scheduler::CompactionRequestImpl;
use crate::compaction::strategy::StrategyRef;
@@ -26,37 +29,52 @@ pub trait Picker<R, T: CompactionTask>: Send + 'static {

pub struct PickerContext {}

/// L0 -> L1 all-to-all compaction based on time windows.
pub(crate) struct SimplePicker {
/// L0 -> L1 compaction based on time windows.
pub(crate) struct SimplePicker<S> {
strategy: StrategyRef,
_phantom_data: PhantomData<S>,
}

#[allow(unused)]
impl SimplePicker {
impl<S> SimplePicker<S> {
pub fn new(strategy: StrategyRef) -> Self {
Self { strategy }
Self {
strategy,
_phantom_data: Default::default(),
}
}
}

impl Picker<CompactionRequestImpl, CompactionTaskImpl> for SimplePicker {
impl<S: LogStore> Picker<CompactionRequestImpl<S>, CompactionTaskImpl<S>> for SimplePicker<S> {
fn pick(
&self,
ctx: &PickerContext,
req: &CompactionRequestImpl,
) -> crate::error::Result<Option<CompactionTaskImpl>> {
let levels = req.levels();
req: &CompactionRequestImpl<S>,
) -> crate::error::Result<Option<CompactionTaskImpl<S>>> {
let levels = &req.levels;

for level_num in 0..levels.level_num() {
let level = levels.level(level_num as u8);
let outputs = self.strategy.pick(ctx, level);

if outputs.is_empty() {
debug!("No SST file can be compacted at level {}", level_num);
return Ok(None);
continue;
}

debug!("Found SST files to compact {:?}", outputs);
// TODO(hl): build compaction task
debug!(
"Found SST files to compact {:?} on level: {}",
outputs, level_num
);
return Ok(Some(CompactionTaskImpl {
schema: req.schema.clone(),
sst_layer: req.sst_layer.clone(),
outputs,
writer: req.writer.clone(),
shared_data: req.shared.clone(),
wal: req.wal.clone(),
manifest: req.manifest.clone(),
}));
}

Ok(None)
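
A note on the generics above: `SimplePicker<S>` takes the `S: LogStore` parameter only so its `Picker` impl can name the log-store-parameterized request and task types; no field actually stores an `S`, so the zero-sized `PhantomData<S>` marker is what keeps the otherwise-unused type parameter legal. The shape of the pattern, reduced to a sketch:

    use std::marker::PhantomData;

    // generic over S even though no field contains an S;
    // PhantomData<S> occupies no space but "uses" the parameter
    struct Carrier<S> {
        inner: u32,
        _phantom: PhantomData<S>,
    }
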
