diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index f6f5f026537a5..64dbe30012cd1 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -153,7 +153,7 @@ jobs:
            streaming, sql-kafka-0-10, streaming-kafka-0-10,
            mllib-local, mllib, yarn, mesos, kubernetes,
            hadoop-cloud, spark-ganglia-lgpl,
-            connect
+            connect, protobuf
        # Here, we split Hive and SQL tests into some of slow ones and the rest of them.
        included-tags: [""]
        excluded-tags: [""]
diff --git a/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto b/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
new file mode 100644
index 0000000000000..8750371349a06
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// TODO(SPARK-40777): Instead of saving .desc files in resources, generate during build.
+// To compile and create the test class:
+// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
+// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
+
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf;
+option java_outer_classname = "SimpleMessageProtos";
+
+
+message SimpleMessage {
+  int32 age = 1;
+  string name = 2;
+  int64 score = 3;
+}
\ No newline at end of file
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index e4a515d203ccb..2a427139148ad 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -282,6 +282,17 @@ def __hash__(self):
     ],
 )
 
+protobuf = Module(
+    name="protobuf",
+    dependencies=[sql],
+    source_file_regexes=[
+        "connector/protobuf",
+    ],
+    sbt_test_goals=[
+        "protobuf/test",
+    ],
+)
+
 sketch = Module(
     name="sketch",
     dependencies=[tags],
@@ -423,7 +434,7 @@ def __hash__(self):
 
 pyspark_sql = Module(
     name="pyspark-sql",
-    dependencies=[pyspark_core, hive, avro],
+    dependencies=[pyspark_core, hive, avro, protobuf],
     source_file_regexes=["python/pyspark/sql"],
     python_test_goals=[
         # doctests
@@ -443,6 +454,7 @@ def __hash__(self):
         "pyspark.sql.udf",
         "pyspark.sql.window",
         "pyspark.sql.avro.functions",
+        "pyspark.sql.protobuf.functions",
         "pyspark.sql.pandas.conversion",
         "pyspark.sql.pandas.map_ops",
         "pyspark.sql.pandas.group_ops",
diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py
index 11d64c4f0bc7b..37e023aaa6347 100755
--- a/dev/sparktestsupport/utils.py
+++ b/dev/sparktestsupport/utils.py
@@ -108,23 +108,23 @@ def determine_modules_to_test(changed_modules, deduplicated=True):
     ['graphx', 'examples']
     >>> [x.name for x in determine_modules_to_test([modules.sql])]
     ... # doctest: +NORMALIZE_WHITESPACE
-    ['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'sql-kafka-0-10',
-    'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr', 'pyspark-connect',
-    'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
+    ['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'protobuf',
+    'sql-kafka-0-10', 'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr',
+    'pyspark-connect', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
     >>> sorted([x.name for x in determine_modules_to_test(
     ...     [modules.sparkr, modules.sql], deduplicated=False)])
     ... # doctest: +NORMALIZE_WHITESPACE
     ['avro', 'connect', 'docker-integration-tests', 'examples', 'hive', 'hive-thriftserver',
-    'mllib', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
+    'mllib', 'protobuf', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
     'pyspark-pandas-slow', 'pyspark-sql', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
     >>> sorted([x.name for x in determine_modules_to_test(
     ...     [modules.sql, modules.core], deduplicated=False)])
     ... # doctest: +NORMALIZE_WHITESPACE
     ['avro', 'catalyst', 'connect', 'core', 'docker-integration-tests', 'examples', 'graphx',
-    'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'pyspark-connect', 'pyspark-core',
-    'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-resource',
-    'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql', 'sql-kafka-0-10',
-    'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
+    'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'protobuf', 'pyspark-connect',
+    'pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow',
+    'pyspark-resource', 'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql',
+    'sql-kafka-0-10', 'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
     """
     modules_to_test = set()
     for module in changed_modules:
diff --git a/python/docs/source/reference/pyspark.sql/index.rst b/python/docs/source/reference/pyspark.sql/index.rst
index 52aca086cb44d..fc4569486a77b 100644
--- a/python/docs/source/reference/pyspark.sql/index.rst
+++ b/python/docs/source/reference/pyspark.sql/index.rst
@@ -40,3 +40,4 @@ This page gives an overview of all public Spark SQL API.
     avro
     observation
     udf
+    protobuf
diff --git a/python/docs/source/reference/pyspark.sql/protobuf.rst b/python/docs/source/reference/pyspark.sql/protobuf.rst
new file mode 100644
index 0000000000000..0ba3d56c4c371
--- /dev/null
+++ b/python/docs/source/reference/pyspark.sql/protobuf.rst
@@ -0,0 +1,28 @@
+..  Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+..    http://www.apache.org/licenses/LICENSE-2.0
+
+..  Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+========
+Protobuf
+========
+.. currentmodule:: pyspark.sql.protobuf.functions
+
+.. autosummary::
+    :toctree: api/
+
+    from_protobuf
+    to_protobuf
diff --git a/python/pyspark/sql/protobuf/__init__.py b/python/pyspark/sql/protobuf/__init__.py
new file mode 100644
index 0000000000000..ac530a5495faa
--- /dev/null
+++ b/python/pyspark/sql/protobuf/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["functions"]
diff --git a/python/pyspark/sql/protobuf/functions.py b/python/pyspark/sql/protobuf/functions.py
new file mode 100644
index 0000000000000..9f8b90095dfd9
--- /dev/null
+++ b/python/pyspark/sql/protobuf/functions.py
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collection of built-in Protobuf functions
+"""
+
+
+from typing import Dict, Optional, TYPE_CHECKING
+from pyspark import SparkContext
+from pyspark.sql.column import Column, _to_java_column
+from pyspark.util import _print_missing_jar
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import ColumnOrName
+
+
+def from_protobuf(
+    data: "ColumnOrName",
+    descFilePath: str,
+    messageName: str,
+    options: Optional[Dict[str, str]] = None,
+) -> Column:
+    """
+    Converts a binary column of Protobuf format into its corresponding Catalyst value.
+    The specified schema must match the read data, otherwise the behavior is undefined:
+    it may fail or return arbitrary results.
+    To deserialize the data with a compatible and evolved schema, the expected
+    Protobuf schema can be set via the descriptor file.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    data : :class:`~pyspark.sql.Column` or str
+        the binary column.
+    descFilePath : str
+        the path to the Protobuf descriptor file.
+    messageName: str
+        the Protobuf message name to look for in the descriptor file.
+    options : dict, optional
+        options to control how the Protobuf record is parsed.
+
+    Notes
+    -----
+    Protobuf functionality is provided as a pluggable external module.
+
+    Examples
+    --------
+    >>> import tempfile
+    >>> data = [("1", (2, "Alice", 109200))]
+    >>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
+    >>> df = spark.createDataFrame(data, ddl_schema)
+    >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
+    ...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
+    ...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
+    ...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
+    ...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
+    ...    '26F746F33')
+    >>> # Writing a Protobuf descriptor into a file, generated from the
+    >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
+    >>> with tempfile.TemporaryDirectory() as tmp_dir:
+    ...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
+    ...     with open(desc_file_path, "wb") as f:
+    ...         _ = f.write(bytearray.fromhex(desc_hex))
+    ...         f.flush()
+    ...         message_name = 'SimpleMessage'
+    ...         proto_df = df.select(
+    ...             to_protobuf(df.value, desc_file_path, message_name).alias("value"))
+    ...         proto_df.show(truncate=False)
+    ...         proto_df = proto_df.select(
+    ...             from_protobuf(proto_df.value, desc_file_path, message_name).alias("value"))
+    ...         proto_df.show(truncate=False)
+    +----------------------------------------+
+    |value                                   |
+    +----------------------------------------+
+    |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+    +----------------------------------------+
+    +------------------+
+    |value             |
+    +------------------+
+    |{2, Alice, 109200}|
+    +------------------+
+    """
+
+    sc = SparkContext._active_spark_context
+    assert sc is not None and sc._jvm is not None
+    try:
+        jc = sc._jvm.org.apache.spark.sql.protobuf.functions.from_protobuf(
+            _to_java_column(data), descFilePath, messageName, options or {}
+        )
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
+        raise
+    return Column(jc)
+
+
+def to_protobuf(data: "ColumnOrName", descFilePath: str, messageName: str) -> Column:
+    """
+    Converts a column into a binary column of Protobuf format.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    data : :class:`~pyspark.sql.Column` or str
+        the data column.
+    descFilePath : str
+        the path to the Protobuf descriptor file.
+    messageName: str
+        the Protobuf message name to look for in the descriptor file.
+
+    Notes
+    -----
+    Protobuf functionality is provided as a pluggable external module.
+
+    Examples
+    --------
+    >>> import tempfile
+    >>> data = [([(2, "Alice", 13093020)])]
+    >>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
+    >>> df = spark.createDataFrame(data, ddl_schema)
+    >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
+    ...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
+    ...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
+    ...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
+    ...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
+    ...    '26F746F33')
+    >>> # Writing a Protobuf descriptor into a file, generated from the
+    >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
+    >>> with tempfile.TemporaryDirectory() as tmp_dir:
+    ...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
+    ...     with open(desc_file_path, "wb") as f:
+    ...         _ = f.write(bytearray.fromhex(desc_hex))
+    ...         f.flush()
+    ...         message_name = 'SimpleMessage'
+    ...         proto_df = df.select(
+    ...             to_protobuf(df.value, desc_file_path, message_name).alias("suite"))
+    ...         proto_df.show(truncate=False)
+    +-------------------------------------------+
+    |suite                                      |
+    +-------------------------------------------+
+    |[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+    +-------------------------------------------+
+    """
+    sc = SparkContext._active_spark_context
+    assert sc is not None and sc._jvm is not None
+    try:
+        jc = sc._jvm.org.apache.spark.sql.protobuf.functions.to_protobuf(
+            _to_java_column(data), descFilePath, messageName
+        )
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
+        raise
+    return Column(jc)
+
+
+def _test() -> None:
+    import os
+    import sys
+    from pyspark.testing.utils import search_jar
+
+    protobuf_jar = search_jar("connector/protobuf", "spark-protobuf-assembly-", "spark-protobuf")
+    if protobuf_jar is None:
+        print(
+            "Skipping all Protobuf Python tests as the optional Protobuf project was "
+            "not compiled into a JAR. To run these tests, "
+            "you need to build Spark with 'build/sbt package' or "
+            "'build/mvn package' before running this test."
+        )
+        sys.exit(0)
+    else:
+        existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+        jars_args = "--jars %s" % protobuf_jar
+        os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
+
+    import doctest
+    from pyspark.sql import SparkSession
+    import pyspark.sql.protobuf.functions
+
+    globs = pyspark.sql.protobuf.functions.__dict__.copy()
+    spark = (
+        SparkSession.builder.master("local[2]")
+        .appName("sql.protobuf.functions tests")
+        .getOrCreate()
+    )
+    globs["spark"] = spark
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.protobuf.functions,
+        globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
+    )
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
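
Editor's note: for readers who want to try the new API outside the doctests, the minimal sketch below shows the same round trip without the inlined hex descriptor. It is illustrative only and not part of this patch; it assumes the spark-protobuf assembly JAR is on the classpath (e.g. passed via --jars, as _test() above arranges) and that pyspark_test.desc has been generated from pyspark_test.proto with the protoc --descriptor_set_out command quoted in that file.

    # Illustrative sketch, not part of this diff. Assumed input:
    # pyspark_test.desc, generated via
    #   protoc --descriptor_set_out=pyspark_test.desc pyspark_test.proto
    from pyspark.sql import SparkSession
    from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

    spark = SparkSession.builder.getOrCreate()

    # Build a struct column matching SimpleMessage (int32 age, string name, int64 score).
    df = spark.createDataFrame(
        [(2, "Alice", 109200)], "age INTEGER, name STRING, score LONG"
    ).selectExpr("struct(age, name, score) AS value")

    # Serialize the struct to Protobuf binary, then parse it back.
    binary_df = df.select(
        to_protobuf(df.value, "pyspark_test.desc", "SimpleMessage").alias("value"))
    parsed_df = binary_df.select(
        from_protobuf(binary_df.value, "pyspark_test.desc", "SimpleMessage").alias("value"))
    parsed_df.show(truncate=False)  # {2, Alice, 109200}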
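
The desc_hex literal in the doctests is simply the bytes of such a compiled descriptor file, hex-encoded so the examples do not depend on test resources. Assuming a locally generated pyspark_test.desc, it can be reproduced with:

    # Hex-encode a compiled descriptor set for inlining into a doctest.
    with open("pyspark_test.desc", "rb") as f:
        print(f.read().hex().upper())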