[SPARK-27658][SQL] Add FunctionCatalog API #24559
Conversation
@jzhuge, @mccheah, @cloud-fan, and @marmbrus, this PR adds a FunctionCatalog API for catalog plugins to expose functions to Spark.
* @param input an input row
* @return a result value
*/
R produceResult(InternalRow input);
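For readers following the thread, here is a minimal sketch of what a user-defined scalar function could look like against this interface. Only produceResult(InternalRow) comes from the snippet above; the metadata methods (inputTypes, resultType, name), the class name, and the package are assumptions based on later snippets and the PR description, not a final form of the API.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Hypothetical example: adds two int arguments. Spark is assumed to pass the
// arguments packed into an InternalRow, as discussed below.
public class IntAdd implements ScalarFunction<Integer> {
  @Override
  public DataType[] inputTypes() {
    return new DataType[] { DataTypes.IntegerType, DataTypes.IntegerType };
  }

  @Override
  public DataType resultType() {
    return DataTypes.IntegerType;
  }

  @Override
  public String name() {
    return "int_add";
  }

  @Override
  public Integer produceResult(InternalRow input) {
    // The two arguments arrive as columns 0 and 1 of the input row.
    return input.getInt(0) + input.getInt(1);
  }
}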
A UDF doesn't take an entire row as its input, but some columns, e.g. SELECT substring(strCol, 3).
This assumes that Spark will create an InternalRow to pass to the function. That's the easiest way to pass an arbitrary number of arguments that correspond to a struct schema.
Note that this doesn't need to be expensive. We can build an InternalRow that exposes a projection of another and that can be reused for all of the UDF calls.
Think about a UDF that adds 2 ints.
val row = InternalRow(i, j)
udf.call(row)
// inside udf.call
return row.getInt(0) + row.getInt(1)
is much slower than
udf.call(i, j)
// inside udf.call
return i + j
We need to think about the tradeoffs and pick between perf and ease-of-use.
I don't think that's a relevant comparison. Clearly, it's a bad idea to copy data into a new InternalRow to pass it into a UDF. But InternalRow is an interface, so we can change how it works. We have an InternalRow that exposes data from a ColumnarBatch and one that joins partition values; we could similarly have an InternalRow that wraps another InternalRow for this access.
class ProjectingRow(wrappedPositions: Array[Int]) extends InternalRow {
  var wrapped: InternalRow = null
  def set(row: InternalRow): Unit = this.wrapped = row
  def getInt(pos: Int): Int = wrapped.getInt(wrappedPositions(pos))
  ...
}
Then each UDF call becomes:
udfRow.set(inputRow)
val result = udf.call(udfRow)
And call could be implemented as you'd expect:
public int call(InternalRow row) {
  return row.getInt(0) + row.getInt(1);
}
I think that the overhead of set is much better than using reflection or object inspectors like Hive.
I think we need a design doc for the UDF API. We need to think about ease-of-use and performance.
@cloud-fan, I agree that we will eventually want a doc. This is intended to get everyone thinking about what it will look like and what the performance would be.
@AndrewKL you might be interested in this.
Any progress or unaddressed issues here?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
I like the idea of having a function catalog API, left some initial questions/suggestions.
* @param input an input row
* @return updated aggregation state
*/
S update(S state, InternalRow input);
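To make the aggregate side concrete, here is a sketch of a simple sum written against this interface. Only update(S state, InternalRow input) is taken from the snippet above; newAggregationState, merge, produceResult, and the metadata methods are assumptions about the rest of the interface and may differ from the final API.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.AggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Hypothetical example: sums an int column into a long state.
public class IntSum implements AggregateFunction<Long, Long> {
  @Override
  public Long newAggregationState() {
    return 0L;
  }

  @Override
  public Long update(Long state, InternalRow input) {
    // Skip nulls; otherwise add the single int argument to the running sum.
    return input.isNullAt(0) ? state : state + input.getInt(0);
  }

  @Override
  public Long merge(Long leftState, Long rightState) {
    return leftState + rightState;
  }

  @Override
  public Long produceResult(Long state) {
    return state;
  }

  @Override
  public DataType[] inputTypes() {
    return new DataType[] { DataTypes.IntegerType };
  }

  @Override
  public DataType resultType() {
    return DataTypes.LongType;
  }

  @Override
  public String name() {
    return "int_sum";
  }
}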
Would it make sense to have a default implementation taking an iterator of InternalRows? Just thinking out loud.
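As a sketch of what such a default could look like (the interface fragment and the name updateAll are hypothetical, purely to illustrate the suggestion):

import java.util.Iterator;
import org.apache.spark.sql.catalyst.InternalRow;

// Hypothetical interface fragment, only to illustrate the suggested default method.
interface AggregateFunctionSketch<S> {
  S update(S state, InternalRow input);

  // Default implementation: fold the per-row update over an iterator of rows.
  default S updateAll(S state, Iterator<InternalRow> inputs) {
    S current = state;
    while (inputs.hasNext()) {
      current = update(current, inputs.next());
    }
    return current;
  }
}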
Is there value for a function in being able to control iteration? And can Spark support it if there is?
I think there could be value for a function like limit because the source could stop iteration early. But I'm not sure what effect an iterator that is not exhausted would have on Spark. Overall, I think there aren't very many cases where controlling iteration in the function has enough value to warrant the additional complexity in the API.
import org.apache.spark.sql.types.DataType;

/**
* Interface for a function that produces a result value by aggregating over multiple input rows.
Just thinking back to our initial groupByKey impl, can we add a warning here that if folks do not implement AssociativeAggregateFunction, they are going to force all values for a key onto a single node?
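One possible wording for that warning, as a sketch against the javadoc fragment quoted above (AssociativeAggregateFunction is the name used in this comment; whether such an interface ends up in the API is not settled here):

/**
 * Interface for a function that produces a result value by aggregating over multiple input rows.
 * <p>
 * Note: implementations that do not also implement AssociativeAggregateFunction cannot be
 * partially aggregated, so Spark must ship all values for a grouping key to a single node
 * before calling the function.
 */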
@rdblue @cloud-fan What do you think of the Transport API? It is simple, wraps InternalRows in the case of Spark, and is portable between Spark, Presto, Hive, and Avro (and potentially other data formats, so UDFs could probably be pushed to the format layer).
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructType}

class AggregateFunctionSuite extends SparkFunSuite {
Could you add the missing import, @rdblue?
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/AggregateFunctionSuite.scala:23:38: not found: type SparkFunSuite
[error] class AggregateFunctionSuite extends SparkFunSuite {
Will do.
I think it's great to have people working on APIs for maintaining UDF libraries across projects. You may be wondering whether I think we should use that to call UDFs. I don't think that we would want to build support for a generic framework into Spark itself. I think Spark's API should be specific to Spark, just like the data source APIs are specific to Spark. That avoids complications like converting to Row or another representation for Hive. It should be possible to build a library using Transport that plugs in through this API, though. And it is great to have you looking at this and thinking about how it may be limited by the choices here.
I think there are 2 types of APIs: Function Catalog APIs and UDF expression APIs (e.g., Generic UDFs). I mentioned the Transport API as a way to do the latter, and wanted to get your thoughts on how friendly the Function Catalog APIs are to UDF expression APIs like Transport. To the user, Transport provides tools to make type validation and inference user-friendly (declarative, using type signatures), and Java types that map to their SQL counterparts. To Spark, it is just an Expression API processing InternalRows.
BoundFunction bind(StructType inputType);

/**
* Returns Function documentation.
Minor nit -- the method is called "description", should we say that this returns a description of the function (as opposed to "documentation")?
I think they're synonymous in this context.
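For context, an UnboundFunction for the hypothetical int_add scalar function sketched earlier in this conversation might look roughly like this: bind validates the argument types and returns the bound implementation. The method set shown here (bind, description, name) follows the snippets quoted in this PR and may differ from the final API.

import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical example: binds the int_add function to exactly two int arguments.
public class UnboundIntAdd implements UnboundFunction {
  @Override
  public BoundFunction bind(StructType inputType) {
    if (inputType.fields().length != 2 ||
        !inputType.fields()[0].dataType().equals(DataTypes.IntegerType) ||
        !inputType.fields()[1].dataType().equals(DataTypes.IntegerType)) {
      throw new UnsupportedOperationException("int_add requires two int arguments");
    }
    return new IntAdd(); // the ScalarFunction sketched earlier
  }

  @Override
  public String description() {
    return "int_add(a int, b int): returns a + b";
  }

  @Override
  public String name() {
    return "int_add";
  }
}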
We may also need to update the PR description. For instance, it still mentions PartialAggregateFunction.
Kubernetes integration test unable to build dist, exiting with code: 1
Retest this please.
Kubernetes integration test starting
Kubernetes integration test status failure
I saw this on other PRs as well. @maropu do you have any clue about it?
Looks like there is something wrong with the GA's DNS. We encountered the same binding issue in Kyuubi too - apache/kyuubi#489
Kubernetes integration test starting
Kubernetes integration test status failure
Yea, it looks like a GA env issue. I saw the same error message in other test suites (e.g., RateStreamProviderSuite). I think the workaround for now is just to re-run the GA job (but I will try to find a solution). FYI: @dongjoon-hyun @HyukjinKwon
Jenkins has passed and the GA failure is unrelated. I'm merging it to master, thanks!
Thank you, @rdblue and all!
🎉
Not sure if this is the best place for this, but we've encountered the binding failure multiple times in our own containerized environments and found it to occur in containers that ended up with entirely numeric hostnames.
In our case, setting a numeric hostname was our fault, and Docker explicitly rejects numeric hostnames, it seems for the same reason. I'm not very familiar with GA and from a quick browsing am unsure if this could ever happen there, but thought it might be good to keep in mind if this continues to be a sporadic failure, and to consider whether or not Spark should be aware of this failure mode.
What changes were proposed in this pull request?
This adds a new API for catalog plugins that exposes functions to Spark. The API can list and load functions. This does not include create, delete, or alter operations.
There are 3 types of functions defined:
- ScalarFunction, which produces a value for every call
- AggregateFunction, which produces a value after updates for a group of rows

Functions are loaded from the catalog by name as UnboundFunction. Once input arguments are determined, bind is called on the unbound function to get a BoundFunction implementation that is one of the 3 types above. Binding can fail if the function doesn't support the input type. BoundFunction returns the result type produced by the function.

How was this patch tested?
This includes a test that demonstrates the new API.
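To tie the description together, here is a rough sketch of how a caller (for example, a test) might exercise the flow end to end. Only the bind/produceResult flow is taken from this PR's snippets; the FunctionCatalog method name loadFunction, the Identifier lookup, the "default" namespace, and the int_add function are assumptions used purely for illustration.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class FunctionCatalogFlowSketch {
  static Object callIntAdd(FunctionCatalog catalog, int a, int b) throws Exception {
    // Load the function by name, then bind it to the argument schema.
    UnboundFunction unbound =
        catalog.loadFunction(Identifier.of(new String[] {"default"}, "int_add"));
    StructType inputType = new StructType()
        .add("a", DataTypes.IntegerType)
        .add("b", DataTypes.IntegerType);
    BoundFunction bound = unbound.bind(inputType);

    // For a scalar function, pack the arguments into an InternalRow and call it.
    ScalarFunction<?> scalar = (ScalarFunction<?>) bound;
    InternalRow args = new GenericInternalRow(new Object[] {a, b});
    return scalar.produceResult(args);
  }
}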