
[SPARK-44059] Add analyzer support of named arguments for built-in functions #42020

Closed
wants to merge 5 commits

Conversation

learningchess2003 (Contributor)

What changes were proposed in this pull request?

Add analyzer support for named function arguments.

Why are the changes needed?

Part of the project needed for general named function argument support.

Does this PR introduce any user-facing change?

We added support for named arguments for the CountMinSketchAgg and Mask SQL functions.
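As a rough illustration of the new call style (a hedged sketch; the lowerChar/upperChar/digitChar parameter names are assumed from the Spark 3.5 mask documentation, not quoted from this PR):

```scala
// Hedged sketch, run from spark-shell: named arguments for the mask SQL function.
// Parameter names lowerChar/upperChar/digitChar are assumed from the Spark 3.5 docs.
spark.sql(
  "SELECT mask('AbCD123-@$#', lowerChar => 'q', upperChar => 'Q', digitChar => 'd')"
).show(truncate = false)
```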

How was this patch tested?

A new test suite called NamedArgumentFunctionSuite was added.

@learningchess2003 (Contributor Author)

@dtenedor @cloud-fan @anchovYu @MaxGekk @allisonwang-db Please take a look when you have time!

usage = "_FUNC_(expr) - Separates the elements of array `expr` into multiple rows, or the elements of map `expr` into multiple rows and columns. Unless specified otherwise, uses the default column name `col` for elements of the array or `key` and `value` for the elements of the map.",
examples = """
Examples:
> SELECT _FUNC_(array(10, 20));
Contributor:

This is the function doc of the TVF version of explode. Shall we show the example with the TVF syntax? SELECT * FROM _FUNC_(array(10, 20))

usage = "_FUNC_(expr) - Separates the elements of array `expr` into multiple rows, or the elements of map `expr` into multiple rows and columns. Unless specified otherwise, uses the default column name `col` for elements of the array or `key` and `value` for the elements of the map.",
examples = """
Examples:
> SELECT _FUNC_(array(10, 20));
Contributor:

ditto

since = "3.4.0",
group = "string_funcs")
// scalastyle:on line.size.limit
object MaskExpressionBuilder extends ExpressionBuilder {
Contributor:

super nit: if we put MaskExpressionBuilder right before case class Mask, the git diff will be smaller.

@dtenedor (Contributor) left a comment:

LGTM 👍

@allisonwang-db (Contributor) left a comment:

Looks good!

> SELECT _FUNC_(array(10, 20));
10
20
> SELECT _FUNC_(collection => array(10, 20));
Contributor:

Can we also add one example for pos_explode?
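A hedged sketch of what such an example might look like, assuming posexplode accepts the same collection parameter name as explode:

```scala
// Hypothetical doc example for posexplode with a named argument; the `collection`
// parameter name is assumed to match explode's, as quoted in the diff above.
spark.sql("SELECT * FROM posexplode(collection => array(10, 20))").show()
// Expected shape: a `pos` column (0, 1) alongside the exploded `col` values (10, 20).
```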

@anchovYu (Contributor) left a comment:

lgtm!

* @param default The default value of the argument. If the default is none, then that means the
* argument is required. If no argument is provided, an exception is thrown.
*/
case class NamedArgument(
Contributor:

nit: Can this case class fit on the same line?

}

override def build(funcName: String, expressions: Seq[Expression]): Expression = {
new Mask(expressions(0), expressions(1), expressions(2), expressions(3), expressions(4))
Contributor:

nit: Could you still add an assertion on the size of expressions? That way, build is more complete on its own as an independent function.
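A hedged sketch of that suggestion (the five-argument shape comes from the diff above; the assertion message wording is an assumption):

```scala
// Sketch of the suggested guard: assert the rearranged argument list has the
// expected arity before constructing Mask.
override def build(funcName: String, expressions: Seq[Expression]): Expression = {
  assert(expressions.size == 5,
    s"$funcName expects 5 arguments after named-argument rearrangement, " +
      s"got ${expressions.size}")
  new Mask(expressions(0), expressions(1), expressions(2), expressions(3), expressions(4))
}
```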

@@ -1956,7 +1974,13 @@
"Not allowed to implement multiple UDF interfaces, UDF class <className>."
]
},
"NAMED_ARGUMENTS_SUPPORT_DISABLED" : {
"NAMED_PARAMETERS_NOT_SUPPORTED" : {
Contributor:

nit: The object is named NamedArgumentsSupport. Could you unify the naming between 'parameters' and 'arguments'?

Contributor:

nvm, keeping NamedArgumentsSupport for the object but NAMED_PARAMETERS_NOT_SUPPORTED for the error class seems reasonable.

@learningchess2003 (Contributor Author):

Serge was actually the one who recommended the name change. It should be fine.

Contributor:

Right, I agree with changing the error class to 'parameter', but I was wondering whether a similar change should be applied to NamedArgumentsSupport, e.g. renaming it to NamedParametersSupport.

@learningchess2003 (Contributor Author):

Okay, that makes sense.

@learningchess2003 (Contributor Author)

@cloud-fan Thanks for all the comments! I've addressed the feedback from the other reviewers. All of the tests passed except one flaky test, which I might rerun to see if it passes; I've already seen a run where everything was green. If you think this PR is ready, you can merge it.

@cloud-fan (Contributor)

thanks, merging to master/3.5! (half of this feature is already in 3.5)

cloud-fan closed this in 228b5db on Jul 18, 2023
cloud-fan pushed a commit that referenced this pull request Jul 18, 2023
…nctions

### What changes were proposed in this pull request?
Add analyzer support for named function arguments.

### Why are the changes needed?
Part of the project needed for general named function argument support.

### Does this PR introduce _any_ user-facing change?
We added support for named arguments for the `CountMinSketchAgg` and `Mask` SQL functions.

### How was this patch tested?
A new suite was added for this test called NamedArgumentFunctionSuite.

Closes #42020 from learningchess2003/44059-final.

Authored-by: Richard Yu <richard.yu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 228b5db)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ueshin added a commit that referenced this pull request Aug 14, 2023
### What changes were proposed in this pull request?

Supports named arguments in Python UDTF.

For example:

```py
>>> @udtf(returnType="a: int")
... class TestUDTF:
...     def eval(self, a, b):
...         yield a,
...
>>> spark.udtf.register("test_udtf", TestUDTF)

>>> TestUDTF(a=lit(10), b=lit("x")).show()
+---+
|  a|
+---+
| 10|
+---+

>>> TestUDTF(b=lit("x"), a=lit(10)).show()
+---+
|  a|
+---+
| 10|
+---+

>>> spark.sql("SELECT * FROM test_udtf(a=>10, b=>'x')").show()
+---+
|  a|
+---+
| 10|
+---+

>>> spark.sql("SELECT * FROM test_udtf(b=>'x', a=>10)").show()
+---+
|  a|
+---+
| 10|
+---+
```

or:

```py
>>> @udtf
... class TestUDTF:
...     @staticmethod
...     def analyze(**kwargs: AnalyzeArgument) -> AnalyzeResult:
...         return AnalyzeResult(
...             StructType(
...                 [StructField(key, arg.data_type) for key, arg in sorted(kwargs.items())]
...             )
...         )
...     def eval(self, **kwargs):
...         yield tuple(value for _, value in sorted(kwargs.items()))
...
>>> spark.udtf.register("test_udtf", TestUDTF)

>>> spark.sql("SELECT * FROM test_udtf(a=>10, b=>'x', x=>100.0)").show()
+---+---+-----+
|  a|  b|    x|
+---+---+-----+
| 10|  x|100.0|
+---+---+-----+

>>> spark.sql("SELECT * FROM test_udtf(x=>10, a=>'x', z=>100.0)").show()
+---+---+-----+
|  a|  x|    z|
+---+---+-----+
|  x| 10|100.0|
+---+---+-----+
```

### Why are the changes needed?

Now that named arguments are supported (#41796, #42020).

It should be supported in Python UDTF.

### Does this PR introduce _any_ user-facing change?

Yes, named arguments will be available for Python UDTF.

### How was this patch tested?

Added related tests.

Closes #42422 from ueshin/issues/SPARK-44749/kwargs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
valentinp17 pushed a commit to valentinp17/spark that referenced this pull request Aug 24, 2023
ueshin added a commit that referenced this pull request Aug 24, 2023
…andas UDFs

### What changes were proposed in this pull request?

Supports named arguments in scalar Python/Pandas UDF.

For example:

```py
>>> @udf("int")
... def test_udf(a, b):
...     return a + 10 * b
...
>>> spark.udf.register("test_udf", test_udf)

>>> spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))).show()
+---------------------------------+
|test_udf(b => (id * 10), a => id)|
+---------------------------------+
|                                0|
|                              101|
+---------------------------------+

>>> spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)").show()
+---------------------------------+
|test_udf(b => (id * 10), a => id)|
+---------------------------------+
|                                0|
|                              101|
+---------------------------------+
```

or:

```py
>>> @pandas_udf("int")
... def test_udf(a, b):
...     return a + 10 * b
...
>>> spark.udf.register("test_udf", test_udf)

>>> spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))).show()
+---------------------------------+
|test_udf(b => (id * 10), a => id)|
+---------------------------------+
|                                0|
|                              101|
+---------------------------------+

>>> spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)").show()
+---------------------------------+
|test_udf(b => (id * 10), a => id)|
+---------------------------------+
|                                0|
|                              101|
+---------------------------------+
```

### Why are the changes needed?

Now that named arguments support was added (#41796, #42020).

Scalar Python/Pandas UDFs can support it.

### Does this PR introduce _any_ user-facing change?

Yes, named arguments will be available for scalar Python/Pandas UDFs.

### How was this patch tested?

Added related tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42617 from ueshin/issues/SPARK-44918/kwargs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
ueshin added a commit that referenced this pull request Sep 1, 2023
…s UDFs

### What changes were proposed in this pull request?

Supports named arguments in aggregate Pandas UDFs.

For example:

```py
>>> @pandas_udf("double")
... def weighted_mean(v: pd.Series, w: pd.Series) -> float:
...     import numpy as np
...     return np.average(v, weights=w)
...
>>> df = spark.createDataFrame(
...     [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0), (2, 10.0, 3.0)],
...     ("id", "v", "w"))

>>> df.groupby("id").agg(weighted_mean(v=df["v"], w=df["w"])).show()
+---+-----------------------------+
| id|weighted_mean(v => v, w => w)|
+---+-----------------------------+
|  1|           1.6666666666666667|
|  2|            7.166666666666667|
+---+-----------------------------+

>>> df.groupby("id").agg(weighted_mean(w=df["w"], v=df["v"])).show()
+---+-----------------------------+
| id|weighted_mean(w => w, v => v)|
+---+-----------------------------+
|  1|           1.6666666666666667|
|  2|            7.166666666666667|
+---+-----------------------------+
```

or with window:

```py
>>> w = Window.partitionBy("id").orderBy("v").rowsBetween(-2, 1)

>>> df.withColumn("wm", weighted_mean(v=df.v, w=df.w).over(w)).show()
+---+----+---+------------------+
| id|   v|  w|                wm|
+---+----+---+------------------+
|  1| 1.0|1.0|1.6666666666666667|
|  1| 2.0|2.0|1.6666666666666667|
|  2| 3.0|1.0| 4.333333333333333|
|  2| 5.0|2.0| 7.166666666666667|
|  2|10.0|3.0| 7.166666666666667|
+---+----+---+------------------+

>>> df.withColumn("wm", weighted_mean(w=df.w, v=df.v).over(w)).show()
+---+----+---+------------------+
| id|   v|  w|                wm|
+---+----+---+------------------+
|  1| 1.0|1.0|1.6666666666666667|
|  1| 2.0|2.0|1.6666666666666667|
|  2| 3.0|1.0| 4.333333333333333|
|  2| 5.0|2.0| 7.166666666666667|
|  2|10.0|3.0| 7.166666666666667|
+---+----+---+------------------+
```

### Why are the changes needed?

Now that named arguments support was added (#41796, #42020).

Aggregate Pandas UDFs can support it.

### Does this PR introduce _any_ user-facing change?

Yes, named arguments will be available for aggregate Pandas UDFs.

### How was this patch tested?

Added related tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42663 from ueshin/issues/SPARK-44952/kwargs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
a0x8o added a commit to a0x8o/spark that referenced this pull request Sep 1, 2023
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024