[SPARK-34205][SQL][SS] Add pipe to Dataset to enable Streaming Dataset pipe #31296
It would be good to explain the cases users need to be careful about, e.g.: if your external process does aggregation on its inputs, the aggregation is applied per partition in a micro-batch. You may want to aggregate these outputs after calling pipe to get a global aggregation across partitions, and also across micro-batches.
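This caveat can be sketched against the proposed API (hypothetical: `pipe`'s exact signature is still under review in this PR, and `ds` is an assumed `Dataset[String]`):

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

// Hypothetical: `wc -l` aggregates its input, so `pipe` yields one
// count per partition of each micro-batch, not a global count.
val perPartition: Dataset[String] = ds.pipe("wc", "-l")

// To get a global count, aggregate the piped outputs afterwards,
// e.g. with a streaming aggregation that also spans micro-batches.
val global = perPartition
  .select(sum(col("value").cast("long")).as("total"))
```

The second step is what the comment above recommends: the per-partition, per-micro-batch outputs of the external process are re-aggregated inside Spark.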
I see all the examples simply call the print function with a converted string. Could we instead take a serializer function like

serializeFn: (T => String)

or have two overloaded methods allowing both cases, if we are unsure whether printElement might be necessary in some cases? This should simplify the test code and actual user code (as `_.toString` would simply work).
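For concreteness, the two overloads under discussion might look like this (a sketch only; the names and exact signatures are my assumption, not a merged API):

```scala
// Overload 1: a serializer maps each element to exactly one input line.
def pipe(cmd: Seq[String], serializeFn: T => String): Dataset[String]

// Overload 2: a printer receives the element plus a line-emitting
// callback, allowing multiple lines per element if that is ever needed.
def pipe(cmd: Seq[String], printElement: (T, String => Unit) => Unit): Dataset[String]
```

With overload 1, `ds.pipe(Seq("cat"), _.toString)` would simply work for most element types, which is the simplification suggested above.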
I think it is okay. In most cases there should be no difference. The only difference might be when we want to print multiple lines per object.
I'm fine with either one, as they achieve the same effect while taking different forms.
@viirya, what do you think about exposing a `transform`-equivalent expression as a DSL? Spark lately added native support for script transformation, and I think it could do what you want.
I think this could address most of the comments here, such as #31296 (comment): whether it is typed, the non-standard format concern (as it follows Hive's feature), having at least one format to follow, etc.
It would be great if we could leverage the script transform (#29414).
Great point! I don't know how exhaustively Spark implements Hive's transform feature, but the description of transform in Hive's manual looks pretty powerful, well beyond what we plan to provide with pipe.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform#LanguageManualTransform-Transform/Map-ReduceSyntax
Looks like the reason for the absence of pipe in DataFrame is obvious: transform simply replaced it. (Not valid, as it was only available with Hive support.) It looks to be only available in SQL statements, so we would still need DSL support to use it in SS.
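For reference, Spark's native script transformation is reached through SQL text like the following (a sketch; the script `'cat'` and the table name `tbl` are placeholders):

```scala
// TRANSFORM is only available as SQL syntax, not as a Dataset DSL method:
spark.sql("""
  SELECT TRANSFORM(a, b)
  USING 'cat'
  AS (a STRING, b STRING)
  FROM tbl
""")
```

The lack of a DSL entry point for this syntax is exactly the gap discussed here for Structured Streaming.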
I considered `transform` at the beginning, as it looks close to pipe. I didn't pick it because I only see it exposed as SQL syntax, and I am not sure whether it works for streaming Datasets. Another reason is that it is designed for untyped Datasets, so if you want to pipe a complex object T with custom output instead of column-wise output, "transform" isn't as powerful as "pipe". That said, I asked our customer and they only use primitive-type Datasets for now, so untyped Datasets should be enough for the purpose.
Another reason is that although the query looks like "SELECT TRANSFORM(...) FROM ...", it is actually not an expression but is implemented as an operator. If we expose it as a DSL expression, there will be some problems.
Unlike a window function, it seems to me that we cannot have a query like "SELECT a, TRANSFORM(...), c FROM ..." or its DSL equivalent, whereas for a window function we can.
That being said, in the end it is also `Dataset.transform`, instead of an expression DSL.
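The contrast with window functions mentioned above can be sketched as follows (a hypothetical illustration; `df` and its columns are assumptions, and `import spark.implicits._` is assumed for the `$` syntax):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// A window function is a true expression, so it composes with other
// items in the same projection, i.e. "SELECT a, rank() OVER (...), c":
val w = Window.partitionBy($"key").orderBy($"ts")
df.select($"a", rank().over(w).as("r"), $"c")

// TRANSFORM cannot do this: it replaces the entire output schema,
// which is why it behaves as an operator rather than an expression.
// SELECT a, TRANSFORM(...), c FROM ...   -- not valid
```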
I see what @viirya said. I'd agree that transform looks to behave as an operator (not sure whether that is intended, but it looks that way at least for now), and transform also requires a top-level API to cover it, like we did for `mapPartitions`. If we are OK with adding a top-level API (again, not yet decided, so just my 2 cents), then which one? I'd rather say `transform` is what we'd like to be consistent with, instead of `pipe`. It has been exposed as a SQL statement, and is probably widely used by Spark SQL users and even Hive users. If we want feature parity, my vote goes to `transform`.
@AngersZhuuuu The top-level API here means the new API added to Dataset.
@viirya Sure. I followed the comment from @AngersZhuuuu: "I have thought about this problem too; first I wanted to add transform as a DSL function, and in this way we need to make an equivalent ScriptTransformation expression first. We can think of this as just a new expression, or a new function." That is, add a new expression `ScriptTransformationExpression` for `ScriptTransformation` and turn it into `ScriptTransformationExec`. Two limitations here might need more discussion:
Thanks @HeartSaVioR. At least I am glad the discussion can move forward, no matter which one you prefer to add.
Honestly, I think transform is a weird thing; it exists only to provide the pipe feature under Hive SQL syntax. I don't like the transform syntax, which is inconvenient to use and verbose, and it is not as flexible as pipe's custom print-out function. BTW, regarding typed Datasets: because transform is for untyped data, it is bound to its serialization row format. In the earlier discussion there were some comments against that, although it was clarified later that pipe doesn't suffer from this issue.
If we still cannot reach a consensus, maybe I should raise a discussion on the dev mailing list to decide whether a pipe or transform top-level API should be added.
@xuanyuanking @AngersZhuuuu The SQL syntax of transform, "SELECT TRANSFORM(...)", is pretty confusing. It looks like an expression but it is actually an operator, and IMHO you cannot turn it into an expression. If you force it to be one, you will create inconsistency and weird cases. transform is like pipe: their input/output relation is not 1:1 or N:1 but arbitrary.
Hmmm, just to clarify: we mean we can add an expression (or function?) like `TRANSFORM`, not convert `TRANSFORM` into it. And we can extract some common logic from `ScriptTransformationExec`. The usage would be:
- the input can be a list of input columns such as `a, b, c`
- the output can be a definition such as `col1 string, col2 int`
- the return type is `Array<Struct<col1: String, col2: Int>>` (this DataType can cover all cases and lets the user handle it)
Then at execution we can make it just run with a default format such as `ROW FORMAT DELIMITED`. A simple and general way to implement it, and then we can add it as a DSL.
Do you think this expression can cover all kinds of external process output? Transform and pipe have an arbitrary relation between input and output. An external process can output a line for each input line, can produce aggregation-like output (like `wc -l`), can output a line per 2 or 3 input lines, etc. I don't know how you would define an expression whose output type is not deterministic.
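The arbitrary input/output relation is easy to demonstrate outside Spark with plain external processes (a Scala standard-library sketch, not the proposed API; it assumes `cat` and `wc` are on the PATH):

```scala
import java.io.ByteArrayInputStream
import scala.sys.process._

val input = "a\nb\nc\n"

// Feed the same three lines into an arbitrary external command via stdin.
def run(cmd: String): String =
  (Process(cmd) #< new ByteArrayInputStream(input.getBytes)).!!

run("cat")      // three output lines for three input lines (1:1)
run("wc -l")    // one aggregated line containing the count (N:1)
```

Any fixed-schema expression would have to accommodate both shapes of output, which is the concern raised above.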
nit: any reason for `toDF` (as `pipe` gives a `Dataset[String]`)?
It is just for `checkAnswer`.
nit: Why `toDF`?
Thank you for making sure of this!
nit: Why `toDF`?