From 04f4b5482a756745d1e9047fb7dd16f2d00f9f89 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Thu, 4 Apr 2019 15:03:58 -0700 Subject: [PATCH] changefeedccl: switch retryable errors back to a whitelist For a while, the cdc/crdb-chaos and cdc/sink-chaos roachtests have been failing because an error that should be marked as retryable wasn't. As a result of the discussion in #35974, I tried switching from a whitelist (retryable error) to a blacklist (terminal error) in #36132, but on reflection this doesn't seem like a great idea. We added a safety net to prevent false negatives from retrying indefinitely but it was immediately apparent that this meant we needed to tune the retry loop parameters. Better is to just do the due diligence of investigating the errors that should be retried and retrying them. The commit is intended for backport into 19.1 once it's baked for a bit. Closes #35974 Closes #36018 Closes #36019 Closes #36432 Release note (bug fix): `CHANGEFEED` now retry instead of erroring in more situations --- .../changefeedccl/changefeed_processors.go | 4 + pkg/ccl/changefeedccl/changefeed_stmt.go | 90 ++++++++--------- pkg/ccl/changefeedccl/changefeed_test.go | 49 ++++++---- pkg/ccl/changefeedccl/errors.go | 79 +++++++++++++++ pkg/ccl/changefeedccl/metrics.go | 10 +- pkg/ccl/changefeedccl/poller.go | 3 +- pkg/ccl/changefeedccl/rowfetcher_cache.go | 4 +- pkg/ccl/changefeedccl/sink.go | 97 +++++++++---------- pkg/ccl/changefeedccl/table_history.go | 3 +- pkg/cmd/roachtest/cdc.go | 2 - 10 files changed, 207 insertions(+), 134 deletions(-) create mode 100644 pkg/ccl/changefeedccl/errors.go diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 58c6bd5467f7..789c7daec2e2 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -125,6 +125,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { if ca.sink, err = getSink( ca.spec.Feed.SinkURI, nodeID, ca.spec.Feed.Opts, ca.spec.Feed.Targets, ca.flowCtx.Settings, ); err != nil { + err = MarkRetryableError(err) // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() @@ -151,6 +152,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { // dependency cycles. metrics := ca.flowCtx.JobRegistry.MetricsStruct().Changefeed.(*Metrics) ca.sink = makeMetricsSink(metrics, ca.sink) + ca.sink = &errorWrapperSink{wrapped: ca.sink} var knobs TestingKnobs if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok { @@ -418,6 +420,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context { if cf.sink, err = getSink( cf.spec.Feed.SinkURI, nodeID, cf.spec.Feed.Opts, cf.spec.Feed.Targets, cf.flowCtx.Settings, ); err != nil { + err = MarkRetryableError(err) cf.MoveToDraining(err) return ctx } @@ -431,6 +434,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context { // dependency cycles. cf.metrics = cf.flowCtx.JobRegistry.MetricsStruct().Changefeed.(*Metrics) cf.sink = makeMetricsSink(cf.metrics, cf.sink) + cf.sink = &errorWrapperSink{wrapped: cf.sink} if cf.spec.JobID != 0 { job, err := cf.flowCtx.JobRegistry.LoadJob(ctx, cf.spec.JobID) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index e9b0215b477b..fdb0b8bea89c 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -11,7 +11,6 @@ package changefeedccl import ( "context" "net/url" - "regexp" "sort" "time" @@ -244,7 +243,8 @@ func changefeedPlanHook( telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets))) if details.SinkURI == `` { - return distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh) + err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh) + return MaybeStripRetryableErrorMarker(err) } settings := p.ExecCfg().Settings @@ -263,16 +263,7 @@ func changefeedPlanHook( nodeID := p.ExtendedEvalContext().NodeID canarySink, err := getSink(details.SinkURI, nodeID, details.Opts, details.Targets, settings) if err != nil { - // In this context, we don't want to retry even retryable errors from the - // sync. Unwrap any retryable errors encountered. - // - // TODO(knz): This error handling is suspicious (see #35854 - // and #35920). What if the error is wrapped? Or has been - // flattened into a pgerror.Error? - if rErr, ok := err.(*retryableSinkError); ok { - return rErr.cause - } - return err + return MaybeStripRetryableErrorMarker(err) } if err := canarySink.Close(); err != nil { return err @@ -435,12 +426,25 @@ func (b *changefeedResumer) Resume( ctx context.Context, planHookState interface{}, startedCh chan<- tree.Datums, ) error { phs := planHookState.(sql.PlanHookState) + execCfg := phs.ExecCfg() + jobID := *b.job.ID() details := b.job.Details().(jobspb.ChangefeedDetails) progress := b.job.Progress() - // Errors encountered while emitting changes to the Sink may be transient; for - // example, a temporary network outage. When one of these errors occurs, we do - // not fail the job but rather restart the distSQL flow after a short backoff. + // TODO(dan): This is a workaround for not being able to set an initial + // progress high-water when creating a job (currently only the progress + // details can be set). I didn't want to pick off the refactor to get this + // fix in, but it'd be nice to remove this hack. + if _, ok := details.Opts[optCursor]; ok { + if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) { + progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime} + } + } + + // We'd like to avoid failing a changefeed unnecessarily, so when an error + // bubbles up to this level, we'd like to "retry" the flow if possible. This + // could be because the sink is down or because a cockroach node has crashed + // or for many other reasons. opts := retry.Options{ InitialBackoff: 5 * time.Millisecond, Multiplier: 2, @@ -448,43 +452,38 @@ func (b *changefeedResumer) Resume( } var err error for r := retry.StartWithCtx(ctx, opts); r.Next(); { - // TODO(dan): This is a workaround for not being able to set an initial - // progress high-water when creating a job (currently only the progress - // details can be set). I didn't want to pick off the refactor to get this - // fix in, but it'd be nice to remove this hack. - if _, ok := details.Opts[optCursor]; ok { - if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) { - progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime} - } + if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil { + return nil + } + if !IsRetryableError(err) { + log.Warningf(ctx, `CHANGEFEED job %d returning with error: %v`, jobID, err) + return err } - err = distChangefeedFlow(ctx, phs, *b.job.ID(), details, progress, startedCh) - if !isRetryableSinkError(err) && !isRetryableRPCError(err) { - break + log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %+v`, jobID, err) + if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { + metrics.ErrorRetries.Inc(1) } - log.Infof(ctx, `CHANGEFEED job %d encountered retryable error: %v`, *b.job.ID(), err) // Re-load the job in order to update our progress object, which may have // been updated by the changeFrontier processor since the flow started. - reloadedJob, phsErr := phs.ExecCfg().JobRegistry.LoadJob(ctx, *b.job.ID()) - if phsErr != nil { - err = phsErr - break + reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID) + if reloadErr != nil { + log.Warningf(ctx, `CHANGEFEED job %d could not reload job progress; `+ + `continuing from last known high-water of %s: %v`, + jobID, progress.GetHighWater(), reloadErr) + } else { + progress = reloadedJob.Progress() } - progress = reloadedJob.Progress() + // startedCh is normally used to signal back to the creator of the job that // the job has started; however, in this case nothing will ever receive // on the channel, causing the changefeed flow to block. Replace it with // a dummy channel. startedCh = make(chan tree.Datums, 1) - if metrics, ok := phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { - metrics.SinkErrorRetries.Inc(1) - } - continue - } - if err != nil { - log.Infof(ctx, `CHANGEFEED job %d returning with error: %v`, *b.job.ID(), err) } - return err + // We only hit this if `r.Next()` returns false, which right now only happens + // on context cancellation. + return errors.Wrap(err, `ran out of retries`) } // OnFailOrCancel is part of the jobs.Resumer interface. @@ -495,14 +494,3 @@ func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn) error { retu // OnTerminal is part of the jobs.Resumer interface. func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {} - -// Retryable RPC Error represents a gRPC error which indicates a retryable -// situation such as a connected node going down. In this case the DistSQL flow -// should be retried. -const retryableErrorStr = "rpc error|node unavailable" - -var retryableErrorRegex = regexp.MustCompile(retryableErrorStr) - -func isRetryableRPCError(err error) bool { - return retryableErrorRegex.MatchString(err.Error()) -} diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 9d491e333a7a..6b3b4742cb3e 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1060,61 +1060,70 @@ func TestChangefeedMonitoring(t *testing.T) { t.Run(`poller`, pollerTest(sinklessTest, testFn)) } -func TestChangefeedRetryableSinkError(t *testing.T) { +func TestChangefeedRetryableError(t *testing.T) { defer leaktest.AfterTest(t)() defer utilccl.TestingEnableEnterprise()() testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { - origAfterSinkFlushHook := f.Server().(*server.TestServer).Cfg.TestingKnobs. + knobs := f.Server().(*server.TestServer).Cfg.TestingKnobs. DistSQL.(*distsqlrun.TestingKnobs). - Changefeed.(*TestingKnobs).AfterSinkFlush + Changefeed.(*TestingKnobs) + origAfterSinkFlushHook := knobs.AfterSinkFlush var failSink int64 failSinkHook := func() error { - if atomic.LoadInt64(&failSink) != 0 { - return &retryableSinkError{cause: fmt.Errorf("synthetic retryable error")} + switch atomic.LoadInt64(&failSink) { + case 1: + return MarkRetryableError(fmt.Errorf("synthetic retryable error")) + case 2: + return fmt.Errorf("synthetic terminal error") } return origAfterSinkFlushHook() } - f.Server().(*server.TestServer).Cfg.TestingKnobs. - DistSQL.(*distsqlrun.TestingKnobs). - Changefeed.(*TestingKnobs).AfterSinkFlush = failSinkHook + knobs.AfterSinkFlush = failSinkHook + // Set up a new feed and verify that the sink is started up. sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) defer closeFeed(t, foo) - - // Verify that the sink is started up. sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`) assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1}}`, }) - // Set SQL Sink to return a retryable error. + // Set sink to return unique retryable errors and insert a row. Verify that + // sink is failing requests. atomic.StoreInt64(&failSink, 1) - - // Insert 1 row while the sink is failing. sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`) - - // Verify that sink is failing requests. registry := f.Server().JobRegistry().(*jobs.Registry) - retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).SinkErrorRetries + retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).ErrorRetries testutils.SucceedsSoon(t, func() error { if retryCounter.Counter.Count() < 3 { - return fmt.Errorf("insufficient sink error retries detected") + return fmt.Errorf("insufficient error retries detected") } return nil }) - // Fix the sink and insert another row. + // Fix the sink and insert another row. Check that nothing funky happened. atomic.StoreInt64(&failSink, 0) sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`) - - // Check that nothing funky happened. assertPayloads(t, foo, []string{ `foo: [2]->{"after": {"a": 2}}`, `foo: [3]->{"after": {"a": 3}}`, }) + + // Set sink to return a terminal error and insert a row. Ensure that we + // eventually get the error message back out. + atomic.StoreInt64(&failSink, 2) + sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`) + for { + _, err := foo.Next() + if err == nil { + continue + } + require.EqualError(t, err, `synthetic terminal error`) + break + } } // Only the enterprise version uses jobs. diff --git a/pkg/ccl/changefeedccl/errors.go b/pkg/ccl/changefeedccl/errors.go new file mode 100644 index 000000000000..ff21f896cd39 --- /dev/null +++ b/pkg/ccl/changefeedccl/errors.go @@ -0,0 +1,79 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "fmt" + "strings" +) + +const retryableErrorString = "retryable changefeed error" + +type retryableError struct { + wrapped error +} + +// MarkRetryableError wraps the given error, marking it as retryable to +// changefeeds. +func MarkRetryableError(e error) error { + return &retryableError{wrapped: e} +} + +// Error implements the error interface. +func (e *retryableError) Error() string { + return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error()) +} + +// Cause implements the github.com/pkg/errors.causer interface. +func (e *retryableError) Cause() error { return e.wrapped } + +// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is +// planned to be moved to the stdlib in go 1.13. +func (e *retryableError) Unwrap() error { return e.wrapped } + +// IsRetryableError returns true if the supplied error, or any of its parent +// causes, is a IsRetryableError. +func IsRetryableError(err error) bool { + for { + if err == nil { + return false + } + if _, ok := err.(*retryableError); ok { + return true + } + errStr := err.Error() + if strings.Contains(errStr, retryableErrorString) { + // If a RetryableError occurs on a remote node, DistSQL serializes it such + // that we can't recover the structure and we have to rely on this + // unfortunate string comparison. + return true + } + if strings.Contains(errStr, `rpc error`) { + // When a crdb node dies, any DistSQL flows with processors scheduled on + // it get an error with "rpc error" in the message from the call to + // `(*DistSQLPlanner).Run`. + return true + } + if e, ok := err.(interface{ Unwrap() error }); ok { + err = e.Unwrap() + continue + } + return false + } +} + +// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the +// RetryableError marker out. This won't do anything if the RetryableError +// itself has been wrapped, but that's okay, we'll just have an uglier string. +func MaybeStripRetryableErrorMarker(err error) error { + if e, ok := err.(*retryableError); ok { + err = e.wrapped + } + return err +} diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 52084002c1cd..4f88ebb3c840 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -95,9 +95,9 @@ var ( Measurement: "Flushes", Unit: metric.Unit_COUNT, } - metaChangefeedSinkErrorRetries = metric.Metadata{ - Name: "changefeed.sink_error_retries", - Help: "Total retryable errors encountered while emitting to sinks", + metaChangefeedErrorRetries = metric.Metadata{ + Name: "changefeed.error_retries", + Help: "Total retryable errors encountered by all changefeeds", Measurement: "Errors", Unit: metric.Unit_COUNT, } @@ -174,7 +174,7 @@ type Metrics struct { EmittedMessages *metric.Counter EmittedBytes *metric.Counter Flushes *metric.Counter - SinkErrorRetries *metric.Counter + ErrorRetries *metric.Counter BufferEntriesIn *metric.Counter BufferEntriesOut *metric.Counter @@ -202,7 +202,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { EmittedMessages: metric.NewCounter(metaChangefeedEmittedMessages), EmittedBytes: metric.NewCounter(metaChangefeedEmittedBytes), Flushes: metric.NewCounter(metaChangefeedFlushes), - SinkErrorRetries: metric.NewCounter(metaChangefeedSinkErrorRetries), + ErrorRetries: metric.NewCounter(metaChangefeedErrorRetries), BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn), BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut), diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index f62af00d95bc..69a16e50a630 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -498,7 +498,8 @@ func (p *poller) exportSpan( } if pErr != nil { - return pgerror.Wrapf(pErr.GoError(), pgerror.CodeDataExceptionError, + err := pErr.GoError() + return pgerror.Wrapf(err, pgerror.CodeDataExceptionError, `fetching changes for %s`, span) } p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds()) diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index 36ef89d674b1..ec15fa836d2d 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -52,7 +52,9 @@ func (c *rowFetcherCache) TableDescForKey( // own caching. tableDesc, _, err = c.leaseMgr.Acquire(ctx, ts, tableID) if err != nil { - return nil, err + // LeaseManager can return all kinds of errors during chaos, but based on + // its usage, none of them should ever be terminal. + return nil, MarkRetryableError(err) } // Immediately release the lease, since we only need it for the exact // timestamp requested. diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 35a503c8afb6..640d2fac069f 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -204,6 +204,46 @@ func getSink( return s, nil } +// errorWrapperSink delegates to another sink and marks all returned errors as +// retryable. During changefeed setup, we use the sink once without this to +// verify configuration, but in the steady state, no sink error should be +// terminal. +type errorWrapperSink struct { + wrapped Sink +} + +func (s errorWrapperSink) EmitRow( + ctx context.Context, table *sqlbase.TableDescriptor, key, value []byte, updated hlc.Timestamp, +) error { + if err := s.wrapped.EmitRow(ctx, table, key, value, updated); err != nil { + return MarkRetryableError(err) + } + return nil +} + +func (s errorWrapperSink) EmitResolvedTimestamp( + ctx context.Context, encoder Encoder, resolved hlc.Timestamp, +) error { + if err := s.wrapped.EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { + return MarkRetryableError(err) + } + return nil +} + +func (s errorWrapperSink) Flush(ctx context.Context) error { + if err := s.wrapped.Flush(ctx); err != nil { + return MarkRetryableError(err) + } + return nil +} + +func (s errorWrapperSink) Close() error { + if err := s.wrapped.Close(); err != nil { + return MarkRetryableError(err) + } + return nil +} + type kafkaLogAdapter struct { ctx context.Context } @@ -336,13 +376,13 @@ func makeKafkaSink( if err != nil { err = pgerror.Wrapf(err, pgerror.CodeCannotConnectNowError, `connecting to kafka: %s`, bootstrapServers) - return nil, &retryableSinkError{cause: err} + return nil, err } sink.producer, err = sarama.NewAsyncProducerFromClient(sink.client) if err != nil { err = pgerror.Wrapf(err, pgerror.CodeCannotConnectNowError, `connecting to kafka: %s`, bootstrapServers) - return nil, &retryableSinkError{cause: err} + return nil, err } sink.start() @@ -405,7 +445,7 @@ func (s *kafkaSink) EmitResolvedTimestamp( topics = append(topics, topic) } if err := s.client.RefreshMetadata(topics...); err != nil { - return &retryableSinkError{cause: err} + return err } s.lastMetadataRefresh = timeutil.Now() } @@ -423,7 +463,7 @@ func (s *kafkaSink) EmitResolvedTimestamp( // be picked up and get later ones. partitions, err := s.client.Partitions(topic) if err != nil { - return &retryableSinkError{cause: err} + return err } for _, partition := range partitions { msg := &sarama.ProducerMessage{ @@ -455,9 +495,6 @@ func (s *kafkaSink) Flush(ctx context.Context) error { s.mu.Unlock() if immediateFlush { - if _, ok := errors.Cause(flushErr).(*sarama.ProducerError); ok { - flushErr = &retryableSinkError{cause: flushErr} - } return flushErr } @@ -472,9 +509,6 @@ func (s *kafkaSink) Flush(ctx context.Context) error { flushErr := s.mu.flushErr s.mu.flushErr = nil s.mu.Unlock() - if _, ok := errors.Cause(flushErr).(*sarama.ProducerError); ok { - flushErr = &retryableSinkError{cause: flushErr} - } return flushErr } } @@ -773,46 +807,3 @@ func (s *bufferSink) Close() error { s.closed = true return nil } - -// causer matches the (unexported) interface used by Go to allow errors to wrap -// their parent cause. -type causer interface { - Cause() error -} - -// String and regex used to match retryable sink errors when they have been -// "flattened" into a pgerror. -const retryableSinkErrorString = "retryable sink error" - -// retryableSinkError should be used by sinks to wrap any error which may -// be retried. -type retryableSinkError struct { - cause error -} - -func (e retryableSinkError) Error() string { - return fmt.Sprintf(retryableSinkErrorString+": %s", e.cause.Error()) -} -func (e retryableSinkError) Cause() error { return e.cause } - -// isRetryableSinkError returns true if the supplied error, or any of its parent -// causes, is a retryableSinkError. -func isRetryableSinkError(err error) bool { - for { - if _, ok := err.(*retryableSinkError); ok { - return true - } - // TODO(mrtracy): This pathway, which occurs when the retryable error is - // detected on a non-local node of the distsql flow, is only currently - // being tested with a roachtest, which is expensive. See if it can be - // tested via a unit test. - if _, ok := err.(*pgerror.Error); ok { - return strings.Contains(err.Error(), retryableSinkErrorString) - } - if e, ok := err.(causer); ok { - err = e.Cause() - continue - } - return false - } -} diff --git a/pkg/ccl/changefeedccl/table_history.go b/pkg/ccl/changefeedccl/table_history.go index 309b19089ceb..db6a61b771dc 100644 --- a/pkg/ccl/changefeedccl/table_history.go +++ b/pkg/ccl/changefeedccl/table_history.go @@ -243,7 +243,8 @@ func fetchTableDescriptorVersions( log.Infof(ctx, `fetched table descs (%s,%s] took %s`, startTS, endTS, timeutil.Since(start)) } if pErr != nil { - return nil, pgerror.Wrapf(pErr.GoError(), pgerror.CodeDataExceptionError, + err := pErr.GoError() + return nil, pgerror.Wrapf(err, pgerror.CodeDataExceptionError, `fetching changes for %s`, span) } diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 64a01d5497bd..4c6cc23dabfd 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -524,7 +524,6 @@ func registerCDC(r *registry) { }) r.Add(testSpec{ Name: fmt.Sprintf("cdc/sink-chaos/rangefeed=%t", useRangeFeed), - Skip: `#36432`, MinVersion: "v2.1.0", Cluster: makeClusterSpec(4, cpu(16)), Run: func(ctx context.Context, t *test, c *cluster) { @@ -541,7 +540,6 @@ func registerCDC(r *registry) { }) r.Add(testSpec{ Name: fmt.Sprintf("cdc/crdb-chaos/rangefeed=%t", useRangeFeed), - Skip: `#36432`, MinVersion: "v2.1.0", Cluster: makeClusterSpec(4, cpu(16)), Run: func(ctx context.Context, t *test, c *cluster) {