diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14.plan.txt index b9a3022c5eae..634bc09ffa5d 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14.plan.txt @@ -24,10 +24,8 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["i_item_sk_9"]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["i_brand_id_16", "i_category_id_20", "i_class_id_18"]) - scan item final aggregation over (brand_id, category_id, class_id) - local exchange (GATHER, SINGLE, []) + local exchange (REPARTITION, HASH, ["brand_id", "category_id", "class_id"]) remote exchange (REPARTITION, HASH, ["i_brand_id_64", "i_category_id_68", "i_class_id_66"]) partial aggregation over (i_brand_id_64, i_category_id_68, i_class_id_66) join (INNER, REPLICATED): @@ -61,6 +59,9 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["i_brand_id_16", "i_category_id_20", "i_class_id_18"]) + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) final aggregation over () @@ -103,10 +104,8 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["i_item_sk_473"]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["i_brand_id_480", "i_category_id_484", "i_class_id_482"]) - scan item final aggregation over (brand_id_495, category_id_497, class_id_496) - local exchange (GATHER, SINGLE, []) + local exchange (REPARTITION, HASH, ["brand_id_495", "category_id_497", "class_id_496"]) remote exchange (REPARTITION, HASH, ["i_brand_id_531", "i_category_id_535", "i_class_id_533"]) partial aggregation over (i_brand_id_531, i_category_id_535, i_class_id_533) join (INNER, REPLICATED): @@ -140,6 +139,9 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["i_brand_id_480", "i_category_id_484", "i_class_id_482"]) + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) final aggregation over () @@ -182,10 +184,8 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["i_item_sk_1015"]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["i_brand_id_1022", "i_category_id_1026", "i_class_id_1024"]) - scan item final aggregation over (brand_id_1037, category_id_1039, class_id_1038) - local exchange (GATHER, SINGLE, []) + local exchange (REPARTITION, HASH, ["brand_id_1037", "category_id_1039", "class_id_1038"]) remote exchange (REPARTITION, HASH, ["i_brand_id_1073", "i_category_id_1077", "i_class_id_1075"]) partial aggregation over (i_brand_id_1073, i_category_id_1077, i_class_id_1075) join (INNER, REPLICATED): @@ -219,6 +219,9 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["i_brand_id_1022", "i_category_id_1026", "i_class_id_1024"]) + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) final aggregation over () diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q44.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q44.plan.txt index a4e72f72e449..39119d45388d 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q44.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q44.plan.txt @@ -1,50 +1,48 @@ local exchange (GATHER, SINGLE, []) remote exchange (GATHER, SINGLE, []) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_item_sk_26"]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_item_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["rank"]) - local exchange (GATHER, SINGLE, []) - remote exchange (GATHER, SINGLE, []) - cross join: - final aggregation over (ss_item_sk) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_item_sk"]) - partial aggregation over (ss_item_sk) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - local exchange (GATHER, SINGLE, []) - remote exchange (GATHER, SINGLE, []) - final aggregation over (ss_store_sk_7) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_store_sk_7"]) - partial aggregation over (ss_store_sk_7) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["rank_74"]) + remote exchange (REPARTITION, HASH, ["rank"]) + local exchange (GATHER, SINGLE, []) + remote exchange (GATHER, SINGLE, []) + cross join: + final aggregation over (ss_item_sk) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ss_item_sk"]) + partial aggregation over (ss_item_sk) + scan store_sales local exchange (GATHER, SINGLE, []) - remote exchange (GATHER, SINGLE, []) - cross join: - final aggregation over (ss_item_sk_26) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_item_sk_26"]) - partial aggregation over (ss_item_sk_26) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) + remote exchange (REPLICATE, BROADCAST, []) + local exchange (GATHER, SINGLE, []) + remote exchange (GATHER, SINGLE, []) + final aggregation over (ss_store_sk_7) local exchange (GATHER, SINGLE, []) - remote exchange (GATHER, SINGLE, []) - final aggregation over (ss_store_sk_56) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_store_sk_56"]) - partial aggregation over (ss_store_sk_56) - scan store_sales + remote exchange (REPARTITION, HASH, ["ss_store_sk_7"]) + partial aggregation over (ss_store_sk_7) + scan store_sales local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["i_item_sk"]) - scan item + remote exchange (REPARTITION, HASH, ["rank_74"]) + local exchange (GATHER, SINGLE, []) + remote exchange (GATHER, SINGLE, []) + cross join: + final aggregation over (ss_item_sk_26) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ss_item_sk_26"]) + partial aggregation over (ss_item_sk_26) + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + local exchange (GATHER, SINGLE, []) + remote exchange (GATHER, SINGLE, []) + final aggregation over (ss_store_sk_56) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ss_store_sk_56"]) + partial aggregation over (ss_store_sk_56) + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan item local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["i_item_sk_75"]) + remote exchange (REPLICATE, BROADCAST, []) scan item diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q64.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q64.plan.txt index df098aec97f5..5338a1af5248 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q64.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q64.plan.txt @@ -2,182 +2,163 @@ remote exchange (GATHER, SINGLE, []) local exchange (GATHER, UNKNOWN, []) remote exchange (REPARTITION, ROUND_ROBIN, []) join (INNER, PARTITIONED): - final aggregation over (ca_city, ca_city_77, ca_street_name, ca_street_name_74, ca_street_number, ca_street_number_73, ca_zip, ca_zip_80, d_year, d_year_35, d_year_7, i_product_name, s_store_name, s_zip, ss_item_sk) - local exchange (GATHER, SINGLE, []) - partial aggregation over (ca_city, ca_city_77, ca_street_name, ca_street_name_74, ca_street_number, ca_street_number_73, ca_zip, ca_zip_80, d_year, d_year_35, d_year_7, i_product_name, s_store_name, s_zip, ss_item_sk) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_item_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["hd_income_band_sk_67"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["hd_income_band_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_current_addr_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_addr_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_current_hdemo_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_hdemo_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_promo_sk"]) + remote exchange (REPARTITION, HASH, ["s_store_name", "s_zip", "ss_item_sk"]) + final aggregation over (ca_city, ca_city_77, ca_street_name, ca_street_name_74, ca_street_number, ca_street_number_73, ca_zip, ca_zip_80, d_year, d_year_35, d_year_7, i_product_name, s_store_name, s_zip, ss_item_sk) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ca_city", "ca_city_77", "ca_street_name", "ca_street_name_74", "ca_street_number", "ca_street_number_73", "ca_zip", "ca_zip_80", "d_year", "d_year_35", "d_year_7", "i_product_name", "s_store_name", "s_zip", "ss_item_sk"]) + partial aggregation over (ca_city, ca_city_77, ca_street_name, ca_street_name_74, ca_street_number, ca_street_number_73, ca_zip, ca_zip_80, d_year, d_year_35, d_year_7, i_product_name, s_store_name, s_zip, ss_item_sk) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["c_current_addr_sk"]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ss_addr_sk"]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_current_cdemo_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_cdemo_sk"]) + remote exchange (REPARTITION, HASH, ["ss_customer_sk"]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_first_shipto_date_sk"]) + remote exchange (REPARTITION, HASH, ["ss_item_sk"]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_first_sales_date_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_customer_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_store_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_sold_date_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_item_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_item_sk", "ss_ticket_number"]) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["sr_item_sk", "sr_ticket_number"]) - scan store_returns - final aggregation over (cs_item_sk) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cs_item_sk"]) - partial aggregation over (cs_item_sk) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["cs_item_sk", "cs_order_number"]) - scan catalog_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cr_item_sk", "cr_order_number"]) - scan catalog_returns - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["d_date_sk"]) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["s_store_sk"]) - scan store - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["c_customer_sk"]) - scan customer + remote exchange (REPARTITION, HASH, ["ss_item_sk", "ss_ticket_number"]) + scan store_sales local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["d_date_sk_1"]) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["d_date_sk_29"]) - scan date_dim + remote exchange (REPARTITION, HASH, ["sr_item_sk", "sr_ticket_number"]) + scan store_returns + final aggregation over (cs_item_sk) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["cs_item_sk"]) + partial aggregation over (cs_item_sk) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["cs_item_sk", "cs_order_number"]) + scan catalog_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["cr_item_sk", "cr_order_number"]) + scan catalog_returns + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cd_demo_sk"]) - scan customer_demographics + remote exchange (REPLICATE, BROADCAST, []) + scan store local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cd_demo_sk_57"]) - scan customer_demographics + remote exchange (REPARTITION, HASH, ["c_customer_sk"]) + scan customer + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["p_promo_sk"]) - scan promotion + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["hd_demo_sk"]) - scan household_demographics + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan promotion local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["hd_demo_sk_66"]) + remote exchange (REPLICATE, BROADCAST, []) scan household_demographics - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ca_address_sk"]) - scan customer_address - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ca_address_sk_71"]) - scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan household_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ca_address_sk"]) + scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ca_address_sk_71"]) + scan customer_address local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ib_income_band_sk"]) + remote exchange (REPLICATE, BROADCAST, []) scan income_band + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan income_band local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ib_income_band_sk_84"]) - scan income_band - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["i_item_sk"]) - scan item - final aggregation over (ca_city_388, ca_city_401, ca_street_name_385, ca_street_name_398, ca_street_number_384, ca_street_number_397, ca_zip_391, ca_zip_404, d_year_210, d_year_238, d_year_266, i_product_name_435, s_store_name_293, s_zip_313, ss_item_sk_98) - local exchange (GATHER, SINGLE, []) - partial aggregation over (ca_city_388, ca_city_401, ca_street_name_385, ca_street_name_398, ca_street_number_384, ca_street_number_397, ca_zip_391, ca_zip_404, d_year_210, d_year_238, d_year_266, i_product_name_435, s_store_name_293, s_zip_313, ss_item_sk_98) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_item_sk_98"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["hd_income_band_sk_378"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["hd_income_band_sk_373"]) + remote exchange (REPLICATE, BROADCAST, []) + scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["s_store_name_293", "s_zip_313", "ss_item_sk_98"]) + final aggregation over (ca_city_388, ca_city_401, ca_street_name_385, ca_street_name_398, ca_street_number_384, ca_street_number_397, ca_zip_391, ca_zip_404, d_year_210, d_year_238, d_year_266, i_product_name_435, s_store_name_293, s_zip_313, ss_item_sk_98) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ca_city_388", "ca_city_401", "ca_street_name_385", "ca_street_name_398", "ca_street_number_384", "ca_street_number_397", "ca_zip_391", "ca_zip_404", "d_year_210", "d_year_238", "d_year_266", "i_product_name_435", "s_store_name_293", "s_zip_313", "ss_item_sk_98"]) + partial aggregation over (ca_city_388, ca_city_401, ca_street_name_385, ca_street_name_398, ca_street_number_384, ca_street_number_397, ca_zip_391, ca_zip_404, d_year_210, d_year_238, d_year_266, i_product_name_435, s_store_name_293, s_zip_313, ss_item_sk_98) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): join (INNER, PARTITIONED): remote exchange (REPARTITION, HASH, ["c_current_addr_sk_321"]) join (INNER, PARTITIONED): remote exchange (REPARTITION, HASH, ["ss_addr_sk_102"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_current_hdemo_sk_320"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_hdemo_sk_101"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_promo_sk_104"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_current_cdemo_sk_319"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_cdemo_sk_100"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_first_shipto_date_sk_322"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_first_sales_date_sk_323"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_customer_sk_99"]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ss_customer_sk_99"]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ss_item_sk_98"]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ss_item_sk_98", "ss_ticket_number_105"]) + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["sr_item_sk_121", "sr_ticket_number_128"]) + scan store_returns + final aggregation over (cs_item_sk_154) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["cs_item_sk_154"]) + partial aggregation over (cs_item_sk_154) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_store_sk_103"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_sold_date_sk_96"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_item_sk_98"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_item_sk_98", "ss_ticket_number_105"]) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["sr_item_sk_121", "sr_ticket_number_128"]) - scan store_returns - final aggregation over (cs_item_sk_154) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cs_item_sk_154"]) - partial aggregation over (cs_item_sk_154) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["cs_item_sk_154", "cs_order_number_156"]) - scan catalog_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cr_item_sk_175", "cr_order_number_189"]) - scan catalog_returns - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["d_date_sk_204"]) - scan date_dim + remote exchange (REPARTITION, HASH, ["cs_item_sk_154", "cs_order_number_156"]) + scan catalog_sales local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["s_store_sk_288"]) - scan store - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["c_customer_sk_317"]) - scan customer - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["d_date_sk_232"]) - scan date_dim + remote exchange (REPARTITION, HASH, ["cr_item_sk_175", "cr_order_number_189"]) + scan catalog_returns local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["d_date_sk_260"]) + remote exchange (REPLICATE, BROADCAST, []) scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cd_demo_sk_335"]) - scan customer_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan store + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["c_customer_sk_317"]) + scan customer local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cd_demo_sk_344"]) - scan customer_demographics + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["p_promo_sk_353"]) - scan promotion + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["hd_demo_sk_372"]) - scan household_demographics + remote exchange (REPLICATE, BROADCAST, []) + scan promotion + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan household_demographics local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["hd_demo_sk_377"]) + remote exchange (REPLICATE, BROADCAST, []) scan household_demographics local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["ca_address_sk_382"]) @@ -185,12 +166,12 @@ remote exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["ca_address_sk_395"]) scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan income_band local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ib_income_band_sk_408"]) + remote exchange (REPLICATE, BROADCAST, []) scan income_band - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ib_income_band_sk_411"]) - scan income_band - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["i_item_sk_414"]) - scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan item diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q09.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q09.plan.txt index 3206f7e6539c..0b4b6051770d 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q09.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q09.plan.txt @@ -5,28 +5,27 @@ remote exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["expr", "name_15"]) partial aggregation over (expr, name_15) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["nationkey"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["orderkey"]) + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["orderkey"]) + join (INNER, PARTITIONED): join (INNER, PARTITIONED): - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["suppkey_4"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["partkey"]) - scan part - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["partkey_3"]) - scan lineitem - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["suppkey"]) - scan supplier + remote exchange (REPARTITION, HASH, ["suppkey_4"]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["partkey"]) + scan part + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["partkey_3"]) + scan lineitem local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["suppkey_8"]) - scan partsupp - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["orderkey_11"]) - scan orders + remote exchange (REPARTITION, HASH, ["suppkey"]) + scan supplier + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["suppkey_8"]) + scan partsupp + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["orderkey_11"]) + scan orders local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["nationkey_14"]) + remote exchange (REPLICATE, BROADCAST, []) scan nation diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q21.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q21.plan.txt index a2fc259587ba..146caeb93b9b 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q21.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q21.plan.txt @@ -10,23 +10,21 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) partial aggregation over (commitdate, name, name_7, nationkey, orderkey, orderstatus, receiptdate, suppkey, unique_49) join (LEFT, PARTITIONED): - remote exchange (REPARTITION, HASH, ["orderkey"]) + join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["nationkey"]) + remote exchange (REPARTITION, HASH, ["orderkey"]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["orderkey"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["suppkey"]) - scan supplier - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["suppkey_0"]) - scan lineitem + remote exchange (REPARTITION, HASH, ["suppkey"]) + scan supplier local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["orderkey_3"]) - scan orders + remote exchange (REPARTITION, HASH, ["suppkey_0"]) + scan lineitem local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["nationkey_6"]) - scan nation + remote exchange (REPARTITION, HASH, ["orderkey_3"]) + scan orders + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan nation local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["orderkey_10"]) scan lineitem diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/iterative/rule/DetermineJoinDistributionType.java b/presto-main/src/main/java/io/prestosql/sql/planner/iterative/rule/DetermineJoinDistributionType.java index 5305ea6d2c78..3971fc8c814e 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/iterative/rule/DetermineJoinDistributionType.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/iterative/rule/DetermineJoinDistributionType.java @@ -14,6 +14,7 @@ package io.prestosql.sql.planner.iterative.rule; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Ordering; import io.airlift.units.DataSize; import io.prestosql.cost.CostComparator; @@ -25,9 +26,14 @@ import io.prestosql.matching.Pattern; import io.prestosql.sql.analyzer.FeaturesConfig.JoinDistributionType; import io.prestosql.sql.planner.TypeProvider; +import io.prestosql.sql.planner.iterative.Lookup; import io.prestosql.sql.planner.iterative.Rule; +import io.prestosql.sql.planner.optimizations.PlanNodeSearcher; import io.prestosql.sql.planner.plan.JoinNode; import io.prestosql.sql.planner.plan.PlanNode; +import io.prestosql.sql.planner.plan.TableScanNode; +import io.prestosql.sql.planner.plan.ValuesNode; +import io.prestosql.sql.tree.Unnest; import java.util.ArrayList; import java.util.List; @@ -44,6 +50,7 @@ import static io.prestosql.sql.planner.plan.JoinNode.Type.LEFT; import static io.prestosql.sql.planner.plan.JoinNode.Type.RIGHT; import static io.prestosql.sql.planner.plan.Patterns.join; +import static io.prestosql.util.MorePredicates.isInstanceOfAny; import static java.util.Objects.requireNonNull; public class DetermineJoinDistributionType @@ -88,7 +95,32 @@ public static boolean canReplicate(JoinNode joinNode, Context context) PlanNode buildSide = joinNode.getRight(); PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide); double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide.getOutputSymbols(), context.getSymbolAllocator().getTypes()); - return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes(); + return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes() + || getSourceTablesSizeInBytes(buildSide, context) <= joinMaxBroadcastTableSize.toBytes(); + } + + public static double getSourceTablesSizeInBytes(PlanNode node, Context context) + { + return getSourceTablesSizeInBytes(node, context.getLookup(), context.getStatsProvider(), context.getSymbolAllocator().getTypes()); + } + + @VisibleForTesting + static double getSourceTablesSizeInBytes(PlanNode node, Lookup lookup, StatsProvider statsProvider, TypeProvider typeProvider) + { + boolean hasExpandingNodes = PlanNodeSearcher.searchFrom(node, lookup) + .where(isInstanceOfAny(JoinNode.class, Unnest.class)) + .matches(); + if (hasExpandingNodes) { + return Double.NaN; + } + + List sourceNodes = PlanNodeSearcher.searchFrom(node, lookup) + .where(isInstanceOfAny(TableScanNode.class, ValuesNode.class)) + .findAll(); + + return sourceNodes.stream() + .mapToDouble(sourceNode -> statsProvider.getStats(sourceNode).getOutputSizeInBytes(sourceNode.getOutputSymbols(), typeProvider)) + .sum(); } private PlanNode getCostBasedJoin(JoinNode joinNode, Context context) @@ -99,7 +131,7 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context) addJoinsWithDifferentDistributions(joinNode.flipChildren(), possibleJoinNodes, context); if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) { - return getSyntacticOrderJoin(joinNode, context, AUTOMATIC); + return getSizeBasedJoin(joinNode, context); } // Using Ordering to facilitate rule determinism @@ -107,6 +139,36 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context) return planNodeOrderings.min(possibleJoinNodes).getPlanNode(); } + private JoinNode getSizeBasedJoin(JoinNode joinNode, Context context) + { + DataSize joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession()); + + boolean isRightSideSmall = getSourceTablesSizeInBytes(joinNode.getRight(), context) <= joinMaxBroadcastTableSize.toBytes(); + if (isRightSideSmall && !mustPartition(joinNode)) { + // choose right join side with small source tables as replicated build side + return joinNode.withDistributionType(REPLICATED); + } + + boolean isLeftSideSmall = getSourceTablesSizeInBytes(joinNode.getLeft(), context) <= joinMaxBroadcastTableSize.toBytes(); + if (isLeftSideSmall && !mustPartition(joinNode.flipChildren())) { + // choose join left side with small source tables as replicated build side + return joinNode.flipChildren().withDistributionType(REPLICATED); + } + + if (isRightSideSmall) { + // right side is small enough, but must be partitioned + return joinNode.withDistributionType(PARTITIONED); + } + + if (isLeftSideSmall) { + // left side is small enough, but must be partitioned + return joinNode.flipChildren().withDistributionType(PARTITIONED); + } + + // neither side is small enough, choose syntactic join order + return getSyntacticOrderJoin(joinNode, context, AUTOMATIC); + } + private void addJoinsWithDifferentDistributions(JoinNode joinNode, List possibleJoinNodes, Context context) { if (!mustPartition(joinNode) && canReplicate(joinNode, context)) { @@ -117,7 +179,7 @@ private void addJoinsWithDifferentDistributions(JoinNode joinNode, List result.getCost().hasUnknownComponents())) { - return node.withDistributionType(PARTITIONED); + return getSizeBaseDistributionType(node, context); } // Using Ordering to facilitate rule determinism @@ -99,6 +100,18 @@ private PlanNode getCostBasedDistributionType(SemiJoinNode node, Context context return planNodeOrderings.min(possibleJoinNodes).getPlanNode(); } + private PlanNode getSizeBaseDistributionType(SemiJoinNode node, Context context) + { + DataSize joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession()); + + if (getSourceTablesSizeInBytes(node.getFilteringSource(), context) <= joinMaxBroadcastTableSize.toBytes()) { + // choose replicated distribution type as filtering source contains small source tables only + return node.withDistributionType(REPLICATED); + } + + return node.withDistributionType(PARTITIONED); + } + private boolean canReplicate(SemiJoinNode node, Context context) { DataSize joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession()); @@ -106,7 +119,8 @@ private boolean canReplicate(SemiJoinNode node, Context context) PlanNode buildSide = node.getFilteringSource(); PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide); double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide.getOutputSymbols(), context.getSymbolAllocator().getTypes()); - return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes(); + return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes() + || getSourceTablesSizeInBytes(buildSide, context) <= joinMaxBroadcastTableSize.toBytes(); } private PlanNodeWithCost getSemiJoinNodeWithCost(SemiJoinNode possibleJoinNode, Context context) diff --git a/presto-main/src/test/java/io/prestosql/sql/planner/TestLogicalPlanner.java b/presto-main/src/test/java/io/prestosql/sql/planner/TestLogicalPlanner.java index 79efa289e961..fe63dba3ab34 100644 --- a/presto-main/src/test/java/io/prestosql/sql/planner/TestLogicalPlanner.java +++ b/presto-main/src/test/java/io/prestosql/sql/planner/TestLogicalPlanner.java @@ -48,6 +48,7 @@ import io.prestosql.sql.planner.plan.PlanNode; import io.prestosql.sql.planner.plan.ProjectNode; import io.prestosql.sql.planner.plan.SemiJoinNode; +import io.prestosql.sql.planner.plan.SemiJoinNode.DistributionType; import io.prestosql.sql.planner.plan.SortNode; import io.prestosql.sql.planner.plan.StatisticsWriterNode; import io.prestosql.sql.planner.plan.TableScanNode; @@ -1473,11 +1474,71 @@ public void testGroupingSetsWithDefaultValue() tableScan("orders")))))))))); } + @Test + public void testSizeBasedJoin() + { + // both local.sf100000.nation and local.sf100000.orders don't provide stats, therefore no reordering happens + assertDistributedPlan("SELECT custkey FROM local.\"sf42.5\".nation, local.\"sf42.5\".orders WHERE nation.nationkey = orders.custkey", + automaticJoinDistribution(), + output( + anyTree( + join(INNER, ImmutableList.of(equiJoinClause("NATIONKEY", "CUSTKEY")), + anyTree( + tableScan("nation", ImmutableMap.of("NATIONKEY", "nationkey"))), + anyTree( + tableScan("orders", ImmutableMap.of("CUSTKEY", "custkey"))))))); + + // values node provides stats + assertDistributedPlan("SELECT custkey FROM (VALUES CAST(1 AS BIGINT), CAST(2 AS BIGINT)) t(a), local.\"sf42.5\".orders WHERE t.a = orders.custkey", + automaticJoinDistribution(), + output( + anyTree( + join(INNER, ImmutableList.of(equiJoinClause("CUSTKEY", "T_A")), Optional.empty(), Optional.of(REPLICATED), + anyTree( + tableScan("orders", ImmutableMap.of("CUSTKEY", "custkey"))), + anyTree( + values("T_A")))))); + } + + @Test + public void testSizeBasedSemiJoin() + { + // both local.sf100000.nation and local.sf100000.orders don't provide stats, therefore no reordering happens + assertDistributedPlan("SELECT custkey FROM local.\"sf42.5\".orders WHERE orders.custkey IN (SELECT nationkey FROM local.\"sf42.5\".nation)", + automaticJoinDistribution(), + output( + anyTree( + semiJoin("CUSTKEY", "NATIONKEY", "OUT", Optional.of(DistributionType.PARTITIONED), + anyTree( + tableScan("orders", ImmutableMap.of("CUSTKEY", "custkey"))), + anyTree( + tableScan("nation", ImmutableMap.of("NATIONKEY", "nationkey"))))))); + + // values node provides stats + assertDistributedPlan("SELECT custkey FROM local.\"sf42.5\".orders WHERE orders.custkey IN (SELECT t.a FROM (VALUES CAST(1 AS BIGINT), CAST(2 AS BIGINT)) t(a))", + automaticJoinDistribution(), + output( + anyTree( + semiJoin("CUSTKEY", "T_A", "OUT", Optional.of(DistributionType.REPLICATED), + anyTree( + tableScan("orders", ImmutableMap.of("CUSTKEY", "custkey"))), + anyTree( + values("T_A")))))); + } + private Session noJoinReordering() { return Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(JOIN_REORDERING_STRATEGY, JoinReorderingStrategy.NONE.name()) - .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinNode.DistributionType.PARTITIONED.name()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.PARTITIONED.name()) + .build(); + } + + private Session automaticJoinDistribution() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(JOIN_REORDERING_STRATEGY, JoinReorderingStrategy.NONE.name()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) .build(); } } diff --git a/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java b/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java index c2b709fdc7aa..c560490017ec 100644 --- a/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java +++ b/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java @@ -14,6 +14,7 @@ package io.prestosql.sql.planner.iterative.rule; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import io.prestosql.cost.CostComparator; import io.prestosql.cost.PlanNodeStatsEstimate; @@ -21,13 +22,18 @@ import io.prestosql.cost.TaskCountEstimator; import io.prestosql.spi.type.VarcharType; import io.prestosql.sql.analyzer.FeaturesConfig.JoinDistributionType; +import io.prestosql.sql.planner.PlanNodeIdAllocator; import io.prestosql.sql.planner.Symbol; +import io.prestosql.sql.planner.iterative.rule.test.PlanBuilder; import io.prestosql.sql.planner.iterative.rule.test.RuleAssert; import io.prestosql.sql.planner.iterative.rule.test.RuleTester; import io.prestosql.sql.planner.plan.JoinNode; import io.prestosql.sql.planner.plan.JoinNode.DistributionType; import io.prestosql.sql.planner.plan.JoinNode.Type; import io.prestosql.sql.planner.plan.PlanNodeId; +import io.prestosql.sql.planner.plan.TableScanNode; +import io.prestosql.sql.planner.plan.ValuesNode; +import io.prestosql.testing.TestingMetadata.TestingColumnHandle; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -40,8 +46,11 @@ import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType; import static io.prestosql.sql.planner.assertions.PlanMatchPattern.enforceSingleRow; import static io.prestosql.sql.planner.assertions.PlanMatchPattern.equiJoinClause; +import static io.prestosql.sql.planner.assertions.PlanMatchPattern.filter; import static io.prestosql.sql.planner.assertions.PlanMatchPattern.join; import static io.prestosql.sql.planner.assertions.PlanMatchPattern.values; +import static io.prestosql.sql.planner.iterative.Lookup.noLookup; +import static io.prestosql.sql.planner.iterative.rule.DetermineJoinDistributionType.getSourceTablesSizeInBytes; import static io.prestosql.sql.planner.iterative.rule.test.PlanBuilder.expression; import static io.prestosql.sql.planner.iterative.rule.test.PlanBuilder.expressions; import static io.prestosql.sql.planner.iterative.rule.test.RuleTester.defaultRuleTester; @@ -51,6 +60,9 @@ import static io.prestosql.sql.planner.plan.JoinNode.Type.INNER; import static io.prestosql.sql.planner.plan.JoinNode.Type.LEFT; import static io.prestosql.sql.planner.plan.JoinNode.Type.RIGHT; +import static io.prestosql.sql.tree.BooleanLiteral.TRUE_LITERAL; +import static java.lang.Double.NaN; +import static org.testng.Assert.assertEquals; @Test(singleThreaded = true) public class TestDetermineJoinDistributionType @@ -643,6 +655,185 @@ public void testReplicatesWhenNotRestricted() values(ImmutableMap.of("B1", 0)))); } + @Test + public void testReplicatesWhenSourceIsSmall() + { + VarcharType symbolType = createUnboundedVarcharType(); // variable width so that average row size is respected + int aRows = 10_000; + int bRows = 10; + + // output size exceeds AUTOMATIC_RESTRICTED limit + PlanNodeStatsEstimate aStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(aRows) + .addSymbolStatistics(ImmutableMap.of(new Symbol("A1"), new SymbolStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + // output size exceeds AUTOMATIC_RESTRICTED limit + PlanNodeStatsEstimate bStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addSymbolStatistics(ImmutableMap.of(new Symbol("B1"), new SymbolStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + // output size does not exceed AUTOMATIC_RESTRICTED limit + PlanNodeStatsEstimate bSourceStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addSymbolStatistics(ImmutableMap.of(new Symbol("B1"), new SymbolStatsEstimate(0, 100, 0, 64, 10))) + .build(); + + // immediate join sources exceeds AUTOMATIC_RESTRICTED limit but build tables are small + // therefore replicated distribution type is chosen + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", aStatsEstimate) + .overrideStats("filterB", bStatsEstimate) + .overrideStats("valuesB", bSourceStatsEstimate) + .on(p -> { + Symbol a1 = p.symbol("A1", symbolType); + Symbol b1 = p.symbol("B1", symbolType); + return p.join( + INNER, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.filter( + new PlanNodeId("filterB"), + TRUE_LITERAL, + p.values(new PlanNodeId("valuesB"), bRows, b1)), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1), + ImmutableList.of(b1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + + // same but with join sides reversed + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", aStatsEstimate) + .overrideStats("filterB", bStatsEstimate) + .overrideStats("valuesB", bSourceStatsEstimate) + .on(p -> { + Symbol a1 = p.symbol("A1", symbolType); + Symbol b1 = p.symbol("B1", symbolType); + return p.join( + INNER, + p.filter( + new PlanNodeId("filterB"), + TRUE_LITERAL, + p.values(new PlanNodeId("valuesB"), bRows, b1)), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1), + ImmutableList.of(a1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + + // only probe side (with small tables) source stats are available, join sides should be flipped + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("filterB", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bSourceStatsEstimate) + .on(p -> { + Symbol a1 = p.symbol("A1", symbolType); + Symbol b1 = p.symbol("B1", symbolType); + return p.join( + LEFT, + p.filter( + new PlanNodeId("filterB"), + TRUE_LITERAL, + p.values(new PlanNodeId("valuesB"), bRows, b1)), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1), + ImmutableList.of(a1), + Optional.empty()); + }) + .matches(join( + RIGHT, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + } + + @Test + public void testGetSourceTablesSizeInBytes() + { + PlanBuilder planBuilder = new PlanBuilder(new PlanNodeIdAllocator(), tester.getMetadata()); + Symbol symbol = planBuilder.symbol("col"); + Symbol sourceSymbol1 = planBuilder.symbol("source1"); + Symbol sourceSymbol2 = planBuilder.symbol("soruce2"); + + // missing source stats + assertEquals( + getSourceTablesSizeInBytes( + planBuilder.values(symbol), + noLookup(), + node -> PlanNodeStatsEstimate.unknown(), + planBuilder.getTypes()), + NaN); + + // two source plan nodes + PlanNodeStatsEstimate sourceStatsEstimate1 = PlanNodeStatsEstimate.builder() + .setOutputRowCount(10) + .build(); + PlanNodeStatsEstimate sourceStatsEstimate2 = PlanNodeStatsEstimate.builder() + .setOutputRowCount(20) + .build(); + assertEquals( + getSourceTablesSizeInBytes( + planBuilder.union( + ImmutableListMultimap.builder() + .put(symbol, sourceSymbol1) + .put(symbol, sourceSymbol2) + .build(), + ImmutableList.of( + planBuilder.tableScan( + ImmutableList.of(sourceSymbol1), + ImmutableMap.of(sourceSymbol1, new TestingColumnHandle("col"))), + planBuilder.values(new PlanNodeId("valuesNode"), sourceSymbol2))), + noLookup(), + node -> { + if (node instanceof TableScanNode) { + return sourceStatsEstimate1; + } + + if (node instanceof ValuesNode) { + return sourceStatsEstimate2; + } + + return PlanNodeStatsEstimate.unknown(); + }, + planBuilder.getTypes()), + 270.0); + + // join node + assertEquals( + getSourceTablesSizeInBytes( + planBuilder.join( + INNER, + planBuilder.values(sourceSymbol1), + planBuilder.values(sourceSymbol2)), + noLookup(), + node -> sourceStatsEstimate1, + planBuilder.getTypes()), + NaN); + } + private RuleAssert assertDetermineJoinDistributionType() { return assertDetermineJoinDistributionType(COST_COMPARATOR); diff --git a/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/TestDetermineSemiJoinDistributionType.java b/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/TestDetermineSemiJoinDistributionType.java index bf92f840c3cb..e0f51d6114b8 100644 --- a/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/TestDetermineSemiJoinDistributionType.java +++ b/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/TestDetermineSemiJoinDistributionType.java @@ -35,12 +35,14 @@ import static io.prestosql.SystemSessionProperties.JOIN_MAX_BROADCAST_TABLE_SIZE; import static io.prestosql.spi.type.BigintType.BIGINT; import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType; +import static io.prestosql.sql.planner.assertions.PlanMatchPattern.filter; import static io.prestosql.sql.planner.assertions.PlanMatchPattern.semiJoin; import static io.prestosql.sql.planner.assertions.PlanMatchPattern.values; import static io.prestosql.sql.planner.iterative.rule.test.PlanBuilder.expressions; import static io.prestosql.sql.planner.iterative.rule.test.RuleTester.defaultRuleTester; import static io.prestosql.sql.planner.plan.SemiJoinNode.DistributionType.PARTITIONED; import static io.prestosql.sql.planner.plan.SemiJoinNode.DistributionType.REPLICATED; +import static io.prestosql.sql.tree.BooleanLiteral.TRUE_LITERAL; @Test(singleThreaded = true) public class TestDetermineSemiJoinDistributionType @@ -301,6 +303,59 @@ public void testReplicatesWhenNotRestricted() values(ImmutableMap.of("B1", 0)))); } + @Test + public void testReplicatesWhenSourceIsSmall() + { + Type symbolType = createUnboundedVarcharType(); // variable width so that average row size is respected + int aRows = 10_000; + int bRows = 10; + + PlanNodeStatsEstimate probeSideStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(aRows) + .addSymbolStatistics(ImmutableMap.of(new Symbol("A1"), new SymbolStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + PlanNodeStatsEstimate buildSideStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addSymbolStatistics(ImmutableMap.of(new Symbol("B1"), new SymbolStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + PlanNodeStatsEstimate buildSideSourceStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addSymbolStatistics(ImmutableMap.of(new Symbol("B1"), new SymbolStatsEstimate(0, 100, 0, 64, 10))) + .build(); + + // build side exceeds AUTOMATIC_RESTRICTED limit but source plan nodes are small + // therefore replicated distribution type is chosen + assertDetermineSemiJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", probeSideStatsEstimate) + .overrideStats("filterB", buildSideStatsEstimate) + .overrideStats("valuesB", buildSideSourceStatsEstimate) + .on(p -> { + Symbol a1 = p.symbol("A1", symbolType); + Symbol b1 = p.symbol("B1", symbolType); + return p.semiJoin( + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.filter( + new PlanNodeId("filterB"), + TRUE_LITERAL, + p.values(new PlanNodeId("valuesB"), bRows, b1)), + a1, + b1, + p.symbol("output"), + Optional.empty(), + Optional.empty(), + Optional.empty()); + }) + .matches(semiJoin( + "A1", + "B1", + "output", + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + } + private RuleAssert assertDetermineSemiJoinDistributionType() { return assertDetermineSemiJoinDistributionType(COST_COMPARATOR); diff --git a/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/test/PlanBuilder.java index 5c8789b875a2..6e2cb8bb46d2 100644 --- a/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/io/prestosql/sql/planner/iterative/rule/test/PlanBuilder.java @@ -296,7 +296,12 @@ public MarkDistinctNode markDistinct(Symbol markerSymbol, List distinctS public FilterNode filter(Expression predicate, PlanNode source) { - return new FilterNode(idAllocator.getNextId(), source, predicate); + return filter(idAllocator.getNextId(), predicate, source); + } + + public FilterNode filter(PlanNodeId planNodeId, Expression predicate, PlanNode source) + { + return new FilterNode(planNodeId, source, predicate); } public AggregationNode aggregation(Consumer aggregationBuilderConsumer)