Skip to content

Commit

Permalink
refactor(api!): refactor Python APIs for getting file list (#1032)
Browse files Browse the repository at this point in the history
# Description

This PR consolidates the four methods `files()`, `file_paths()`,
`file_uris()`, `files_by_partitions()` into just two methods:

* `files()`, which returns paths as they appear in the Delta log (usually
relative, but they *can* be absolute, particularly if the files are located
outside of the Delta table root).
 * `file_uris()`, which returns absolute URIs for all files.

Both of these now take the `partition_filters` parameter, making
`files_by_partitions()` obsolete. That latter function has been marked
deprecated, and it has also been restored to its original behavior of
returning absolute file paths rather than relative ones, resolving #894.

Finally, the `partition_filters` parameter now supports passing values
other than strings, such as integers and floats.

TODO:

 * [x] Update documentation
* [ ] ~~Test behavior of filtering for null or non-null~~ Null handling
isn't supported by DNF filters IIUC
 * [x] Test behavior of paths on object stores.

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
wjones127 authored Dec 27, 2022
1 parent 03151e7 commit 4d64a1f
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 59 deletions.
106 changes: 79 additions & 27 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ class ProtocolVersions(NamedTuple):
min_writer_version: int


# Shared docstring fragment describing the partition-filter format; it is
# interpolated into the __doc__ of files() and file_uris().
_DNF_filter_doc = """
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
The innermost tuples each describe a single partition predicate. The list of inner
predicates is interpreted as a conjunction (AND), forming a more selective compound
predicate. Each tuple has format: (key, op, value) and compares the key with the value.
The supported op are: `=`, `!=`, `in`, and `not in`. If the op is `in` or `not in`,
the value must be a collection such as a list, a set or a tuple. Values may be of
any type convertible to str (e.g. int or float); they are converted to strings
before filtering. Use empty string `''` for Null partition value.
Examples:
("x", "=", "a")
("x", "!=", "a")
("y", "in", ["a", "b", "c"])
("z", "not in", ["a","b"])
"""


@dataclass(init=False)
class DeltaTable:
"""Create a DeltaTable instance."""
Expand Down Expand Up @@ -142,19 +160,34 @@ def version(self) -> int:
"""
return self._table.version()

def files(
    self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None
) -> List[str]:
    # Filter values are stringified first because the native binding only
    # accepts str / list-of-str partition values.
    return self._table.files(self.__stringify_partition_values(partition_filters))

files.__doc__ = f"""
Get the .parquet files of the DeltaTable.

The paths are as they are saved in the delta log, which may either be
relative to the table root or absolute URIs.

:param partition_filters: the partition filters that will be used for
    getting the matched files
:return: list of the .parquet files referenced for the current version
    of the DeltaTable
{_DNF_filter_doc}
"""

def files_by_partitions(
self, partition_filters: List[Tuple[str, str, Any]]
) -> List[str]:
"""
Get the files that match a given list of partitions filters.
.. deprecated:: 0.7.0
Use :meth:`file_uris` instead.
Partitions which do not match the filter predicate will be removed from scanned data.
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
Expand All @@ -174,33 +207,33 @@ def files_by_partitions(
:param partition_filters: the partition filters that will be used for getting the matched files
:return: list of the .parquet files after applying the partition filters referenced for the current version of the DeltaTable.
"""
try:
return self._table.files_by_partitions(partition_filters)
except TypeError:
raise ValueError(
"Only the type String is currently allowed inside the partition filters."
)

def file_paths(self) -> List[str]:
"""
Get the list of files with an absolute path.
:return: list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable
"""
warnings.warn(
"Call to deprecated method file_paths. Please use file_uris instead.",
"Call to deprecated method files_by_partitions. Please use file_uris instead.",
category=DeprecationWarning,
stacklevel=2,
)
return self.file_uris()
return self.file_uris(partition_filters)

def file_uris(
    self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None
) -> List[str]:
    # Filter values are stringified first because the native binding only
    # accepts str / list-of-str partition values.
    return self._table.file_uris(
        self.__stringify_partition_values(partition_filters)
    )

file_uris.__doc__ = f"""
Get the list of files as absolute URIs, including the scheme (e.g. "s3://").

Local files will be just plain absolute paths, without a scheme. (That is,
no 'file://' prefix.)

Use the partition_filters parameter to retrieve a subset of files that match the
given filters.

:param partition_filters: the partition filters that will be used for getting the matched files
:return: list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable
{_DNF_filter_doc}
"""

def load_version(self, version: int) -> None:
"""
Expand All @@ -224,6 +257,10 @@ def load_with_datetime(self, datetime_string: str) -> None:
"""
self._table.load_with_datetime(datetime_string)

@property
def table_uri(self) -> str:
    """URI of the table root (delegates to the underlying native table)."""
    uri = self._table.table_uri()
    return uri

def schema(self) -> Schema:
"""
Get the current schema of the DeltaTable.
Expand Down Expand Up @@ -389,3 +426,18 @@ def update_incremental(self) -> None:
newer versions.
"""
self._table.update_incremental()

def __stringify_partition_values(
    self, partition_filters: Optional[List[Tuple[str, str, Any]]]
) -> Optional[List[Tuple[str, str, Union[str, List[str]]]]]:
    """Coerce every filter value to str (or a list of str) so the native
    binding, which only understands string values, can consume it."""
    if partition_filters is None:
        return None
    converted: List[Tuple[str, str, Union[str, List[str]]]] = []
    for key, op, raw in partition_filters:
        if isinstance(raw, (list, tuple)):
            converted.append((key, op, [str(item) for item in raw]))
        else:
            converted.append((key, op, str(raw)))
    return converted
55 changes: 47 additions & 8 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,43 @@ impl RawDeltaTable {
}
}

pub fn files(&self) -> PyResult<Vec<String>> {
Ok(self
._table
.get_files_iter()
.map(|f| f.to_string())
.collect())
pub fn files(
&self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
) -> PyResult<Vec<String>> {
if let Some(filters) = partition_filters {
let filters =
convert_partition_filters(filters).map_err(PyDeltaTableError::from_raw)?;
Ok(self
._table
.get_files_by_partitions(&filters)
.map_err(PyDeltaTableError::from_raw)?
.into_iter()
.map(|p| p.to_string())
.collect())
} else {
Ok(self
._table
.get_files_iter()
.map(|f| f.to_string())
.collect())
}
}

pub fn file_uris(&self) -> PyResult<Vec<String>> {
Ok(self._table.get_file_uris().collect())
pub fn file_uris(
&self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
) -> PyResult<Vec<String>> {
if let Some(filters) = partition_filters {
let filters =
convert_partition_filters(filters).map_err(PyDeltaTableError::from_raw)?;
Ok(self
._table
.get_file_uris_by_partitions(&filters)
.map_err(PyDeltaTableError::from_raw)?)
} else {
Ok(self._table.get_file_uris().collect())
}
}

#[getter]
Expand Down Expand Up @@ -403,6 +430,18 @@ impl RawDeltaTable {
}
}

/// Translate Python-side `(key, op, value)` filter tuples into
/// `PartitionFilter`s, failing on the first invalid tuple.
fn convert_partition_filters<'a>(
    partitions_filters: Vec<(&'a str, &'a str, PartitionFilterValue<'a>)>,
) -> Result<Vec<PartitionFilter<&'a str>>, deltalake::DeltaTableError> {
    let mut converted = Vec::with_capacity(partitions_filters.len());
    for (key, op, value) in partitions_filters {
        let filter = match value {
            PartitionFilterValue::Single(v) => PartitionFilter::try_from((key, op, v))?,
            PartitionFilterValue::Multiple(v) => PartitionFilter::try_from((key, op, v))?,
        };
        converted.push(filter);
    }
    Ok(converted)
}

fn json_value_to_py(value: &serde_json::Value, py: Python) -> PyObject {
match value {
serde_json::Value::Null => py.None(),
Expand Down
11 changes: 11 additions & 0 deletions python/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ def test_read_simple_table_from_remote(s3_localstack):
dt = DeltaTable(table_path)
assert dt.to_pyarrow_table().equals(pa.table({"id": [5, 7, 9]}))

expected_files = [
"part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet",
"part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet",
"part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet",
"part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet",
"part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
]

assert dt.files() == expected_files
assert dt.file_uris() == [table_path + "/" + path for path in expected_files]


@pytest.mark.s3
@pytest.mark.integration
Expand Down
61 changes: 37 additions & 24 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,68 +269,81 @@ def test_history_partitioned_table_metadata():
}


def assert_correct_files(dt: DeltaTable, partition_filters, expected_paths):
    """Check that files() returns the expected relative paths and that
    file_uris() returns the same paths resolved against the table root."""
    assert dt.files(partition_filters) == expected_paths
    expected_uris = [os.path.join(dt.table_uri, rel) for rel in expected_paths]
    assert dt.file_uris(partition_filters) == expected_uris


def test_get_files_partitioned_table():
table_path = "../rust/tests/data/delta-0.8.0-partitioned"
dt = DeltaTable(table_path)
table_path = (
Path.cwd().parent / "rust/tests/data/delta-0.8.0-partitioned"
).as_posix()

partition_filters = [("day", "=", "3")]
assert dt.files_by_partitions(partition_filters=partition_filters) == [
f"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"
paths = [
"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"
]
assert_correct_files(dt, partition_filters, paths)

# Also accepts integers
partition_filters = [("day", "=", 3)]
assert_correct_files(dt, partition_filters, paths)

partition_filters = [("day", "!=", "3")]
assert dt.files_by_partitions(partition_filters=partition_filters) == [
f"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet",
f"year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet",
f"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet",
f"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet",
f"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet",
paths = [
"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet",
"year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet",
"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet",
"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet",
"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet",
]
assert_correct_files(dt, partition_filters, paths)

partition_filters = [("day", "in", ["3", "20"])]
assert dt.files_by_partitions(partition_filters=partition_filters) == [
f"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet",
f"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet",
paths = [
"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet",
"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet",
]
assert_correct_files(dt, partition_filters, paths)

partition_filters = [("day", "not in", ["3", "20"])]
assert dt.files_by_partitions(partition_filters=partition_filters) == [
paths = [
f"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet",
f"year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet",
f"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet",
f"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet",
]
assert_correct_files(dt, partition_filters, paths)

partition_filters = [("day", "not in", ["3", "20"]), ("year", "=", "2021")]
assert dt.files_by_partitions(partition_filters=partition_filters) == [
paths = [
f"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet",
f"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet",
]
assert_correct_files(dt, partition_filters, paths)

partition_filters = [("invalid_operation", "=>", "3")]
with pytest.raises(Exception) as exception:
dt.files_by_partitions(partition_filters=partition_filters)
dt.files(partition_filters)
assert (
str(exception.value)
== 'Invalid partition filter found: ("invalid_operation", "=>", "3").'
)

partition_filters = [("invalid_operation", "=", ["3", "20"])]
with pytest.raises(Exception) as exception:
dt.files_by_partitions(partition_filters=partition_filters)
dt.files(partition_filters)
assert (
str(exception.value)
== 'Invalid partition filter found: ("invalid_operation", "=", ["3", "20"]).'
)

partition_filters = [("day", "=", 3)]
with pytest.raises(Exception) as exception:
dt.files_by_partitions(partition_filters=partition_filters)
assert (
str(exception.value)
== "Only the type String is currently allowed inside the partition filters."
)

partition_filters = [("unknown", "=", "3")]
with pytest.raises(Exception) as exception:
dt.files_by_partitions(partition_filters=partition_filters)
dt.files(partition_filters)
assert (
str(exception.value)
== 'Invalid partition filter found: [PartitionFilter { key: "unknown", value: Equal("3") }].'
Expand Down

0 comments on commit 4d64a1f

Please sign in to comment.