Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter batch by sequence in memtable #5367

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/metric-engine/src/metadata_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ impl MetadataRegion {
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
};
let record_batch_stream = self
.mito
Expand Down Expand Up @@ -469,6 +470,7 @@ impl MetadataRegion {
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
}
}

Expand Down Expand Up @@ -630,6 +632,7 @@ mod test {
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
};
let actual_scan_request = MetadataRegion::build_read_request(key);
assert_eq!(actual_scan_request, expected_scan_request);
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/benches/memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn full_scan(c: &mut Criterion) {
}

b.iter(|| {
let iter = memtable.iter(None, None).unwrap();
let iter = memtable.iter(None, None, None).unwrap();
for batch in iter {
let _batch = batch.unwrap();
}
Expand All @@ -98,7 +98,7 @@ fn full_scan(c: &mut Criterion) {
}

b.iter(|| {
let iter = memtable.iter(None, None).unwrap();
let iter = memtable.iter(None, None, None).unwrap();
for batch in iter {
let _batch = batch.unwrap();
}
Expand All @@ -124,7 +124,7 @@ fn filter_1_host(c: &mut Criterion) {
let predicate = generator.random_host_filter();

b.iter(|| {
let iter = memtable.iter(None, Some(predicate.clone())).unwrap();
let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap();
for batch in iter {
let _batch = batch.unwrap();
}
Expand All @@ -138,7 +138,7 @@ fn filter_1_host(c: &mut Criterion) {
let predicate = generator.random_host_filter();

b.iter(|| {
let iter = memtable.iter(None, Some(predicate.clone())).unwrap();
let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap();
for batch in iter {
let _batch = batch.unwrap();
}
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/engine/projection_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ async fn test_scan_projection() {
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
};
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl RegionFlushTask {

let max_sequence = mem.stats().max_sequence();
let file_id = FileId::random();
let iter = mem.iter(None, None)?;
let iter = mem.iter(None, None, None)?;
let source = Source::Iter(iter);

// Flush to level 0.
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator>;

/// Returns the ranges in the memtable.
Expand All @@ -155,6 +156,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> MemtableRanges;

/// Returns true if the memtable is empty.
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/memtable/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::sync::{Arc, RwLock};

use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::Result;
Expand Down Expand Up @@ -63,6 +63,7 @@ impl Memtable for BulkMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
_sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator> {
todo!()
}
Expand All @@ -71,6 +72,7 @@ impl Memtable for BulkMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
_sequence: Option<SequenceNumber>,
) -> MemtableRanges {
todo!()
}
Expand Down
24 changes: 16 additions & 8 deletions src/mito2/src/memtable/bulk/part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error;
Expand Down Expand Up @@ -68,7 +68,11 @@ impl BulkPart {
&self.metadata
}

pub(crate) fn read(&self, context: BulkIterContextRef) -> Result<Option<BoxedBatchIterator>> {
pub(crate) fn read(
&self,
context: BulkIterContextRef,
sequence: Option<SequenceNumber>,
) -> Result<Option<BoxedBatchIterator>> {
// use predicate to find row groups to read.
let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

Expand All @@ -82,6 +86,7 @@ impl BulkPart {
row_groups_to_read,
self.metadata.parquet_metadata.clone(),
self.data.clone(),
sequence,
)?;
Ok(Some(Box::new(iter) as BoxedBatchIterator))
}
Expand Down Expand Up @@ -786,11 +791,14 @@ mod tests {
let projection = &[4u32];

let mut reader = part
.read(Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&Some(projection.as_slice()),
.read(
Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&Some(projection.as_slice()),
None,
)),
None,
)))
)
.unwrap()
.expect("expect at least one row group");

Expand Down Expand Up @@ -837,7 +845,7 @@ mod tests {
predicate,
));
let mut reader = part
.read(context)
.read(context, None)
.unwrap()
.expect("expect at least one row group");
let mut total_rows_read = 0;
Expand Down Expand Up @@ -866,7 +874,7 @@ mod tests {
datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
)])),
));
assert!(part.read(context).unwrap().is_none());
assert!(part.read(context, None).unwrap().is_none());

check_prune_row_group(&part, None, 310);

Expand Down
10 changes: 8 additions & 2 deletions src/mito2/src/memtable/bulk/part_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use bytes::Bytes;
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::SequenceNumber;

use crate::error;
use crate::memtable::bulk::context::BulkIterContextRef;
Expand All @@ -31,6 +32,7 @@ pub struct BulkPartIter {
row_groups_to_read: VecDeque<usize>,
current_reader: Option<PruneReader>,
builder: MemtableRowGroupReaderBuilder,
sequence: Option<SequenceNumber>,
}

impl BulkPartIter {
Expand All @@ -40,6 +42,7 @@ impl BulkPartIter {
mut row_groups_to_read: VecDeque<usize>,
parquet_meta: Arc<ParquetMetaData>,
data: Bytes,
sequence: Option<SequenceNumber>,
) -> error::Result<Self> {
let projection_mask = ProjectionMask::roots(
parquet_meta.file_metadata().schema_descr(),
Expand All @@ -62,6 +65,7 @@ impl BulkPartIter {
row_groups_to_read,
current_reader: init_reader,
builder,
sequence,
})
}

Expand All @@ -71,14 +75,16 @@ impl BulkPartIter {
return Ok(None);
};

if let Some(batch) = current.next_batch()? {
if let Some(mut batch) = current.next_batch()? {
batch.filter_by_sequence(self.sequence)?;
return Ok(Some(batch));
}

// Previous row group exhausted, read next row group
while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
current.reset(self.builder.build_row_group_reader(next_row_group, None)?);
if let Some(next_batch) = current.next_batch()? {
if let Some(mut next_batch) = current.next_batch()? {
next_batch.filter_by_sequence(self.sequence)?;
return Ok(Some(next_batch));
}
}
Expand Down
29 changes: 18 additions & 11 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(crate) use partition::DensePrimaryKeyFilter;
use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{Result, UnsupportedOperationSnafu};
Expand Down Expand Up @@ -187,20 +187,23 @@ impl Memtable for PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator> {
self.tree.read(projection, predicate)
self.tree.read(projection, predicate, sequence)
}

fn ranges(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> MemtableRanges {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),
projection,
predicate,
sequence,
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

Expand Down Expand Up @@ -347,12 +350,16 @@ struct PartitionTreeIterBuilder {
tree: Arc<PartitionTree>,
projection: Option<Vec<ColumnId>>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
}

impl IterBuilder for PartitionTreeIterBuilder {
fn build(&self) -> Result<BoxedBatchIterator> {
self.tree
.read(self.projection.as_deref(), self.predicate.clone())
self.tree.read(
self.projection.as_deref(),
self.predicate.clone(),
self.sequence,
)
}
}

Expand Down Expand Up @@ -407,7 +414,7 @@ mod tests {
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<Vec<_>>();

let iter = memtable.iter(None, None).unwrap();
let iter = memtable.iter(None, None, None).unwrap();
let read = collect_iter_timestamps(iter);
assert_eq!(expected_ts, read);

Expand Down Expand Up @@ -461,11 +468,11 @@ mod tests {
);
memtable.write(&kvs).unwrap();

let iter = memtable.iter(None, None).unwrap();
let iter = memtable.iter(None, None, None).unwrap();
let read = collect_iter_timestamps(iter);
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

let iter = memtable.iter(None, None).unwrap();
let iter = memtable.iter(None, None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
Expand Down Expand Up @@ -506,7 +513,7 @@ mod tests {
let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
memtable.write(&kvs).unwrap();
let iter = memtable.iter(Some(&[3]), None).unwrap();
let iter = memtable.iter(Some(&[3]), None, None).unwrap();

let mut v0_all = vec![];
for res in iter {
Expand Down Expand Up @@ -578,7 +585,7 @@ mod tests {
data.sort_unstable();

let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
let iter = memtable.iter(None, None).unwrap();
let iter = memtable.iter(None, None, None).unwrap();
let read = collect_iter_timestamps(iter);
assert_eq!(expect, read);
}
Expand Down Expand Up @@ -614,7 +621,7 @@ mod tests {
right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
});
let iter = memtable
.iter(None, Some(Predicate::new(vec![expr])))
.iter(None, Some(Predicate::new(vec![expr])), None)
.unwrap();
let read = collect_iter_timestamps(iter);
assert_eq!(timestamps, read);
Expand Down Expand Up @@ -781,7 +788,7 @@ mod tests {
))
.unwrap();

let mut reader = new_memtable.iter(None, None).unwrap();
let mut reader = new_memtable.iter(None, None, None).unwrap();
let batch = reader.next().unwrap().unwrap();
let pk = codec.decode(batch.primary_key()).unwrap();
if let Value::String(s) = &pk[2] {
Expand Down
10 changes: 9 additions & 1 deletion src/mito2/src/memtable/partition_tree/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use memcomparable::Serializer;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu};
Expand Down Expand Up @@ -202,6 +202,7 @@ impl PartitionTree {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator> {
let start = Instant::now();
// Creates the projection set.
Expand All @@ -225,6 +226,7 @@ impl PartitionTree {
let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);

let mut iter = TreeIter {
sequence,
partitions,
current_reader: None,
metrics: tree_iter_metric,
Expand Down Expand Up @@ -451,6 +453,8 @@ struct TreeIterMetrics {
}

struct TreeIter {
/// Optional sequence number of the current reader, which limits result batches to those lower than this sequence number.
sequence: Option<SequenceNumber>,
partitions: VecDeque<PartitionRef>,
current_reader: Option<PartitionReader>,
metrics: TreeIterMetrics,
Expand Down Expand Up @@ -519,6 +523,8 @@ impl TreeIter {
if part_reader.is_valid() {
self.metrics.rows_fetched += batch.num_rows();
self.metrics.batches_fetched += 1;
let mut batch = batch;
batch.filter_by_sequence(self.sequence)?;
return Ok(Some(batch));
}

Expand All @@ -529,6 +535,8 @@ impl TreeIter {

self.metrics.rows_fetched += batch.num_rows();
self.metrics.batches_fetched += 1;
let mut batch = batch;
batch.filter_by_sequence(self.sequence)?;
Ok(Some(batch))
}
}
Loading
Loading