Skip to content

Commit

Permalink
feat(duckdb): use delta_scan instead of reading pyarrow datasets (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored Jul 13, 2024
1 parent 6e0b5f5 commit 0ff595e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 26 deletions.
43 changes: 22 additions & 21 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,23 +885,20 @@ def read_in_memory(
return self.table(table_name)

def read_delta(
self,
source_table: str,
table_name: str | None = None,
**kwargs: Any,
self, source_table: str, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
"""Register a Delta Lake table as a table in the current database.
Parameters
----------
source_table
The data source. Must be a directory
containing a Delta Lake table.
The data source. Must be a directory containing a Delta Lake table.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to deltalake.DeltaTable.
An optional name to use for the created table. This defaults to a
generated name.
kwargs
Additional keyword arguments passed to the `delta` extension's
`delta_scan` function.
Returns
-------
Expand All @@ -913,21 +910,25 @@ def read_delta(

table_name = table_name or util.gen_name("read_delta")

try:
from deltalake import DeltaTable
except ImportError:
raise ImportError(
"The deltalake extra is required to use the "
"read_delta method. You can install it using pip:\n\n"
"pip install 'ibis-framework[deltalake]'\n"
)
# always try to load the delta extension
extensions = ["delta"]

# delta handles s3 itself, not with httpfs
if source_table.startswith(("http://", "https://")):
extensions.append("httpfs")

delta_table = DeltaTable(source_table, **kwargs)
self._load_extensions(extensions)

return self.read_in_memory(
delta_table.to_pyarrow_dataset(), table_name=table_name
options = [
sg.to_identifier(key).eq(sge.convert(val)) for key, val in kwargs.items()
]
self._create_temp_view(
table_name,
sg.select(STAR).from_(self.compiler.f.delta_scan(source_table, *options)),
)

return self.table(table_name)

def list_tables(
self,
like: str | None = None,
Expand Down
9 changes: 4 additions & 5 deletions ibis/backends/tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def test_to_pyarrow_decimal(backend, dtype, pyarrow_dtype):
)
@pytest.mark.notyet(["clickhouse"], raises=Exception)
@pytest.mark.notyet(["mssql", "pandas"], raises=PyDeltaTableError)
def test_roundtrip_delta(backend, con, alltypes, tmp_path, monkeypatch):
def test_roundtrip_delta(backend, con, alltypes, tmp_path):
if con.name == "pyspark":
pytest.importorskip("delta")
else:
Expand All @@ -416,11 +416,10 @@ def test_roundtrip_delta(backend, con, alltypes, tmp_path, monkeypatch):
path = tmp_path / "test.delta"
t.to_delta(path)

monkeypatch.setattr(ibis.options, "default_backend", con)
dt = ibis.read_delta(path)
result = dt.to_pandas()
dt = con.read_delta(path)
result = con.to_pandas(dt)

backend.assert_frame_equal(result, expected)
backend.assert_frame_equal(result, expected, check_dtype=con.name != "duckdb")


@pytest.mark.notimpl(
Expand Down

0 comments on commit 0ff595e

Please sign in to comment.