Skip to content

Commit

Permalink
feat: public method to get partitions for DeltaTable (delta-io#2671)
Browse files Browse the repository at this point in the history
This adds a public method `partitions()` to the `DeltaTable` class
to get properly formatted partitions (list of dicts) for the table.
Also provides an option to return partitions as a list of tuples,
and proxies the partition filters to Rust's `get_active_partitions()`.

This also adds supporting tests for this feature.
  • Loading branch information
omkar-foss committed Aug 22, 2024
1 parent 1f45881 commit 298cd90
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 0 deletions.
24 changes: 24 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,30 @@ def version(self) -> int:
"""
return self._table.version()

def partitions(
    self,
    partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
    as_tuple_list: bool = False,
    # The annotation is quoted so the `|` between typing generics is never
    # evaluated at definition time (that raises TypeError on Python < 3.10
    # unless annotations are already lazy in this module).
) -> "List[Dict[str, str]] | List[Tuple[Tuple[str, str], ...]]":
    """
    Returns the partitions as a list of dicts. Example: `[{'month': '1', 'year': '2020', 'day': '1'}, ...]`

    Args:
        partition_filters: The partition filters that will be used for getting the matched partitions, defaults to `None` (no filtering).
        as_tuple_list: If `True`, returns the partitions as a list of tuples. Example: `[(("day", "5"), ("month", "4"), ("year", "2021")), ...]`

    Returns:
        One entry per non-empty active partition. By default each entry is a
        dict mapping partition column to value; with `as_tuple_list=True` each
        entry is a tuple of `(column, value)` pairs sorted by column name.
    """
    # Empty partition entries are dropped, so a table without partition
    # columns produces an empty list.
    # NOTE(review): assumes each partition is an iterable of (column, value)
    # pairs -- confirm against the `get_active_partitions()` binding.
    active = [
        partition
        for partition in self._table.get_active_partitions(partition_filters)
        if partition
    ]
    if as_tuple_list:
        # Sorting by column name makes the tuple layout deterministic
        # regardless of the iteration order of the underlying collection.
        return [
            tuple(sorted(partition, key=lambda pair: pair[0]))
            for partition in active
        ]
    return [dict(partition) for partition in active]

def files(
self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None
) -> List[str]:
Expand Down
72 changes: 72 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,78 @@ def test_encode_partition_value(input_value: Any, expected: str) -> None:
assert encode_partition_value(input_value) == expected


def test_partitions_partitioned_table() -> None:
    """`partitions()` returns every partition of the table as a dict."""
    table_path = "../crates/test/tests/data/delta-0.8.0-partitioned"
    dt = DeltaTable(table_path)
    expected = [
        {"year": "2020", "month": "2", "day": "5"},
        {"year": "2021", "month": "12", "day": "4"},
        {"year": "2020", "month": "2", "day": "3"},
        {"year": "2021", "month": "4", "day": "5"},
        {"year": "2020", "month": "1", "day": "1"},
        {"year": "2021", "month": "12", "day": "20"},
    ]
    actual = dt.partitions()
    # Order is not asserted; the length check catches unexpected extra
    # partitions that the membership loop alone would let through.
    assert len(expected) == len(actual)
    for partition in expected:
        assert partition in actual


def test_partitions_tuples_partitioned_table() -> None:
    """`partitions(as_tuple_list=True)` returns every partition as sorted pairs."""
    table_path = "../crates/test/tests/data/delta-0.8.0-partitioned"
    dt = DeltaTable(table_path)
    expected = [
        (("day", "5"), ("month", "2"), ("year", "2020")),
        (("day", "1"), ("month", "1"), ("year", "2020")),
        (("day", "5"), ("month", "4"), ("year", "2021")),
        (("day", "3"), ("month", "2"), ("year", "2020")),
        (("day", "20"), ("month", "12"), ("year", "2021")),
        (("day", "4"), ("month", "12"), ("year", "2021")),
    ]
    actual = dt.partitions(as_tuple_list=True)
    assert len(expected) == len(actual)
    for partition in expected:
        # Must be asserted: a bare `partition in actual` expression is
        # evaluated and discarded, so the test could never fail on it.
        assert partition in actual


def test_partitions_filtering_partitioned_table() -> None:
    """`partition_filters` restricts the result to the matching partitions."""
    table_path = "../crates/test/tests/data/delta-0.8.0-partitioned"
    dt = DeltaTable(table_path)
    expected = [
        (("day", "5"), ("month", "4"), ("year", "2021")),
        (("day", "20"), ("month", "12"), ("year", "2021")),
        (("day", "4"), ("month", "12"), ("year", "2021")),
    ]
    partition_filters = [("year", ">=", "2021")]
    actual = dt.partitions(partition_filters=partition_filters, as_tuple_list=True)
    assert len(expected) == len(actual)
    for partition in expected:
        # Must be asserted: a bare `partition in actual` expression is
        # evaluated and discarded, so the test could never fail on it.
        assert partition in actual


def test_partitions_special_partitioned_table() -> None:
    """Partition values with special characters are returned as-is in both formats."""
    table_path = "../crates/test/tests/data/delta-0.8.0-special-partition"
    dt = DeltaTable(table_path)

    # Partitions as list of dicts (default).
    expected_dict = [{"x": "A/A"}, {"x": "B B"}]
    actual_dict = dt.partitions()
    assert len(expected_dict) == len(actual_dict)
    for partition in expected_dict:
        assert partition in actual_dict

    # Partitions as list of tuples. Each partition is a tuple of
    # (column, value) pairs, so the expectations must be tuples too: a list
    # such as [("x", "B B")] never compares equal to (("x", "B B"),).
    expected_tuple = [(("x", "B B"),), (("x", "A/A"),)]
    actual_tuple = dt.partitions(as_tuple_list=True)
    assert len(expected_tuple) == len(actual_tuple)
    for partition in expected_tuple:
        assert partition in actual_tuple


def test_partitions_unpartitioned_table():
    """A table without partition columns reports no partitions."""
    unpartitioned = DeltaTable("../crates/test/tests/data/simple_table")
    found = unpartitioned.partitions()
    assert len(found) == 0


def test_read_table_last_checkpoint_not_updated():
dt = DeltaTable("../crates/test/tests/data/table_failed_last_checkpoint_update")

Expand Down

0 comments on commit 298cd90

Please sign in to comment.