Both of these now take the `partition_filters` parameter, making `files_by_partitions()` obsolete. The latter function has been marked deprecated; this PR also restores its original behavior of returning absolute file paths rather than relative ones, resolving #894.
The list of inner
predicates is interpreted as a conjunction (AND), forming a more selective,
multiple-partition predicate. Each tuple has the format: (key, op, value) and compares
the key with the value. The supported ops are: `=`, `!=`, `in`, and `not in`. If
the op is `in` or `not in`, the value must be a collection such as a list, a set or a tuple.
The supported type for value is str. Use an empty string `''` for a null partition value.
@@ -174,33 +207,33 @@ def files_by_partitions( :param partition_filters: the partition filters that will be used for getting the matched files :return: list of the .parquet files after applying the partition filters referenced for the current version of the DeltaTable. """ - try: - return self._table.files_by_partitions(partition_filters) - except TypeError: - raise ValueError( - "Only the type String is currently allowed inside the partition filters." - ) - - def file_paths(self) -> List[str]: - """ - Get the list of files with an absolute path. - - :return: list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable - """ warnings.warn( - "Call to deprecated method file_paths. Please use file_uris instead.", + "Call to deprecated method files_by_partitions. Please use file_uris instead.", category=DeprecationWarning, stacklevel=2, ) - return self.file_uris() + return self.file_uris(partition_filters) - def file_uris(self) -> List[str]: - """ - Get the list of files with an absolute path. + def file_uris( + self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None + ) -> List[str]: + return self._table.file_uris( + self.__stringify_partition_values(partition_filters) + ) - :return: list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable - """ - return self._table.file_uris() + file_uris.__doc__ = f""" +Get the list of files as absolute URIs, including the scheme (e.g. "s3://"). + +Local files will be just plain absolute paths, without a scheme. (That is, +no 'file://' prefix.) + +Use the partition_filters parameter to retrieve a subset of files that match the +given filters. 
+ +:param partition_filters: the partition filters that will be used for getting the matched files +:return: list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable +{_DNF_filter_doc} + """ def load_version(self, version: int) -> None: """ @@ -224,6 +257,10 @@ def load_with_datetime(self, datetime_string: str) -> None: """ self._table.load_with_datetime(datetime_string) + @property + def table_uri(self) -> str: + return self._table.table_uri() + def schema(self) -> Schema: """ Get the current schema of the DeltaTable. @@ -389,3 +426,18 @@ def update_incremental(self) -> None: newer versions. """ self._table.update_incremental() + + def __stringify_partition_values( + self, partition_filters: Optional[List[Tuple[str, str, Any]]] + ) -> Optional[List[Tuple[str, str, Union[str, List[str]]]]]: + if partition_filters is None: + return partition_filters + out = [] + for field, op, value in partition_filters: + str_value: Union[str, List[str]] + if isinstance(value, (list, tuple)): + str_value = [str(val) for val in value] + else: + str_value = str(value) + out.append((field, op, str_value)) + return out diff --git a/python/src/lib.rs b/python/src/lib.rs index 794cab92dc..2cff068c9e 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -223,16 +223,43 @@ impl RawDeltaTable { } } - pub fn files(&self) -> PyResult> { - Ok(self - ._table - .get_files_iter() - .map(|f| f.to_string()) - .collect()) + pub fn files( + &self, + partition_filters: Option>, + ) -> PyResult> { + if let Some(filters) = partition_filters { + let filters = + convert_partition_filters(filters).map_err(PyDeltaTableError::from_raw)?; + Ok(self + ._table + .get_files_by_partitions(&filters) + .map_err(PyDeltaTableError::from_raw)? 
+ .into_iter() + .map(|p| p.to_string()) + .collect()) + } else { + Ok(self + ._table + .get_files_iter() + .map(|f| f.to_string()) + .collect()) + } } - pub fn file_uris(&self) -> PyResult> { - Ok(self._table.get_file_uris().collect()) + pub fn file_uris( + &self, + partition_filters: Option>, + ) -> PyResult> { + if let Some(filters) = partition_filters { + let filters = + convert_partition_filters(filters).map_err(PyDeltaTableError::from_raw)?; + Ok(self + ._table + .get_file_uris_by_partitions(&filters) + .map_err(PyDeltaTableError::from_raw)?) + } else { + Ok(self._table.get_file_uris().collect()) + } } #[getter] @@ -403,6 +430,18 @@ impl RawDeltaTable { } } +fn convert_partition_filters<'a>( + partitions_filters: Vec<(&'a str, &'a str, PartitionFilterValue<'a>)>, +) -> Result>, deltalake::DeltaTableError> { + partitions_filters + .into_iter() + .map(|filter| match filter { + (key, op, PartitionFilterValue::Single(v)) => PartitionFilter::try_from((key, op, v)), + (key, op, PartitionFilterValue::Multiple(v)) => PartitionFilter::try_from((key, op, v)), + }) + .collect() +} + fn json_value_to_py(value: &serde_json::Value, py: Python) -> PyObject { match value { serde_json::Value::Null => py.None(), diff --git a/python/tests/test_fs.py b/python/tests/test_fs.py index 17e6125da4..9418ff3862 100644 --- a/python/tests/test_fs.py +++ b/python/tests/test_fs.py @@ -60,6 +60,17 @@ def test_read_simple_table_from_remote(s3_localstack): dt = DeltaTable(table_path) assert dt.to_pyarrow_table().equals(pa.table({"id": [5, 7, 9]})) + expected_files = [ + "part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet", + "part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet", + "part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet", + "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet", + "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet", + ] + + assert dt.files() == expected_files + assert dt.file_uris() 
== [table_path + "/" + path for path in expected_files] + @pytest.mark.s3 @pytest.mark.integration diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 2c8b085a6b..bbeca64a7d 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -269,44 +269,65 @@ def test_history_partitioned_table_metadata(): } +def assert_correct_files(dt: DeltaTable, partition_filters, expected_paths): + assert dt.files(partition_filters) == expected_paths + absolute_paths = [os.path.join(dt.table_uri, path) for path in expected_paths] + assert dt.file_uris(partition_filters) == absolute_paths + + def test_get_files_partitioned_table(): table_path = "../rust/tests/data/delta-0.8.0-partitioned" dt = DeltaTable(table_path) table_path = ( Path.cwd().parent / "rust/tests/data/delta-0.8.0-partitioned" ).as_posix() + partition_filters = [("day", "=", "3")] - assert dt.files_by_partitions(partition_filters=partition_filters) == [ - f"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet" + paths = [ + "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet" ] + assert_correct_files(dt, partition_filters, paths) + + # Also accepts integers + partition_filters = [("day", "=", 3)] + assert_correct_files(dt, partition_filters, paths) + partition_filters = [("day", "!=", "3")] - assert dt.files_by_partitions(partition_filters=partition_filters) == [ - f"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet", - f"year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet", - f"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet", - f"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet", - f"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet", + paths = [ + 
"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet", + "year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet", + "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet", + "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet", + "year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet", ] + assert_correct_files(dt, partition_filters, paths) + partition_filters = [("day", "in", ["3", "20"])] - assert dt.files_by_partitions(partition_filters=partition_filters) == [ - f"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet", - f"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet", + paths = [ + "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet", + "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet", ] + assert_correct_files(dt, partition_filters, paths) + partition_filters = [("day", "not in", ["3", "20"])] - assert dt.files_by_partitions(partition_filters=partition_filters) == [ + paths = [ f"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet", f"year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet", f"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet", f"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet", ] + assert_correct_files(dt, partition_filters, paths) + partition_filters = [("day", "not in", ["3", "20"]), ("year", "=", "2021")] - assert dt.files_by_partitions(partition_filters=partition_filters) == [ + paths = [ f"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet", 
f"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet", ] + assert_correct_files(dt, partition_filters, paths) + partition_filters = [("invalid_operation", "=>", "3")] with pytest.raises(Exception) as exception: - dt.files_by_partitions(partition_filters=partition_filters) + dt.files(partition_filters) assert ( str(exception.value) == 'Invalid partition filter found: ("invalid_operation", "=>", "3").' @@ -314,23 +335,15 @@ def test_get_files_partitioned_table(): partition_filters = [("invalid_operation", "=", ["3", "20"])] with pytest.raises(Exception) as exception: - dt.files_by_partitions(partition_filters=partition_filters) + dt.files(partition_filters) assert ( str(exception.value) == 'Invalid partition filter found: ("invalid_operation", "=", ["3", "20"]).' ) - partition_filters = [("day", "=", 3)] - with pytest.raises(Exception) as exception: - dt.files_by_partitions(partition_filters=partition_filters) - assert ( - str(exception.value) - == "Only the type String is currently allowed inside the partition filters." - ) - partition_filters = [("unknown", "=", "3")] with pytest.raises(Exception) as exception: - dt.files_by_partitions(partition_filters=partition_filters) + dt.files(partition_filters) assert ( str(exception.value) == 'Invalid partition filter found: [PartitionFilter { key: "unknown", value: Equal("3") }].'