diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 7c3f7d9384ab..7077a1ae1815 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -1075,82 +1075,28 @@ mod tests { create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap() } - // Note the values in the `String` column are: - // ❯ select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet'; - // +-----------+ - // | String | - // +-----------+ - // | Hello | - // | This is | - // | a | - // | test | - // | How | - // | are you | - // | doing | - // | today | - // | the quick | - // | brown fox | - // | jumps | - // | over | - // | the lazy | - // | dog | - // +-----------+ #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() { - // load parquet file - let testdata = datafusion_common::test_util::parquet_test_data(); - let file_name = "data_index_bloom_encoding_stats.parquet"; - let path = format!("{testdata}/{file_name}"); - let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - - // generate pruning predicate `(String = "Hello_Not_exists")` - let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); - let expr = col(r#""String""#).eq(lit("Hello_Not_Exists")); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - - let row_groups = vec![0]; - let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( - file_name, - data, - &pruning_predicate, - &row_groups, - ) - .await - .unwrap(); - assert!(pruned_row_groups.is_empty()); + BloomFilterTest::new_data_index_bloom_encoding_stats() + .with_expect_all_pruned() + // generate pruning predicate `(String = "Hello_Not_exists")` + .run(col(r#""String""#).eq(lit("Hello_Not_Exists"))) + .await } #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_mutiple_expr() { - // load parquet file - let testdata = datafusion_common::test_util::parquet_test_data(); - let file_name = "data_index_bloom_encoding_stats.parquet"; - let path = format!("{testdata}/{file_name}"); - let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - - // generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")` - let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); - let expr = lit("1").eq(lit("1")).and( - col(r#""String""#) - .eq(lit("Hello_Not_Exists")) - .or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))), - ); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - - let row_groups = vec![0]; - let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( - file_name, - data, - &pruning_predicate, - &row_groups, - ) - .await - .unwrap(); - assert!(pruned_row_groups.is_empty()); + BloomFilterTest::new_data_index_bloom_encoding_stats() + .with_expect_all_pruned() + // generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")` + .run( + lit("1").eq(lit("1")).and( + col(r#""String""#) + .eq(lit("Hello_Not_Exists")) + .or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))), + ), + ) + .await } #[tokio::test] @@ -1186,144 +1132,161 @@ mod tests { #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value() { - // load parquet file - let testdata = datafusion_common::test_util::parquet_test_data(); - let file_name = "data_index_bloom_encoding_stats.parquet"; - let path = format!("{testdata}/{file_name}"); - let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - - // generate pruning predicate `(String = "Hello")` - let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); - let expr = col(r#""String""#).eq(lit("Hello")); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - - let row_groups = vec![0]; - let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( - file_name, - data, - &pruning_predicate, - &row_groups, - ) - .await - .unwrap(); - assert_eq!(pruned_row_groups, row_groups); + BloomFilterTest::new_data_index_bloom_encoding_stats() + .with_expect_none_pruned() + // generate pruning predicate `(String = "Hello")` + .run(col(r#""String""#).eq(lit("Hello"))) + .await } #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_with_exists_2_values() { - // load parquet file - let testdata = datafusion_common::test_util::parquet_test_data(); - let file_name = "data_index_bloom_encoding_stats.parquet"; - let path = format!("{testdata}/{file_name}"); - let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - - // generate pruning predicate `(String = "Hello") OR (String = "the quick")` - let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); - let expr = col(r#""String""#) - .eq(lit("Hello")) - .or(col(r#""String""#).eq(lit("the quick"))); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - - let row_groups = vec![0]; - let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( - file_name, - data, - &pruning_predicate, - &row_groups, - ) - .await - .unwrap(); - assert_eq!(pruned_row_groups, row_groups); + BloomFilterTest::new_data_index_bloom_encoding_stats() + .with_expect_none_pruned() + // generate pruning predicate `(String = "Hello") OR (String = "the quick")` + .run( + col(r#""String""#) + .eq(lit("Hello")) + .or(col(r#""String""#).eq(lit("the quick"))), + ) + .await } #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values() { - // load parquet file - let testdata = datafusion_common::test_util::parquet_test_data(); - let file_name = "data_index_bloom_encoding_stats.parquet"; - let path = format!("{testdata}/{file_name}"); - let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - - // generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")` - let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); - let expr = col(r#""String""#) - .eq(lit("Hello")) - .or(col(r#""String""#).eq(lit("the quick"))) - .or(col(r#""String""#).eq(lit("are you"))); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - - let row_groups = vec![0]; - let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( - file_name, - data, - &pruning_predicate, - &row_groups, - ) - .await - .unwrap(); - assert_eq!(pruned_row_groups, row_groups); + BloomFilterTest::new_data_index_bloom_encoding_stats() + .with_expect_none_pruned() + // generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")` + .run( + col(r#""String""#) + .eq(lit("Hello")) + .or(col(r#""String""#).eq(lit("the quick"))) + .or(col(r#""String""#).eq(lit("are you"))), + ) + .await } #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_with_or_not_eq() { - // load parquet file - let testdata = datafusion_common::test_util::parquet_test_data(); - let file_name = "data_index_bloom_encoding_stats.parquet"; - let path = format!("{testdata}/{file_name}"); - let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - - // generate pruning predicate `(String = "foo") OR (String != "bar")` - let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); - let expr = col(r#""String""#) - .not_eq(lit("foo")) - .or(col(r#""String""#).not_eq(lit("bar"))); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - - let row_groups = vec![0]; - let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( - file_name, - data, - &pruning_predicate, - &row_groups, - ) - .await - .unwrap(); - assert_eq!(pruned_row_groups, row_groups); + BloomFilterTest::new_data_index_bloom_encoding_stats() + .with_expect_none_pruned() + // generate pruning predicate `(String = "foo") OR (String != "bar")` + .run( + col(r#""String""#) + .not_eq(lit("foo")) + .or(col(r#""String""#).not_eq(lit("bar"))), + ) + .await } #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() { - // load parquet file - let testdata = datafusion_common::test_util::parquet_test_data(); - let file_name = "alltypes_plain.parquet"; - let path = format!("{testdata}/{file_name}"); - let data = bytes::Bytes::from(std::fs::read(path).unwrap()); - // generate pruning predicate on a column without a bloom filter - let schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, false)]); - let expr = col(r#""string_col""#).eq(lit("0")); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + BloomFilterTest::new_all_types() + .with_expect_none_pruned() + .run(col(r#""string_col""#).eq(lit("0"))) + .await + } - let row_groups = vec![0]; - let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( - file_name, - data, - &pruning_predicate, - &row_groups, - ) - .await - .unwrap(); - assert_eq!(pruned_row_groups, row_groups); + struct BloomFilterTest { + file_name: String, + schema: Schema, + // which row groups should be attempted to prune + row_groups: Vec, + // which row groups are expected to be left after pruning. Must be set + // otherwise will panic on run() + post_pruning_row_groups: Option>, + } + + impl BloomFilterTest { + /// Return a test for data_index_bloom_encoding_stats.parquet + /// Note the values in the `String` column are: + /// ```sql + /// ❯ select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet'; + /// +-----------+ + /// | String | + /// +-----------+ + /// | Hello | + /// | This is | + /// | a | + /// | test | + /// | How | + /// | are you | + /// | doing | + /// | today | + /// | the quick | + /// | brown fox | + /// | jumps | + /// | over | + /// | the lazy | + /// | dog | + /// +-----------+ + /// ``` + fn new_data_index_bloom_encoding_stats() -> Self { + Self { + file_name: String::from("data_index_bloom_encoding_stats.parquet"), + schema: Schema::new(vec![Field::new("String", DataType::Utf8, false)]), + row_groups: vec![0], + post_pruning_row_groups: None, + } + } + + // Return a test for alltypes_plain.parquet + fn new_all_types() -> Self { + Self { + file_name: String::from("alltypes_plain.parquet"), + schema: Schema::new(vec![Field::new( + "string_col", + DataType::Utf8, + false, + )]), + row_groups: vec![0], + post_pruning_row_groups: None, + } + } + + /// Expect all row groups to be pruned + pub fn with_expect_all_pruned(mut self) -> Self { + self.post_pruning_row_groups = Some(vec![]); + self + } + + /// Expect all row groups not to be pruned + pub fn with_expect_none_pruned(mut self) -> Self { + self.post_pruning_row_groups = Some(self.row_groups.clone()); + self + } + + /// Prune this file using the specified expression and check that the expected row groups are left + async fn run(self, expr: Expr) { + let Self { + file_name, + schema, + row_groups, + post_pruning_row_groups, + } = self; + + let post_pruning_row_groups = + post_pruning_row_groups.expect("post_pruning_row_groups must be set"); + + let testdata = datafusion_common::test_util::parquet_test_data(); + let path = format!("{testdata}/{file_name}"); + let data = bytes::Bytes::from(std::fs::read(path).unwrap()); + + let expr = logical2physical(&expr, &schema); + let pruning_predicate = + PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + + let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( + &file_name, + data, + &pruning_predicate, + &row_groups, + ) + .await + .unwrap(); + assert_eq!(pruned_row_groups, post_pruning_row_groups); + } } async fn test_row_group_bloom_filter_pruning_predicate(