diff --git a/pkg/ccl/changefeedccl/buffer.go b/pkg/ccl/changefeedccl/buffer.go index 3dbc0355acab..dac614869cea 100644 --- a/pkg/ccl/changefeedccl/buffer.go +++ b/pkg/ccl/changefeedccl/buffer.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -208,6 +209,9 @@ func (b *memBuffer) addRow(ctx context.Context, row tree.Datums) error { _, err := b.mu.entries.AddRow(ctx, row) b.mu.Unlock() b.metrics.BufferEntriesIn.Inc(1) + if e, ok := pgerror.GetPGCause(err); ok && e.Code == pgerror.CodeOutOfMemoryError { + err = MarkTerminalError(err) + } return err } diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index fe766037c07c..1da15b6e8ccd 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -322,6 +322,9 @@ func checkpointResolvedTimestamp( return resolved } if err := jobProgressedFn(ctx, progressedClosure); err != nil { + if _, ok := err.(*jobs.InvalidStatusError); ok { + err = MarkTerminalError(err) + } return err } } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 80fc213f236b..6fd6525a7d8a 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -11,7 +11,6 @@ package changefeedccl import ( "context" "net/url" - "regexp" "sort" "time" @@ -31,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/pkg/errors" ) @@ -239,7 +239,8 @@ func changefeedPlanHook( telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets))) if details.SinkURI == `` { - return distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh) + err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh) + return MaybeStripTerminalErrorMarker(err) } settings := p.ExecCfg().Settings @@ -258,16 +259,7 @@ func changefeedPlanHook( nodeID := p.ExtendedEvalContext().NodeID canarySink, err := getSink(details.SinkURI, nodeID, details.Opts, details.Targets, settings) if err != nil { - // In this context, we don't want to retry even retryable errors from the - // sync. Unwrap any retryable errors encountered. - // - // TODO(knz): This error handling is suspicious (see #35854 - // and #35920). What if the error is wrapped? Or has been - // flattened into a pgerror.Error? - if rErr, ok := err.(*retryableSinkError); ok { - return rErr.cause - } - return err + return MaybeStripTerminalErrorMarker(err) } if err := canarySink.Close(); err != nil { return err @@ -427,56 +419,100 @@ func (b *changefeedResumer) Resume( ctx context.Context, job *jobs.Job, planHookState interface{}, startedCh chan<- tree.Datums, ) error { phs := planHookState.(sql.PlanHookState) + execCfg := phs.ExecCfg() + jobID := *job.ID() details := job.Details().(jobspb.ChangefeedDetails) progress := job.Progress() - // Errors encountered while emitting changes to the Sink may be transient; for - // example, a temporary network outage. 
When one of these errors occurs, we do - // not fail the job but rather restart the distSQL flow after a short backoff. + // TODO(dan): This is a workaround for not being able to set an initial + // progress high-water when creating a job (currently only the progress + // details can be set). I didn't want to pick off the refactor to get this + // fix in, but it'd be nice to remove this hack. + if _, ok := details.Opts[optCursor]; ok { + if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) { + progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime} + } + } + + // We'd like to avoid failing a changefeed unnecessarily, so when an error + // bubbles up to this level, we'd like to "retry" the flow if possible. This + // could be because the sink is down or because a cockroach node has crashed + // or for many other reasons. We initially tried to whitelist which errors + // should cause the changefeed to retry, but this turns out to be brittle, so + // we switched to a blacklist. Any error that is expected to be permanent is + // now marked with `MarkTerminalError` by the time it comes out of + // `distChangefeedFlow`. Everything else should be logged loudly and retried. opts := retry.Options{ InitialBackoff: 5 * time.Millisecond, Multiplier: 2, MaxBackoff: 10 * time.Second, } + const consecutiveIdenticalErrorsWindow = time.Minute + const consecutiveIdenticalErrorsDefault = int(5) + var consecutiveErrors struct { + message string + count int + firstSeen time.Time + } var err error for r := retry.StartWithCtx(ctx, opts); r.Next(); { - // TODO(dan): This is a workaround for not being able to set an initial - // progress high-water when creating a job (currently only the progress - // details can be set). I didn't want to pick off the refactor to get this - // fix in, but it'd be nice to remove this hack. - if _, ok := details.Opts[optCursor]; ok { - if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) { - progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime} + if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil { + return nil + } + if IsTerminalError(err) { + err = MaybeStripTerminalErrorMarker(err) + log.Warningf(ctx, `CHANGEFEED job %d returning with error: %v`, jobID, err) + return err + } + + // If we receive an identical error message more than some number of times + // in a time period, bail. This is a safety net in case we miss marking + // something as terminal. 
+ { + m, d := err.Error(), timeutil.Since(consecutiveErrors.firstSeen) + if d > consecutiveIdenticalErrorsWindow || m != consecutiveErrors.message { + consecutiveErrors.message = m + consecutiveErrors.count = 0 + consecutiveErrors.firstSeen = timeutil.Now() + } + consecutiveErrors.count++ + consecutiveIdenticalErrorsThreshold := consecutiveIdenticalErrorsDefault + if cfKnobs, ok := execCfg.DistSQLRunTestingKnobs.Changefeed.(*TestingKnobs); ok { + if c := cfKnobs.ConsecutiveIdenticalErrorBailoutCount; c != 0 { + consecutiveIdenticalErrorsThreshold = c + } + } + if consecutiveErrors.count >= consecutiveIdenticalErrorsThreshold { + log.Warningf(ctx, `CHANGEFEED job %d saw the same non-terminal error %d times in %s: %+v`, + jobID, consecutiveErrors.count, d, err) + return err } } - err = distChangefeedFlow(ctx, phs, *job.ID(), details, progress, startedCh) - if !isRetryableSinkError(err) && !isRetryableRPCError(err) { - break + log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %+v`, jobID, err) + if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { + metrics.ErrorRetries.Inc(1) } - log.Infof(ctx, `CHANGEFEED job %d encountered retryable error: %v`, *job.ID(), err) // Re-load the job in order to update our progress object, which may have // been updated by the changeFrontier processor since the flow started. - reloadedJob, phsErr := phs.ExecCfg().JobRegistry.LoadJob(ctx, *job.ID()) - if phsErr != nil { - err = phsErr - break + reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID) + if reloadErr != nil { + log.Warningf(ctx, `CHANGEFEED job %d could not reload job progress; `+ + `continuing from last known high-water of %s: %v`, + jobID, progress.GetHighWater(), reloadErr) + } else { + progress = reloadedJob.Progress() } - progress = reloadedJob.Progress() + // startedCh is normally used to signal back to the creator of the job that // the job has started; however, in this case nothing will ever receive // on the channel, causing the changefeed flow to block. Replace it with // a dummy channel. startedCh = make(chan tree.Datums, 1) - if metrics, ok := phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { - metrics.SinkErrorRetries.Inc(1) - } - continue } - if err != nil { - log.Infof(ctx, `CHANGEFEED job %d returning with error: %v`, *job.ID(), err) - } - return err + // We only hit this if `r.Next()` returns false, which right now only happens + // on context cancellation. + return errors.Wrap(err, `ran out of retries`) } func (b *changefeedResumer) OnFailOrCancel(context.Context, *client.Txn, *jobs.Job) error { return nil } @@ -492,14 +528,3 @@ func changefeedResumeHook(typ jobspb.Type, _ *cluster.Settings) jobs.Resumer { } return &changefeedResumer{} } - -// Retryable RPC Error represents a gRPC error which indicates a retryable -// situation such as a connected node going down. In this case the DistSQL flow -// should be retried. 
-const retryableErrorStr = "rpc error|node unavailable" - -var retryableErrorRegex = regexp.MustCompile(retryableErrorStr) - -func isRetryableRPCError(err error) bool { - return retryableErrorRegex.MatchString(err.Error()) -} diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index d3c61939060a..429596aef568 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1052,61 +1052,106 @@ func TestChangefeedMonitoring(t *testing.T) { t.Run(`poller`, pollerTest(sinklessTest, testFn)) } -func TestChangefeedRetryableSinkError(t *testing.T) { +func TestChangefeedRetryableError(t *testing.T) { defer leaktest.AfterTest(t)() defer utilccl.TestingEnableEnterprise()() testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { - origAfterSinkFlushHook := f.Server().(*server.TestServer).Cfg.TestingKnobs. + knobs := f.Server().(*server.TestServer).Cfg.TestingKnobs. DistSQL.(*distsqlrun.TestingKnobs). - Changefeed.(*TestingKnobs).AfterSinkFlush + Changefeed.(*TestingKnobs) + origAfterSinkFlushHook := knobs.AfterSinkFlush var failSink int64 failSinkHook := func() error { - if atomic.LoadInt64(&failSink) != 0 { - return &retryableSinkError{cause: fmt.Errorf("synthetic retryable error")} + switch atomic.LoadInt64(&failSink) { + case 1: + return fmt.Errorf("unique synthetic retryable error: %s", timeutil.Now()) + case 2: + return MarkTerminalError(fmt.Errorf("synthetic terminal error")) + case 3: + return fmt.Errorf("should be terminal but isn't") } return origAfterSinkFlushHook() } - f.Server().(*server.TestServer).Cfg.TestingKnobs. - DistSQL.(*distsqlrun.TestingKnobs). - Changefeed.(*TestingKnobs).AfterSinkFlush = failSinkHook + knobs.AfterSinkFlush = failSinkHook + // Set up a new feed and verify that the sink is started up. sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) defer closeFeed(t, foo) - - // Verify that the sink is started up. sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`) assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1}}`, }) - // Set SQL Sink to return a retryable error. + // Set sink to return unique retryable errors and insert a row. Verify that + // sink is failing requests. atomic.StoreInt64(&failSink, 1) - - // Insert 1 row while the sink is failing. sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`) - - // Verify that sink is failing requests. registry := f.Server().JobRegistry().(*jobs.Registry) - retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).SinkErrorRetries + retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).ErrorRetries testutils.SucceedsSoon(t, func() error { if retryCounter.Counter.Count() < 3 { - return fmt.Errorf("insufficient sink error retries detected") + return fmt.Errorf("insufficient error retries detected") } return nil }) - // Fix the sink and insert another row. + // Fix the sink and insert another row. Check that nothing funky happened. atomic.StoreInt64(&failSink, 0) sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`) - - // Check that nothing funky happened. assertPayloads(t, foo, []string{ `foo: [2]->{"after": {"a": 2}}`, `foo: [3]->{"after": {"a": 3}}`, }) + + // Set sink to return a terminal error and insert a row. Ensure that we + // eventually get the error message back out. 
+		atomic.StoreInt64(&failSink, 2)
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)
+		for {
+			_, err := foo.Next()
+			if err == nil {
+				continue
+			}
+			require.EqualError(t, err, `synthetic terminal error`)
+			break
+		}
+
+		// The above tests an error that is correctly _not marked_ as terminal and
+		// one that is correctly _marked_ as terminal. We're also concerned about a
+		// terminal error that is not marked but should be. This would result in an
+		// indefinite retry loop, so we have a safety net that bails if we
+		// consecutively receive an identical error message some number of times.
+		// By default this safety net is disabled in tests, because we want to
+		// catch this. The following line enables the safety net (by setting the
+		// knob to the 0 value) so that we can test the safety net.
+		knobs.ConsecutiveIdenticalErrorBailoutCount = 0
+
+		// Set up a new feed and verify that the sink is started up.
+		atomic.StoreInt64(&failSink, 0)
+		sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
+		bar := feed(t, f, `CREATE CHANGEFEED FOR bar`)
+		defer closeFeed(t, bar)
+		sqlDB.Exec(t, `INSERT INTO bar VALUES (1)`)
+		assertPayloads(t, bar, []string{
+			`bar: [1]->{"after": {"a": 1}}`,
+		})
+
+		// Set sink to return a non-unique non-terminal error and insert a row.
+		// This simulates an error we should blacklist but have missed. The feed
+		// should eventually fail.
+		atomic.StoreInt64(&failSink, 3)
+		sqlDB.Exec(t, `INSERT INTO bar VALUES (2)`)
+		for {
+			_, err := bar.Next()
+			if err == nil {
+				continue
+			}
+			require.EqualError(t, err, `should be terminal but isn't`)
+			break
+		}
 	}
 
 	// Only the enterprise version uses jobs.
diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go
index 17b983a9035e..e6c24a9be0a1 100644
--- a/pkg/ccl/changefeedccl/encoder.go
+++ b/pkg/ccl/changefeedccl/encoder.go
@@ -261,7 +261,7 @@ func (e *confluentAvroEncoder) EncodeKey(row encodeRow) ([]byte, error) {
 		var err error
 		registered.schema, err = indexToAvroSchema(row.tableDesc, &row.tableDesc.PrimaryIndex)
 		if err != nil {
-			return nil, err
+			return nil, MarkTerminalError(err)
 		}
 
 		// NB: This uses the kafka name escaper because it has to match the name
@@ -295,7 +295,7 @@ func (e *confluentAvroEncoder) EncodeValue(row encodeRow) ([]byte, error) {
 	if !ok {
 		afterDataSchema, err := tableToAvroSchema(row.tableDesc)
 		if err != nil {
-			return nil, err
+			return nil, MarkTerminalError(err)
 		}
 
 		opts := avroEnvelopeOpts{afterField: true, updatedField: e.updatedField}
diff --git a/pkg/ccl/changefeedccl/errors.go b/pkg/ccl/changefeedccl/errors.go
new file mode 100644
index 000000000000..c5695a31d092
--- /dev/null
+++ b/pkg/ccl/changefeedccl/errors.go
@@ -0,0 +1,71 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
+)
+
+const terminalErrorString = "terminal changefeed error"
+
+type terminalError struct {
+	wrapped error
+}
+
+// MarkTerminalError wraps the given error, marking it as non-retryable to
+// changefeeds.
+func MarkTerminalError(e error) error {
+	return &terminalError{wrapped: e}
+}
+
+// Error implements the error interface.
+func (e *terminalError) Error() string {
+	return fmt.Sprintf("%s: %s", terminalErrorString, e.wrapped.Error())
+}
+
+// Cause implements the github.com/pkg/errors.causer interface.
+func (e *terminalError) Cause() error { return e.wrapped }
+
+// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is
+// planned to be moved to the stdlib in go 1.13.
+func (e *terminalError) Unwrap() error { return e.wrapped }
+
+// IsTerminalError returns true if the supplied error, or any of its parent
+// causes, is a terminalError.
+func IsTerminalError(err error) bool {
+	for {
+		if err == nil {
+			return false
+		}
+		if _, ok := err.(*terminalError); ok {
+			return true
+		}
+		if _, ok := err.(*pgerror.Error); ok {
+			return strings.Contains(err.Error(), terminalErrorString)
+		}
+		if e, ok := err.(interface{ Unwrap() error }); ok {
+			err = e.Unwrap()
+			continue
+		}
+		return false
+	}
+}
+
+// MaybeStripTerminalErrorMarker makes a minimal attempt to strip the
+// terminalError marker out. This won't do anything if the terminalError itself
+// has been wrapped, but that's okay; we'll just have an uglier string.
+func MaybeStripTerminalErrorMarker(err error) error {
+	if e, ok := err.(*terminalError); ok {
+		err = e.wrapped
+	}
+	return err
+}
diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go
index 65a3ab274311..89e7b655b430 100644
--- a/pkg/ccl/changefeedccl/helpers_test.go
+++ b/pkg/ccl/changefeedccl/helpers_test.go
@@ -13,6 +13,7 @@ import (
 	gosql "database/sql"
 	gojson "encoding/json"
 	"fmt"
+	"math"
 	"net/url"
 	"reflect"
 	"sort"
@@ -208,7 +209,11 @@ func expectResolvedTimestampAvro(
 func sinklessTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) {
 	return func(t *testing.T) {
 		ctx := context.Background()
-		knobs := base.TestingKnobs{DistSQL: &distsqlrun.TestingKnobs{Changefeed: &TestingKnobs{}}}
+		knobs := base.TestingKnobs{DistSQL: &distsqlrun.TestingKnobs{Changefeed: &TestingKnobs{
+			// Disable the safety net for errors that should be marked as terminal but
+			// weren't. We want to catch these in tests.
+			ConsecutiveIdenticalErrorBailoutCount: math.MaxInt32,
+		}}}
 		s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
 			Knobs:       knobs,
 			UseDatabase: `d`,
@@ -249,6 +254,9 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory))
 				}
 				return nil
 			},
+			// Disable the safety net for errors that should be marked as terminal but
+			// weren't. We want to catch these in tests.
+ ConsecutiveIdenticalErrorBailoutCount: math.MaxInt32, }}} s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 52084002c1cd..4f88ebb3c840 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -95,9 +95,9 @@ var ( Measurement: "Flushes", Unit: metric.Unit_COUNT, } - metaChangefeedSinkErrorRetries = metric.Metadata{ - Name: "changefeed.sink_error_retries", - Help: "Total retryable errors encountered while emitting to sinks", + metaChangefeedErrorRetries = metric.Metadata{ + Name: "changefeed.error_retries", + Help: "Total retryable errors encountered by all changefeeds", Measurement: "Errors", Unit: metric.Unit_COUNT, } @@ -174,7 +174,7 @@ type Metrics struct { EmittedMessages *metric.Counter EmittedBytes *metric.Counter Flushes *metric.Counter - SinkErrorRetries *metric.Counter + ErrorRetries *metric.Counter BufferEntriesIn *metric.Counter BufferEntriesOut *metric.Counter @@ -202,7 +202,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { EmittedMessages: metric.NewCounter(metaChangefeedEmittedMessages), EmittedBytes: metric.NewCounter(metaChangefeedEmittedBytes), Flushes: metric.NewCounter(metaChangefeedFlushes), - SinkErrorRetries: metric.NewCounter(metaChangefeedSinkErrorRetries), + ErrorRetries: metric.NewCounter(metaChangefeedErrorRetries), BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn), BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut), diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index 16d418a4c1cc..ce35d26ea357 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -11,6 +11,7 @@ package changefeedccl import ( "context" "sort" + "strings" "sync/atomic" "time" @@ -288,7 +289,11 @@ func (p *poller) rangefeedImpl(ctx context.Context) error { } frontier.Forward(span, lastHighwater) g.GoCtx(func(ctx context.Context) error { - return ds.RangeFeed(ctx, req, eventC) + err := ds.RangeFeed(ctx, req, eventC) + if _, ok := err.(*roachpb.BatchTimestampBeforeGCError); ok { + err = MarkTerminalError(err) + } + return err }) } g.GoCtx(func(ctx context.Context) error { @@ -497,7 +502,14 @@ func (p *poller) exportSpan( } if pErr != nil { - return pgerror.Wrapf(pErr.GoError(), pgerror.CodeDataExceptionError, + err := pErr.GoError() + // TODO(dan): It'd be nice to avoid this string sniffing, if possible, but + // pErr doesn't seem to have `Details` set, which would rehydrate the + // BatchTimestampBeforeGCError. 
+ if strings.Contains(err.Error(), "must be after replica GC threshold") { + err = MarkTerminalError(err) + } + return pgerror.Wrapf(err, pgerror.CodeDataExceptionError, `fetching changes for %s`, span) } p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds()) @@ -642,7 +654,7 @@ func clusterNodeCount(g *gossip.Gossip) int { func (p *poller) validateTable(ctx context.Context, desc *sqlbase.TableDescriptor) error { if err := validateChangefeedTable(p.details.Targets, desc); err != nil { - return err + return MarkTerminalError(err) } p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 35a503c8afb6..a6c29b9a6ec2 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -336,13 +336,13 @@ func makeKafkaSink( if err != nil { err = pgerror.Wrapf(err, pgerror.CodeCannotConnectNowError, `connecting to kafka: %s`, bootstrapServers) - return nil, &retryableSinkError{cause: err} + return nil, err } sink.producer, err = sarama.NewAsyncProducerFromClient(sink.client) if err != nil { err = pgerror.Wrapf(err, pgerror.CodeCannotConnectNowError, `connecting to kafka: %s`, bootstrapServers) - return nil, &retryableSinkError{cause: err} + return nil, err } sink.start() @@ -405,7 +405,7 @@ func (s *kafkaSink) EmitResolvedTimestamp( topics = append(topics, topic) } if err := s.client.RefreshMetadata(topics...); err != nil { - return &retryableSinkError{cause: err} + return err } s.lastMetadataRefresh = timeutil.Now() } @@ -423,7 +423,7 @@ func (s *kafkaSink) EmitResolvedTimestamp( // be picked up and get later ones. partitions, err := s.client.Partitions(topic) if err != nil { - return &retryableSinkError{cause: err} + return err } for _, partition := range partitions { msg := &sarama.ProducerMessage{ @@ -455,9 +455,6 @@ func (s *kafkaSink) Flush(ctx context.Context) error { s.mu.Unlock() if immediateFlush { - if _, ok := errors.Cause(flushErr).(*sarama.ProducerError); ok { - flushErr = &retryableSinkError{cause: flushErr} - } return flushErr } @@ -472,9 +469,6 @@ func (s *kafkaSink) Flush(ctx context.Context) error { flushErr := s.mu.flushErr s.mu.flushErr = nil s.mu.Unlock() - if _, ok := errors.Cause(flushErr).(*sarama.ProducerError); ok { - flushErr = &retryableSinkError{cause: flushErr} - } return flushErr } } @@ -773,46 +767,3 @@ func (s *bufferSink) Close() error { s.closed = true return nil } - -// causer matches the (unexported) interface used by Go to allow errors to wrap -// their parent cause. -type causer interface { - Cause() error -} - -// String and regex used to match retryable sink errors when they have been -// "flattened" into a pgerror. -const retryableSinkErrorString = "retryable sink error" - -// retryableSinkError should be used by sinks to wrap any error which may -// be retried. -type retryableSinkError struct { - cause error -} - -func (e retryableSinkError) Error() string { - return fmt.Sprintf(retryableSinkErrorString+": %s", e.cause.Error()) -} -func (e retryableSinkError) Cause() error { return e.cause } - -// isRetryableSinkError returns true if the supplied error, or any of its parent -// causes, is a retryableSinkError. -func isRetryableSinkError(err error) bool { - for { - if _, ok := err.(*retryableSinkError); ok { - return true - } - // TODO(mrtracy): This pathway, which occurs when the retryable error is - // detected on a non-local node of the distsql flow, is only currently - // being tested with a roachtest, which is expensive. 
See if it can be - // tested via a unit test. - if _, ok := err.(*pgerror.Error); ok { - return strings.Contains(err.Error(), retryableSinkErrorString) - } - if e, ok := err.(causer); ok { - err = e.Cause() - continue - } - return false - } -} diff --git a/pkg/ccl/changefeedccl/table_history.go b/pkg/ccl/changefeedccl/table_history.go index 309b19089ceb..f96bf71ff7b0 100644 --- a/pkg/ccl/changefeedccl/table_history.go +++ b/pkg/ccl/changefeedccl/table_history.go @@ -11,6 +11,7 @@ package changefeedccl import ( "context" "sort" + "strings" "time" "github.com/cockroachdb/cockroach/pkg/internal/client" @@ -243,7 +244,14 @@ func fetchTableDescriptorVersions( log.Infof(ctx, `fetched table descs (%s,%s] took %s`, startTS, endTS, timeutil.Since(start)) } if pErr != nil { - return nil, pgerror.Wrapf(pErr.GoError(), pgerror.CodeDataExceptionError, + err := pErr.GoError() + // TODO(dan): It'd be nice to avoid this string sniffing, if possible, but + // pErr doesn't seem to have `Details` set, which would rehydrate the + // BatchTimestampBeforeGCError. + if strings.Contains(err.Error(), "must be after replica GC threshold") { + err = MarkTerminalError(err) + } + return nil, pgerror.Wrapf(err, pgerror.CodeDataExceptionError, `fetching changes for %s`, span) } diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index d6e009acac28..07b7ff25f726 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -19,6 +19,12 @@ type TestingKnobs struct { AfterSinkFlush func() error // MemBufferCapacity, if non-zero, overrides memBufferDefaultCapacity. MemBufferCapacity int64 + // ConsecutiveIdenticalErrorBailoutCount is an override for the top-level + // safety net in the retry loop for non-terminal errors: if we consecutively + // receive an identical error message some number of times, we assume it + // should have been marked as terminal but wasn't. When non-zero, this is an + // override for how many times. When zero, we fall back to a default. + ConsecutiveIdenticalErrorBailoutCount int } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
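
For readers outside the changefeedccl package, the contract this diff sets up is small: anything known to be permanent gets wrapped by MarkTerminalError before it leaves distChangefeedFlow, Resume calls IsTerminalError (which walks Unwrap chains and string-sniffs flattened pgerrors) to decide between failing the job and retrying, and MaybeStripTerminalErrorMarker removes the wrapper before the error is surfaced. The sketch below is illustrative only and not part of the patch: it re-implements the marker and the retry/bail-out shape of changefeedResumer.Resume in miniature, with stand-in names (markTerminal, isTerminal, doFlow), and it leans on Go 1.13's errors.Unwrap where the patch walks the chain by hand and also handles *pgerror.Error.

package main

import (
	"errors"
	"fmt"
)

// terminalError mirrors the unexported marker type added in errors.go.
type terminalError struct{ wrapped error }

func (e *terminalError) Error() string { return "terminal changefeed error: " + e.wrapped.Error() }
func (e *terminalError) Unwrap() error { return e.wrapped }

// markTerminal and isTerminal are simplified stand-ins for MarkTerminalError
// and IsTerminalError.
func markTerminal(err error) error { return &terminalError{wrapped: err} }

func isTerminal(err error) bool {
	for err != nil {
		if _, ok := err.(*terminalError); ok {
			return true
		}
		err = errors.Unwrap(err) // follows wrappers that implement Unwrap() error
	}
	return false
}

// doFlow stands in for distChangefeedFlow: two transient failures, then an
// error that was marked terminal somewhere below.
func doFlow(attempt int) error {
	if attempt < 2 {
		return fmt.Errorf("sink unavailable (attempt %d)", attempt)
	}
	return markTerminal(fmt.Errorf("watched table was dropped"))
}

func main() {
	for attempt := 0; ; attempt++ {
		err := doFlow(attempt)
		if err == nil {
			return
		}
		if isTerminal(err) {
			// Like Resume: strip the marker and fail the changefeed permanently.
			fmt.Println("failing changefeed:", errors.Unwrap(err))
			return
		}
		// Like Resume: log loudly, bump the retry metric, and go around again
		// (backoff omitted here).
		fmt.Println("retrying after:", err)
	}
}

The design choice worth noting is the blacklist direction: only errors known to be permanent get the marker, so an unclassified error keeps the feed alive and retrying rather than killing it.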
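
The companion safety net in Resume is easy to misread, so here is just the bookkeeping in isolation: the counter only advances while the error message stays identical and the run of sightings started inside a one-minute window; any new message or a stale window resets it. Again illustrative only and not part of the patch; consecutiveTracker, observe, and the sample messages are made up, while the window and threshold copy the defaults in changefeed_stmt.go (one minute, five errors).

package main

import (
	"fmt"
	"time"
)

// consecutiveTracker mirrors the anonymous consecutiveErrors struct used in
// changefeedResumer.Resume.
type consecutiveTracker struct {
	message   string
	count     int
	firstSeen time.Time
}

const (
	window    = time.Minute // consecutiveIdenticalErrorsWindow
	threshold = 5           // consecutiveIdenticalErrorsDefault
)

// observe records one error message seen at time now and reports whether the
// retry loop should give up.
func (t *consecutiveTracker) observe(msg string, now time.Time) bool {
	if now.Sub(t.firstSeen) > window || msg != t.message {
		// Different message, or the run of identical messages went stale: restart.
		t.message, t.count, t.firstSeen = msg, 0, now
	}
	t.count++
	return t.count >= threshold
}

func main() {
	var t consecutiveTracker
	start := time.Now()
	// Five identical messages inside the window trip the safety net...
	for i := 0; i < 5; i++ {
		fmt.Println(t.observe("rpc error: connection refused", start.Add(time.Duration(i)*time.Second)))
	}
	// ...while a message that embeds something unique never accumulates, which
	// is why the test's synthetic retryable error includes timeutil.Now().
	fmt.Println(t.observe("rpc error: connection refused at 12:00:06", start.Add(6*time.Second)))
}

This is also why the test helpers set ConsecutiveIdenticalErrorBailoutCount to math.MaxInt32: with the bail-out effectively disabled, an error that should have been marked terminal keeps retrying and surfaces in the test rather than being papered over.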