diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index 343ed8e2884a..ac724bae2171 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -743,6 +743,12 @@ impl DataPart {
             DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
         }
     }
+
+    fn is_empty(&self) -> bool {
+        match self {
+            DataPart::Parquet(p) => p.data.is_empty(),
+        }
+    }
 }
 
 pub struct DataPartReader {
@@ -879,6 +885,10 @@ impl DataParts {
         let merger = Merger::try_new(nodes)?;
         Ok(DataPartsReader { merger })
     }
+
+    pub(crate) fn is_empty(&self) -> bool {
+        self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
+    }
 }
 
 /// Reader for all parts inside a `DataParts`.
diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs
index 989a59d007ac..b3adc445e0c2 100644
--- a/src/mito2/src/memtable/merge_tree/dict.rs
+++ b/src/mito2/src/memtable/merge_tree/dict.rs
@@ -122,6 +122,7 @@ impl KeyDictBuilder {
             // Overwrites the pk index.
             *pk_index = i as PkIndex;
         }
+        self.num_keys = 0;
 
         Some(KeyDict {
             pk_to_index,
@@ -407,4 +408,20 @@ mod tests {
         assert_eq!(5130, metrics.key_bytes);
         assert_eq!(8850, builder.memory_size());
     }
+
+    #[test]
+    fn test_builder_finish() {
+        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into());
+        let mut metrics = WriteMetrics::default();
+        for i in 0..MAX_KEYS_PER_BLOCK * 2 {
+            let key = format!("{i:010}");
+            assert!(!builder.is_full());
+            builder.insert_key(key.as_bytes(), &mut metrics);
+        }
+        assert!(builder.is_full());
+        builder.finish();
+
+        assert!(!builder.is_full());
+        assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
+    }
 }
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index d58c26682654..926fb360ec3c 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -108,11 +108,15 @@ impl Partition {
         let nodes = {
             let inner = self.inner.read().unwrap();
             let mut nodes = Vec::with_capacity(inner.shards.len() + 1);
-            let bulder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
-            nodes.push(ShardNode::new(ShardSource::Builder(bulder_reader)));
+            if !inner.shard_builder.is_empty() {
+                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
+                nodes.push(ShardNode::new(ShardSource::Builder(builder_reader)));
+            }
             for shard in &inner.shards {
-                let shard_reader = shard.read()?;
-                nodes.push(ShardNode::new(ShardSource::Shard(shard_reader)));
+                if !shard.is_empty() {
+                    let shard_reader = shard.read()?;
+                    nodes.push(ShardNode::new(ShardSource::Shard(shard_reader)));
+                }
             }
             nodes
         };
@@ -211,11 +215,7 @@ impl Partition {
 ///
 /// It can merge rows from multiple shards.
 pub struct PartitionReader {
-    metadata: RegionMetadataRef,
-    row_codec: Arc<McmpRowCodec>,
-    projection: HashSet<ColumnId>,
-    filters: Vec<SimpleFilterEvaluator>,
-    pk_weights: Vec<u16>,
+    context: ReadPartitionContext,
     source: BoxedDataBatchSource,
     last_yield_pk_id: Option<PkId>,
 }
@@ -223,11 +223,7 @@ pub struct PartitionReader {
 impl PartitionReader {
     fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
         let mut reader = Self {
-            metadata: context.metadata,
-            row_codec: context.row_codec,
-            projection: context.projection,
-            filters: context.filters,
-            pk_weights: context.pk_weights,
+            context,
             source,
             last_yield_pk_id: None,
         };
@@ -259,25 +255,19 @@ impl PartitionReader {
     pub fn convert_current_batch(&self) -> Result<Batch> {
         let data_batch = self.source.current_data_batch();
         data_batch_to_batch(
-            &self.metadata,
-            &self.projection,
+            &self.context.metadata,
+            &self.context.projection,
             self.source.current_key(),
             data_batch,
         )
     }
 
     pub(crate) fn into_context(self) -> ReadPartitionContext {
-        ReadPartitionContext {
-            metadata: self.metadata,
-            row_codec: self.row_codec,
-            projection: self.projection,
-            filters: self.filters,
-            pk_weights: self.pk_weights,
-        }
+        self.context
     }
 
     fn prune_batch_by_key(&mut self) -> Result<()> {
-        if self.metadata.primary_key.is_empty() {
+        if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key {
             // Nothing to prune.
             return Ok(());
         }
@@ -293,7 +283,12 @@ impl PartitionReader {
             }
             let key = self.source.current_key().unwrap();
             // Prune batch by primary key.
-            if prune_primary_key(&self.metadata, &self.filters, &self.row_codec, key) {
+            if prune_primary_key(
+                &self.context.metadata,
+                &self.context.filters,
+                &self.context.row_codec,
+                key,
+            ) {
                 // We need this key.
                 self.last_yield_pk_id = Some(pk_id);
                 break;
@@ -334,6 +329,9 @@ fn prune_primary_key(
     // evaluate filters against primary key values
     let mut result = true;
     for filter in filters {
+        if Partition::is_partition_column(filter.column_name()) {
+            continue;
+        }
         let Some(column) = metadata.column_by_name(filter.column_name()) else {
             continue;
         };
@@ -362,6 +360,7 @@ pub(crate) struct ReadPartitionContext {
     filters: Vec<SimpleFilterEvaluator>,
     /// Buffer to store pk weights.
     pk_weights: Vec<u16>,
+    need_prune_key: bool,
 }
 
 impl ReadPartitionContext {
@@ -371,14 +370,37 @@ impl ReadPartitionContext {
         projection: HashSet<ColumnId>,
         filters: Vec<SimpleFilterEvaluator>,
     ) -> ReadPartitionContext {
+        let need_prune_key = Self::need_prune_key(&metadata, &filters);
         ReadPartitionContext {
             metadata,
             row_codec,
             projection,
             filters,
             pk_weights: Vec::new(),
+            need_prune_key,
         }
     }
+
+    /// Returns whether the filters contain a predicate on primary key columns
+    /// other than the partition column.
+    fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
+        for filter in filters {
+            // Partitions are already pruned, so skip the partition column here.
+            if Partition::is_partition_column(filter.column_name()) {
+                continue;
+            }
+            let Some(column) = metadata.column_by_name(filter.column_name()) else {
+                continue;
+            };
+            if column.semantic_type != SemanticType::Tag {
+                continue;
+            }
+
+            return true;
+        }
+
+        false
+    }
 }
 
 // TODO(yingwen): Pushdown projection to shard readers.
diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs
index e0dcad9989f7..90b4ce02c483 100644
--- a/src/mito2/src/memtable/merge_tree/shard.rs
+++ b/src/mito2/src/memtable/merge_tree/shard.rs
@@ -99,6 +99,11 @@ impl Shard {
             dedup: self.dedup,
         }
     }
+
+    /// Returns true if the shard is empty (no data).
+    pub fn is_empty(&self) -> bool {
+        self.data_parts.is_empty()
+    }
 }
 
 /// Source that returns [DataBatch].
@@ -399,6 +404,7 @@ mod tests {
         let metadata = metadata_for_test();
         let input = input_with_key(&metadata);
         let mut shard = new_shard_with_dict(8, metadata, &input);
+        assert!(shard.is_empty());
         for key_values in &input {
             for kv in key_values.iter() {
                 let key = encode_key_by_kv(&kv);
@@ -406,5 +412,6 @@ mod tests {
                 shard.write_with_pk_id(pk_id, kv);
             }
         }
+        assert!(!shard.is_empty());
     }
 }
diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs
index 7e19a6945432..67fd604bc8e5 100644
--- a/src/mito2/src/memtable/merge_tree/tree.rs
+++ b/src/mito2/src/memtable/merge_tree/tree.rs
@@ -245,27 +245,33 @@ impl MergeTree {
     fn prune_partitions(&self, filters: &[SimpleFilterEvaluator]) -> VecDeque<PartitionRef> {
         let partitions = self.partitions.read().unwrap();
 
-        if self.is_partitioned {
-            // Prune partition keys.
+        if !self.is_partitioned {
+            return partitions.values().cloned().collect();
+        }
+
+        let mut pruned = VecDeque::new();
+        // Prune partition keys.
+        for (key, partition) in partitions.iter() {
+            let mut is_needed = true;
             for filter in filters {
-                // Only the first filter takes effect.
-                if Partition::is_partition_column(filter.column_name()) {
-                    let mut pruned = VecDeque::new();
-                    for (key, partition) in partitions.iter() {
-                        if filter
-                            .evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
-                            .unwrap_or(true)
-                        {
-                            pruned.push_back(partition.clone());
-                        }
-                    }
-
-                    return pruned;
+                if !Partition::is_partition_column(filter.column_name()) {
+                    continue;
+                }
+
+                if !filter
+                    .evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
+                    .unwrap_or(true)
+                {
+                    is_needed = false;
                 }
             }
+
+            if is_needed {
+                pruned.push_back(partition.clone());
+            }
         }
-        partitions.values().cloned().collect()
+        pruned
     }
 }
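
Note for reviewers: below is a minimal, self-contained sketch of the pruning semantics the rewritten `MergeTree::prune_partitions` implements, using hypothetical names (`PartitionFilter`, `prune_partitions_sketch`) rather than the crate's actual types. Every partition-column filter is now applied conjunctively, and a filter that cannot be evaluated keeps the partition (`unwrap_or(true)`), so pruning only drops partitions that some filter provably rejects; the old code returned after the first partition-column filter and silently ignored the rest.

    use std::collections::VecDeque;

    // Hypothetical stand-in for `SimpleFilterEvaluator::evaluate_scalar`:
    // `Ok(bool)` when the predicate can be evaluated against a partition key,
    // `Err(())` when it cannot.
    type PartitionFilter = fn(u32) -> Result<bool, ()>;

    // Keep a partition only if *all* partition-column filters keep it; an
    // evaluation error keeps the partition, so we never prune on uncertainty.
    fn prune_partitions_sketch(keys: &[u32], filters: &[PartitionFilter]) -> VecDeque<u32> {
        keys.iter()
            .copied()
            .filter(|key| filters.iter().all(|f| f(*key).unwrap_or(true)))
            .collect()
    }

    fn main() {
        let filters: Vec<PartitionFilter> = vec![
            |k| Ok(k >= 2), // e.g. partition key >= 2
            |k| Ok(k <= 5), // e.g. partition key <= 5
            |_| Err(()),    // cannot evaluate: treated as "keep"
        ];
        // Keys 2 and 3 pass both evaluable filters; the failing filter prunes nothing.
        assert_eq!(
            prune_partitions_sketch(&[0, 1, 2, 3, 6, 7], &filters),
            VecDeque::from([2, 3])
        );
    }

Erring toward `true` on evaluation failure is the safe direction here: a false positive only costs an extra partition scan, while a false negative would silently drop rows from query results.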