Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reset dict builder keys counter and avoid unnecessary pruning #3386

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,12 @@ impl DataPart {
DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
}
}

/// Returns `true` when this part holds no encoded row data.
fn is_empty(&self) -> bool {
    // `DataPart` currently has a single variant, so the pattern is
    // irrefutable; adding a variant will force this to be revisited.
    let DataPart::Parquet(part) = self;
    part.data.is_empty()
}
}

pub struct DataPartReader {
Expand Down Expand Up @@ -879,6 +885,10 @@ impl DataParts {
let merger = Merger::try_new(nodes)?;
Ok(DataPartsReader { merger })
}

pub(crate) fn is_empty(&self) -> bool {
self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
}
}

/// Reader for all parts inside a `DataParts`.
Expand Down
17 changes: 17 additions & 0 deletions src/mito2/src/memtable/merge_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl KeyDictBuilder {
// Overwrites the pk index.
*pk_index = i as PkIndex;
}
self.num_keys = 0;

Some(KeyDict {
pk_to_index,
Expand Down Expand Up @@ -407,4 +408,20 @@ mod tests {
assert_eq!(5130, metrics.key_bytes);
assert_eq!(8850, builder.memory_size());
}

#[test]
fn test_builder_finish() {
    // Fill the builder exactly to capacity; it must not report full
    // until the last key has been inserted.
    let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into());
    let mut metrics = WriteMetrics::default();
    (0..MAX_KEYS_PER_BLOCK * 2).for_each(|i| {
        assert!(!builder.is_full());
        builder.insert_key(format!("{i:010}").as_bytes(), &mut metrics);
    });
    assert!(builder.is_full());
    builder.finish();

    // After finish() the key counter is reset, so the builder accepts
    // keys again and numbering restarts from pk index 0.
    assert!(!builder.is_full());
    assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
}
}
72 changes: 47 additions & 25 deletions src/mito2/src/memtable/merge_tree/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,15 @@ impl Partition {
let nodes = {
let inner = self.inner.read().unwrap();
let mut nodes = Vec::with_capacity(inner.shards.len() + 1);
let bulder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
nodes.push(ShardNode::new(ShardSource::Builder(bulder_reader)));
if !inner.shard_builder.is_empty() {
let bulder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
nodes.push(ShardNode::new(ShardSource::Builder(bulder_reader)));
}
for shard in &inner.shards {
let shard_reader = shard.read()?;
nodes.push(ShardNode::new(ShardSource::Shard(shard_reader)));
if !shard.is_empty() {
let shard_reader = shard.read()?;
nodes.push(ShardNode::new(ShardSource::Shard(shard_reader)));
}
}
nodes
};
Expand Down Expand Up @@ -211,23 +215,15 @@ impl Partition {
///
/// It can merge rows from multiple shards.
pub struct PartitionReader {
metadata: RegionMetadataRef,
row_codec: Arc<McmpRowCodec>,
projection: HashSet<ColumnId>,
filters: Vec<SimpleFilterEvaluator>,
pk_weights: Vec<u16>,
context: ReadPartitionContext,
source: BoxedDataBatchSource,
last_yield_pk_id: Option<PkId>,
}

impl PartitionReader {
fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
let mut reader = Self {
metadata: context.metadata,
row_codec: context.row_codec,
projection: context.projection,
filters: context.filters,
pk_weights: context.pk_weights,
context,
source,
last_yield_pk_id: None,
};
Expand Down Expand Up @@ -259,25 +255,19 @@ impl PartitionReader {
pub fn convert_current_batch(&self) -> Result<Batch> {
let data_batch = self.source.current_data_batch();
data_batch_to_batch(
&self.metadata,
&self.projection,
&self.context.metadata,
&self.context.projection,
self.source.current_key(),
data_batch,
)
}

pub(crate) fn into_context(self) -> ReadPartitionContext {
ReadPartitionContext {
metadata: self.metadata,
row_codec: self.row_codec,
projection: self.projection,
filters: self.filters,
pk_weights: self.pk_weights,
}
self.context
}

fn prune_batch_by_key(&mut self) -> Result<()> {
if self.metadata.primary_key.is_empty() {
if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key {
// Nothing to prune.
return Ok(());
}
Expand All @@ -293,7 +283,12 @@ impl PartitionReader {
}
let key = self.source.current_key().unwrap();
// Prune batch by primary key.
if prune_primary_key(&self.metadata, &self.filters, &self.row_codec, key) {
if prune_primary_key(
&self.context.metadata,
&self.context.filters,
&self.context.row_codec,
key,
) {
// We need this key.
self.last_yield_pk_id = Some(pk_id);
break;
Expand Down Expand Up @@ -334,6 +329,9 @@ fn prune_primary_key(
// evaluate filters against primary key values
let mut result = true;
for filter in filters {
if Partition::is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = metadata.column_by_name(filter.column_name()) else {
continue;
};
Expand Down Expand Up @@ -362,6 +360,7 @@ pub(crate) struct ReadPartitionContext {
filters: Vec<SimpleFilterEvaluator>,
/// Buffer to store pk weights.
pk_weights: Vec<u16>,
need_prune_key: bool,
}

impl ReadPartitionContext {
Expand All @@ -371,14 +370,37 @@ impl ReadPartitionContext {
projection: HashSet<ColumnId>,
filters: Vec<SimpleFilterEvaluator>,
) -> ReadPartitionContext {
let need_prune_key = Self::need_prune_key(&metadata, &filters);
ReadPartitionContext {
metadata,
row_codec,
projection,
filters,
pk_weights: Vec::new(),
need_prune_key,
}
}

/// Returns `true` if any filter is a predicate on a primary key (tag)
/// column other than the partition column.
///
/// Partitions were already pruned earlier, so a filter on the partition
/// column alone does not require per-key pruning.
fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
    filters.iter().any(|filter| {
        // Skip the partition column: it was handled during partition pruning.
        if Partition::is_partition_column(filter.column_name()) {
            return false;
        }
        // Only predicates on tag columns participate in key pruning;
        // unknown columns are ignored.
        metadata
            .column_by_name(filter.column_name())
            .map_or(false, |column| column.semantic_type == SemanticType::Tag)
    })
}
}

// TODO(yingwen): Pushdown projection to shard readers.
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/memtable/merge_tree/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl Shard {
dedup: self.dedup,
}
}

/// Returns true if the shard is empty (No data).
pub fn is_empty(&self) -> bool {
self.data_parts.is_empty()
}
}

/// Source that returns [DataBatch].
Expand Down Expand Up @@ -399,12 +404,14 @@ mod tests {
let metadata = metadata_for_test();
let input = input_with_key(&metadata);
let mut shard = new_shard_with_dict(8, metadata, &input);
assert!(shard.is_empty());
for key_values in &input {
for kv in key_values.iter() {
let key = encode_key_by_kv(&kv);
let pk_id = shard.find_id_by_key(&key).unwrap();
shard.write_with_pk_id(pk_id, kv);
}
}
assert!(!shard.is_empty());
}
}
38 changes: 22 additions & 16 deletions src/mito2/src/memtable/merge_tree/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,27 +245,33 @@ impl MergeTree {

fn prune_partitions(&self, filters: &[SimpleFilterEvaluator]) -> VecDeque<PartitionRef> {
let partitions = self.partitions.read().unwrap();
if self.is_partitioned {
// Prune partition keys.
if !self.is_partitioned {
return partitions.values().cloned().collect();
}

let mut pruned = VecDeque::new();
// Prune partition keys.
for (key, partition) in partitions.iter() {
let mut is_needed = true;
for filter in filters {
// Only the first filter takes effect.
if Partition::is_partition_column(filter.column_name()) {
let mut pruned = VecDeque::new();
for (key, partition) in partitions.iter() {
if filter
.evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
.unwrap_or(true)
{
pruned.push_back(partition.clone());
}
}

return pruned;
if !Partition::is_partition_column(filter.column_name()) {
continue;
}

if !filter
.evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
.unwrap_or(true)
{
is_needed = false;
}
}

if is_needed {
pruned.push_back(partition.clone());
}
}

partitions.values().cloned().collect()
pruned
}
}

Expand Down
Loading