
[SPARK-40654][SQL] Protobuf support for Spark - from_protobuf AND to_protobuf #37972

Closed
wants to merge 47 commits

Conversation

Contributor

@SandishKumarHN commented Sep 22, 2022

From SandishKumarHN (sanysandish@gmail.com) and Mohan Parthasarathy (mposdev21@gmail.com)

Introduction

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data, and they are widely used in Kafka-based data pipelines. Unlike Avro, protobuf has no native support in Spark. This PR provides two new functions, from_protobuf and to_protobuf, to read and write protobuf data within a DataFrame.

The implementation is closely modeled after the Avro implementation so that the changes are easy to understand and review.

The following is an example of typical usage.

// `from_protobuf` requires the absolute path of the Protobuf schema file
// and the name of the protobuf message within the file
val userProtoFile = "./examples/src/main/resources/user.desc"
val userProtoMsg = "User"

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "proto-topic-in")
  .load()

// 1. Decode the Protobuf data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Protobuf format.
val output = df
  .select(from_protobuf('value, userProtoFile, userProtoMsg) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_protobuf($"user.name", userProtoFile, userProtoMsg) as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "proto-topic-out")
  .start()

The new functions are very similar to their Avro counterparts:

  • from_protobuf requires the proto descriptor file and the message type within that file, similar to from_avro requiring the JSON schema (a comparison sketch follows this list).
  • to_protobuf is similar to to_avro and does not require the proto descriptor file, since it can build the schema (protobuf descriptor) from the Catalyst types. Like to_avro, to_protobuf can also take a descriptor file describing the schema.
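
For comparison, here is a minimal sketch of the two read paths. It is illustrative only: `kafkaDf` and `avroJsonSchema` are hypothetical names, and the argument order follows the example above rather than any final API.

```scala
import org.apache.spark.sql.avro.functions.from_avro

// Avro: the schema is passed inline as a JSON string
val avroUsers  = kafkaDf.select(from_avro($"value", avroJsonSchema) as "user")

// Protobuf: the schema comes from a descriptor file plus a message name
val protoUsers = kafkaDf.select(from_protobuf($"value", userProtoFile, userProtoMsg) as "user")
```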

What is supported

  • Protobuf format proto3 is supported (even though proto2 and proto3 are interoperable, we have explicitly tested only with proto3)
  • Google protobuf supported types
    • Scalar value types
    • Enumerations
    • Message types as field types
    • Nested Messages
    • Maps
    • Unknown fields are well-formed protocol buffer serialized data representing fields that the parser does not recognize. The original version of proto3 did not retain these when parsing problems occurred. This capability is needed to detect schemas that do not match the message type and to support the FAILFAST and PERMISSIVE modes (a usage sketch follows this list); it is available in proto3 from version 3.5 onwards
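
Below is a minimal sketch of how the FAILFAST and PERMISSIVE modes mentioned above might be exercised. The options map is an assumption modeled on `from_avro`; the exact parameter name and position in this PR may differ.

```scala
import scala.jdk.CollectionConverters._  // Scala 2.13-style converters; an assumption

// PERMISSIVE: records that do not match the descriptor become null rows
val permissive = df.select(
  from_protobuf($"value", userProtoFile, userProtoMsg,
    Map("mode" -> "PERMISSIVE").asJava) as "user")

// FAILFAST: the first mismatching record raises an exception
val failfast = df.select(
  from_protobuf($"value", userProtoFile, userProtoMsg,
    Map("mode" -> "FAILFAST").asJava) as "user")
```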

What is not supported

  • Any requires knowledge of the underlying object type when deserializing the message and is generally not considered type-safe
  • OneOf requires knowledge of the object type that was encoded when deserializing the message
  • Custom options are an advanced protobuf feature that lets users define their own options
  • Catalyst types that are not natively supported in protobuf. This normally surfaces during serialization, and an exception is thrown when the following types are encountered (a workaround sketch follows this list):
    • DecimalType
    • DateType
    • TimestampType
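
Where these Catalyst types appear, one workaround is to cast them to protobuf-representable types before calling to_protobuf. A minimal sketch, assuming a nested `user` struct with hypothetical column names:

```scala
import org.apache.spark.sql.functions.{col, struct, unix_timestamp}

val serializable = df.select(
  struct(
    col("user.name"),
    col("user.amount").cast("double").as("amount"),           // DecimalType -> DoubleType
    col("user.birth_date").cast("string").as("birth_date"),   // DateType -> StringType
    unix_timestamp(col("user.updated_at")).as("updated_at")   // TimestampType -> LongType (epoch seconds)
  ).as("user"))

val encoded = serializable.select(to_protobuf($"user", userProtoFile, userProtoMsg) as "value")
```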

Test cases covered

Tests have been written at different levels:

  • from_protobuf / to_protobuf (ProtoFunctionSuite)
  • ProtoToCatalyst / CatalystToProto (ProtoCatalystDataConversionSuite)
  • ProtoDeserializer / ProtoSerializer (ProtoSerdeSuite)

ProtoFunctionSuite

A set of roundtrip tests goes through to_protobuf(from_protobuf(...)) or from_protobuf(to_protobuf(...)) and compares the results; the sketch below shows the pattern. Some tests are repeated with to_protobuf called without a descriptor file, in which case the protobuf descriptor is built from the Catalyst types.
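
The roundtrip pattern looks roughly like this (a sketch with illustrative names, not the exact suite code):

```scala
val encoded = inputDf.select(to_protobuf($"user", userProtoFile, userProtoMsg) as "raw")
val decoded = encoded.select(from_protobuf($"raw", userProtoFile, userProtoMsg) as "user")
assert(decoded.collect().toSeq == inputDf.collect().toSeq)
```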

  • roundtrip in to_protobuf and from_protobuf for struct for protobuf scalar types
  • roundtrip in to_protobuf(without descriptor params) and from_proto - struct for protobuf scalar types
  • roundtrip in from_protobuf and to_protobuf - Repeated protobuf types
  • roundtrip in from_protobuf and to_protobuf - Repeated Message Once
  • roundtrip in from_protobuf and to_protobuf - Repeated Message Twice
  • roundtrip in from_protobuf and to_protobuf - Map
  • roundtrip in from_protobuf and to_protobuf - Enum
  • roundtrip in from_protobuf and to_protobuf - Multiple Message
  • roundtrip in to_protobuf and from_protobuf - with null

ProtoSerdeSuite

  • Test basic conversion - serialize(deserialize(message)) == message
  • Fail to convert with field type mismatch - Make sure the right exception is thrown for incompatible schema for serializer and deserializer
  • Fail to convert with missing nested Protobuf fields
  • Fail to convert with deeply nested field type mismatch
  • Fail to convert with missing Catalyst fields

ProtoCatalystDataConversionSuite

  • ProtoToCatalyst(to_protobuf(basic_catalyst_types)): Boolean, Integer, Double, Float, Binary, String, Byte, Short
  • Handle unsupported input of Message type: Serialize a message first and deserialize using a bad schema. Test with FAILFAST to get an exception and PERMISSIVE to get a null row
  • filter push-down to proto deserializer: Filtering the rows based on the filter during proto deserialization
  • Test ProtoDeserializer with binary message type

Cluster Testing

Recent (10-04-2022) changes have been tested with the configuration listed below.
Job: Kafka + Spark Structured Streaming
2 executors, each with 2048 MB memory and 2 cores
150-200 events/second, each event having 100 fields (basic types, message, map type, enum)

Sandish Kumar Hebbani Naga and others added 30 commits August 11, 2022 11:07
array type support for from_proto function.
1) Basic Message repeated once
2) Basic Message repeated twice
unit tests for protobuf repeated message
1) Primitive types (TBD: Decimal)
2) Deserialization (push down filters not working yet)

Formatting errors to keep the IDE happy
successfully but also creates an UnknownFieldSet and uses that
to infer that there was a problem with the schema.
case (INT, ShortType) =>
  (updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short])

case (INT, ArrayType(IntegerType, containsNull)) =>
Member

Is there a way to detect if the proto type is repeated?

Contributor Author

@gengliangwang yes, the protobuf descriptor is used to build the Catalyst schema. However, we can still make modifications like the one below. Do you believe it to be an improvement over the prior strategy?

      case  (INT, ArrayType(IntegerType, containsNull)) if protoType.isRepeated =>
        newArrayWriter(protoType, protoPath, catalystPath, IntegerType, containsNull)

Member

I think we can avoid duplicated code for the array branches. It seems that we can avoid so many newArrayWriter method calls.

Member

Again, feel free to improve it in a followup PR

Contributor Author

@gengliangwang made changes.

@gengliangwang
Member

@SandishKumarHN This PR LGTM overall.
Two important follow-ups:

Member
@gengliangwang left a comment

LGTM, let's have a followup for documentation and other improvements

@gengliangwang
Member

@SandishKumarHN @mposdev21 Thanks for the great work!
@rangadi Thanks for the review.
Merging to master

@gengliangwang
Member

@SandishKumarHN I am going to set you as the assignee in https://issues.apache.org/jira/browse/SPARK-40654. Do you have a jira account?

@SandishKumarHN
Contributor Author

> @SandishKumarHN I am going to set you as the assignee in https://issues.apache.org/jira/browse/SPARK-40654. Do you have a jira account?

@gengliangwang yes, SandishKumarHN or sanysandish@gmail.com

@@ -1208,6 +1254,9 @@ object CopyDependencies {
if (jar.getName.contains("spark-connect") &&
!SbtPomKeys.profiles.value.contains("noshade-connect")) {
Files.copy(fid.toPath, destJar.toPath)
} else if (jar.getName.contains("spark-protobuf") &&
!SbtPomKeys.profiles.value.contains("noshade-protobuf")) {
Files.copy(fid.toPath, destJar.toPath)
Contributor

You have the variable fidProtobuf, but you don't use it anywhere. Possibly it should be used here?

Contributor Author

@bersprockets thanks for pointing this out; I have made changes in the follow-up PR.

@@ -59,7 +59,7 @@ object BuildCommons {
) = Seq(
"core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe",
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connect)
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connect) ++ Seq(protobuf)
Contributor

This change is not needed right? (noticed this while fixing merge conflicts for Databricks's own repo).

Contributor

I mean protobuf is already included in sqlProjects.

Contributor Author

@rangadi yes, it looks like this change is not needed. Will make this change in a followup PR.

@@ -390,7 +390,7 @@ object SparkBuild extends PomBuild {
val mimaProjects = allProjects.filterNot { x =>
Seq(
spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn,
unsafe, tags, tokenProviderKafka010, sqlKafka010, connect
unsafe, tags, tokenProviderKafka010, sqlKafka010, connect, protobuf
Contributor

What is this for? Avro is not included here.

Contributor Author

@rangadi it is part of shading the google-protobuf jar through sbt, so the protobuf module is passed through MimaBuild (MiMa is a tool for identifying binary incompatibilities in Scala libraries).

Contributor

Thanks @SandishKumarHN. How do we trigger this? With sbt build?

Contributor Author
@SandishKumarHN Oct 12, 2022

@rangadi there is a file to trigger it:

dev/mima

@@ -1107,10 +1152,10 @@ object Unidoc {

(ScalaUnidoc / unidoc / unidocProjectFilter) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
yarn, tags, streamingKafka010, sqlKafka010, connect),
yarn, tags, streamingKafka010, sqlKafka010, connect, protobuf),
Contributor

Same here: Is this required? Wasn't needed for Avro. I don't have much context here. cc: @gengliangwang, @SandishKumarHN

Contributor Author

@rangadi I was following the connector/connect module for shading jars and thought this was needed. Based on sbt-unidoc, it skips Scaladoc for the protobuf project; not sure if we need this or not.

(JavaUnidoc / unidoc / unidocProjectFilter) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
yarn, tags, streamingKafka010, sqlKafka010, connect),
yarn, tags, streamingKafka010, sqlKafka010, connect, protobuf),
Contributor

Same.

Contributor
@rangadi commented Oct 12, 2022

Added two tickets to the Epic:

HyukjinKwon pushed a commit that referenced this pull request Oct 19, 2022
… to_protobuf

From SandishKumarHN(sanysandishgmail.com) and Mohan Parthasarathy(mposdev21gmail.com)

This PR follows main PR #37972

The following is an example of how to use from_protobuf and to_protobuf in Pyspark.

```python
data = [("1", (2, "Alice", 109200))]
ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
df = spark.createDataFrame(data, ddl_schema)
desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
... '26F746F33')
 import tempfile
# Writing a protobuf description into a file, generated by using
# connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
...     with open(desc_file_path, "wb") as f:
...         _ = f.write(bytearray.fromhex(desc_hex))
...         f.flush()
...         message_name = 'SimpleMessage'
...         proto_df = df.select(to_protobuf(df.value,
...         desc_file_path, message_name).alias("value"))
...         proto_df.show(truncate=False)
...         proto_df = proto_df.select(from_protobuf(proto_df.value,
...         desc_file_path, message_name).alias("value"))
...         proto_df.show(truncate=False)
    +----------------------------------------+
    |value                                   |
    +----------------------------------------+
    |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
    +----------------------------------------+
    +------------------+
    |value             |
    +------------------+
    |{2, Alice, 109200}|
    +------------------+
```

### Tests Covered
- from_protobuf / to_protobuf (functions.py)

Closes #38212 from SandishKumarHN/PYSPARK_PROTOBUF.

Authored-by: SandishKumarHN <sanysandish@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
testingTypes.foreach { dt =>
  val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
  val filePath = testFile("protobuf/catalyst_types.desc").replace("file:/", "/")
  test(s"single $dt with seed $seed") {
Member

@SandishKumarHN Is it a flaky test?

Contributor Author
@SandishKumarHN Oct 21, 2022

@wangyum yeah, it randomly fails (because of Random usage); the fix is done here: #38286

Contributor

#38286 itself is blocked by other flaky tests.
sad irony or bad karma, can't decide ;)

Contributor Author

@rangadi the above error occurs randomly. Can we contact someone from the spark-connect team to help us fix the #38286 error?

DeZepTup pushed a commit to DeZepTup/spark-custom that referenced this pull request Oct 31, 2022
…protobuf

Closes apache#37972 from SandishKumarHN/SPARK_PROTO_1.

Lead-authored-by: SandishKumarHN <sanysandish@gmail.com>
Co-authored-by: Sandish Kumar Hebbani Naga <sandish-kumar.h-n@hpe.com>
Co-authored-by: Mohan Parthasarathy <mposdev21@gmail.com>
Co-authored-by: sandishkumarhn <sandishkumarhn@sandishkumarhns-MacBook-Pro.local>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
DeZepTup pushed a commit to DeZepTup/spark-custom that referenced this pull request Oct 31, 2022
… to_protobuf

HeartSaVioR pushed a commit that referenced this pull request Nov 4, 2022
…lasses

This is the follow-up PR to #37972 and #38212

### What changes were proposed in this pull request?
1. Move spark-protobuf error classes to the Spark error-classes framework (core/src/main/resources/error/error-classes.json).
2. Support protobuf imports.
3. Validate protobuf timestamp and duration types.

### Why are the changes needed?
N/A

### Does this PR introduce _any_ user-facing change?
None

### How was this patch tested?
Existing tests should cover the validation of this PR.

CC: rangadi mposdev21 gengliangwang

Closes #38344 from SandishKumarHN/SPARK-40777-ProtoErrorCls.

Authored-by: SandishKumarHN <sanysandish@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  StringType -> "StringMsg")

testingTypes.foreach { dt =>
  val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
Contributor Author

@MaxGekk This problem was solved by #38286. Was your fork in sync with master? I can open a JIRA if it fails again. I repeated the test with the above seed = 38. There were no failures.

[info] ProtobufCatalystDataConversionSuite:
[info] - single StructType(StructField(int32_type,IntegerType,true)) with seed 38 (67 milliseconds)
[info] - single StructType(StructField(double_type,DoubleType,true)) with seed 38 (22 milliseconds)
[info] - single StructType(StructField(float_type,FloatType,true)) with seed 38 (18 milliseconds)
[info] - single StructType(StructField(bytes_type,BinaryType,true)) with seed 38 (23 milliseconds)
[info] - single StructType(StructField(string_type,StringType,true)) with seed 38 (27 milliseconds)
[info] - Handle unsupported input of message type (28 milliseconds)
[info] - filter push-down to Protobuf deserializer (13 milliseconds)
[info] - ProtobufDeserializer with binary type (0 milliseconds)
[info] - Full names for message using descriptor file (1 millisecond)

Member

This happened on the recent master. The commit is 216a5d2c39ea3d9e869818692b2f4c0f2652aa56:

$ git diff
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 271c5b0fec..080bf1eb1f 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -123,7 +123,7 @@ class ProtobufCatalystDataConversionSuite
     StringType -> ("StringMsg", ""))

   testingTypes.foreach { dt =>
-    val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
+    val seed = 38
     test(s"single $dt with seed $seed") {

       val (messageName, defaultValue) = catalystTypesToProtoMessages(dt.fields(0).dataType)
$ build/sbt "test:testOnly *ProtobufCatalystDataConversionSuite"
[info] - single StructType(StructField(double_type,DoubleType,true)) with seed 38 *** FAILED *** (10 milliseconds)
[info]   java.lang.NullPointerException:
[info]   at org.apache.spark.sql.protobuf.ProtobufCatalystDataConversionSuite.$anonfun$new$2(ProtobufCatalystDataConversionSuite.scala:134)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)

Contributor Author

@MaxGekk thank you, will look into it.

SandishKumarHN added a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
… to_protobuf

SandishKumarHN added a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…lasses

dongjoon-hyun pushed a commit that referenced this pull request Jul 24, 2024
…nition from `protobuf`

### What changes were proposed in this pull request?
This PR removes the unused object definition `ScalaReflectionLock` from the `protobuf` module. `ScalaReflectionLock` is a definition scoped to the `protobuf` package; it was added in SPARK-40654 | #37972 and became unused in SPARK-41639 | #39147.

### Why are the changes needed?
Clean up unused definitions.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47459 from LuciferYang/remove-ScalaReflectionLock.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
ilicmarkodb pushed a commit to ilicmarkodb/spark that referenced this pull request Jul 29, 2024
…nition from `protobuf`

fusheng-rd pushed a commit to fusheng-rd/spark that referenced this pull request Aug 6, 2024
…nition from `protobuf`

attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
…nition from `protobuf`
