Add filesystem argument for reading DeltaTable in Python binding #414

Merged: 3 commits, Aug 28, 2021
21 changes: 15 additions & 6 deletions python/deltalake/table.py
@@ -6,6 +6,7 @@

import pyarrow
from pyarrow.dataset import dataset, partitioning
from pyarrow.fs import FileSystem

if TYPE_CHECKING:
import pandas
@@ -204,15 +205,16 @@ def pyarrow_schema(self) -> pyarrow.Schema:
return pyarrow_schema_from_json(self._table.arrow_schema_json())

def to_pyarrow_dataset(
-self, partitions: Optional[List[Tuple[str, str, Any]]] = None
+self, partitions: Optional[List[Tuple[str, str, Any]]] = None, filesystem: Optional[FileSystem] = None
) -> pyarrow.dataset.Dataset:
"""
Build a PyArrow Dataset using data from the DeltaTable.

:param partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
:param filesystem: A concrete implementation of the PyArrow FileSystem or an fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
:return: the PyArrow dataset in PyArrow
"""
-if partitions is None:
+if not partitions:
file_paths = self._table.file_uris()
else:
file_paths = self._table.files_by_partitions(partitions)
@@ -233,56 +235,63 @@ def to_pyarrow_dataset
# for non-AWS S3 like resources. This is a slight hack until such a
# point when pyarrow learns about AWS_ENDPOINT_URL
endpoint_url = os.environ.get("AWS_ENDPOINT_URL")
-if endpoint_url is not None:
+if endpoint_url:
endpoint = urlparse(endpoint_url)
# This format is specific to the URL schema inference done inside
# of pyarrow, consult their tests/dataset.py for examples
query_str += (
f"?scheme={endpoint.scheme}&endpoint_override={endpoint.netloc}"
)
if not filesystem:
filesystem = f"{paths[0].scheme}://{paths[0].netloc}{query_str}"

keys = [curr_file.path for curr_file in paths]
return dataset(
keys,
schema=self.pyarrow_schema(),
-filesystem=f"{paths[0].scheme}://{paths[0].netloc}{query_str}",
+filesystem=filesystem,
partitioning=partitioning(flavor="hive"),
)
else:
return dataset(
file_paths,
schema=self.pyarrow_schema(),
format="parquet",
filesystem=filesystem,
partitioning=partitioning(flavor="hive"),
)

def to_pyarrow_table(
self,
partitions: Optional[List[Tuple[str, str, Any]]] = None,
columns: Optional[List[str]] = None,
filesystem: Optional[FileSystem] = None
) -> pyarrow.Table:
"""
Build a PyArrow Table using data from the DeltaTable.

:param partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
:param columns: The columns to project. This can be a list of column names to include (order and duplicates will be preserved)
:param filesystem: A concrete implementation of the PyArrow FileSystem or an fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
:return: the PyArrow table
"""
-return self.to_pyarrow_dataset(partitions).to_table(columns=columns)
+return self.to_pyarrow_dataset(partitions=partitions, filesystem=filesystem).to_table(columns=columns)

def to_pandas(
self,
partitions: Optional[List[Tuple[str, str, Any]]] = None,
columns: Optional[List[str]] = None,
filesystem: Optional[FileSystem] = None
) -> "pandas.DataFrame":
"""
Build a pandas dataframe using data from the DeltaTable.

:param partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
:param columns: The columns to project. This can be a list of column names to include (order and duplicates will be preserved)
:param filesystem: A concrete implementation of the PyArrow FileSystem or an fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
:return: a pandas dataframe
"""
-return self.to_pyarrow_table(partitions, columns).to_pandas()
+return self.to_pyarrow_table(partitions=partitions, columns=columns, filesystem=filesystem).to_pandas()

def update_incremental(self) -> None:
"""
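
The new filesystem argument threads through to_pyarrow_dataset, to_pyarrow_table, and to_pandas, so a caller can hand PyArrow an explicit FileSystem instead of relying on the URL-based inference (or on the AWS_ENDPOINT_URL workaround above). A minimal sketch of the intended call pattern, assuming a hypothetical local table path and column name:

from deltalake import DeltaTable
from pyarrow.fs import LocalFileSystem

dt = DeltaTable("path/to/delta_table")  # hypothetical table location
fs = LocalFileSystem()  # any concrete pyarrow.fs.FileSystem works here

# The explicit filesystem is forwarded to pyarrow.dataset.dataset, bypassing the
# scheme/netloc string that would otherwise be built from the first file URI.
ds = dt.to_pyarrow_dataset(filesystem=fs)
df = dt.to_pandas(columns=["id"], filesystem=fs)  # "id" is a hypothetical column
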
1 change: 1 addition & 0 deletions python/docs/source/conf.py
@@ -51,6 +51,7 @@ def get_release_version() -> str:
("py:class", "pyarrow.lib.Table"),
("py:class", "pyarrow.lib.DataType"),
("py:class", "pyarrow.lib.Field"),
("py:class", "pyarrow._fs.FileSystem"),
("py:class", "RawDeltaTable"),
("py:class", "pandas.DataFrame"),
]
3 changes: 3 additions & 0 deletions python/stubs/pyarrow/fs.pyi
@@ -0,0 +1,3 @@
from typing import Any

FileSystem: Any
40 changes: 24 additions & 16 deletions python/tests/test_table_read.py
@@ -3,6 +3,7 @@

import pandas as pd
import pytest
from pyarrow.fs import LocalFileSystem

from deltalake import DeltaTable, Metadata

@@ -48,20 +49,20 @@ def test_load_with_datetime_bad_format():
with pytest.raises(Exception) as exception:
dt.load_with_datetime("2020-05-01T00:47:31")
assert (
str(exception.value)
== "Parse date and time string failed: premature end of input"
)
with pytest.raises(Exception) as exception:
dt.load_with_datetime("2020-05-01 00:47:31")
assert (
str(exception.value)
== "Parse date and time string failed: input contains invalid characters"
)
with pytest.raises(Exception) as exception:
dt.load_with_datetime("2020-05-01T00:47:31+08")
assert (
str(exception.value)
== "Parse date and time string failed: premature end of input"
)


@@ -115,8 +116,8 @@ def test_read_table_with_column_subset():
"day": ["1", "3", "5", "20", "20", "4", "5"],
}
assert (
dt.to_pyarrow_dataset().to_table(columns=["value", "day"]).to_pydict()
== expected
)


@@ -142,8 +143,8 @@ def test_vacuum_dry_run_simple_table():
with pytest.raises(Exception) as exception:
dt.vacuum(retention_periods)
assert (
str(exception.value)
== "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
)


@@ -195,24 +196,24 @@ def test_get_files_partitioned_table():
with pytest.raises(Exception) as exception:
dt.files_by_partitions(partition_filters=partition_filters)
assert (
str(exception.value)
== 'Invalid partition filter found: ("invalid_operation", "=>", "3").'
)

partition_filters = [("invalid_operation", "=", ["3", "20"])]
with pytest.raises(Exception) as exception:
dt.files_by_partitions(partition_filters=partition_filters)
assert (
str(exception.value)
== 'Invalid partition filter found: ("invalid_operation", "=", ["3", "20"]).'
)

partition_filters = [("day", "=", 3)]
with pytest.raises(Exception) as exception:
dt.files_by_partitions(partition_filters=partition_filters)
assert (
str(exception.value)
== "Only the type String is currently allowed inside the partition filters."
)

partition_filters = [("unknown", "=", "3")]
@@ -225,6 +226,13 @@ def test_delta_table_to_pandas():
assert dt.to_pandas().equals(pd.DataFrame({"id": [5, 7, 9]}))


def test_delta_table_with_filesystem():
table_path = "../rust/tests/data/simple_table"
dt = DeltaTable(table_path)
filesystem = LocalFileSystem()
assert dt.to_pandas(filesystem=filesystem).equals(pd.DataFrame({"id": [5, 7, 9]}))
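
The docstrings above also allow an fsspec-compatible interface. A possible companion test, sketched under the assumption that the optional fsspec dependency is installed and that PyArrow's FSSpecHandler resolves the relative table path the same way LocalFileSystem does:

def test_delta_table_with_fsspec_filesystem():
    # Sketch only: requires fsspec; wraps its local filesystem in PyArrow's shim
    # before passing it through the new filesystem argument.
    import fsspec
    from pyarrow.fs import FSSpecHandler, PyFileSystem

    table_path = "../rust/tests/data/simple_table"
    dt = DeltaTable(table_path)
    fs = PyFileSystem(FSSpecHandler(fsspec.filesystem("file")))
    assert dt.to_pandas(filesystem=fs).equals(pd.DataFrame({"id": [5, 7, 9]}))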


class ExcPassThroughThread(Thread):
"""Wrapper around `threading.Thread` that propagates exceptions."""
