Skip to content

Commit

Permalink
feat(snowflake): create an ibis backend from a snowpark session (#8962)
Browse files Browse the repository at this point in the history
Add a `from_snowpark` method to Snowflake backend. cc @IndexSeek.
  • Loading branch information
cpcloud authored Apr 15, 2024
1 parent 964ac3e commit f15d033
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 9 deletions.
17 changes: 16 additions & 1 deletion .github/workflows/ibis-backends-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
# only a single bigquery or snowflake run at a time, otherwise test data is
# clobbered by concurrent runs
concurrency:
group: ${{ matrix.backend.name }}-${{ matrix.python-version }}-${{ github.event.label.name || 'ci-run-cloud' }}
group: ${{ matrix.backend.title }}-${{ matrix.python-version }}-${{ github.event.label.name || 'ci-run-cloud' }}
cancel-in-progress: false

runs-on: ubuntu-latest
Expand Down Expand Up @@ -61,6 +61,13 @@ jobs:
extras:
- bigquery
- geospatial
- python-version: "3.11"
backend:
name: snowflake
title: Snowflake + Snowpark
key: snowflake-snowpark
extras:
- snowflake
steps:
- name: checkout
uses: actions/checkout@v4
Expand Down Expand Up @@ -95,6 +102,10 @@ jobs:
- name: install poetry
run: pip install 'poetry==1.8.2'

- name: install additional deps
if: matrix.backend.key == 'snowflake-snowpark'
run: poetry add snowflake-snowpark-python --optional --python="${{ matrix.python-version }}"

- name: install ibis
run: poetry install --without dev --without docs --extras "${{ join(matrix.backend.extras, ' ') }}"

Expand Down Expand Up @@ -129,6 +140,10 @@ jobs:
SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }}
SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }}

- name: enable snowpark testing
if: matrix.backend.key == 'snowflake-snowpark'
run: echo "SNOWFLAKE_SNOWPARK=1" >> "$GITHUB_ENV"

- name: "run parallel tests: ${{ matrix.backend.name }}"
run: just ci-check -m ${{ matrix.backend.name }} --numprocesses auto --dist=loadgroup

Expand Down
3 changes: 3 additions & 0 deletions ibis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,8 @@ def connect(*args, **kwargs):
proxy.name = name
proxy._from_url = backend._from_url
proxy._to_sqlglot = backend._to_sqlglot
# Add any additional methods that should be exposed at the top level
for name in getattr(backend, "_top_level_methods", ()):
setattr(proxy, name, getattr(backend, name))

return proxy
65 changes: 58 additions & 7 deletions ibis/backends/snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Backend(SQLBackend, CanCreateCatalog, CanCreateDatabase, CanCreateSchema):
supports_python_udfs = True

_latest_udf_python_version = (3, 10)
_top_level_methods = ("from_snowpark",)

def _convert_kwargs(self, kwargs):
with contextlib.suppress(KeyError):
Expand Down Expand Up @@ -203,17 +204,23 @@ def do_connect(self, create_object_udfs: bool = True, **kwargs: Any):
create_object_udfs
Enable object UDF extensions defined by ibis on the first
connection to the database.
connect_args
Additional arguments passed to the DBAPI connection call.
kwargs
Additional arguments passed to the URL constructor.
Additional arguments passed to the DBAPI connection call.
"""
import snowflake.connector as sc

connect_args = kwargs.copy()
session_parameters = connect_args.pop("session_parameters", {})

self._setup_session(
con=sc.connect(**connect_args),
session_parameters=session_parameters,
create_object_udfs=create_object_udfs,
)

def _setup_session(self, *, con, session_parameters, create_object_udfs: bool):
self.con = con

# enable multiple SQL statements by default
session_parameters.setdefault("MULTI_STATEMENT_COUNT", 0)
# don't format JSON output by default
Expand All @@ -232,8 +239,6 @@ def do_connect(self, create_object_udfs: bool = True, **kwargs: Any):
),
)

con = sc.connect(**connect_args)

with contextlib.closing(con.cursor()) as cur:
cur.execute(
"ALTER SESSION SET {}".format(
Expand Down Expand Up @@ -269,7 +274,53 @@ def do_connect(self, create_object_udfs: bool = True, **kwargs: Any):
warnings.warn(
f"Unable to create Ibis UDFs, some functionality will not work: {e}"
)
self.con = con

@util.experimental
@classmethod
def from_snowpark(cls, session, *, create_object_udfs: bool = True) -> Backend:
    """Build an Ibis Snowflake backend on top of a Snowpark session.

    No new connection is opened here: the backend is set up around the
    connection object already held by `session`.

    Parameters
    ----------
    session
        A Snowpark session instance.
    create_object_udfs
        Enable object UDF extensions defined by ibis on the first
        connection to the database.

    Returns
    -------
    Backend
        An Ibis Snowflake backend instance.

    Examples
    --------
    >>> import ibis
    >>> ibis.options.interactive = True
    >>> import snowflake.snowpark as sp  # doctest: +SKIP
    >>> session = sp.Session.builder.configs(...).create()  # doctest: +SKIP
    >>> con = ibis.snowflake.from_snowpark(session)  # doctest: +SKIP
    >>> batting = con.tables.BATTING  # doctest: +SKIP
    >>> batting[["playerID", "RBI"]].head()  # doctest: +SKIP
    ┏━━━━━━━━━━━┳━━━━━━━┓
    ┃ playerID  ┃ RBI   ┃
    ┡━━━━━━━━━━━╇━━━━━━━┩
    │ string    │ int64 │
    ├───────────┼───────┤
    │ abercda01 │     0 │
    │ addybo01  │    13 │
    │ allisar01 │    19 │
    │ allisdo01 │    27 │
    │ ansonca01 │    16 │
    └───────────┴───────┘
    """
    instance = cls()
    # NOTE(review): `session._conn._conn` reaches through private Snowpark
    # attributes; presumably this is the underlying connector connection --
    # confirm against the Snowpark version in use.
    instance._setup_session(
        con=session._conn._conn,
        # A fresh dict: `_setup_session` fills in its defaults on it.
        session_parameters={},
        create_object_udfs=create_object_udfs,
    )
    return instance

def _get_udf_source(self, udf_node: ops.ScalarUDF):
name = type(udf_node).__name__
Expand Down
20 changes: 19 additions & 1 deletion ibis/backends/snowflake/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,25 @@ def _load_data(self, **_: Any) -> None:

@staticmethod
def connect(*, tmpdir, worker_id, **kw) -> BaseBackend:
    """Return the backend connection used by the Snowflake tests.

    When the ``SNOWFLAKE_SNOWPARK`` environment variable is set to a
    non-empty value, a Snowpark session is created and wrapped with
    ``Backend.from_snowpark``. Otherwise the connection is made with
    ``ibis.connect`` using the URL from ``_get_url()``.

    Parameters
    ----------
    tmpdir
        Not used by this implementation.
    worker_id
        Not used by this implementation.
    kw
        Extra keyword arguments, forwarded to ``ibis.connect`` only on the
        non-Snowpark path.

    Returns
    -------
    BaseBackend
        A connected backend instance.
    """
    if not os.environ.get("SNOWFLAKE_SNOWPARK"):
        return ibis.connect(_get_url(), **kw)

    # Imported lazily so Snowpark is only needed when this path is taken.
    import snowflake.snowpark as sp

    named_connection = os.environ.get("SNOWFLAKE_DEFAULT_CONNECTION_NAME")
    if named_connection:
        session_builder = sp.Session.builder.config(
            "connection_name", named_connection
        )
    else:
        # (Snowpark config option, environment variable holding its value)
        option_sources = (
            ("user", "SNOWFLAKE_USER"),
            ("account", "SNOWFLAKE_ACCOUNT"),
            ("password", "SNOWFLAKE_PASSWORD"),
            ("warehouse", "SNOWFLAKE_WAREHOUSE"),
            ("database", "SNOWFLAKE_DATABASE"),
            ("schema", "SNOWFLAKE_SCHEMA"),
        )
        session_builder = sp.Session.builder.configs(
            {option: os.environ[variable] for option, variable in option_sources}
        )
    return ibis.backends.snowflake.Backend.from_snowpark(session_builder.create())


@pytest.fixture(scope="session")
Expand Down

0 comments on commit f15d033

Please sign in to comment.