Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-28270][SQL][FOLLOW-UP] Explicitly cast into int/long/decimal in udf-aggregates_part1.sql to avoid Python float limitation #25110

Closed
wants to merge 1 commit into from

Conversation

HyukjinKwon
Copy link
Member

@HyukjinKwon HyukjinKwon commented Jul 11, 2019

What changes were proposed in this pull request?

The tests added at #25069 seem flaky in some environments. See #25069 (comment)

Python's string representation of floats can make the tests flaky. See https://docs.python.org/3/tutorial/floatingpoint.html.

I think it's just better to explicitly cast everywhere udf returns a float (or a double) to stay safe. (note that we're not targeting the Python <> Scala value conversions - there are inevitable differences between Python and Scala; therefore, other languages' UDFs cannot guarantee the same results between Python and Scala).

This PR proposes to cast cases to long, integer and decimal explicitly to make the test cases robust.

Diff comparing to 'pgSQL/aggregates_part1.sql'

diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
index 51ca1d55869..734634b7388 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
@@ -3,23 +3,23 @@


 -- !query 0
-SELECT avg(four) AS avg_1 FROM onek
+SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek
 -- !query 0 schema
-struct<avg_1:double>
+struct<avg_1:decimal(10,3)>
 -- !query 0 output
 1.5


 -- !query 1
-SELECT avg(a) AS avg_32 FROM aggtest WHERE a < 100
+SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100
 -- !query 1 schema
-struct<avg_32:double>
+struct<avg_32:decimal(10,3)>
 -- !query 1 output
-32.666666666666664
+32.667


 -- !query 2
-select CAST(avg(b) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
+select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
 -- !query 2 schema
 struct<avg_107_943:decimal(10,3)>
 -- !query 2 output
@@ -27,39 +27,39 @@ struct<avg_107_943:decimal(10,3)>


 -- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek
 -- !query 3 schema
-struct<sum_1500:bigint>
+struct<sum_1500:int>
 -- !query 3 output
 1500


 -- !query 4
-SELECT sum(a) AS sum_198 FROM aggtest
+SELECT udf(sum(a)) AS sum_198 FROM aggtest
 -- !query 4 schema
-struct<sum_198:bigint>
+struct<sum_198:string>
 -- !query 4 output
 198


 -- !query 5
-SELECT sum(b) AS avg_431_773 FROM aggtest
+SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest
 -- !query 5 schema
-struct<avg_431_773:double>
+struct<avg_431_773:decimal(10,3)>
 -- !query 5 output
-431.77260909229517
+431.773


 -- !query 6
-SELECT max(four) AS max_3 FROM onek
+SELECT udf(max(four)) AS max_3 FROM onek
 -- !query 6 schema
-struct<max_3:int>
+struct<max_3:string>
 -- !query 6 output
 3


 -- !query 7
-SELECT max(a) AS max_100 FROM aggtest
+SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest
 -- !query 7 schema
 struct<max_100:int>
 -- !query 7 output
@@ -67,245 +67,246 @@ struct<max_100:int>


 -- !query 8
-SELECT max(aggtest.b) AS max_324_78 FROM aggtest
+SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest
 -- !query 8 schema
-struct<max_324_78:float>
+struct<max_324_78:decimal(10,3)>
 -- !query 8 output
 324.78


 -- !query 9
-SELECT stddev_pop(b) FROM aggtest
+SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest
 -- !query 9 schema
-struct<stddev_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 9 output
-131.10703231895047
+131.107


 -- !query 10
-SELECT stddev_samp(b) FROM aggtest
+SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest
 -- !query 10 schema
-struct<stddev_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(stddev_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 10 output
-151.38936080399804
+151.389


 -- !query 11
-SELECT var_pop(b) FROM aggtest
+SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest
 -- !query 11 schema
-struct<var_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(var_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 11 output
-17189.053923482323
+17189.054


 -- !query 12
-SELECT var_samp(b) FROM aggtest
+SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest
 -- !query 12 schema
-struct<var_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(var_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 12 output
-22918.738564643096
+22918.739


 -- !query 13
-SELECT stddev_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 13 schema
-struct<stddev_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(stddev_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 13 output
-131.18117242958306
+131.181


 -- !query 14
-SELECT stddev_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest
 -- !query 14 schema
-struct<stddev_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(stddev_samp(CAST(CAST(udf(b) AS DECIMAL(38,0)) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 14 output
-151.47497042966097
+151.475


 -- !query 15
-SELECT var_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 15 schema
-struct<var_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(var_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 15 output
 17208.5


 -- !query 16
-SELECT var_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 16 schema
-struct<var_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(var_samp(CAST(udf(cast(b as decimal(38,0))) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 16 output
-22944.666666666668
+22944.667


 -- !query 17
-SELECT var_pop(1.0), var_samp(2.0)
+SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0))
 -- !query 17 schema
-struct<var_pop(CAST(1.0 AS DOUBLE)):double,var_samp(CAST(2.0 AS DOUBLE)):double>
+struct<CAST(udf(var_pop(cast(1.0 as double))) AS INT):int,var_samp(CAST(udf(2.0) AS DOUBLE)):double>
 -- !query 17 output
-0.0    NaN
+0      NaN


 -- !query 18
-SELECT stddev_pop(CAST(3.0 AS Decimal(38,0))), stddev_samp(CAST(4.0 AS Decimal(38,0)))
+SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
 -- !query 18 schema
-struct<stddev_pop(CAST(CAST(3.0 AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(4.0 AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(cast(3.0 as decimal(38,0))) AS DOUBLE)) AS INT):int,stddev_samp(CAST(CAST(udf(4.0) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 18 output
-0.0    NaN
+0      NaN


 -- !query 19
-select sum(CAST(null AS int)) from range(1,4)
+select sum(udf(CAST(null AS int))) from range(1,4)
 -- !query 19 schema
-struct<sum(CAST(NULL AS INT)):bigint>
+struct<sum(CAST(udf(cast(null as int)) AS DOUBLE)):double>
 -- !query 19 output
 NULL


 -- !query 20
-select sum(CAST(null AS long)) from range(1,4)
+select sum(udf(CAST(null AS long))) from range(1,4)
 -- !query 20 schema
-struct<sum(CAST(NULL AS BIGINT)):bigint>
+struct<sum(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
 -- !query 20 output
 NULL


 -- !query 21
-select sum(CAST(null AS Decimal(38,0))) from range(1,4)
+select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 21 schema
-struct<sum(CAST(NULL AS DECIMAL(38,0))):decimal(38,0)>
+struct<sum(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
 -- !query 21 output
 NULL


 -- !query 22
-select sum(CAST(null AS DOUBLE)) from range(1,4)
+select sum(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 22 schema
-struct<sum(CAST(NULL AS DOUBLE)):double>
+struct<sum(CAST(udf(cast(null as double)) AS DOUBLE)):double>
 -- !query 22 output
 NULL


 -- !query 23
-select avg(CAST(null AS int)) from range(1,4)
+select avg(udf(CAST(null AS int))) from range(1,4)
 -- !query 23 schema
-struct<avg(CAST(NULL AS INT)):double>
+struct<avg(CAST(udf(cast(null as int)) AS DOUBLE)):double>
 -- !query 23 output
 NULL


 -- !query 24
-select avg(CAST(null AS long)) from range(1,4)
+select avg(udf(CAST(null AS long))) from range(1,4)
 -- !query 24 schema
-struct<avg(CAST(NULL AS BIGINT)):double>
+struct<avg(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
 -- !query 24 output
 NULL


 -- !query 25
-select avg(CAST(null AS Decimal(38,0))) from range(1,4)
+select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 25 schema
-struct<avg(CAST(NULL AS DECIMAL(38,0))):decimal(38,4)>
+struct<avg(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
 -- !query 25 output
 NULL


 -- !query 26
-select avg(CAST(null AS DOUBLE)) from range(1,4)
+select avg(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 26 schema
-struct<avg(CAST(NULL AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(null as double)) AS DOUBLE)):double>
 -- !query 26 output
 NULL


 -- !query 27
-select sum(CAST('NaN' AS DOUBLE)) from range(1,4)
+select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 27 schema
-struct<sum(CAST(NaN AS DOUBLE)):double>
+struct<sum(CAST(udf(NaN) AS DOUBLE)):double>
 -- !query 27 output
 NaN


 -- !query 28
-select avg(CAST('NaN' AS DOUBLE)) from range(1,4)
+select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 28 schema
-struct<avg(CAST(NaN AS DOUBLE)):double>
+struct<avg(CAST(udf(NaN) AS DOUBLE)):double>
 -- !query 28 output
 NaN

 -- !query 30
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('1')) v(x)
 -- !query 30 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 30 output
 Infinity       NaN


 -- !query 31
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('Infinity')) v(x)
 -- !query 31 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 31 output
 Infinity       NaN


 -- !query 32
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('-Infinity'), ('Infinity')) v(x)
 -- !query 32 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 32 output
 NaN    NaN


 -- !query 33
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
 FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x)
 -- !query 33 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS INT):int,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 33 output
-1.00000005E8   2.5
+100000005      2.5


 -- !query 34
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
 FROM (VALUES (7000000000005), (7000000000007)) v(x)
 -- !query 34 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS BIGINT):bigint,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 34 output
-7.000000000006E12      1.0
+7000000000006  1


 -- !query 35
-SELECT covar_pop(b, a), covar_samp(b, a) FROM aggtest
+SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest
 -- !query 35 schema
-struct<covar_pop(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double,covar_samp(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(udf(covar_pop(cast(b as double), cast(udf(a) as double))) AS DECIMAL(10,3)):decimal(10,3),CAST(covar_samp(CAST(udf(b) AS DOUBLE), CAST(a AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 35 output
-653.6289553875104      871.5052738500139
+653.629        871.505


 -- !query 36
-SELECT corr(b, a) FROM aggtest
+SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest
 -- !query 36 schema
-struct<corr(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(corr(CAST(b AS DOUBLE), CAST(udf(a) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 36 output
-0.1396345165178734
+0.14


 -- !query 37
-SELECT count(four) AS cnt_1000 FROM onek
+SELECT count(udf(four)) AS cnt_1000 FROM onek
 -- !query 37 schema
 struct<cnt_1000:bigint>
 -- !query 37 output
@@ -313,18 +314,18 @@ struct<cnt_1000:bigint>


 -- !query 38
-SELECT count(DISTINCT four) AS cnt_4 FROM onek
+SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek
 -- !query 38 schema
-struct<cnt_4:bigint>
+struct<cnt_4:string>
 -- !query 38 output
 4


 -- !query 39
-select ten, count(*), sum(four) from onek
+select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek
 group by ten order by ten
 -- !query 39 schema
-struct<ten:int,count(1):bigint,sum(four):bigint>
+struct<ten:int,udf(count(1)):string,CAST(sum(CAST(udf(four) AS DOUBLE)) AS INT):int>
 -- !query 39 output
 0      100     100
 1      100     200
@@ -339,10 +340,10 @@ struct<ten:int,count(1):bigint,sum(four):bigint>


 -- !query 40
-select ten, count(four), sum(DISTINCT four) from onek
+select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek
 group by ten order by ten
 -- !query 40 schema
-struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
+struct<ten:int,count(udf(four)):bigint,udf(sum(distinct cast(four as bigint))):string>
 -- !query 40 output
 0      100     2
 1      100     4
@@ -357,11 +358,11 @@ struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>


 -- !query 41
-select ten, sum(distinct four) from onek a
+select ten, udf(sum(distinct four)) from onek a
 group by ten
-having exists (select 1 from onek b where sum(distinct a.four) = b.four)
+having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four)
 -- !query 41 schema
-struct<ten:int,sum(DISTINCT four):bigint>
+struct<ten:int,udf(sum(distinct cast(four as bigint))):string>
 -- !query 41 output
 0      2
 2      2
@@ -374,23 +375,23 @@ struct<ten:int,sum(DISTINCT four):bigint>
 select ten, sum(distinct four) from onek a
 group by ten
 having exists (select 1 from onek b
-               where sum(distinct a.four + b.four) = b.four)
+               where sum(distinct a.four + b.four) = udf(b.four))
 -- !query 42 schema
 struct<>
 -- !query 42 output
 org.apache.spark.sql.AnalysisException

 Aggregate/Window/Generate expressions are not valid in where clause of the query.
-Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(b.`four` AS BIGINT))]
+Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(udf(four) AS BIGINT))]
 Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))];


 -- !query 43
 select
-  (select max((select i.unique2 from tenk1 i where i.unique1 = o.unique1)))
+  (select udf(max((select i.unique2 from tenk1 i where i.unique1 = o.unique1))))
 from tenk1 o
 -- !query 43 schema
 struct<>
 -- !query 43 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 63
+cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 67

How was this patch tested?

Manually tested in local.

Also, with JDK 11:

Using /.../jdk-11.0.3.jdk/Contents/Home as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /.../spark/project
[info] Updating {file:/.../spark/project/}spark-build...
...
[info] SQLQueryTestSuite:
...
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scala UDF (17 seconds, 228 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Regular Python UDF (36 seconds, 170 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scalar Pandas UDF (41 seconds, 132 milliseconds)
...

@HyukjinKwon
Copy link
Member Author

cc @dongjoon-hyun

@HyukjinKwon
Copy link
Member Author

HyukjinKwon commented Jul 11, 2019

FYI @viirya, @skonto, @imback82, @huaxingao, @vinodkc, @manuzhang, @chitralverma since you guys are working on this.

@HyukjinKwon
Copy link
Member Author

I double checked that it works with JDK 11 just for doubly sure @dongjoon-hyun:

Using /.../jdk-11.0.3.jdk/Contents/Home as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /.../spark/project
[info] Updating {file:/.../spark/project/}spark-build...
...
[info] SQLQueryTestSuite:
...
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scala UDF (17 seconds, 228 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Regular Python UDF (36 seconds, 170 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scalar Pandas UDF (41 seconds, 132 milliseconds)
...

@dongjoon-hyun
Copy link
Member

Nit. Can we use Decimal instead? In case of 'corr', int casting seems to return 0 always.

@HyukjinKwon
Copy link
Member Author

Yea Let me fix that one while I'm here.

@SparkQA
Copy link

SparkQA commented Jul 11, 2019

Test build #107511 has finished for PR 25110 at commit a0a6219.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 11, 2019

Test build #107510 has finished for PR 25110 at commit e7143de.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member Author

Ah, using decimals are closer to the original 'aggregates_part1.sql'. I updated the diff comparing to the original file in the PR description to check out as well.

@HyukjinKwon HyukjinKwon changed the title [SPARK-28270][SQL][FOLLOW-UP] Explicitly cast into integer/long in udf-aggregates_part1.sql to avoid Python float limitation [SPARK-28270][SQL][FOLLOW-UP] Explicitly cast into int/long/decimal in udf-aggregates_part1.sql to avoid Python float limitation Jul 11, 2019
@@ -23,32 +23,32 @@ select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest;
-- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766]
-- SELECT avg(gpa) AS avg_3_4 FROM ONLY student;

SELECT sum(udf(four)) AS sum_1500 FROM onek;
SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess only float/double has the issue, we cast int/long just for more robust too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, we don't have to. But casting makes closer to the original results:

 -- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek
 -- !query 3 schema
-struct<sum_1500:bigint>
+struct<sum_1500:int>
 -- !query 3 output
 1500

If we don't cast, it returns a double due to type coercion. Should be fine.

@SparkQA
Copy link

SparkQA commented Jul 11, 2019

Test build #107522 has finished for PR 25110 at commit 21870c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 11, 2019

Test build #107516 has finished for PR 25110 at commit fe48a36.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member Author

Merged to master.

-- !query 33 output
7.000000000006E12 1.0
7000000000006 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazingly, this still fails with INT type. And, Spark seems to return a wrong value due to truncation.

Expected "700000000000[6] 1", but got "700000000000[5] 1" Result did not match for query #33&#010;SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))&#010;FROM (VALUES (7000000000005), (7000000000007)) v(x)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In master branch, all maven Jenkins fail with this. Only SBT Jenkins are running.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dongjoon-hyun. Let me take a look and make a fix soon.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

Copy link
Member Author

@HyukjinKwon HyukjinKwon Jul 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It passes with Maven in my local:

- udf/pgSQL/udf-aggregates_part1.sql - Scala UDF
- udf/pgSQL/udf-aggregates_part1.sql - Regular Python UDF
- udf/pgSQL/udf-aggregates_part1.sql - Scalar Pandas UDF

I believe the problem seems similarly the Python or OS installed in the machine.

Looks here's what's going on:

scala> Seq("7000000000004.999", "7000000000006.999").toDF().selectExpr("CAST(avg(value) AS long)").show()
+--------------------------+
|CAST(avg(value) AS BIGINT)|
+--------------------------+
|             7000000000005|
+--------------------------+

Let me try to find a better way to work around it ...

Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ignore this test first because this is consistent and permanent in Jenkins Maven? This will hide another PR's bugs.

I mean this line of this file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give me a couple of hours? I will disable this line if I can't make it ..

HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Jul 12, 2019
…nput of UDF as double in the failed test in udf-aggregate_part1.sql

## What changes were proposed in this pull request?

It still can be flaky on certain environments due to float limitation described at apache#25110 . See apache#25110 (comment)

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6584/testReport/org.apache.spark.sql/SQLQueryTestSuite/udf_pgSQL_udf_aggregates_part1_sql___Regular_Python_UDF/

```
Expected "700000000000[6] 1", but got "700000000000[5] 1" Result did not match for query #33&#10;SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))&#10;FROM (VALUES (7000000000005), (7000000000007)) v(x)
```

Here;s what's going on: apache#25110 (comment)

```
scala> Seq("7000000000004.999", "7000000000006.999").toDF().selectExpr("CAST(avg(value) AS long)").show()
+--------------------------+
|CAST(avg(value) AS BIGINT)|
+--------------------------+
|             7000000000005|
+--------------------------+
```

Therefore, this PR just avoid to cast in the specific test.

This is a temp fix. We need more robust way to avoid such cases.

## How was this patch tested?

It passes with Maven in my local before/after this PR. I believe the problem seems similarly the Python or OS installed in the machine. I should test this against PR builder with `test-maven` for sure..

Closes apache#25128 from HyukjinKwon/SPARK-28270-2.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
vinodkc pushed a commit to vinodkc/spark that referenced this pull request Jul 18, 2019
…n udf-aggregates_part1.sql to avoid Python float limitation

## What changes were proposed in this pull request?

The tests added at apache#25069 seem flaky in some environments. See apache#25069 (comment)

Python's string representation of floats can make the tests flaky. See https://docs.python.org/3/tutorial/floatingpoint.html.

I think it's just better to explicitly cast everywhere udf returns a float (or a double) to stay safe. (note that we're not targeting the Python <> Scala value conversions - there are inevitable differences between Python and Scala; therefore, other languages' UDFs cannot guarantee the same results between Python and Scala).

This PR proposes to cast cases to long, integer and decimal explicitly to make the test cases robust.

<details><summary>Diff comparing to 'pgSQL/aggregates_part1.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
index 51ca1d55869..734634b7388 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
 -3,23 +3,23

 -- !query 0
-SELECT avg(four) AS avg_1 FROM onek
+SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek
 -- !query 0 schema
-struct<avg_1:double>
+struct<avg_1:decimal(10,3)>
 -- !query 0 output
 1.5

 -- !query 1
-SELECT avg(a) AS avg_32 FROM aggtest WHERE a < 100
+SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100
 -- !query 1 schema
-struct<avg_32:double>
+struct<avg_32:decimal(10,3)>
 -- !query 1 output
-32.666666666666664
+32.667

 -- !query 2
-select CAST(avg(b) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
+select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
 -- !query 2 schema
 struct<avg_107_943:decimal(10,3)>
 -- !query 2 output
 -27,39 +27,39  struct<avg_107_943:decimal(10,3)>

 -- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek
 -- !query 3 schema
-struct<sum_1500:bigint>
+struct<sum_1500:int>
 -- !query 3 output
 1500

 -- !query 4
-SELECT sum(a) AS sum_198 FROM aggtest
+SELECT udf(sum(a)) AS sum_198 FROM aggtest
 -- !query 4 schema
-struct<sum_198:bigint>
+struct<sum_198:string>
 -- !query 4 output
 198

 -- !query 5
-SELECT sum(b) AS avg_431_773 FROM aggtest
+SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest
 -- !query 5 schema
-struct<avg_431_773:double>
+struct<avg_431_773:decimal(10,3)>
 -- !query 5 output
-431.77260909229517
+431.773

 -- !query 6
-SELECT max(four) AS max_3 FROM onek
+SELECT udf(max(four)) AS max_3 FROM onek
 -- !query 6 schema
-struct<max_3:int>
+struct<max_3:string>
 -- !query 6 output
 3

 -- !query 7
-SELECT max(a) AS max_100 FROM aggtest
+SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest
 -- !query 7 schema
 struct<max_100:int>
 -- !query 7 output
 -67,245 +67,246  struct<max_100:int>

 -- !query 8
-SELECT max(aggtest.b) AS max_324_78 FROM aggtest
+SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest
 -- !query 8 schema
-struct<max_324_78:float>
+struct<max_324_78:decimal(10,3)>
 -- !query 8 output
 324.78

 -- !query 9
-SELECT stddev_pop(b) FROM aggtest
+SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest
 -- !query 9 schema
-struct<stddev_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 9 output
-131.10703231895047
+131.107

 -- !query 10
-SELECT stddev_samp(b) FROM aggtest
+SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest
 -- !query 10 schema
-struct<stddev_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(stddev_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 10 output
-151.38936080399804
+151.389

 -- !query 11
-SELECT var_pop(b) FROM aggtest
+SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest
 -- !query 11 schema
-struct<var_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(var_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 11 output
-17189.053923482323
+17189.054

 -- !query 12
-SELECT var_samp(b) FROM aggtest
+SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest
 -- !query 12 schema
-struct<var_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(var_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 12 output
-22918.738564643096
+22918.739

 -- !query 13
-SELECT stddev_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 13 schema
-struct<stddev_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(stddev_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 13 output
-131.18117242958306
+131.181

 -- !query 14
-SELECT stddev_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest
 -- !query 14 schema
-struct<stddev_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(stddev_samp(CAST(CAST(udf(b) AS DECIMAL(38,0)) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 14 output
-151.47497042966097
+151.475

 -- !query 15
-SELECT var_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 15 schema
-struct<var_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(var_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 15 output
 17208.5

 -- !query 16
-SELECT var_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 16 schema
-struct<var_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(var_samp(CAST(udf(cast(b as decimal(38,0))) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 16 output
-22944.666666666668
+22944.667

 -- !query 17
-SELECT var_pop(1.0), var_samp(2.0)
+SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0))
 -- !query 17 schema
-struct<var_pop(CAST(1.0 AS DOUBLE)):double,var_samp(CAST(2.0 AS DOUBLE)):double>
+struct<CAST(udf(var_pop(cast(1.0 as double))) AS INT):int,var_samp(CAST(udf(2.0) AS DOUBLE)):double>
 -- !query 17 output
-0.0    NaN
+0      NaN

 -- !query 18
-SELECT stddev_pop(CAST(3.0 AS Decimal(38,0))), stddev_samp(CAST(4.0 AS Decimal(38,0)))
+SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
 -- !query 18 schema
-struct<stddev_pop(CAST(CAST(3.0 AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(4.0 AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(cast(3.0 as decimal(38,0))) AS DOUBLE)) AS INT):int,stddev_samp(CAST(CAST(udf(4.0) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 18 output
-0.0    NaN
+0      NaN

 -- !query 19
-select sum(CAST(null AS int)) from range(1,4)
+select sum(udf(CAST(null AS int))) from range(1,4)
 -- !query 19 schema
-struct<sum(CAST(NULL AS INT)):bigint>
+struct<sum(CAST(udf(cast(null as int)) AS DOUBLE)):double>
 -- !query 19 output
 NULL

 -- !query 20
-select sum(CAST(null AS long)) from range(1,4)
+select sum(udf(CAST(null AS long))) from range(1,4)
 -- !query 20 schema
-struct<sum(CAST(NULL AS BIGINT)):bigint>
+struct<sum(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
 -- !query 20 output
 NULL

 -- !query 21
-select sum(CAST(null AS Decimal(38,0))) from range(1,4)
+select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 21 schema
-struct<sum(CAST(NULL AS DECIMAL(38,0))):decimal(38,0)>
+struct<sum(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
 -- !query 21 output
 NULL

 -- !query 22
-select sum(CAST(null AS DOUBLE)) from range(1,4)
+select sum(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 22 schema
-struct<sum(CAST(NULL AS DOUBLE)):double>
+struct<sum(CAST(udf(cast(null as double)) AS DOUBLE)):double>
 -- !query 22 output
 NULL

 -- !query 23
-select avg(CAST(null AS int)) from range(1,4)
+select avg(udf(CAST(null AS int))) from range(1,4)
 -- !query 23 schema
-struct<avg(CAST(NULL AS INT)):double>
+struct<avg(CAST(udf(cast(null as int)) AS DOUBLE)):double>
 -- !query 23 output
 NULL

 -- !query 24
-select avg(CAST(null AS long)) from range(1,4)
+select avg(udf(CAST(null AS long))) from range(1,4)
 -- !query 24 schema
-struct<avg(CAST(NULL AS BIGINT)):double>
+struct<avg(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
 -- !query 24 output
 NULL

 -- !query 25
-select avg(CAST(null AS Decimal(38,0))) from range(1,4)
+select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 25 schema
-struct<avg(CAST(NULL AS DECIMAL(38,0))):decimal(38,4)>
+struct<avg(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
 -- !query 25 output
 NULL

 -- !query 26
-select avg(CAST(null AS DOUBLE)) from range(1,4)
+select avg(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 26 schema
-struct<avg(CAST(NULL AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(null as double)) AS DOUBLE)):double>
 -- !query 26 output
 NULL

 -- !query 27
-select sum(CAST('NaN' AS DOUBLE)) from range(1,4)
+select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 27 schema
-struct<sum(CAST(NaN AS DOUBLE)):double>
+struct<sum(CAST(udf(NaN) AS DOUBLE)):double>
 -- !query 27 output
 NaN

 -- !query 28
-select avg(CAST('NaN' AS DOUBLE)) from range(1,4)
+select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 28 schema
-struct<avg(CAST(NaN AS DOUBLE)):double>
+struct<avg(CAST(udf(NaN) AS DOUBLE)):double>
 -- !query 28 output
 NaN

 -- !query 30
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('1')) v(x)
 -- !query 30 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 30 output
 Infinity       NaN

 -- !query 31
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('Infinity')) v(x)
 -- !query 31 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 31 output
 Infinity       NaN

 -- !query 32
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('-Infinity'), ('Infinity')) v(x)
 -- !query 32 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 32 output
 NaN    NaN

 -- !query 33
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
 FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x)
 -- !query 33 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS INT):int,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 33 output
-1.00000005E8   2.5
+100000005      2.5

 -- !query 34
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
 FROM (VALUES (7000000000005), (7000000000007)) v(x)
 -- !query 34 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS BIGINT):bigint,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 34 output
-7.000000000006E12      1.0
+7000000000006  1

 -- !query 35
-SELECT covar_pop(b, a), covar_samp(b, a) FROM aggtest
+SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest
 -- !query 35 schema
-struct<covar_pop(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double,covar_samp(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(udf(covar_pop(cast(b as double), cast(udf(a) as double))) AS DECIMAL(10,3)):decimal(10,3),CAST(covar_samp(CAST(udf(b) AS DOUBLE), CAST(a AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 35 output
-653.6289553875104      871.5052738500139
+653.629        871.505

 -- !query 36
-SELECT corr(b, a) FROM aggtest
+SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest
 -- !query 36 schema
-struct<corr(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(corr(CAST(b AS DOUBLE), CAST(udf(a) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 36 output
-0.1396345165178734
+0.14

 -- !query 37
-SELECT count(four) AS cnt_1000 FROM onek
+SELECT count(udf(four)) AS cnt_1000 FROM onek
 -- !query 37 schema
 struct<cnt_1000:bigint>
 -- !query 37 output
 -313,18 +314,18  struct<cnt_1000:bigint>

 -- !query 38
-SELECT count(DISTINCT four) AS cnt_4 FROM onek
+SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek
 -- !query 38 schema
-struct<cnt_4:bigint>
+struct<cnt_4:string>
 -- !query 38 output
 4

 -- !query 39
-select ten, count(*), sum(four) from onek
+select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek
 group by ten order by ten
 -- !query 39 schema
-struct<ten:int,count(1):bigint,sum(four):bigint>
+struct<ten:int,udf(count(1)):string,CAST(sum(CAST(udf(four) AS DOUBLE)) AS INT):int>
 -- !query 39 output
 0      100     100
 1      100     200
 -339,10 +340,10  struct<ten:int,count(1):bigint,sum(four):bigint>

 -- !query 40
-select ten, count(four), sum(DISTINCT four) from onek
+select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek
 group by ten order by ten
 -- !query 40 schema
-struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
+struct<ten:int,count(udf(four)):bigint,udf(sum(distinct cast(four as bigint))):string>
 -- !query 40 output
 0      100     2
 1      100     4
 -357,11 +358,11  struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>

 -- !query 41
-select ten, sum(distinct four) from onek a
+select ten, udf(sum(distinct four)) from onek a
 group by ten
-having exists (select 1 from onek b where sum(distinct a.four) = b.four)
+having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four)
 -- !query 41 schema
-struct<ten:int,sum(DISTINCT four):bigint>
+struct<ten:int,udf(sum(distinct cast(four as bigint))):string>
 -- !query 41 output
 0      2
 2      2
 -374,23 +375,23  struct<ten:int,sum(DISTINCT four):bigint>
 select ten, sum(distinct four) from onek a
 group by ten
 having exists (select 1 from onek b
-               where sum(distinct a.four + b.four) = b.four)
+               where sum(distinct a.four + b.four) = udf(b.four))
 -- !query 42 schema
 struct<>
 -- !query 42 output
 org.apache.spark.sql.AnalysisException

 Aggregate/Window/Generate expressions are not valid in where clause of the query.
-Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(b.`four` AS BIGINT))]
+Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(udf(four) AS BIGINT))]
 Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))];

 -- !query 43
 select
-  (select max((select i.unique2 from tenk1 i where i.unique1 = o.unique1)))
+  (select udf(max((select i.unique2 from tenk1 i where i.unique1 = o.unique1))))
 from tenk1 o
 -- !query 43 schema
 struct<>
 -- !query 43 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 63
+cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 67
```

</p>
</details>

## How was this patch tested?

Manually tested in local.

Also, with JDK 11:

```
Using /.../jdk-11.0.3.jdk/Contents/Home as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /.../spark/project
[info] Updating {file:/.../spark/project/}spark-build...
...
[info] SQLQueryTestSuite:
...
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scala UDF (17 seconds, 228 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Regular Python UDF (36 seconds, 170 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scalar Pandas UDF (41 seconds, 132 milliseconds)
...
```

Closes apache#25110 from HyukjinKwon/SPARK-28270-1.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
vinodkc pushed a commit to vinodkc/spark that referenced this pull request Jul 18, 2019
…nput of UDF as double in the failed test in udf-aggregate_part1.sql

## What changes were proposed in this pull request?

It still can be flaky on certain environments due to float limitation described at apache#25110 . See apache#25110 (comment)

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6584/testReport/org.apache.spark.sql/SQLQueryTestSuite/udf_pgSQL_udf_aggregates_part1_sql___Regular_Python_UDF/

```
Expected "700000000000[6] 1", but got "700000000000[5] 1" Result did not match for query apache#33&apache#10;SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))&apache#10;FROM (VALUES (7000000000005), (7000000000007)) v(x)
```

Here;s what's going on: apache#25110 (comment)

```
scala> Seq("7000000000004.999", "7000000000006.999").toDF().selectExpr("CAST(avg(value) AS long)").show()
+--------------------------+
|CAST(avg(value) AS BIGINT)|
+--------------------------+
|             7000000000005|
+--------------------------+
```

Therefore, this PR just avoid to cast in the specific test.

This is a temp fix. We need more robust way to avoid such cases.

## How was this patch tested?

It passes with Maven in my local before/after this PR. I believe the problem seems similarly the Python or OS installed in the machine. I should test this against PR builder with `test-maven` for sure..

Closes apache#25128 from HyukjinKwon/SPARK-28270-2.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
vinodkc pushed a commit to vinodkc/spark that referenced this pull request Jul 18, 2019
… making UDFs (virtually) no-op

## What changes were proposed in this pull request?

Current UDFs available in `IntegratedUDFTestUtils` are not exactly no-op. It converts input column to strings and outputs to strings.

It causes some issues when we convert and port the tests at SPARK-27921. Integrated UDF test cases share one output file and it should outputs the same. However,

1. Special values are converted into strings differently:

    | Scala      | Python |
    | ---------- | ------ |
    | `null`     | `None` |
    | `Infinity` | `inf`  |
    | `-Infinity`| `-inf` |
    | `NaN`      | `nan`  |

2. Due to float limitation at Python (see https://docs.python.org/3/tutorial/floatingpoint.html), if float is passed into Python and sent back to JVM, the values are potentially not exactly correct. See apache#25128 and apache#25110

To work around this, this PR targets to change the current UDF to be wrapped by cast. So, Input column is casted into string, UDF returns strings as are, and then output column is casted back to the input column.

Roughly:

**Before:**

```
JVM (col1) -> (cast to string within Python) Python (string) -> (string) JVM
```

**After:**

```
JVM (cast col1 to string) -> (string) Python (string) -> (cast back to col1's type) JVM
```

In this way, UDF is virtually no-op although there might be some subtleties due to roundtrip in string cast. I believe this is good enough.

Python native functions and Scala native functions will take strings and output strings as are. So, there will be no potential test failures due to differences of conversion between Python and Scala.

After this fix, for instance, `udf-aggregates_part1.sql` outputs exactly same as `aggregates_part1.sql`:

<details><summary>Diff comparing to 'pgSQL/aggregates_part1.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
index 51ca1d55869..801735781c7 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
 -3,7 +3,7

 -- !query 0
-SELECT avg(four) AS avg_1 FROM onek
+SELECT avg(udf(four)) AS avg_1 FROM onek
 -- !query 0 schema
 struct<avg_1:double>
 -- !query 0 output
 -11,7 +11,7  struct<avg_1:double>

 -- !query 1
-SELECT avg(a) AS avg_32 FROM aggtest WHERE a < 100
+SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100
 -- !query 1 schema
 struct<avg_32:double>
 -- !query 1 output
 -19,7 +19,7  struct<avg_32:double>

 -- !query 2
-select CAST(avg(b) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
+select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
 -- !query 2 schema
 struct<avg_107_943:decimal(10,3)>
 -- !query 2 output
 -27,7 +27,7  struct<avg_107_943:decimal(10,3)>

 -- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT sum(udf(four)) AS sum_1500 FROM onek
 -- !query 3 schema
 struct<sum_1500:bigint>
 -- !query 3 output
 -35,7 +35,7  struct<sum_1500:bigint>

 -- !query 4
-SELECT sum(a) AS sum_198 FROM aggtest
+SELECT udf(sum(a)) AS sum_198 FROM aggtest
 -- !query 4 schema
 struct<sum_198:bigint>
 -- !query 4 output
 -43,7 +43,7  struct<sum_198:bigint>

 -- !query 5
-SELECT sum(b) AS avg_431_773 FROM aggtest
+SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest
 -- !query 5 schema
 struct<avg_431_773:double>
 -- !query 5 output
 -51,7 +51,7  struct<avg_431_773:double>

 -- !query 6
-SELECT max(four) AS max_3 FROM onek
+SELECT udf(max(four)) AS max_3 FROM onek
 -- !query 6 schema
 struct<max_3:int>
 -- !query 6 output
 -59,7 +59,7  struct<max_3:int>

 -- !query 7
-SELECT max(a) AS max_100 FROM aggtest
+SELECT max(udf(a)) AS max_100 FROM aggtest
 -- !query 7 schema
 struct<max_100:int>
 -- !query 7 output
 -67,7 +67,7  struct<max_100:int>

 -- !query 8
-SELECT max(aggtest.b) AS max_324_78 FROM aggtest
+SELECT udf(udf(max(aggtest.b))) AS max_324_78 FROM aggtest
 -- !query 8 schema
 struct<max_324_78:float>
 -- !query 8 output
 -75,237 +75,238  struct<max_324_78:float>

 -- !query 9
-SELECT stddev_pop(b) FROM aggtest
+SELECT stddev_pop(udf(b)) FROM aggtest
 -- !query 9 schema
-struct<stddev_pop(CAST(b AS DOUBLE)):double>
+struct<stddev_pop(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS DOUBLE)):double>
 -- !query 9 output
 131.10703231895047

 -- !query 10
-SELECT stddev_samp(b) FROM aggtest
+SELECT udf(stddev_samp(b)) FROM aggtest
 -- !query 10 schema
-struct<stddev_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(cast(stddev_samp(cast(b as double)) as string)) AS DOUBLE):double>
 -- !query 10 output
 151.38936080399804

 -- !query 11
-SELECT var_pop(b) FROM aggtest
+SELECT var_pop(udf(b)) FROM aggtest
 -- !query 11 schema
-struct<var_pop(CAST(b AS DOUBLE)):double>
+struct<var_pop(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS DOUBLE)):double>
 -- !query 11 output
 17189.053923482323

 -- !query 12
-SELECT var_samp(b) FROM aggtest
+SELECT udf(var_samp(b)) FROM aggtest
 -- !query 12 schema
-struct<var_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(cast(var_samp(cast(b as double)) as string)) AS DOUBLE):double>
 -- !query 12 output
 22918.738564643096

 -- !query 13
-SELECT stddev_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
 -- !query 13 schema
-struct<stddev_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(cast(stddev_pop(cast(cast(b as decimal(38,0)) as double)) as string)) AS DOUBLE):double>
 -- !query 13 output
 131.18117242958306

 -- !query 14
-SELECT stddev_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest
 -- !query 14 schema
-struct<stddev_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<stddev_samp(CAST(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 14 output
 151.47497042966097

 -- !query 15
-SELECT var_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
 -- !query 15 schema
-struct<var_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(cast(var_pop(cast(cast(b as decimal(38,0)) as double)) as string)) AS DOUBLE):double>
 -- !query 15 output
 17208.5

 -- !query 16
-SELECT var_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest
 -- !query 16 schema
-struct<var_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<var_samp(CAST(CAST(udf(cast(cast(b as decimal(38,0)) as string)) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 16 output
 22944.666666666668

 -- !query 17
-SELECT var_pop(1.0), var_samp(2.0)
+SELECT udf(var_pop(1.0)), var_samp(udf(2.0))
 -- !query 17 schema
-struct<var_pop(CAST(1.0 AS DOUBLE)):double,var_samp(CAST(2.0 AS DOUBLE)):double>
+struct<CAST(udf(cast(var_pop(cast(1.0 as double)) as string)) AS DOUBLE):double,var_samp(CAST(CAST(udf(cast(2.0 as string)) AS DECIMAL(2,1)) AS DOUBLE)):double>
 -- !query 17 output
 0.0    NaN

 -- !query 18
-SELECT stddev_pop(CAST(3.0 AS Decimal(38,0))), stddev_samp(CAST(4.0 AS Decimal(38,0)))
+SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
 -- !query 18 schema
-struct<stddev_pop(CAST(CAST(3.0 AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(4.0 AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<stddev_pop(CAST(CAST(udf(cast(cast(3.0 as decimal(38,0)) as string)) AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(CAST(udf(cast(4.0 as string)) AS DECIMAL(2,1)) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 18 output
 0.0    NaN

 -- !query 19
-select sum(CAST(null AS int)) from range(1,4)
+select sum(udf(CAST(null AS int))) from range(1,4)
 -- !query 19 schema
-struct<sum(CAST(NULL AS INT)):bigint>
+struct<sum(CAST(udf(cast(cast(null as int) as string)) AS INT)):bigint>
 -- !query 19 output
 NULL

 -- !query 20
-select sum(CAST(null AS long)) from range(1,4)
+select sum(udf(CAST(null AS long))) from range(1,4)
 -- !query 20 schema
-struct<sum(CAST(NULL AS BIGINT)):bigint>
+struct<sum(CAST(udf(cast(cast(null as bigint) as string)) AS BIGINT)):bigint>
 -- !query 20 output
 NULL

 -- !query 21
-select sum(CAST(null AS Decimal(38,0))) from range(1,4)
+select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 21 schema
-struct<sum(CAST(NULL AS DECIMAL(38,0))):decimal(38,0)>
+struct<sum(CAST(udf(cast(cast(null as decimal(38,0)) as string)) AS DECIMAL(38,0))):decimal(38,0)>
 -- !query 21 output
 NULL

 -- !query 22
-select sum(CAST(null AS DOUBLE)) from range(1,4)
+select sum(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 22 schema
-struct<sum(CAST(NULL AS DOUBLE)):double>
+struct<sum(CAST(udf(cast(cast(null as double) as string)) AS DOUBLE)):double>
 -- !query 22 output
 NULL

 -- !query 23
-select avg(CAST(null AS int)) from range(1,4)
+select avg(udf(CAST(null AS int))) from range(1,4)
 -- !query 23 schema
-struct<avg(CAST(NULL AS INT)):double>
+struct<avg(CAST(udf(cast(cast(null as int) as string)) AS INT)):double>
 -- !query 23 output
 NULL

 -- !query 24
-select avg(CAST(null AS long)) from range(1,4)
+select avg(udf(CAST(null AS long))) from range(1,4)
 -- !query 24 schema
-struct<avg(CAST(NULL AS BIGINT)):double>
+struct<avg(CAST(udf(cast(cast(null as bigint) as string)) AS BIGINT)):double>
 -- !query 24 output
 NULL

 -- !query 25
-select avg(CAST(null AS Decimal(38,0))) from range(1,4)
+select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 25 schema
-struct<avg(CAST(NULL AS DECIMAL(38,0))):decimal(38,4)>
+struct<avg(CAST(udf(cast(cast(null as decimal(38,0)) as string)) AS DECIMAL(38,0))):decimal(38,4)>
 -- !query 25 output
 NULL

 -- !query 26
-select avg(CAST(null AS DOUBLE)) from range(1,4)
+select avg(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 26 schema
-struct<avg(CAST(NULL AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(cast(null as double) as string)) AS DOUBLE)):double>
 -- !query 26 output
 NULL

 -- !query 27
-select sum(CAST('NaN' AS DOUBLE)) from range(1,4)
+select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 27 schema
-struct<sum(CAST(NaN AS DOUBLE)):double>
+struct<sum(CAST(CAST(udf(cast(NaN as string)) AS STRING) AS DOUBLE)):double>
 -- !query 27 output
 NaN

 -- !query 28
-select avg(CAST('NaN' AS DOUBLE)) from range(1,4)
+select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 28 schema
-struct<avg(CAST(NaN AS DOUBLE)):double>
+struct<avg(CAST(CAST(udf(cast(NaN as string)) AS STRING) AS DOUBLE)):double>
 -- !query 28 output
 NaN

 -- !query 30
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('1')) v(x)
 -- !query 30 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double>
 -- !query 30 output
 Infinity       NaN

 -- !query 31
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('Infinity')) v(x)
 -- !query 31 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double>
 -- !query 31 output
 Infinity       NaN

 -- !query 32
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('-Infinity'), ('Infinity')) v(x)
 -- !query 32 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double>
 -- !query 32 output
 NaN    NaN

 -- !query 33
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
 FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x)
 -- !query 33 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(cast(x as double) as string)) AS DOUBLE)):double,CAST(udf(cast(var_pop(cast(x as double)) as string)) AS DOUBLE):double>
 -- !query 33 output
 1.00000005E8   2.5

 -- !query 34
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
 FROM (VALUES (7000000000005), (7000000000007)) v(x)
 -- !query 34 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(cast(x as double) as string)) AS DOUBLE)):double,CAST(udf(cast(var_pop(cast(x as double)) as string)) AS DOUBLE):double>
 -- !query 34 output
 7.000000000006E12      1.0

 -- !query 35
-SELECT covar_pop(b, a), covar_samp(b, a) FROM aggtest
+SELECT udf(covar_pop(b, udf(a))), covar_samp(udf(b), a) FROM aggtest
 -- !query 35 schema
-struct<covar_pop(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double,covar_samp(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(udf(cast(covar_pop(cast(b as double), cast(cast(udf(cast(a as string)) as int) as double)) as string)) AS DOUBLE):double,covar_samp(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS DOUBLE), CAST(a AS DOUBLE)):double>
 -- !query 35 output
 653.6289553875104      871.5052738500139

 -- !query 36
-SELECT corr(b, a) FROM aggtest
+SELECT corr(b, udf(a)) FROM aggtest
 -- !query 36 schema
-struct<corr(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<corr(CAST(b AS DOUBLE), CAST(CAST(udf(cast(a as string)) AS INT) AS DOUBLE)):double>
 -- !query 36 output
 0.1396345165178734

 -- !query 37
-SELECT count(four) AS cnt_1000 FROM onek
+SELECT count(udf(four)) AS cnt_1000 FROM onek
 -- !query 37 schema
 struct<cnt_1000:bigint>
 -- !query 37 output
 -313,7 +314,7  struct<cnt_1000:bigint>

 -- !query 38
-SELECT count(DISTINCT four) AS cnt_4 FROM onek
+SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek
 -- !query 38 schema
 struct<cnt_4:bigint>
 -- !query 38 output
 -321,10 +322,10  struct<cnt_4:bigint>

 -- !query 39
-select ten, count(*), sum(four) from onek
+select ten, udf(count(*)), sum(udf(four)) from onek
 group by ten order by ten
 -- !query 39 schema
-struct<ten:int,count(1):bigint,sum(four):bigint>
+struct<ten:int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint,sum(CAST(udf(cast(four as string)) AS INT)):bigint>
 -- !query 39 output
 0      100     100
 1      100     200
 -339,10 +340,10  struct<ten:int,count(1):bigint,sum(four):bigint>

 -- !query 40
-select ten, count(four), sum(DISTINCT four) from onek
+select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek
 group by ten order by ten
 -- !query 40 schema
-struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
+struct<ten:int,count(CAST(udf(cast(four as string)) AS INT)):bigint,CAST(udf(cast(sum(distinct cast(four as bigint)) as string)) AS BIGINT):bigint>
 -- !query 40 output
 0      100     2
 1      100     4
 -357,11 +358,11  struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>

 -- !query 41
-select ten, sum(distinct four) from onek a
+select ten, udf(sum(distinct four)) from onek a
 group by ten
-having exists (select 1 from onek b where sum(distinct a.four) = b.four)
+having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four)
 -- !query 41 schema
-struct<ten:int,sum(DISTINCT four):bigint>
+struct<ten:int,CAST(udf(cast(sum(distinct cast(four as bigint)) as string)) AS BIGINT):bigint>
 -- !query 41 output
 0      2
 2      2
 -374,23 +375,23  struct<ten:int,sum(DISTINCT four):bigint>
 select ten, sum(distinct four) from onek a
 group by ten
 having exists (select 1 from onek b
-               where sum(distinct a.four + b.four) = b.four)
+               where sum(distinct a.four + b.four) = udf(b.four))
 -- !query 42 schema
 struct<>
 -- !query 42 output
 org.apache.spark.sql.AnalysisException

 Aggregate/Window/Generate expressions are not valid in where clause of the query.
-Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(b.`four` AS BIGINT))]
+Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(CAST(udf(cast(four as string)) AS INT) AS BIGINT))]
 Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))];

 -- !query 43
 select
-  (select max((select i.unique2 from tenk1 i where i.unique1 = o.unique1)))
+  (select udf(max((select i.unique2 from tenk1 i where i.unique1 = o.unique1))))
 from tenk1 o
 -- !query 43 schema
 struct<>
 -- !query 43 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 63
+cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 67
```
</p>
</details>

## How was this patch tested?

Manually tested.

Closes apache#25130 from HyukjinKwon/SPARK-28359.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@HyukjinKwon HyukjinKwon deleted the SPARK-28270-1 branch March 3, 2020 01:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants