diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index e440eb1765f7..4b924ec3006c 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -411,6 +411,7 @@ impl MetadataRegion { output_ordering: None, limit: None, series_row_selector: None, + sequence: None, }; let record_batch_stream = self .mito @@ -469,6 +470,7 @@ impl MetadataRegion { output_ordering: None, limit: None, series_row_selector: None, + sequence: None, } } @@ -630,6 +632,7 @@ mod test { output_ordering: None, limit: None, series_row_selector: None, + sequence: None, }; let actual_scan_request = MetadataRegion::build_read_request(key); assert_eq!(actual_scan_request, expected_scan_request); diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index 74ff58a8ec1f..b0c6a550b2cf 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -85,7 +85,7 @@ fn full_scan(c: &mut Criterion) { } b.iter(|| { - let iter = memtable.iter(None, None).unwrap(); + let iter = memtable.iter(None, None, None).unwrap(); for batch in iter { let _batch = batch.unwrap(); } @@ -98,7 +98,7 @@ fn full_scan(c: &mut Criterion) { } b.iter(|| { - let iter = memtable.iter(None, None).unwrap(); + let iter = memtable.iter(None, None, None).unwrap(); for batch in iter { let _batch = batch.unwrap(); } @@ -124,7 +124,7 @@ fn filter_1_host(c: &mut Criterion) { let predicate = generator.random_host_filter(); b.iter(|| { - let iter = memtable.iter(None, Some(predicate.clone())).unwrap(); + let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap(); for batch in iter { let _batch = batch.unwrap(); } @@ -138,7 +138,7 @@ fn filter_1_host(c: &mut Criterion) { let predicate = generator.random_host_filter(); b.iter(|| { - let iter = memtable.iter(None, Some(predicate.clone())).unwrap(); + let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap(); for batch in iter { let _batch = batch.unwrap(); } diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index 37a458082086..b3c4fc83e13b 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -79,6 +79,7 @@ async fn test_scan_projection() { output_ordering: None, limit: None, series_row_selector: None, + sequence: None, }; let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index a0400deb5bc0..daf11ab86f0d 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -348,7 +348,7 @@ impl RegionFlushTask { let max_sequence = mem.stats().max_sequence(); let file_id = FileId::random(); - let iter = mem.iter(None, None)?; + let iter = mem.iter(None, None, None)?; let source = Source::Iter(iter); // Flush to level 0. diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 7d00b6bde8ec..7c6e51509b1f 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -147,6 +147,7 @@ pub trait Memtable: Send + Sync + fmt::Debug { &self, projection: Option<&[ColumnId]>, predicate: Option, + sequence: Option, ) -> Result; /// Returns the ranges in the memtable. @@ -155,6 +156,7 @@ pub trait Memtable: Send + Sync + fmt::Debug { &self, projection: Option<&[ColumnId]>, predicate: Option, + sequence: Option, ) -> MemtableRanges; /// Returns true if the memtable is empty. diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 8bbbda8ca367..2060a81cdc12 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -17,7 +17,7 @@ use std::sync::{Arc, RwLock}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::error::Result; @@ -63,6 +63,7 @@ impl Memtable for BulkMemtable { &self, _projection: Option<&[ColumnId]>, _predicate: Option, + _sequence: Option, ) -> Result { todo!() } @@ -71,6 +72,7 @@ impl Memtable for BulkMemtable { &self, _projection: Option<&[ColumnId]>, _predicate: Option, + _sequence: Option, ) -> MemtableRanges { todo!() } diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 2de5f841af1f..6c132ce64458 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -39,7 +39,7 @@ use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::error; @@ -68,7 +68,11 @@ impl BulkPart { &self.metadata } - pub(crate) fn read(&self, context: BulkIterContextRef) -> Result> { + pub(crate) fn read( + &self, + context: BulkIterContextRef, + sequence: Option, + ) -> Result> { // use predicate to find row groups to read. let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata); @@ -82,6 +86,7 @@ impl BulkPart { row_groups_to_read, self.metadata.parquet_metadata.clone(), self.data.clone(), + sequence, )?; Ok(Some(Box::new(iter) as BoxedBatchIterator)) } @@ -786,11 +791,14 @@ mod tests { let projection = &[4u32]; let mut reader = part - .read(Arc::new(BulkIterContext::new( - part.metadata.region_metadata.clone(), - &Some(projection.as_slice()), + .read( + Arc::new(BulkIterContext::new( + part.metadata.region_metadata.clone(), + &Some(projection.as_slice()), + None, + )), None, - ))) + ) .unwrap() .expect("expect at least one row group"); @@ -837,7 +845,7 @@ mod tests { predicate, )); let mut reader = part - .read(context) + .read(context, None) .unwrap() .expect("expect at least one row group"); let mut total_rows_read = 0; @@ -866,7 +874,7 @@ mod tests { datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)), )])), )); - assert!(part.read(context).unwrap().is_none()); + assert!(part.read(context, None).unwrap().is_none()); check_prune_row_group(&part, None, 310); diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index fdf3f81f5e11..9bd4c87ab880 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use bytes::Bytes; use parquet::arrow::ProjectionMask; use parquet::file::metadata::ParquetMetaData; +use store_api::storage::SequenceNumber; use crate::error; use crate::memtable::bulk::context::BulkIterContextRef; @@ -31,6 +32,7 @@ pub struct BulkPartIter { row_groups_to_read: VecDeque, current_reader: Option, builder: MemtableRowGroupReaderBuilder, + sequence: Option, } impl BulkPartIter { @@ -40,6 +42,7 @@ impl BulkPartIter { mut row_groups_to_read: VecDeque, parquet_meta: Arc, data: Bytes, + sequence: Option, ) -> error::Result { let projection_mask = ProjectionMask::roots( parquet_meta.file_metadata().schema_descr(), @@ -62,6 +65,7 @@ impl BulkPartIter { row_groups_to_read, current_reader: init_reader, builder, + sequence, }) } @@ -71,14 +75,16 @@ impl BulkPartIter { return Ok(None); }; - if let Some(batch) = current.next_batch()? { + if let Some(mut batch) = current.next_batch()? { + batch.filter_by_sequence(self.sequence)?; return Ok(Some(batch)); } // Previous row group exhausted, read next row group while let Some(next_row_group) = self.row_groups_to_read.pop_front() { current.reset(self.builder.build_row_group_reader(next_row_group, None)?); - if let Some(next_batch) = current.next_batch()? { + if let Some(mut next_batch) = current.next_batch()? { + next_batch.filter_by_sequence(self.sequence)?; return Ok(Some(next_batch)); } } diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 458d6a6d69c5..f1142fd3008d 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -32,7 +32,7 @@ pub(crate) use partition::DensePrimaryKeyFilter; use serde::{Deserialize, Serialize}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::error::{Result, UnsupportedOperationSnafu}; @@ -187,20 +187,23 @@ impl Memtable for PartitionTreeMemtable { &self, projection: Option<&[ColumnId]>, predicate: Option, + sequence: Option, ) -> Result { - self.tree.read(projection, predicate) + self.tree.read(projection, predicate, sequence) } fn ranges( &self, projection: Option<&[ColumnId]>, predicate: Option, + sequence: Option, ) -> MemtableRanges { let projection = projection.map(|ids| ids.to_vec()); let builder = Box::new(PartitionTreeIterBuilder { tree: self.tree.clone(), projection, predicate, + sequence, }); let context = Arc::new(MemtableRangeContext::new(self.id, builder)); @@ -347,12 +350,16 @@ struct PartitionTreeIterBuilder { tree: Arc, projection: Option>, predicate: Option, + sequence: Option, } impl IterBuilder for PartitionTreeIterBuilder { fn build(&self) -> Result { - self.tree - .read(self.projection.as_deref(), self.predicate.clone()) + self.tree.read( + self.projection.as_deref(), + self.predicate.clone(), + self.sequence, + ) } } @@ -407,7 +414,7 @@ mod tests { .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value()) .collect::>(); - let iter = memtable.iter(None, None).unwrap(); + let iter = memtable.iter(None, None, None).unwrap(); let read = collect_iter_timestamps(iter); assert_eq!(expected_ts, read); @@ -461,11 +468,11 @@ mod tests { ); memtable.write(&kvs).unwrap(); - let iter = memtable.iter(None, None).unwrap(); + let iter = memtable.iter(None, None, None).unwrap(); let read = collect_iter_timestamps(iter); assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read); - let iter = memtable.iter(None, None).unwrap(); + let iter = memtable.iter(None, None, None).unwrap(); let read = iter .flat_map(|batch| { batch @@ -506,7 +513,7 @@ mod tests { let expect = (0..100).collect::>(); let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1); memtable.write(&kvs).unwrap(); - let iter = memtable.iter(Some(&[3]), None).unwrap(); + let iter = memtable.iter(Some(&[3]), None, None).unwrap(); let mut v0_all = vec![]; for res in iter { @@ -578,7 +585,7 @@ mod tests { data.sort_unstable(); let expect = data.into_iter().map(|x| x.2).collect::>(); - let iter = memtable.iter(None, None).unwrap(); + let iter = memtable.iter(None, None, None).unwrap(); let read = collect_iter_timestamps(iter); assert_eq!(expect, read); } @@ -614,7 +621,7 @@ mod tests { right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))), }); let iter = memtable - .iter(None, Some(Predicate::new(vec![expr]))) + .iter(None, Some(Predicate::new(vec![expr])), None) .unwrap(); let read = collect_iter_timestamps(iter); assert_eq!(timestamps, read); @@ -781,7 +788,7 @@ mod tests { )) .unwrap(); - let mut reader = new_memtable.iter(None, None).unwrap(); + let mut reader = new_memtable.iter(None, None, None).unwrap(); let batch = reader.next().unwrap().unwrap(); let pk = codec.decode(batch.primary_key()).unwrap(); if let Value::String(s) = &pk[2] { diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index 81e281080415..d02b13ddb47a 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -27,7 +27,7 @@ use memcomparable::Serializer; use serde::Serialize; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu}; @@ -202,6 +202,7 @@ impl PartitionTree { &self, projection: Option<&[ColumnId]>, predicate: Option, + sequence: Option, ) -> Result { let start = Instant::now(); // Creates the projection set. @@ -225,6 +226,7 @@ impl PartitionTree { let partitions = self.prune_partitions(&filters, &mut tree_iter_metric); let mut iter = TreeIter { + sequence, partitions, current_reader: None, metrics: tree_iter_metric, @@ -451,6 +453,8 @@ struct TreeIterMetrics { } struct TreeIter { + /// Optional Sequence number of the current reader which limit results batch to lower than this sequence number. + sequence: Option, partitions: VecDeque, current_reader: Option, metrics: TreeIterMetrics, @@ -519,6 +523,8 @@ impl TreeIter { if part_reader.is_valid() { self.metrics.rows_fetched += batch.num_rows(); self.metrics.batches_fetched += 1; + let mut batch = batch; + batch.filter_by_sequence(self.sequence)?; return Ok(Some(batch)); } @@ -529,6 +535,8 @@ impl TreeIter { self.metrics.rows_fetched += batch.num_rows(); self.metrics.batches_fetched += 1; + let mut batch = batch; + batch.filter_by_sequence(self.sequence)?; Ok(Some(batch)) } } diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 052fdca9bcf8..4a49d9c031b8 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -482,7 +482,7 @@ mod tests { partitions.list_memtables(&mut memtables); assert_eq!(0, memtables[0].id()); - let iter = memtables[0].iter(None, None).unwrap(); + let iter = memtables[0].iter(None, None, None).unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[1000, 3000, 5000, 6000, 7000], ×tamps[..]); } @@ -520,7 +520,7 @@ mod tests { let mut memtables = Vec::new(); partitions.list_memtables(&mut memtables); - let iter = memtables[0].iter(None, None).unwrap(); + let iter = memtables[0].iter(None, None, None).unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], ×tamps[..]); let parts = partitions.list_partitions(); @@ -572,7 +572,7 @@ mod tests { let partitions = new_multi_partitions(&metadata); let parts = partitions.list_partitions(); - let iter = parts[0].memtable.iter(None, None).unwrap(); + let iter = parts[0].memtable.iter(None, None, None).unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(0, parts[0].memtable.id()); assert_eq!( @@ -584,7 +584,7 @@ mod tests { parts[0].time_range.unwrap().max_timestamp ); assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]); - let iter = parts[1].memtable.iter(None, None).unwrap(); + let iter = parts[1].memtable.iter(None, None, None).unwrap(); assert_eq!(1, parts[1].memtable.id()); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[5000, 7000], ×tamps[..]); diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 8331a2f58220..d9bc44815f89 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -33,7 +33,7 @@ use datatypes::vectors::{ }; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::error::{ @@ -236,6 +236,7 @@ impl Memtable for TimeSeriesMemtable { &self, projection: Option<&[ColumnId]>, filters: Option, + sequence: Option, ) -> Result { let projection = if let Some(projection) = projection { projection.iter().copied().collect() @@ -248,7 +249,7 @@ impl Memtable for TimeSeriesMemtable { let iter = self .series_set - .iter_series(projection, filters, self.dedup)?; + .iter_series(projection, filters, self.dedup, sequence)?; if self.merge_mode == MergeMode::LastNonNull { let iter = LastNonNullIter::new(iter); @@ -262,6 +263,7 @@ impl Memtable for TimeSeriesMemtable { &self, projection: Option<&[ColumnId]>, predicate: Option, + sequence: Option, ) -> MemtableRanges { let projection = if let Some(projection) = projection { projection.iter().copied().collect() @@ -277,6 +279,7 @@ impl Memtable for TimeSeriesMemtable { predicate, dedup: self.dedup, merge_mode: self.merge_mode, + sequence, }); let context = Arc::new(MemtableRangeContext::new(self.id, builder)); @@ -384,6 +387,7 @@ impl SeriesSet { projection: HashSet, predicate: Option, dedup: bool, + sequence: Option, ) -> Result { let primary_key_schema = primary_key_schema(&self.region_metadata); let primary_key_datatypes = self @@ -401,6 +405,7 @@ impl SeriesSet { primary_key_datatypes, self.codec.clone(), dedup, + sequence, ) } } @@ -448,6 +453,7 @@ struct Iter { pk_datatypes: Vec, codec: Arc, dedup: bool, + sequence: Option, metrics: Metrics, } @@ -462,6 +468,7 @@ impl Iter { pk_datatypes: Vec, codec: Arc, dedup: bool, + sequence: Option, ) -> Result { let predicate = predicate .map(|predicate| { @@ -482,6 +489,7 @@ impl Iter { pk_datatypes, codec, dedup, + sequence, metrics: Metrics::default(), }) } @@ -546,6 +554,12 @@ impl Iterator for Iter { self.metrics.num_batches += 1; self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); self.metrics.scan_cost += start.elapsed(); + + let mut batch = batch; + batch = batch.and_then(|mut batch| { + batch.filter_by_sequence(self.sequence)?; + Ok(batch) + }); return Some(batch); } self.metrics.scan_cost += start.elapsed(); @@ -855,6 +869,7 @@ struct TimeSeriesIterBuilder { projection: HashSet, predicate: Option, dedup: bool, + sequence: Option, merge_mode: MergeMode, } @@ -864,6 +879,7 @@ impl IterBuilder for TimeSeriesIterBuilder { self.projection.clone(), self.predicate.clone(), self.dedup, + self.sequence, )?; if self.merge_mode == MergeMode::LastNonNull { @@ -1253,7 +1269,7 @@ mod tests { *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 }; } - let iter = memtable.iter(None, None).unwrap(); + let iter = memtable.iter(None, None, None).unwrap(); let mut read = HashMap::new(); for ts in iter @@ -1293,7 +1309,7 @@ mod tests { let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow); memtable.write(&kvs).unwrap(); - let iter = memtable.iter(Some(&[3]), None).unwrap(); + let iter = memtable.iter(Some(&[3]), None, None).unwrap(); let mut v0_all = vec![]; diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index c4de103f1000..6001d3062491 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -35,7 +35,7 @@ use async_trait::async_trait; use common_time::Timestamp; use datafusion_common::arrow::array::UInt8Array; use datatypes::arrow; -use datatypes::arrow::array::{Array, ArrayRef}; +use datatypes::arrow::array::{Array, ArrayRef, UInt64Array}; use datatypes::arrow::compute::SortOptions; use datatypes::arrow::row::{RowConverter, SortField}; use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector}; @@ -334,6 +334,24 @@ impl Batch { Ok(()) } + /// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`. + pub fn filter_by_sequence(&mut self, sequence: Option) -> Result<()> { + let seq = match (sequence, self.last_sequence()) { + (None, _) | (_, None) => return Ok(()), + (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()), + (Some(sequence), Some(_)) => sequence, + }; + + let seqs = self.sequences.as_arrow(); + let sequence = UInt64Array::new_scalar(seq); + let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence) + .context(ComputeArrowSnafu)?; + let predicate = BooleanVector::from(predicate); + self.filter(&predicate)?; + + Ok(()) + } + /// Sorts rows in the batch. If `dedup` is true, it also removes /// duplicated rows according to primary keys. /// @@ -1212,6 +1230,57 @@ mod tests { assert_eq!(expect, batch); } + #[test] + fn test_filter_by_sequence() { + // Filters put only. + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + batch.filter_by_sequence(Some(13)).unwrap(); + let expect = new_batch( + &[1, 2, 3], + &[11, 12, 13], + &[OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 23], + ); + assert_eq!(expect, batch); + + // Filters to empty. + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + + batch.filter_by_sequence(Some(10)).unwrap(); + assert!(batch.is_empty()); + + // None filter. + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + let expect = batch.clone(); + batch.filter_by_sequence(None).unwrap(); + assert_eq!(expect, batch); + + // Filter a empty batch + let mut batch = new_batch(&[], &[], &[], &[]); + batch.filter_by_sequence(Some(10)).unwrap(); + assert!(batch.is_empty()); + + // Filter a empty batch with None + let mut batch = new_batch(&[], &[], &[], &[]); + batch.filter_by_sequence(None).unwrap(); + assert!(batch.is_empty()); + } + #[test] fn test_filter() { // Filters put only. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index ad5d2e4a15cf..fd660128a678 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -300,6 +300,9 @@ impl ScanRegion { if file_in_range(file, &time_range) { files.push(file.clone()); } + // There is no need to check and prune for file's sequence here as the sequence number is usually very new, + // unless the timing is too good, or the sequence number wouldn't be in file. + // and the batch will be filtered out by tree reader anyway. } } @@ -347,7 +350,11 @@ impl ScanRegion { let memtables = memtables .into_iter() .map(|mem| { - let ranges = mem.ranges(Some(mapper.column_ids()), Some(predicate.clone())); + let ranges = mem.ranges( + Some(mapper.column_ids()), + Some(predicate.clone()), + self.request.sequence, + ); MemRangeBuilder::new(ranges) }) .collect(); diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 373218e91f4c..01d13d2a53cc 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -84,6 +84,7 @@ impl Memtable for EmptyMemtable { &self, _projection: Option<&[ColumnId]>, _filters: Option, + _sequence: Option, ) -> Result { Ok(Box::new(std::iter::empty())) } @@ -92,6 +93,7 @@ impl Memtable for EmptyMemtable { &self, _projection: Option<&[ColumnId]>, _predicate: Option, + _sequence: Option, ) -> MemtableRanges { MemtableRanges::default() } diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 02bd745ca10e..dfdcd1037d3c 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -16,6 +16,8 @@ use common_recordbatch::OrderOption; use datafusion_expr::expr::Expr; use strum::Display; +use crate::storage::SequenceNumber; + /// A hint on how to select rows from a time-series. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)] pub enum TimeSeriesRowSelector { @@ -39,4 +41,8 @@ pub struct ScanRequest { pub limit: Option, /// Optional hint to select rows from time-series. pub series_row_selector: Option, + /// Optional constraint on the sequence number of the rows to read. + /// If set, only rows with a sequence number lesser or equal to this value + /// will be returned. + pub sequence: Option, }