[SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf

From SandishKumarHN (sanysandish@gmail.com) and Mohan Parthasarathy (mposdev21@gmail.com)

This PR follows main PR #37972

The following is an example of how to use from_protobuf and to_protobuf in PySpark.

```python
import tempfile

# `spark` is an active SparkSession (e.g. from the pyspark shell)
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

data = [("1", (2, "Alice", 109200))]
ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
df = spark.createDataFrame(data, ddl_schema)
desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
               '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
               '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
               '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
               '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
               '26F746F33')

# Write the protobuf descriptor to a file; the bytes above were generated from
# connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
with tempfile.TemporaryDirectory() as tmp_dir:
    desc_file_path = "%s/pyspark_test.desc" % tmp_dir
    with open(desc_file_path, "wb") as f:
        _ = f.write(bytearray.fromhex(desc_hex))
        f.flush()
        message_name = 'SimpleMessage'
        proto_df = df.select(
            to_protobuf(df.value, desc_file_path, message_name).alias("value"))
        proto_df.show(truncate=False)
        proto_df = proto_df.select(
            from_protobuf(proto_df.value, desc_file_path, message_name).alias("value"))
        proto_df.show(truncate=False)

# Output:
# +----------------------------------------+
# |value                                   |
# +----------------------------------------+
# |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
# +----------------------------------------+
# +------------------+
# |value             |
# +------------------+
# |{2, Alice, 109200}|
# +------------------+
```
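
The `desc_hex` blob above is the descriptor set generated with `protoc` from `connector/protobuf/src/test/resources/protobuf/pyspark_test.proto` (the exact commands are recorded as comments in that file below). As a hedged alternative to pasting the hex, the descriptor set can be generated programmatically; the sketch below assumes the optional `grpcio-tools` package is installed and reuses this repository's paths.

```python
# Sketch only: regenerate pyspark_test.desc from the .proto source rather than
# hard-coding the hex blob. Assumes `pip install grpcio-tools`; the paths mirror
# this PR's repository layout.
from grpc_tools import protoc

proto_dir = "connector/protobuf/src/test/resources/protobuf"
ret = protoc.main([
    "protoc",  # argv[0] placeholder expected by grpc_tools.protoc.main
    "--proto_path=%s" % proto_dir,
    "--descriptor_set_out=%s/pyspark_test.desc" % proto_dir,
    "%s/pyspark_test.proto" % proto_dir,
])
assert ret == 0, "descriptor generation failed"
```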

### Tests Covered
- from_protobuf / to_protobuf (functions.py)
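
These are doctests inside `functions.py`; a minimal sketch of driving them directly through the module's `_test()` entrypoint, assuming a Spark source checkout where the optional spark-protobuf assembly jar has already been built (e.g. with `build/sbt package`):

```python
# Minimal sketch: _test() searches for the spark-protobuf assembly jar, wires it
# into PYSPARK_SUBMIT_ARGS, runs the doctests in this module, and exits non-zero
# on failure (or exits 0 with a message if the jar is missing).
import pyspark.sql.protobuf.functions as protobuf_functions

protobuf_functions._test()
```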

Closes #38212 from SandishKumarHN/PYSPARK_PROTOBUF.

Authored-by: SandishKumarHN <sanysandish@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
SandishKumarHN authored and HyukjinKwon committed Oct 19, 2022
1 parent 14d8604 commit ed9db14
Showing 8 changed files with 316 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -153,7 +153,7 @@ jobs:
streaming, sql-kafka-0-10, streaming-kafka-0-10,
mllib-local, mllib,
yarn, mesos, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
connect
connect, protobuf
# Here, we split Hive and SQL tests into some of slow ones and the rest of them.
included-tags: [""]
excluded-tags: [""]
32 changes: 32 additions & 0 deletions connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// TODO(SPARK-40777): Instead of saving .desc files in resources, generate during build.
// To compile and create test class:
// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto

syntax = "proto3";

package org.apache.spark.sql.protobuf;
option java_outer_classname = "SimpleMessageProtos";


message SimpleMessage {
int32 age = 1;
string name = 2;
int64 score = 3;
}
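
For reference (not part of this diff), a sketch of the Catalyst schema that `SimpleMessage` deserializes to with `from_protobuf`, matching the `ddl_schema` used in the example above and the usual proto3 scalar mappings:

```python
# Sketch only: the Spark SQL schema SimpleMessage maps to after from_protobuf.
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

simple_message_schema = StructType([
    StructField("age", IntegerType()),   # proto3 int32  -> integer
    StructField("name", StringType()),   # proto3 string -> string
    StructField("score", LongType()),    # proto3 int64  -> long
])
```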
14 changes: 13 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -282,6 +282,17 @@ def __hash__(self):
],
)

protobuf = Module(
name="protobuf",
dependencies=[sql],
source_file_regexes=[
"connector/protobuf",
],
sbt_test_goals=[
"protobuf/test",
],
)

sketch = Module(
name="sketch",
dependencies=[tags],
@@ -423,7 +434,7 @@ def __hash__(self):

pyspark_sql = Module(
name="pyspark-sql",
dependencies=[pyspark_core, hive, avro],
dependencies=[pyspark_core, hive, avro, protobuf],
source_file_regexes=["python/pyspark/sql"],
python_test_goals=[
# doctests
@@ -443,6 +454,7 @@ def __hash__(self):
"pyspark.sql.udf",
"pyspark.sql.window",
"pyspark.sql.avro.functions",
"pyspark.sql.protobuf.functions",
"pyspark.sql.pandas.conversion",
"pyspark.sql.pandas.map_ops",
"pyspark.sql.pandas.group_ops",
16 changes: 8 additions & 8 deletions dev/sparktestsupport/utils.py
@@ -108,23 +108,23 @@ def determine_modules_to_test(changed_modules, deduplicated=True):
['graphx', 'examples']
>>> [x.name for x in determine_modules_to_test([modules.sql])]
... # doctest: +NORMALIZE_WHITESPACE
['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'sql-kafka-0-10',
'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr', 'pyspark-connect',
'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'protobuf',
'sql-kafka-0-10', 'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr',
'pyspark-connect', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
>>> sorted([x.name for x in determine_modules_to_test(
... [modules.sparkr, modules.sql], deduplicated=False)])
... # doctest: +NORMALIZE_WHITESPACE
['avro', 'connect', 'docker-integration-tests', 'examples', 'hive', 'hive-thriftserver',
'mllib', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
'mllib', 'protobuf', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
'pyspark-pandas-slow', 'pyspark-sql', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
>>> sorted([x.name for x in determine_modules_to_test(
... [modules.sql, modules.core], deduplicated=False)])
... # doctest: +NORMALIZE_WHITESPACE
['avro', 'catalyst', 'connect', 'core', 'docker-integration-tests', 'examples', 'graphx',
'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'pyspark-connect', 'pyspark-core',
'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-resource',
'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql', 'sql-kafka-0-10',
'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'protobuf', 'pyspark-connect',
'pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow',
'pyspark-resource', 'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql',
'sql-kafka-0-10', 'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
"""
modules_to_test = set()
for module in changed_modules:
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/index.rst
@@ -40,3 +40,4 @@ This page gives an overview of all public Spark SQL API.
avro
observation
udf
protobuf
28 changes: 28 additions & 0 deletions python/docs/source/reference/pyspark.sql/protobuf.rst
@@ -0,0 +1,28 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
========
Protobuf
========
.. currentmodule:: pyspark.sql.protobuf.functions

.. autosummary::
:toctree: api/

from_protobuf
to_protobuf
18 changes: 18 additions & 0 deletions python/pyspark/sql/protobuf/__init__.py
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = ["functions"]
215 changes: 215 additions & 0 deletions python/pyspark/sql/protobuf/functions.py
@@ -0,0 +1,215 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A collection of builtin protobuf functions
"""


from typing import Dict, Optional, TYPE_CHECKING
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.util import _print_missing_jar

if TYPE_CHECKING:
from pyspark.sql._typing import ColumnOrName


def from_protobuf(
data: "ColumnOrName",
descFilePath: str,
messageName: str,
options: Optional[Dict[str, str]] = None,
) -> Column:
"""
Converts a binary column of Protobuf format into its corresponding catalyst value.
The specified schema must match the read data, otherwise the behavior is undefined:
it may fail or return an arbitrary result.
To deserialize the data with a compatible and evolved schema, the expected
Protobuf schema can be set via the Protobuf descriptor file.
.. versionadded:: 3.4.0
Parameters
----------
data : :class:`~pyspark.sql.Column` or str
the binary column.
descFilePath : str
the path to the Protobuf descriptor file.
messageName: str
the protobuf message name to look for in the descriptor file.
options : dict, optional
options to control how the protobuf record is parsed.
Notes
-----
Protobuf functionality is provided as a pluggable external module.
Examples
--------
>>> import tempfile
>>> data = [("1", (2, "Alice", 109200))]
>>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
... '26F746F33')
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
>>> with tempfile.TemporaryDirectory() as tmp_dir:
... desc_file_path = "%s/pyspark_test.desc" % tmp_dir
... with open(desc_file_path, "wb") as f:
... _ = f.write(bytearray.fromhex(desc_hex))
... f.flush()
... message_name = 'SimpleMessage'
... proto_df = df.select(
... to_protobuf(df.value, desc_file_path, message_name).alias("value"))
... proto_df.show(truncate=False)
... proto_df = proto_df.select(
... from_protobuf(proto_df.value, desc_file_path, message_name).alias("value"))
... proto_df.show(truncate=False)
+----------------------------------------+
|value |
+----------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+----------------------------------------+
+------------------+
|value |
+------------------+
|{2, Alice, 109200}|
+------------------+
"""

sc = SparkContext._active_spark_context
assert sc is not None and sc._jvm is not None
try:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.from_protobuf(
_to_java_column(data), descFilePath, messageName, options or {}
)
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
_print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
raise
return Column(jc)


def to_protobuf(data: "ColumnOrName", descFilePath: str, messageName: str) -> Column:
"""
Converts a column into binary of protobuf format.
.. versionadded:: 3.4.0
Parameters
----------
data : :class:`~pyspark.sql.Column` or str
the data column.
descFilePath : str
the path to the Protobuf descriptor file.
messageName: str
the protobuf message name to look for in the descriptor file.
Notes
-----
Protobuf functionality is provided as a pluggable external module.
Examples
--------
>>> import tempfile
>>> data = [([(2, "Alice", 13093020)])]
>>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
... '26F746F33')
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
>>> with tempfile.TemporaryDirectory() as tmp_dir:
... desc_file_path = "%s/pyspark_test.desc" % tmp_dir
... with open(desc_file_path, "wb") as f:
... _ = f.write(bytearray.fromhex(desc_hex))
... f.flush()
... message_name = 'SimpleMessage'
... proto_df = df.select(
... to_protobuf(df.value, desc_file_path, message_name).alias("suite"))
... proto_df.show(truncate=False)
+-------------------------------------------+
|suite |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
"""
sc = SparkContext._active_spark_context
assert sc is not None and sc._jvm is not None
try:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.to_protobuf(
_to_java_column(data), descFilePath, messageName
)
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
_print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
raise
return Column(jc)


def _test() -> None:
import os
import sys
from pyspark.testing.utils import search_jar

protobuf_jar = search_jar("connector/protobuf", "spark-protobuf-assembly-", "spark-protobuf")
if protobuf_jar is None:
print(
"Skipping all Protobuf Python tests as the optional Protobuf project was "
"not compiled into a JAR. To run these tests, "
"you need to build Spark with 'build/sbt package' or "
"'build/mvn package' before running this test."
)
sys.exit(0)
else:
existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
jars_args = "--jars %s" % protobuf_jar
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])

import doctest
from pyspark.sql import SparkSession
import pyspark.sql.protobuf.functions

globs = pyspark.sql.protobuf.functions.__dict__.copy()
spark = (
SparkSession.builder.master("local[2]")
.appName("sql.protobuf.functions tests")
.getOrCreate()
)
globs["spark"] = spark
(failure_count, test_count) = doctest.testmod(
pyspark.sql.protobuf.functions,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
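
Putting it together, a minimal sketch of how an interactive session could pick up the optional spark-protobuf jar before calling these functions; the jar path and version string below are placeholders, and the doctest harness above achieves the same thing through `PYSPARK_SUBMIT_ARGS`:

```python
# Minimal sketch, assuming the optional spark-protobuf assembly jar has been
# built locally; the path and version below are placeholders, not fixed names.
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars connector/protobuf/target/spark-protobuf-assembly-3.4.0-SNAPSHOT.jar "
    "pyspark-shell"
)

from pyspark.sql import SparkSession
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

spark = SparkSession.builder.master("local[2]").appName("protobuf-demo").getOrCreate()
# Use df.select(to_protobuf(...)) / df.select(from_protobuf(...)) as in the
# examples above.
```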
