Revert "Merge #41307"
This reverts commit 1c99165, reversing
changes made to e6e9161.

Somehow #41307 broke some Jepsen tests. Reversing while I investigate.

Fixes #41376
Fixes #41364
Fixes #41363
Fixes #41362

Release justification: broke Jepsen

Release note: None
andreimatei committed Oct 7, 2019
1 parent be51a00 commit e5fdda8
Showing 18 changed files with 117 additions and 455 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
@@ -64,6 +64,7 @@ func newSSTWriterProcessor(
 		settings: flowCtx.Cfg.Settings,
 		registry: flowCtx.Cfg.JobRegistry,
 		progress: spec.Progress,
+		db: flowCtx.EvalCtx.Txn.DB(),
 	}
 	if err := sp.out.Init(&execinfrapb.PostProcessSpec{}, sstOutputTypes, flowCtx.NewEvalCtx(), output); err != nil {
 		return nil, err
@@ -96,7 +97,6 @@ func (sp *sstWriter) OutputTypes() []types.T {
 }
 
 func (sp *sstWriter) Run(ctx context.Context) {
-	sp.db = sp.flowCtx.EvalCtx.Txn.DB()
 	sp.input.Start(ctx)
 
 	ctx, span := tracing.ChildSpan(ctx, "sstWriter")
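Context for this hunk: the revert moves the sstWriter's DB handle back to processor construction instead of fetching it at the top of Run. Below is a tiny Go sketch of the two placements; the types are placeholders for illustration only, not the importccl/execinfra ones.

```go
package main

// Placeholder types: just enough structure to show the two initialization
// sites, not CockroachDB's actual packages.
type DB struct{}

type evalCtx struct{ db *DB }
type flowCtx struct{ eval *evalCtx }

type writerSketch struct {
	flow *flowCtx
	db   *DB
}

// Form restored by the revert: the DB handle is captured at construction time.
func newWriterSketch(f *flowCtx) *writerSketch {
	return &writerSketch{flow: f, db: f.eval.db}
}

// Form being reverted: the handle was looked up lazily at the start of Run.
func (w *writerSketch) Run() {
	w.db = w.flow.eval.db
	// ... processing would go here ...
}

func main() {
	w := newWriterSketch(&flowCtx{eval: &evalCtx{db: &DB{}}})
	w.Run()
}
```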
78 changes: 0 additions & 78 deletions pkg/sql/colexec/serial_unordered_synchronizer.go

This file was deleted.

52 changes: 0 additions & 52 deletions pkg/sql/colexec/serial_unordered_synchronizer_test.go

This file was deleted.

@@ -30,12 +30,11 @@ type unorderedSynchronizerMsg struct {
 	b coldata.Batch
 }
 
-var _ Operator = &ParallelUnorderedSynchronizer{}
-var _ execinfra.OpNode = &ParallelUnorderedSynchronizer{}
+var _ Operator = &UnorderedSynchronizer{}
 
-// ParallelUnorderedSynchronizer is an Operator that combines multiple Operator streams
+// UnorderedSynchronizer is an Operator that combines multiple Operator streams
 // into one.
-type ParallelUnorderedSynchronizer struct {
+type UnorderedSynchronizer struct {
 	inputs []Operator
 	// readNextBatch is a slice of channels, where each channel corresponds to the
 	// input at the same index in inputs. It is used as a barrier for input
@@ -60,39 +59,38 @@ type ParallelUnorderedSynchronizer struct {
 	done bool
 	zeroBatch coldata.Batch
 	// externalWaitGroup refers to the WaitGroup passed in externally. Since the
-	// ParallelUnorderedSynchronizer spawns goroutines, this allows callers to
-	// wait for the completion of these goroutines.
+	// UnorderedSynchronizer spawns goroutines, this allows callers to wait for
+	// the completion of these goroutines.
 	externalWaitGroup *sync.WaitGroup
 	// internalWaitGroup refers to the WaitGroup internally managed by the
-	// ParallelUnorderedSynchronizer. This will only ever be incremented by the
-	// ParallelUnorderedSynchronizer and decremented by the input goroutines. This
-	// allows the ParallelUnorderedSynchronizer to wait only on internal
-	// goroutines.
+	// UnorderedSynchronizer. This will only ever be incremented by the
+	// UnorderedSynchronizer and decremented by the input goroutines. This allows
+	// the UnorderedSynchronizer to wait only on internal goroutines.
 	internalWaitGroup *sync.WaitGroup
 	cancelFn context.CancelFunc
 	batchCh chan *unorderedSynchronizerMsg
 	errCh chan error
 }
 
 // ChildCount implements the execinfra.OpNode interface.
-func (s *ParallelUnorderedSynchronizer) ChildCount() int {
+func (s *UnorderedSynchronizer) ChildCount() int {
 	return len(s.inputs)
 }
 
 // Child implements the execinfra.OpNode interface.
-func (s *ParallelUnorderedSynchronizer) Child(nth int) execinfra.OpNode {
+func (s *UnorderedSynchronizer) Child(nth int) execinfra.OpNode {
 	return s.inputs[nth]
 }
 
-// NewParallelUnorderedSynchronizer creates a new ParallelUnorderedSynchronizer.
-// On the first call to Next, len(inputs) goroutines are spawned to read each
-// input asynchronously (to not be limited by a slow input). These will
-// increment the passed-in WaitGroup and decrement when done. It is also
-// guaranteed that these spawned goroutines will have completed on any error or
-// zero-length batch received from Next.
-func NewParallelUnorderedSynchronizer(
+// NewUnorderedSynchronizer creates a new UnorderedSynchronizer. On the first
+// call to Next, len(inputs) goroutines are spawned to read each input
+// asynchronously (to not be limited by a slow input). These will increment
+// the passed-in WaitGroup and decrement when done. It is also guaranteed that
+// these spawned goroutines will have completed on any error or zero-length
+// batch received from Next.
+func NewUnorderedSynchronizer(
 	inputs []Operator, typs []coltypes.T, wg *sync.WaitGroup,
-) *ParallelUnorderedSynchronizer {
+) *UnorderedSynchronizer {
 	readNextBatch := make([]chan struct{}, len(inputs))
 	for i := range readNextBatch {
 		// Buffer readNextBatch chans to allow for non-blocking writes. There will
@@ -101,7 +99,7 @@ func NewParallelUnorderedSynchronizer(
 	}
 	zeroBatch := coldata.NewMemBatchWithSize(typs, 0)
 	zeroBatch.SetLength(0)
-	return &ParallelUnorderedSynchronizer{
+	return &UnorderedSynchronizer{
 		inputs: inputs,
 		readNextBatch: readNextBatch,
 		batches: make([]coldata.Batch, len(inputs)),
@@ -118,7 +116,7 @@ }
 }
 
 // Init is part of the Operator interface.
-func (s *ParallelUnorderedSynchronizer) Init() {
+func (s *UnorderedSynchronizer) Init() {
 	for _, input := range s.inputs {
 		input.Init()
 	}
@@ -132,7 +130,7 @@ func (s *ParallelUnorderedSynchronizer) Init() {
 // error on s.errCh, resulting in the first error pushed to be observed by the
 // Next goroutine. Inputs are asynchronous so that the synchronizer is minimally
 // affected by slow inputs.
-func (s *ParallelUnorderedSynchronizer) init(ctx context.Context) {
+func (s *UnorderedSynchronizer) init(ctx context.Context) {
 	ctx, s.cancelFn = contextutil.WithCancel(ctx)
 	for i, input := range s.inputs {
 		s.nextBatch[i] = func(input Operator, inputIdx int) func() {
@@ -201,7 +199,7 @@ func (s *ParallelUnorderedSynchronizer) init(ctx context.Context) {
 }
 
 // Next is part of the Operator interface.
-func (s *ParallelUnorderedSynchronizer) Next(ctx context.Context) coldata.Batch {
+func (s *UnorderedSynchronizer) Next(ctx context.Context) coldata.Batch {
 	if s.done {
 		// TODO(yuzefovich): do we want to be on the safe side and explicitly set
 		// the length here (and below) to 0?
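The doc comments restored above describe the synchronizer's fan-in design: one goroutine per input feeding a single output stream, with the caller's WaitGroup tracking those goroutines so they can be waited on. Below is a minimal, self-contained Go sketch of that pattern; the names and types are simplified stand-ins, not the colexec Operator API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// input is a stand-in for an Operator producing int "batches"; the second
// return value is false once the input is exhausted.
type input func(ctx context.Context) (int, bool)

// fanIn spawns one goroutine per input and merges their output into a single
// channel. The caller's WaitGroup is incremented per goroutine so the caller
// can wait for all of them, mirroring the externalWaitGroup described above.
func fanIn(ctx context.Context, inputs []input, wg *sync.WaitGroup) <-chan int {
	out := make(chan int)
	var internal sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		internal.Add(1)
		go func(in input) {
			defer wg.Done()
			defer internal.Done()
			for {
				v, ok := in(ctx)
				if !ok {
					return // exhausted input, like a zero-length batch
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}(in)
	}
	// Close the merged channel once every input goroutine has finished.
	go func() {
		internal.Wait()
		close(out)
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Two toy inputs, each producing three values.
	mk := func(base int) input {
		n := 0
		return func(context.Context) (int, bool) {
			if n == 3 {
				return 0, false
			}
			n++
			return base + n, true
		}
	}

	var wg sync.WaitGroup
	for v := range fanIn(ctx, []input{mk(10), mk(20)}, &wg) {
		fmt.Println(v) // order across inputs is not guaranteed
	}
	wg.Wait()
}
```

The real synchronizer additionally uses a per-input readNextBatch barrier and an error channel, which this sketch omits; it only shows the goroutine-per-input fan-in and the two levels of WaitGroup accounting that the comments describe.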
@@ -26,7 +26,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestParallelUnorderedSynchronizer(t *testing.T) {
+func TestUnorderedSynchronizer(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
 	const (
@@ -49,7 +49,7 @@ func TestParallelUnorderedSynchronizer(t *testing.T) {
 	}
 
 	var wg sync.WaitGroup
-	s := NewParallelUnorderedSynchronizer(inputs, typs, &wg)
+	s := NewUnorderedSynchronizer(inputs, typs, &wg)
 
 	ctx, cancelFn := context.WithCancel(context.Background())
 	var cancel bool
@@ -117,14 +117,14 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
 		ctx = context.Background()
 		wg sync.WaitGroup
 	)
-	s := NewParallelUnorderedSynchronizer(inputs, []coltypes.T{coltypes.Int64}, &wg)
+	s := NewUnorderedSynchronizer(inputs, []coltypes.T{coltypes.Int64}, &wg)
 	err := execerror.CatchVectorizedRuntimeError(func() { _ = s.Next(ctx) })
 	// This is the crux of the test: assert that all inputs have finished.
 	require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs)))
 	require.True(t, testutils.IsError(err, expectedErr), err)
 }
 
-func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
+func BenchmarkUnorderedSynchronizer(b *testing.B) {
 	const numInputs = 6
 
 	typs := []coltypes.T{coltypes.Int64}
@@ -136,7 +136,7 @@ func BenchmarkUnorderedSynchronizer(b *testing.B) {
 	}
 	var wg sync.WaitGroup
 	ctx, cancelFn := context.WithCancel(context.Background())
-	s := NewParallelUnorderedSynchronizer(inputs, typs, &wg)
+	s := NewUnorderedSynchronizer(inputs, typs, &wg)
 	b.SetBytes(8 * int64(coldata.BatchSize()))
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {