Revert "streamingccl: hang processors on losing connection with sinkl…
Browse files Browse the repository at this point in the history
…ess stream client"

This reverts commit f5244f4.
adityamaru committed Aug 16, 2021
1 parent dfe97af commit 749c8ee
Showing 6 changed files with 30 additions and 221 deletions.
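
For orientation, here is a toy, self-contained Go sketch of the behavior this commit reverts, pieced together from the hunks below: the sinkless stream client turned a dropped connection (driver.ErrBadConn) into a GenerationEvent rather than an error, and the ingestion processor, on seeing that event, waited for a cutover signal instead of failing. Every identifier in the sketch is an illustrative stand-in; only the overall shape mirrors the reverted code.

// Toy stand-ins: errBadConn ~ driver.ErrBadConn, generationEvent ~
// streamingccl.GenerationEvent, cutoverCh ~ sip.cutoverCh.
package main

import (
	"errors"
	"fmt"
)

var errBadConn = errors.New("bad connection")

type event int

const (
	kvEvent event = iota
	generationEvent
)

// consumePartition mimics the reverted client behavior: a bad connection is
// reported as a generation event on eventCh, any other failure goes to errCh.
func consumePartition(rowsErr error) (chan event, chan error) {
	eventCh := make(chan event, 1)
	errCh := make(chan error, 1)
	go func() {
		defer close(eventCh)
		if rowsErr != nil {
			if errors.Is(rowsErr, errBadConn) {
				eventCh <- generationEvent
			} else {
				errCh <- rowsErr
			}
		}
	}()
	return eventCh, errCh
}

func main() {
	cutoverCh := make(chan struct{}, 1)
	cutoverCh <- struct{}{} // pretend the cutover signal has already arrived

	eventCh, errCh := consumePartition(errBadConn)
	select {
	case ev := <-eventCh:
		if ev == generationEvent {
			<-cutoverCh // the reverted processor parked here instead of erroring
			fmt.Println("drained gracefully after generation event")
		}
	case err := <-errCh:
		fmt.Println("ingestion error:", err)
	}
}

With the revert applied, both halves of that path disappear: the client reports a dropped connection on errCh like any other error, and the processor no longer recognizes a GenerationEvent.
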
5 changes: 0 additions & 5 deletions pkg/ccl/streamingccl/event.go
@@ -116,8 +116,3 @@ func MakeKVEvent(kv roachpb.KeyValue) Event {
func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp) Event {
return checkpointEvent{resolvedTimestamp: resolvedTimestamp}
}

// MakeGenerationEvent creates an GenerationEvent.
func MakeGenerationEvent() Event {
return generationEvent{}
}
@@ -11,7 +11,6 @@ package streamclient
import (
"context"
gosql "database/sql"
"database/sql/driver"
"fmt"
"strconv"

@@ -125,15 +124,7 @@ func (m *sinklessReplicationClient) ConsumePartition(
}
}
if err := rows.Err(); err != nil {
if errors.Is(err, driver.ErrBadConn) {
select {
case eventCh <- streamingccl.MakeGenerationEvent():
case <-ctx.Done():
errCh <- ctx.Err()
}
} else {
errCh <- err
}
errCh <- err
return
}
}()
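
For completeness, a hypothetical caller-side loop for the ConsumePartition contract as it stands after this revert: events are read from eventCh until it is closed, and any failure, a dropped connection now included, arrives on errCh and ends consumption. Only the two-channel shape mirrors the client above; all names and values are illustrative.

package main

import (
	"errors"
	"fmt"
)

// consume drains one partition: events until eventCh closes, or the first
// error delivered on errCh.
func consume(eventCh <-chan string, errCh <-chan error) error {
	for {
		select {
		case ev, ok := <-eventCh:
			if !ok {
				return nil // partition finished cleanly
			}
			fmt.Println("ingest", ev)
		case err := <-errCh:
			return err // surfaced directly, no GenerationEvent in between
		}
	}
}

func main() {
	eventCh := make(chan string, 1)
	errCh := make(chan error, 1)
	eventCh <- "kv"
	errCh <- errors.New("driver: bad connection")

	if err := consume(eventCh, errCh); err != nil {
		fmt.Println("partition consumption failed:", err)
	}
}
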
@@ -112,17 +112,4 @@ INSERT INTO d.t2 VALUES (2);
feed.ObserveResolved(secondObserved.Value.Timestamp)
cancelIngestion()
})

t.Run("stream-address-disconnects", func(t *testing.T) {
clientCtx, cancelIngestion := context.WithCancel(ctx)
eventCh, errCh, err := client.ConsumePartition(clientCtx, pa, startTime)
require.NoError(t, err)
feedSource := &channelFeedSource{eventCh: eventCh, errCh: errCh}
feed := streamingtest.MakeReplicationFeed(t, feedSource)

h.SysServer.Stopper().Stop(clientCtx)

require.True(t, feed.ObserveGeneration())
cancelIngestion()
})
}
55 changes: 14 additions & 41 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"golang.org/x/sync/errgroup"
@@ -96,6 +95,14 @@ type streamIngestionProcessor struct {
// and have attempted to flush them with `internalDrained`.
internalDrained bool

// ingestionErr stores any error that is returned from the worker goroutine so
// that it can be forwarded through the DistSQL flow.
ingestionErr error

// pollingErr stores any error that is returned from the poller checking for a
// cutover signal so that it can be forwarded through the DistSQL flow.
pollingErr error

// pollingWaitGroup registers the polling goroutine and waits for it to return
// when the processor is being drained.
pollingWaitGroup sync.WaitGroup
@@ -110,20 +117,6 @@ type streamIngestionProcessor struct {
// closePoller is used to shutdown the poller that checks the job for a
// cutover signal.
closePoller chan struct{}

// mu is used to provide thread-safe read-write operations to ingestionErr
// and pollingErr.
mu struct {
syncutil.Mutex

// ingestionErr stores any error that is returned from the worker goroutine so
// that it can be forwarded through the DistSQL flow.
ingestionErr error

// pollingErr stores any error that is returned from the poller checking for a
// cutover signal so that it can be forwarded through the DistSQL flow.
pollingErr error
}
}

// partitionEvent augments a normal event with the partition it came from.
@@ -197,9 +190,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
defer sip.pollingWaitGroup.Done()
err := sip.checkForCutoverSignal(ctx, sip.closePoller)
if err != nil {
sip.mu.Lock()
sip.mu.pollingErr = errors.Wrap(err, "error while polling job for cutover signal")
sip.mu.Unlock()
sip.pollingErr = errors.Wrap(err, "error while polling job for cutover signal")
}
}()

@@ -229,11 +220,8 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr
return nil, sip.DrainHelper()
}

sip.mu.Lock()
err := sip.mu.pollingErr
sip.mu.Unlock()
if err != nil {
sip.MoveToDraining(err)
if sip.pollingErr != nil {
sip.MoveToDraining(sip.pollingErr)
return nil, sip.DrainHelper()
}

@@ -255,11 +243,8 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr
return row, nil
}

sip.mu.Lock()
err = sip.mu.ingestionErr
sip.mu.Unlock()
if err != nil {
sip.MoveToDraining(err)
if sip.ingestionErr != nil {
sip.MoveToDraining(sip.ingestionErr)
return nil, sip.DrainHelper()
}

@@ -387,10 +372,7 @@ func (sip *streamIngestionProcessor) merge(
})
}
go func() {
err := g.Wait()
sip.mu.Lock()
defer sip.mu.Unlock()
sip.mu.ingestionErr = err
sip.ingestionErr = g.Wait()
close(merged)
}()

@@ -444,15 +426,6 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
}

return sip.flush()
case streamingccl.GenerationEvent:
log.Info(sip.Ctx, "GenerationEvent received")
select {
case <-sip.cutoverCh:
sip.internalDrained = true
return nil, nil
case <-sip.Ctx.Done():
return nil, sip.Ctx.Err()
}
default:
return nil, errors.Newf("unknown streaming event type %v", event.Type())
}
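
The hunks above also move ingestionErr and pollingErr out of the mutex-protected mu struct and back to plain fields on the processor. The toy program below sketches just the general shape of that error-forwarding pattern (a background goroutine records its failure in a field, and the row-producing loop checks the field and moves to draining). The WaitGroup is toy-only synchronization so the example itself is race-free; it is not a claim about how the real processor coordinates its goroutines.

package main

import (
	"errors"
	"fmt"
	"sync"
)

type processor struct {
	pollingErr   error // stand-in for sip.pollingErr
	ingestionErr error // stand-in for sip.ingestionErr
	wg           sync.WaitGroup
}

func (p *processor) start() {
	p.wg.Add(2)
	go func() { // stand-in for the cutover-signal poller; no error this run
		defer p.wg.Done()
	}()
	go func() { // stand-in for the ingestion workers
		defer p.wg.Done()
		p.ingestionErr = errors.New("ingestion worker failed")
	}()
}

// next mirrors the shape of Next(): surface any recorded error and drain.
func (p *processor) next() error {
	p.wg.Wait() // toy-only: ensures the fields are settled before reading
	if p.pollingErr != nil {
		return p.pollingErr
	}
	if p.ingestionErr != nil {
		return p.ingestionErr
	}
	return nil
}

func main() {
	p := &processor{}
	p.start()
	if err := p.next(); err != nil {
		fmt.Println("moving to draining:", err) // stand-in for MoveToDraining
	}
}
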
153 changes: 15 additions & 138 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -12,7 +12,6 @@ import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"

@@ -49,20 +48,9 @@ import (
// partition addresses.
type mockStreamClient struct {
partitionEvents map[streamingccl.PartitionAddress][]streamingccl.Event

// mu is used to provide a threadsafe interface to interceptors.
mu struct {
syncutil.Mutex

// interceptors can be registered to peek at every event generated by this
// client.
interceptors []func(streamingccl.Event, streamingccl.PartitionAddress)
tableID int
}
}

var _ streamclient.Client = &mockStreamClient{}
var _ streamclient.InterceptableStreamClient = &mockStreamClient{}

// GetTopology implements the Client interface.
func (m *mockStreamClient) GetTopology(
@@ -73,51 +61,22 @@ func (m *mockStreamClient) GetTopology(

// ConsumePartition implements the Client interface.
func (m *mockStreamClient) ConsumePartition(
ctx context.Context, address streamingccl.PartitionAddress, _ hlc.Timestamp,
_ context.Context, address streamingccl.PartitionAddress, _ hlc.Timestamp,
) (chan streamingccl.Event, chan error, error) {
var events []streamingccl.Event
var ok bool
if events, ok = m.partitionEvents[address]; !ok {
return nil, nil, errors.Newf("no events found for paritition %s", address)
}

eventCh := make(chan streamingccl.Event)
errCh := make(chan error)

go func() {
defer close(eventCh)
defer close(errCh)

for _, event := range events {
select {
case eventCh <- event:
case <-ctx.Done():
errCh <- ctx.Err()
}

func() {
m.mu.Lock()
defer m.mu.Unlock()

if len(m.mu.interceptors) > 0 {
for _, interceptor := range m.mu.interceptors {
if interceptor != nil {
interceptor(event, address)
}
}
}
}()
}
}()
eventCh := make(chan streamingccl.Event, len(events))

return eventCh, errCh, nil
}
for _, event := range events {
eventCh <- event
}
close(eventCh)

// RegisterInterception implements the InterceptableStreamClient interface.
func (m *mockStreamClient) RegisterInterception(fn streamclient.InterceptFn) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.interceptors = append(m.mu.interceptors, fn)
return eventCh, nil, nil
}

// errorStreamClient always returns an error when consuming a partition.
@@ -212,59 +171,6 @@ func TestStreamIngestionProcessor(t *testing.T) {
require.Nil(t, row)
testutils.IsError(meta.Err, "this client always returns an error")
})

t.Run("stream ingestion processor shuts down gracefully on losing client connection", func(t *testing.T) {
events := []streamingccl.Event{streamingccl.MakeGenerationEvent()}
pa := streamingccl.PartitionAddress("partition")
mockClient := &mockStreamClient{
partitionEvents: map[streamingccl.PartitionAddress][]streamingccl.Event{pa: events},
}

startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
partitionAddresses := []streamingccl.PartitionAddress{"partition"}

interceptCh := make(chan struct{})
defer close(interceptCh)
sendToInterceptCh := func() {
interceptCh <- struct{}{}
}
interceptGeneration := markGenerationEventReceived(sendToInterceptCh)
sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/",
partitionAddresses, startTime, []streamclient.InterceptFn{interceptGeneration}, mockClient)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
sip.Run(ctx)
}()

// The channel will block on read if the event has not been intercepted yet.
// Once it unblocks, we are guaranteed that the mockClient has sent the
// GenerationEvent and the processor has read it.
<-interceptCh

// The sip processor has received a GenerationEvent and is thus
// waiting for a cutover signal, so let's send one!
sip.cutoverCh <- struct{}{}

wg.Wait()
// Ensure that all the outputs are properly closed.
if !out.ProducerClosed() {
t.Fatalf("output RowReceiver not closed")
}

for {
// No metadata should have been produced since the processor
// should have been moved to draining state with a nil error.
row := out.NextNoMeta(t)
if row == nil {
break
}
t.Fatalf("more output rows than expected")
}
})
}

func getPartitionSpanToTableID(
@@ -473,30 +379,6 @@ func runStreamIngestionProcessor(
interceptEvents []streamclient.InterceptFn,
mockClient streamclient.Client,
) (*distsqlutils.RowBuffer, error) {
sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr,
partitionAddresses, startTime, interceptEvents, mockClient)
require.NoError(t, err)

sip.Run(ctx)

// Ensure that all the outputs are properly closed.
if !out.ProducerClosed() {
t.Fatalf("output RowReceiver not closed")
}
return out, err
}

func getStreamIngestionProcessor(
ctx context.Context,
t *testing.T,
registry *jobs.Registry,
kvDB *kv.DB,
streamAddr string,
partitionAddresses []streamingccl.PartitionAddress,
startTime hlc.Timestamp,
interceptEvents []streamclient.InterceptFn,
mockClient streamclient.Client,
) (*streamIngestionProcessor, *distsqlutils.RowBuffer, error) {
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)

@@ -541,7 +423,14 @@ func getStreamIngestionProcessor(
interceptable.RegisterInterception(interceptor)
}
}
return sip, out, err

sip.Run(ctx)

// Ensure that all the outputs are properly closed.
if !out.ProducerClosed() {
t.Fatalf("output RowReceiver not closed")
}
return out, err
}

func registerValidatorWithClient(
@@ -587,15 +476,3 @@ func makeCheckpointEventCounter(
}
}
}

// markGenerationEventReceived runs f after seeing a GenerationEvent.
func markGenerationEventReceived(
f func(),
) func(event streamingccl.Event, pa streamingccl.PartitionAddress) {
return func(event streamingccl.Event, pa streamingccl.PartitionAddress) {
switch event.Type() {
case streamingccl.GenerationEvent:
f()
}
}
}