feat(pyspark): add json string unwrap implementation
cpcloud committed Apr 13, 2024
1 parent d13c533 commit 7d762f2
Showing 4 changed files with 25 additions and 1 deletion.
19 changes: 19 additions & 0 deletions ibis/backends/pyspark/__init__.py
@@ -11,6 +11,7 @@
from pyspark import SparkConf
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf
from pyspark.sql.types import StringType

import ibis.common.exceptions as com
import ibis.config
@@ -40,6 +41,22 @@ def normalize_filenames(source_list):
    return list(map(util.normalize_filename, source_list))


@pandas_udf(StringType(), PandasUDFType.SCALAR)
def unwrap_json(s: pd.Series) -> pd.Series:
    import json

    import pandas as pd

    def nullify_non_string(raw):
        if pd.isna(raw):
            return None

        value = json.loads(raw)
        return value if isinstance(value, str) else None

    return s.map(nullify_non_string)


class _PySparkCursor:
    """Spark cursor.
@@ -252,6 +269,8 @@ def _register_udfs(self, expr: ir.Expr) -> None:
            spark_udf = pandas_udf(udf_func, udf_return, PandasUDFType.GROUPED_AGG)
            self._session.udf.register(udf_name, spark_udf)

        self._session.udf.register("unwrap_json", unwrap_json)

    def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
        schema = PySparkSchema.from_ibis(op.schema)
        df = self._session.createDataFrame(data=op.data.to_frame(), schema=schema)
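For readers skimming the diff, here is a minimal standalone sketch of the unwrap_json semantics, using only pandas and the standard json module (the helper name and sample values are illustrative, not part of the commit): payloads that decode to JSON strings are unwrapped, and everything else, including missing values, maps to null.

import json

import pandas as pd


def unwrap_json_value(raw):
    # Mirrors the UDF's nullify_non_string helper: keep the decoded value
    # only when the JSON payload is a string, otherwise return null.
    if pd.isna(raw):
        return None
    value = json.loads(raw)
    return value if isinstance(value, str) else None


series = pd.Series(['"a"', '""', "42", "null", "[]", None])
print(series.map(unwrap_json_value).tolist())
# ['a', '', None, None, None, None]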
1 change: 1 addition & 0 deletions ibis/backends/pyspark/compiler.py
@@ -77,6 +77,7 @@ class PySparkCompiler(SQLGlotCompiler):
        ops.MapMerge: "map_concat",
        ops.MapKeys: "map_keys",
        ops.MapValues: "map_values",
        ops.UnwrapJSONString: "unwrap_json",
    }

    def _aggregate(self, funcname: str, *args, where):
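As a hedged illustration of what this one-line mapping buys (the expression and rendered SQL below are an approximation, not output captured from the commit): routing ops.UnwrapJSONString through simple_ops makes the PySpark compiler emit a plain function call to the unwrap_json UDF that the backend registers at session setup.

import ibis

# Unbound table purely for compilation; no Spark session is required to
# inspect the generated SQL.
t = ibis.table({"js": "json"}, name="json_t")
expr = t.js.string.name("res")

# With the simple_ops entry above this should render roughly as
#   SELECT UNWRAP_JSON(`js`) AS `res` FROM `json_t`
print(ibis.to_sql(expr, dialect="pyspark"))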
4 changes: 4 additions & 0 deletions ibis/backends/tests/data.py
@@ -100,6 +100,10 @@
"null",
"[42,47,55]",
"[]",
'"a"',
'""',
'"b"',
None,
]
}
)
2 changes: 1 addition & 1 deletion ibis/backends/tests/test_json.py
@@ -103,7 +103,7 @@ def test_json_array(backend, json_t):

@pytest.mark.notimpl(["dask", "pandas", "risingwave"])
@pytest.mark.notyet(
-    ["pyspark", "trino", "flink"], reason="should work but doesn't deserialize JSON"
+    ["trino", "flink"], reason="should work but doesn't deserialize JSON"
)
def test_json_string(backend, json_t):
    expr = json_t.js.string.name("res")
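With the UDF registered and the compiler mapping in place, the pyspark marker can be dropped from notyet. A hedged end-to-end check outside the test suite might look like the following (table and column names are illustrative, a local Spark session with pyarrow installed is assumed, and unwrap_json is imported from the backend module even though it is not a public API):

from pyspark.sql import SparkSession

from ibis.backends.pyspark import unwrap_json  # module-level UDF added above

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.udf.register("unwrap_json", unwrap_json)

df = spark.createDataFrame([('"a"',), ("[42,47,55]",), (None,)], ["js"])
df.createOrReplaceTempView("json_t")

# Only the first row holds a JSON string, so the other two come back NULL.
spark.sql("SELECT unwrap_json(js) AS res FROM json_t").show()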
