[SPARK-34205][SQL][SS] Add pipe to Dataset to enable Streaming Dataset pipe #31296
Conversation
Thank you, @viirya . This looks like a nice feature. I left a few comments.
Thank you, @dongjoon-hyun.
```scala
    assert(piped2(1).getString(0).trim == "2")
  }

  test("SPARK-34205: pipe Dataset with empty partition") {
```
Thank you for making sure of this!
```scala
  .pipe(command = PipedRDD.tokenize(command), printRDDElement = printRDDElement)
  .mapPartitionsInternal { iter =>
    val outputObject = ObjectOperator.wrapObjectToRow(outputObjectType)
    iter.map(ele => outputObject(ele))
```
`ele` -> `e`?
Changed the variable name.
+1. This looks reasonable to me.
```scala
   * @group typedrel
   * @since 3.2.0
   */
  def pipe(command: String): Dataset[String] = {
```
An open question: should we expose the other params in the API?

```scala
def pipe(
    command: Seq[String],
    env: Map[String, String] = Map(),
    printPipeContext: (String => Unit) => Unit = null,
    printRDDElement: (T, String => Unit) => Unit = null,
    separateWorkingDir: Boolean = false,
    bufferSize: Int = 8192,
    encoding: String = Codec.defaultCharsetCodec.name): RDD[String]
```

I believe `pipe(command: String)` should be the most common API, but I'm not sure how many scenarios need the other params (the environment variables seem useful).
Yeah, for now I think a simple command parameter covers the common case and should be enough. If there is a need for other parameters, we can add them later.
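A minimal sketch of how the single-parameter API under discussion might be used (hypothetical: `Dataset.pipe(command)` is the method proposed in this PR, not an existing Dataset API):

```scala
// Sketch only. Assumes a SparkSession `spark` and its implicits in scope.
import org.apache.spark.sql.Dataset
import spark.implicits._

val words: Dataset[String] = Seq("alpha", "beta", "gamma").toDS()

// Each element is written to the forked process's stdin as one line;
// each line of the process's stdout becomes one element of the result.
val piped: Dataset[String] = words.pipe("cat")
```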
Could we change the description a little bit?
Could you please describe the actual use case? I would like to confirm that this works with a complicated schema like array/map/nested struct with some binary columns, and how the forked process can deserialize the inputs properly, apply operations, and serialize again (serializing back wouldn't matter much, as the return is a `Dataset[String]`). In addition, this would incur non-trivial serde cost on the communication between the Spark process and the external process. We probably also need to revisit what benefits this gives us compared to what Spark provides now (UDFs, or others I may have missed?).
Our internal client needs to pipe a streaming read from Kafka through a forked process, and currently with SS users cannot do it. I think the above question also applies to RDD.pipe. It's the users' responsibility to make sure the process can understand the input.

I believe there are cases where users have to use pipe. Users should consider this before choosing and using the pipe API; I think it is a well-known issue for pipe. The point is that users need to use pipe on streaming data just as they do on RDDs and batch Datasets, but a streaming Dataset cannot support it.
Sure, thanks.
Agree with @HeartSaVioR here. I think it is more general if we just provide a UDF for this... it also doesn't pollute the Dataset API with something so rarely used.
I'm not sure I understand your point here. So you mean that when users have to use pipe, we simply ask them to write a UDF instead of using pipe?
I understand the functionality is lacking in SS. There's a workaround like foreachBatch -> toRDD -> pipe, but streaming operations can't be added after calling pipe. So I'd agree that it'd be better to address the gap in some way.

I feel the default serialization logic in PipedRDD is also fragile and not well documented. (This actually makes me wonder, is PipedRDD widely adopted?) Is there any documentation/mention that T.toString is used for serialization, and that it doesn't escape line breaks, so a multi-line string will be printed as multiple lines without any guard? The default implementation is too naive, and even for primitive types it's not hard to find the hole. There's a parameter to customize the serialization, and we can add it as well, which makes me less concerned; but the default should still be reasonable, with the limitations well explained, if any.

Adding this to the top-level API would be the easiest way, as this simply leverages PipedRDD and the diff of this PR, except tests, is very small. This is a major upside. The major downside is that this only works if end users intend to send all columns as input and use the output as a single string column. Suppose end users have 10 columns in their DataFrame and want to use pipe with only one column while retaining the other 9 columns for the next operation (so 10 columns still remain, including the output of pipe). How would they do it? And like I said above, I don't think they'll be able to understand the serialized form for multiple columns or complicated column types. They'll end up using a custom class for type T which overrides toString. (And the output of toString shouldn't be multiple lines.)

Adding this as a function for parameter type string or binary would require more work, and it would force end users to provide the serialized form as input if they want to pass multiple columns or a non-primitive column.

But at least they would know what they are doing when using the function, and there's no magic behind the curtain, so there should be no issue with serialization. Some functions like

I'd feel it more natural if the relation is 1-to-1 for the normal case and N-to-1 "across partitions" for aggregation (so the output will be
Also, when they can do arbitrary aggregation in the forked process per partition, the aggregation is only bound to the micro-batch, even only to the single partition, which no longer matches the concept of aggregation in SS, which should be across micro-batches. (That said, the output can differ between batch and streaming even if the input is the same.)
The current RDD.pipe doesn't explicitly mention that we output the string form of T. What it says is: "All elements of each input partition are written to a process's stdin as lines of input separated by a newline." If you think that is not enough, we can improve the API document. About the parameter, do you mean

As we discussed before, users don't need to understand how Spark serializes object T to an internal row; this is still hidden from users. That said, I think some of the questions are beyond the scope of the pipe concept. For example, the one about piping only one column while retaining 9 columns for the next operation: users also cannot pipe only one field of object T and retain all the others when using RDD's pipe.
Hmm, I'd say not to think of it as aggregation; they are different things. Pipe is widely used on the Unix/Linux command line, and I don't think we should mix it with aggregation.

BTW, I know PipedRDD (RDD.pipe) is widely used. I have heard many times of people using pipe to adapt existing programs to Spark.
Yes, this is definitely not enough. It is only intuitive if the type T is primitive, like integer, long, String, etc. If you have type T as a Java bean and override toString with an IDE's toString generator, the format depends on the IDE. For a case class it depends on Scala, and I don't think the representation of toString is something Scala should guarantee compatibility for. Makes sense?
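The fragility described above can be illustrated with a small plain-Scala sketch (no Spark needed): a case class's synthesized `toString` does not escape embedded newlines, so a default serialization of one element per line would silently split one record into several stdin lines.

```scala
// Hypothetical record type for illustration only.
case class Note(id: Int, body: String)

val s = Note(1, "line1\nline2").toString  // "Note(1,line1\nline2)"

// Written to a process's stdin, this single record arrives as TWO lines:
//   Note(1,line1
//   line2)
val stdinLines = s.linesIterator.size     // 2
```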
Once you're adding pipe as one of the DataFrame operations, 'pipe' should be evaluated as a DataFrame operation. End users of pipe wouldn't use a trivial external process like "cat" or "wc -l" which completely ignores the structure of the input, but I can't find any example beyond such things. That's why I want to hear the actual use case: what is the type T of the Dataset, which task does the external process do, what is the output of the external process, and do they need to break the output down into multiple columns after that?

The thing is that the output can differ between batch and streaming, and that is entirely dependent on the external process. Any external process that does aggregation ("wc -l" is still effectively an aggregation, whatever you say) breaks the concept, and the result varies with how the input stream is split into micro-batches. I see that existing APIs can also break this (like mapPartitions/flatMap with a user function which doesn't consider this fact), so I'd be OK if everyone else doesn't mind. I still think restricting the relation to 1-to-1 / N-to-1 would be ideal, but that requires the external process to be implemented to Spark's requirement, which might not be possible, so...
I don't have a strong opinion. It seems consistent with

That is what
I think these mainly focus on how object T is going to work. I said it is no different from how RDD works, because they both work on object T. The custom function provided by users should take the responsibility of producing the necessary output for the forked process. (Note I have not added the parameter yet. Maybe I'll do it tomorrow.)

"pipe" is not invented by Spark. I don't think we should provide a half-baked pipe function; it would be worse than nothing, not to mention the technical points you raised. IMHO, it would bring more inconsistency between RDD pipe, Dataset pipe, and streaming Dataset pipe. I think what we can do is explicitly clarify that the effect of pipe on micro-batch streaming is only per micro-batch, not across the entire stream.

Just like RDD pipe, there will be a custom function users can provide for complex type T to produce the necessary output for the forked process. I will add it later.
Is it too hard a requirement to explain the actual use case, especially when you've said you have an internal customer asking for this feature? I don't think my request requires anything needing redaction. (If there's something sensitive, you can abstract the details or do some redaction yourself.) My first comment asked about the actual use case, and I have been asking consistently. I don't think
I'm not sure how much detail you'd like to see. The script we will call? The data input it accepts? The use case is generally to pipe through an external process. But if you insist, I think I need to get more information from the client. I just wonder if it is necessary to provide details instead of the general use case of pipe. Would knowing what input/script the client uses be helpful? We are not providing a client-specific API. Okay, let me try to get more details tomorrow to make everyone happy here.
For DataFrame, we know it is actually Dataset[Row]. If users need custom print-out,
I'm quoting my comment:
There's no "detail" of business logic here. If the type T is a Java bean or case class or something, just mention that it's a Java bean/case class. If it's typed but bound to a non-primitive type like a tuple, please mention it. If it's untyped, you need to mention it, including whether it has complex column(s). Fair enough for you?

What is the output of

I already pointed out earlier that the default serializer only makes sense if the type T is a primitive for Java/Scala, for which the output of T.toString is relatively intuitive for end users. For other types the default serializer won't work (or even if end users are able to infer it, it's still fragile), including "untyped".
The problem is that other typed functions simply get the Row as

This is why we need a custom function
OK, so you seem to agree the default serializer doesn't work for untyped. And I also think we agree the default serializer is problematic for a non-primitive type T on typed. These cases sound like the majority, and end users only get benefits if their Dataset is typed with a primitive type. That said, is it still beneficial to provide a default serializer and let end users be confused if they don't know the details? The default must be reasonable, and I don't think the current default serializer is reasonable in the majority of cases. I think that is a non-trivial difference between RDD.pipe and Dataset.pipe.
Hmm, I'm fine if you think we should always require a custom function to produce the output.
I'm neutral on adding this to the top-level API, but if no one has concerns about it, that is OK to me as well. I left some comments in case others are OK with adding this to the top-level API.
```scala
   * @group typedrel
   * @since 3.2.0
   */
  def pipe(command: String): Dataset[String] = {
  def pipe(command: String, printElement: (T, String => Unit) => Unit): Dataset[String] = {
```
I see all the examples simply calling the print function with a converted string. Could we simply take a serializer function like `serializeFn: T => String` instead, or have two overloaded methods allowing both cases if we are unsure whether `printElement` might be necessary in some cases? This should simplify the test code and actual user code (as `_.toString` would simply work).
I think it is okay. In most cases there should be no difference. The only difference might be when we want to print multiple lines per object:

```scala
def printElement(obj: T, printFunc: String => Unit) = {
  printFunc(obj.a)
  printFunc(obj.b)
  ...
}

def serializeFn(obj: T): String = {
  s"${obj.a}\n${obj.b}\n..."
}
```

I'm fine with either one, as they have the same effect although they take different forms.
```
@@ -2897,14 +2897,22 @@ class Dataset[T] private[sql](
 * each line of stdout resulting in one element of the output partition. A process is invoked
 * even for empty partitions.
 *
 * @param command command to run in forked process.
 * Note that for micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not
```
I'd kindly explain the case they need to be careful about, e.g.: "If your external process does aggregation on inputs, the aggregation is applied per partition in a micro-batch. You may want to aggregate the outputs after calling pipe to get a global aggregation across partitions and also across micro-batches."
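As a sketch of that guidance (hypothetical code; it assumes the two-argument `pipe(command, printElement)` variant discussed in this PR and an existing Dataset `ds`):

```scala
// "wc -l" emits one count per partition per micro-batch, so the piped
// result holds partial counts, not a global one.
val partial = ds.pipe("wc -l", (v, print) => print(v.toString))

// Aggregate the partial counts afterwards to get a global count across
// partitions (and, in streaming with an appropriate output mode, across
// micro-batches).
val total = partial.toDF("n").selectExpr("sum(cast(trim(n) as long)) as total")
```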
```scala
  AddData(inputData, 1, 2, 3),
  CheckAnswer(Row("1"), Row("2"), Row("3")),
  AddData(inputData, 4),
  CheckAnswer(Row("1"), Row("2"), Row("3"), Row("4")))
```
I'd rather check `CheckNewAnswer(Row("4"))` to ensure inputs from the previous batch do not affect the next batch.
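Applied to the snippet above, the suggestion would make the tail of the test look roughly like this (a sketch of the StreamTest DSL; `piped` and `inputData` are the names used in this PR's test):

```scala
testStream(piped)(
  AddData(inputData, 1, 2, 3),
  CheckAnswer(Row("1"), Row("2"), Row("3")),
  AddData(inputData, 4),
  // Assert only the rows emitted by the latest micro-batch, so any
  // leakage from the previous batch fails the test.
  CheckNewAnswer(Row("4")))
```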
Modified. Thanks.
```scala
  assume(TestUtils.testCommandAvailable("cat"))

  val nums = spark.range(4)
  val piped = nums.pipe("cat", (l, printFunc) => printFunc(l.toString)).toDF
```
@viirya, what do you think about exposing a `transform`-equivalent expression as a DSL? e.g.:

```scala
scala> val data = Seq((123, "first"), (4567, "second")).toDF("num", "word")
data: org.apache.spark.sql.DataFrame = [num: int, word: string]

scala> data.createOrReplaceTempView("t1")

scala> sql("select transform(*) using 'cat' from t1").show()
+----+------+
| key| value|
+----+------+
| 123| first|
|4567|second|
+----+------+

scala> data.repartition(1).createOrReplaceTempView("t1")

scala> sql("select transform(*) using 'wc -l' as (echo) from t1").show()
+--------+
|    echo|
+--------+
|       2|
+--------+
```

Spark lately added native support for script transformation, and I think it could do what you want.
I think this could address most of the comments here, such as #31296 (comment), being typed or non-standard stuff (as it follows Hive's feature) - at least we have one format to follow, etc.
It would be great if we could leverage the script transform from #29414.
Great point! I don't know how exhaustively Spark implements Hive's transform feature, but the description of transform in Hive's manual looks pretty powerful, and goes much beyond what we plan to provide with pipe.

Looks like the reason for the absence of pipe in DataFrame is obvious - transform just replaced it. (Not valid, as it was only available with Hive support.) It looks to be only available in SQL statements, so we still need DSL support to use it in SS.
I considered `transform` at the beginning, as it looks close to pipe. I didn't pick it because I only see it exposed as SQL syntax, and I am not sure it works for a streaming Dataset. Another reason is that it is designed for untyped Datasets, so if you want to pipe a complex object T with custom output instead of column-wise output, `transform` isn't as powerful as `pipe`.

Although, I asked our customer and they only use primitive-type Datasets for now, so an untyped Dataset should be enough for the purpose.

Another reason is that although the query looks like "SELECT TRANSFORM(...) FROM ...", it is actually not an expression but is implemented as an operator. If we had it as a DSL expression, there would be some problems. Unlike a window function, it seems to me that we cannot have a query like "SELECT a, TRANSFORM(...), c FROM ..." or, in DSL form:

```scala
df.select($"a", $"b", transform(...) ...)
```

But for a window function we can do:

```scala
df.select($"a", $"b", lead("key", 1).over(window) ...)
```

That being said, in the end it is also a `Dataset.transform`-style operator, instead of an expression DSL.
I see what @viirya said. I'd agree that transform looks to behave as an operation (not sure whether that is intended, but it looks that way at least for now), and transform also requires a top-level API to cover it, like we did for `mapPartitions`.

If we are OK to add the top-level API (again, not yet decided, so just my 2 cents), then which one? I'd rather say `transform` is the one we'd like to be consistent with, instead of `pipe`. It has been exposed as a SQL statement, probably used widely by Spark SQL users and even Hive users. If we want feature parity, then my vote goes to `transform`.
What's the top-level API? Do you mean a plan node like CollectSet, or something else?

@AngersZhuuuu The top-level API here means the new API added to Dataset.

Can you share how to make the transformation an expression? I don't think it is an expression at all.
@viirya Sure. I followed this comment from @AngersZhuuuu: "I have thought about this problem too. First I wanted to add transform as a DSL function; in this way, we need to make an equivalent ScriptTransformation expression first. We can think of this as just a new expression, or a new function." That is, add a new expression `ScriptTransformationExpression` for `ScriptTransformation` and turn it into `ScriptTransformationExec`.

Two limitations here might need more discussion:
- The script transformation may produce more than one row for a single row, so it cannot be used together with other expressions.
- The script in Hive transformation is partition-based, but if we make it an expression, it becomes row-based.
Thanks @HeartSaVioR. At least I am glad that the discussion can move forward, no matter which one you prefer to add.

Honestly, I think transform is a weird thing; it exists only to provide the pipe feature under Hive SQL syntax. I don't like the transform syntax, which is inconvenient to use and verbose. It is not as flexible as pipe's custom print-out function. BTW, for a typed dataset, because transform is for untyped data, it is bound to its serialized row format. In the early discussion there were some comments against that, although it was clarified later that pipe doesn't suffer from this issue.

If we still cannot reach a consensus, maybe I should raise a discussion on the dev mailing list to decide whether a pipe or transform top-level API should be added.

@xuanyuanking @AngersZhuuuu The SQL syntax of transform, "SELECT TRANSFORM(...)", is pretty confusing. It looks like an expression but actually it is an operator, and IMHO you cannot turn it into an expression. If you force it to be an expression, you will create some inconsistency and weird cases. transform is like pipe: their input/output relation is not 1:1 or N:1 but arbitrary.
Hmmm, just to clarify: we mean we can add an expression (or function?) like `TRANSFORM`, not convert `TRANSFORM` to it. And we can extract some common logic from `ScriptTransformationExec`. The usage would be something like:

```
script_transform(input, script, output)
```

The input can be a list of input columns such as a, b, c; the output can be a definition such as col1 string, col2 int; and the return type is `Array<Struct<col1: String, col2: Int>>` (this data type can cover all cases and lets the user handle it). Then at execution we can make it just run with a default format such as ROW FORMAT DELIMITED. It is a simple and general way to implement, and then we can add it as a DSL.
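If it helps, a purely hypothetical DSL form of that proposal might read as follows (neither `script_transform` nor this signature exists in Spark; everything here is illustrative only):

```scala
import org.apache.spark.sql.functions.struct

// Sketch: run columns a and b through "cat" with a declared output schema;
// under the proposal, the result type would be Array<Struct<col1: String, col2: Int>>.
val out = df.select(
  script_transform(struct($"a", $"b"), "cat", "col1 STRING, col2 INT").as("piped"))
```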
Do you think this expression can cover all kinds of external process output? Transform and pipe have an arbitrary relation between input and output. An external process can output a line for each input line, can do aggregation-like output such as `wc -l`, can output a line per 2 or 3 input lines, etc. I don't know how you would define an expression whose output cardinality is not deterministic.
cc @AngersZhuuuu too, FYI.
```scala
  assume(TestUtils.testCommandAvailable("cat"))

  val nums = spark.range(4)
  val piped = nums.pipe("cat", (l, printFunc) => printFunc(l.toString)).toDF
```
nit: any reason for `toDF` (as `pipe` gives a `Dataset[String]`)?
It is just for `checkAnswer`.
```scala
  checkAnswer(piped, Row("0") :: Row("1") :: Row("2") :: Row("3") :: Nil)

  val piped2 = nums.pipe("wc -l", (l, printFunc) => printFunc(l.toString)).toDF.collect()
```
nit: Why `toDF`?
```scala
  val inputData = MemoryStream[Int]
  val piped = inputData.toDS()
    .pipe("cat", (n, printFunc) => printFunc(n.toString)).toDF
```
nit: Why `toDF`?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?

This patch proposes to add a `pipe` API to `Dataset` to enable the pipe feature for streaming Datasets.

Why are the changes needed?

`Dataset` doesn't have a `pipe` API but `RDD` has one. Although for a normal `Dataset` users can convert the `Dataset` to an `RDD`, call `RDD.pipe`, and then convert it back to a `Dataset`, for a streaming Dataset it is not possible, since queries with streaming sources must be executed with `writeStream.start()`.

So that being said, this is actually a requirement from Structured Streaming, but we need to add the `pipe` API to `Dataset` to enable it in Structured Streaming. From the `Dataset` perspective, it is also easier to use the `Dataset` API than converting between `Dataset` and `RDD`.

Does this PR introduce any user-facing change?

Yes, a new API `pipe` is added to `Dataset`.

How was this patch tested?

Unit test.