Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-28270][SQL][FOLLOW-UP] Explicitly cast into int/long/decimal in udf-aggregates_part1.sql to avoid Python float limitation #25110

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
-- Note that currently registered UDF returns a string. So there are some differences, for instance
-- in string cast within UDF in Scala and Python.

SELECT avg(udf(four)) AS avg_1 FROM onek;
SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek;

SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100;
SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100;

-- In 7.1, avg(float4) is computed using float8 arithmetic.
-- Round the result to 3 digits to avoid platform-specific results.
Expand All @@ -23,32 +23,32 @@ select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest;
-- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766]
-- SELECT avg(gpa) AS avg_3_4 FROM ONLY student;

SELECT sum(udf(four)) AS sum_1500 FROM onek;
SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess only float/double has the issue; are we casting int/long as well just to be more robust?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, we don't have to. But casting makes the results closer to the original ones:

 -- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek
 -- !query 3 schema
-struct<sum_1500:bigint>
+struct<sum_1500:int>
 -- !query 3 output
 1500

If we don't cast, it returns a double due to type coercion. Should be fine.

SELECT udf(sum(a)) AS sum_198 FROM aggtest;
SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest;
SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest;
-- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766]
-- SELECT sum(gpa) AS avg_6_8 FROM ONLY student;

SELECT udf(max(four)) AS max_3 FROM onek;
SELECT max(udf(a)) AS max_100 FROM aggtest;
SELECT CAST(udf(udf(max(aggtest.b))) AS int) AS max_324_78 FROM aggtest;
SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest;
SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest;
-- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766]
-- SELECT max(student.gpa) AS max_3_7 FROM student;

SELECT CAST(stddev_pop(udf(b)) AS int) FROM aggtest;
SELECT udf(stddev_samp(b)) FROM aggtest;
SELECT CAST(var_pop(udf(b)) as int) FROM aggtest;
SELECT udf(var_samp(b)) FROM aggtest;
SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest;
SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest;
SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest;
SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest;

SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest;
SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest;
SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest;
SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest;
SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest;
SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest;
SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest;
SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest;

-- population variance is defined for a single tuple, sample variance
-- is not
SELECT udf(var_pop(1.0)), var_samp(udf(2.0));
SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)));
SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0));
SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)));


-- verify correct results for null and NaN inputs
Expand Down Expand Up @@ -76,9 +76,9 @@ FROM (VALUES ('-Infinity'), ('Infinity')) v(x);


-- test accuracy with a large input offset
SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x);
SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
FROM (VALUES (7000000000005), (7000000000007)) v(x);

-- SQL2003 binary aggregates [SPARK-23907]
Expand All @@ -89,8 +89,8 @@ FROM (VALUES (7000000000005), (7000000000007)) v(x);
-- SELECT regr_avgx(b, a), regr_avgy(b, a) FROM aggtest;
-- SELECT regr_r2(b, a) FROM aggtest;
-- SELECT regr_slope(b, a), regr_intercept(b, a) FROM aggtest;
SELECT CAST(udf(covar_pop(b, udf(a))) AS int), CAST(covar_samp(udf(b), a) as int) FROM aggtest;
SELECT corr(b, udf(a)) FROM aggtest;
SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest;
SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest;


-- test accum and combine functions directly [SPARK-23907]
Expand Down Expand Up @@ -122,7 +122,7 @@ SELECT corr(b, udf(a)) FROM aggtest;
SELECT count(udf(four)) AS cnt_1000 FROM onek;
SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek;

select ten, udf(count(*)), sum(udf(four)) from onek
select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek
group by ten order by ten;

select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@


-- !query 0
SELECT avg(udf(four)) AS avg_1 FROM onek
SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek
-- !query 0 schema
struct<avg_1:double>
struct<avg_1:decimal(10,3)>
-- !query 0 output
1.5


-- !query 1
SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100
SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100
-- !query 1 schema
struct<avg_32:string>
struct<avg_32:decimal(10,3)>
-- !query 1 output
32.666666666666664
32.667


-- !query 2
Expand All @@ -27,11 +27,11 @@ struct<avg_107_943:decimal(10,3)>


-- !query 3
SELECT sum(udf(four)) AS sum_1500 FROM onek
SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek
-- !query 3 schema
struct<sum_1500:double>
struct<sum_1500:int>
-- !query 3 output
1500.0
1500


-- !query 4
Expand All @@ -43,11 +43,11 @@ struct<sum_198:string>


-- !query 5
SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest
SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest
-- !query 5 schema
struct<avg_431_773:string>
struct<avg_431_773:decimal(10,3)>
-- !query 5 output
431.77260909229517
431.773


-- !query 6
Expand All @@ -59,99 +59,99 @@ struct<max_3:string>


-- !query 7
SELECT max(udf(a)) AS max_100 FROM aggtest
SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest
-- !query 7 schema
struct<max_100:string>
struct<max_100:int>
-- !query 7 output
56
100


-- !query 8
SELECT CAST(udf(udf(max(aggtest.b))) AS int) AS max_324_78 FROM aggtest
SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest
-- !query 8 schema
struct<max_324_78:int>
struct<max_324_78:decimal(10,3)>
-- !query 8 output
324
324.78


-- !query 9
SELECT CAST(stddev_pop(udf(b)) AS int) FROM aggtest
SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest
-- !query 9 schema
struct<CAST(stddev_pop(CAST(udf(b) AS DOUBLE)) AS INT):int>
struct<CAST(stddev_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 9 output
131
131.107


-- !query 10
SELECT udf(stddev_samp(b)) FROM aggtest
SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest
-- !query 10 schema
struct<udf(stddev_samp(cast(b as double))):string>
struct<CAST(udf(stddev_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 10 output
151.38936080399804
151.389


-- !query 11
SELECT CAST(var_pop(udf(b)) as int) FROM aggtest
SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest
-- !query 11 schema
struct<CAST(var_pop(CAST(udf(b) AS DOUBLE)) AS INT):int>
struct<CAST(var_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 11 output
17189
17189.054


-- !query 12
SELECT udf(var_samp(b)) FROM aggtest
SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest
-- !query 12 schema
struct<udf(var_samp(cast(b as double))):string>
struct<CAST(udf(var_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 12 output
22918.738564643096
22918.739


-- !query 13
SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
-- !query 13 schema
struct<udf(stddev_pop(cast(cast(b as decimal(38,0)) as double))):string>
struct<CAST(udf(stddev_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 13 output
131.18117242958306
131.181


-- !query 14
SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest
SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest
-- !query 14 schema
struct<stddev_samp(CAST(CAST(udf(b) AS DECIMAL(38,0)) AS DOUBLE)):double>
struct<CAST(stddev_samp(CAST(CAST(udf(b) AS DECIMAL(38,0)) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 14 output
151.47497042966097
151.475


-- !query 15
SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
-- !query 15 schema
struct<udf(var_pop(cast(cast(b as decimal(38,0)) as double))):string>
struct<CAST(udf(var_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 15 output
17208.5


-- !query 16
SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest
SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
-- !query 16 schema
struct<var_samp(CAST(udf(cast(b as decimal(38,0))) AS DOUBLE)):double>
struct<CAST(var_samp(CAST(udf(cast(b as decimal(38,0))) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 16 output
22944.666666666668
22944.667


-- !query 17
SELECT udf(var_pop(1.0)), var_samp(udf(2.0))
SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0))
-- !query 17 schema
struct<udf(var_pop(cast(1.0 as double))):string,var_samp(CAST(udf(2.0) AS DOUBLE)):double>
struct<CAST(udf(var_pop(cast(1.0 as double))) AS INT):int,var_samp(CAST(udf(2.0) AS DOUBLE)):double>
-- !query 17 output
0.0 NaN
0 NaN


-- !query 18
SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
-- !query 18 schema
struct<stddev_pop(CAST(udf(cast(3.0 as decimal(38,0))) AS DOUBLE)):double,stddev_samp(CAST(CAST(udf(4.0) AS DECIMAL(38,0)) AS DOUBLE)):double>
struct<CAST(stddev_pop(CAST(udf(cast(3.0 as decimal(38,0))) AS DOUBLE)) AS INT):int,stddev_samp(CAST(CAST(udf(4.0) AS DECIMAL(38,0)) AS DOUBLE)):double>
-- !query 18 output
0.0 NaN
0 NaN


-- !query 19
Expand Down Expand Up @@ -262,37 +262,37 @@ NaN NaN


-- !query 32
SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x)
-- !query 32 schema
struct<avg(CAST(udf(cast(x as double)) AS DOUBLE)):double,udf(var_pop(cast(x as double))):string>
struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS INT):int,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 32 output
1.00000005E8 2.5
100000005 2.5


-- !query 33
SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
FROM (VALUES (7000000000005), (7000000000007)) v(x)
-- !query 33 schema
struct<avg(CAST(udf(cast(x as double)) AS DOUBLE)):double,udf(var_pop(cast(x as double))):string>
struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS BIGINT):bigint,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 33 output
7.000000000006E12 1.0
7000000000006 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazingly, this still fails with INT type. And, Spark seems to return a wrong value due to truncation.

Expected "700000000000[6] 1", but got "700000000000[5] 1" Result did not match for query #33
SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
FROM (VALUES (7000000000005), (7000000000007)) v(x)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the master branch, all Maven Jenkins jobs fail with this. Only the SBT Jenkins jobs are running.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dongjoon-hyun. Let me take a look and make a fix soon.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

Copy link
Member Author

@HyukjinKwon HyukjinKwon Jul 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It passes with Maven in my local environment:

- udf/pgSQL/udf-aggregates_part1.sql - Scala UDF
- udf/pgSQL/udf-aggregates_part1.sql - Regular Python UDF
- udf/pgSQL/udf-aggregates_part1.sql - Scalar Pandas UDF

I believe the problem is similarly related to the Python version or the OS installed on the machine.

It looks like this is what's going on:

scala> Seq("7000000000004.999", "7000000000006.999").toDF().selectExpr("CAST(avg(value) AS long)").show()
+--------------------------+
|CAST(avg(value) AS BIGINT)|
+--------------------------+
|             7000000000005|
+--------------------------+

Let me try to find a better way to work around it ...

Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ignore this test for now, since the failure is consistent and permanent in the Jenkins Maven jobs? Otherwise it will hide bugs introduced by other PRs.

I mean this line of this file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give me a couple of hours? I will disable this line if I can't make it ..



-- !query 34
SELECT CAST(udf(covar_pop(b, udf(a))) AS int), CAST(covar_samp(udf(b), a) as int) FROM aggtest
SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest
-- !query 34 schema
struct<CAST(udf(covar_pop(cast(b as double), cast(udf(a) as double))) AS INT):int,CAST(covar_samp(CAST(udf(b) AS DOUBLE), CAST(a AS DOUBLE)) AS INT):int>
struct<CAST(udf(covar_pop(cast(b as double), cast(udf(a) as double))) AS DECIMAL(10,3)):decimal(10,3),CAST(covar_samp(CAST(udf(b) AS DOUBLE), CAST(a AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 34 output
653 871
653.629 871.505


-- !query 35
SELECT corr(b, udf(a)) FROM aggtest
SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest
-- !query 35 schema
struct<corr(CAST(b AS DOUBLE), CAST(udf(a) AS DOUBLE)):double>
struct<CAST(corr(CAST(b AS DOUBLE), CAST(udf(a) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 35 output
0.1396345165178734
0.14


-- !query 36
Expand All @@ -312,21 +312,21 @@ struct<cnt_4:string>


-- !query 38
select ten, udf(count(*)), sum(udf(four)) from onek
select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek
group by ten order by ten
-- !query 38 schema
struct<ten:int,udf(count(1)):string,sum(CAST(udf(four) AS DOUBLE)):double>
struct<ten:int,udf(count(1)):string,CAST(sum(CAST(udf(four) AS DOUBLE)) AS INT):int>
-- !query 38 output
0 100 100.0
1 100 200.0
2 100 100.0
3 100 200.0
4 100 100.0
5 100 200.0
6 100 100.0
7 100 200.0
8 100 100.0
9 100 200.0
0 100 100
1 100 200
2 100 100
3 100 200
4 100 100
5 100 200
6 100 100
7 100 200
8 100 100
9 100 200


-- !query 39
Expand Down