Skip to content

Commit

Permalink
Merge #41257
Browse files Browse the repository at this point in the history
41257: distsql: join UNION ALL streams on a single node r=andreimatei a=RaduBerinde

Currently, in most cases UNION ALL uses all the streams from both
sides, which can result in unnecessary parallelization downstream.

This change merges streams from the same node into a no-op processor.
Ideally, if the next processor would be on the same node, we wouldn't
need the no-op processor but implementing that would be much more
involved.

Fixes #41251.

Release note: None

Release justification: Needed for a fix for a high-severity bug in
existing functionality.

Co-authored-by: Radu Berinde <radu@cockroachlabs.com>
  • Loading branch information
craig[bot] and RaduBerinde committed Oct 5, 2019
2 parents 42247cb + 9c211cd commit e6e9161
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 3 deletions.
10 changes: 10 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3051,6 +3051,16 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
execinfrapb.PostProcessSpec{},
p.ResultTypes,
)
} else {
// With UNION ALL, we can end up with multiple streams on the same node.
// We don't want to have unnecessary routers and cross-node streams, so
// merge these streams now.
//
// More importantly, we need to guarantee that if everything is planned
// on a single node (which is always the case when there are mutations),
// we can fuse everything so there are no concurrent KV operations (see
// #40487, #41307).
p.EnsureSingleStreamPerNode()
}
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_union
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ subtest Union
query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT x FROM xyz UNION ALL SELECT x FROM xyz ORDER BY x]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJy0ll9v2jAUxd_3Kaz7tGmOgp2EP3miW5mExKADJm2qeMiIVSFRnDlBgiK--5RkUQui13YIj0l8jn8-9whzgPTvGkKYDUaDr3OyVWvybTr5Th4Hvx5Gd8Mx-Xg_nM1nP0afyP8lu3LBbv9Cfo6HkzG5G40ufJtM7wdT8uU32S2AwkbGYhw9ixTCR2BAgQMFDyj4QCGABYVEyaVIU6nyJYdCMIx3ELYorDbJNstfLygspRIQHiBbZWsBIcyjP2sxFVEslNsCCrHIotW62CZRq-dI7fu7_QtQmCXRJg2J4-Y7T7ZZSPoMFkcKcpu92qdZ9CQgZEdaDyFoDIHbIMykyoRy2VkCffb5XX-vjn_QhP-rrVSxUCK-ZHoBYiwdmbj89Izvbe-fbM_MJ8gMJuhyx_Wsa2QB0W4QgttAVINmdQdt6N-u68_NQ-QmIXqO61tP0gKi0yAEt4GokuY3Srry79T198xD9ExC9J3i19dukhYQ3QYhuA1ElbR3o6Qr_25df988RN8kxMCxnqMFQq8xBG6DUKXs3yjlyr93o78AU5EmcpMKo9u3ld_fIn4S5X2fyq1aigcll8U25eOk0BUvYpFm5VdePgw35acc8K2YoWIPF3NU7J-I2bnYu0bs42dmOHeAqju4uI2Ku7i4c82hu9eIe6iYaRJjeMmYrmV4zTTo7KqeMbxoLNCg41VjbY0c75qOHS-bTo23jfU06HjfeEsjxxunYed44c7Vi-OHfwEAAP__HtgCVw==
https://cockroachdb.github.io/distsqlplan/decode.html#eJysVk2L2kAYvvdXDO-ppROSmcSvnHbbtSBY3aqFliWH1AyL4GbSSQK64n8vxpWtkrwz-TgmmWeeT8QDpH-34MNyPB1_XZFcbcm3xfw7eRr_epzeT2bk48NkuVr-mH4ib0d25wO7_Sv5OZvMZ-R-Oi35Nl88jBfky2-yC4BCLCMxC19ECv4TMKDAgYILFDyg0IOAQqLkWqSpVKcjhwIwiXbgOxQ2cZJnp9cBhbVUAvwDZJtsK8CHVfhnKxYijISyHaAQiSzcbAuaRG1eQrW_2-1fgcIyCePUJ5Z9Yp7nmU_uGARHCjLP3q9Ps_BZgM-OtJmEXmcSeKWEd-Y8lioSSkRXrMGxRORMWjKxmXNzspzbqWN_KVUmlM1urN-xz5XeXANvZc7eLq20x83seVf0zLxdZtCuzS3brT2xGiL6HYrglSLajIw1GJkmgMvI-k1Hxs0D5iYBu5bt1W65hohBhyJ4pYg2LfMGLWsCuLQ8aNqyax6waxKwZxW_6PVariFi2KEIXimiTctug5Y1AVxaHjZt2TMP2DMJuGfV7riGhFFnEnilhDYNew0a1ti_NDzq4i9Byf0LkSYyToWh8oCCiJ7FOahU5motHpVcFzTnx3mBK15EIs3OX_n5YRIXn4r-_wezNmCOgt0rsHMLdlGwhzN7uGyGU_dQ9ACn7rcBD1DwEJc9bJPYCAUzppkJPjItHJ8Z47hxhg9N45zhS2M9DRzfmhaOj431Ndbxuems43tjIw0cX5wWjm-OO7h1jm_u1npw_PAvAAD__xq8NpI=

query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT x FROM xyz UNION SELECT x FROM xyz ORDER BY x]
Expand Down Expand Up @@ -63,7 +63,7 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJyklF-Lm0AUxd_7KYb71MIVnd
query T
SELECT url FROM [EXPLAIN (DISTSQL) (SELECT x FROM xyz ORDER BY y) UNION ALL (SELECT x FROM xyz ORDER BY z) ORDER BY x]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJy0ll1v2j4Yxe__n8J6ror-joKdhJdc0a1MQmLQAZM2VVxkxKqQKM6cIPEivvtEsoiB2sd2CHdN4nP883mOag6Q_l5BCNP-sP95RjZqRb5Mxl_JS__H8_BxMCIPT4PpbPpt2CAPf9dsixXb3Z6MJ0_9Cfn0k-wa5PtoMB6Rx-EQXbhvnP_ezoHCWsZiFL2JFMIXYECBAwUPKPhAIYA5hUTJhUhTqU5LDrlgEG8hbFJYrpNNdno9p7CQSkB4gGyZrQSEMIt-rcRERLFQbhMoxCKLlqt8m0Qt3yK16213e6AwTaJ1GhLHPe083mQh6TGYHynITXa2T7PoVUDIjrQaQlAbArdBmEqVCeWyqwR67P8P_b0q_kEd_mdbqWKhRPye6TsQI-nIxOWXZ_xoe_9ie2Y-QWYwQZc7rmddIwuIVo0Q3AaiHDSrOmhD_1ZVf24eIjcJ0XNc33qSFhDtGiG4DUSZNL9T0qV_u6q_Zx6iZxKi7-T_fe0maQHRqRGC20CUSXt3Srr071T1981D9E1CDBzrOVogdGtD4DYIZcr-nVIu_bt3-gkwEWki16kwun2bp_tbxK-iuO9TuVEL8azkIt-meBznuvxFLNKs-MqLh8G6-HQC_FfMULGHizkq9i_E7Frs3SL28TMznDtA1W1c3ELFHVzcvuXQnVvEXVTMNIkxvGRM1zK8Zhp0dlPPGF40FmjQ8aqxlkaOd03HjpdNp8bbxroadLxvvKmR443TsHO8cNfq-fG_PwEAAP__epsI2g==
https://cockroachdb.github.io/distsqlplan/decode.html#eJysVttq2zAYvt9TiP-qZTK2ZOfkq3ZrBoEs6ZIONoovvFiUQGp5sg05kHcfcRqyhPiXfLirbX36jpTsIP27Ah_mw_Hw6wvJ1Yp8m02_k9fhr-fx42hC7p5G85f5j_E9ufs4sz6eWG-2ZDp7Gs7Il99kc09-TkbTCXkcj9GD2_vz3-sAKMQyEpPwXaTgvwIDChwouEDBAwodCCgkSi5Emkp1OLIrAKNoDb5DYRkneXZ4HVBYSCXA30G2zFYCfHgJ_6zETISRULYDFCKRhctVQZOo5XuoNg_rzRYozJMwTn1i2QfmaZ755IFBsKcg8-x8fZqFbwJ8tqf1JHRak8BLJZyZ81iqSCgRXbAG-xsiJ9KSic2cq5O3uZ0q9udSZULZ7Mr6A_tc6s018HbL2celpfa4mT3vgp6Zt8sM2rW5ZbuVJ1ZBRLdFEbxURJORsRoj0wRwGlm37si4ecDcJGDXsr3KLVcQ0WtRBC8V0aRlXqNlTQCnlnt1W3bNA3ZNAvas4j96tZYriOi3KIKXimjSslujZU0Ap5b7dVv2zAP2TALuWJU7riBh0JoEXiqhScNejYY19k8ND9r4SXDj_plIExmnwlB5QEFEb-IYVCpztRDPSi4KmuPjtMAVLyKRZsev_PgwiotPRf__g1kTMEfB7gXYuQa7KNjDmT1cNsOpOyi6h1N3m4B7KLiPy-43SWyAghnTzAQfmRaOz4xx3DjDh6ZxzvClsY4Gjm9NC8fHxroa6_jcdNbxvbGBBo4vTgvHN8cd3DrHN3dtPdh_-hcAAP__Geg9FQ==

query T
SELECT url FROM [EXPLAIN (DISTSQL) (SELECT x FROM xyz ORDER BY y) UNION (SELECT x FROM xyz ORDER BY z) ORDER BY x]
Expand All @@ -74,7 +74,7 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJyslk1v2kwUhffvr7DuKtE7yN
query T
SELECT url FROM [EXPLAIN (DISTSQL) (SELECT x FROM xyz ORDER BY y) UNION ALL (SELECT x FROM xyz ORDER BY y, z) ORDER BY x]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJy0lk2P2jwUhffvr7DuatDrKNhJ-MiKaYdKSBSmQKVWIxYpsUZIDE6dIPEh_ntF0oiCZq7tEJZJfI4fn3uEOUD6ewUhTPvD_ucZ2agV-TIZfyUv_R_Pw8fBiDw8Daaz6bdhgzz8XbMtVmx3ezKePPUn5NNPsmuQ76PBeEQeh0N8ISX7xvlxOwcKaxmLUfQmUghfgAEFDhQ8oOADhQDmFBIlFyJNpTotOeSCQbyFsElhuU422en1nMJCKgHhAbJlthIQwiz6tRITEcVCuU2gEIssWq7ybRK1fIvUrrfd7YHCNInWaUgc97TzeJOFpMdgfqQgN9nZPs2iVwEhO9JqCEFtCNwGYSpVJpTLrhLosf8_9Peq-Ad1-J9tpYqFEvF7pu9AjKQjE5dfnvGj7f2L7Zn5BJnBBF3uuJ51jSwgWjVCcBuIctCs6qAN_VtV_bl5iNwkRM9xfetJWkC0a4TgNhBl0vxOSZf-7ar-nnmInkmIvpP_-tpN0gKiUyMEt4Eok_bulHTp36nq75uH6JuEGDjWc7RA6NaGwG0QypT9O6Vc-nfv9BdgItJErlNhdPs2T_e3iF9Fcd-ncqMW4lnJRb5N8TjOdfmLWKRZ8ZUXD4N18ekE-K-YoWIPF3NU7F-I2bXYu0Xs42dmOHeAqtu4uIWKO7i4fcuhO7eIu6iYaRJjeMmYrmV4zTTo7KaeMbxoLNCg41VjLY0c75qOHS-bTo23jXU16HjfeFMjxxunYed44a7V8-N_fwIAAP__gCgJnw==
https://cockroachdb.github.io/distsqlplan/decode.html#eJysVtuK2kAYvu9TDP_VLp2QzCSecrXbrgXB6la30LLkIjXDIriZdJKAB3z3YlyxivlncrhMMt98R8QdpH9X4MN8OB5-fSG5WpFvs-l38jr89Tx-HE3I3dNo_jL_Mb4ndx9n1scT682WTGdPwxn58pts7snPyWg6IY_jMX6Qku39-XEdAIVYRmISvosU_FdgQIEDBRcoeEChAwGFRMmFSFOpDkd2BWAUrcF3KCzjJM8OrwMKC6kE-DvIltlKgA8v4Z-VmIkwEsp2gEIksnC5KmgStXwP1eZhvdkChXkSxqlPLPvAPM0znzwwCPYUZJ6dr0-z8E2Az_a0noROaxJ4qYQzcx5LFQklogvWYH9D5ERaMrGZc3XyNrdTxf5cqkwom11Zf2CfS725Bt5uOfu4tNQeN7PnXdAz83aZQbs2t2y38sQqiOi2KIKXimgyMlZjZJoATiPr1h0ZNw-YmwTsWrZXueUKInotiuClIpq0zGu0rAng1HKvbsuuecCuScCeVfyiV2u5goh-iyJ4qYgmLbs1WtYEcGq5X7dlzzxgzyTgjlW54woSBq1J4KUSmjTs1WhYY__U8KCNvwQ37p-JNJFxKgyVBxRE9CaOQaUyVwvxrOSioDk-Tgtc8SISaXb8yo8Po7j4VPT_P5g1AXMU7F6AnWuwi4I9nNnDZTOcuoOiezh1twm4h4L7uOx-k8QGKJgxzUzwkWnh-MwYx40zfGga5wxfGuto4PjWtHB8bKyrsY7PTWcd3xsbaOD44rRwfHPcwa1zfHPX1oP9p38BAAD__42APdo=

# Only one distinct processor should be used in the single node UNION case.
query T
Expand Down
62 changes: 62 additions & 0 deletions pkg/sql/physicalplan/physical_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -1102,3 +1103,64 @@ func (p *PhysicalPlan) AddDistinctSetOpStage(
p.ResultRouters = append(p.ResultRouters, pIdx)
}
}

// EnsureSingleStreamPerNode walks the ResultRouters and, wherever two or more
// of them belong to processors on the same node, funnels those streams into a
// single no-op processor on that node. The no-op processor takes the place of
// the first router of the group in ResultRouters; the remaining routers of the
// group are removed from the slice. If every result router is already on a
// distinct node, the plan is left untouched.
//
// TODO(radu): a no-op processor is not ideal if the next processor is on the
// same node. A fix for that is much more complicated, requiring remembering
// extra state in the PhysicalPlan.
func (p *PhysicalPlan) EnsureSingleStreamPerNode() {
	// Fast path: scan once and bail out unless some node shows up twice.
	var seenNodes util.FastIntSet
	needsMerge := false
	for _, routerIdx := range p.ResultRouters {
		nodeID := int(p.Processors[routerIdx].Node)
		if seenNodes.Contains(nodeID) {
			needsMerge = true
			break
		}
		seenNodes.Add(nodeID)
	}
	if !needsMerge {
		return
	}

	// Scratch slice holding the routers of the node currently being handled;
	// it is truncated and reused on every iteration.
	group := make([]ProcessorIdx, 0, 2)

	// len(p.ResultRouters) is re-evaluated on each pass because the inner loop
	// shrinks the slice as it pulls out routers that share a node.
	for i := 0; i < len(p.ResultRouters); i++ {
		first := p.ResultRouters[i]
		node := p.Processors[first].Node
		group = append(group[:0], first)

		// Collect every later router that lives on the same node, deleting it
		// from ResultRouters in place. The cursor only advances when nothing
		// was deleted, since a deletion shifts the next candidate into slot j.
		j := i + 1
		for j < len(p.ResultRouters) {
			candidate := p.ResultRouters[j]
			if p.Processors[candidate].Node != node {
				j++
				continue
			}
			group = append(group, candidate)
			p.ResultRouters = append(p.ResultRouters[:j], p.ResultRouters[j+1:]...)
		}

		if len(group) < 2 {
			// This node has a single stream already; nothing to merge.
			continue
		}

		// Build the no-op processor that will receive all of this node's
		// streams.
		input := execinfrapb.InputSyncSpec{
			// The other fields will be filled in by MergeResultStreams.
			ColumnTypes: p.ResultTypes,
		}
		output := execinfrapb.OutputRouterSpec{
			Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
		}
		noop := Processor{
			Node: node,
			Spec: execinfrapb.ProcessorSpec{
				Input:  []execinfrapb.InputSyncSpec{input},
				Core:   execinfrapb.ProcessorCoreUnion{Noop: &execinfrapb.NoopCoreSpec{}},
				Output: []execinfrapb.OutputRouterSpec{output},
			},
		}
		noopIdx := p.AddProcessor(noop)
		p.MergeResultStreams(
			group, 0 /* sourceRouterSlot */, p.MergeOrdering, noopIdx, 0, /* destInput */
		)

		// The merged processor stands in for the whole group at position i.
		p.ResultRouters[i] = noopIdx
	}
}

0 comments on commit e6e9161

Please sign in to comment.