Skip to content

Commit

Permalink
distsqlrun: avoid flow.Waiting if called after a panic
Browse files Browse the repository at this point in the history
If a panic occurred in a flow, flow.Wait() would still be called since
it was deferred in Run. This would result in unnecessarily Waiting for
components that would never be signalled to exit through context
cancellation (since the panicking goroutine could be the one responsible
to do so).

Release note: None
  • Loading branch information
asubiotto committed Sep 10, 2019
1 parent 1b75c93 commit 10f81dd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,14 @@ func (f *Flow) Wait() {
if !f.startedGoroutines {
return
}

var panicVal interface{}
if panicVal = recover(); panicVal != nil {
// If Wait is called as part of stack unwinding during a panic, the flow
// context must be canceled to ensure that all asynchronous goroutines get
// the message that they must exit (otherwise we will wait indefinitely).
f.ctxCancel()
}
waitChan := make(chan struct{})

go func() {
Expand All @@ -605,6 +613,9 @@ func (f *Flow) Wait() {
case <-waitChan:
// Exit normally
}
if panicVal != nil {
panic(panicVal)
}
}

// Releasable is an interface for objects than can be Released back into a
Expand Down
39 changes: 39 additions & 0 deletions pkg/sql/distsqlrun/vectorized_panic_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package distsqlrun

import (
"context"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
Expand Down Expand Up @@ -117,6 +118,44 @@ func TestNonVectorizedPanicPropagation(t *testing.T) {
require.Panics(t, func() { mat.Next() }, "NonVectorizedPanic was caught by the operators")
}

// TestNonVectorizedPanicDoesntHangServer verifies that propagating a non
// vectorized panic doesn't result in a hang as described in:
// https://github.com/cockroachdb/cockroach/issues/39779
func TestNonVectorizedPanicDoesntHangServer(t *testing.T) {
defer leaktest.AfterTest(t)()

mat := &materializer{
input: &exec.CallbackOperator{
NextCb: func(ctx context.Context) coldata.Batch {
a := []int{0}
// Trigger an index out of bounds panic.
a[0] = a[100]
return nil
},
},
}
// Avoid uninitialized output panic.
mat.out.output = &RowBuffer{}
flow := &Flow{
processors: []Processor{mat},
// This test specifically verifies that a flow doesn't get stuck in Wait for
// asynchronous components that haven't been signaled to exit. To simulate
// this we just create a mock startable.
startables: []startable{
startableFn(func(ctx context.Context, wg *sync.WaitGroup, _ context.CancelFunc) {
wg.Add(1)
go func() {
// Ensure context is canceled.
<-ctx.Done()
wg.Done()
}()
}),
},
}

require.Panics(t, func() { require.NoError(t, flow.Run(context.Background(), nil)) })
}

// testVectorizedPanicEmitter is an exec.Operator that panics on every
// odd-numbered invocation of Next() and returns the next batch from the input
// on every even-numbered (i.e. it becomes a noop for those iterations). Used
Expand Down

0 comments on commit 10f81dd

Please sign in to comment.