From b7b572bc97825b369ee106baea6b5ff5b62066c8 Mon Sep 17 00:00:00 2001
From: Giacomo Rebecchi <98255907+giacomorebecchi@users.noreply.github.com>
Date: Mon, 17 Jun 2024 16:27:09 +0200
Subject: [PATCH] feat(#2597): allow pyarrow.dataset.Expression in filters
 kwarg (#2600)

# Description

Allow a `pyarrow.dataset.Expression` in the `filters` kwarg of the
`deltalake.DeltaTable.to_pyarrow_table` and `deltalake.DeltaTable.to_pandas`
methods, and remove code duplication in favor of the pyarrow implementation.

# Related Issue(s)

- closes #2597
---
 python/deltalake/table.py        | 128 +++----------------------------
 python/stubs/pyarrow/parquet.pyi |   8 ++
 python/tests/test_table_read.py  |   8 +-
 3 files changed, 25 insertions(+), 119 deletions(-)
 create mode 100644 python/stubs/pyarrow/parquet.pyi

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 647a459438..ee4e45c171 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -1,10 +1,8 @@
 import json
-import operator
 import warnings
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from enum import Enum
-from functools import reduce
 from pathlib import Path
 from typing import (
     TYPE_CHECKING,
@@ -19,7 +17,6 @@
     Optional,
     Tuple,
     Union,
-    cast,
 )
 
 import pyarrow
@@ -34,6 +31,11 @@
     ParquetReadOptions,
 )
 
+try:
+    from pyarrow.parquet import filters_to_expression  # pyarrow >= 10.0.0
+except ImportError:
+    from pyarrow.parquet import _filters_to_expression as filters_to_expression
+
 if TYPE_CHECKING:
     import os
 
@@ -260,116 +262,6 @@ class ProtocolVersions(NamedTuple):
 FilterType = Union[FilterConjunctionType, FilterDNFType]
 
 
-def _check_contains_null(value: Any) -> bool:
-    """
-    Check if target contains nullish value.
-    """
-    if isinstance(value, bytes):
-        for byte in value:
-            if isinstance(byte, bytes):
-                compare_to = chr(0)
-            else:
-                compare_to = 0
-            if byte == compare_to:
-                return True
-    elif isinstance(value, str):
-        return "\x00" in value
-    return False
-
-
-def _check_dnf(
-    dnf: FilterDNFType,
-    check_null_strings: bool = True,
-) -> FilterDNFType:
-    """
-    Check if DNF are well-formed.
-    """
-    if len(dnf) == 0 or any(len(c) == 0 for c in dnf):
-        raise ValueError("Malformed DNF")
-    if check_null_strings:
-        for conjunction in dnf:
-            for col, op, val in conjunction:
-                if (
-                    isinstance(val, list)
-                    and all(_check_contains_null(v) for v in val)
-                    or _check_contains_null(val)
-                ):
-                    raise NotImplementedError(
-                        "Null-terminated binary strings are not supported "
-                        "as filter values."
-                    )
-    return dnf
-
-
-def _convert_single_predicate(column: str, op: str, value: Any) -> Expression:
-    """
-    Convert given `tuple` to [pyarrow.dataset.Expression].
-    """
-    import pyarrow.dataset as ds
-
-    field = ds.field(column)
-    if op == "=" or op == "==":
-        return field == value
-    elif op == "!=":
-        return field != value
-    elif op == "<":
-        return field < value
-    elif op == ">":
-        return field > value
-    elif op == "<=":
-        return field <= value
-    elif op == ">=":
-        return field >= value
-    elif op == "in":
-        return field.isin(value)
-    elif op == "not in":
-        return ~field.isin(value)
-    else:
-        raise ValueError(
-            f'"{(column, op, value)}" is not a valid operator in predicates.'
-        )
-
-
-def _filters_to_expression(filters: FilterType) -> Expression:
-    """
-    Check if filters are well-formed and convert to an [pyarrow.dataset.Expression].
-    """
-    if isinstance(filters[0][0], str):
-        # We have encountered the situation where we have one nesting level too few:
-        # We have [(,,), ..] instead of [[(,,), ..]]
-        dnf = cast(FilterDNFType, [filters])
-    else:
-        dnf = cast(FilterDNFType, filters)
-    dnf = _check_dnf(dnf, check_null_strings=False)
-    disjunction_members = []
-    for conjunction in dnf:
-        conjunction_members = [
-            _convert_single_predicate(col, op, val) for col, op, val in conjunction
-        ]
-        disjunction_members.append(reduce(operator.and_, conjunction_members))
-    return reduce(operator.or_, disjunction_members)
-
-
-_DNF_filter_doc = """
-Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
-DNF allows arbitrary boolean logical combinations of single partition predicates.
-The innermost tuples each describe a single partition predicate. The list of inner
-predicates is interpreted as a conjunction (AND), forming a more selective and
-multiple partition predicates. Each tuple has format: (key, op, value) and compares
-the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If
-the op is in or not in, the value must be a collection such as a list, a set or a tuple.
-The supported type for value is str. Use empty string `''` for Null partition value.
-
-Example:
-    ```
-    ("x", "=", "a")
-    ("x", "!=", "a")
-    ("y", "in", ["a", "b", "c"])
-    ("z", "not in", ["a","b"])
-    ```
-"""
-
-
 @dataclass(init=False)
 class DeltaTable:
     """Represents a Delta Table"""
@@ -1145,7 +1037,7 @@ def to_pyarrow_table(
         partitions: Optional[List[Tuple[str, str, Any]]] = None,
         columns: Optional[List[str]] = None,
         filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
-        filters: Optional[FilterType] = None,
+        filters: Optional[Union[FilterType, Expression]] = None,
     ) -> pyarrow.Table:
         """
         Build a PyArrow Table using data from the DeltaTable.
@@ -1154,10 +1046,10 @@ def to_pyarrow_table(
             partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
             columns: The columns to project. This can be a list of column names to include (order and duplicates will be preserved)
             filesystem: A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
-            filters: A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass ``partitions``
+            filters: A disjunctive normal form (DNF) predicate for filtering rows, or a pyarrow.dataset.Expression. If you pass a filter, you do not need to pass ``partitions``
         """
         if filters is not None:
-            filters = _filters_to_expression(filters)
+            filters = filters_to_expression(filters)
         return self.to_pyarrow_dataset(
             partitions=partitions, filesystem=filesystem
         ).to_table(columns=columns, filter=filters)
@@ -1167,7 +1059,7 @@ def to_pandas(
         partitions: Optional[List[Tuple[str, str, Any]]] = None,
         columns: Optional[List[str]] = None,
         filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
-        filters: Optional[FilterType] = None,
+        filters: Optional[Union[FilterType, Expression]] = None,
     ) -> "pd.DataFrame":
         """
         Build a pandas dataframe using data from the DeltaTable.
@@ -1176,7 +1068,7 @@ def to_pandas(
             partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
             columns: The columns to project. This can be a list of column names to include (order and duplicates will be preserved)
             filesystem: A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
-            filters: A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass ``partitions``
+            filters: A disjunctive normal form (DNF) predicate for filtering rows, or a pyarrow.dataset.Expression. If you pass a filter, you do not need to pass ``partitions``
         """
         return self.to_pyarrow_table(
             partitions=partitions,
diff --git a/python/stubs/pyarrow/parquet.pyi b/python/stubs/pyarrow/parquet.pyi
new file mode 100644
index 0000000000..26db6a2fa2
--- /dev/null
+++ b/python/stubs/pyarrow/parquet.pyi
@@ -0,0 +1,8 @@
+from typing import Callable
+
+from pyarrow.dataset import Expression
+
+from deltalake.table import FilterType
+
+filters_to_expression: Callable[[FilterType], Expression]
+_filters_to_expression: Callable[[FilterType], Expression]
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 5e2e4f167b..efe2385b6c 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -507,7 +507,11 @@ def test_delta_table_with_filters():
     filter_expr = ds.field("date") > "2021-02-20"
 
     data = dataset.to_table(filter=filter_expr)
-    assert len(dt.to_pandas(filters=[("date", ">", "2021-02-20")])) == data.num_rows
+    assert (
+        len(dt.to_pandas(filters=[("date", ">", "2021-02-20")]))
+        == len(dt.to_pandas(filters=filter_expr))
+        == data.num_rows
+    )
 
     filter_expr = (ds.field("date") > "2021-02-20") | (
         ds.field("state").isin(["Alabama", "Wyoming"])
@@ -522,6 +526,7 @@ def test_delta_table_with_filters():
                 ]
             )
         )
+        == len(dt.to_pandas(filters=filter_expr))
         == data.num_rows
     )
 
@@ -538,6 +543,7 @@ def test_delta_table_with_filters():
                 ]
            )
        )
+        == len(dt.to_pandas(filters=filter_expr))
         == data.num_rows
     )
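
A minimal usage sketch of what this patch enables, not part of the diff itself: `filters` now accepts either the DNF tuple form or a `pyarrow.dataset.Expression`, and both should select the same rows. The table path and the `date` column are hypothetical, borrowed from the test fixture above.

```python
import pyarrow.dataset as ds

from deltalake import DeltaTable

# Hypothetical table location; a `date` string column is assumed,
# mirroring the fixture in test_delta_table_with_filters above.
dt = DeltaTable("path/to/delta-table")

# DNF form: a list of (column, op, value) tuples, ANDed together.
df_dnf = dt.to_pandas(filters=[("date", ">", "2021-02-20")])

# The same predicate as a pyarrow.dataset.Expression, accepted directly
# once this patch is applied.
df_expr = dt.to_pandas(filters=ds.field("date") > "2021-02-20")

assert len(df_dnf) == len(df_expr)
```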