Revert "changefeedccl: switch high-level retry marker from whitelist …
Browse files Browse the repository at this point in the history
…to blacklist"

This reverts commit 200567d.

Release note: None
danhhz committed Apr 15, 2019
1 parent 3edef1c commit c429118
Showing 12 changed files with 135 additions and 268 deletions.
4 changes: 0 additions & 4 deletions pkg/ccl/changefeedccl/buffer.go
@@ -14,7 +14,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
@@ -206,9 +205,6 @@ func (b *memBuffer) addRow(ctx context.Context, row tree.Datums) error {
_, err := b.mu.entries.AddRow(ctx, row)
b.mu.Unlock()
b.metrics.BufferEntriesIn.Inc(1)
if e, ok := pgerror.GetPGCause(err); ok && e.Code == pgerror.CodeOutOfMemoryError {
err = MarkTerminalError(err)
}
return err
}

3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -318,9 +318,6 @@ func checkpointResolvedTimestamp(
return resolved
}
if err := jobProgressedFn(ctx, progressedClosure); err != nil {
if _, ok := err.(*jobs.InvalidStatusError); ok {
err = MarkTerminalError(err)
}
return err
}
}
125 changes: 50 additions & 75 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
"net/url"
"regexp"
"sort"
"time"

@@ -30,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)
@@ -244,8 +244,7 @@ func changefeedPlanHook(
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))

if details.SinkURI == `` {
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
return MaybeStripTerminalErrorMarker(err)
return distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
}

settings := p.ExecCfg().Settings
@@ -264,7 +263,16 @@
nodeID := p.ExtendedEvalContext().NodeID
canarySink, err := getSink(details.SinkURI, nodeID, details.Opts, details.Targets, settings)
if err != nil {
return MaybeStripTerminalErrorMarker(err)
// In this context, we don't want to retry even retryable errors from the
// sync. Unwrap any retryable errors encountered.
//
// TODO(knz): This error handling is suspicious (see #35854
// and #35920). What if the error is wrapped? Or has been
// flattened into a pgerror.Error?
if rErr, ok := err.(*retryableSinkError); ok {
return rErr.cause
}
return err
}
if err := canarySink.Close(); err != nil {
return err
@@ -427,100 +435,56 @@ func (b *changefeedResumer) Resume(
ctx context.Context, planHookState interface{}, startedCh chan<- tree.Datums,
) error {
phs := planHookState.(sql.PlanHookState)
execCfg := phs.ExecCfg()
jobID := *b.job.ID()
details := b.job.Details().(jobspb.ChangefeedDetails)
progress := b.job.Progress()

// TODO(dan): This is a workaround for not being able to set an initial
// progress high-water when creating a job (currently only the progress
// details can be set). I didn't want to pick off the refactor to get this
// fix in, but it'd be nice to remove this hack.
if _, ok := details.Opts[optCursor]; ok {
if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
}

// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
// or for many other reasons. We initially tried to whitelist which errors
// should cause the changefeed to retry, but this turns out to be brittle, so
// we switched to a blacklist. Any error that is expected to be permanent is
// now marked with `MarkTerminalError` by the time it comes out of
// `distChangefeedFlow`. Everything else should be logged loudly and retried.
// Errors encountered while emitting changes to the Sink may be transient; for
// example, a temporary network outage. When one of these errors occurs, we do
// not fail the job but rather restart the distSQL flow after a short backoff.
opts := retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
}
const consecutiveIdenticalErrorsWindow = time.Minute
const consecutiveIdenticalErrorsDefault = int(5)
var consecutiveErrors struct {
message string
count int
firstSeen time.Time
}
var err error
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil {
return nil
}
if IsTerminalError(err) {
err = MaybeStripTerminalErrorMarker(err)
log.Warningf(ctx, `CHANGEFEED job %d returning with error: %v`, jobID, err)
return err
}

// If we receive an identical error message more than some number of times
// in a time period, bail. This is a safety net in case we miss marking
// something as terminal.
{
m, d := err.Error(), timeutil.Since(consecutiveErrors.firstSeen)
if d > consecutiveIdenticalErrorsWindow || m != consecutiveErrors.message {
consecutiveErrors.message = m
consecutiveErrors.count = 0
consecutiveErrors.firstSeen = timeutil.Now()
}
consecutiveErrors.count++
consecutiveIdenticalErrorsThreshold := consecutiveIdenticalErrorsDefault
if cfKnobs, ok := execCfg.DistSQLRunTestingKnobs.Changefeed.(*TestingKnobs); ok {
if c := cfKnobs.ConsecutiveIdenticalErrorBailoutCount; c != 0 {
consecutiveIdenticalErrorsThreshold = c
}
}
if consecutiveErrors.count >= consecutiveIdenticalErrorsThreshold {
log.Warningf(ctx, `CHANGEFEED job %d saw the same non-terminal error %d times in %s: %+v`,
jobID, consecutiveErrors.count, d, err)
return err
// TODO(dan): This is a workaround for not being able to set an initial
// progress high-water when creating a job (currently only the progress
// details can be set). I didn't want to pick off the refactor to get this
// fix in, but it'd be nice to remove this hack.
if _, ok := details.Opts[optCursor]; ok {
if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
}

log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %+v`, jobID, err)
if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.ErrorRetries.Inc(1)
err = distChangefeedFlow(ctx, phs, *b.job.ID(), details, progress, startedCh)
if !isRetryableSinkError(err) && !isRetryableRPCError(err) {
break
}
log.Infof(ctx, `CHANGEFEED job %d encountered retryable error: %v`, *b.job.ID(), err)
// Re-load the job in order to update our progress object, which may have
// been updated by the changeFrontier processor since the flow started.
reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID)
if reloadErr != nil {
log.Warningf(ctx, `CHANGEFEED job %d could not reload job progress; `+
`continuing from last known high-water of %s: %v`,
jobID, progress.GetHighWater(), reloadErr)
} else {
progress = reloadedJob.Progress()
reloadedJob, phsErr := phs.ExecCfg().JobRegistry.LoadJob(ctx, *b.job.ID())
if phsErr != nil {
err = phsErr
break
}

progress = reloadedJob.Progress()
// startedCh is normally used to signal back to the creator of the job that
// the job has started; however, in this case nothing will ever receive
// on the channel, causing the changefeed flow to block. Replace it with
// a dummy channel.
startedCh = make(chan tree.Datums, 1)
if metrics, ok := phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.SinkErrorRetries.Inc(1)
}
continue
}
// We only hit this if `r.Next()` returns false, which right now only happens
// on context cancellation.
return errors.Wrap(err, `ran out of retries`)
if err != nil {
log.Infof(ctx, `CHANGEFEED job %d returning with error: %v`, *b.job.ID(), err)
}
return err
}

// OnFailOrCancel is part of the jobs.Resumer interface.
@@ -531,3 +495,14 @@ func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn) error { retu

// OnTerminal is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {}

// Retryable RPC Error represents a gRPC error which indicates a retryable
// situation such as a connected node going down. In this case the DistSQL flow
// should be retried.
const retryableErrorStr = "rpc error|node unavailable"

var retryableErrorRegex = regexp.MustCompile(retryableErrorStr)

func isRetryableRPCError(err error) bool {
return retryableErrorRegex.MatchString(err.Error())
}
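
For orientation, the retry behavior this revert restores can be summarized as: the resumer re-runs distChangefeedFlow only when the error is a *retryableSinkError or when its message matches the rpc error|node unavailable regex, backing off exponentially from 5ms up to 10s. Below is a minimal, self-contained sketch of that classification and loop; runWithRetry, isRetryable, and the demo in main are hypothetical stand-ins for illustration, not code from this commit.

// Sketch of the restored retry classification. Only retryableSinkError, the
// RPC regex, and the backoff constants come from the diff above; everything
// else is a hypothetical stand-in.
package main

import (
	"fmt"
	"regexp"
	"time"
)

// retryableSinkError mirrors the wrapper restored by this commit: it keeps the
// cause so callers that must not retry (e.g. the canary sink check) can unwrap it.
type retryableSinkError struct {
	cause error
}

func (e *retryableSinkError) Error() string {
	return fmt.Sprintf("retryable sink error: %v", e.cause)
}

// Matches gRPC failures that indicate a node went away, as in isRetryableRPCError.
var retryableRPCRegex = regexp.MustCompile(`rpc error|node unavailable`)

// isRetryable reports whether the resumer-style loop should re-run the flow.
func isRetryable(err error) bool {
	if _, ok := err.(*retryableSinkError); ok {
		return true
	}
	return retryableRPCRegex.MatchString(err.Error())
}

// runWithRetry is a hypothetical stand-in for the resumer loop: retryable
// errors restart the flow after an exponential backoff capped at 10s; anything
// else (including nil) is returned to the caller.
func runWithRetry(runFlow func() error) error {
	backoff := 5 * time.Millisecond // InitialBackoff from retry.Options above
	const maxBackoff = 10 * time.Second
	for {
		err := runFlow()
		if err == nil || !isRetryable(err) {
			return err
		}
		time.Sleep(backoff)
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

func main() {
	calls := 0
	err := runWithRetry(func() error {
		calls++
		if calls < 3 {
			return &retryableSinkError{cause: fmt.Errorf("kafka unreachable")}
		}
		return fmt.Errorf("schema changed") // non-retryable: surfaces immediately
	})
	fmt.Println(calls, err) // 3 schema changed
}
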
85 changes: 20 additions & 65 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -1060,106 +1060,61 @@ func TestChangefeedMonitoring(t *testing.T) {
t.Run(`poller`, pollerTest(sinklessTest, testFn))
}

func TestChangefeedRetryableError(t *testing.T) {
func TestChangefeedRetryableSinkError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
knobs := f.Server().(*server.TestServer).Cfg.TestingKnobs.
origAfterSinkFlushHook := f.Server().(*server.TestServer).Cfg.TestingKnobs.
DistSQL.(*distsqlrun.TestingKnobs).
Changefeed.(*TestingKnobs)
origAfterSinkFlushHook := knobs.AfterSinkFlush
Changefeed.(*TestingKnobs).AfterSinkFlush
var failSink int64
failSinkHook := func() error {
switch atomic.LoadInt64(&failSink) {
case 1:
return fmt.Errorf("unique synthetic retryable error: %s", timeutil.Now())
case 2:
return MarkTerminalError(fmt.Errorf("synthetic terminal error"))
case 3:
return fmt.Errorf("should be terminal but isn't")
if atomic.LoadInt64(&failSink) != 0 {
return &retryableSinkError{cause: fmt.Errorf("synthetic retryable error")}
}
return origAfterSinkFlushHook()
}
knobs.AfterSinkFlush = failSinkHook
f.Server().(*server.TestServer).Cfg.TestingKnobs.
DistSQL.(*distsqlrun.TestingKnobs).
Changefeed.(*TestingKnobs).AfterSinkFlush = failSinkHook

// Set up a new feed and verify that the sink is started up.
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
defer closeFeed(t, foo)

// Verify that the sink is started up.
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1}}`,
})

// Set sink to return unique retryable errors and insert a row. Verify that
// sink is failing requests.
// Set SQL Sink to return a retryable error.
atomic.StoreInt64(&failSink, 1)

// Insert 1 row while the sink is failing.
sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)

// Verify that sink is failing requests.
registry := f.Server().JobRegistry().(*jobs.Registry)
retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).ErrorRetries
retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).SinkErrorRetries
testutils.SucceedsSoon(t, func() error {
if retryCounter.Counter.Count() < 3 {
return fmt.Errorf("insufficient error retries detected")
return fmt.Errorf("insufficient sink error retries detected")
}
return nil
})

// Fix the sink and insert another row. Check that nothing funky happened.
// Fix the sink and insert another row.
atomic.StoreInt64(&failSink, 0)
sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`)

// Check that nothing funky happened.
assertPayloads(t, foo, []string{
`foo: [2]->{"after": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3}}`,
})

// Set sink to return a terminal error and insert a row. Ensure that we
// eventually get the error message back out.
atomic.StoreInt64(&failSink, 2)
sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)
for {
_, err := foo.Next()
if err == nil {
continue
}
require.EqualError(t, err, `synthetic terminal error`)
break
}

// The above test an error that is correctly _not marked_ as terminal and
// one that is correctly _marked_ as terminal. We're also concerned a
// terminal error that is not marked but should be. This would result in an
// indefinite retry loop, so we have a safety net that bails if we
// consecutively receive an identical error message some number of times.
// But default this safety net is disabled in tests, because we want to
// catch this. The following line enables the safety net (by setting the
// knob to the 0 value) so that we can test the safety net.
knobs.ConsecutiveIdenticalErrorBailoutCount = 0

// Set up a new feed and verify that the sink is started up.
atomic.StoreInt64(&failSink, 0)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
bar := feed(t, f, `CREATE CHANGEFEED FOR bar`)
defer closeFeed(t, bar)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1)`)
assertPayloads(t, bar, []string{
`bar: [1]->{"after": {"a": 1}}`,
})

// Set sink to return a non-unique non-terminal error and insert a row.
// This simulates an error we should blacklist but have missed. The feed
// should eventually fail.
atomic.StoreInt64(&failSink, 3)
sqlDB.Exec(t, `INSERT INTO bar VALUES (2)`)
for {
_, err := bar.Next()
if err == nil {
continue
}
require.EqualError(t, err, `should be terminal but isn't`)
break
}
}

// Only the enterprise version uses jobs.
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/encoder.go
@@ -262,7 +262,7 @@ func (e *confluentAvroEncoder) EncodeKey(row encodeRow) ([]byte, error) {
var err error
registered.schema, err = indexToAvroSchema(row.tableDesc, &row.tableDesc.PrimaryIndex)
if err != nil {
return nil, MarkTerminalError(err)
return nil, err
}

// NB: This uses the kafka name escaper because it has to match the name
@@ -296,7 +296,7 @@ func (e *confluentAvroEncoder) EncodeValue(row encodeRow) ([]byte, error) {
if !ok {
afterDataSchema, err := tableToAvroSchema(row.tableDesc)
if err != nil {
return nil, MarkTerminalError(err)
return nil, err
}

opts := avroEnvelopeOpts{afterField: true, updatedField: e.updatedField}
