Skip to content

Commit

Permalink
[CARMEL-5851] Push partial aggregate through join (#1043)
Browse files Browse the repository at this point in the history
* [CARMEL-5851] Push partial aggregate through join  (#999)

* [CARMEL-5851] Push partial aggregate through join (#977)

* [CARMEL-5851] Make partial aggregation adaptive (#892)

* Make partial aggregation adaptive

* Fix codegen

* fix

* Support group only

* Fix test error

* Only support deterministic

* Add another config

* Fix data issue

* fix

* Remove isSupportPartialAgg

* Fix

* Deduplicate right side of left semi anti join (#893)

* DeduplicateRightSideOfLeftSemiAntiJoin

* Fix test

* Add test

* Introduce stats

* Fix

* PushPartialAggregationThroughJoin

* PushPartialAggregationThroughJoin

* isPartialAgg = true,

* push project through join

* Add PullOutGroupingExpressions and reduce changes

* val (leftProjectList, rightProjectList, remainingProjectList) =
      split(projectList ++ join.condition.map(_.references.toSeq).getOrElse(Nil),
        join.left, join.right)

* Fix java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression
	at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:297)
	at org.apache.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:392)
	at com.ebay.carmel.spark.BenchmarkAndVerifyResult$.$anonfun$main$1(BenchmarkAndVerifyResult.scala:156)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at com.ebay.carmel.spark.BenchmarkAndVerifyResult$.main(BenchmarkAndVerifyResult.scala:144)
	at com.ebay.carmel.spark.BenchmarkAndVerifyResult.main(BenchmarkAndVerifyResult.scala)

* Fix TPCDS q3 reuslt incorrect:
```
0: jdbc:hive2://10.211.174.151:10000/access_v> SELECT
. . . . . . . . . . . . . . . . . . . . . . .>   dt.d_year,
. . . . . . . . . . . . . . . . . . . . . . .>   item.i_brand_id brand_id,
. . . . . . . . . . . . . . . . . . . . . . .>   item.i_brand brand,
. . . . . . . . . . . . . . . . . . . . . . .>   SUM(cast(ss_ext_sales_price as decimal(17, 2))) sum_agg
. . . . . . . . . . . . . . . . . . . . . . .> FROM date_dim dt, store_sales, item
. . . . . . . . . . . . . . . . . . . . . . .> WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
. . . . . . . . . . . . . . . . . . . . . . .>   AND store_sales.ss_item_sk = item.i_item_sk
. . . . . . . . . . . . . . . . . . . . . . .>   AND item.i_manufact_id = 128
. . . . . . . . . . . . . . . . . . . . . . .>   AND dt.d_moy = 11
. . . . . . . . . . . . . . . . . . . . . . .> GROUP BY dt.d_year, item.i_brand, item.i_brand_id
. . . . . . . . . . . . . . . . . . . . . . .> ORDER BY dt.d_year, sum_agg DESC, brand_id, brand
. . . . . . . . . . . . . . . . . . . . . . .> LIMIT 10;
+---------+-----------+---------------------+--------------+
| d_year  | brand_id  |        brand        |   sum_agg    |
+---------+-----------+---------------------+--------------+
| 1998    | 2003001   | exportiimporto #1   | 43900603.69  |
| 1998    | 1002001   | importoamalg #1     | 35836273.32  |
| 1998    | 1004001   | edu packamalg #1    | 35775953.92  |
| 1998    | 5001001   | amalgscholar #1     | 35538345.92  |
| 1998    | 4001001   | amalgedu pack #1    | 35317861.64  |
| 1998    | 5004001   | edu packscholar #1  | 35302613.66  |
| 1998    | 3003001   | exportiexporti #1   | 35006929.11  |
| 1998    | 2004001   | edu packimporto #1  | 26473180.83  |
| 1998    | 4002001   | importoedu pack #1  | 26176292.12  |
| 1998    | 2002001   | importoimporto #1   | 26171441.74  |
+---------+-----------+---------------------+--------------+
10 rows selected (5.041 seconds)
0: jdbc:hive2://10.211.174.151:10000/access_v> SELECT
. . . . . . . . . . . . . . . . . . . . . . .>   dt.d_year,
. . . . . . . . . . . . . . . . . . . . . . .>   item.i_brand_id brand_id,
. . . . . . . . . . . . . . . . . . . . . . .>   item.i_brand brand,
. . . . . . . . . . . . . . . . . . . . . . .>   SUM(ss_ext_sales_price) sum_agg
. . . . . . . . . . . . . . . . . . . . . . .> FROM date_dim dt, store_sales, item
. . . . . . . . . . . . . . . . . . . . . . .> WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
. . . . . . . . . . . . . . . . . . . . . . .>   AND store_sales.ss_item_sk = item.i_item_sk
. . . . . . . . . . . . . . . . . . . . . . .>   AND item.i_manufact_id = 128
. . . . . . . . . . . . . . . . . . . . . . .>   AND dt.d_moy = 11
. . . . . . . . . . . . . . . . . . . . . . .> GROUP BY dt.d_year, item.i_brand, item.i_brand_id
. . . . . . . . . . . . . . . . . . . . . . .> ORDER BY dt.d_year, sum_agg DESC, brand_id, brand
. . . . . . . . . . . . . . . . . . . . . . .> LIMIT 10;
+---------+-----------+----------------------+--------------+
| d_year  | brand_id  |        brand         |   sum_agg    |
+---------+-----------+----------------------+--------------+
| 1998    | 2003001   | exportiimporto #1    | 15851205.06  |
| 1998    | 3003001   | exportiexporti #1    | 12790869.96  |
| 1998    | 5001001   | amalgscholar #1      | 12763633.47  |
| 1998    | 1004001   | edu packamalg #1     | 12603183.68  |
| 1998    | 4001001   | amalgedu pack #1     | 12268486.99  |
| 1998    | 5004001   | edu packscholar #1   | 11667142.19  |
| 1998    | 1002001   | importoamalg #1      | 11336379.88  |
| 1998    | 2004001   | edu packimporto #1   | 9165179.56   |
| 1998    | 2002001   | importoimporto #1    | 9148014.59   |
| 1998    | 4004001   | edu packedu pack #1  | 8314235.98   |
+---------+-----------+----------------------+--------------+
10 rows selected (8.508 seconds)
```

Try to fix BigInteger out of long range
22/04/23 00:08:59 ERROR Executor: Exception in task 294.3 in stage 3.0 of app application_1644958298137_48668 (TID 471)
java.lang.ArithmeticException: BigInteger out of long range
at java.math.BigInteger.longValueExact(BigInteger.java:4632)
at org.apache.spark.sql.types.Decimal.toUnscaledLong(Decimal.scala:220)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.setDecimal(UnsafeRow.java:281)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregate_sum_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeysOutput_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:50)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:730)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.rdd.RDD$$anon$2.hasNext(RDD.scala:332)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:176)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:129)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:486)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1391)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:489)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

* Fix tpcds q93 RuntimeException: Couldn't find sr_return_quantity#34 in [ss_item_sk#3,ss_customer_sk#4,ss_ticket_number#10L,sr_item_sk#26,sr_reason_sk#32,sr_ticket_number#33]
              	at scala.sys.package$.error(package.scala:30)
              	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
              	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
              	... 217 more

* Fix bbensid q1 TreeNodeException:
```
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: item_id#1077
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:329)
```
```sql
SELECT

	u.user_cntry_id AS byr_cntry_id,

	l.item_site_id AS list_site_id,

	l.auct_type_code,

	cat.sap_category_id AS sap_id,

	bb.src_cre_dt,

	COUNT(bb.item_id) AS blocked_bids,

	COUNT(DISTINCT u.prmry_user_id) AS blocked_buyers

FROM

	access_views.dw_tns_blkd_bid bb

	INNER JOIN access_views.dw_users u ON (bb.byr_id = u.user_id)

	INNER JOIN access_views.dw_lstg_item l ON (bb.item_id = l.item_id)

	INNER JOIN access_views.dw_category_groupings cat ON (l.leaf_categ_id = cat.leaf_categ_id AND l.item_site_id = cat.site_id)

WHERE

	bb.src_cre_dt BETWEEN '2021-07-12' AND '2021-07-15'

	AND l.auct_end_dt >= '2021-07-12'

GROUP BY

	1,2,3,4,5
```

* Fix bbensid q194 NPE:
```
    spark.sql("create table t1(a bigint, b string) using parquet")
    spark.sql("create table t2(x bigint, y string) using parquet")

    spark.sql("insert into t1 values(1, 1), (2, 2)")
    spark.sql("insert into t2 values(1, 1)")

    sql("SELECT distinct COALESCE(t2.y, '100') AS rev_rollup2 FROM t1 left JOIN t2 ON t1.a = t2.x").collect().foreach(println)
    sql("SELECT distinct rev_rollup2 FROM t1 left JOIN (select x,COALESCE(t2.y, '100') AS rev_rollup2 from t2) t2 ON t1.a = t2.x").collect().foreach(println)
```

```
0: jdbc:hive2://10.211.174.26:10000/access_vi> create table t1(a bigint, b string) using parquet;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.816 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> create table t2(x bigint, y string) using parquet;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.95 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> insert into t1 values(1, 1), (2, 2);
+---------+
| Result  |
+---------+
+---------+
No rows selected (14.762 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> insert into t2 values(1, 1);
+---------+
| Result  |
+---------+
+---------+
No rows selected (20.527 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> SELECT distinct COALESCE(t2.y, '100') AS rev_rollup2 FROM t1 left JOIN t2 ON t1.a = t2.x;
+--------------+
| rev_rollup2  |
+--------------+
| 100          |
| 1            |
+--------------+
2 rows selected (8.685 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> SELECT distinct rev_rollup2 FROM t1 left JOIN (select x,COALESCE(t2.y, '100') AS rev_rollup2 from t2) t2 ON t1.a = t2.x;
+--------------+
| rev_rollup2  |
+--------------+
| NULL         |
| 1            |
+--------------+
2 rows selected (2.02 seconds)
```

* Enhance: OUTER joins are supported for group by without aggregate functions

* ColumnPruning and CollapseProject support PartialAggregate

* Fix bbendis q65 reuslt incorrect:
```sql
spark-sql> SELECT
         >     w.src_cre_dt,
         >     w.site_id,
         >     l.auct_type_code,
         >     w.vstr_yn_id,
         >     COUNT(w.item_id) AS watches,
         >     count(*)
         > FROM
         >     access_views.dw_myebay_wtch_trk w
         >     INNER JOIN access_views.dw_lstg_item l ON (w.item_id = l.item_id)
         > WHERE
         >     w.src_cre_dt BETWEEN '2021-07-08' AND '2021-07-15'
         >     AND l.auct_end_dt >= '2021-07-08'
         >     AND w.cnvrted_yn_id = 0
         > GROUP BY
         >     1,2,3,4
         > ORDER by 1,2,3,4 limit 5;
```
2021-07-08	0	1	0	4929026	4930784
2021-07-08	0	7	0	711405	711413
2021-07-08	0	8	0	154	154
2021-07-08	0	9	0	6097525	6097948
2021-07-08	0	13	0	123415	123482

rewrite:
```sql
SELECT
	w.src_cre_dt,
	w.site_id,
	l.auct_type_code,
	w.vstr_yn_id,
	COUNT(w.item_id) AS watches,
	sum(w.cnt * l.cnt) AS watches2
FROM
	(select src_cre_dt, site_id, vstr_yn_id, item_id, COUNT(item_id) as cnt from access_views.dw_myebay_wtch_trk where src_cre_dt BETWEEN '2021-07-08' AND '2021-07-15' and cnvrted_yn_id = 0 group by src_cre_dt, site_id, vstr_yn_id, item_id) w
	INNER JOIN (select auct_type_code, item_id, COUNT(*) as cnt from access_views.dw_lstg_item where auct_end_dt >= '2021-07-08' group by auct_type_code, item_id) l ON (w.item_id = l.item_id)

GROUP BY
	1,2,3,4
```

* Fix should not push count aggregate expression if groupingExpressions is empty:
```
spark-sql> create table t1(id int) using parquet;
spark-sql> select count(*) from t1;
0
spark-sql> select sum(0) from t1;
NULL
```

* Fix bbendis q323 RuntimeException:
```sql
0: jdbc:hive2://10.211.174.151:10000/access_v> SELECT
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                    COALESCE(u.prmry_user_id, a.user_id) AS parent_uid,
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                    CAST(modified_date AS DATE) AS modified_date
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 FROM
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                    access_views.dw_user_past_aliases a
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                    INNER JOIN access_views.dw_users u ON (a.user_id = u.user_id)
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 WHERE
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 a.alias_flag = '2'
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 GROUP BY
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 1,2
. . . . . . . . . . . . . . . . . . . . . . .> limit 1;
Error: Error running query: java.lang.RuntimeException: Couldn't find _groupingexpression#174653 in [_groupingexpression#174654] (state=,code=0)
```

* Refactor the code

* Support range join case:
```sql

use access_views;

CREATE TEMPORARY TABLE DATE_RANGE AS
(
SELECT
  CAL_DT,
  RETAIL_WEEK,
  RETAIL_YEAR
, TRIM(CAST(RETAIL_YEAR AS INT)) || 'W' || TRIM(SUBSTR(CAST(CAST(RETAIL_WEEK+1000 AS INT) AS VARCHAR(20)), 3)) AS WEEK_ID
, RTL_WEEK_BEG_DT AS WEEK_BEG_DT
, RETAIL_WK_END_DATE AS WEEK_END_DT
, MONTH_BEG_DT, MONTH_END_DT, MONTH_ID
, QTR_BEG_DT, QTR_END_DT, QTR_ID
, YEAR_ID
FROM DW_CAL_DT
WHERE 1=1
AND CAL_DT BETWEEN DATE'2020-01-01' AND CURRENT_DATE
);

create temp table t11 using parquet as
SELECT D.CAL_DT,
	   COUNT(DISTINCT LSTG.ITEM_ID) ITEM_NUM
FROM DATE_RANGE D
INNER JOIN
	(SELECT HOT.AUCT_START_DT,
		   HOT.AUCT_END_DT,
		   HOT.ITEM_ID
	FROM  ACCESS_VIEWS.DW_LSTG_ITEM HOT
	INNER JOIN ACCESS_VIEWS.DW_CATEGORY_GROUPINGS CATE ON CATE.SITE_ID = HOT.ITEM_SITE_ID AND CATE.LEAF_CATEG_ID = HOT.LEAF_CATEG_ID
	INNER JOIN ACCESS_VIEWS.SSA_CURNCY_PLAN_RATE_DIM FX ON FX.CURNCY_ID = HOT.LSTG_CURNCY_ID
	INNER JOIN ACCESS_VIEWS.DW_ITEMS_SHIPPING SHIP ON HOT.ITEM_ID=SHIP.ITEM_ID AND HOT.ITEM_VRSN_ID=SHIP.ITEM_VRSN_ID
	WHERE 1 = 1
	AND HOT.AUCT_TYPE_CODE NOT IN (10,15)
	AND HOT.ITEM_SITE_ID <> 223
	AND CATE.SAP_CATEGORY_ID NOT IN (5,7,23,41,-999)) LSTG
ON D.CAL_DT BETWEEN LSTG.AUCT_START_DT AND LSTG.AUCT_END_DT
GROUP BY 1;

```

* Fix bbendis q311 introduce 2 PartialAggregates:
```sql
== Optimized Logical Plan ==
Aggregate [session_start_dt#84199, site_id#84163, _groupingexpression#84731], [session_start_dt#84199, site_id#84163, count(if ((gid#84733 = 4)) CASE WHEN (av.`type_1` = 'sign_in_visit') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84737 else null) AS sign_in_visit#84095L, count(if ((gid#84733 = 5)) CASE WHEN (av.`type_1` IN ('sign_in_suc', 'reg_suc', 'gxo_suc')) THEN spark_catalog.ubi_t.ubi_event.`guid` END#84735 else null) AS access_succ#84096L, count(if ((gid#84733 = 6)) CASE WHEN (av.`type_1` = 'sign_in_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84736 else null) AS sign_in_succ#84097L, count(if ((gid#84733 = 1)) CASE WHEN (av.`type_1` = 'reg_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84739 else null) AS reg_succ#84098L, count(if ((gid#84733 = 2)) CASE WHEN (av.`type_1` = 'gxo_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84738 else null) AS gxo_succ#84099L, count(if ((gid#84733 = 3)) s165.`guid`#84734 else null) AS tot_visitors#84100L, _groupingexpression#84731 AS experience#84101], Statistics(sizeInBytes=2.96E+38 B)
+- Aggregate [session_start_dt#84199, site_id#84163, _groupingexpression#84731, s165.`guid`#84734, CASE WHEN (av.`type_1` IN ('sign_in_suc', 'reg_suc', 'gxo_suc')) THEN spark_catalog.ubi_t.ubi_event.`guid` END#84735, CASE WHEN (av.`type_1` = 'sign_in_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84736, CASE WHEN (av.`type_1` = 'sign_in_visit') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84737, CASE WHEN (av.`type_1` = 'gxo_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84738, CASE WHEN (av.`type_1` = 'reg_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84739, gid#84733], [session_start_dt#84199, site_id#84163, _groupingexpression#84731, s165.`guid`#84734, CASE WHEN (av.`type_1` IN ('sign_in_suc', 'reg_suc', 'gxo_suc')) THEN spark_catalog.ubi_t.ubi_event.`guid` END#84735, CASE WHEN (av.`type_1` = 'sign_in_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84736, CASE WHEN (av.`type_1` = 'sign_in_visit') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84737, CASE WHEN (av.`type_1` = 'gxo_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84738, CASE WHEN (av.`type_1` = 'reg_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84739, gid#84733], Statistics(sizeInBytes=5.70E+38 B)
   +- Expand [Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, null, null, null, null, CASE WHEN (type_1#84089 = reg_suc) THEN guid#80355 END, 1), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, null, null, null, CASE WHEN (type_1#84089 = gxo_suc) THEN guid#80355 END, null, 2), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, guid#84161, null, null, null, null, null, 3), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, null, null, CASE WHEN (type_1#84089 = sign_in_visit) THEN guid#80355 END, null, null, 4), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, CASE WHEN type_1#84089 IN (sign_in_suc,reg_suc,gxo_suc) THEN guid#80355 END, null, null, null, null, 5), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, null, CASE WHEN (type_1#84089 = sign_in_suc) THEN guid#80355 END, null, null, null, 6)], [session_start_dt#84199, site_id#84163, _groupingexpression#84731, s165.`guid`#84734, CASE WHEN (av.`type_1` IN ('sign_in_suc', 'reg_suc', 'gxo_suc')) THEN spark_catalog.ubi_t.ubi_event.`guid` END#84735, CASE WHEN (av.`type_1` = 'sign_in_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84736, CASE WHEN (av.`type_1` = 'sign_in_visit') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84737, CASE WHEN (av.`type_1` = 'gxo_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84738, CASE WHEN (av.`type_1` = 'reg_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84739, gid#84733], Statistics(sizeInBytes=5.70E+38 B)
      +- Project [guid#84161, site_id#84163, session_start_dt#84199, guid#80355, type_1#84089, CASE WHEN (cobrand#84164 = 0) THEN dWeb WHEN (cobrand#84164 = 7) THEN FSoM WHEN ((cobrand#84164 = 6) AND primary_app_id#84182 IN (1462,2878)) THEN iOS WHEN ((cobrand#84164 = 6) AND (primary_app_id#84182 = 2571)) THEN Android WHEN ((cobrand#84164 = 6) AND (primary_app_id#84182 = 3564)) THEN mWeb ELSE Other END AS _groupingexpression#84731], Statistics(sizeInBytes=5.65E+37 B)
         +- Join LeftOuter, ((((guid#80355 = guid#84161) AND (session_skey#84201L = session_skey#84162L)) AND (session_start_dt#84203 = session_start_dt#84199)) AND (site_id#84714 = cast(site_id#84163 as decimal(10,0)))), Statistics(sizeInBytes=6.43E+37 B)
            :- Project [guid#84161, session_skey#84162L, site_id#84163, cobrand#84164, session_start_dt#84199, primary_app_id#84182], Statistics(sizeInBytes=11.9 GiB)
            :  +- Filter (((((isnotnull(exclude#84189) AND (exclude#84189 = 0)) AND isnotnull(session_start_dt#84199)) AND NOT cast(cobrand#84164 as int) IN (2,3,4,5,9)) AND (session_start_dt#84199 >= 2021-07-14)) AND (session_start_dt#84199 <= 2021-07-15)), Statistics(sizeInBytes=77.0 GiB)
            :     +- Relation p_soj_cl_t.clav_session[guid#84161,session_skey#84162L,site_id#84163,cobrand#84164,cguid#84165,buyer_site_id#84166,lndg_page_id#84167,start_timestamp#84168,end_timestamp#84169,exit_page_id#84170,valid_page_count#84171,gr_cnt#84172,gr_1_cnt#84173,vi_cnt#84174,homepage_cnt#84175,myebay_cnt#84176,signin_cnt#84177,min_sc_seqnum#84178,max_sc_seqnum#84179,signedin_user_id#84180,mapped_user_id#84181,primary_app_id#84182,agent_id#84183L,session_cntry_id#84184,... 15 more fields] parquet, Statistics(sizeInBytes=77.0 GiB)
            +- Union, Statistics(sizeInBytes=5.05E+27 B)
               :- Aggregate [session_start_dt#84203, guid#80355, session_skey#84201L, site_id#84205, _groupingexpression#84732], [session_start_dt#84203, guid#80355, session_skey#84201L, cast(site_id#84205 as decimal(10,0)) AS site_id#84714, _groupingexpression#84732 AS type_1#84089], Statistics(sizeInBytes=8.0 TiB)
               :  +- Project [GUID#80355, SESSIONSKEY#80356L AS SESSION_SKEY#84201L, cast(concat(substr(dt#80385, 0, 4), -, substr(dt#80385, 5, 2), -, substr(dt#80385, 7, 2)) as date) AS SESSION_START_DT#84203, SITEID#80361 AS SITE_ID#84205, CASE WHEN ((PAGEID#80363 IN (4853,2487283,2487285) AND (RDT#80376 = 0)) OR PAGEID#80363 IN (2050445,2050533)) THEN sign_in_visit WHEN PAGEID#80363 IN (2052190,2053938) THEN reg_suc WHEN PAGEID#80363 IN (4852,2051246,2266111) THEN sign_in_suc END AS _groupingexpression#84732], Statistics(sizeInBytes=7.5 TiB)
               :     +- Filter (((((isnotnull(SESSIONSKEY#80356L) AND isnotnull(cast(SITEID#80361 as decimal(10,0)))) AND isnotnull(guid#80355)) AND (cast(concat(substr(dt#80385, 0, 4), -, substr(dt#80385, 5, 2), -, substr(dt#80385, 7, 2)) as date) >= 2021-07-14)) AND (cast(concat(substr(dt#80385, 0, 4), -, substr(dt#80385, 5, 2), -, substr(dt#80385, 7, 2)) as date) <= 2021-07-15)) AND ((((((PAGEID#80363 = 4853) AND (PAGENAME#80364 = signin2)) AND ((lower(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnTabClick)) = signin) OR isnull(lower(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnTabClick))))) OR (PAGEID#80363 IN (2052190,2053938) AND (lower(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,type)) = reg_confirm))) OR (((PAGEID#80363 = 4852) AND isnotnull(cast(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,uid) as decimal(18,0)))) AND isnull(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnFastFYPReset)))) OR (((((PAGEID#80363 = 2266111) AND (cast(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnStatus) as int) = 0)) AND (HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnChannelType) IN (0,2) AND NOT (HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,authMethod) = guest_id_token))) OR PAGEID#80363 IN (2050445,2050533,2051246)) OR (PAGEID#80363 IN (2487283,2487285) AND NOT (HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,SigninRedirect) = V4))))), Statistics(sizeInBytes=46.7 TiB)
               :        +- Relation ubi_t.ubi_event[guid#80355,sessionskey#80356L,seqnum#80357,sessionstartdt#80358L,sojdatadt#80359L,clickid#80360,siteid#80361,version#80362,pageid#80363,pagename#80364,refererhash#80365L,eventtimestamp#80366L,urlquerystring#80367,clientdata#80368,cookies#80369,applicationpayload#80370,webserver#80371,referrer#80372,userid#80373,itemid#80374L,flags#80375,rdt#80376,regu#80377,sqr#80378,... 8 more fields] parquet, Statistics(sizeInBytes=46.7 TiB)
               +- Aggregate [sess_session_start_dt#84669, sess_guid#84665, sess_session_skey#84666L, sess_site_id#84670], [sess_session_start_dt#84669 AS session_start_dt#84090, sess_guid#84665 AS guid#84091, sess_session_skey#84666L AS session_skey#84092L, cast(sess_site_id#84670 as decimal(10,0)) AS site_id#84715, gxo_suc AS type_1#84094], Statistics(sizeInBytes=5.05E+27 B)
                  +- Project [sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], Statistics(sizeInBytes=3.56E+27 B)
                     +- Join Inner, ((((item_id#84428 = item_id#84639) AND (transaction_id#84436 = transaction_id#84640)) AND (auct_end_dt#84429 = auct_end_dt#84641)) AND (created_dt#84440 = created_dt#84650)), Statistics(sizeInBytes=7.12E+27 B)
                        :- PartialAggregate [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440], [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440], Statistics(sizeInBytes=206.1 TiB)
                        :  +- Project [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440], Statistics(sizeInBytes=206.1 TiB)
                        :     +- Join LeftOuter, (cast(lstg_curncy_id#84474 as decimal(9,0)) = curncy_id#72721), Statistics(sizeInBytes=309.2 TiB)
                        :        :- PartialAggregate [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440, lstg_curncy_id#84474], [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440, lstg_curncy_id#84474], Statistics(sizeInBytes=330.8 GiB)
                        :        :  +- Project [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440, lstg_curncy_id#84474], Statistics(sizeInBytes=330.8 GiB)
                        :        :     +- Filter ((((((((isnotnull(created_dt#84440) AND isnotnull(auct_end_dt#84429)) AND isnotnull(CHECKOUT_FLAGS4#84486)) AND (created_dt#84440 >= 2021-07-14)) AND (created_dt#84440 <= 2021-07-16)) AND (auct_end_dt#84429 >= 2021-07-14)) AND isnotnull(item_id#84428)) AND isnotnull(transaction_id#84436)) AND ((cast(CHECKOUT_FLAGS4#84486 as bigint) & 2) > 0)), Statistics(sizeInBytes=10.2 TiB)
                        :        :        +- Relation gdw_tables.dw_checkout_trans[item_id#84428,auct_end_dt#84429,site_id#84430,leaf_categ_id#84431,seller_id#84432,slr_cntry_id#84433,buyer_id#84434,byr_cntry_id#84435,transaction_id#84436,shipping_address_id#84437,sale_type#84438,created_time#84439,created_dt#84440,last_modified#84441,last_modified_dt#84442,checkout_flags#84443,checkout_status#84444,checkout_status_details#84445,payment_method#84446,shipping_fee#84447,shipping_xfee#84448,tax#84449,tax_state#84450,instruction_flag#84451,... 110 more fields] parquet, Statistics(sizeInBytes=10.2 TiB)
                        :        +- PartialAggregate [curncy_id#72721], [curncy_id#72721], Statistics(sizeInBytes=957.0 B)
                        :           +- Project [curncy_id#72721], Statistics(sizeInBytes=957.0 B)
                        :              +- Filter isnotnull(curncy_id#72721), Statistics(sizeInBytes=4.4 KiB)
                        :                 +- Relation gdw_tables.ssa_curncy_plan_rate_dim[CURNCY_ID#72721,CURNCY_PLAN_RATE#72722,CRE_DATE#72723,CRE_USER#72724,UPD_DATE#72725,UPD_USER#72726] parquet, Statistics(sizeInBytes=4.4 KiB)
                        +- PartialAggregate [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], Statistics(sizeInBytes=28.6 TiB)
                           +- PartialAggregate [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], Statistics(sizeInBytes=28.6 TiB)
                              +- Project [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], Statistics(sizeInBytes=28.6 TiB)
                                 +- Filter ((((((((((((isnotnull(sess_session_start_dt#84669) AND isnotnull(created_dt#84650)) AND isnotnull(auct_end_dt#84641)) AND (created_dt#84650 >= 2021-07-14)) AND (created_dt#84650 <= 2021-07-16)) AND (sess_session_start_dt#84669 >= 2021-07-14)) AND (sess_session_start_dt#84669 <= 2021-07-15)) AND (auct_end_dt#84641 >= 2021-07-14)) AND isnotnull(item_id#84639)) AND isnotnull(transaction_id#84640)) AND isnotnull(sess_session_skey#84666L)) AND isnotnull(cast(sess_site_id#84670 as decimal(10,0)))) AND isnotnull(sess_guid#84665)), Statistics(sizeInBytes=318.1 TiB)
                                    +- Relation p_soj_cl_t.checkout_metric_item[item_id#84639,transaction_id#84640,auct_end_dt#84641,item_site_id#84642,trans_site_id#84643,auct_type_code#84644,leaf_categ_id#84645,seller_id#84646,buyer_id#84647,seller_country_id#84648,buyer_country_id#84649,created_dt#84650,created_time#84651,item_price#84652,quantity#84653,lstg_curncy_exchng_rate#84654,lstg_curncy_id#84655,ck_wacko_yn#84656,variation_id#84657,version_id#84658,app_id#84659,format_flags64#84660L,auct_start_dt#84661,leaf_categ_id2#84662,... 51 more fields] parquet, Statistics(sizeInBytes=318.1 TiB)
```

* Fix tpcds q24a DecimalAggregates issue:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.DecimalAggregates ===
 Subquery false                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           Subquery false
 +- Aggregate [CheckOverflow((0.050000 * promote_precision(avg(netpaid#166))), DecimalType(24,8), true) AS (CAST(0.05 AS DECIMAL(21,6)) * CAST(avg(netpaid) AS DECIMAL(21,6)))#176]                                                                                                                                                                                                                                                                                                                                       +- Aggregate [CheckOverflow((0.050000 * promote_precision(avg(netpaid#166))), DecimalType(24,8), true) AS (CAST(0.05 AS DECIMAL(21,6)) * CAST(avg(netpaid) AS DECIMAL(21,6)))#176]
!   +- Aggregate [c_last_name#121, c_first_name#120, s_store_name#66, ca_state#138, s_state#85, i_color#107, i_current_price#95, i_manager_id#110, i_units#108, i_size#105], [sum(ss_net_paid#37, None) AS netpaid#166]                                                                                                                                                                                                                                                                                                      +- Aggregate [c_last_name#121, c_first_name#120, s_store_name#66, ca_state#138, s_state#85, i_color#107, i_current_price#95, i_manager_id#110, i_units#108, i_size#105], [MakeDecimal(sum(UnscaledValue(ss_net_paid#37), None),17,2) AS netpaid#166]
       +- Project [ss_net_paid#37, s_store_name#66, s_state#85, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110, c_first_name#120, c_last_name#121, ca_state#138]                                                                                                                                                                                                                                                                                                                                    +- Project [ss_net_paid#37, s_store_name#66, s_state#85, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110, c_first_name#120, c_last_name#121, ca_state#138]
          +- Join Inner, ((s_zip#86 = ca_zip#139) AND (c_birth_country#126 = upper(ca_country#140)))                                                                                                                                                                                                                                                                                                                                                                                                                               +- Join Inner, ((s_zip#86 = ca_zip#139) AND (c_birth_country#126 = upper(ca_country#140)))
             :- Project [s_store_name#66, s_state#85, s_zip#86, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]                                                                                                                                                                                                                                                                                                                   :- Project [s_store_name#66, s_state#85, s_zip#86, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]
             :  +- Join Inner, ((ss_item_sk#19 = sr_item_sk#42) AND (ss_ticket_number#26L = sr_ticket_number#49L))                                                                                                                                                                                                                                                                                                                                                                                                                    :  +- Join Inner, ((ss_item_sk#19 = sr_item_sk#42) AND (ss_ticket_number#26L = sr_ticket_number#49L))
             :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_ticket_number#26L, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]                                                                                                                                                                                                                                                                        :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_ticket_number#26L, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]
             :     :  +- Join Inner, (ss_item_sk#19 = i_item_sk#90)                                                                                                                                                                                                                                                                                                                                                                                                                                                                   :     :  +- Join Inner, (ss_item_sk#19 = i_item_sk#90)
             :     :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_ticket_number#26L, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126]                                                                                                                                                                                                                                                                                                                                              :     :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_ticket_number#26L, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126]
             :     :     :  +- Join Inner, (ss_customer_sk#20 = c_customer_sk#112)                                                                                                                                                                                                                                                                                                                                                                                                                                                    :     :     :  +- Join Inner, (ss_customer_sk#20 = c_customer_sk#112)
             :     :     :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_customer_sk#20, ss_ticket_number#26L, ss_net_paid#37]                                                                                                                                                                                                                                                                                                                                                                             :     :     :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_customer_sk#20, ss_ticket_number#26L, ss_net_paid#37]
             :     :     :     :  +- Join Inner, (ss_store_sk#24 = s_store_sk#61)                                                                                                                                                                                                                                                                                                                                                                                                                                                     :     :     :     :  +- Join Inner, (ss_store_sk#24 = s_store_sk#61)
             :     :     :     :     :- Project [s_store_sk#61, s_store_name#66, s_state#85, s_zip#86]                                                                                                                                                                                                                                                                                                                                                                                                                                :     :     :     :     :- Project [s_store_sk#61, s_store_name#66, s_state#85, s_zip#86]
             :     :     :     :     :  +- Filter ((((s_market_id#71 = 8) AND isnotnull(s_market_id#71)) AND isnotnull(s_zip#86)) AND isnotnull(s_store_sk#61))                                                                                                                                                                                                                                                                                                                                                                       :     :     :     :     :  +- Filter ((((s_market_id#71 = 8) AND isnotnull(s_market_id#71)) AND isnotnull(s_zip#86)) AND isnotnull(s_store_sk#61))
             :     :     :     :     :     +- Relation hermes_tpcds5t.store[s_store_sk#61,s_store_id#62,s_rec_start_date#63,s_rec_end_date#64,s_closed_date_sk#65,s_store_name#66,s_number_employees#67,s_floor_space#68,s_hours#69,s_manager#70,s_market_id#71,s_geography_class#72,s_market_desc#73,s_market_manager#74,s_division_id#75,s_division_name#76,s_company_id#77,s_company_name#78,s_street_number#79,s_street_name#80,s_street_type#81,s_suite_number#82,s_city#83,s_county#84,... 5 more fields] parquet               :     :     :     :     :     +- Relation hermes_tpcds5t.store[s_store_sk#61,s_store_id#62,s_rec_start_date#63,s_rec_end_date#64,s_closed_date_sk#65,s_store_name#66,s_number_employees#67,s_floor_space#68,s_hours#69,s_manager#70,s_market_id#71,s_geography_class#72,s_market_desc#73,s_market_manager#74,s_division_id#75,s_division_name#76,s_company_id#77,s_company_name#78,s_street_number#79,s_street_name#80,s_street_type#81,s_suite_number#82,s_city#83,s_county#84,... 5 more fields] parquet
             :     :     :     :     +- Project [ss_item_sk#19, ss_customer_sk#20, ss_store_sk#24, ss_ticket_number#26L, ss_net_paid#37]                                                                                                                                                                                                                                                                                                                                                                                              :     :     :     :     +- Project [ss_item_sk#19, ss_customer_sk#20, ss_store_sk#24, ss_ticket_number#26L, ss_net_paid#37]
             :     :     :     :        +- Filter (((isnotnull(ss_customer_sk#20) AND isnotnull(ss_store_sk#24)) AND isnotnull(ss_ticket_number#26L)) AND isnotnull(ss_item_sk#19))                                                                                                                                                                                                                                                                                                                                                   :     :     :     :        +- Filter (((isnotnull(ss_customer_sk#20) AND isnotnull(ss_store_sk#24)) AND isnotnull(ss_ticket_number#26L)) AND isnotnull(ss_item_sk#19))
             :     :     :     :           +- Relation hermes_tpcds5t.store_sales[ss_sold_time_sk#18,ss_item_sk#19,ss_customer_sk#20,ss_cdemo_sk#21,ss_hdemo_sk#22,ss_addr_sk#23,ss_store_sk#24,ss_promo_sk#25,ss_ticket_number#26L,ss_quantity#27,ss_wholesale_cost#28,ss_list_price#29,ss_sales_price#30,ss_ext_discount_amt#31,ss_ext_sales_price#32,ss_ext_wholesale_cost#33,ss_ext_list_price#34,ss_ext_tax#35,ss_coupon_amt#36,ss_net_paid#37,ss_net_paid_inc_tax#38,ss_net_profit#39,ss_sold_date_sk#40] parquet               :     :     :     :           +- Relation hermes_tpcds5t.store_sales[ss_sold_time_sk#18,ss_item_sk#19,ss_customer_sk#20,ss_cdemo_sk#21,ss_hdemo_sk#22,ss_addr_sk#23,ss_store_sk#24,ss_promo_sk#25,ss_ticket_number#26L,ss_quantity#27,ss_wholesale_cost#28,ss_list_price#29,ss_sales_price#30,ss_ext_discount_amt#31,ss_ext_sales_price#32,ss_ext_wholesale_cost#33,ss_ext_list_price#34,ss_ext_tax#35,ss_coupon_amt#36,ss_net_paid#37,ss_net_paid_inc_tax#38,ss_net_profit#39,ss_sold_date_sk#40] parquet
             :     :     :     +- Project [c_customer_sk#112, c_first_name#120, c_last_name#121, c_birth_country#126]                                                                                                                                                                                                                                                                                                                                                                                                                 :     :     :     +- Project [c_customer_sk#112, c_first_name#120, c_last_name#121, c_birth_country#126]
             :     :     :        +- Filter (isnotnull(c_birth_country#126) AND isnotnull(c_customer_sk#112))                                                                                                                                                                                                                                                                                                                                                                                                                         :     :     :        +- Filter (isnotnull(c_birth_country#126) AND isnotnull(c_customer_sk#112))
             :     :     :           +- Relation hermes_tpcds5t.customer[c_customer_sk#112,c_customer_id#113,c_current_cdemo_sk#114,c_current_hdemo_sk#115,c_current_addr_sk#116,c_first_shipto_date_sk#117,c_first_sales_date_sk#118,c_salutation#119,c_first_name#120,c_last_name#121,c_preferred_cust_flag#122,c_birth_day#123,c_birth_month#124,c_birth_year#125,c_birth_country#126,c_login#127,c_email_address#128,c_last_review_date#129] parquet                                                                              :     :     :           +- Relation hermes_tpcds5t.customer[c_customer_sk#112,c_customer_id#113,c_current_cdemo_sk#114,c_current_hdemo_sk#115,c_current_addr_sk#116,c_first_shipto_date_sk#117,c_first_sales_date_sk#118,c_salutation#119,c_first_name#120,c_last_name#121,c_preferred_cust_flag#122,c_birth_day#123,c_birth_month#124,c_birth_year#125,c_birth_country#126,c_login#127,c_email_address#128,c_last_review_date#129] parquet
             :     :     +- Project [i_item_sk#90, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]                                                                                                                                                                                                                                                                                                                                                                                                        :     :     +- Project [i_item_sk#90, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]
             :     :        +- Filter isnotnull(i_item_sk#90)                                                                                                                                                                                                                                                                                                                                                                                                                                                                         :     :        +- Filter isnotnull(i_item_sk#90)
             :     :           +- Relation hermes_tpcds5t.item[i_item_sk#90,i_item_id#91,i_rec_start_date#92,i_rec_end_date#93,i_item_desc#94,i_current_price#95,i_wholesale_cost#96,i_brand_id#97,i_brand#98,i_class_id#99,i_class#100,i_category_id#101,i_category#102,i_manufact_id#103,i_manufact#104,i_size#105,i_formulation#106,i_color#107,i_units#108,i_container#109,i_manager_id#110,i_product_name#111] parquet                                                                                                           :     :           +- Relation hermes_tpcds5t.item[i_item_sk#90,i_item_id#91,i_rec_start_date#92,i_rec_end_date#93,i_item_desc#94,i_current_price#95,i_wholesale_cost#96,i_brand_id#97,i_brand#98,i_class_id#99,i_class#100,i_category_id#101,i_category#102,i_manufact_id#103,i_manufact#104,i_size#105,i_formulation#106,i_color#107,i_units#108,i_container#109,i_manager_id#110,i_product_name#111] parquet
             :     +- Project [sr_item_sk#42, sr_ticket_number#49L]                                                                                                                                                                                                                                                                                                                                                                                                                                                                   :     +- Project [sr_item_sk#42, sr_ticket_number#49L]
             :        +- Filter (isnotnull(sr_ticket_number#49L) AND isnotnull(sr_item_sk#42))                                                                                                                                                                                                                                                                                                                                                                                                                                        :        +- Filter (isnotnull(sr_ticket_number#49L) AND isnotnull(sr_item_sk#42))
             :           +- Relation hermes_tpcds5t.store_returns[sr_return_time_sk#41,sr_item_sk#42,sr_customer_sk#43,sr_cdemo_sk#44,sr_hdemo_sk#45,sr_addr_sk#46,sr_store_sk#47,sr_reason_sk#48,sr_ticket_number#49L,sr_return_quantity#50,sr_return_amt#51,sr_return_tax#52,sr_return_amt_inc_tax#53,sr_fee#54,sr_return_ship_cost#55,sr_refunded_cash#56,sr_reversed_charge#57,sr_store_credit#58,sr_net_loss#59,sr_returned_date_sk#60] parquet                                                                                  :           +- Relation hermes_tpcds5t.store_returns[sr_return_time_sk#41,sr_item_sk#42,sr_customer_sk#43,sr_cdemo_sk#44,sr_hdemo_sk#45,sr_addr_sk#46,sr_store_sk#47,sr_reason_sk#48,sr_ticket_number#49L,sr_return_quantity#50,sr_return_amt#51,sr_return_tax#52,sr_return_amt_inc_tax#53,sr_fee#54,sr_return_ship_cost#55,sr_refunded_cash#56,sr_reversed_charge#57,sr_store_credit#58,sr_net_loss#59,sr_returned_date_sk#60] parquet
             +- Project [ca_state#138, ca_zip#139, ca_country#140]                                                                                                                                                                                                                                                                                                                                                                                                                                                                    +- Project [ca_state#138, ca_zip#139, ca_country#140]
                +- Filter (isnotnull(ca_country#140) AND isnotnull(ca_zip#139))                                                                                                                                                                                                                                                                                                                                                                                                                                                          +- Filter (isnotnull(ca_country#140) AND isnotnull(ca_zip#139))
                   +- Relation hermes_tpcds5t.customer_address[ca_address_sk#130,ca_address_id#131,ca_street_number#132,ca_street_name#133,ca_street_type#134,ca_suite_number#135,ca_city#136,ca_county#137,ca_state#138,ca_zip#139,ca_country#140,ca_gmt_offset#141,ca_location_type#142] parquet                                                                                                                                                                                                                                          +- Relation hermes_tpcds5t.customer_address[ca_address_sk#130,ca_address_id#131,ca_street_number#132,ca_street_name#133,ca_street_type#134,ca_suite_number#135,ca_city#136,ca_county#137,ca_state#138,ca_zip#139,ca_country#140,ca_gmt_offset#141,ca_location_type#142] parquet

```

* 1. Fix tpcds q82 can't add runtime filter
2. Fix Statistics issue

* Fix a bug

* Support avg

* 1. Support push down if AggregateExpression contains complex expressions
2. Deduplicate and reorder aggregate expressions to find more ReuseExchanges

* Fix TPC-DS v2.7 q57 and q67a can't re-use exchange issue:
```scala
class TPCDSV2_7_PlanStabilityWithStatsSuite extends PlanStabilitySuite with TPCDSBase {
  override def injectStats: Boolean = true

  override val goldenFilePath: String =
    new File(baseResourcePath, s"approved-plans-v2_7").getAbsolutePath

  Seq(
    // "q5a", "q6", "q10a", "q11", "q12", "q14", "q14a",
    // "q18a",
    "q51a",
    "q57",
    "q67a").foreach { q =>
    test(s"check simplified sf100 (tpcds-v2.7.0/$q)") {
      println(s"=================${q}")
      testQuery("tpcds-v2.7.0", q, ".sf100")
    }
  }

//  test("check simplified sf100 (tpcds-v2.7.0/)") {
//    testQuery("tpcds-v2.7.0", "q57", ".sf100")
//  }
}
```

* Fix bbensid2 q12 java.lang.RuntimeException: Couldn't find date_confirm#25512

```
java.sql.SQLException: Error running query: java.lang.RuntimeException: Couldn't find date_confirm#25512 in sum#31527L,sum#31528L,count#31529L,sum#31530L,user_site_id#25498,half_origin_user#31347,_groupingexpression#31516,_groupingexpression#31517,_groupingexpression#31518,_groupingexpression#31519,pushed_count(user_id#25495)#31520L,pushed_count(date_confirm#25512)#31521L,pushed_sum(CASE WHEN CASE WHEN (flagsex6#25555 = -999) THEN false ELSE (((cast(flagsex6#25555 as bigint) & 65536) >= 1) <=> true) END THEN 1 ELSE 0 END, None)#31522L,site_name#30052,cnt#31525L
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:297)
at org.apache.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:392)
at com.ebay.carmel.spark.BenchmarkAndVerifyResult$.$anonfun$main$1(BenchmarkAndVerifyResult.scala:162)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at com.ebay.carmel.spark.BenchmarkAndVerifyResult$.main(BenchmarkAndVerifyResult.scala:146)
at com.ebay.carmel.spark.BenchmarkAndVerifyResult.main(BenchmarkAndVerifyResult.scala)
```

https://jirap.corp.ebay.com/browse/CARMEL-5966

* Merge code from Apache Spark

* Split Aggregate to Partial Agg and Final Agg.

* Enhance supportPushedAgg to do not downgrade tpcds q4 performance

* Do not downgrade bbendis 367 performance:
```
MAX(CASE WHEN sojlib.soj_extract_flag(sojlib.soj_nvl(e.soj, 'cflgs'), 15) = 1 THEN 1 ELSE 0 END) AS gbh_yn,
```

* Do not push if it is contains count distinct

* Simplify the code

* Fix bug

* Fix: org.apache.spark.sql.hive.execution.ObjectHashAggregateSuite.randomized aggregation test - [with distinct] - without grouping keys - with empty input

Error Message
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c3#7123
Stacktrace
sbt.ForkMain$ForkError: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c3#7123
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:329)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:414)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:252)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:412)

* Simplify the code

* Add config spark.sql.optimizer.partialAggregationOptimization.enabled

* Fix test

* Add tests

* Add partialAggregationOptimization.benefitRatio and partialAggregationOptimization.fallbackReductionRatio

* Port [SPARK-39248][SQL] Improve divide performance for decimal type

* fix

* Support sum(1)

* Aggregate expression's references from Alias

* sync code

* fix test

* fix

* Fix avg data issue

* Fix test error

* fix

* 1. Only push down if has benefit
2. Introduce FinalAggregate

* Fix
  • Loading branch information
wangyum authored and GitHub Enterprise committed Aug 19, 2022
1 parent e12e8d3 commit 6dd2e0b
Show file tree
Hide file tree
Showing 44 changed files with 1,975 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3729,9 +3729,12 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper {
val cleanedProjectList = projectList.map(trimNonTopLevelAliases)
Project(cleanedProjectList, child)

case Aggregate(grouping, aggs, child) =>
val cleanedAggs = aggs.map(trimNonTopLevelAliases)
Aggregate(grouping.map(trimAliases), cleanedAggs, child)
case a: AggregateBase =>
val cleanedGroupings = a.groupingExpressions.map(trimAliases)
val cleanedAggs = a.aggregateExpressions.map(trimNonTopLevelAliases)
a.withGroupingExpressions(cleanedGroupings)
.withAggregateExpressions(cleanedAggs)
.withNewChildren(Seq(a.child))

case Window(windowExprs, partitionSpec, orderSpec, limit, child) =>
val cleanedWindowExprs = windowExprs.map(trimNonTopLevelAliases)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ object TypeCoercion {
p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType)))

case Abs(e @ StringType()) => Abs(Cast(e, DoubleType))
case Sum(e @ StringType()) => Sum(Cast(e, DoubleType))
case Sum(e @ StringType(), None) => Sum(Cast(e, DoubleType))
case Average(e @ StringType()) => Average(Cast(e, DoubleType))
case StddevPop(e @ StringType()) => StddevPop(Cast(e, DoubleType))
case StddevSamp(e @ StringType()) => StddevSamp(Cast(e, DoubleType))
Expand Down Expand Up @@ -628,7 +628,7 @@ object TypeCoercion {
m.copy(newKeys.zip(newValues).flatMap { case (k, v) => Seq(k, v) })

// Hive lets you do aggregation of timestamps... for some reason
case Sum(e @ TimestampType()) => Sum(Cast(e, DoubleType))
case Sum(e @ TimestampType(), None) => Sum(Cast(e, DoubleType))
case Average(e @ TimestampType()) => Average(Cast(e, DoubleType))

// Coalesce should return the first non-null value, which could be any column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.analysis.MultiAlias
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
import org.apache.spark.sql.catalyst.plans.logical.{AggregateBase, Project}

/**
* Helper methods for collecting and replacing aliases.
Expand All @@ -32,7 +32,7 @@ trait AliasHelper {
getAliasMap(plan.projectList)
}

protected def getAliasMap(plan: Aggregate): AttributeMap[Alias] = {
protected def getAliasMap(plan: AggregateBase): AttributeMap[Alias] = {
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression or PythonUDF, and create a map from the alias to the expression
val aliasMap = plan.aggregateExpressions.collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ case class Average(child: Expression) extends DeclarativeAggregate with Implicit
case _ => DoubleType
}

private lazy val sumDataType = child.dataType match {
private[sql] lazy val sumDataType = child.dataType match {
case _ @ DecimalType.Fixed(p, s) => DecimalType.bounded(p + 10, s)
case _ => DoubleType
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ import org.apache.spark.sql.types._
""",
group = "agg_funcs",
since = "1.0.0")
case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCastInputTypes {
case class Sum(child: Expression, resultDataType: Option[DataType] = None)
extends DeclarativeAggregate with ImplicitCastInputTypes {

def this(child: Expression) = {
this(child, None)
}

override def children: Seq[Expression] = child :: Nil

Expand All @@ -51,12 +56,13 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
override def checkInputDataTypes(): TypeCheckResult =
TypeUtils.checkForNumericExpr(child.dataType, "function sum")

private lazy val resultType = child.dataType match {
case DecimalType.Fixed(precision, scale) =>
DecimalType.bounded(precision + 10, scale)
case _: IntegralType => LongType
case _ => DoubleType
}
private lazy val resultType = resultDataType.getOrElse(
child.dataType match {
case DecimalType.Fixed(precision, scale) => DecimalType.bounded(precision + 10, scale)
case _: IntegralType => LongType
case _ => DoubleType
}
)

private lazy val sum = AttributeReference("sum", resultType)()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ trait PredicateHelper extends AliasHelper with Logging {
val aliases = getAliasMap(p)
findExpressionAndTrackLineageDown(replaceAlias(exp, aliases), p.child)
// we can unwrap only if there are row projections, and no aggregation operation
case a: Aggregate =>
case a: AggregateBase =>
val aliasMap = getAliasMap(a)
findExpressionAndTrackLineageDown(replaceAlias(exp, aliasMap), a.child)
case l: LeafNode if exp.references.subsetOf(l.outputSet) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.LeftSemiOrAnti
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

/**
* Deduplicate the right side of left semi/anti join.
*/
object DeduplicateRightSideOfLeftSemiAntiJoin extends Rule[LogicalPlan] with JoinSelectionHelper {

def apply(plan: LogicalPlan): LogicalPlan = {
if (conf.partialAggregationOptimizationEnabled) {
plan.transform {
case j @ Join(_, _: AggregateBase, LeftSemiOrAnti(_), _, _) =>
j
case j @ Join(_, right, LeftSemiOrAnti(_), _, _)
if PushPartialAggregationThroughJoin.pushPartialAggHasBenefit(right.output, right) =>
j.copy(right = PartialAggregate(right.output, right.output, right))
}
} else {
plan
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
// aggregate distinct column
Batch("Distinct Aggregate Rewrite", Once,
RewriteDistinctAggregates) :+
Batch("Partial Aggregation Optimization", fixedPoint,
PushPartialAggregationThroughJoin,
DeduplicateRightSideOfLeftSemiAntiJoin) :+
Batch("Object Expressions Optimization", fixedPoint,
EliminateMapObjects,
CombineTypedFilters,
Expand All @@ -224,6 +227,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
LimitPushDown,
ColumnPruning,
CollapseProject,
DeduplicateRightSideOfLeftSemiAntiJoin,
RemoveNoopOperators) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
Expand Down Expand Up @@ -632,9 +636,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Prunes the unused columns from project list of Project/Aggregate/Expand
case p @ Project(_, p2: Project) if !p2.outputSet.subsetOf(p.references) =>
p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
case p @ Project(_, a: Aggregate) if !a.outputSet.subsetOf(p.references) =>
p.copy(
child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains)))
case p @ Project(_, a: AggregateBase) if !a.outputSet.subsetOf(p.references) =>
val newAggregateExpressions = a.aggregateExpressions.filter(p.references.contains)
p.copy(child = a.withAggregateExpressions(newAggregateExpressions))
case a @ Project(_, e @ Expand(_, _, grandChild)) if !e.outputSet.subsetOf(a.references) =>
val newOutput = e.output.filter(a.references.contains(_))
val newProjects = e.projections.map { proj =>
Expand All @@ -649,8 +653,8 @@ object ColumnPruning extends Rule[LogicalPlan] {
d.copy(child = prunedChild(child, d.references))

// Prunes the unused columns from child of Aggregate/Expand/Generate/ScriptTransformation
case a @ Aggregate(_, _, child) if !child.outputSet.subsetOf(a.references) =>
a.copy(child = prunedChild(child, a.references))
case a: AggregateBase if !a.child.outputSet.subsetOf(a.references) =>
a.withNewChildren(Seq(prunedChild(a.child, a.references)))
case f @ FlatMapGroupsInPandas(_, _, _, child) if !child.outputSet.subsetOf(f.references) =>
f.copy(child = prunedChild(child, f.references))
case e @ Expand(_, _, child) if !child.outputSet.subsetOf(e.references) =>
Expand Down Expand Up @@ -738,7 +742,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
})

/** Applies a projection only when the child is producing unnecessary attributes */
private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
def prunedChild(c: LogicalPlan, allReferences: AttributeSet): LogicalPlan =
if (!c.outputSet.subsetOf(allReferences)) {
Project(c.output.filter(allReferences.contains), c)
} else {
Expand Down Expand Up @@ -776,12 +780,12 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
} else {
p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
}
case p @ Project(_, agg: Aggregate) =>
case p @ Project(_, agg: AggregateBase) =>
if (haveCommonNonDeterministicOutput(p.projectList, agg.aggregateExpressions)) {
p
} else {
agg.copy(aggregateExpressions = buildCleanedProjectList(
p.projectList, agg.aggregateExpressions))
val newAggregateExprs = buildCleanedProjectList(p.projectList, agg.aggregateExpressions)
agg.withAggregateExpressions(newAggregateExprs)
}
case Project(l1, g @ GlobalLimit(_, limit @ LocalLimit(_, p2 @ Project(l2, _))))
if isRenaming(l1, l2) =>
Expand Down Expand Up @@ -1377,7 +1381,7 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
val aliasMap = getAliasMap(project)
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

case filter @ Filter(condition, aggregate: Aggregate)
case filter @ Filter(condition, aggregate: AggregateBase)
if aggregate.aggregateExpressions.forall(_.deterministic)
&& aggregate.groupingExpressions.nonEmpty =>
val aliasMap = getAliasMap(aggregate)
Expand All @@ -1397,7 +1401,7 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
val newAggregate = aggregate.copy(child = Filter(replaced, aggregate.child))
val newAggregate = aggregate.withNewChildren(Seq(Filter(replaced, aggregate.child)))
// If there is no more filter to stay up, just eliminate the filter.
// Otherwise, create "Filter(stayUp) <- Aggregate <- Filter(pushDownPredicate)".
if (stayUp.isEmpty) newAggregate else Filter(stayUp.reduce(And), newAggregate)
Expand Down Expand Up @@ -1777,7 +1781,7 @@ object DecimalAggregates extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsDown {
case we @ WindowExpression(ae @ AggregateExpression(af, _, _, _, _), _) => af match {
case Sum(e @ DecimalType.Expression(prec, scale)) if prec + 10 <= MAX_LONG_DIGITS =>
case Sum(e @ DecimalType.Expression(prec, scale), None) if prec + 10 <= MAX_LONG_DIGITS =>
MakeDecimal(we.copy(windowFunction = ae.copy(aggregateFunction = Sum(UnscaledValue(e)))),
prec + 10, scale)

Expand All @@ -1791,7 +1795,7 @@ object DecimalAggregates extends Rule[LogicalPlan] {
case _ => we
}
case ae @ AggregateExpression(af, _, _, _, _) => af match {
case Sum(e @ DecimalType.Expression(prec, scale)) if prec + 10 <= MAX_LONG_DIGITS =>
case Sum(e @ DecimalType.Expression(prec, scale), None) if prec + 10 <= MAX_LONG_DIGITS =>
MakeDecimal(ae.copy(aggregateFunction = Sum(UnscaledValue(e))), prec + 10, scale)

case Average(e @ DecimalType.Expression(prec, scale)) if prec + 4 <= MAX_DOUBLE_DIGITS =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
import org.apache.spark.sql.catalyst.rules.Rule
Expand All @@ -43,6 +44,7 @@ object PredicateReorder extends Rule[LogicalPlan] with PredicateHelper {
private val QUATERNARY_OP_COST = 4.0D
private val SEPTENARY_OP_COST = 7.0D
private val COMPLEX_TYPE_MERGING_OP_COST = 10.0D
private val AGGREGATE_OP_COST = 15.0D
private val USER_DEFINED_OP_COST = 20.0D
private val UNKNOWN_COST = 30.0D

Expand All @@ -65,7 +67,7 @@ object PredicateReorder extends Rule[LogicalPlan] with PredicateHelper {

// The cost of a call expression exp is computed as:
// cost(exp) = typeSize + functionCost + cost(children).
private def expressionCost(exp: Expression): Double = exp match {
def expressionCost(exp: Expression): Double = exp match {
case e: Expression if e.children.isEmpty =>
e.dataType.defaultSize
case e: IsNull =>
Expand Down Expand Up @@ -104,6 +106,8 @@ object PredicateReorder extends Rule[LogicalPlan] with PredicateHelper {
e.dataType.defaultSize + SEPTENARY_OP_COST + e.children.map(expressionCost).sum
case e: ComplexTypeMergingExpression =>
e.dataType.defaultSize + COMPLEX_TYPE_MERGING_OP_COST + e.children.map(expressionCost).sum
case e: DeclarativeAggregate =>
e.dataType.defaultSize + AGGREGATE_OP_COST + e.children.map(expressionCost).sum
case e: UserDefinedExpression =>
e.dataType.defaultSize + USER_DEFINED_OP_COST + e.children.map(expressionCost).sum
case e =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
// Aggregation on empty LocalRelation generated from a streaming source is not eliminated
// as stateful streaming aggregation need to perform other state management operations other
// than just processing the input data.
case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
case a: AggregateBase if a.groupingExpressions.nonEmpty && !p.isStreaming => empty(p)
// Generators like Hive-style UDTF may return their records within `close`.
case Generate(_: Explode, _, _, _, _, _) => empty(p)
case Expand(_, _, _) => empty(p)
Expand Down
Loading

0 comments on commit 6dd2e0b

Please sign in to comment.