[SPARK-44749][SQL][PYTHON] Support named arguments in Python UDTF #42422

Closed
wants to merge 11 commits into from

@ueshin (Member) commented Aug 9, 2023

What changes were proposed in this pull request?

Supports named arguments in Python UDTF.

For example:

>>> @udtf(returnType="a: int")
... class TestUDTF:
...     def eval(self, a, b):
...         yield a,
...
>>> spark.udtf.register("test_udtf", TestUDTF)

>>> TestUDTF(a=lit(10), b=lit("x")).show()
+---+
|  a|
+---+
| 10|
+---+

>>> TestUDTF(b=lit("x"), a=lit(10)).show()
+---+
|  a|
+---+
| 10|
+---+

>>> spark.sql("SELECT * FROM test_udtf(a=>10, b=>'x')").show()
+---+
|  a|
+---+
| 10|
+---+

>>> spark.sql("SELECT * FROM test_udtf(b=>'x', a=>10)").show()
+---+
|  a|
+---+
| 10|
+---+
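The order independence above follows from ordinary Python keyword binding. A minimal sketch, assuming the named arguments ultimately reach eval as keyword arguments; it uses plain Python values instead of Spark columns, and run_eval is a hypothetical helper, not the actual PySpark worker code:

```python
from typing import Any, List, Tuple


class TestUDTF:
    def eval(self, a, b):
        # Same body as the example above: emit a one-column row holding `a`.
        yield a,


def run_eval(**named_args: Any) -> List[Tuple[Any, ...]]:
    """Hypothetical stand-in for the worker: call eval with named arguments."""
    return list(TestUDTF().eval(**named_args))


# The argument order at the call site does not affect which parameter is bound.
rows_a_first = run_eval(a=10, b="x")
rows_b_first = run_eval(b="x", a=10)
print(rows_a_first, rows_b_first)  # [(10,)] [(10,)]
```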

or:

>>> @udtf
... class TestUDTF:
...     @staticmethod
...     def analyze(**kwargs: AnalyzeArgument) -> AnalyzeResult:
...         return AnalyzeResult(
...             StructType(
...                 [StructField(key, arg.data_type) for key, arg in sorted(kwargs.items())]
...             )
...         )
...     def eval(self, **kwargs):
...         yield tuple(value for _, value in sorted(kwargs.items()))
...
>>> spark.udtf.register("test_udtf", TestUDTF)

>>> spark.sql("SELECT * FROM test_udtf(a=>10, b=>'x', x=>100.0)").show()
+---+---+-----+
|  a|  b|    x|
+---+---+-----+
| 10|  x|100.0|
+---+---+-----+

>>> spark.sql("SELECT * FROM test_udtf(x=>10, a=>'x', z=>100.0)").show()
+---+---+-----+
|  a|  x|    z|
+---+---+-----+
|  x| 10|100.0|
+---+---+-----+
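The column order in the second result comes from sorting the keyword names in both analyze and eval. A rough sketch in plain Python, without Spark: FakeAnalyzeArgument is a hypothetical stand-in for AnalyzeArgument, the schema is modeled as a list of (name, type) pairs instead of a StructType, and the type strings are illustrative placeholders only:

```python
from dataclasses import dataclass
from typing import Any, Iterator, List, Tuple


@dataclass
class FakeAnalyzeArgument:
    """Hypothetical stand-in for AnalyzeArgument; only data_type is modeled."""
    data_type: str


def analyze(**kwargs: FakeAnalyzeArgument) -> List[Tuple[str, str]]:
    # One output column per named argument, ordered by argument name.
    return [(key, arg.data_type) for key, arg in sorted(kwargs.items())]


def eval_row(**kwargs: Any) -> Iterator[Tuple[Any, ...]]:
    # Values are emitted in the same name-sorted order as the schema.
    yield tuple(value for _, value in sorted(kwargs.items()))


schema = analyze(
    x=FakeAnalyzeArgument("int"),      # placeholder type names
    a=FakeAnalyzeArgument("string"),
    z=FakeAnalyzeArgument("decimal"),
)
rows = list(eval_row(x=10, a="x", z=100.0))
print([name for name, _ in schema])  # ['a', 'x', 'z']
print(rows)                          # [('x', 10, 100.0)]
```

Because both methods sort by name, the schema and the emitted tuples stay aligned regardless of the order in which the caller writes the arguments.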

Why are the changes needed?

Named arguments are now supported (#41796, #42020), so Python UDTFs should support them as well.

Does this PR introduce any user-facing change?

Yes, named arguments will be available for Python UDTF.

How was this patch tested?

Added related tests.

for i, df in enumerate(
    [
        self.spark.sql("SELECT * FROM test_udtf(a=>10, b=>'x')"),
        self.spark.sql("SELECT * FROM test_udtf(b=>'x', a=>10)"),
Contributor

What would the error message be if a named argument is used incorrectly? For example:

  1. duplicated input argument names: a => 10, a => 10
  2. non-existing argument name: c => 10
  3. incorrect combination of positional and named arguments: test_udtf(a => 10, 'x')

I am afraid that if we directly leverage Python's kwargs, the error messages wouldn't be as user-friendly as the SQL function ones.

Member Author

@ueshin ueshin Aug 10, 2023

That's a good point. So far this just relies on Python's own errors.

@dtenedor What do the error messages look like when named arguments are misused in the above ways with other functions? Are there any examples we can follow here?

Contributor

Yeah, I believe @learningchess2003 added these checks in [1]. They currently live in the FunctionBuilderBase.scala file at [2]. If we reuse those checks, the error messages for Python UDTFs would be consistent with those of other Spark functions.

[1] #42020
[2] https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/FunctionBuilderBase.scala#L107-L128

Member Author

Updated to raise the following errors:

  1. duplicated input argument names: a => 10, a => 10

It will be checked in the analysis phase and an error with the error class DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE will be raised.

  2. non-existing argument name: c => 10

It will be handled at Python runtime and an error will be raised.

...PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' method: eval() got an unexpected keyword argument 'c'

  3. incorrect combination of positional and named arguments: test_udtf(a => 10, 'x')

It will be checked in the analysis phase and an error with the error class UNEXPECTED_POSITIONAL_ARGUMENT will be raised.
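The three cases above can be illustrated with a small plain-Python sketch. This is not Spark's analyzer code: check_arguments, ArgumentCheckError and error_class_of are hypothetical names, and only the error class strings are taken from the comment above. The first and third cases are caught before eval is called; the second only surfaces when Python binds the keyword arguments.

```python
from typing import Any, List, Optional, Tuple

# Each argument is (name, value); name is None for a positional argument.
Argument = Tuple[Optional[str], Any]


class ArgumentCheckError(Exception):
    """Hypothetical error type that carries an error class string."""

    def __init__(self, error_class: str) -> None:
        super().__init__(f"[{error_class}]")
        self.error_class = error_class


def check_arguments(args: List[Argument]) -> None:
    """Reject duplicate names and positional arguments that follow named ones."""
    seen = set()
    for name, _ in args:
        if name is None:
            if seen:  # a named argument has already appeared
                raise ArgumentCheckError("UNEXPECTED_POSITIONAL_ARGUMENT")
            continue
        if name in seen:
            raise ArgumentCheckError(
                "DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE"
            )
        seen.add(name)


def error_class_of(args: List[Argument]) -> Optional[str]:
    """Return the error class raised for args, or None if the check passes."""
    try:
        check_arguments(args)
    except ArgumentCheckError as e:
        return e.error_class
    return None


class TestUDTF:
    def eval(self, a, b):
        yield a,


def unknown_name_message() -> str:
    """Case 2: an unknown name is only detected when Python binds the call."""
    try:
        list(TestUDTF().eval(a=10, b="x", c=10))
    except TypeError as e:
        return str(e)
    return ""


print(error_class_of([("a", 10), ("a", 10)]))
print(error_class_of([("a", 10), (None, "x")]))
print(unknown_name_message())
```

The exact wording of the TypeError depends on the Python version (newer versions include the class name in front of eval()), so only the "unexpected keyword argument 'c'" part should be relied on.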

Contributor

LGTM!

Contributor

@dtenedor dtenedor left a comment

The approach looks good! Besides @allisonwang-db's suggestion, I just have a couple comments.

python/pyspark/worker.py (outdated, resolved)
for i, df in enumerate(
    [
        self.spark.sql("SELECT * FROM test_udtf(a=>10, b=>'x')"),
        self.spark.sql("SELECT * FROM test_udtf(b=>'x', a=>10)"),
Contributor

LGTM!

@ueshin ueshin marked this pull request as ready for review August 11, 2023 21:48
Member Author

ueshin commented Aug 14, 2023

Let me merge this now to unblock the issue #42385 (comment).

Member Author

ueshin commented Aug 14, 2023

Thanks! Merging to master.

@ueshin ueshin closed this in d462956 Aug 14, 2023
Contributor

@allisonwang-db allisonwang-db left a comment

Late LGTM! Left a few comments.

python/pyspark/sql/functions.py (resolved)
Comment on lines +292 to +298
try {
  bufferStream.close()
} finally {
  if (!releasedOrClosed) {
    // An error happened. Force to close the worker.
    env.destroyPythonWorker(pythonExec, workerModule, envVars.asScala.toMap, worker)
  }
Contributor

Just curious, why do we need to change this part?

Member Author

#42385 changed this code to use bufferStream = new DirectByteBufferOutputStream(), but the stream was never closed.
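As a loose Python analogue of the Scala snippet under discussion (a sketch of the pattern only, not the Spark code): the buffer is always closed, and the worker is torn down if it was not already released or closed, even when closing the buffer itself fails. The names cleanup, released_or_closed and destroy_worker are hypothetical.

```python
import io
from typing import Callable, List


def cleanup(
    buffer: io.IOBase,
    released_or_closed: bool,
    destroy_worker: Callable[[], None],
) -> None:
    """Close the buffer; force-destroy the worker if it was never released."""
    try:
        buffer.close()
    finally:
        # Runs whether or not buffer.close() raised.
        if not released_or_closed:
            destroy_worker()


events: List[str] = []
stream = io.BytesIO(b"payload")
cleanup(stream, released_or_closed=False, destroy_worker=lambda: events.append("destroyed"))
print(stream.closed, events)  # True ['destroyed']
```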

python/pyspark/sql/tests/test_udtf.py (resolved)
python/pyspark/sql/tests/test_udtf.py (resolved)
Member Author

ueshin commented Aug 14, 2023

I submitted another PR, #42490, to address the above comments.

ueshin added a commit that referenced this pull request Aug 15, 2023
…ents in Python UDTF

### What changes were proposed in this pull request?

This is a follow-up of #42422.

Adds more tests for named arguments in Python UDTF.

### Why are the changes needed?

There are more cases to test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added related tests.

Closes #42490 from ueshin/issues/SPARK-44749/tests.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
valentinp17 pushed a commit to valentinp17/spark that referenced this pull request Aug 24, 2023
valentinp17 pushed a commit to valentinp17/spark that referenced this pull request Aug 24, 2023
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
3 participants