Increase nsharers only if the node belongs to consumers of current slice (#12447)

Recently I built Greenplum from the master branch and ran TPC-DS queries with 1GB of data. For Q47 and Q57, when I turned off the GUC `execute_pruned_plan` (on by default), some worker processes hang and the query never returns.

Take Q57 as an example. My cluster configuration is 1 QD + 2 QEs. The query looks like this:

```sql
with v1 as(
  select
    i_category,i_brand,
    cc_name,d_year,d_moy,
    sum(cs_sales_price) sum_sales,
    avg(sum(cs_sales_price)) over (partition by
      i_category,i_brand,cc_name,d_year)
    avg_monthly_sales,
    rank() over (partition by
      i_category,i_brand,cc_name
    order by
      d_year,d_moy
    ) rn
  from
    item,catalog_sales,date_dim,call_center
  where
    cs_item_sk = i_item_sk and
    cs_sold_date_sk = d_date_sk and
    cc_call_center_sk= cs_call_center_sk and(
      d_year = 1999 or
      ( d_year = 1999-1 and d_moy =12) or
      ( d_year = 1999+1 and d_moy =1)
    )
  group by
    i_category,i_brand,cc_name,d_year,d_moy
),
v2 as(
  select
    v1.i_category,v1.i_brand,v1.cc_name,
    v1.d_year,v1.d_moy,v1.avg_monthly_sales,
    v1.sum_sales,v1_lag.sum_sales psum,
    v1_lead.sum_sales nsum
  from
    v1,v1 v1_lag,v1 v1_lead
  where
    v1.i_category = v1_lag.i_category and
    v1.i_category = v1_lead.i_category and
    v1.i_brand = v1_lag.i_brand and
    v1.i_brand = v1_lead.i_brand and
    v1. cc_name = v1_lag. cc_name and
    v1. cc_name = v1_lead. cc_name and
    v1.rn = v1_lag.rn + 1 and
    v1.rn = v1_lead.rn - 1
)
select *
from v2
where
  d_year = 1999 and
  avg_monthly_sales > 0 and
  case when avg_monthly_sales > 0 then
    abs(sum_sales - avg_monthly_sales) / avg_monthly_sales
    else null end > 0.1
order by
  sum_sales - avg_monthly_sales,3
limit 100;
```

When `execute_pruned_plan` is on (the default), the plan looks like this:

```
                                                                                                                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Result  (cost=0.00..2832.84 rows=1 width=64) (actual time=10792.606..10792.702 rows=100 loops=1)
   ->  Gather Motion 2:1  (slice1; segments: 2)  (cost=0.00..2832.84 rows=1 width=64) (actual time=10792.597..10792.673 rows=100 loops=1)
         Merge Key: ((share0_ref4.sum_sales - share0_ref4.avg_monthly_sales)), share0_ref4.cc_name
         ->  Sort  (cost=0.00..2832.84 rows=1 width=72) (actual time=10791.203..10791.225 rows=50 loops=1)
               Sort Key: ((share0_ref4.sum_sales - share0_ref4.avg_monthly_sales)), share0_ref4.cc_name
               Sort Method:  quicksort  Memory: 152kB
               ->  Sequence  (cost=0.00..2832.84 rows=1 width=72) (actual time=10790.522..10790.559 rows=50 loops=1)
                     ->  Shared Scan (share slice:id 1:0)  (cost=0.00..1539.83 rows=1 width=1) (actual time=10140.895..10145.397 rows=16510 loops=1)
                           ->  WindowAgg  (cost=0.00..1539.83 rows=1 width=56) (actual time=10082.465..10128.750 rows=16510 loops=1)
                                 Partition By: item.i_category, item.i_brand, call_center.cc_name
                                 Order By: date_dim.d_year, date_dim.d_moy
                                 ->  Sort  (cost=0.00..1539.83 rows=1 width=48) (actual time=10082.429..10084.923 rows=16510 loops=1)
                                       Sort Key: item.i_category, item.i_brand, call_center.cc_name, date_dim.d_year, date_dim.d_moy
                                       Sort Method:  quicksort  Memory: 20078kB
                                       ->  Redistribute Motion 2:2  (slice2; segments: 2)  (cost=0.00..1539.83 rows=1 width=48) (actual time=9924.269..9989.657 rows=16510 loops=1)
                                             Hash Key: item.i_category, item.i_brand, call_center.cc_name
                                             ->  WindowAgg  (cost=0.00..1539.83 rows=1 width=48) (actual time=9924.717..9974.500 rows=16633 loops=1)
                                                   Partition By: item.i_category, item.i_brand, call_center.cc_name, date_dim.d_year
                                                   ->  Sort  (cost=0.00..1539.83 rows=1 width=126) (actual time=9924.662..9927.280 rows=16633 loops=1)
                                                         Sort Key: item.i_category, item.i_brand, call_center.cc_name, date_dim.d_year
                                                         Sort Method:  quicksort  Memory: 20076kB
                                                         ->  Redistribute Motion 2:2  (slice3; segments: 2)  (cost=0.00..1539.83 rows=1 width=126) (actual time=9394.220..9856.375 rows=16633 loops=1)
                                                               Hash Key: item.i_category, item.i_brand, call_center.cc_name, date_dim.d_year
                                                               ->  GroupAggregate  (cost=0.00..1539.83 rows=1 width=126) (actual time=9391.783..9833.988 rows=16424 loops=1)
                                                                     Group Key: item.i_category, item.i_brand, call_center.cc_name, date_dim.d_year, date_dim.d_moy
                                                                     ->  Sort  (cost=0.00..1539.83 rows=1 width=124) (actual time=9397.448..9628.606 rows=174584 loops=1)
                                                                           Sort Key: item.i_category, item.i_brand, call_center.cc_name, date_dim.d_year, date_dim.d_moy
                                                                           Sort Method:  external merge  Disk: 134144kB
                                                                           ->  Redistribute Motion 2:2  (slice4; segments: 2)  (cost=0.00..1539.83 rows=1 width=124) (actual time=6107.447..8237.581 rows=174584 loops=1)
                                                                                 Hash Key: item.i_category, item.i_brand, call_center.cc_name, date_dim.d_year, date_dim.d_moy
                                                                                 ->  Hash Join  (cost=0.00..1539.83 rows=1 width=124) (actual time=6112.706..7088.349 rows=178669 loops=1)
                                                                                       Hash Cond: (date_dim.d_date_sk = catalog_sales.cs_sold_date_sk)
                                                                                       ->  Seq Scan on date_dim  (cost=0.00..436.38 rows=204 width=12) (actual time=10.656..17.972 rows=222 loops=1)
                                                                                             Filter: ((d_year = 1999) OR ((d_year = 1998) AND (d_moy = 12)) OR ((d_year = 2000) AND (d_moy = 1)))
                                                                                             Rows Removed by Filter: 36504
                                                                                       ->  Hash  (cost=1103.41..1103.41 rows=1 width=120) (actual time=6100.040..6100.040 rows=1430799 loops=1)
                                                                                             Buckets: 16384 (originally 16384)  Batches: 32 (originally 1)  Memory Usage: 12493kB
                                                                                             ->  Broadcast Motion 2:2  (slice5; segments: 2)  (cost=0.00..1103.41 rows=1 width=120) (actual time=1.802..5410.377 rows=1434428 loops=1)
                                                                                                   ->  Nested Loop  (cost=0.00..1103.40 rows=1 width=120) (actual time=1.632..5127.625 rows=718766 loops=1)
                                                                                                         Join Filter: true
                                                                                                         ->  Redistribute Motion 2:2  (slice6; segments: 2)  (cost=0.00..1097.40 rows=1 width=22) (actual time=1.564..362.958 rows=718766 loops=1)
                                                                                                               Hash Key: catalog_sales.cs_item_sk
                                                                                                               ->  Hash Join  (cost=0.00..1097.40 rows=1 width=22) (actual time=1.112..996.643 rows=717589 loops=1)
                                                                                                                     Hash Cond: (catalog_sales.cs_call_center_sk = call_center.cc_call_center_sk)
                                                                                                                     ->  Seq Scan on catalog_sales  (cost=0.00..509.10 rows=720774 width=18) (actual time=0.144..602.362 rows=721193 loops=1)
                                                                                                                     ->  Hash  (cost=431.00..431.00 rows=1 width=12) (actual time=0.022..0.022 rows=6 loops=1)
                                                                                                                           Buckets: 32768  Batches: 1  Memory Usage: 257kB
                                                                                                                           ->  Broadcast Motion 2:2  (slice7; segments: 2)  (cost=0.00..431.00 rows=1 width=12) (actual time=0.009..0.012 rows=6 loops=1)
                                                                                                                                 ->  Seq Scan on call_center  (cost=0.00..431.00 rows=1 width=12) (actual time=0.032..0.035 rows=4 loops=1)
                                                                                                         ->  Index Scan using item_pkey on item  (cost=0.00..6.00 rows=1 width=102) (actual time=0.000..0.006 rows=1 loops=718766)
                                                                                                               Index Cond: (i_item_sk = catalog_sales.cs_item_sk)
                     ->  Redistribute Motion 1:2  (slice8)  (cost=0.00..1293.01 rows=1 width=72) (actual time=646.614..646.646 rows=50 loops=1)
                           ->  Limit  (cost=0.00..1293.01 rows=1 width=72) (actual time=10787.533..10787.700 rows=100 loops=1)
                                 ->  Gather Motion 2:1  (slice9; segments: 2)  (cost=0.00..1293.01 rows=1 width=72) (actual time=10787.527..10787.654 rows=100 loops=1)
                                       Merge Key: ((share0_ref4.sum_sales - share0_ref4.avg_monthly_sales)), share0_ref4.cc_name
                                       ->  Sort  (cost=0.00..1293.01 rows=1 width=72) (actual time=10789.933..10789.995 rows=357 loops=1)
                                             Sort Key: ((share0_ref4.sum_sales - share0_ref4.avg_monthly_sales)), share0_ref4.cc_name
                                             Sort Method:  quicksort  Memory: 14998kB
                                             ->  Result  (cost=0.00..1293.01 rows=1 width=150) (actual time=10648.280..10774.898 rows=12379 loops=1)
                                                   Filter: ((share0_ref4.d_year = 1999) AND (share0_ref4.avg_monthly_sales > '0'::numeric) AND (CASE WHEN (share0_ref4.avg_monthly_sales > '0'::numeric) THEN (abs((share0_ref4.sum_sales - share0_ref4.avg_monthly_sales)) / share0_ref4.avg_monthly_sales) ELSE NULL::numeric END > 0.1))
                                                   ->  Hash Join  (cost=0.00..1293.01 rows=1 width=150) (actual time=10648.253..10740.262 rows=13582 loops=1)
                                                         Hash Cond: ((share0_ref4.i_category = share0_ref3.i_category) AND (share0_ref4.i_brand = share0_ref3.i_brand) AND ((share0_ref4.cc_name)::text = (share0_ref3.cc_name)::text) AND (share0_ref4.rn = (share0_ref3.rn + 1)) AND (share0_ref4.rn = (share0_ref2.rn - 1)))
                                                         ->  Shared Scan (share slice:id 9:0)  (cost=0.00..431.00 rows=1 width=142) (actual time=0.013..5.570 rows=16510 loops=1)
                                                         ->  Hash  (cost=862.00..862.00 rows=1 width=142) (actual time=10647.380..10647.380 rows=209076 loops=1)
                                                               Buckets: 65536 (originally 32768)  Batches: 2 (originally 1)  Memory Usage: 31389kB
                                                               ->  Hash Join  (cost=0.00..862.00 rows=1 width=142) (actual time=10156.494..10374.421 rows=209076 loops=1)
                                                                     Hash Cond: ((share0_ref3.i_category = share0_ref2.i_category) AND (share0_ref3.i_brand = share0_ref2.i_brand) AND ((share0_ref3.cc_name)::text = (share0_ref2.cc_name)::text))
                                                                     ->  Shared Scan (share slice:id 9:0)  (cost=0.00..431.00 rows=1 width=126) (actual time=0.009..6.887 rows=16510 loops=1)
                                                                     ->  Hash  (cost=431.00..431.00 rows=1 width=126) (actual time=10156.297..10156.298 rows=16178 loops=1)
                                                                           Buckets: 32768  Batches: 1  Memory Usage: 3144kB
                                                                           ->  Shared Scan (share slice:id 9:0)  (cost=0.00..431.00 rows=1 width=126) (actual time=10139.421..10144.473 rows=16510 loops=1)
 Planning Time: 1905.667 ms
   (slice0)    Executor memory: 330K bytes.
   (slice1)    Executor memory: 4750K bytes avg x 2 workers, 4968K bytes max (seg1).  Work_mem: 4861K bytes max.
   (slice2)    Executor memory: 4701K bytes avg x 2 workers, 4952K bytes max (seg0).  Work_mem: 4894K bytes max.
   (slice3)    Executor memory: 12428K bytes avg x 2 workers, 12428K bytes max (seg0).  Work_mem: 12375K bytes max.
 * (slice4)    Executor memory: 14021K bytes avg x 2 workers, 14021K bytes max (seg0).  Work_mem: 12493K bytes max, 221759K bytes wanted.
   (slice5)    Executor memory: 77K bytes avg x 2 workers, 77K bytes max (seg0).
   (slice6)    Executor memory: 323K bytes avg x 2 workers, 323K bytes max (seg0).  Work_mem: 257K bytes max.
   (slice7)    Executor memory: 39K bytes avg x 2 workers, 39K bytes max (seg0).
   (slice8)    Executor memory: 242K bytes (entry db).
 * (slice9)    Executor memory: 35344K bytes avg x 2 workers, 35360K bytes max (seg1).  Work_mem: 31389K bytes max, 37501K bytes wanted.
 Memory used:  128000kB
 Memory wanted:  3328681kB
 Optimizer: Pivotal Optimizer (GPORCA)
 Execution Time: 10856.507 ms
(86 rows)

Time: 12779.991 ms (00:12.780)
```

There is only one share slice in this query: one producer in slice 1 and three consumers in slice 9. However, when I turned the GUC off, the query never returned, and the processes looked like this:

```
postgres  22285  22255  0 03:03 pts/1    00:00:00 psql -p9221
postgres  22288  20912  3 03:03 ?        00:00:03 postgres:  9221, postgres tpcds [local] con150 cmd16 EXPLAIN
postgres  22294  20939  0 03:03 ?        00:00:00 postgres:  9210, postgres tpcds 172.17.0.50(60732) con150 seg0 cmd17 slice1 MPPEXEC SELECT
postgres  22295  20950  0 03:03 ?        00:00:00 postgres:  9211, postgres tpcds 172.17.0.50(36177) con150 seg1 cmd17 slice1 MPPEXEC SELECT
postgres  22306  20939  5 03:03 ?        00:00:04 postgres:  9210, postgres tpcds 172.17.0.50(60742) con150 seg0 idle
postgres  22307  20950  5 03:03 ?        00:00:04 postgres:  9211, postgres tpcds 172.17.0.50(36187) con150 seg1 idle
postgres  22310  20939 11 03:03 ?        00:00:10 postgres:  9210, postgres tpcds 172.17.0.50(60745) con150 seg0 idle
postgres  22311  20950 12 03:03 ?        00:00:11 postgres:  9211, postgres tpcds 172.17.0.50(36190) con150 seg1 idle
postgres  22314  20939  5 03:03 ?        00:00:04 postgres:  9210, postgres tpcds 172.17.0.50(60748) con150 seg0 idle
postgres  22315  20950  5 03:03 ?        00:00:04 postgres:  9211, postgres tpcds 172.17.0.50(36193) con150 seg1 idle
postgres  22318  20939  1 03:03 ?        00:00:01 postgres:  9210, postgres tpcds 172.17.0.50(60750) con150 seg0 idle
postgres  22319  20950  2 03:03 ?        00:00:01 postgres:  9211, postgres tpcds 172.17.0.50(36195) con150 seg1 idle
postgres  22322  20912  0 03:03 ?        00:00:00 postgres:  9221, postgres tpcds [local] con150 seg-1 idle
postgres  22324  20939  0 03:03 ?        00:00:00 postgres:  9210, postgres tpcds 172.17.0.50(60754) con150 seg0 idle
postgres  22325  20950  0 03:03 ?        00:00:00 postgres:  9211, postgres tpcds 172.17.0.50(36199) con150 seg1 idle
postgres  22348  20939  0 03:05 ?        00:00:00 postgres:  9210, postgres tpcds 172.17.0.50(45936) con150 seg0 idle
postgres  22349  20950  0 03:05 ?        00:00:00 postgres:  9211, postgres tpcds 172.17.0.50(49614) con150 seg1 idle
postgres  22352  20939  4 03:05 ?        00:00:00 postgres:  9210, postgres tpcds 172.17.0.50(45939) con150 seg0 idle
postgres  22353  20950  4 03:05 ?        00:00:00 postgres:  9211, postgres tpcds 172.17.0.50(49617) con150 seg1 idle
```

According to my debugging, the stack of the slice 1 processes looks like this:

```
#0  0x00007fde606f94f3 in epoll_wait () from /lib64/libc.so.6
#1  0x0000000000d2eec1 in WaitEventSetWaitBlock (set=0x87d8fe0, cur_timeout=-1, occurred_events=0x7ffce695fe00, nevents=1) at latch.c:1081
#2  0x0000000000d2ed9a in WaitEventSetWait (set=0x87d8fe0, timeout=-1, occurred_events=0x7ffce695fe00, nevents=1, wait_event_info=0) at latch.c:1033
#3  0x0000000000d5987d in ConditionVariableSleep (cv=0x7fde540890b0, wait_event_info=0) at condition_variable.c:157
#4  0x0000000000b30a61 in shareinput_writer_waitdone (ref=0x87da950, nconsumers=1) at nodeShareInputScan.c:994
#5  0x0000000000b2fe89 in ExecEndShareInputScan (node=0x88c2ec0) at nodeShareInputScan.c:522
#6  0x0000000000ad63e8 in ExecEndNode (node=0x88c2ec0) at execProcnode.c:888
#7  0x0000000000b3237b in ExecEndSequence (node=0x88c2d80) at nodeSequence.c:132
#8  0x0000000000ad623f in ExecEndNode (node=0x88c2d80) at execProcnode.c:779
#9  0x0000000000b1772e in ExecEndSort (node=0x88c2658) at nodeSort.c:365
```

That is to say, the producer is waiting for the consumers to wake it up, but they never do. Further debugging showed that a **squelch** is triggered on the *Gather Motion* node upstream of the three ShareInputScan consumer nodes. In the squelch logic of ShareInputScan, a consumer notifies the producer only when `ndone == nsharers`:

```c
/* One more consumer in this slice has been squelched. */
local_state->ndone++;

/* Only the last consumer to finish notifies the producer. */
if (local_state->ndone == local_state->nsharers)
{
    shareinput_reader_notifydone(node->ref, sisc->nconsumers);
    local_state->closed = true;
}
```
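
For context, here is a minimal sketch of the two sides of this handshake, modeled on `shareinput_reader_notifydone` and `shareinput_writer_waitdone` in `nodeShareInputScan.c` (the backtrace above shows the producer parked inside the latter). The struct layout and field names below are assumptions for illustration, not the exact Greenplum definitions:

```c
#include "postgres.h"

#include "storage/condition_variable.h"
#include "storage/spin.h"

/* Assumed cross-slice shared state; the real Greenplum layout differs. */
typedef struct share_state_sketch
{
    slock_t           mutex;
    int               ndone;      /* consumer slices that have reported done */
    ConditionVariable done_cv;    /* producer sleeps on this */
} share_state_sketch;

/* Consumer side: called by the last finishing consumer in a slice. */
static void
reader_notifydone_sketch(share_state_sketch *state)
{
    SpinLockAcquire(&state->mutex);
    state->ndone++;
    SpinLockRelease(&state->mutex);

    /* Wake the producer, which may be sleeping in the loop below. */
    ConditionVariableBroadcast(&state->done_cv);
}

/* Producer side: block until every consumer slice has reported done. */
static void
writer_waitdone_sketch(share_state_sketch *state, int nconsumers)
{
    ConditionVariablePrepareToSleep(&state->done_cv);
    for (;;)
    {
        int ndone;

        SpinLockAcquire(&state->mutex);
        ndone = state->ndone;
        SpinLockRelease(&state->mutex);

        if (ndone >= nconsumers)
            break;      /* safe to clean up the materialized tuplestore */

        ConditionVariableSleep(&state->done_cv, 0);
    }
    ConditionVariableCancelSleep();
}
```

If no consumer ever takes the `ndone == nsharers` branch, the producer never collects enough done notifications and sleeps in `ConditionVariableSleep` forever; that is exactly the hang captured in the backtrace above.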

While `ndone` is incremented by the consumers one at a time, `nsharers` is initialized during `ExecInitNode`. However, the GUC `execute_pruned_plan` determines the root node at which the executor starts calling `ExecInitNode`:

- `execute_pruned_plan` set to true: initialization starts at the root node of slice 9, so `nsharers` ends up as 3
- `execute_pruned_plan` set to false: initialization starts at the root node of the whole plan tree, so `nsharers` ends up as 4; `ndone == nsharers` can then never hold, because there are only three consumers and `ndone` reaches 3 at most (see the sketch below)
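
To make the difference concrete, below is a simplified sketch of that decision, under the assumption that the executor picks the initialization root based on the GUC; `plan_root_for_current_slice` is a hypothetical helper invented for illustration, not a real Greenplum function:

```c
/*
 * Illustration only (hypothetical helper, simplified control flow):
 * which subtree is passed to ExecInitNode() determines how many times
 * ExecInitShareInputScan() runs, and thus the final value of
 * local_state->nsharers.
 */
static PlanState *
init_plan_sketch(PlannedStmt *plannedstmt, EState *estate, int eflags)
{
    Plan   *root;

    if (execute_pruned_plan)
        /* A slice 9 worker initializes only its own subtree: the three
         * consumer ShareInputScans, so nsharers becomes 3. */
        root = plan_root_for_current_slice(plannedstmt, currentSliceId);
    else
        /* The whole plan tree is initialized: the producer's
         * ShareInputScan in slice 1 is initialized here as well, so
         * nsharers becomes 4 while ndone can reach at most 3. */
        root = plannedstmt->planTree;

    return ExecInitNode(root, estate, eflags);
}
```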

In my understanding, the algorithm should work regardless of whether this GUC is set to true or false. So I added a condition to the initialization of `nsharers`: increment it only when the node being initialized is a consumer belonging to the current slice. With that condition, the algorithm works in both cases.
mrdrivingduck authored Sep 9, 2021
1 parent ec02439 commit 3f51ee6
Showing 1 changed file with 11 additions and 1 deletion.
src/backend/executor/nodeShareInputScan.c

```diff
@@ -468,7 +468,17 @@ ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags)
 	}
 
 	local_state = list_nth(estate->es_sharenode, node->share_id);
-	local_state->nsharers++;
+
+	/*
+	 * To accumulate the number of CTE consumers executed in this slice.
+	 * This variable will be used by the last finishing CTE consumer
+	 * in current slice, to wake the corresponding CTE producer up for
+	 * cleaning the materialized tuplestore, during squelching.
+	 */
+	if (currentSliceId == node->this_slice_id &&
+		currentSliceId != node->producer_slice_id)
+		local_state->nsharers++;
+
 	if (childState)
 		local_state->childState = childState;
 	sisstate->local_state = local_state;
```
