From 10f81dd794b7b720c4102649d8ad796a76d48c42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alfonso=20Subiotto=20Marqu=C3=A9s?= Date: Tue, 10 Sep 2019 15:22:31 +0200 Subject: [PATCH] distsqlrun: avoid flow.Waiting if called after a panic If a panic occurred in a flow, flow.Wait() would still be called since it was deferred in Run. This would result in unnecessarily Waiting for components that would never be signalled to exit through context cancellation (since the panicking goroutine could be the one responsible to do so). Release note: None --- pkg/sql/distsqlrun/flow.go | 11 ++++++ .../vectorized_panic_propagation_test.go | 39 +++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go index 5ae534d28e6d..ff7b2e4f9f15 100644 --- a/pkg/sql/distsqlrun/flow.go +++ b/pkg/sql/distsqlrun/flow.go @@ -591,6 +591,14 @@ func (f *Flow) Wait() { if !f.startedGoroutines { return } + + var panicVal interface{} + if panicVal = recover(); panicVal != nil { + // If Wait is called as part of stack unwinding during a panic, the flow + // context must be canceled to ensure that all asynchronous goroutines get + // the message that they must exit (otherwise we will wait indefinitely). + f.ctxCancel() + } waitChan := make(chan struct{}) go func() { @@ -605,6 +613,9 @@ func (f *Flow) Wait() { case <-waitChan: // Exit normally } + if panicVal != nil { + panic(panicVal) + } } // Releasable is an interface for objects than can be Released back into a diff --git a/pkg/sql/distsqlrun/vectorized_panic_propagation_test.go b/pkg/sql/distsqlrun/vectorized_panic_propagation_test.go index 2642c8255b0f..278abf9e576b 100644 --- a/pkg/sql/distsqlrun/vectorized_panic_propagation_test.go +++ b/pkg/sql/distsqlrun/vectorized_panic_propagation_test.go @@ -12,6 +12,7 @@ package distsqlrun import ( "context" + "sync" "testing" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -117,6 +118,44 @@ func TestNonVectorizedPanicPropagation(t *testing.T) { require.Panics(t, func() { mat.Next() }, "NonVectorizedPanic was caught by the operators") } +// TestNonVectorizedPanicDoesntHangServer verifies that propagating a non +// vectorized panic doesn't result in a hang as described in: +// https://github.com/cockroachdb/cockroach/issues/39779 +func TestNonVectorizedPanicDoesntHangServer(t *testing.T) { + defer leaktest.AfterTest(t)() + + mat := &materializer{ + input: &exec.CallbackOperator{ + NextCb: func(ctx context.Context) coldata.Batch { + a := []int{0} + // Trigger an index out of bounds panic. + a[0] = a[100] + return nil + }, + }, + } + // Avoid uninitialized output panic. + mat.out.output = &RowBuffer{} + flow := &Flow{ + processors: []Processor{mat}, + // This test specifically verifies that a flow doesn't get stuck in Wait for + // asynchronous components that haven't been signaled to exit. To simulate + // this we just create a mock startable. + startables: []startable{ + startableFn(func(ctx context.Context, wg *sync.WaitGroup, _ context.CancelFunc) { + wg.Add(1) + go func() { + // Ensure context is canceled. + <-ctx.Done() + wg.Done() + }() + }), + }, + } + + require.Panics(t, func() { require.NoError(t, flow.Run(context.Background(), nil)) }) +} + // testVectorizedPanicEmitter is an exec.Operator that panics on every // odd-numbered invocation of Next() and returns the next batch from the input // on every even-numbered (i.e. it becomes a noop for those iterations). Used