perf: Reduce decode overhead when pruning keys in the memtable (#3415)
* feat: reuse value buf

* feat: skip values to decode

* feat: prune shard

chore: fix compiler errors

refactor: shard prune metrics

* fix: panic on DedupReader::try_new

* fix: prune after next

* chore: num parts metrics

* feat: metrics and logs

* chore: data build cost

* chore: more logs

* feat: cache skip result

* chore: todo

* fix: index out of bound

* test: test codec

* fix: invalid offsets

* fix: skip binary

* fix: offset buffer reuse

* chore: comment

* test: test memtable filter

* style: fix clippy

* chore: fix compiler error
evenyag authored Mar 8, 2024
1 parent 352bd7b commit 3ee5336
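
The bullet list above boils down to two codec-level ideas: decode only the primary-key columns that the filters actually reference, and reuse one scratch buffer across keys instead of allocating per value. A minimal, self-contained Rust sketch of those two ideas ("skip values to decode" and "reuse value buf"), using an illustrative length-prefixed key encoding rather than the actual mito2 codec:

/// Illustrative sparse decoder: key fields are length-prefixed byte strings.
struct SparseDecoder {
    /// Scratch buffer reused across keys ("reuse value buf").
    value_buf: Vec<u8>,
}

impl SparseDecoder {
    fn new() -> Self {
        Self { value_buf: Vec::new() }
    }

    /// Decodes only the field at `wanted_idx`, skipping the fields before it
    /// without materializing them ("skip values to decode").
    fn decode_field(&mut self, mut key: &[u8], wanted_idx: usize) -> Option<&[u8]> {
        let mut idx = 0;
        while key.len() >= 2 {
            let len = u16::from_le_bytes([key[0], key[1]]) as usize;
            let payload = key.get(2..2 + len)?;
            if idx == wanted_idx {
                self.value_buf.clear();
                self.value_buf.extend_from_slice(payload);
                return Some(&self.value_buf);
            }
            // Skip an unwanted field: advance the cursor past its payload
            // without copying or decoding it.
            key = &key[2 + len..];
            idx += 1;
        }
        None
    }
}

fn main() {
    // Encode a composite key with two fields: "host-1" and "region-a".
    let mut key = Vec::new();
    for field in [b"host-1".as_slice(), b"region-a".as_slice()] {
        key.extend_from_slice(&(field.len() as u16).to_le_bytes());
        key.extend_from_slice(field);
    }
    let mut decoder = SparseDecoder::new();
    assert_eq!(decoder.decode_field(&key, 1), Some(b"region-a".as_slice()));
}

Skipping a field costs only a cursor bump past its payload, and the reused value_buf amortizes allocations across keys; at a high level this is what the commit's codec changes aim for when pruning keys.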
Showing 7 changed files with 494 additions and 138 deletions.
53 changes: 53 additions & 0 deletions src/mito2/src/memtable/merge_tree.rs
@@ -293,6 +293,8 @@ mod tests {
     use std::collections::BTreeSet;
 
     use common_time::Timestamp;
+    use datafusion_common::{Column, ScalarValue};
+    use datafusion_expr::{BinaryExpr, Expr, Operator};
     use datatypes::scalars::ScalarVector;
     use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
@@ -528,4 +530,55 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(expect, read);
     }
+
+    #[test]
+    fn test_memtable_filter() {
+        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
+        // Try to build a memtable via the builder.
+        let memtable = MergeTreeMemtableBuilder::new(
+            MergeTreeConfig {
+                index_max_keys_per_shard: 40,
+                ..Default::default()
+            },
+            None,
+        )
+        .build(1, &metadata);
+
+        for i in 0..100 {
+            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
+            let kvs =
+                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
+            memtable.write(&kvs).unwrap();
+        }
+
+        for i in 0..100 {
+            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
+            let expr = Expr::BinaryExpr(BinaryExpr {
+                left: Box::new(Expr::Column(Column {
+                    relation: None,
+                    name: "k1".to_string(),
+                })),
+                op: Operator::Eq,
+                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
+            });
+            let iter = memtable
+                .iter(None, Some(Predicate::new(vec![expr.into()])))
+                .unwrap();
+            let read = iter
+                .flat_map(|batch| {
+                    batch
+                        .unwrap()
+                        .timestamps()
+                        .as_any()
+                        .downcast_ref::<TimestampMillisecondVector>()
+                        .unwrap()
+                        .iter_data()
+                        .collect::<Vec<_>>()
+                        .into_iter()
+                })
+                .map(|v| v.unwrap().0.value())
+                .collect::<Vec<_>>();
+            assert_eq!(timestamps, read);
+        }
+    }
 }
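
A note on the test above: the hand-assembled BinaryExpr is exactly what DataFusion's expression helpers expand to, so the same `k1 = i` predicate can be built more compactly. A small sketch (not part of the diff; it assumes the same datafusion_expr crate the test imports):

use datafusion_expr::{col, lit, Expr};

/// Builds the same `k1 = i` filter as the hand-written `BinaryExpr` above.
fn key_filter(i: u32) -> Expr {
    col("k1").eq(lit(i))
}

fn main() {
    // Prints the predicate, e.g. `k1 = UInt32(42)`.
    println!("{}", key_filter(42));
}

Either form ends up wrapped in a Predicate, which the merge tree memtable uses to prune shards whose keys cannot match.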
7 changes: 7 additions & 0 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -994,9 +994,11 @@ impl DataPartsReaderBuilder {
         for p in self.parts {
             nodes.push(DataNode::new(DataSource::Part(p)));
         }
+        let num_parts = nodes.len();
         let merger = Merger::try_new(nodes)?;
         Ok(DataPartsReader {
             merger,
+            num_parts,
             elapsed: Default::default(),
         })
     }
@@ -1005,6 +1007,7 @@ impl DataPartsReaderBuilder {
 /// Reader for all parts inside a `DataParts`.
 pub struct DataPartsReader {
     merger: Merger<DataNode>,
+    num_parts: usize,
     elapsed: Duration,
 }
 
@@ -1032,6 +1035,10 @@ impl DataPartsReader {
     pub(crate) fn is_valid(&self) -> bool {
         self.merger.is_valid()
     }
+
+    pub(crate) fn num_parts(&self) -> usize {
+        self.num_parts
+    }
 }
 
 #[cfg(test)]
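
One small detail in the hunks above: Merger::try_new(nodes) consumes the node vector, so nodes.len() has to be captured before the move and stored on the reader, where the new num_parts() accessor exposes it for metrics. A stripped-down sketch of that ownership pattern, with illustrative types rather than the mito2 ones:

struct Merger {
    nodes: Vec<u32>,
}

impl Merger {
    fn try_new(nodes: Vec<u32>) -> Result<Self, String> {
        Ok(Self { nodes })
    }
}

struct Reader {
    merger: Merger,
    /// Captured up front so read paths can report it as a metric.
    num_parts: usize,
}

fn build(nodes: Vec<u32>) -> Result<Reader, String> {
    let num_parts = nodes.len(); // must happen before `nodes` moves below
    let merger = Merger::try_new(nodes)?;
    Ok(Reader { merger, num_parts })
}

fn main() {
    let reader = build(vec![1, 2, 3]).unwrap();
    assert_eq!(reader.num_parts, 3);
    assert_eq!(reader.merger.nodes.len(), 3);
}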
2 changes: 1 addition & 1 deletion src/mito2/src/memtable/merge_tree/dedup.rs
@@ -45,7 +45,7 @@ impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
     }
 
     fn next(&mut self) -> Result<()> {
-        loop {
+        while self.inner.is_valid() {
             match &mut self.prev_batch_last_row {
                 None => {
                     // First shot, fill prev_batch_last_row and current_batch_range with first batch.
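
The one-line change above addresses the panic called out in the commit message ("fix: panic on DedupReader::try_new"): an unconditional loop in next kept polling the inner source after it was exhausted. Gating each iteration on is_valid() turns exhaustion into a clean return. A self-contained sketch of the guard pattern, with illustrative types rather than the mito2 DataBatchSource trait:

/// Illustrative row source; `current` is only safe to call while valid.
trait Source {
    fn is_valid(&self) -> bool;
    fn current(&self) -> u64; // panics once the source is exhausted
    fn advance(&mut self);
}

/// Positions `inner` on the next row that differs from `prev`.
/// A bare `loop` here would call `current()` on an exhausted source;
/// the `while` guard makes exhaustion a clean return instead of a panic.
fn next_distinct<S: Source>(inner: &mut S, prev: &mut Option<u64>) {
    while inner.is_valid() {
        let cur = inner.current();
        if *prev == Some(cur) {
            inner.advance(); // duplicate of the last returned row: skip it
            continue;
        }
        *prev = Some(cur);
        return;
    }
    // Source exhausted: fall through without touching `current()`.
}

struct VecSource {
    rows: Vec<u64>,
    pos: usize,
}

impl Source for VecSource {
    fn is_valid(&self) -> bool {
        self.pos < self.rows.len()
    }
    fn current(&self) -> u64 {
        self.rows[self.pos]
    }
    fn advance(&mut self) {
        self.pos += 1;
    }
}

fn main() {
    let mut src = VecSource { rows: vec![1, 1, 2, 2], pos: 0 };
    let mut prev = None;
    next_distinct(&mut src, &mut prev); // lands on the first `1`
    assert_eq!(prev, Some(1));
    src.advance();
    next_distinct(&mut src, &mut prev); // skips the duplicate `1`, lands on `2`
    assert_eq!(prev, Some(2));
    src.pos = src.rows.len();
    next_distinct(&mut src, &mut prev); // exhausted: returns cleanly, no panic
}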
