refactor(pyspark): reimplement the backend using the new relational operations and Spark SQL

Summary
-------
Port the Spark backend to the new relational operations, move from the PySpark
DataFrame API to Spark SQL, and take advantage of the new SQLGlot backend.

Test suite changes
------------------
```
OLD: 1223 passed, 33 skipped, 29316 deselected, 285 xfailed in 375.24s (0:06:15)
NEW: 1334 passed, 33 skipped, 30002 deselected, 155 xfailed in 333.94s (0:05:33)
```

Advantages
----------
- Spark SQL is faster than PySpark.
- Spark SQL has broader feature support than PySpark, for example it properly
  supports subqueries.
- We can reuse the base SQLGlot backend, making the PySpark compiler as thin as
  500 lines of code.
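The "thin compiler" claim follows from inheritance: a minimal sketch (class and method names here are illustrative, not the actual ibis API) of how a dialect-specific compiler can reuse a shared SQLGlot-based base class and override only what differs:

```python
class SQLGlotCompiler:
    """Stand-in for the shared base compiler; the real ibis classes differ."""

    dialect = None

    def visit_Literal(self, value):
        return repr(value)


class PySparkCompiler(SQLGlotCompiler):
    """Only dialect-specific behavior is overridden; the rest is inherited."""

    dialect = "spark"

    def visit_Literal(self, value):
        # Spark SQL spells booleans TRUE/FALSE rather than Python's repr.
        if isinstance(value, bool):
            return "TRUE" if value else "FALSE"
        return super().visit_Literal(value)


print(PySparkCompiler().visit_Literal(True))  # TRUE
print(PySparkCompiler().visit_Literal(3))     # 3
```

Every operation without a Spark-specific quirk costs the subclass zero lines, which is how the per-dialect compiler stays small.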

I kept the previous datatype and schema mappers since some of the DDL operations
work directly on the PySpark session and Catalog objects; most of these would be
harder to express in Spark SQL.
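For illustration, a sketch of why those operations stay on the session path: the Catalog object exposes them as direct method calls. The catalog below is a stub so the example is self-contained; the real API is `pyspark.sql.Catalog`:

```python
class FakeCatalog:
    """Stub of pyspark.sql.Catalog; just enough to show the call shape."""

    def __init__(self):
        self._views = {"t": "parquet"}

    def listTables(self):
        return sorted(self._views)

    def dropTempView(self, name):
        return self._views.pop(name, None) is not None


def drop_view(catalog, name: str) -> bool:
    # Session/Catalog route: one method call, no SQL string to build,
    # parse, or keep in sync with a dialect.
    return catalog.dropTempView(name)


catalog = FakeCatalog()
print(catalog.listTables())       # ['t']
print(drop_view(catalog, "t"))    # True
```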
kszucs committed Feb 12, 2024
1 parent 4d99314 commit 32efbe7
Showing 48 changed files with 1,102 additions and 4,016 deletions.
136 changes: 68 additions & 68 deletions .github/workflows/ibis-backends.yml
@@ -536,73 +536,73 @@ jobs:
if: matrix.backend.services != null && failure()
run: docker compose logs

# test_pyspark:
# name: PySpark ${{ matrix.os }} python-${{ matrix.python-version }}
# runs-on: ${{ matrix.os }}
# strategy:
# fail-fast: false
# matrix:
# os:
# - ubuntu-latest
# python-version:
# - "3.10"
# steps:
# - name: checkout
# uses: actions/checkout@v4
#
# - uses: actions/setup-java@v4
# with:
# distribution: microsoft
# java-version: 17
#
# - uses: extractions/setup-just@v1
# env:
# GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
#
# - name: download backend data
# run: just download-data
#
# - name: install python
# uses: actions/setup-python@v5
# id: install_python
# with:
# python-version: ${{ matrix.python-version }}
#
# - name: install poetry
# run: python -m pip install --upgrade pip 'poetry==1.7.1'
#
# - name: remove lonboard
# # it requires a version of pandas that pyspark is not compatible with
# run: poetry remove lonboard
#
# - name: install maximum versions of pandas and numpy
# run: poetry add --lock 'pandas@<2' 'numpy<1.24'
#
# - name: checkout the lock file
# run: git checkout poetry.lock
#
# - name: lock with no updates
# # poetry add is aggressive and will update other dependencies like
# # numpy and pandas so we keep the pyproject.toml edits and then relock
# # without updating anything except the requested versions
# run: poetry lock --no-update
#
# - name: install ibis
# run: poetry install --without dev --without docs --extras pyspark
#
# - name: run tests
# run: just ci-check -m pyspark
#
# - name: check that no untracked files were produced
# shell: bash
# run: git checkout poetry.lock pyproject.toml && ! git status --porcelain | tee /dev/stderr | grep .
#
# - name: upload code coverage
# # only upload coverage for jobs that aren't mostly xfails
# if: success() && matrix.python-version != '3.11'
# uses: codecov/codecov-action@v4
# with:
# flags: backend,pyspark,${{ runner.os }},python-${{ steps.install_python.outputs.python-version }}
  test_pyspark:
    name: PySpark ${{ matrix.os }} python-${{ matrix.python-version }}
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os:
          - ubuntu-latest
        python-version:
          - "3.10"
    steps:
      - name: checkout
        uses: actions/checkout@v4

      - uses: actions/setup-java@v4
        with:
          distribution: microsoft
          java-version: 17

      - uses: extractions/setup-just@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

      - name: download backend data
        run: just download-data

      - name: install python
        uses: actions/setup-python@v5
        id: install_python
        with:
          python-version: ${{ matrix.python-version }}

      - name: install poetry
        run: python -m pip install --upgrade pip 'poetry==1.7.1'

      - name: remove lonboard
        # it requires a version of pandas that pyspark is not compatible with
        run: poetry remove lonboard

      - name: install maximum versions of pandas and numpy
        run: poetry add --lock 'pandas@<2' 'numpy<1.24'

      - name: checkout the lock file
        run: git checkout poetry.lock

      - name: lock with no updates
        # poetry add is aggressive and will update other dependencies like
        # numpy and pandas so we keep the pyproject.toml edits and then relock
        # without updating anything except the requested versions
        run: poetry lock --no-update

      - name: install ibis
        run: poetry install --without dev --without docs --extras pyspark

      - name: run tests
        run: just ci-check -m pyspark

      - name: check that no untracked files were produced
        shell: bash
        run: git checkout poetry.lock pyproject.toml && ! git status --porcelain | tee /dev/stderr | grep .

      - name: upload code coverage
        # only upload coverage for jobs that aren't mostly xfails
        if: success() && matrix.python-version != '3.11'
        uses: codecov/codecov-action@v4
        with:
          flags: backend,pyspark,${{ runner.os }},python-${{ steps.install_python.outputs.python-version }}

# gen_lockfile_sqlalchemy2:
# name: Generate Poetry Lockfile for SQLAlchemy 2
@@ -768,6 +768,6 @@ jobs:
# - test_backends_min_version
- test_backends
# - test_backends_sqlalchemy2
# - test_pyspark
- test_pyspark
steps:
- run: exit 0
3 changes: 3 additions & 0 deletions ibis/backends/base/__init__.py
@@ -820,6 +820,8 @@ def db_identity(self) -> str:
parts.extend(f"{k}={v}" for k, v in self._con_kwargs.items())
return "_".join(map(str, parts))

# TODO(kszucs): this should be a classmethod returning with a new backend
# instance which does instantiate the connection
def connect(self, *args, **kwargs) -> BaseBackend:
"""Connect to the database.
@@ -857,6 +859,7 @@ def _from_url(self, url: str, **kwargs) -> BaseBackend:
def _convert_kwargs(kwargs: MutableMapping) -> None:
"""Manipulate keyword arguments to `.connect` method."""

# TODO(kszucs): should call self.connect(*self._con_args, **self._con_kwargs)
def reconnect(self) -> None:
"""Reconnect to the database already configured with connect."""
self.do_connect(*self._con_args, **self._con_kwargs)
6 changes: 5 additions & 1 deletion ibis/backends/base/sqlglot/__init__.py
@@ -35,6 +35,7 @@ def has_operation(cls, operation: type[ops.Value]) -> bool:
dispatcher = cls.compiler.visit_node.register.__self__.dispatcher
return dispatcher.dispatch(operation) is not dispatcher.dispatch(object)

# TODO(kszucs): get_schema() is not registered as an abstract method
def table(
self, name: str, schema: str | None = None, database: str | None = None
) -> ir.Table:
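The `has_operation` check in the hunk above leans on singledispatch internals: an operation is supported iff dispatching on its type resolves to something other than the `object` fallback. A self-contained sketch of the same trick:

```python
from functools import singledispatch


class Add: ...
class Sub: ...


@singledispatch
def visit(node):
    # Fallback registered for object: reached only for unsupported nodes.
    raise NotImplementedError(type(node).__name__)


@visit.register
def _(node: Add):
    return "add"


def has_operation(node_type) -> bool:
    # Supported iff dispatch does not fall through to the object fallback.
    return visit.dispatch(node_type) is not visit.dispatch(object)


print(has_operation(Add), has_operation(Sub))  # True False
```

This answers "is this node type compiled?" without maintaining a separate registry of supported operations.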
@@ -90,7 +91,7 @@ def compile(
):
"""Compile an Ibis expression to a SQL string."""
query = self._to_sqlglot(expr, limit=limit, params=params, **kwargs)
sql = query.sql(dialect=self.name, pretty=True)
sql = query.sql(dialect=self.compiler.dialect, pretty=True)
self._log(sql)
return sql

@@ -118,6 +119,7 @@ def sql(
schema = self._get_schema_using_query(query)
return ops.SQLQueryResult(query, ibis.schema(schema), self).to_expr()

# TODO(kszucs): should be removed in favor of _get_schema_using_query()
@abc.abstractmethod
def _metadata(self, query: str) -> Iterator[tuple[str, dt.DataType]]:
"""Return the metadata of a SQL query."""
@@ -223,6 +225,8 @@ def execute(

schema = table.schema()

# TODO(kszucs): these methods should be abstractmethods or this default
# implementation should be removed
with self._safe_raw_sql(sql) as cur:
result = self._fetch_from_cursor(cur, schema)
return expr.__pandas_result__(result)
3 changes: 2 additions & 1 deletion ibis/backends/base/sqlglot/compiler.py
@@ -362,7 +362,7 @@ def visit_DefaultLiteral(self, op, *, value, dtype):
if math.isnan(value):
return self.NAN
elif math.isinf(value):
return self.POS_INF if value < 0 else self.NEG_INF
return self.POS_INF if value > 0 else self.NEG_INF
return sge.convert(value)
elif dtype.is_decimal():
return self.cast(str(value), dtype)
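The literal hunk above fixes an inverted comparison: positive infinities previously compiled to the negative-infinity token. A standalone sketch of the corrected branch (the constant strings are illustrative; actual dialect tokens vary):

```python
import math

POS_INF = "CAST('Infinity' AS DOUBLE)"    # illustrative tokens
NEG_INF = "CAST('-Infinity' AS DOUBLE)"


def float_literal(value: float) -> str:
    if math.isnan(value):
        return "NAN"
    elif math.isinf(value):
        # The fix: value > 0 selects POS_INF (the old code tested value < 0,
        # swapping the two infinities).
        return POS_INF if value > 0 else NEG_INF
    return repr(value)


print(float_literal(float("inf")) == POS_INF)    # True
print(float_literal(float("-inf")) == NEG_INF)   # True
```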
@@ -864,6 +864,7 @@ def visit_Argument(self, op, *, name: str, shape, dtype):
def visit_RowID(self, op, *, table):
return sg.column(op.name, table=table.alias_or_name, quoted=self.quoted)

# TODO(kszucs): this should be renamed to something UDF related
def __sql_name__(self, op: ops.ScalarUDF | ops.AggUDF) -> str:
# not actually a table, but easier to quote individual namespace
# components this way
6 changes: 6 additions & 0 deletions ibis/backends/base/sqlglot/datatypes.py
@@ -606,3 +606,9 @@ def _from_ibis_Struct(cls, dtype: dt.Struct) -> sge.DataType:

class SQLiteType(SqlglotType):
dialect = "sqlite"


class PySparkType(SqlglotType):
dialect = "spark"
default_decimal_precision = 38
default_decimal_scale = 18
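A sketch of how such class-level defaults typically get used by a type mapper: when a decimal type carries no explicit precision or scale, the dialect-level defaults fill in. The function and names below are illustrative, not the ibis API; 38/18 are the values the hunk above sets:

```python
def decimal_sql(precision=None, scale=None,
                default_precision=38, default_scale=18):
    # Fall back to the dialect defaults when the ibis type is unparametrized.
    p = precision if precision is not None else default_precision
    s = scale if scale is not None else default_scale
    return f"DECIMAL({p}, {s})"


print(decimal_sql())       # DECIMAL(38, 18)
print(decimal_sql(10, 2))  # DECIMAL(10, 2)
```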
2 changes: 2 additions & 0 deletions ibis/backends/duckdb/__init__.py
@@ -100,6 +100,7 @@ def current_schema(self) -> str:
[(schema,)] = cur.fetchall()
return schema

# TODO(kszucs): should be moved to the base SQLGLot backend
def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any:
with contextlib.suppress(AttributeError):
query = query.sql(dialect=self.name)
@@ -440,6 +441,7 @@ def _load_extensions(
cur.install_extension(extension, force_install=force_install)
cur.load_extension(extension)

# TODO(kszucs): should be a classmethod
def _from_url(self, url: str, **kwargs) -> BaseBackend:
"""Connect to a backend using a URL `url`.
1 change: 1 addition & 0 deletions ibis/backends/duckdb/compiler.py
@@ -64,6 +64,7 @@ def visit_ArrayRepeat(self, op, *, arg, times):
func = sge.Lambda(this=arg, expressions=[sg.to_identifier("_")])
return self.f.flatten(self.f.list_apply(self.f.range(times), func))

# TODO(kszucs): this could be moved to the base SQLGlotCompiler
@visit_node.register(ops.Sample)
def visit_Sample(
self, op, *, parent, fraction: float, method: str, seed: int | None, **_
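The `ArrayRepeat` translation in the hunk above repeats an array by mapping a constant lambda over `range(times)` and flattening one level. The plain-Python equivalent of that flatten/list_apply/range pipeline:

```python
def array_repeat(arr, times):
    # Equivalent of flatten(list_apply(range(times), _ -> arr)):
    # make `times` copies of the array, then flatten one level.
    return [x for _ in range(times) for x in arr]


print(array_repeat([1, 2], 3))  # [1, 2, 1, 2, 1, 2]
```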
2 changes: 2 additions & 0 deletions ibis/backends/mysql/__init__.py
@@ -258,6 +258,8 @@ def begin(self):
finally:
cur.close()

# TODO(kszucs): should make it an abstract method or remove the use of it
# from .execute()
@contextlib.contextmanager
def _safe_raw_sql(self, *args, **kwargs):
with contextlib.closing(self.raw_sql(*args, **kwargs)) as result:
3 changes: 2 additions & 1 deletion ibis/backends/mysql/converter.py
@@ -6,6 +6,7 @@


class MySQLPandasData(PandasData):
# TODO(kszucs): this could be reused at other backends, like pyspark
@classmethod
def convert_Time(cls, s, dtype, pandas_type):
def convert(timedelta):
@@ -14,7 +15,7 @@ def convert(timedelta):
hour=comps.hours,
minute=comps.minutes,
second=comps.seconds,
microsecond=comps.microseconds,
microsecond=comps.milliseconds * 1000 + comps.microseconds,
)

return s.map(convert, na_action="ignore")
