From 78153b7722225db7cfa9bb002083963330ad4cff Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 4 Feb 2025 02:55:42 +0530 Subject: [PATCH 1/9] make datafusion_catalog_listing --- Cargo.toml | 2 + datafusion/catalog-listing/Cargo.toml | 64 ++++ datafusion/catalog-listing/LICENSE.txt | 1 + datafusion/catalog-listing/NOTICE.txt | 1 + datafusion/catalog-listing/README.md | 28 ++ .../src}/helpers.rs | 239 ++------------- datafusion/catalog-listing/src/mod.rs | 278 ++++++++++++++++++ .../listing => catalog-listing/src}/url.rs | 11 +- datafusion/core/Cargo.toml | 1 + datafusion/core/src/datasource/listing/mod.rs | 259 +--------------- .../core/tests/catalog_listing/helpers.rs | 224 ++++++++++++++ datafusion/core/tests/catalog_listing/mod.rs | 18 ++ datafusion/core/tests/core_integration.rs | 2 + 13 files changed, 644 insertions(+), 484 deletions(-) create mode 100644 datafusion/catalog-listing/Cargo.toml create mode 120000 datafusion/catalog-listing/LICENSE.txt create mode 120000 datafusion/catalog-listing/NOTICE.txt create mode 100644 datafusion/catalog-listing/README.md rename datafusion/{core/src/datasource/listing => catalog-listing/src}/helpers.rs (77%) create mode 100644 datafusion/catalog-listing/src/mod.rs rename datafusion/{core/src/datasource/listing => catalog-listing/src}/url.rs (98%) create mode 100644 datafusion/core/tests/catalog_listing/helpers.rs create mode 100644 datafusion/core/tests/catalog_listing/mod.rs diff --git a/Cargo.toml b/Cargo.toml index c4930c61b9fe..2f2ae010c879 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "datafusion/common", "datafusion/common-runtime", "datafusion/catalog", + "datafusion/catalog-listing", "datafusion/core", "datafusion/expr", "datafusion/expr-common", @@ -100,6 +101,7 @@ ctor = "0.2.9" dashmap = "6.0.1" datafusion = { path = "datafusion/core", version = "45.0.0", default-features = false } datafusion-catalog = { path = "datafusion/catalog", version = "45.0.0" } +datafusion-catalog-listing = { path = "datafusion/catalog-listing", version = "45.0.0" } datafusion-common = { path = "datafusion/common", version = "45.0.0", default-features = false } datafusion-common-runtime = { path = "datafusion/common-runtime", version = "45.0.0" } datafusion-doc = { path = "datafusion/doc", version = "45.0.0" } diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml new file mode 100644 index 000000000000..2752e530264c --- /dev/null +++ b/datafusion/catalog-listing/Cargo.toml @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "datafusion-catalog-listing" +description = "datafusion-catalog-listing" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +arrow = { workspace = true } +arrow-schema = { workspace = true } +async-compression = { version = "0.4.0", features = [ + "bzip2", + "gzip", + "xz", + "zstd", + "tokio", +], optional = true } +chrono = { workspace = true } +datafusion-catalog = { workspace = true } +datafusion-common = { workspace = true, features = ["object_store"] } +datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } +datafusion-physical-plan = { workspace = true } +futures = { workspace = true } +glob = "0.3.0" +itertools = { workspace = true } +log = { workspace = true } +object_store = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_catalog_listing" +path = "src/mod.rs" diff --git a/datafusion/catalog-listing/LICENSE.txt b/datafusion/catalog-listing/LICENSE.txt new file mode 120000 index 000000000000..1ef648f64b34 --- /dev/null +++ b/datafusion/catalog-listing/LICENSE.txt @@ -0,0 +1 @@ +../../LICENSE.txt \ No newline at end of file diff --git a/datafusion/catalog-listing/NOTICE.txt b/datafusion/catalog-listing/NOTICE.txt new file mode 120000 index 000000000000..fb051c92b10b --- /dev/null +++ b/datafusion/catalog-listing/NOTICE.txt @@ -0,0 +1 @@ +../../NOTICE.txt \ No newline at end of file diff --git a/datafusion/catalog-listing/README.md b/datafusion/catalog-listing/README.md new file mode 100644 index 000000000000..fd2b80f913bf --- /dev/null +++ b/datafusion/catalog-listing/README.md @@ -0,0 +1,28 @@ + + +# DataFusion Catalog-listing + +[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. + +This crate is a submodule of DataFusion that provides catalog management functionality, including catalogs, schemas, and tables. 
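As an illustrative aside (not part of this patch), here is a minimal sketch of the `ListingTableUrl` API the new crate exposes, using only calls exercised by the patch's own doc examples and tests (`parse`, `file_extension`, `contains`); the paths are made up for the example:

```rust
use datafusion_catalog_listing::ListingTableUrl;
use object_store::path::Path;

fn main() {
    // Parse a table root and a single file URL.
    let table = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
    let file = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
    assert_eq!(file.file_extension(), Some("csv"));

    // With `ignore_subdirectory = true` only direct children (or hive-style
    // `key=value` segments) of the table root count as part of the table.
    let direct = Path::parse("/var/data/mytable/data.parquet").unwrap();
    let nested = Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap();
    assert!(table.contains(&direct, true));
    assert!(!table.contains(&nested, true));
    assert!(table.contains(&nested, false));
}
```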
+ + + +[df]: https://crates.io/crates/datafusion diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/catalog-listing/src/helpers.rs similarity index 77% rename from datafusion/core/src/datasource/listing/helpers.rs rename to datafusion/catalog-listing/src/helpers.rs index 228b9a4e9f6b..94542446d952 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use super::ListingTableUrl; use super::PartitionedFile; -use crate::execution::context::SessionState; +use datafusion_catalog::Session; use datafusion_common::internal_err; use datafusion_common::{HashMap, Result, ScalarValue}; use datafusion_expr::{BinaryExpr, Operator}; @@ -154,7 +154,7 @@ pub fn split_files( chunks } -struct Partition { +pub struct Partition { /// The path to the partition, including the table prefix path: Path, /// How many path segments below the table prefix `path` contains @@ -183,7 +183,7 @@ impl Partition { } /// Returns a recursive list of the partitions in `table_path` up to `max_depth` -async fn list_partitions( +pub async fn list_partitions( store: &dyn ObjectStore, table_path: &ListingTableUrl, max_depth: usize, @@ -364,7 +364,7 @@ fn populate_partition_values<'a>( } } -fn evaluate_partition_prefix<'a>( +pub fn evaluate_partition_prefix<'a>( partition_cols: &'a [(String, DataType)], filters: &'a [Expr], ) -> Option { @@ -405,7 +405,7 @@ fn evaluate_partition_prefix<'a>( /// `filters` should only contain expressions that can be evaluated /// using only the partition columns. pub async fn pruned_partition_list<'a>( - ctx: &'a SessionState, + ctx: &'a dyn Session, store: &'a dyn ObjectStore, table_path: &'a ListingTableUrl, filters: &'a [Expr], @@ -489,7 +489,7 @@ pub async fn pruned_partition_list<'a>( /// Extract the partition values for the given `file_path` (in the given `table_path`) /// associated to the partitions defined by `table_partition_cols` -fn parse_partitions_for_path<'a, I>( +pub fn parse_partitions_for_path<'a, I>( table_path: &ListingTableUrl, file_path: &'a Path, table_partition_cols: I, @@ -517,14 +517,25 @@ where } Some(part_values) } +/// Describe a partition as a (path, depth, files) tuple for easier assertions +pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { + ( + partition.path.as_ref(), + partition.depth, + partition + .files + .as_ref() + .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect()) + .unwrap_or_default(), + ) +} #[cfg(test)] mod tests { use std::ops::Not; - use futures::StreamExt; + // use futures::StreamExt; - use crate::test::object_store::make_test_store_and_state; use datafusion_expr::{case, col, lit, Expr}; use super::*; @@ -569,218 +580,6 @@ mod tests { assert_eq!(0, chunks.len()); } - #[tokio::test] - async fn test_pruned_partition_list_empty() { - let (store, state) = make_test_store_and_state(&[ - ("tablepath/mypartition=val1/notparquetfile", 100), - ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), - ("tablepath/file.parquet", 100), - ]); - let filter = Expr::eq(col("mypartition"), lit("val1")); - let pruned = pruned_partition_list( - &state, - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - &[filter], - ".parquet", - &[(String::from("mypartition"), DataType::Utf8)], - ) - .await - .expect("partition pruning failed") - .collect::>() - .await; - - assert_eq!(pruned.len(), 0); - } - - #[tokio::test] - async fn test_pruned_partition_list() { - let (store, state) = 
make_test_store_and_state(&[ - ("tablepath/mypartition=val1/file.parquet", 100), - ("tablepath/mypartition=val2/file.parquet", 100), - ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), - ("tablepath/mypartition=val1/other=val3/file.parquet", 100), - ]); - let filter = Expr::eq(col("mypartition"), lit("val1")); - let pruned = pruned_partition_list( - &state, - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - &[filter], - ".parquet", - &[(String::from("mypartition"), DataType::Utf8)], - ) - .await - .expect("partition pruning failed") - .try_collect::>() - .await - .unwrap(); - - assert_eq!(pruned.len(), 2); - let f1 = &pruned[0]; - assert_eq!( - f1.object_meta.location.as_ref(), - "tablepath/mypartition=val1/file.parquet" - ); - assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]); - let f2 = &pruned[1]; - assert_eq!( - f2.object_meta.location.as_ref(), - "tablepath/mypartition=val1/other=val3/file.parquet" - ); - assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]); - } - - #[tokio::test] - async fn test_pruned_partition_list_multi() { - let (store, state) = make_test_store_and_state(&[ - ("tablepath/part1=p1v1/file.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), - ("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100), - ]); - let filter1 = Expr::eq(col("part1"), lit("p1v2")); - let filter2 = Expr::eq(col("part2"), lit("p2v1")); - let pruned = pruned_partition_list( - &state, - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - &[filter1, filter2], - ".parquet", - &[ - (String::from("part1"), DataType::Utf8), - (String::from("part2"), DataType::Utf8), - ], - ) - .await - .expect("partition pruning failed") - .try_collect::>() - .await - .unwrap(); - - assert_eq!(pruned.len(), 2); - let f1 = &pruned[0]; - assert_eq!( - f1.object_meta.location.as_ref(), - "tablepath/part1=p1v2/part2=p2v1/file1.parquet" - ); - assert_eq!( - &f1.partition_values, - &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),] - ); - let f2 = &pruned[1]; - assert_eq!( - f2.object_meta.location.as_ref(), - "tablepath/part1=p1v2/part2=p2v1/file2.parquet" - ); - assert_eq!( - &f2.partition_values, - &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")] - ); - } - - /// Describe a partition as a (path, depth, files) tuple for easier assertions - fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { - ( - partition.path.as_ref(), - partition.depth, - partition - .files - .as_ref() - .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect()) - .unwrap_or_default(), - ) - } - - #[tokio::test] - async fn test_list_partition() { - let (store, _) = make_test_store_and_state(&[ - ("tablepath/part1=p1v1/file.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), - ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0), - ]); - - let partitions = list_partitions( - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - 0, - None, - ) - .await - .expect("listing partitions failed"); - - assert_eq!( - &partitions - .iter() - .map(describe_partition) - .collect::>(), - &vec![ - ("tablepath", 0, vec![]), - ("tablepath/part1=p1v1", 1, vec![]), - ("tablepath/part1=p1v2", 1, 
vec![]), - ("tablepath/part1=p1v3", 1, vec![]), - ] - ); - - let partitions = list_partitions( - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - 1, - None, - ) - .await - .expect("listing partitions failed"); - - assert_eq!( - &partitions - .iter() - .map(describe_partition) - .collect::>(), - &vec![ - ("tablepath", 0, vec![]), - ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), - ("tablepath/part1=p1v2", 1, vec![]), - ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]), - ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]), - ("tablepath/part1=p1v3", 1, vec![]), - ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]), - ] - ); - - let partitions = list_partitions( - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - 2, - None, - ) - .await - .expect("listing partitions failed"); - - assert_eq!( - &partitions - .iter() - .map(describe_partition) - .collect::>(), - &vec![ - ("tablepath", 0, vec![]), - ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), - ("tablepath/part1=p1v2", 1, vec![]), - ("tablepath/part1=p1v3", 1, vec![]), - ( - "tablepath/part1=p1v2/part2=p2v1", - 2, - vec!["file1.parquet", "file2.parquet"] - ), - ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]), - ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]), - ] - ); - } - #[test] fn test_parse_partitions_for_path() { assert_eq!( diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs new file mode 100644 index 000000000000..ade6ea9b3f73 --- /dev/null +++ b/datafusion/catalog-listing/src/mod.rs @@ -0,0 +1,278 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A table that uses the `ObjectStore` listing capability +//! to get the list of files to process. + +pub mod helpers; +pub mod url; + +use chrono::TimeZone; +use datafusion_common::Result; +use datafusion_common::{ScalarValue, Statistics}; +use futures::Stream; +use object_store::{path::Path, ObjectMeta}; +use std::pin::Pin; +use std::sync::Arc; + +pub use self::url::ListingTableUrl; + +/// Stream of files get listed from object store +pub type PartitionedFileStream = + Pin> + Send + Sync + 'static>>; + +/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint" +/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping +/// sections of a Parquet file in parallel. 
+#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)] +pub struct FileRange { + /// Range start + pub start: i64, + /// Range end + pub end: i64, +} + +impl FileRange { + /// returns true if this file range contains the specified offset + pub fn contains(&self, offset: i64) -> bool { + offset >= self.start && offset < self.end + } +} + +#[derive(Debug, Clone)] +/// A single file or part of a file that should be read, along with its schema, statistics +/// and partition column values that need to be appended to each row. +pub struct PartitionedFile { + /// Path for the file (e.g. URL, filesystem path, etc) + pub object_meta: ObjectMeta, + /// Values of partition columns to be appended to each row. + /// + /// These MUST have the same count, order, and type than the [`table_partition_cols`]. + /// + /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type. + /// + /// + /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict + /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict + /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols + pub partition_values: Vec, + /// An optional file range for a more fine-grained parallel execution + pub range: Option, + /// Optional statistics that describe the data in this file if known. + /// + /// DataFusion relies on these statistics for planning (in particular to sort file groups), + /// so if they are incorrect, incorrect answers may result. + pub statistics: Option, + /// An optional field for user defined per object metadata + pub extensions: Option>, + /// The estimated size of the parquet metadata, in bytes + pub metadata_size_hint: Option, +} + +impl PartitionedFile { + /// Create a simple file without metadata or partition + pub fn new(path: impl Into, size: u64) -> Self { + Self { + object_meta: ObjectMeta { + location: Path::from(path.into()), + last_modified: chrono::Utc.timestamp_nanos(0), + size: size as usize, + e_tag: None, + version: None, + }, + partition_values: vec![], + range: None, + statistics: None, + extensions: None, + metadata_size_hint: None, + } + } + + /// Create a file range without metadata or partition + pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self { + Self { + object_meta: ObjectMeta { + location: Path::from(path), + last_modified: chrono::Utc.timestamp_nanos(0), + size: size as usize, + e_tag: None, + version: None, + }, + partition_values: vec![], + range: Some(FileRange { start, end }), + statistics: None, + extensions: None, + metadata_size_hint: None, + } + .with_range(start, end) + } + + /// Provide a hint to the size of the file metadata. If a hint is provided + /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. + /// Without an appropriate hint, two read may be required to fetch the metadata. 
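As an illustrative aside (not part of this patch), here is a minimal sketch of how the `FileRange` and `PartitionedFile` constructors above combine to scan non-overlapping byte ranges of one file; the path, size, and hint value are made up for the example:

```rust
use datafusion_catalog_listing::PartitionedFile;

fn main() {
    // Split a hypothetical 100-byte file into two non-overlapping scan ranges.
    let first = PartitionedFile::new_with_range("data/part-0.parquet".to_string(), 100, 0, 50);
    let second = PartitionedFile::new_with_range("data/part-0.parquet".to_string(), 100, 50, 100);

    // A row group whose data midpoint falls at byte 60 is scanned by the
    // second range only, so the two ranges never read the same rows twice.
    let midpoint: i64 = 60;
    assert!(!first.range.as_ref().unwrap().contains(midpoint));
    assert!(second.range.as_ref().unwrap().contains(midpoint));

    // Optionally hint the parquet footer size so it can be fetched in one read.
    let _hinted = PartitionedFile::new("data/part-1.parquet", 100).with_metadata_size_hint(1024);
}
```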
+ pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self { + self.metadata_size_hint = Some(metadata_size_hint); + self + } + + /// Return a file reference from the given path + pub fn from_path(path: String) -> Result { + let size = std::fs::metadata(path.clone())?.len(); + Ok(Self::new(path, size)) + } + + /// Return the path of this partitioned file + pub fn path(&self) -> &Path { + &self.object_meta.location + } + + /// Update the file to only scan the specified range (in bytes) + pub fn with_range(mut self, start: i64, end: i64) -> Self { + self.range = Some(FileRange { start, end }); + self + } + + /// Update the user defined extensions for this file. + /// + /// This can be used to pass reader specific information. + pub fn with_extensions( + mut self, + extensions: Arc, + ) -> Self { + self.extensions = Some(extensions); + self + } +} + +impl From for PartitionedFile { + fn from(object_meta: ObjectMeta) -> Self { + PartitionedFile { + object_meta, + partition_values: vec![], + range: None, + statistics: None, + extensions: None, + metadata_size_hint: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::ListingTableUrl; + use datafusion_execution::object_store::{ + DefaultObjectStoreRegistry, ObjectStoreRegistry, + }; + use object_store::{local::LocalFileSystem, path::Path}; + use std::{ops::Not, sync::Arc}; + use url::Url; + + #[test] + fn test_object_store_listing_url() { + let listing = ListingTableUrl::parse("file:///").unwrap(); + let store = listing.object_store(); + assert_eq!(store.as_str(), "file:///"); + + let listing = ListingTableUrl::parse("s3://bucket/").unwrap(); + let store = listing.object_store(); + assert_eq!(store.as_str(), "s3://bucket/"); + } + + #[test] + fn test_get_store_hdfs() { + let sut = DefaultObjectStoreRegistry::default(); + let url = Url::parse("hdfs://localhost:8020").unwrap(); + sut.register_store(&url, Arc::new(LocalFileSystem::new())); + let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap(); + sut.get_store(url.as_ref()).unwrap(); + } + + #[test] + fn test_get_store_s3() { + let sut = DefaultObjectStoreRegistry::default(); + let url = Url::parse("s3://bucket/key").unwrap(); + sut.register_store(&url, Arc::new(LocalFileSystem::new())); + let url = ListingTableUrl::parse("s3://bucket/key").unwrap(); + sut.get_store(url.as_ref()).unwrap(); + } + + #[test] + fn test_get_store_file() { + let sut = DefaultObjectStoreRegistry::default(); + let url = ListingTableUrl::parse("file:///bucket/key").unwrap(); + sut.get_store(url.as_ref()).unwrap(); + } + + #[test] + fn test_get_store_local() { + let sut = DefaultObjectStoreRegistry::default(); + let url = ListingTableUrl::parse("../").unwrap(); + sut.get_store(url.as_ref()).unwrap(); + } + + #[test] + fn test_url_contains() { + let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap(); + + // standard case with default config + assert!(url.contains( + &Path::parse("/var/data/mytable/data.parquet").unwrap(), + true + )); + + // standard case with `ignore_subdirectory` set to false + assert!(url.contains( + &Path::parse("/var/data/mytable/data.parquet").unwrap(), + false + )); + + // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't + // a direct child of the `url` + assert!(url + .contains( + &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(), + true + ) + .not()); + + // when we set `ignore_subdirectory` to false, we should not ignore the file + assert!(url.contains( + 
&Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(), + false + )); + + // as above, `ignore_subdirectory` is false, so we include the file + assert!(url.contains( + &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(), + false + )); + + // in this case, we include the file even when `ignore_subdirectory` is true because the + // path segment is a hive partition which doesn't count as a subdirectory for the purposes + // of `Url::contains` + assert!(url.contains( + &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(), + true + )); + + // testing an empty path with default config + assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true)); + + // testing an empty path with `ignore_subdirectory` set to false + assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false)); + } +} diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/catalog-listing/src/url.rs similarity index 98% rename from datafusion/core/src/datasource/listing/url.rs rename to datafusion/catalog-listing/src/url.rs index 6fb536ca2f05..2a9998513159 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/catalog-listing/src/url.rs @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::context::SessionState; +use datafusion_catalog::Session; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_optimizer::OptimizerConfig; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use glob::Pattern; @@ -194,7 +193,7 @@ impl ListingTableUrl { /// /// Examples: /// ```rust - /// use datafusion::datasource::listing::ListingTableUrl; + /// use datafusion_catalog_listing::ListingTableUrl; /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap(); /// assert_eq!(url.file_extension(), Some("csv")); /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap(); @@ -216,7 +215,7 @@ impl ListingTableUrl { /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning /// an iterator of the remaining path segments - pub(crate) fn strip_prefix<'a, 'b: 'a>( + pub fn strip_prefix<'a, 'b: 'a>( &'a self, path: &'b Path, ) -> Option + 'a> { @@ -230,11 +229,11 @@ impl ListingTableUrl { /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension` pub async fn list_all_files<'a>( &'a self, - ctx: &'a SessionState, + ctx: &'a dyn Session, store: &'a dyn ObjectStore, file_extension: &'a str, ) -> Result>> { - let exec_options = &ctx.options().execution; + let exec_options = &ctx.config_options().execution; let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory; // If the prefix is a file, use a head request, otherwise list let list = match self.is_collection() { diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 28c54cb444ce..3d3af66a1d0b 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -100,6 +100,7 @@ bytes = { workspace = true } bzip2 = { version = "0.5.0", optional = true } chrono = { workspace = true } datafusion-catalog = { workspace = true } +datafusion-catalog-listing = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs 
index f11653ce1e52..39323b993d45 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -18,263 +18,6 @@ //! A table that uses the `ObjectStore` listing capability //! to get the list of files to process. -mod helpers; mod table; -mod url; - -use chrono::TimeZone; -use datafusion_common::Result; -use datafusion_common::{ScalarValue, Statistics}; -use futures::Stream; -use object_store::{path::Path, ObjectMeta}; -use std::pin::Pin; -use std::sync::Arc; - -pub use self::url::ListingTableUrl; +pub use datafusion_catalog_listing::*; pub use table::{ListingOptions, ListingTable, ListingTableConfig}; - -/// Stream of files get listed from object store -pub type PartitionedFileStream = - Pin> + Send + Sync + 'static>>; - -/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint" -/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping -/// sections of a Parquet file in parallel. -#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)] -pub struct FileRange { - /// Range start - pub start: i64, - /// Range end - pub end: i64, -} - -impl FileRange { - /// returns true if this file range contains the specified offset - pub fn contains(&self, offset: i64) -> bool { - offset >= self.start && offset < self.end - } -} - -#[derive(Debug, Clone)] -/// A single file or part of a file that should be read, along with its schema, statistics -/// and partition column values that need to be appended to each row. -pub struct PartitionedFile { - /// Path for the file (e.g. URL, filesystem path, etc) - pub object_meta: ObjectMeta, - /// Values of partition columns to be appended to each row. - /// - /// These MUST have the same count, order, and type than the [`table_partition_cols`]. - /// - /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type. - /// - /// - /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict - /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict - /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols - pub partition_values: Vec, - /// An optional file range for a more fine-grained parallel execution - pub range: Option, - /// Optional statistics that describe the data in this file if known. - /// - /// DataFusion relies on these statistics for planning (in particular to sort file groups), - /// so if they are incorrect, incorrect answers may result. 
- pub statistics: Option, - /// An optional field for user defined per object metadata - pub extensions: Option>, - /// The estimated size of the parquet metadata, in bytes - pub metadata_size_hint: Option, -} - -impl PartitionedFile { - /// Create a simple file without metadata or partition - pub fn new(path: impl Into, size: u64) -> Self { - Self { - object_meta: ObjectMeta { - location: Path::from(path.into()), - last_modified: chrono::Utc.timestamp_nanos(0), - size: size as usize, - e_tag: None, - version: None, - }, - partition_values: vec![], - range: None, - statistics: None, - extensions: None, - metadata_size_hint: None, - } - } - - /// Create a file range without metadata or partition - pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self { - Self { - object_meta: ObjectMeta { - location: Path::from(path), - last_modified: chrono::Utc.timestamp_nanos(0), - size: size as usize, - e_tag: None, - version: None, - }, - partition_values: vec![], - range: Some(FileRange { start, end }), - statistics: None, - extensions: None, - metadata_size_hint: None, - } - .with_range(start, end) - } - - /// Provide a hint to the size of the file metadata. If a hint is provided - /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. - /// Without an appropriate hint, two read may be required to fetch the metadata. - pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self { - self.metadata_size_hint = Some(metadata_size_hint); - self - } - - /// Return a file reference from the given path - pub fn from_path(path: String) -> Result { - let size = std::fs::metadata(path.clone())?.len(); - Ok(Self::new(path, size)) - } - - /// Return the path of this partitioned file - pub fn path(&self) -> &Path { - &self.object_meta.location - } - - /// Update the file to only scan the specified range (in bytes) - pub fn with_range(mut self, start: i64, end: i64) -> Self { - self.range = Some(FileRange { start, end }); - self - } - - /// Update the user defined extensions for this file. - /// - /// This can be used to pass reader specific information. 
- pub fn with_extensions( - mut self, - extensions: Arc, - ) -> Self { - self.extensions = Some(extensions); - self - } -} - -impl From for PartitionedFile { - fn from(object_meta: ObjectMeta) -> Self { - PartitionedFile { - object_meta, - partition_values: vec![], - range: None, - statistics: None, - extensions: None, - metadata_size_hint: None, - } - } -} - -#[cfg(test)] -mod tests { - use super::ListingTableUrl; - use datafusion_execution::object_store::{ - DefaultObjectStoreRegistry, ObjectStoreRegistry, - }; - use object_store::{local::LocalFileSystem, path::Path}; - use std::{ops::Not, sync::Arc}; - use url::Url; - - #[test] - fn test_object_store_listing_url() { - let listing = ListingTableUrl::parse("file:///").unwrap(); - let store = listing.object_store(); - assert_eq!(store.as_str(), "file:///"); - - let listing = ListingTableUrl::parse("s3://bucket/").unwrap(); - let store = listing.object_store(); - assert_eq!(store.as_str(), "s3://bucket/"); - } - - #[test] - fn test_get_store_hdfs() { - let sut = DefaultObjectStoreRegistry::default(); - let url = Url::parse("hdfs://localhost:8020").unwrap(); - sut.register_store(&url, Arc::new(LocalFileSystem::new())); - let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap(); - sut.get_store(url.as_ref()).unwrap(); - } - - #[test] - fn test_get_store_s3() { - let sut = DefaultObjectStoreRegistry::default(); - let url = Url::parse("s3://bucket/key").unwrap(); - sut.register_store(&url, Arc::new(LocalFileSystem::new())); - let url = ListingTableUrl::parse("s3://bucket/key").unwrap(); - sut.get_store(url.as_ref()).unwrap(); - } - - #[test] - fn test_get_store_file() { - let sut = DefaultObjectStoreRegistry::default(); - let url = ListingTableUrl::parse("file:///bucket/key").unwrap(); - sut.get_store(url.as_ref()).unwrap(); - } - - #[test] - fn test_get_store_local() { - let sut = DefaultObjectStoreRegistry::default(); - let url = ListingTableUrl::parse("../").unwrap(); - sut.get_store(url.as_ref()).unwrap(); - } - - #[test] - fn test_url_contains() { - let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap(); - - // standard case with default config - assert!(url.contains( - &Path::parse("/var/data/mytable/data.parquet").unwrap(), - true - )); - - // standard case with `ignore_subdirectory` set to false - assert!(url.contains( - &Path::parse("/var/data/mytable/data.parquet").unwrap(), - false - )); - - // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't - // a direct child of the `url` - assert!(url - .contains( - &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(), - true - ) - .not()); - - // when we set `ignore_subdirectory` to false, we should not ignore the file - assert!(url.contains( - &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(), - false - )); - - // as above, `ignore_subdirectory` is false, so we include the file - assert!(url.contains( - &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(), - false - )); - - // in this case, we include the file even when `ignore_subdirectory` is true because the - // path segment is a hive partition which doesn't count as a subdirectory for the purposes - // of `Url::contains` - assert!(url.contains( - &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(), - true - )); - - // testing an empty path with default config - assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true)); - - // testing an empty path with `ignore_subdirectory` set to false - 
assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false)); - } -} diff --git a/datafusion/core/tests/catalog_listing/helpers.rs b/datafusion/core/tests/catalog_listing/helpers.rs new file mode 100644 index 000000000000..6bd0feae70ed --- /dev/null +++ b/datafusion/core/tests/catalog_listing/helpers.rs @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::DataType; +use datafusion::test::object_store::make_test_store_and_state; +use datafusion_catalog_listing::helpers::*; +use datafusion_catalog_listing::*; +use datafusion_common::ScalarValue; +use datafusion_expr::{col, lit, Expr}; +use futures::StreamExt; +use futures::TryStreamExt; + +#[tokio::test] +async fn test_pruned_partition_list_empty() { + let (store, state) = make_test_store_and_state(&[ + ("tablepath/mypartition=val1/notparquetfile", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), + ("tablepath/file.parquet", 100), + ]); + let filter = Expr::eq(col("mypartition"), lit("val1")); + let pruned = pruned_partition_list( + &state, + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + &[filter], + ".parquet", + &[(String::from("mypartition"), DataType::Utf8)], + ) + .await + .expect("partition pruning failed") + .collect::>() + .await; + + assert_eq!(pruned.len(), 0); +} + +#[tokio::test] +async fn test_pruned_partition_list() { + let (store, state) = make_test_store_and_state(&[ + ("tablepath/mypartition=val1/file.parquet", 100), + ("tablepath/mypartition=val2/file.parquet", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), + ("tablepath/mypartition=val1/other=val3/file.parquet", 100), + ]); + let filter = Expr::eq(col("mypartition"), lit("val1")); + let pruned = pruned_partition_list( + &state, + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + &[filter], + ".parquet", + &[(String::from("mypartition"), DataType::Utf8)], + ) + .await + .expect("partition pruning failed") + .try_collect::>() + .await + .unwrap(); + + assert_eq!(pruned.len(), 2); + let f1 = &pruned[0]; + assert_eq!( + f1.object_meta.location.as_ref(), + "tablepath/mypartition=val1/file.parquet" + ); + assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]); + let f2 = &pruned[1]; + assert_eq!( + f2.object_meta.location.as_ref(), + "tablepath/mypartition=val1/other=val3/file.parquet" + ); + assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]); +} + +#[tokio::test] +async fn test_pruned_partition_list_multi() { + let (store, state) = make_test_store_and_state(&[ + ("tablepath/part1=p1v1/file.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), + 
("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100), + ]); + let filter1 = Expr::eq(col("part1"), lit("p1v2")); + let filter2 = Expr::eq(col("part2"), lit("p2v1")); + let pruned = pruned_partition_list( + &state, + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + &[filter1, filter2], + ".parquet", + &[ + (String::from("part1"), DataType::Utf8), + (String::from("part2"), DataType::Utf8), + ], + ) + .await + .expect("partition pruning failed") + .try_collect::>() + .await + .unwrap(); + + assert_eq!(pruned.len(), 2); + let f1 = &pruned[0]; + assert_eq!( + f1.object_meta.location.as_ref(), + "tablepath/part1=p1v2/part2=p2v1/file1.parquet" + ); + assert_eq!( + &f1.partition_values, + &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),] + ); + let f2 = &pruned[1]; + assert_eq!( + f2.object_meta.location.as_ref(), + "tablepath/part1=p1v2/part2=p2v1/file2.parquet" + ); + assert_eq!( + &f2.partition_values, + &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")] + ); +} + +#[tokio::test] +async fn test_list_partition() { + let (store, _) = make_test_store_and_state(&[ + ("tablepath/part1=p1v1/file.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), + ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0), + ]); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 0, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec![]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ] + ); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 1, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]), + ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]), + ] + ); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 2, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ( + "tablepath/part1=p1v2/part2=p2v1", + 2, + vec!["file1.parquet", "file2.parquet"] + ), + ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]), + ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]), + ] + ); +} diff --git a/datafusion/core/tests/catalog_listing/mod.rs b/datafusion/core/tests/catalog_listing/mod.rs new file mode 100644 index 000000000000..9df4cc1fa3ac --- /dev/null +++ b/datafusion/core/tests/catalog_listing/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod helpers; diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs index 66b4103160e7..fb182baf1f87 100644 --- a/datafusion/core/tests/core_integration.rs +++ b/datafusion/core/tests/core_integration.rs @@ -46,6 +46,8 @@ mod physical_optimizer; mod catalog; +mod catalog_listing; + #[cfg(test)] #[ctor::ctor] fn init() { From 5744702c971748a2fd094ec01845090cab3c745f Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 4 Feb 2025 03:10:15 +0530 Subject: [PATCH 2/9] fix: this is a bit hacky --- datafusion/core/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 780b22983393..c3d0172f251f 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -809,7 +809,7 @@ pub mod variable { pub use datafusion_expr::var_provider::{VarProvider, VarType}; } -#[cfg(test)] +// #[cfg(test)] pub mod test; pub mod test_util; From ad46ac6c75cac7bb30816124f933b2e03618c711 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 4 Feb 2025 10:48:04 +0530 Subject: [PATCH 3/9] fixes: prettier, taplo etc --- datafusion-cli/Cargo.lock | 23 +++++++++++++++++++++++ datafusion/catalog-listing/README.md | 2 -- datafusion/catalog-listing/src/mod.rs | 6 +++--- datafusion/core/src/lib.rs | 2 +- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 060675098705..3eb408fbb7b4 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1222,6 +1222,7 @@ dependencies = [ "bzip2 0.5.0", "chrono", "datafusion-catalog", + "datafusion-catalog-listing", "datafusion-common", "datafusion-common-runtime", "datafusion-execution", @@ -1277,6 +1278,28 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "datafusion-catalog-listing" +version = "45.0.0" +dependencies = [ + "arrow", + "arrow-schema", + "chrono", + "datafusion-catalog", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "futures", + "glob", + "itertools 0.14.0", + "log", + "object_store", + "url", +] + [[package]] name = "datafusion-cli" version = "45.0.0" diff --git a/datafusion/catalog-listing/README.md b/datafusion/catalog-listing/README.md index fd2b80f913bf..65d480449130 100644 --- a/datafusion/catalog-listing/README.md +++ b/datafusion/catalog-listing/README.md @@ -23,6 +23,4 @@ This crate is a submodule of DataFusion that provides catalog management functionality, including catalogs, schemas, and tables. 
- - [df]: https://crates.io/crates/datafusion diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs index ade6ea9b3f73..3c5cdcb219b1 100644 --- a/datafusion/catalog-listing/src/mod.rs +++ b/datafusion/catalog-listing/src/mod.rs @@ -66,9 +66,9 @@ pub struct PartitionedFile { /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type. /// /// - /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict - /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict - /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols + /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/ea788c72dddf24acd8dc8068cede939e11dfab84/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55 + /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/ea788c72dddf24acd8dc8068cede939e11dfab84/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62 + /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/ea788c72dddf24acd8dc8068cede939e11dfab84/datafusion/core/src/datasource/file_format/options.rs#L190 pub partition_values: Vec, /// An optional file range for a more fine-grained parallel execution pub range: Option, diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index c3d0172f251f..780b22983393 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -809,7 +809,7 @@ pub mod variable { pub use datafusion_expr::var_provider::{VarProvider, VarType}; } -// #[cfg(test)] +#[cfg(test)] pub mod test; pub mod test_util; From 83a8ede2f52cb13a9e8b79387773b5b185e04976 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 4 Feb 2025 23:55:19 +0530 Subject: [PATCH 4/9] fixes: clippy --- datafusion/core/src/lib.rs | 2 +- datafusion/core/src/test/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 780b22983393..7a6f8f68cbdf 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -809,8 +809,8 @@ pub mod variable { pub use datafusion_expr::var_provider::{VarProvider, VarType}; } -#[cfg(test)] pub mod test; + pub mod test_util; #[cfg(doctest)] diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 0d659582aca3..05e63a3c4fd4 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -61,7 +61,7 @@ pub fn create_table_dual() -> Arc { Field::new("name", DataType::Utf8, false), ])); let batch = RecordBatch::try_new( - dual_schema.clone(), + Arc::::clone(&dual_schema), vec![ Arc::new(Int32Array::from(vec![1])), Arc::new(array::StringArray::from(vec!["a"])), @@ -244,7 +244,7 @@ pub fn table_with_sequence( let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::>())); let partitions = vec![vec![RecordBatch::try_new( - schema.clone(), + Arc::::clone(&schema), vec![arr as ArrayRef], )?]]; Ok(Arc::new(MemTable::try_new(schema, partitions)?)) From c81e2f4182b458e5475003ad0b28f8b4a23d8fea Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 4 Feb 2025 23:59:04 +0530 Subject: [PATCH 5/9] minor: permalink commit hash -> main --- datafusion/catalog-listing/src/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs index 3c5cdcb219b1..e952e39fd479 100644 --- a/datafusion/catalog-listing/src/mod.rs +++ b/datafusion/catalog-listing/src/mod.rs @@ -66,9 +66,9 @@ pub struct PartitionedFile { /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type. /// /// - /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/ea788c72dddf24acd8dc8068cede939e11dfab84/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55 - /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/ea788c72dddf24acd8dc8068cede939e11dfab84/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62 - /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/ea788c72dddf24acd8dc8068cede939e11dfab84/datafusion/core/src/datasource/file_format/options.rs#L190 + /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55 + /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62 + /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190 pub partition_values: Vec, /// An optional file range for a more fine-grained parallel execution pub range: Option, From fa6e2fdfe063da54e17cc71885c34433da1cb9d0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 4 Feb 2025 13:30:35 -0500 Subject: [PATCH 6/9] Tweak README --- datafusion/catalog-listing/README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/catalog-listing/README.md b/datafusion/catalog-listing/README.md index 65d480449130..771fe9eaab1d 100644 --- a/datafusion/catalog-listing/README.md +++ b/datafusion/catalog-listing/README.md @@ -17,10 +17,14 @@ under the License. --> -# DataFusion Catalog-listing +# DataFusion catalog-listing [DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. -This crate is a submodule of DataFusion that provides catalog management functionality, including catalogs, schemas, and tables. +This crate is a submodule of DataFusion with [ListingTable], an implementation +of [TableProvider] based on files in a directory (either locally or on remote +object storage such as S3). 
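As an illustrative aside (not part of this patch), here is a minimal sketch of how a `ListingTable` is typically assembled on top of this crate from the core crate; the builder calls (`ListingOptions`, `ListingTableConfig`, `infer_schema`) are assumed from DataFusion's published documentation rather than defined in this diff:

```rust
use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let table_path = ListingTableUrl::parse("file:///var/data/mytable/")?;

    // Describe how files under the path should be read (format, extension, ...).
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet");

    // Infer the schema from the files, then build and register the table.
    let schema = options.infer_schema(&ctx.state(), &table_path).await?;
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .with_schema(schema);
    ctx.register_table("mytable", Arc::new(ListingTable::try_new(config)?))?;
    Ok(())
}
```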
[df]: https://crates.io/crates/datafusion +[ListingTable]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html +[TableProvider]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html From 693f2204255cb1dd286d2d5118c18db671a85d67 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Wed, 5 Feb 2025 02:30:50 +0530 Subject: [PATCH 7/9] fix:prettier + wasm --- datafusion/catalog-listing/src/url.rs | 2 ++ datafusion/core/src/datasource/file_format/mod.rs | 2 ++ datafusion/core/src/lib.rs | 1 + 3 files changed, 5 insertions(+) diff --git a/datafusion/catalog-listing/src/url.rs b/datafusion/catalog-listing/src/url.rs index 2a9998513159..2e6415ba3b2b 100644 --- a/datafusion/catalog-listing/src/url.rs +++ b/datafusion/catalog-listing/src/url.rs @@ -324,6 +324,7 @@ impl std::fmt::Display for ListingTableUrl { } } +#[cfg(not(target_arch = "wasm32"))] const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; /// Splits `path` at the first path segment containing a glob expression, returning @@ -332,6 +333,7 @@ const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; /// Path delimiters are determined using [`std::path::is_separator`] which /// permits `/` as a path delimiter even on Windows platforms. /// +#[cfg(not(target_arch = "wasm32"))] fn split_glob_expression(path: &str) -> Option<(&str, &str)> { let mut last_separator = 0; diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index f47e2107ade6..2e2e6dba1c0e 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -425,6 +425,7 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema { } /// Coerces the file schema if the table schema uses a view type. +#[cfg(not(target_arch = "wasm32"))] pub(crate) fn coerce_file_schema_to_view_type( table_schema: &Schema, file_schema: &Schema, @@ -489,6 +490,7 @@ pub fn transform_binary_to_string(schema: &Schema) -> Schema { /// If the table schema uses a string type, coerce the file schema to use a string type. /// /// See [parquet::ParquetFormat::binary_as_string] for details +#[cfg(not(target_arch = "wasm32"))] pub(crate) fn coerce_file_schema_to_string_type( table_schema: &Schema, file_schema: &Schema, diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 7a6f8f68cbdf..ca0aa92ff1ed 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -809,6 +809,7 @@ pub mod variable { pub use datafusion_expr::var_provider::{VarProvider, VarType}; } +#[cfg(not(target_arch = "wasm32"))] pub mod test; pub mod test_util; From 9fd1eb33c9876f4c6bd05ebbe1a559f473cb3155 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Wed, 5 Feb 2025 02:42:11 +0530 Subject: [PATCH 8/9] prettier --- datafusion/catalog-listing/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/catalog-listing/README.md b/datafusion/catalog-listing/README.md index 771fe9eaab1d..b4760c413d60 100644 --- a/datafusion/catalog-listing/README.md +++ b/datafusion/catalog-listing/README.md @@ -26,5 +26,5 @@ of [TableProvider] based on files in a directory (either locally or on remote object storage such as S3). 
[df]: https://crates.io/crates/datafusion -[ListingTable]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html -[TableProvider]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html +[listingtable]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html +[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html From 53dbc2fb1b175cfaed4fe94f2331f8f2fcd71d1c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Feb 2025 08:24:27 -0500 Subject: [PATCH 9/9] Put unit tests with code --- datafusion/catalog-listing/Cargo.toml | 1 + datafusion/catalog-listing/src/helpers.rs | 283 +++++++++++++++++- .../core/tests/catalog_listing/helpers.rs | 224 -------------- datafusion/core/tests/catalog_listing/mod.rs | 18 -- datafusion/core/tests/core_integration.rs | 2 - 5 files changed, 281 insertions(+), 247 deletions(-) delete mode 100644 datafusion/core/tests/catalog_listing/helpers.rs delete mode 100644 datafusion/core/tests/catalog_listing/mod.rs diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 2752e530264c..03132e7b7bb5 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -53,6 +53,7 @@ object_store = { workspace = true } url = { workspace = true } [dev-dependencies] +async-trait = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 94542446d952..6cb3f661e652 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -532,13 +532,21 @@ pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { #[cfg(test)] mod tests { + use async_trait::async_trait; + use datafusion_execution::config::SessionConfig; + use datafusion_execution::runtime_env::RuntimeEnv; + use futures::FutureExt; + use object_store::memory::InMemory; + use std::any::Any; use std::ops::Not; - // use futures::StreamExt; - use datafusion_expr::{case, col, lit, Expr}; - use super::*; + use datafusion_expr::{ + case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF, + }; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_plan::ExecutionPlan; #[test] fn test_split_files() { @@ -580,6 +588,205 @@ mod tests { assert_eq!(0, chunks.len()); } + #[tokio::test] + async fn test_pruned_partition_list_empty() { + let (store, state) = make_test_store_and_state(&[ + ("tablepath/mypartition=val1/notparquetfile", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), + ("tablepath/file.parquet", 100), + ]); + let filter = Expr::eq(col("mypartition"), lit("val1")); + let pruned = pruned_partition_list( + state.as_ref(), + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + &[filter], + ".parquet", + &[(String::from("mypartition"), DataType::Utf8)], + ) + .await + .expect("partition pruning failed") + .collect::>() + .await; + + assert_eq!(pruned.len(), 0); + } + + #[tokio::test] + async fn test_pruned_partition_list() { + let (store, state) = make_test_store_and_state(&[ + ("tablepath/mypartition=val1/file.parquet", 100), + ("tablepath/mypartition=val2/file.parquet", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), + ("tablepath/mypartition=val1/other=val3/file.parquet", 100), + ]); + let filter = 
+        let pruned = pruned_partition_list(
+            state.as_ref(),
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            &[filter],
+            ".parquet",
+            &[(String::from("mypartition"), DataType::Utf8)],
+        )
+        .await
+        .expect("partition pruning failed")
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        assert_eq!(pruned.len(), 2);
+        let f1 = &pruned[0];
+        assert_eq!(
+            f1.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/file.parquet"
+        );
+        assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
+        let f2 = &pruned[1];
+        assert_eq!(
+            f2.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/other=val3/file.parquet"
+        );
+        assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
+    }
+
+    #[tokio::test]
+    async fn test_pruned_partition_list_multi() {
+        let (store, state) = make_test_store_and_state(&[
+            ("tablepath/part1=p1v1/file.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
+            ("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100),
+        ]);
+        let filter1 = Expr::eq(col("part1"), lit("p1v2"));
+        let filter2 = Expr::eq(col("part2"), lit("p2v1"));
+        let pruned = pruned_partition_list(
+            state.as_ref(),
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            &[filter1, filter2],
+            ".parquet",
+            &[
+                (String::from("part1"), DataType::Utf8),
+                (String::from("part2"), DataType::Utf8),
+            ],
+        )
+        .await
+        .expect("partition pruning failed")
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        assert_eq!(pruned.len(), 2);
+        let f1 = &pruned[0];
+        assert_eq!(
+            f1.object_meta.location.as_ref(),
+            "tablepath/part1=p1v2/part2=p2v1/file1.parquet"
+        );
+        assert_eq!(
+            &f1.partition_values,
+            &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),]
+        );
+        let f2 = &pruned[1];
+        assert_eq!(
+            f2.object_meta.location.as_ref(),
+            "tablepath/part1=p1v2/part2=p2v1/file2.parquet"
+        );
+        assert_eq!(
+            &f2.partition_values,
+            &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_list_partition() {
+        let (store, _) = make_test_store_and_state(&[
+            ("tablepath/part1=p1v1/file.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
+            ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
+        ]);
+
+        let partitions = list_partitions(
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            0,
+            None,
+        )
+        .await
+        .expect("listing partitions failed");
+
+        assert_eq!(
+            &partitions
+                .iter()
+                .map(describe_partition)
+                .collect::<Vec<_>>(),
+            &vec![
+                ("tablepath", 0, vec![]),
+                ("tablepath/part1=p1v1", 1, vec![]),
+                ("tablepath/part1=p1v2", 1, vec![]),
+                ("tablepath/part1=p1v3", 1, vec![]),
+            ]
+        );
+
+        let partitions = list_partitions(
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            1,
+            None,
+        )
+        .await
+        .expect("listing partitions failed");
+
+        assert_eq!(
+            &partitions
+                .iter()
+                .map(describe_partition)
+                .collect::<Vec<_>>(),
+            &vec![
+                ("tablepath", 0, vec![]),
+                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
+                ("tablepath/part1=p1v2", 1, vec![]),
+                ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
+                ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
+                ("tablepath/part1=p1v3", 1, vec![]),
+                ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
+            ]
+        );
+
+        let partitions = list_partitions(
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            2,
+            None,
+        )
+        .await
+        .expect("listing partitions failed");
+
+        assert_eq!(
+            &partitions
+                .iter()
+                .map(describe_partition)
+                .collect::<Vec<_>>(),
+            &vec![
+                ("tablepath", 0, vec![]),
+                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
+                ("tablepath/part1=p1v2", 1, vec![]),
+                ("tablepath/part1=p1v3", 1, vec![]),
+                (
+                    "tablepath/part1=p1v2/part2=p2v1",
+                    2,
+                    vec!["file1.parquet", "file2.parquet"]
+                ),
+                ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
+                ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
+            ]
+        );
+    }
+
     #[test]
     fn test_parse_partitions_for_path() {
         assert_eq!(
@@ -793,4 +1000,74 @@ mod tests {
             Some(Path::from("a=1970-01-05")),
         );
     }
+
+    pub fn make_test_store_and_state(
+        files: &[(&str, u64)],
+    ) -> (Arc<dyn ObjectStore>, Arc<dyn Session>) {
+        let memory = InMemory::new();
+
+        for (name, size) in files {
+            memory
+                .put(&Path::from(*name), vec![0; *size as usize].into())
+                .now_or_never()
+                .unwrap()
+                .unwrap();
+        }
+
+        (Arc::new(memory), Arc::new(MockSession {}))
+    }
+
+    struct MockSession {}
+
+    #[async_trait]
+    impl Session for MockSession {
+        fn session_id(&self) -> &str {
+            unimplemented!()
+        }
+
+        fn config(&self) -> &SessionConfig {
+            unimplemented!()
+        }
+
+        async fn create_physical_plan(
+            &self,
+            _logical_plan: &LogicalPlan,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            unimplemented!()
+        }
+
+        fn create_physical_expr(
+            &self,
+            _expr: Expr,
+            _df_schema: &DFSchema,
+        ) -> Result<Arc<dyn PhysicalExpr>> {
+            unimplemented!()
+        }
+
+        fn scalar_functions(&self) -> &std::collections::HashMap<String, Arc<ScalarUDF>> {
+            unimplemented!()
+        }
+
+        fn aggregate_functions(
+            &self,
+        ) -> &std::collections::HashMap<String, Arc<AggregateUDF>> {
+            unimplemented!()
+        }
+
+        fn window_functions(&self) -> &std::collections::HashMap<String, Arc<WindowUDF>> {
+            unimplemented!()
+        }
+
+        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+            unimplemented!()
+        }
+
+        fn execution_props(&self) -> &ExecutionProps {
+            unimplemented!()
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            unimplemented!()
+        }
+    }
 }
diff --git a/datafusion/core/tests/catalog_listing/helpers.rs b/datafusion/core/tests/catalog_listing/helpers.rs
deleted file mode 100644
index 6bd0feae70ed..000000000000
--- a/datafusion/core/tests/catalog_listing/helpers.rs
+++ /dev/null
@@ -1,224 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::datatypes::DataType;
-use datafusion::test::object_store::make_test_store_and_state;
-use datafusion_catalog_listing::helpers::*;
-use datafusion_catalog_listing::*;
-use datafusion_common::ScalarValue;
-use datafusion_expr::{col, lit, Expr};
-use futures::StreamExt;
-use futures::TryStreamExt;
-
-#[tokio::test]
-async fn test_pruned_partition_list_empty() {
-    let (store, state) = make_test_store_and_state(&[
-        ("tablepath/mypartition=val1/notparquetfile", 100),
-        ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
-        ("tablepath/file.parquet", 100),
-    ]);
-    let filter = Expr::eq(col("mypartition"), lit("val1"));
-    let pruned = pruned_partition_list(
-        &state,
-        store.as_ref(),
-        &ListingTableUrl::parse("file:///tablepath/").unwrap(),
-        &[filter],
-        ".parquet",
-        &[(String::from("mypartition"), DataType::Utf8)],
-    )
-    .await
-    .expect("partition pruning failed")
-    .collect::<Vec<_>>()
-    .await;
-
-    assert_eq!(pruned.len(), 0);
-}
-
-#[tokio::test]
-async fn test_pruned_partition_list() {
-    let (store, state) = make_test_store_and_state(&[
-        ("tablepath/mypartition=val1/file.parquet", 100),
-        ("tablepath/mypartition=val2/file.parquet", 100),
-        ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
-        ("tablepath/mypartition=val1/other=val3/file.parquet", 100),
-    ]);
-    let filter = Expr::eq(col("mypartition"), lit("val1"));
-    let pruned = pruned_partition_list(
-        &state,
-        store.as_ref(),
-        &ListingTableUrl::parse("file:///tablepath/").unwrap(),
-        &[filter],
-        ".parquet",
-        &[(String::from("mypartition"), DataType::Utf8)],
-    )
-    .await
-    .expect("partition pruning failed")
-    .try_collect::<Vec<_>>()
-    .await
-    .unwrap();
-
-    assert_eq!(pruned.len(), 2);
-    let f1 = &pruned[0];
-    assert_eq!(
-        f1.object_meta.location.as_ref(),
-        "tablepath/mypartition=val1/file.parquet"
-    );
-    assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
-    let f2 = &pruned[1];
-    assert_eq!(
-        f2.object_meta.location.as_ref(),
-        "tablepath/mypartition=val1/other=val3/file.parquet"
-    );
-    assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
-}
-
-#[tokio::test]
-async fn test_pruned_partition_list_multi() {
-    let (store, state) = make_test_store_and_state(&[
-        ("tablepath/part1=p1v1/file.parquet", 100),
-        ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
-        ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
-        ("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100),
-        ("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100),
-    ]);
-    let filter1 = Expr::eq(col("part1"), lit("p1v2"));
-    let filter2 = Expr::eq(col("part2"), lit("p2v1"));
-    let pruned = pruned_partition_list(
-        &state,
-        store.as_ref(),
-        &ListingTableUrl::parse("file:///tablepath/").unwrap(),
-        &[filter1, filter2],
-        ".parquet",
-        &[
-            (String::from("part1"), DataType::Utf8),
-            (String::from("part2"), DataType::Utf8),
-        ],
-    )
-    .await
-    .expect("partition pruning failed")
-    .try_collect::<Vec<_>>()
-    .await
-    .unwrap();
-
-    assert_eq!(pruned.len(), 2);
-    let f1 = &pruned[0];
-    assert_eq!(
-        f1.object_meta.location.as_ref(),
-        "tablepath/part1=p1v2/part2=p2v1/file1.parquet"
-    );
-    assert_eq!(
-        &f1.partition_values,
-        &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),]
-    );
-    let f2 = &pruned[1];
-    assert_eq!(
-        f2.object_meta.location.as_ref(),
-        "tablepath/part1=p1v2/part2=p2v1/file2.parquet"
-    );
-    assert_eq!(
-        &f2.partition_values,
-        &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")]
-    );
-}
-
-#[tokio::test]
-async fn test_list_partition() {
-    let (store, _) = make_test_store_and_state(&[
-        ("tablepath/part1=p1v1/file.parquet", 100),
-        ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
-        ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
-        ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
-        ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
-        ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
-    ]);
-
-    let partitions = list_partitions(
-        store.as_ref(),
-        &ListingTableUrl::parse("file:///tablepath/").unwrap(),
-        0,
-        None,
-    )
-    .await
-    .expect("listing partitions failed");
-
-    assert_eq!(
-        &partitions
-            .iter()
-            .map(describe_partition)
-            .collect::<Vec<_>>(),
-        &vec![
-            ("tablepath", 0, vec![]),
-            ("tablepath/part1=p1v1", 1, vec![]),
-            ("tablepath/part1=p1v2", 1, vec![]),
-            ("tablepath/part1=p1v3", 1, vec![]),
-        ]
-    );
-
-    let partitions = list_partitions(
-        store.as_ref(),
-        &ListingTableUrl::parse("file:///tablepath/").unwrap(),
-        1,
-        None,
-    )
-    .await
-    .expect("listing partitions failed");
-
-    assert_eq!(
-        &partitions
-            .iter()
-            .map(describe_partition)
-            .collect::<Vec<_>>(),
-        &vec![
-            ("tablepath", 0, vec![]),
-            ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
-            ("tablepath/part1=p1v2", 1, vec![]),
-            ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
-            ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
-            ("tablepath/part1=p1v3", 1, vec![]),
-            ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
-        ]
-    );
-
-    let partitions = list_partitions(
-        store.as_ref(),
-        &ListingTableUrl::parse("file:///tablepath/").unwrap(),
-        2,
-        None,
-    )
-    .await
-    .expect("listing partitions failed");
-
-    assert_eq!(
-        &partitions
-            .iter()
-            .map(describe_partition)
-            .collect::<Vec<_>>(),
-        &vec![
-            ("tablepath", 0, vec![]),
-            ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
-            ("tablepath/part1=p1v2", 1, vec![]),
-            ("tablepath/part1=p1v3", 1, vec![]),
-            (
-                "tablepath/part1=p1v2/part2=p2v1",
-                2,
-                vec!["file1.parquet", "file2.parquet"]
-            ),
-            ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
-            ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
-        ]
-    );
-}
diff --git a/datafusion/core/tests/catalog_listing/mod.rs b/datafusion/core/tests/catalog_listing/mod.rs
deleted file mode 100644
index 9df4cc1fa3ac..000000000000
--- a/datafusion/core/tests/catalog_listing/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-mod helpers;
diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs
index fb182baf1f87..66b4103160e7 100644
--- a/datafusion/core/tests/core_integration.rs
+++ b/datafusion/core/tests/core_integration.rs
@@ -46,8 +46,6 @@ mod physical_optimizer;
 
 mod catalog;
 
-mod catalog_listing;
-
 #[cfg(test)]
 #[ctor::ctor]
 fn init() {
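
For orientation, the sketch below (not part of the patch series) condenses how the relocated helpers are exercised once the tests live next to the code in datafusion/catalog-listing/src/helpers.rs. It assumes the test-only make_test_store_and_state and MockSession helpers introduced in the mod tests block of the patch, plus the pruned_partition_list signature shown in the diff; the "t/" table path and partition values are illustrative only.

    use arrow::datatypes::DataType;
    use datafusion_expr::{col, lit, Expr};
    use futures::TryStreamExt;

    // Build an in-memory object store with two Hive-style partitions, then
    // keep only the files whose partition value satisfies the filter.
    #[tokio::test]
    async fn prune_example() {
        let (store, state) = make_test_store_and_state(&[
            ("t/mypartition=val1/file.parquet", 100),
            ("t/mypartition=val2/file.parquet", 100),
        ]);
        let pruned = pruned_partition_list(
            state.as_ref(),
            store.as_ref(),
            &ListingTableUrl::parse("file:///t/").unwrap(),
            &[Expr::eq(col("mypartition"), lit("val1"))],
            ".parquet",
            &[(String::from("mypartition"), DataType::Utf8)],
        )
        .await
        .expect("partition pruning failed")
        .try_collect::<Vec<_>>()
        .await
        .expect("collecting pruned files failed");
        // Only the mypartition=val1 file survives the filter.
        assert_eq!(pruned.len(), 1);
    }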