-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings #45611
Conversation
val tableName = "test_agg_arr_collated" | ||
val simple = Seq( | ||
// binary | ||
("utf8_binary", Seq("array('aaa')", "array('AAA')"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use named case objects instead of tuples? Not sure about the others, but I can't read this :)
("utf8_binary", Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"), | ||
Seq((Seq("aaa", "bbb"), 1), (Seq("AAA", "BBB"), 1))), | ||
("utf8_binary", Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"), | ||
Seq((Seq("aaa"), 1), (Seq("bbb"), 1), (Seq("AAA"), 1), (Seq("BBB"), 1))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to cover UTF_BINARY test case here at all? In previous examples we mainly used UTF8_BINARY as validation but the real thing we want to test here are collations other than UTF8_BINARY.
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, but I would prefer if you could simplify the test cases.
count.map{ case (aggStr, _) => Row(aggStr)}) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test pattern that can be useful here is what we did for Window Aggs.
In short, just create a query that targets mixed-case data with LCASE collation (e.g. "aA", "aa", "AA") and query that targets normalized data with UTF8_BINARY ("aa", "aa", "aa"). Aggs and Joins should return the same result.
You can find test example here:
#45568
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated tests to support this and added case classes for better readability. Please check again.
// binary collation with values converted to lowercase should match the results as well | ||
sql(s"create table $tableNameLowercase(a ${check.dataType}) using parquet") | ||
check.rows.map(row => | ||
sql(s"insert into $tableNameLowercase(a) values(${row.toLowerCase(Locale.ROOT)})")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use UTF8String
toLowerCase
function instead of java.util.locale
? For this case it is ok, but we shouldn't start mixing ICU
collator with Java's default one.
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
Outdated
Show resolved
Hide resolved
).map((check: AggCheck) => | ||
withTable(tableName, tableNameLowercase) { | ||
def checkResults(table: String): Unit = { | ||
checkAnswer(sql(s"select a, count(*) from $table group by a"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bad practice that we try to test all cases in end-to-end tests. We should improve the test coverage in the unit test.
I suggest we add tests in UnsafeRowUtilsSuite
to test the isBinaryStable
function with different cases: array of string, array of array of string, struct of array of string, etc. The end-to-end test here should just run a few queries to show it works.
def isBinaryStable(dataType: DataType): Boolean = dataType.existsRecursively { | ||
case st: StringType => CollationFactory.fetchCollation(st.collationId).isBinaryCollation | ||
case _ => true | ||
def isBinaryStable(dataType: DataType): Boolean = !dataType.existsRecursively { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why isBinaryStable
is in UnsafeRowUtils
. Is the implementation bound somehow to unsafe row?
Why it is not in DataTypeUtils
or Collation...
, for example?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what are the consequences of moving this function, do you know if we can do that @dbatomic?
Seq("array(array('AAA'), array('CCC'))"), | ||
Seq() | ||
) | ||
).map((check: JoinCheck) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to use .foreach
here. map
is usually used to build new collection which is not need in this case.
@@ -640,6 +641,201 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { | |||
"reason" -> "generation expression cannot contain non-default collated string type")) | |||
} | |||
|
|||
trait ArrayCheck { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't you put the new code to the end of the test suite?
@MaxGekk @cloud-fan please check the updated tests. |
+1, LGTM. Merging to master. |
… arrays of collated strings ### What changes were proposed in this pull request? Example of aggregation sequence: ``` create table t(a array<string collate utf8_binary_lcase>) using parquet; insert into t(a) values(array('a' collate utf8_binary_lcase)); insert into t(a) values(array('A' collate utf8_binary_lcase)); select distinct a from t; ``` Example of join sequence: ``` create table l(a array<string collate utf8_binary_lcase>) using parquet; create table r(a array<string collate utf8_binary_lcase>) using parquet; insert into l(a) values(array('a' collate utf8_binary_lcase)); insert into r(a) values(array('A' collate utf8_binary_lcase)); select * from l join r where l.a = r.a; ``` Both runs should yield one row since the arrays are considered equal. Problem is in `isBinaryStable` function which should return false if **any** of its subtypes is non-binary collated string. ### Why are the changes needed? To support aggregates and joins in arrays of collated strings properly. ### Does this PR introduce _any_ user-facing change? Yes, it fixes the described scenarios. ### How was this patch tested? Added new checks to collation suite. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#45611 from nikolamand-db/SPARK-47483. Authored-by: Nikola Mandic <nikola.mandic@databricks.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
What changes were proposed in this pull request?
Example of aggregation sequence:
Example of join sequence:
Both runs should yield one row since the arrays are considered equal.
Problem is in
isBinaryStable
function which should return false if any of its subtypes is non-binary collated string.Why are the changes needed?
To support aggregates and joins in arrays of collated strings properly.
Does this PR introduce any user-facing change?
Yes, it fixes the described scenarios.
How was this patch tested?
Added new checks to collation suite.
Was this patch authored or co-authored using generative AI tooling?
No.