From feb1f25c94db4a67aeae9efe1ffa4f297eb6e89c Mon Sep 17 00:00:00 2001
From: Daniel Harrison
Date: Thu, 21 Mar 2019 14:48:56 -0700
Subject: [PATCH] changefeedccl: switch high-level retry marker from whitelist to blacklist

In the roachtests for crdb-chaos and sink-chaos we're seeing changefeeds
fail with surprising errors:

    [NotLeaseHolderError] r681: replica (n1,s1):1 not lease holder; replica (n2,s2):2 is
    descriptor not found

We'd like to avoid failing a changefeed unnecessarily, so when an error
bubbles up to the top level, we'd like to retry the distributed flow if
possible. We initially tried to whitelist which errors should cause the
changefeed to retry, but this turns out to be brittle, so this commit
switches to a blacklist. Any error that is expected to be permanent is now
marked with `MarkTerminalError` by the time it comes out of
`distChangefeedFlow`. Everything else should be logged loudly and retried.

Touches #35974
Touches #36019

Release note: None
---
 pkg/ccl/changefeedccl/buffer.go          |  4 ++
 pkg/ccl/changefeedccl/changefeed.go      |  3 +
 pkg/ccl/changefeedccl/changefeed_stmt.go | 92 ++++++++++++------------
 pkg/ccl/changefeedccl/changefeed_test.go | 27 +++++--
 pkg/ccl/changefeedccl/encoder.go         |  4 +-
 pkg/ccl/changefeedccl/errors.go          | 68 ++++++++++++++++++
 pkg/ccl/changefeedccl/metrics.go         | 10 +--
 pkg/ccl/changefeedccl/poller.go          | 15 +++-
 pkg/ccl/changefeedccl/sink.go            | 57 ++-------------
 pkg/ccl/changefeedccl/table_history.go   |  7 +-
 10 files changed, 170 insertions(+), 117 deletions(-)
 create mode 100644 pkg/ccl/changefeedccl/errors.go

diff --git a/pkg/ccl/changefeedccl/buffer.go b/pkg/ccl/changefeedccl/buffer.go
index 3dbc0355acab..dac614869cea 100644
--- a/pkg/ccl/changefeedccl/buffer.go
+++ b/pkg/ccl/changefeedccl/buffer.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
@@ -208,6 +209,9 @@ func (b *memBuffer) addRow(ctx context.Context, row tree.Datums) error {
 	_, err := b.mu.entries.AddRow(ctx, row)
 	b.mu.Unlock()
 	b.metrics.BufferEntriesIn.Inc(1)
+	if e, ok := pgerror.GetPGCause(err); ok && e.Code == pgerror.CodeOutOfMemoryError {
+		err = MarkTerminalError(err)
+	}
 	return err
 }
 
diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go
index fe766037c07c..1da15b6e8ccd 100644
--- a/pkg/ccl/changefeedccl/changefeed.go
+++ b/pkg/ccl/changefeedccl/changefeed.go
@@ -322,6 +322,9 @@ func checkpointResolvedTimestamp(
 			return resolved
 		}
 		if err := jobProgressedFn(ctx, progressedClosure); err != nil {
+			if _, ok := err.(*jobs.InvalidStatusError); ok {
+				err = MarkTerminalError(err)
+			}
 			return err
 		}
 	}
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index 80fc213f236b..dbe3293c4e40 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -11,7 +11,6 @@ package changefeedccl
 import (
 	"context"
 	"net/url"
-	"regexp"
 	"sort"
 	"time"
 
@@ -239,7 +238,8 @@ func changefeedPlanHook(
 		telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))
 
 		if details.SinkURI == `` {
-			return distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
+			err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
+			return MaybeStripTerminalErrorMarker(err)
 		}
 
 		settings := p.ExecCfg().Settings
@@ -258,16 +258,7 @@ func changefeedPlanHook(
 		nodeID := p.ExtendedEvalContext().NodeID
 		canarySink, err := getSink(details.SinkURI, nodeID, details.Opts, details.Targets, settings)
 		if err != nil {
-			// In this context, we don't want to retry even retryable errors from the
-			// sync. Unwrap any retryable errors encountered.
-			//
-			// TODO(knz): This error handling is suspicious (see #35854
-			// and #35920). What if the error is wrapped? Or has been
-			// flattened into a pgerror.Error?
-			if rErr, ok := err.(*retryableSinkError); ok {
-				return rErr.cause
-			}
-			return err
+			return MaybeStripTerminalErrorMarker(err)
 		}
 		if err := canarySink.Close(); err != nil {
 			return err
 		}
@@ -427,12 +418,32 @@ func (b *changefeedResumer) Resume(
 	ctx context.Context, job *jobs.Job, planHookState interface{}, startedCh chan<- tree.Datums,
 ) error {
 	phs := planHookState.(sql.PlanHookState)
+	jobID := *job.ID()
 	details := job.Details().(jobspb.ChangefeedDetails)
 	progress := job.Progress()
 
-	// Errors encountered while emitting changes to the Sink may be transient; for
-	// example, a temporary network outage. When one of these errors occurs, we do
-	// not fail the job but rather restart the distSQL flow after a short backoff.
+	// TODO(dan): This is a workaround for not being able to set an initial
+	// progress high-water when creating a job (currently only the progress
+	// details can be set). I didn't want to pick off the refactor to get this
+	// fix in, but it'd be nice to remove this hack.
+	if _, ok := details.Opts[optCursor]; ok {
+		if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
+			progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
+		}
+	}
+
+	// WIP I'm worried about what happens if we miss adding something to the
+	// blacklist. Should we bail after receiving some consecutive number of
+	// identical error messages?
+
+	// We'd like to avoid failing a changefeed unnecessarily, so when an error
+	// bubbles up to this level, we'd like to "retry" the flow if possible. This
+	// could be because the sink is down or because a cockroach node has crashed
+	// or for many other reasons. We initially tried to whitelist which errors
+	// should cause the changefeed to retry, but this turns out to be brittle, so
+	// we switched to a blacklist. Any error that is expected to be permanent is
+	// now marked with `MarkTerminalError` by the time it comes out of
+	// `distChangefeedFlow`. Everything else should be logged loudly and retried.
 	opts := retry.Options{
 		InitialBackoff: 5 * time.Millisecond,
 		Multiplier:     2,
@@ -440,41 +451,37 @@ func (b *changefeedResumer) Resume(
 	}
 	var err error
 	for r := retry.StartWithCtx(ctx, opts); r.Next(); {
-		// TODO(dan): This is a workaround for not being able to set an initial
-		// progress high-water when creating a job (currently only the progress
-		// details can be set). I didn't want to pick off the refactor to get this
-		// fix in, but it'd be nice to remove this hack.
-		if _, ok := details.Opts[optCursor]; ok {
-			if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
-				progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
-			}
+		if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil {
+			break
 		}
-
-		err = distChangefeedFlow(ctx, phs, *job.ID(), details, progress, startedCh)
-		if !isRetryableSinkError(err) && !isRetryableRPCError(err) {
+		if IsTerminalError(err) {
 			break
 		}
-		log.Infof(ctx, `CHANGEFEED job %d encountered retryable error: %v`, *job.ID(), err)
+
+		log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %+v`, jobID, err)
+		if metrics, ok := phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
+			metrics.ErrorRetries.Inc(1)
+		}
 		// Re-load the job in order to update our progress object, which may have
 		// been updated by the changeFrontier processor since the flow started.
-		reloadedJob, phsErr := phs.ExecCfg().JobRegistry.LoadJob(ctx, *job.ID())
-		if phsErr != nil {
-			err = phsErr
-			break
+		reloadedJob, reloadErr := phs.ExecCfg().JobRegistry.LoadJob(ctx, jobID)
+		if reloadErr != nil {
+			log.Warningf(ctx, `CHANGEFEED job %d could not reload job progress; `+
+				`continuing from last known high-water of %s: %v`,
+				jobID, progress.GetHighWater(), reloadErr)
+		} else {
+			progress = reloadedJob.Progress()
 		}
-		progress = reloadedJob.Progress()
+
 		// startedCh is normally used to signal back to the creator of the job that
 		// the job has started; however, in this case nothing will ever receive
 		// on the channel, causing the changefeed flow to block. Replace it with
 		// a dummy channel.
 		startedCh = make(chan tree.Datums, 1)
-		if metrics, ok := phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
-			metrics.SinkErrorRetries.Inc(1)
-		}
-		continue
 	}
 	if err != nil {
-		log.Infof(ctx, `CHANGEFEED job %d returning with error: %v`, *job.ID(), err)
+		err = MaybeStripTerminalErrorMarker(err)
+		log.Warningf(ctx, `CHANGEFEED job %d returning with error: %v`, jobID, err)
 	}
 	return err
 }
@@ -492,14 +499,3 @@ func changefeedResumeHook(typ jobspb.Type, _ *cluster.Settings) jobs.Resumer {
 	}
 	return &changefeedResumer{}
 }
-
-// Retryable RPC Error represents a gRPC error which indicates a retryable
-// situation such as a connected node going down. In this case the DistSQL flow
-// should be retried.
-const retryableErrorStr = "rpc error|node unavailable"
-
-var retryableErrorRegex = regexp.MustCompile(retryableErrorStr)
-
-func isRetryableRPCError(err error) bool {
-	return retryableErrorRegex.MatchString(err.Error())
-}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index d3c61939060a..e1fc7c400ada 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -1052,7 +1052,7 @@ func TestChangefeedMonitoring(t *testing.T) {
 	t.Run(`poller`, pollerTest(sinklessTest, testFn))
 }
 
-func TestChangefeedRetryableSinkError(t *testing.T) {
+func TestChangefeedRetryableError(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer utilccl.TestingEnableEnterprise()()
 
@@ -1062,8 +1062,11 @@
 		Changefeed.(*TestingKnobs).AfterSinkFlush
 	var failSink int64
 	failSinkHook := func() error {
-		if atomic.LoadInt64(&failSink) != 0 {
-			return &retryableSinkError{cause: fmt.Errorf("synthetic retryable error")}
+		switch atomic.LoadInt64(&failSink) {
+		case 1:
+			return fmt.Errorf("synthetic retryable error")
+		case 2:
+			return MarkTerminalError(fmt.Errorf("synthetic non-retryable error"))
 		}
 		return origAfterSinkFlushHook()
 	}
@@ -1090,10 +1093,10 @@
 
 	// Verify that sink is failing requests.
 	registry := f.Server().JobRegistry().(*jobs.Registry)
-	retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).SinkErrorRetries
+	retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).ErrorRetries
 	testutils.SucceedsSoon(t, func() error {
 		if retryCounter.Counter.Count() < 3 {
-			return fmt.Errorf("insufficient sink error retries detected")
+			return fmt.Errorf("insufficient error retries detected")
 		}
 		return nil
 	})
@@ -1107,6 +1110,20 @@
 		`foo: [2]->{"after": {"a": 2}}`,
 		`foo: [3]->{"after": {"a": 3}}`,
 	})
+
+	// Set SQL Sink to return a terminal error and insert one last row.
+	atomic.StoreInt64(&failSink, 2)
+	sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)
+
+	// Ensure that we eventually get the error message back out.
+	for {
+		_, err := foo.Next()
+		if err == nil {
+			continue
+		}
+		require.EqualError(t, err, `synthetic non-retryable error`)
+		break
+	}
 }
 
 // Only the enterprise version uses jobs.
diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go
index 17b983a9035e..e6c24a9be0a1 100644
--- a/pkg/ccl/changefeedccl/encoder.go
+++ b/pkg/ccl/changefeedccl/encoder.go
@@ -261,7 +261,7 @@ func (e *confluentAvroEncoder) EncodeKey(row encodeRow) ([]byte, error) {
 		var err error
 		registered.schema, err = indexToAvroSchema(row.tableDesc, &row.tableDesc.PrimaryIndex)
 		if err != nil {
-			return nil, err
+			return nil, MarkTerminalError(err)
 		}
 
 		// NB: This uses the kafka name escaper because it has to match the name
@@ -295,7 +295,7 @@ func (e *confluentAvroEncoder) EncodeValue(row encodeRow) ([]byte, error) {
 	if !ok {
 		afterDataSchema, err := tableToAvroSchema(row.tableDesc)
 		if err != nil {
-			return nil, err
+			return nil, MarkTerminalError(err)
 		}
 
 		opts := avroEnvelopeOpts{afterField: true, updatedField: e.updatedField}
diff --git a/pkg/ccl/changefeedccl/errors.go b/pkg/ccl/changefeedccl/errors.go
new file mode 100644
index 000000000000..d11786e34c84
--- /dev/null
+++ b/pkg/ccl/changefeedccl/errors.go
@@ -0,0 +1,68 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
+)
+
+const terminalErrorString = "terminal changefeed error"
+
+type terminalError struct {
+	wrapped error
+}
+
+// MarkTerminalError wraps the given error, marking it as non-retryable to
+// changefeeds.
+func MarkTerminalError(e error) error {
+	return &terminalError{wrapped: e}
+}
+
+// Error implements the error interface.
+func (e *terminalError) Error() string {
+	return fmt.Sprintf("%s: %s", terminalErrorString, e.wrapped.Error())
+}
+
+// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is
+// planned to be moved to the stdlib in Go 1.13.
+func (e *terminalError) Unwrap() error { return e.wrapped }
+
+// IsTerminalError returns true if the supplied error, or any of its parent
+// causes, is a terminalError.
+func IsTerminalError(err error) bool {
+	for {
+		if err == nil {
+			return false
+		}
+		if _, ok := err.(*terminalError); ok {
+			return true
+		}
+		if _, ok := err.(*pgerror.Error); ok {
+			return strings.Contains(err.Error(), terminalErrorString)
+		}
+		if e, ok := err.(interface{ Unwrap() error }); ok {
+			err = e.Unwrap()
+			continue
+		}
+		return false
+	}
+}
+
+// MaybeStripTerminalErrorMarker performs some minimal attempt to clean the
+// terminalError marker out. This won't do anything if the terminalError itself
+// has been wrapped, but that's okay, we'll just have an uglier string.
+func MaybeStripTerminalErrorMarker(err error) error {
+	if e, ok := err.(*terminalError); ok {
+		err = e.wrapped
+	}
+	return err
+}
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 52084002c1cd..4f88ebb3c840 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -95,9 +95,9 @@ var (
 		Measurement: "Flushes",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaChangefeedSinkErrorRetries = metric.Metadata{
-		Name:        "changefeed.sink_error_retries",
-		Help:        "Total retryable errors encountered while emitting to sinks",
+	metaChangefeedErrorRetries = metric.Metadata{
+		Name:        "changefeed.error_retries",
+		Help:        "Total retryable errors encountered by all changefeeds",
 		Measurement: "Errors",
 		Unit:        metric.Unit_COUNT,
 	}
@@ -174,7 +174,7 @@ type Metrics struct {
 	EmittedMessages *metric.Counter
 	EmittedBytes    *metric.Counter
 	Flushes         *metric.Counter
-	SinkErrorRetries *metric.Counter
+	ErrorRetries    *metric.Counter
 
 	BufferEntriesIn  *metric.Counter
 	BufferEntriesOut *metric.Counter
@@ -202,7 +202,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
 		EmittedMessages: metric.NewCounter(metaChangefeedEmittedMessages),
 		EmittedBytes:    metric.NewCounter(metaChangefeedEmittedBytes),
 		Flushes:         metric.NewCounter(metaChangefeedFlushes),
-		SinkErrorRetries: metric.NewCounter(metaChangefeedSinkErrorRetries),
+		ErrorRetries:    metric.NewCounter(metaChangefeedErrorRetries),
 
 		BufferEntriesIn:  metric.NewCounter(metaChangefeedBufferEntriesIn),
 		BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut),
diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go
index 16d418a4c1cc..260c7736adb9 100644
--- a/pkg/ccl/changefeedccl/poller.go
+++ b/pkg/ccl/changefeedccl/poller.go
@@ -11,6 +11,7 @@ package changefeedccl
 import (
 	"context"
 	"sort"
+	"strings"
 	"sync/atomic"
 	"time"
 
@@ -288,7 +289,11 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
 			}
 			frontier.Forward(span, lastHighwater)
 			g.GoCtx(func(ctx context.Context) error {
-				return ds.RangeFeed(ctx, req, eventC)
+				err := ds.RangeFeed(ctx, req, eventC)
+				if err != nil && strings.Contains(err.Error(), "must be after replica GC threshold") {
+					err = MarkTerminalError(err)
+				}
+				return err
 			})
 		}
 		g.GoCtx(func(ctx context.Context) error {
@@ -497,7 +502,11 @@ func (p *poller) exportSpan(
 	}
 
 	if pErr != nil {
-		return pgerror.Wrapf(pErr.GoError(), pgerror.CodeDataExceptionError,
+		err := pErr.GoError()
+		if strings.Contains(err.Error(), "must be after replica GC threshold") {
+			err = MarkTerminalError(err)
+		}
+		return pgerror.Wrapf(err, pgerror.CodeDataExceptionError,
 			`fetching changes for %s`, span)
 	}
 	p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds())
@@ -642,7 +651,7 @@ func clusterNodeCount(g *gossip.Gossip) int {
 
 func (p *poller) validateTable(ctx context.Context, desc *sqlbase.TableDescriptor) error {
 	if err := validateChangefeedTable(p.details.Targets, desc); err != nil {
-		return err
+		return MarkTerminalError(err)
 	}
 	p.mu.Lock()
 	defer p.mu.Unlock()
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index 35a503c8afb6..a6c29b9a6ec2 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -336,13 +336,13 @@ func makeKafkaSink(
 	if err != nil {
 		err = pgerror.Wrapf(err, pgerror.CodeCannotConnectNowError,
 			`connecting to kafka: %s`, bootstrapServers)
-		return nil, &retryableSinkError{cause: err}
+		return nil, err
 	}
 	sink.producer, err = sarama.NewAsyncProducerFromClient(sink.client)
 	if err != nil {
 		err = pgerror.Wrapf(err, pgerror.CodeCannotConnectNowError,
 			`connecting to kafka: %s`, bootstrapServers)
-		return nil, &retryableSinkError{cause: err}
+		return nil, err
 	}
 	sink.start()
 
@@ -405,7 +405,7 @@ func (s *kafkaSink) EmitResolvedTimestamp(
 			topics = append(topics, topic)
 		}
 		if err := s.client.RefreshMetadata(topics...); err != nil {
-			return &retryableSinkError{cause: err}
+			return err
 		}
 		s.lastMetadataRefresh = timeutil.Now()
 	}
@@ -423,7 +423,7 @@ func (s *kafkaSink) EmitResolvedTimestamp(
 		// be picked up and get later ones.
 		partitions, err := s.client.Partitions(topic)
 		if err != nil {
-			return &retryableSinkError{cause: err}
+			return err
 		}
 		for _, partition := range partitions {
 			msg := &sarama.ProducerMessage{
@@ -455,9 +455,6 @@ func (s *kafkaSink) Flush(ctx context.Context) error {
 	s.mu.Unlock()
 
 	if immediateFlush {
-		if _, ok := errors.Cause(flushErr).(*sarama.ProducerError); ok {
-			flushErr = &retryableSinkError{cause: flushErr}
-		}
 		return flushErr
 	}
 
@@ -472,9 +469,6 @@ func (s *kafkaSink) Flush(ctx context.Context) error {
 			flushErr := s.mu.flushErr
 			s.mu.flushErr = nil
 			s.mu.Unlock()
-			if _, ok := errors.Cause(flushErr).(*sarama.ProducerError); ok {
-				flushErr = &retryableSinkError{cause: flushErr}
-			}
 			return flushErr
 		}
 	}
@@ -773,46 +767,3 @@ func (s *bufferSink) Close() error {
 	s.closed = true
 	return nil
 }
-
-// causer matches the (unexported) interface used by Go to allow errors to wrap
-// their parent cause.
-type causer interface {
-	Cause() error
-}
-
-// String and regex used to match retryable sink errors when they have been
-// "flattened" into a pgerror.
-const retryableSinkErrorString = "retryable sink error"
-
-// retryableSinkError should be used by sinks to wrap any error which may
-// be retried.
-type retryableSinkError struct {
-	cause error
-}
-
-func (e retryableSinkError) Error() string {
-	return fmt.Sprintf(retryableSinkErrorString+": %s", e.cause.Error())
-}
-func (e retryableSinkError) Cause() error { return e.cause }
-
-// isRetryableSinkError returns true if the supplied error, or any of its parent
-// causes, is a retryableSinkError.
-func isRetryableSinkError(err error) bool {
-	for {
-		if _, ok := err.(*retryableSinkError); ok {
-			return true
-		}
-		// TODO(mrtracy): This pathway, which occurs when the retryable error is
-		// detected on a non-local node of the distsql flow, is only currently
-		// being tested with a roachtest, which is expensive. See if it can be
-		// tested via a unit test.
-		if _, ok := err.(*pgerror.Error); ok {
-			return strings.Contains(err.Error(), retryableSinkErrorString)
-		}
-		if e, ok := err.(causer); ok {
-			err = e.Cause()
-			continue
-		}
-		return false
-	}
-}
diff --git a/pkg/ccl/changefeedccl/table_history.go b/pkg/ccl/changefeedccl/table_history.go
index 309b19089ceb..65df5da5f1e6 100644
--- a/pkg/ccl/changefeedccl/table_history.go
+++ b/pkg/ccl/changefeedccl/table_history.go
@@ -11,6 +11,7 @@ package changefeedccl
 import (
 	"context"
 	"sort"
+	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -243,7 +244,11 @@ func fetchTableDescriptorVersions(
 		log.Infof(ctx, `fetched table descs (%s,%s] took %s`, startTS, endTS, timeutil.Since(start))
 	}
 	if pErr != nil {
-		return nil, pgerror.Wrapf(pErr.GoError(), pgerror.CodeDataExceptionError,
+		err := pErr.GoError()
+		if strings.Contains(err.Error(), "must be after replica GC threshold") {
+			err = MarkTerminalError(err)
+		}
+		return nil, pgerror.Wrapf(err, pgerror.CodeDataExceptionError,
 			`fetching changes for %s`, span)
 	}
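
Reviewer note, not part of the patch: a minimal, self-contained sketch of how the blacklist approach is intended to behave. The terminalError, markTerminal, and isTerminal names below are local stand-ins that mirror the helpers added in errors.go, and the error messages are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// terminalError mirrors the marker type added in errors.go: wrapping an error
// in it declares the error permanent, so the changefeed job fails instead of
// restarting the flow.
type terminalError struct{ wrapped error }

func (e *terminalError) Error() string { return "terminal changefeed error: " + e.wrapped.Error() }
func (e *terminalError) Unwrap() error { return e.wrapped }

func markTerminal(err error) error { return &terminalError{wrapped: err} }

// isTerminal walks the wrap chain looking for the marker, the same shape as
// IsTerminalError in the patch (minus the pgerror string matching).
func isTerminal(err error) bool {
	for err != nil {
		if _, ok := err.(*terminalError); ok {
			return true
		}
		u, ok := err.(interface{ Unwrap() error })
		if !ok {
			return false
		}
		err = u.Unwrap()
	}
	return false
}

func main() {
	// Anything not explicitly marked is treated as retryable.
	transient := errors.New("rpc error: node unavailable")
	// A producer that knows retrying can never help marks the error.
	permanent := markTerminal(errors.New("synthetic non-retryable error"))

	for _, err := range []error{transient, permanent} {
		if isTerminal(err) {
			fmt.Println("fail the job:", err)
		} else {
			fmt.Println("log loudly and retry the flow:", err)
		}
	}
}
```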
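A similarly hedged sketch of the control flow the reworked (*changefeedResumer).Resume loop follows: anything not marked terminal is logged and the flow is restarted with exponential backoff. runFlow, loadHighWater, errTerminal, and the backoff numbers are hypothetical stand-ins, not the real distChangefeedFlow, IsTerminalError, or job-registry calls.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTerminal stands in for an error that IsTerminalError would report true for.
var errTerminal = errors.New("terminal changefeed error")

// runFlow stands in for distChangefeedFlow: fail twice, then succeed.
func runFlow(attempt int) error {
	if attempt < 2 {
		return errors.New("rpc error: node unavailable")
	}
	return nil
}

// loadHighWater stands in for reloading job progress from the registry.
func loadHighWater() string { return "1553.123,0" }

func main() {
	backoff := 5 * time.Millisecond
	var err error
	for attempt := 0; ; attempt++ {
		if err = runFlow(attempt); err == nil {
			break // flow finished cleanly
		}
		if err == errTerminal {
			break // blacklisted: surface the error and fail the job
		}
		// Everything else: log loudly, bump the retry metric, refresh progress
		// so the restarted flow picks up the latest high-water mark, then back
		// off and go around again.
		fmt.Printf("retryable error (resuming from %s): %v\n", loadHighWater(), err)
		time.Sleep(backoff)
		backoff *= 2
	}
	fmt.Println("final:", err)
}
```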