Skip to content

Commit

Permalink
[release-16.0] vtgate/engine: Fix race condition in join logic (#14435)…
Browse files Browse the repository at this point in the history
… (#14439)

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
  • Loading branch information
vitess-bot[bot] authored Nov 10, 2023
1 parent 883b0f2 commit 13c21fd
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
39 changes: 28 additions & 11 deletions go/vt/vtgate/engine/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -96,22 +97,31 @@ func (jn *Join) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[st

// TryStreamExecute performs a streaming exec.
func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
var fieldNeeded atomic.Bool
fieldNeeded.Store(wantfields)
err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, fieldNeeded.Load(), func(lresult *sqltypes.Result) error {
var mu sync.Mutex
// We need to use this atomic since we're also reading this
// value outside of it being locked with the mu lock.
// This is still racy, but worst case it means that we may
// retrieve the right hand side fields twice instead of once.
var fieldsSent atomic.Bool
fieldsSent.Store(!wantfields)
err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, wantfields, func(lresult *sqltypes.Result) error {
joinVars := make(map[string]*querypb.BindVariable)
for _, lrow := range lresult.Rows {
for k, col := range jn.Vars {
joinVars[k] = sqltypes.ValueBindVariable(lrow[col])
}
var rowSent atomic.Bool
err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), fieldNeeded.Load(), func(rresult *sqltypes.Result) error {
err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), !fieldsSent.Load(), func(rresult *sqltypes.Result) error {
// This needs to be locking since it's not safe to just use
// fieldsSent. This is because we can't have a race between
// checking fieldsSent and then actually calling the callback
// and in parallel another goroutine doing the same. That
// can lead to out of order execution of the callback. So the callback
// itself and the check need to be covered by the same lock.
mu.Lock()
defer mu.Unlock()
result := &sqltypes.Result{}
if fieldNeeded.Load() {
// This code is currently unreachable because the first result
// will always be just the field info, which will cause the outer
// wantfields code path to be executed. But this may change in the future.
fieldNeeded.Store(false)
if fieldsSent.CompareAndSwap(false, true) {
result.Fields = joinFields(lresult.Fields, rresult.Fields, jn.Cols)
}
for _, rrow := range rresult.Rows {
Expand All @@ -135,8 +145,15 @@ func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars
return callback(result)
}
}
if fieldNeeded.Load() {
fieldNeeded.Store(false)
// This needs to be locking since it's not safe to just use
// fieldsSent. This is because we can't have a race between
// checking fieldsSent and then actually calling the callback
// and in parallel another goroutine doing the same. That
// can lead to out of order execution of the callback. So the callback
// itself and the check need to be covered by the same lock.
mu.Lock()
defer mu.Unlock()
if fieldsSent.CompareAndSwap(false, true) {
for k := range jn.Vars {
joinVars[k] = sqltypes.NullBindVariable
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/scalar_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func (sa *ScalarAggregate) TryStreamExecute(ctx context.Context, vcursor VCursor
var current []sqltypes.Value
var curDistincts []sqltypes.Value
var fields []*querypb.Field
fieldsSent := false
var mu sync.Mutex
fieldsSent := !wantfields

err := vcursor.StreamExecutePrimitive(ctx, sa.Input, bindVars, wantfields, func(result *sqltypes.Result) error {
// as the underlying primitive call is not sync
Expand Down

0 comments on commit 13c21fd

Please sign in to comment.