
[SPARK-43295][PS] Support string type columns for DataFrameGroupBy.sum #42798

Closed
itholic wants to merge 9 commits into master from SPARK-43295

Conversation

itholic (Contributor) commented Sep 4, 2023

What changes were proposed in this pull request?

This PR proposes to support string type columns for DataFrameGroupBy.sum.

Why are the changes needed?

To match the behavior of the latest pandas.

Does this PR introduce any user-facing change?

Yes, from now on DataFrameGroupBy.sum follows the behavior of the latest pandas, as below:

Test DataFrame

>>> psdf
   A    B  C      D
0  1  3.1  a   True
1  2  4.1  b  False
2  1  4.1  b  False
3  2  3.1  a   True

Before

>>> psdf.groupby("A").sum().sort_index()
     B  D
A
1  7.2  1
2  7.2  1

After

>>> psdf.groupby("A").sum().sort_index()
     B   C  D
A
1  7.2  ab  1
2  7.2  ba  1

How was this patch tested?

Updated the existing UTs to support string type columns.

Was this patch authored or co-authored using generative AI tooling?

No.

if sfun.__name__ == "sum" and isinstance(
    psdf._internal.spark_type_for(label), StringType
):
    output_scol = F.concat_ws("", F.collect_list(input_scol))
itholic (Contributor, Author):

We should use a combination of concat_ws and collect_list instead of sum to match the pandas behavior for string summation, as below:

>>> import pyspark.sql.functions as sf
>>> sdf.show()
+---+
|  A|
+---+
|  a|
|  b|
|  c|
+---+

# Using `sum` over a string type column returns `NULL`, which does not match pandas.
>>> sdf.select(sf.sum(sdf.A)).show()
+------+
|sum(A)|
+------+
|  NULL|
+------+

# Using a combination of `concat_ws` and `collect_list` matches the pandas behavior
>>> sdf.select(sf.concat_ws("", sf.collect_list(sdf.A))).show()
+----------------------------+
|concat_ws(, collect_list(A))|
+----------------------------+
|                         abc|
+----------------------------+

zhengruifeng (Contributor) commented:

@itholic I suspect the behavior is not deterministic: it depends on the internal order of collect_list.

To make it deterministic, I think we need to collect_list both the value and the index, and sort by the indices before concat_ws.
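
A minimal, self-contained sketch of this suggestion (the Spark session setup and the `idx` column name are illustrative, not the pandas-on-Spark internals):

# Collect (index, value) structs, then sort them, so the element order no
# longer depends on the order in which collect_list saw the rows.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(4, "a"), (3, "b"), (2, "c")], ["idx", "A"])

sdf.select(
    F.array_sort(F.collect_list(F.struct("idx", "A"))).alias("sorted_pairs")
).show(truncate=False)
# +------------------------+
# |sorted_pairs            |
# +------------------------+
# |[{2, c}, {3, b}, {4, a}]|
# +------------------------+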

itholic (Contributor, Author) commented Sep 5, 2023

@zhengruifeng I think the problem is that pandas computes the concatenation without sorting, so the result can be different when the index is not sorted, as below:

Problem

Pandas

>>> pdf
   A  B
4  a  1
3  b  2
2  c  3
>>> pdf.sum()
A    abc
B      6
dtype: object

Pandas API on Spark

>>> psdf
   A  B
4  a  1
3  b  2
2  c  3
>>> psdf.sum()
A    cba  # we internally sorted the index, so the result is different from Pandas
B      6
dtype: object

Solution

I think for now we can pick one of the three options below:

  1. Document a warning note as below:
    The result for string type columns is non-deterministic, since the implementation depends on the `collect_list` API from PySpark, which is non-deterministic as well.

  2. collect_list both the value and the index, sort by the indices before concat_ws as you suggested, and document a warning note as below:
    The result for string type columns can differ from pandas when the index is not sorted, because we always sort by index before computing; the underlying `collect_list` API from PySpark is otherwise non-deterministic.

  3. Keep string type columns unsupported as before, and add a note explaining why, as below:
    String type columns are not supported for now, because they might yield non-deterministic results, unlike in pandas.
    

WDYT? Also cc @HyukjinKwon, @ueshin, @xinrong-meng. What strategy should we take for this situation? I believe the same rules should apply to similar cases that already exist or may arise in the future.

@@ -910,7 +910,7 @@ def sum(self, numeric_only: Optional[bool] = True, min_count: int = 0) -> FrameL

Member:

I think you gotta fix the log above too since now we support strings too?

itholic (Contributor, Author):

Yeah, we should update it. Thanks for catching this!

if sfun.__name__ == "sum" and isinstance(
    psdf._internal.spark_type_for(label), StringType
):
    output_scol = F.concat_ws("", F.collect_list(input_scol))
Member:

Can we sort by natural order? We have the compute.ordered_head config. We could sort by natural order and then perform collect_list.

itholic (Contributor, Author):

I think maybe this is a different case from head?

In head, we do sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME) and then compute sdf.limit(n), so we can keep the order because DataFrame.limit doesn't shuffle the data.

But in this case, the data is shuffled again when computing collect_list, even after sorting the DataFrame by natural order in advance, so I think the order would not be guaranteed.

Please let me know if I missed something?
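
A small self-contained sketch of the contrast being described (the names here are illustrative, not the actual pandas-on-Spark internals):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(6).withColumn("order", F.monotonically_increasing_id())

# head-style: orderBy followed by limit keeps the order, because limit
# does not shuffle the data.
head3 = sdf.orderBy("order").limit(3)

# aggregation-style: groupBy repartitions by the grouping key, so a prior
# orderBy gives no guarantee about the element order collect_list observes.
agg = sdf.groupBy(F.lit(0)).agg(F.collect_list("id").alias("ids"))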

Contributor:

Since string columns are computed together with numerical ones, I think we have to compute the strings' sum as an aggregation:

F.concat_ws("", F.array_sort(
    F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
))

For struct types, array_sort sorts elements by the first field, then the second field, IIRC.

itholic (Contributor, Author):

Yeah, then maybe we should extract only the string column from the nested structs to pass as an argument to concat_ws?

F.concat_ws(
    "",
    F.array_sort(
        F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
    ).getField(input_scol_name),
)

itholic (Contributor, Author):

Just adjusted the comments. Thanks!

Contributor:

Yes, we should extract the string col.

"",
F.array_sort(
F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
).getField(input_scol_name),
Contributor:

I think you will need F.transform to extract the strings.

Otherwise, you can use F.reduce to directly concatenate the strings from the structs [<long, string>].

itholic (Contributor, Author):

Sounds good. Updated the code with transform. Thanks!
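
For reference, a minimal end-to-end sketch of the final shape (`__natural_order__` stands in for the internal NATURAL_ORDER_COLUMN_NAME; this is an illustration, not the exact patch):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [(1, "a"), (2, "b"), (1, "b"), (2, "a")], ["A", "C"]
).withColumn("__natural_order__", F.monotonically_increasing_id())

sdf.groupBy("A").agg(
    F.concat_ws(
        "",
        F.transform(
            # sort the structs by the natural-order field, then project
            # out only the string values for concatenation
            F.array_sort(F.collect_list(F.struct("__natural_order__", "C"))),
            lambda s: s["C"],
        ),
    ).alias("sum(C)")
).orderBy("A").show()
# +---+------+
# |  A|sum(C)|
# +---+------+
# |  1|    ab|
# |  2|    ba|
# +---+------+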

zhengruifeng (Contributor) commented:

merged to master

itholic deleted the SPARK-43295 branch November 20, 2023 01:36