Skip to content

Commit

Permalink
feat(sqlalchemy): make ibis.connect with sqlalchemy backends
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Aug 19, 2022
1 parent 2c48835 commit b6cefb9
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 81 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ jobs:
title: Datafusion
- name: pyspark
title: PySpark
parallel: false
- name: mysql
title: MySQL
services:
Expand Down Expand Up @@ -80,6 +81,7 @@ jobs:
backend:
name: impala
title: Impala
parallel: false
services:
- impala
- kudu
Expand All @@ -93,6 +95,7 @@ jobs:
backend:
name: impala
title: Impala
parallel: false
services:
- impala
- kudu
Expand Down Expand Up @@ -158,12 +161,12 @@ jobs:
- name: download backend data
run: just download-data

- name: run parallel ${{ matrix.backend.name }} tests
if: ${{ matrix.backend.name != 'pyspark' && matrix.backend.name != 'impala' }}
- name: "run parallel tests: ${{ matrix.backend.name }}"
if: ${{ matrix.backend.parallel || matrix.backend.parallel == null }}
run: just ci-check -m ${{ matrix.backend.name }} --numprocesses auto --dist=loadgroup

- name: run non-parallel ${{ matrix.backend.name }} tests
if: ${{ matrix.backend.name == 'pyspark' || matrix.backend.name == 'impala' }}
- name: "run serial tests: ${{ matrix.backend.name }}"
if: ${{ !matrix.backend.parallel }}
run: just ci-check -m ${{ matrix.backend.name }}
env:
IBIS_TEST_NN_HOST: localhost
Expand Down
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ repos:
rev: 5.10.1
hooks:
- id: isort
- repo: https://github.com/pycqa/flake8
rev: 5.0.4
hooks:
- id: flake8
- repo: https://github.com/psf/black
rev: 22.6.0
hooks:
- id: black
- repo: https://github.com/pycqa/flake8
rev: 5.0.4
hooks:
- id: flake8
- repo: https://github.com/MarcoGorelli/absolufy-imports
rev: v0.3.1
hooks:
Expand Down
88 changes: 66 additions & 22 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import abc
import collections.abc
import functools
import importlib.metadata
import keyword
import re
import sys
import urllib.parse
from pathlib import Path
from typing import (
TYPE_CHECKING,
Expand All @@ -14,6 +17,7 @@
Iterable,
Iterator,
Mapping,
MutableMapping,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -277,6 +281,16 @@ def connect(self, *args, **kwargs) -> BaseBackend:
new_backend.reconnect()
return new_backend

def _from_url(self, url: str) -> BaseBackend:
"""Construct an ibis backend from a SQLAlchemy-conforming URL."""
raise NotImplementedError(
f"`_from_url` not implemented for the {self.name} backend"
)

@staticmethod
def _convert_kwargs(kwargs: MutableMapping) -> None:
"""Manipulate keyword arguments to `.connect` method."""

def reconnect(self) -> None:
"""Reconnect to the database already configured with connect."""
self.do_connect(*self._con_args, **self._con_kwargs)
Expand Down Expand Up @@ -672,21 +686,60 @@ def has_operation(cls, operation: type[ops.Value]) -> bool:
_connect = RegexDispatcher("_connect")


@_connect.register(r"(?P<backend>.+)://(?P<path>.*)", priority=10)
def _(_: str, *, backend: str, path: str, **kwargs: Any) -> BaseBackend:
@functools.lru_cache(maxsize=None)
def _get_backend_names() -> frozenset[str]:
"""Return the set of known backend names.
Notes
-----
This function returns a frozenset to prevent cache pollution.
If a `set` is used, then any in-place modifications to the set
are visible to every caller of this function.
"""

if sys.version_info < (3, 10):
entrypoints = importlib.metadata.entry_points()["ibis.backends"]
else:
entrypoints = importlib.metadata.entry_points(group="ibis.backends")
return frozenset(ep.name for ep in entrypoints)


_PATTERN = "|".join(
sorted(_get_backend_names().difference(("duckdb", "sqlite")))
)


@_connect.register(rf"(?P<backend>{_PATTERN})://.+", priority=12)
def _(url: str, *, backend: str, **kwargs: Any) -> BaseBackend:
"""Connect to given `backend` with `path`.
Examples
--------
>>> con = ibis.connect("duckdb://relative/path/to/data.db")
>>> con = ibis.connect("postgres://user:pass@hostname:port/database")
>>> con = ibis.connect("mysql://user:pass@hostname:port/database")
"""
instance = getattr(ibis, backend)
instance: BaseBackend = getattr(ibis, backend)
backend += (backend == "postgres") * "ql"
try:
return instance.connect(url=f"{backend}://{path}", **kwargs)
except TypeError:
return instance.connect(path, **kwargs)
params = "?" * bool(kwargs) + urllib.parse.urlencode(kwargs)
url += params
return instance._from_url(url)


@_connect.register(r"(?P<backend>duckdb|sqlite)://(?P<path>.*)", priority=12)
def _(_: str, *, backend: str, path: str, **kwargs: Any) -> BaseBackend:
"""Connect to given `backend` with `path`.
Examples
--------
>>> con = ibis.connect("duckdb://relative/path/to/data.db")
>>> con = ibis.connect("sqlite:///absolute/path/to/data.db")
"""
instance: BaseBackend = getattr(ibis, backend)
params = "?" * bool(kwargs) + urllib.parse.urlencode(kwargs)
path += params
# extra slash for sqlalchemy
return instance._from_url(f"{backend}:///{path}")


@_connect.register(r"file://(?P<path>.*)", priority=10)
Expand Down Expand Up @@ -716,7 +769,7 @@ def connect(resource: Path | str, **_: Any) -> BaseBackend:
Examples
--------
>>> con = ibis.connect("duckdb://relative/path/to/data.db")
>>> con = ibis.connect("duckdb:///absolute/path/to/data.db")
>>> con = ibis.connect("relative/path/to/data.duckdb")
"""
raise NotImplementedError(type(resource))
Expand Down Expand Up @@ -752,29 +805,20 @@ def _(
Examples
--------
>>> con = ibis.connect("duckdb://relative/path/to/data.csv")
>>> con = ibis.connect("duckdb://relative/path/to/more/data.parquet")
>>> con = ibis.connect("duckdb:///absolute/path/to/more/data.parquet")
"""
con = getattr(ibis, backend).connect(**kwargs)
con.register(f"{extension}://{filename}")
return con


@_connect.register(
r"(?P<filename>.+\.(?P<extension>parquet|csv))",
priority=8,
)
def _(
_: str,
*,
filename: str,
extension: str,
**kwargs: Any,
) -> BaseBackend:
@_connect.register(r".+\.(?:parquet|csv)", priority=8)
def _(filename: str, **kwargs: Any) -> BaseBackend:
"""Connect to `duckdb` and register a parquet or csv file.
Examples
--------
>>> con = ibis.connect("relative/path/to/data.csv")
>>> con = ibis.connect("relative/path/to/more/data.parquet")
"""
return _connect(f"duckdb://{filename}", **kwargs)
return _connect(f"duckdb:///{filename}", **kwargs)
29 changes: 29 additions & 0 deletions ibis/backends/base/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from functools import lru_cache
from typing import Any, Mapping

import sqlalchemy as sa

import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
Expand All @@ -25,6 +27,33 @@ class BaseSQLBackend(BaseBackend):
table_class = ops.DatabaseTable
table_expr_class = ir.Table

def _from_url(self, url: str) -> BaseBackend:
"""Connect to a backend using a URL `url`.
Parameters
----------
url
URL with which to connect to a backend.
Returns
-------
BaseBackend
A backend instance
"""
url = sa.engine.make_url(url)

kwargs = {
name: value
for name in ("host", "port", "database", "password")
if (value := getattr(url, name, None))
}
if username := url.username:
kwargs["user"] = username

kwargs.update(url.query)
self._convert_kwargs(kwargs)
return self.connect(**kwargs)

def table(self, name: str, database: str | None = None) -> ir.Table:
"""Construct a table expression.
Expand Down
39 changes: 13 additions & 26 deletions ibis/backends/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import ibis
import ibis.util as util
from ibis.backends.base import _get_backend_names

TEST_TABLES = {
"functional_alltypes": ibis.schema(
Expand Down Expand Up @@ -221,28 +222,6 @@ def _random_identifier(suffix: str) -> str:
return f"__ibis_test_{suffix}_{util.guid()}"


@lru_cache(maxsize=None)
def _get_backend_names() -> frozenset[str]:
"""Return the set of known backend names.
Notes
-----
This function returns a frozenset to prevent cache pollution.
If a `set` is used, then any in-place modifications to the set
are visible to every caller of this function.
"""
import sys

if sys.version_info < (3, 10):
entrypoints = list(importlib.metadata.entry_points()['ibis.backends'])
else:
entrypoints = list(
importlib.metadata.entry_points(group="ibis.backends")
)
return frozenset(ep.name for ep in entrypoints)


def _get_backend_conf(backend_str: str):
"""Convert a backend string to the test class for the backend."""
conftest = importlib.import_module(
Expand Down Expand Up @@ -300,6 +279,8 @@ def pytest_ignore_collect(path, config):
def pytest_collection_modifyitems(session, config, items):
# add the backend marker to any tests are inside "ibis/backends"
all_backends = _get_backend_names()
xdist_group_markers = []

for item in items:
parts = item.path.parts
backend = _get_backend_from_parts(parts)
Expand All @@ -310,10 +291,16 @@ def pytest_collection_modifyitems(session, config, items):
# anything else is a "core" test and is run by default
item.add_marker(pytest.mark.core)

if "sqlite" in item.nodeid:
item.add_marker(pytest.mark.xdist_group(name="sqlite"))
if "duckdb" in item.nodeid:
item.add_marker(pytest.mark.xdist_group(name="duckdb"))
for name in ("duckdb", "sqlite"):
# build a list of markers so we're don't invalidate the item's
# marker iterator
for _ in item.iter_markers(name=name):
xdist_group_markers.append(
(item, pytest.mark.xdist_group(name=name))
)

for item, marker in xdist_group_markers:
item.add_marker(marker)


@lru_cache(maxsize=None)
Expand Down
14 changes: 13 additions & 1 deletion ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,19 @@ class Options(ibis.config.BaseModel):
description="Treat NaNs in floating point expressions as NULL.",
)

def do_connect(self, session: pyspark.sql.SparkSession) -> None:
def _from_url(self, url: str) -> Backend:
"""Construct a PySpark backend from a URL `url`."""
url = sa.engine.make_url(url)
params = list(itertools.chain.from_iterable(url.query.items()))
if database := url.database:
params.append("spark.sql.warehouse.dir")
params.append(str(Path(database).absolute()))

builder = SparkSession.builder.config(*params)
session = builder.getOrCreate()
return self.connect(session)

def do_connect(self, session: SparkSession) -> None:
"""Create a PySpark `Backend` for use with Ibis.
Parameters
Expand Down
Loading

0 comments on commit b6cefb9

Please sign in to comment.