Skip to content

Commit

Permalink
Set bloom filter on byte array (#3284)
Browse files Browse the repository at this point in the history
* Set bloom filter on byte array

* Check positive values

* For review

* Clippy

Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
  • Loading branch information
viirya and tustvold authored Dec 8, 2022
1 parent 7d21397 commit fa1f611
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 8 deletions.
18 changes: 16 additions & 2 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ struct ByteArrayEncoder {
dict_encoder: Option<DictEncoder>,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
}

impl ColumnValueEncoder for ByteArrayEncoder {
Expand All @@ -453,8 +454,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
}

fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
// TODO FIX ME need to handle bloom filter in arrow writer
None
self.bloom_filter.take()
}

fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
Expand All @@ -467,11 +467,17 @@ impl ColumnValueEncoder for ByteArrayEncoder {

let fallback = FallbackEncoder::new(descr, props)?;

let bloom_filter = props
.bloom_filter_properties(descr.path())
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;

Ok(Self {
fallback,
dict_encoder: dictionary,
min_value: None,
max_value: None,
bloom_filter,
})
}

Expand Down Expand Up @@ -555,6 +561,14 @@ where
}
}

// encode the values into bloom filter if enabled
if let Some(bloom_filter) = &mut encoder.bloom_filter {
let valid = indices.iter().cloned();
for idx in valid {
bloom_filter.insert(values.value(idx).as_ref());
}
}

match &mut encoder.dict_encoder {
Some(dict_encoder) => dict_encoder.encode(values, indices),
None => encoder.fallback.encode(values, indices),
Expand Down
119 changes: 114 additions & 5 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,8 @@ mod tests {
use crate::basic::Encoding;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index_reader::read_pages_locations;
use crate::file::properties::WriterVersion;
use crate::file::properties::{ReaderProperties, WriterVersion};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
Expand Down Expand Up @@ -1269,6 +1270,7 @@ mod tests {
.set_dictionary_enabled(dictionary_size != 0)
.set_dictionary_pagesize_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(true)
.build();

files.push(roundtrip_opts(&expected_batch, props))
Expand All @@ -1279,17 +1281,17 @@ mod tests {
files
}

fn values_required<A, I>(iter: I)
fn values_required<A, I>(iter: I) -> Vec<File>
where
A: From<Vec<I::Item>> + Array + 'static,
I: IntoIterator,
{
let raw_values: Vec<_> = iter.into_iter().collect();
let values = Arc::new(A::from(raw_values));
one_column_roundtrip(values, false);
one_column_roundtrip(values, false)
}

fn values_optional<A, I>(iter: I)
fn values_optional<A, I>(iter: I) -> Vec<File>
where
A: From<Vec<Option<I::Item>>> + Array + 'static,
I: IntoIterator,
Expand All @@ -1300,7 +1302,7 @@ mod tests {
.map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
.collect();
let optional_values = Arc::new(A::from(optional_raw_values));
one_column_roundtrip(optional_values, true);
one_column_roundtrip(optional_values, true)
}

fn required_and_optional<A, I>(iter: I)
Expand All @@ -1312,6 +1314,70 @@ mod tests {
values_optional::<A, I>(iter);
}

fn check_bloom_filter<T: AsBytes>(
files: Vec<File>,
file_column: String,
positive_values: Vec<T>,
negative_values: Vec<T>,
) {
files.into_iter().take(1).for_each(|file| {
let file_reader = SerializedFileReader::new_with_options(
file,
ReadOptionsBuilder::new()
.with_reader_properties(
ReaderProperties::builder()
.set_read_bloom_filter(true)
.build(),
)
.build(),
)
.expect("Unable to open file as Parquet");
let metadata = file_reader.metadata();

// Gets bloom filters from all row groups.
let mut bloom_filters: Vec<_> = vec![];
for (ri, row_group) in metadata.row_groups().iter().enumerate() {
if let Some((column_index, _)) = row_group
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string() == file_column)
{
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
if let Some(sbbf) =
row_group_reader.get_column_bloom_filter(column_index)
{
bloom_filters.push(sbbf.clone());
} else {
panic!("No bloom filter for column named {} found", file_column);
}
} else {
panic!("No column named {} found", file_column);
}
}

positive_values.iter().for_each(|value| {
let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
assert!(
found.is_some(),
"{}",
format!("Value {:?} should be in bloom filter", value.as_bytes())
);
});

negative_values.iter().for_each(|value| {
let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
assert!(
found.is_none(),
"{}",
format!("Value {:?} should not be in bloom filter", value.as_bytes())
);
});
});
}

#[test]
fn all_null_primitive_single_column() {
let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
Expand Down Expand Up @@ -1528,6 +1594,49 @@ mod tests {
values_required::<BinaryArray, _>(many_vecs_iter);
}

#[test]
fn i32_column_bloom_filter() {
let positive_values: Vec<i32> = (0..SMALL_SIZE as i32).collect();
let files = values_required::<Int32Array, _>(positive_values);
check_bloom_filter(
files,
"col".to_string(),
(0..SMALL_SIZE as i32).collect(),
(SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
);
}

#[test]
fn binary_column_bloom_filter() {
let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

let files = values_required::<BinaryArray, _>(many_vecs_iter);
check_bloom_filter(
files,
"col".to_string(),
many_vecs,
vec![vec![(SMALL_SIZE + 1) as u8]],
);
}

#[test]
fn empty_string_null_column_bloom_filter() {
let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
let raw_strs = raw_values.iter().map(|s| s.as_str());

let files = values_optional::<StringArray, _>(raw_strs);

let optional_raw_values: Vec<_> = raw_values
.iter()
.enumerate()
.filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
.collect();
// For null slots, empty string should not be in bloom filter.
check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
}

#[test]
fn large_binary_single_column() {
let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl Sbbf {
}

/// Insert an [AsBytes] value into the filter
pub fn insert<T: AsBytes>(&mut self, value: &T) {
pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
self.insert_hash(hash_as_bytes(value));
}

Expand Down

0 comments on commit fa1f611

Please sign in to comment.