feat(duckdb): add catalog support to create_table #9147

Merged (1 commit) on May 8, 2024

39 changes: 32 additions & 7 deletions ibis/backends/duckdb/__init__.py
@@ -27,7 +27,7 @@
from ibis.backends.duckdb.compiler import DuckDBCompiler
from ibis.backends.duckdb.converter import DuckDBPandasData
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compiler import STAR, C, F
from ibis.backends.sql.compiler import STAR, C
from ibis.expr.operations.udf import InputType

if TYPE_CHECKING:
@@ -154,20 +154,39 @@ def create_table(
database
The name of the database in which to create the table; if not
passed, the current database is used.

For multi-level table hierarchies, you can pass in a dotted string
path like `"catalog.database"` or a tuple of strings like
`("catalog", "database")`.
temp
Create a temporary table
overwrite
If `True`, replace the table if it already exists, otherwise fail
if the table exists

"""
table_loc = self._to_sqlglot_table(database)

if getattr(table_loc, "catalog", False) and temp:
raise exc.UnsupportedArgumentError(
"DuckDB can only create temporary tables in the `temp` catalog. "
"Don't specify a catalog to enable temp table creation."
)

catalog = self.current_catalog
database = self.current_database
if table_loc is not None:
catalog = table_loc.catalog or catalog
database = table_loc.db or database

if obj is None and schema is None:
raise ValueError("Either `obj` or `schema` must be specified")

properties = []

if temp:
properties.append(sge.TemporaryProperty())
catalog = "temp"

temp_memtable_view = None

@@ -202,8 +221,10 @@ def create_table(
else:
temp_name = name

initial_table = sg.table(
temp_name, catalog=database, quoted=self.compiler.quoted
initial_table = sge.Table(
this=sg.to_identifier(temp_name, quoted=self.compiler.quoted),
catalog=catalog,
db=database,
)
target = sge.Schema(this=initial_table, expressions=column_defs)

@@ -214,7 +235,11 @@
)

# This is the same table as initial_table unless overwrite == True
final_table = sg.table(name, catalog=database, quoted=self.compiler.quoted)
final_table = sge.Table(
this=sg.to_identifier(name, quoted=self.compiler.quoted),
catalog=catalog,
db=database,
)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.insert(query, into=initial_table).sql(self.name)
@@ -254,7 +279,7 @@ def create_table(
if temp_memtable_view is not None:
self.con.unregister(temp_memtable_view)

return self.table(name, database=database)
return self.table(name, database=(catalog, database))

def _load_into_cache(self, name, expr):
self.create_table(name, expr, schema=expr.schema(), temp=True)
@@ -971,8 +996,8 @@ def list_tables(
"""
table_loc = self._warn_and_create_table_loc(database, schema)

catalog = F.current_database()
database = F.current_schema()
catalog = self.current_catalog
database = self.current_database
if table_loc is not None:
catalog = table_loc.catalog or catalog
database = table_loc.db or database
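A brief usage sketch of the new behavior, assuming an in-memory connection and hypothetical file, table, and column names: with this change, `create_table` accepts a catalog-qualified `database` as either a dotted string or a tuple, while temporary tables remain restricted to DuckDB's `temp` catalog.

import ibis

con = ibis.duckdb.connect()                # in-memory DuckDB (hypothetical setup)
con.attach("analytics.ddb", name="ext")    # attach an external file as catalog "ext"

t = ibis.memtable({"a": [1, 2], "b": ["x", "y"]})

# Both forms target catalog "ext", database "main".
con.create_table("t1", obj=t, database="ext.main")
con.create_table("t2", obj=t, database=("ext", "main"))

# Temporary tables always land in the `temp` catalog, so passing a catalog
# together with temp=True raises UnsupportedArgumentError.
con.create_table("t3", obj=t, temp=True)   # fine: no catalog given
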
73 changes: 73 additions & 0 deletions ibis/backends/duckdb/tests/test_catalog.py
@@ -0,0 +1,73 @@
from __future__ import annotations

import pandas as pd
import pandas.testing as tm
import pytest

import ibis
import ibis.common.exceptions as exc


@pytest.fixture(scope="session")
def external_duckdb_file(tmpdir_factory): # pragma: no cover
ddb_path = str(tmpdir_factory.mktemp("data") / "starwars.ddb")
con = ibis.duckdb.connect(ddb_path)

starwars_df = pd.DataFrame(
{
"name": ["Luke Skywalker", "C-3PO", "R2-D2"],
"height": [172, 167, 96],
"mass": [77.0, 75.0, 32.0],
}
)
con.create_table("starwars", obj=starwars_df)
con.disconnect()

return ddb_path, starwars_df


def test_read_write_external_catalog(con, external_duckdb_file, monkeypatch):
monkeypatch.setattr(ibis.options, "default_backend", con)

ddb_path, starwars_df = external_duckdb_file
con.attach(ddb_path, name="ext")

# Read from catalog
assert "ext" in con.list_catalogs()
assert "main" in con.list_databases(catalog="ext")

assert "starwars" in con.list_tables(database="ext.main")
assert "starwars" not in con.list_tables()

starwars = con.table("starwars", database="ext.main")
tm.assert_frame_equal(starwars.to_pandas(), starwars_df)

# Write to catalog
t = ibis.memtable([{"a": 1, "b": "foo"}, {"a": 2, "b": "baz"}])

_ = con.create_table("t2", obj=t, database="ext.main")

assert "t2" in con.list_tables(database="ext.main")
assert "t2" not in con.list_tables()

table = con.table("t2", database="ext.main")

tm.assert_frame_equal(t.to_pandas(), table.to_pandas())

# Overwrite table in catalog

t_overwrite = ibis.memtable([{"a": 8, "b": "bing"}, {"a": 9, "b": "bong"}])

_ = con.create_table("t2", obj=t_overwrite, database="ext.main", overwrite=True)

assert "t2" in con.list_tables(database="ext.main")
assert "t2" not in con.list_tables()

table = con.table("t2", database="ext.main")

tm.assert_frame_equal(t_overwrite.to_pandas(), table.to_pandas())
Comment on lines +29 to +68 (Member Author):

I'd like to break this up into separate tests, but there are locking issues when attaching to the same external source -- I'm sure we can work around it, but I think this is good enough for now.

Reply:
You could design the tests to run in isolation by creating and tearing down a separate external source for each test case, though this might increase test execution time.
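
A minimal sketch of that isolation approach, assuming pytest's built-in tmp_path fixture and hypothetical table and data names; each test gets its own external file, so there is no shared lock to contend for:

import pandas as pd
import pytest

import ibis


@pytest.fixture
def isolated_duckdb_file(tmp_path):
    # Build a fresh external database per test; tmp_path is function-scoped,
    # so no two tests ever attach the same file.
    ddb_path = str(tmp_path / "external.ddb")
    ext_con = ibis.duckdb.connect(ddb_path)
    df = pd.DataFrame({"name": ["Luke Skywalker"], "height": [172]})
    ext_con.create_table("starwars", obj=df)
    ext_con.disconnect()
    return ddb_path, df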



def test_raise_if_catalog_and_temp(con):
with pytest.raises(exc.UnsupportedArgumentError):
con.create_table("some_table", obj="hi", temp=True, database="ext.main")