Skip to content

Commit

Permalink
Merge #41010 #41250
Browse files Browse the repository at this point in the history
41010: roachtest: remove wait loop in backup2TB roachtest r=pbardea a=pbardea

Previously a wait loop was needed in the backup2TB roachtest because the
test was reporting the table as offline when it shouldn't have seen it
as OFFLINE. This was fixed by #40996, and therefore we should no longer
need this wait loop.

Closes #36841.

Release justification: Only touches tests.

Release note: None

41250: opt: map and push down equality conditions r=rytaft a=rytaft

This commit adds a new normalization rule to enable pushing variable
equality conditions such as `a.x=b.x` through joins.

For example, consider this query:

  `SELECT * FROM a, b, c WHERE a.x=b.x AND a.x=c.x`

Given join ordering `(a join (b join c))`, it should be possible to infer the
filter `b.x=c.x` and push it down from the top level onto the join `(b join c)`.
This commit enables that mapping and pushdown to happen.

In addition, this commit updates the `AssociateJoin` rule to map as many
equality conditions as possible to use the output columns of the new inner-most
join, allowing those conditions to be pushed onto that join.

For example, consider this query:

  `SELECT * FROM a, b, c WHERE a.x=b.x AND b.x=c.x`

If the AssociateJoin rule creates a new join ordering `(b join (a join c))`,
it should be possible to map `a.x=b.x` to `a.x=c.x` and add it onto the new
inner-most join `(a join c)`. This commit enables that mapping to happen.

Fixes #38716
Fixes #36226

Release note (performance improvement): Improved performance for some join
queries due to improved filter inference during planning.
Release justification: This commit will not be merged before the release
branch is cut.

Co-authored-by: Paul Bardea <pbardea@gmail.com>
Co-authored-by: Rebecca Taft <becca@cockroachlabs.com>
  • Loading branch information
3 people committed Oct 8, 2019
3 parents 7f8687b + db52476 + df6d188 commit 53ae9e5
Show file tree
Hide file tree
Showing 10 changed files with 866 additions and 204 deletions.
16 changes: 0 additions & 16 deletions pkg/cmd/roachtest/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,6 @@ func registerBackup(r *testRegistry) {
"--db=bank", "--payload-bytes=10240", "--ranges=0", "--csv-server", "http://localhost:8081",
fmt.Sprintf("--rows=%d", rows), "--seed=1", "{pgurl:1}")

// NB: without this delay, the BACKUP operation sometimes claims that
// bank.bank doesn't exist, probably due to some gossip propagation
// delay.
//
// See https://github.com/cockroachdb/cockroach/issues/36841.
for i := 0; i < 5; i++ {
_, err := c.Conn(ctx, 1).ExecContext(ctx, "SELECT * FROM bank.bank LIMIT 1")
if err != nil {
c.l.Printf("%s", err)
time.Sleep(time.Second)
continue
}
c.l.Printf("found the table")
break
}

m := newMonitor(ctx, c)
m.Go(func(ctx context.Context) error {
t.Status(`running backup`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,4 +464,4 @@ SELECT url FROM [EXPLAIN (DISTSQL)
JOIN grandchild1 USING (pid1, cid1)
]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzUll2P4jYUhu_7K9zTm5nWyLHzsRCpUqp22rKisIVZqdIqF1nihUjZhDqh6mjEf69CmBI-xmeCRSTuCPZj-9jPkd5nKP5OwYfZw-jh50eyVin5dTr5g3x6-OvD6KfhmNz9Mpw9zv4c3ZPdlO_rCatIyazk5P1kOCbzZZLGnHycDce_kbtVEvP7emChoiw-HaVkXs0JgUKWx3IcfZUF-J-AAwUBFGyg4AAFF0IKK5XPZVHkqpryvAWG8b_gWxSSbLUuq79DCvNcSfCfoUzKVIIPw6yUKpXRP3Iqo1iq93mSScUsoBDLMkrS7Y4j-aWEao_ka6Segl1VQGGaLJbNkboIoFCvAxRS-aW8C_gP9z-qau72J1CYrEufBJwGggY2DRwauBBuKOTrcnfY_Rk_P5FlVCwPj1SxDoSbkEJRRgsJPt_Qt9f9GH1OdyUz93Dll1oarwIUZqsoK3zSY4J9x1yPceYIJti344-jEXP7jDNBoiwmNsnLpVRF-2rEQTXi1Wr2S62zXMVSyfhgsbAisSlnruT3qFjunp8fvf_uEWng7J-RBuLgIZ3tW7o06NNgcFT9vizboKwzZx7nvXzFuHt8AWf3dg725hc1Br_5xkDqbjaG16IxNH3RY0415lZjzhWbhHfcJLybJhEXiSpuXlSk7qao79qI2pCxx7z_rfWY17TWu6KoomNRRTei2heJat-8qEjdTVH7bUR93c0e619RT7tjPe1u9HQu0tO5eT2Rupt6Dtro2e8xbr0o6lrVR8NRbl3RUKdjQ53u4_CZE01lscqzQr4p7FpVTTJeyPqOinyt5vKDyufbberPyZbbBqdYFmU9KuqPYVYPVQd8O-yZwAMTmBudm7t6mre4MtEO9kzggQnMjc59dGUntDimrSZt6-_b1sL88M6sY9oxEVwPI4LrYURwPYwJjtCI4K6J4HoYEVwPI4LrYUxwhEYE90wEf2eiqB5GFNXDiKJ6GFMUoRFF-yaK6mFEUT2MKKqHMUURGlF0YKIoN8oJCI1IitCIpQiNaYrhWFYwCwtmacEsLhjmBbPAwI0SAz-JDK1s1dOYrXoas1VPo7YiOGZrm7B0-mZt0lJbGrO1VV5qjWO2noQHra3h5pv_AgAA___wPy5O
https://cockroachdb.github.io/distsqlplan/decode.html#eJzUlU9r20AQxe_9FMuckmaKvZL8T1BQad2i4NqpnUIh6CCkiaNW2VVXq9Ji_N2LpDSRXXcVbHDxcTXz9H7zZmFXkH9PwYXFeDJ-e80KlbL389lHdjP-cjV540_Z2Tt_cb34NDlnDy0v64YsVCQ0Z5czf8qiuySNOfu88Kcf2FmWxPy8LixVKOK_q8iisicABCFjmob3lIN7AxwQLECwAcEBhB4ECJmSEeW5VGXLqhL48U9wuwiJyApdfg4QIqkI3BXoRKcELvhCk0op_EFzCmNSlzIRpDpdQIhJh0laOU7oVkPpkdyH6pdXowLCPFneNQuNQQCh_hcgpHSrzzx-gZ51cf5alaLHIyDMCu0yj6NnoWej56DXQ6-P3gCCNYIs9BN-rsMlgcvX-PwRS4x6uE5vc6w_2A9bAoSJlN-KjH2ViWBSlFSPfMOKb1QhGuGsf8I9MRVCqpgUxRtAwXoH_lS-klmHd7c6d3vbG958r93zU9p9y4iN3fePv3trr_ytU8q_ZcRG_oPj52_vlb99Svm3jNjIf3j8_J298ndOKf-WERv5j_7v27MDbk55JkVOz3pZuuXbRPGS6ocsl4WK6ErJqLKpj7NKV32IKdd1ldcHX9SlErAp5kaxtSHm22LL7NxibRvVjlnsHMLdM4r7Zuf-Ic4Do3hodh4e4jwy76rbck3Ml2zbO1i_-B0AAP__qaW5dA==
51 changes: 25 additions & 26 deletions pkg/sql/opt/exec/execbuilder/testdata/join
Original file line number Diff line number Diff line change
Expand Up @@ -93,32 +93,31 @@ EXPLAIN SELECT * FROM
JOIN twocolumn AS c (d, e) ON a.b = c.d AND c.d = onecolumn.x
LIMIT 1
----
· distributed false
· vectorized false
render · ·
└── limit · ·
│ count 1
└── hash-join · ·
│ type inner
│ equality (x) = (x)
├── hash-join · ·
│ │ type inner
│ │ equality (x) = (x)
│ ├── scan · ·
│ │ table twocolumn@primary
│ │ spans ALL
│ └── scan · ·
│ table onecolumn@primary
│ spans ALL
└── hash-join · ·
│ type inner
│ equality (x) = (x)
├── scan · ·
│ table onecolumn@primary
│ spans ALL
└── scan · ·
· table twocolumn@primary
· spans ALL
· distributed false
· vectorized false
limit · ·
│ count 1
└── hash-join · ·
│ type inner
│ equality (x) = (x)
├── hash-join · ·
│ │ type inner
│ │ equality (x) = (x)
│ ├── scan · ·
│ │ table onecolumn@primary
│ │ spans ALL
│ └── scan · ·
│ table twocolumn@primary
│ spans ALL
└── hash-join · ·
│ type inner
│ equality (x) = (x)
├── scan · ·
│ table onecolumn@primary
│ spans ALL
└── scan · ·
· table twocolumn@primary
· spans ALL

# The following queries verify that only the necessary columns are scanned.
query TTTTT
Expand Down
63 changes: 28 additions & 35 deletions pkg/sql/opt/exec/execbuilder/testdata/update_from
Original file line number Diff line number Diff line change
Expand Up @@ -155,39 +155,32 @@ CREATE TABLE ac (a INT, c INT)
query TTT
EXPLAIN UPDATE abc SET b = ab.b, c = ac.c FROM ab, ac WHERE abc.a = ab.a AND abc.a = ac.a
----
· distributed false
· vectorized false
count · ·
└── update · ·
│ table abc
│ set b, c
│ strategy updater
└── render · ·
└── distinct · ·
│ distinct on a
│ order key a
└── merge-join · ·
│ type inner
│ equality (a) = (a)
│ mergeJoinOrder +"(a=a)"
├── merge-join · ·
│ │ type inner
│ │ equality (a) = (a)
│ │ left cols are key ·
│ │ mergeJoinOrder +"(a=a)"
│ ├── scan · ·
│ │ table abc@primary
│ │ spans ALL
│ └── sort · ·
│ │ order +a
│ └── scan · ·
│ table ac@primary
│ spans ALL
└── sort · ·
│ order +a
└── scan · ·
· table ab@primary
· spans ALL
· distributed false
· vectorized false
count · ·
└── update · ·
│ table abc
│ set b, c
│ strategy updater
└── render · ·
└── distinct · ·
│ distinct on a
└── hash-join · ·
│ type inner
│ equality (a) = (a)
├── scan · ·
│ table ab@primary
│ spans ALL
└── hash-join · ·
│ type inner
│ equality (a) = (a)
│ right cols are key ·
├── scan · ·
│ table ac@primary
│ spans ALL
└── scan · ·
· table abc@primary
· spans ALL

# Make sure UPDATE ... FROM works with LATERAL.
query TTT
Expand Down Expand Up @@ -216,14 +209,14 @@ run · ·
│ type inner
│ equality (a) = (a)
├── scan · ·
│ table ac@primary
│ table ab@primary
│ spans ALL
└── hash-join · ·
│ type inner
│ equality (a) = (a)
│ right cols are key ·
├── scan · ·
│ table ab@primary
│ table ac@primary
│ spans ALL
└── scan · ·
· table abc@primary
Expand Down
177 changes: 171 additions & 6 deletions pkg/sql/opt/norm/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,171 @@ func (c *CustomFuncs) SimplifyNotNullEquality(
panic(errors.AssertionFailedf("invalid ops: %v, %v", testOp, constOp))
}

// CanMapJoinOpEqualities checks whether it is possible to map equality
// conditions in a join to use different variables so that the number of
// conditions crossing both sides of a join are minimized.
// See canMapJoinOpEquivalenceGroup for details.
func (c *CustomFuncs) CanMapJoinOpEqualities(
filters memo.FiltersExpr, leftCols, rightCols opt.ColSet,
) bool {
var equivFD props.FuncDepSet
for i := range filters {
equivFD.AddEquivFrom(&filters[i].ScalarProps(c.mem).FuncDeps)
}
equivReps := equivFD.EquivReps()

for col, ok := equivReps.Next(0); ok; col, ok = equivReps.Next(col + 1) {
if c.canMapJoinOpEquivalenceGroup(filters, col, leftCols, rightCols) {
return true
}
}

return false
}

// canMapJoinOpEquivalenceGroup checks whether it is possible to map equality
// conditions in a join that form an equivalence group to use different
// variables so that the number of conditions crossing both sides of a join
// are minimized.
//
// Specifically, it finds the set of columns containing col that forms an
// equivalence group in filters. It splits that group into columns from
// the left and right sides of the join, and checks whether there are multiple
// equality conditions in filters that connect the two groups. If so,
// canMapJoinOpEquivalenceGroup returns true.
func (c *CustomFuncs) canMapJoinOpEquivalenceGroup(
filters memo.FiltersExpr, col opt.ColumnID, leftCols, rightCols opt.ColSet,
) bool {
eqCols := c.GetEquivColsWithEquivType(col, filters)

// To map equality conditions, the equivalent columns must intersect
// both sides and must be fully bound by both sides.
if !(eqCols.Intersects(leftCols) &&
eqCols.Intersects(rightCols) &&
eqCols.SubsetOf(leftCols.Union(rightCols))) {
return false
}

// If more than one equality condition connecting columns in the equivalence
// group spans both sides of the join, these conditions can be remapped.
found := 0
for i := range filters {
fd := &filters[i].ScalarProps(c.mem).FuncDeps
filterEqCols := fd.ComputeEquivClosure(fd.EquivReps())
if filterEqCols.Intersects(leftCols) && filterEqCols.Intersects(rightCols) &&
filterEqCols.SubsetOf(eqCols) {
found++
if found > 1 {
return true
}
}
}

return false
}

// MapJoinOpEqualities maps all variable equality conditions in filters to
// use columns in either leftCols or rightCols where possible. See
// canMapJoinOpEquivalenceGroup and mapJoinOpEquivalenceGroup for more info.
func (c *CustomFuncs) MapJoinOpEqualities(
filters memo.FiltersExpr, leftCols, rightCols opt.ColSet,
) memo.FiltersExpr {
var equivFD props.FuncDepSet
for i := range filters {
equivFD.AddEquivFrom(&filters[i].ScalarProps(c.mem).FuncDeps)
}
equivReps := equivFD.EquivReps()

newFilters := filters
equivReps.ForEach(func(col opt.ColumnID) {
if c.canMapJoinOpEquivalenceGroup(newFilters, col, leftCols, rightCols) {
newFilters = c.mapJoinOpEquivalenceGroup(newFilters, col, leftCols, rightCols)
}
})

return newFilters
}

// mapJoinOpEquivalenceGroup maps equality conditions in a join that form an
// equivalence group to use different variables so that the number of
// conditions crossing both sides of a join are minimized. This is useful for
// creating additional filter conditions that can be pushed down to either side
// of the join.
//
// To perform the mapping, mapJoinOpEquivalenceGroup finds the set of columns
// containing col that forms an equivalence group in filters. The result is
// a set of columns that are all equivalent, some on the left side of the join
// and some on the right side. mapJoinOpEquivalenceGroup constructs a new set of
// equalities that implies the same equivalency group, with the property that
// there is a single condition with one left column and one right column.
// For example, consider this query:
//
// SELECT * FROM a, b WHERE a.x = b.x AND a.x = a.y AND a.y = b.y
//
// It has an equivalence group {a.x, a.y, b.x, b.y}. The columns a.x and a.y
// are on the left side, and b.x and b.y are on the right side. Initially there
// are two conditions that cross both sides. After mapping, the query would be
// converted to:
//
// SELECT * FROM a, b WHERE a.x = a.y AND b.x = b.y AND a.x = b.x
//
func (c *CustomFuncs) mapJoinOpEquivalenceGroup(
filters memo.FiltersExpr, col opt.ColumnID, leftCols, rightCols opt.ColSet,
) memo.FiltersExpr {
eqCols := c.GetEquivColsWithEquivType(col, filters)

// First remove all the equality conditions for this equivalence group.
newFilters := make(memo.FiltersExpr, 0, len(filters))
for i := range filters {
fd := &filters[i].ScalarProps(c.mem).FuncDeps
filterEqCols := fd.ComputeEquivClosure(fd.EquivReps())
if !filterEqCols.Empty() && filterEqCols.SubsetOf(eqCols) {
continue
}
newFilters = append(newFilters, filters[i])
}

// Now append new equality conditions that imply the same equivalency group,
// but only one condition should contain columns from both sides.
leftEqCols := leftCols.Intersection(eqCols)
rightEqCols := rightCols.Intersection(eqCols)
firstLeftCol, ok := leftEqCols.Next(0)
if !ok {
panic(errors.AssertionFailedf(
"mapJoinOpEquivalenceGroup called with equivalence group that does not intersect both sides",
))
}
firstRightCol, ok := rightEqCols.Next(0)
if !ok {
panic(errors.AssertionFailedf(
"mapJoinOpEquivalenceGroup called with equivalence group that does not intersect both sides",
))
}

// Connect all the columns on the left.
for col, ok := leftEqCols.Next(firstLeftCol + 1); ok; col, ok = leftEqCols.Next(col + 1) {
newFilters = append(newFilters, memo.FiltersItem{
Condition: c.f.ConstructEq(c.f.ConstructVariable(firstLeftCol), c.f.ConstructVariable(col)),
})
}

// Connect all the columns on the right.
for col, ok := rightEqCols.Next(firstRightCol + 1); ok; col, ok = rightEqCols.Next(col + 1) {
newFilters = append(newFilters, memo.FiltersItem{
Condition: c.f.ConstructEq(c.f.ConstructVariable(firstRightCol), c.f.ConstructVariable(col)),
})
}

// Connect the two sides.
newFilters = append(newFilters, memo.FiltersItem{
Condition: c.f.ConstructEq(
c.f.ConstructVariable(firstLeftCol), c.f.ConstructVariable(firstRightCol),
),
})

return newFilters
}

// CanMapJoinOpFilter returns true if it is possible to map a boolean expression
// src, which is a conjunct in the given filters expression, to use the output
// columns of the relational expression dst.
Expand Down Expand Up @@ -174,9 +339,8 @@ func (c *CustomFuncs) MapJoinOpFilter(
return src.Condition
}

// MapJoinOpFilter each column in src to one column in dst. We choose an
// arbitrary column (the one with the smallest ColumnID) if there are multiple
// choices.
// Map each column in src to one column in dst. We choose an arbitrary column
// (the one with the smallest ColumnID) if there are multiple choices.
var colMap util.FastIntMap
outerCols := src.ScalarProps(c.mem).OuterCols
for srcCol, ok := outerCols.Next(0); ok; srcCol, ok = outerCols.Next(srcCol + 1) {
Expand All @@ -188,7 +352,7 @@ func (c *CustomFuncs) MapJoinOpFilter(
dstCol, ok := eqCols.Next(0)
if !ok {
panic(errors.AssertionFailedf(
"Map called on src that cannot be mapped to dst. src:\n%s\ndst:\n%s",
"MapJoinOpFilter called on src that cannot be mapped to dst. src:\n%s\ndst:\n%s",
src, dst,
))
}
Expand Down Expand Up @@ -261,10 +425,11 @@ func (c *CustomFuncs) GetEquivColsWithEquivType(
}

// Compute all equivalent columns.
eqCols := opt.MakeColSet(col)
var equivFD props.FuncDepSet
for i := range filters {
eqCols = filters[i].ScalarProps(c.mem).FuncDeps.ComputeEquivClosure(eqCols)
equivFD.AddEquivFrom(&filters[i].ScalarProps(c.mem).FuncDeps)
}
eqCols := equivFD.ComputeEquivGroup(col)

eqCols.ForEach(func(i opt.ColumnID) {
// Only include columns that have the same type as col.
Expand Down
Loading

0 comments on commit 53ae9e5

Please sign in to comment.