
[SparkConnect] Initial Protobuf Definitions #37075

Closed
wants to merge 2 commits

Conversation

grundprinzip
Contributor

WIP

What changes were proposed in this pull request?

This is the first patch for Spark Connect that adds the base protocol buffer definitions.

Why are the changes needed?

These changes are part of the Spark Connect efforts.

Does this PR introduce any user-facing change?

No

How was this patch tested?

No tests at this point in time.

@AmplabJenkins

Can one of the admins verify this patch?

sql/core/pom.xml (outdated)
@grundprinzip
Contributor Author

Added the missing license headers and fixed the M1 build.

@HyukjinKwon HyukjinKwon marked this pull request as draft July 6, 2022 00:27
@HyukjinKwon
Copy link
Member

Let me turn this into a draft since it's WIP.

@@ -116,7 +116,7 @@
     <log4j.version>2.17.2</log4j.version>
     <!-- make sure to update IsolatedClientLoader whenever this version is changed -->
     <hadoop.version>3.3.3</hadoop.version>
-    <protobuf.version>2.5.0</protobuf.version>
+    <protobuf.version>3.21.1</protobuf.version>
Contributor

@LuciferYang Jul 6, 2022

Maybe we should shade and relocate protobuf in Spark to avoid potential conflicts with other third-party libraries, such as Hadoop.

Contributor Author

@grundprinzip Jul 6, 2022

Absolutely; the shading and relocation rules haven't been updated yet. I'm still a bit unclear on the best way to proceed to avoid conflicts with third-party packages or Spark consumers. I've discussed some approaches: one way would be to produce a shaded Spark Connect artifact that is then consumed in its shaded form by Spark itself; another would be to shade and relocate after the build.

However, this PR is mostly for discussing the proto interface.

Contributor

Got it

Contributor

Hadoop HDFS has its own shaded copy now, so it doesn't care (*); a protobuf upgrade is incompatible with any code compiled against the earlier version, so it must be shaded somehow.

(*) More specifically, it has a new problem: how to safely upgrade that shaded hadoop-thirdparty jar with Guava, protobuf, etc.

Contributor

a protobuf upgrade is incompatible with any code compiled against the earlier version, so it must be shaded somehow.

@steveloughran what does this mean? Could you point to more details?
If I shade 'com.google.protobuf' in my jar, I can update it anytime, right?

Contributor

Shaded, yes. If unshaded, then if you update protobuf.jar, all .class files compiled against the older version of protobuf are unlikely to link, let alone work.

Metrics metrics = 3;

// Batch results of metrics.
message ArrowBatch {
Contributor

Curious: is the data in the Arrow format, and is that why we call it ArrowBatch here? Normally we should have the same schema across batches, right? Why do we need to store a schema field per batch?

Contributor Author

Yes, the desired format is actually Arrow IPC streams, which include the schema directly. I will address this in this PR.
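For illustration, a minimal sketch of what an IPC-based batch message could look like; the field names here are assumptions, not part of this PR:

// Hypothetical sketch: with Arrow IPC streams the schema travels inside
// the stream bytes, so no per-batch schema field is needed.
message ArrowBatch {
  // Number of rows contained in this batch.
  int64 row_count = 1;
  // Serialized Arrow IPC stream; the IPC format carries the schema itself.
  bytes data = 2;
}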

JOIN_TYPE_OUTER = 2;
JOIN_TYPE_LEFT_OUTER = 3;
JOIN_TYPE_RIGHT_OUTER = 4;
JOIN_TYPE_ANTI = 5;
Contributor

nit: we can also have JOIN_TYPE_SEMI.

enum JoinType {
JOIN_TYPE_UNSPECIFIED = 0;
JOIN_TYPE_INNER = 1;
JOIN_TYPE_OUTER = 2;
Contributor

nit: JOIN_TYPE_FULL_OUTER sounds clearer.
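Taken together, the two nits above would make the enum look roughly like this sketch (the renamed and added values are the reviewers' suggestions, not what the PR currently contains):

enum JoinType {
  JOIN_TYPE_UNSPECIFIED = 0;
  JOIN_TYPE_INNER = 1;
  // Renamed from JOIN_TYPE_OUTER per the review suggestion above.
  JOIN_TYPE_FULL_OUTER = 2;
  JOIN_TYPE_LEFT_OUTER = 3;
  JOIN_TYPE_RIGHT_OUTER = 4;
  JOIN_TYPE_ANTI = 5;
  // Added per the review suggestion above.
  JOIN_TYPE_SEMI = 6;
}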

/*
Relation of type [[Sort]].
*/
message Sort {
Contributor

we need to differentiate local vs global sort right?

Contributor Author

For now, I don't think we need to differentiate between local and global sort here, as this always applies the sort to the full relation. Pushing down a local sort, from what I recall, is a physical optimization.
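If the logical distinction were ever needed, one purely illustrative option (not part of this PR; the input and ordering fields are assumptions) would be a flag on the message:

message Sort {
  // Input relation to sort (assumed field).
  Relation input = 1;
  // Sort expressions, e.g. column references with direction (assumed field).
  repeated Expression sort_fields = 2;
  // Hypothetical: false would allow a partition-local sort; this PR
  // currently treats every sort as global over the full relation.
  bool is_global = 3;
}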


message AggregateFunction {
string name = 1;
repeated Expression arguments = 2;
Contributor

An aggregate function (UDAF) would be very different from a UDF/expression, right?

Contributor Author

From my experiments, a UDAF registered by name should just work, as the AggregateFunction is "unresolved". I might make this clearer by calling it UnresolvedAggregateFunction.
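The suggested rename, sketched for illustration:

// Hypothetical rename: "Unresolved" signals that the server resolves the
// function by name, which is why UDAFs registered by name should just work.
message UnresolvedAggregateFunction {
  string name = 1;
  repeated Expression arguments = 2;
}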


When adding new relation types, they have to be registered here.
*/
message Relation {
Contributor Author

add ID for incremental plan building.
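A speculative sketch of what such an ID could look like (the message and field names are assumptions, not part of this PR):

// Illustrative only: a client-assigned ID on each relation would let a
// follow-up request reference and extend a previously submitted plan
// instead of resending it in full.
message RelationCommon {
  // Hypothetical unique identifier for this relation node.
  string id = 1;
}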

}

// A request to be executed by the service.
message Request {
Contributor Author

Add an option to pass Spark conf values as part of the request?
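For discussion, a minimal sketch of how that could look (the field name and number are assumptions, and the existing Request fields are omitted):

message Request {
  // Hypothetical: Spark conf values applied for the scope of this request,
  // e.g. "spark.sql.shuffle.partitions" -> "200".
  map<string, string> conf = 99;
}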


service SparkConnectService {

// Executes a request that contains the query and returns a stream of [[Response]].
rpc ExecutePlan(Request) returns (stream Response) {}
Contributor Author

I suggest renaming this to something that avoids having "Plan" in the request, as it can be confusing.
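One illustrative possibility (nothing is decided in this PR; the name Execute is an assumption):

service SparkConnectService {
  // Hypothetical rename that keeps "Plan" out of the RPC name while the
  // payload remains a plain Request.
  rpc Execute(Request) returns (stream Response) {}
}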

@@ -0,0 +1,155 @@
// Protocol Buffers - Google's data interchange format
Member

Shall we change the header here?
