Commit

changefeedccl: switch high-level retry marker from whitelist to blacklist

In the roachtests for crdb-chaos and sink-chaos we're seeing changefeeds
fail with surprising errors:

    [NotLeaseHolderError] r681: replica (n1,s1):1 not lease holder; replica (n2,s2):2 is

    descriptor not found

We'd like to avoid failing a changefeed unnecessarily, so when an error
bubbles up to the top level, we'd like to retry the distributed flow if
possible. We initially tried to whitelist which errors should cause the
changefeed to retry, but this turns out to be brittle, so this commit
switches to a blacklist. Any error that is expected to be permanent is
now marked with `MarkTerminalError` by the time it comes out of
`distChangefeedFlow`. Everything else should be logged loudly and
retried.
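
In sketch form, the retry loop this commit adds to `(*changefeedResumer).Resume`
looks roughly like the following (condensed from the changefeed_stmt.go diff
below; the backoff options, metrics, and job-progress reloading are omitted):

    for r := retry.StartWithCtx(ctx, opts); r.Next(); {
        if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil {
            break
        }
        if IsTerminalError(err) {
            // Marked permanent somewhere below distChangefeedFlow; give up.
            break
        }
        // Anything unmarked is assumed transient: log loudly and retry.
        log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %+v`, jobID, err)
    }
    return MaybeStripTerminalErrorMarker(err)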

Touches cockroachdb#35974
Touches cockroachdb#36019

Release note: None
danhhz committed Mar 25, 2019
1 parent 98ab268 commit feb1f25
Showing 10 changed files with 170 additions and 117 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/buffer.go
@@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
@@ -208,6 +209,9 @@ func (b *memBuffer) addRow(ctx context.Context, row tree.Datums) error {
_, err := b.mu.entries.AddRow(ctx, row)
b.mu.Unlock()
b.metrics.BufferEntriesIn.Inc(1)
if e, ok := pgerror.GetPGCause(err); ok && e.Code == pgerror.CodeOutOfMemoryError {
err = MarkTerminalError(err)
}
return err
}

3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -322,6 +322,9 @@ func checkpointResolvedTimestamp(
return resolved
}
if err := jobProgressedFn(ctx, progressedClosure); err != nil {
if _, ok := err.(*jobs.InvalidStatusError); ok {
err = MarkTerminalError(err)
}
return err
}
}
92 changes: 44 additions & 48 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -11,7 +11,6 @@ package changefeedccl
import (
"context"
"net/url"
"regexp"
"sort"
"time"

@@ -239,7 +238,8 @@ func changefeedPlanHook(
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))

if details.SinkURI == `` {
return distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
return MaybeStripTerminalErrorMarker(err)
}

settings := p.ExecCfg().Settings
@@ -258,16 +258,7 @@
nodeID := p.ExtendedEvalContext().NodeID
canarySink, err := getSink(details.SinkURI, nodeID, details.Opts, details.Targets, settings)
if err != nil {
// In this context, we don't want to retry even retryable errors from the
// sync. Unwrap any retryable errors encountered.
//
// TODO(knz): This error handling is suspicious (see #35854
// and #35920). What if the error is wrapped? Or has been
// flattened into a pgerror.Error?
if rErr, ok := err.(*retryableSinkError); ok {
return rErr.cause
}
return err
return MaybeStripTerminalErrorMarker(err)
}
if err := canarySink.Close(); err != nil {
return err
@@ -427,54 +418,70 @@ func (b *changefeedResumer) Resume(
ctx context.Context, job *jobs.Job, planHookState interface{}, startedCh chan<- tree.Datums,
) error {
phs := planHookState.(sql.PlanHookState)
jobID := *job.ID()
details := job.Details().(jobspb.ChangefeedDetails)
progress := job.Progress()

// Errors encountered while emitting changes to the Sink may be transient; for
// example, a temporary network outage. When one of these errors occurs, we do
// not fail the job but rather restart the distSQL flow after a short backoff.
// TODO(dan): This is a workaround for not being able to set an initial
// progress high-water when creating a job (currently only the progress
// details can be set). I didn't want to pick off the refactor to get this
// fix in, but it'd be nice to remove this hack.
if _, ok := details.Opts[optCursor]; ok {
if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
}

// WIP I'm worried about what happens if we miss adding something to the
// blacklist. Should we bail after receiving some consecutive number of
// identical error messages?

// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
// or for many other reasons. We initially tried to whitelist which errors
// should cause the changefeed to retry, but this turns out to be brittle, so
// we switched to a blacklist. Any error that is expected to be permanent is
// now marked with `MarkTerminalError` by the time it comes out of
// `distChangefeedFlow`. Everything else should be logged loudly and retried.
opts := retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
}
var err error
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
// TODO(dan): This is a workaround for not being able to set an initial
// progress high-water when creating a job (currently only the progress
// details can be set). I didn't want to pick off the refactor to get this
// fix in, but it'd be nice to remove this hack.
if _, ok := details.Opts[optCursor]; ok {
if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil {
break
}

err = distChangefeedFlow(ctx, phs, *job.ID(), details, progress, startedCh)
if !isRetryableSinkError(err) && !isRetryableRPCError(err) {
if IsTerminalError(err) {
break
}
log.Infof(ctx, `CHANGEFEED job %d encountered retryable error: %v`, *job.ID(), err)

log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %+v`, jobID, err)
if metrics, ok := phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.ErrorRetries.Inc(1)
}
// Re-load the job in order to update our progress object, which may have
// been updated by the changeFrontier processor since the flow started.
reloadedJob, phsErr := phs.ExecCfg().JobRegistry.LoadJob(ctx, *job.ID())
if phsErr != nil {
err = phsErr
break
reloadedJob, reloadErr := phs.ExecCfg().JobRegistry.LoadJob(ctx, jobID)
if reloadErr != nil {
log.Warningf(ctx, `CHANGEFEED job %d could not reload job progress; `+
`continuing from last known high-water of %s: %v`,
jobID, progress.GetHighWater(), reloadErr)
} else {
progress = reloadedJob.Progress()
}
progress = reloadedJob.Progress()

// startedCh is normally used to signal back to the creator of the job that
// the job has started; however, in this case nothing will ever receive
// on the channel, causing the changefeed flow to block. Replace it with
// a dummy channel.
startedCh = make(chan tree.Datums, 1)
if metrics, ok := phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.SinkErrorRetries.Inc(1)
}
continue
}
if err != nil {
log.Infof(ctx, `CHANGEFEED job %d returning with error: %v`, *job.ID(), err)
err = MaybeStripTerminalErrorMarker(err)
log.Warningf(ctx, `CHANGEFEED job %d returning with error: %v`, jobID, err)
}
return err
}
@@ -492,14 +499,3 @@ func changefeedResumeHook(typ jobspb.Type, _ *cluster.Settings) jobs.Resumer {
}
return &changefeedResumer{}
}

// Retryable RPC Error represents a gRPC error which indicates a retryable
// situation such as a connected node going down. In this case the DistSQL flow
// should be retried.
const retryableErrorStr = "rpc error|node unavailable"

var retryableErrorRegex = regexp.MustCompile(retryableErrorStr)

func isRetryableRPCError(err error) bool {
return retryableErrorRegex.MatchString(err.Error())
}
27 changes: 22 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -1052,7 +1052,7 @@ func TestChangefeedMonitoring(t *testing.T) {
t.Run(`poller`, pollerTest(sinklessTest, testFn))
}

func TestChangefeedRetryableSinkError(t *testing.T) {
func TestChangefeedRetryableError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()

@@ -1062,8 +1062,11 @@ func TestChangefeedRetryableSinkError(t *testing.T) {
Changefeed.(*TestingKnobs).AfterSinkFlush
var failSink int64
failSinkHook := func() error {
if atomic.LoadInt64(&failSink) != 0 {
return &retryableSinkError{cause: fmt.Errorf("synthetic retryable error")}
switch atomic.LoadInt64(&failSink) {
case 1:
return fmt.Errorf("synthetic retryable error")
case 2:
return MarkTerminalError(fmt.Errorf("synthetic non-retryable error"))
}
return origAfterSinkFlushHook()
}
@@ -1090,10 +1093,10 @@

// Verify that sink is failing requests.
registry := f.Server().JobRegistry().(*jobs.Registry)
retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).SinkErrorRetries
retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).ErrorRetries
testutils.SucceedsSoon(t, func() error {
if retryCounter.Counter.Count() < 3 {
return fmt.Errorf("insufficient sink error retries detected")
return fmt.Errorf("insufficient error retries detected")
}
return nil
})
@@ -1107,6 +1110,20 @@ func TestChangefeedRetryableSinkError(t *testing.T) {
`foo: [2]->{"after": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3}}`,
})

// Set SQL Sink to return a terminal error and insert one last row.
atomic.StoreInt64(&failSink, 2)
sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)

// Ensure that we eventually get the error message back out.
for {
_, err := foo.Next()
if err == nil {
continue
}
require.EqualError(t, err, `synthetic non-retryable error`)
break
}
}

// Only the enterprise version uses jobs.
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/encoder.go
@@ -261,7 +261,7 @@ func (e *confluentAvroEncoder) EncodeKey(row encodeRow) ([]byte, error) {
var err error
registered.schema, err = indexToAvroSchema(row.tableDesc, &row.tableDesc.PrimaryIndex)
if err != nil {
return nil, err
return nil, MarkTerminalError(err)
}

// NB: This uses the kafka name escaper because it has to match the name
@@ -295,7 +295,7 @@ func (e *confluentAvroEncoder) EncodeValue(row encodeRow) ([]byte, error) {
if !ok {
afterDataSchema, err := tableToAvroSchema(row.tableDesc)
if err != nil {
return nil, err
return nil, MarkTerminalError(err)
}

opts := avroEnvelopeOpts{afterField: true, updatedField: e.updatedField}
68 changes: 68 additions & 0 deletions pkg/ccl/changefeedccl/errors.go
@@ -0,0 +1,68 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
)

const terminalErrorString = "terminal changefeed error"

type terminalError struct {
wrapped error
}

// MarkTerminalError wraps the given error, marking it as non-retryable to
// changefeeds.
func MarkTerminalError(e error) error {
return &terminalError{wrapped: e}
}

// Error implements the error interface.
func (e *terminalError) Error() string {
return fmt.Sprintf("%s: %s", terminalErrorString, e.wrapped.Error())
}

// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is
// planned to be moved to the stdlib in go 1.13.
func (e *terminalError) Unwrap() error { return e.wrapped }

// IsTerminalError returns true if the supplied error, or any of its parent
// causes, is a terminalError.
func IsTerminalError(err error) bool {
for {
if err == nil {
return false
}
if _, ok := err.(*terminalError); ok {
return true
}
if _, ok := err.(*pgerror.Error); ok {
return strings.Contains(err.Error(), terminalErrorString)
}
if e, ok := err.(interface{ Unwrap() error }); ok {
err = e.Unwrap()
continue
}
return false
}
}

// MaybeStripTerminalErrorMarker performs some minimal attempt to clean the
// TerminalError marker out. This won't do anything if the TerminalError itself
// has been wrapped, but that's okay, we'll just have an uglier string.
func MaybeStripTerminalErrorMarker(err error) error {
if e, ok := err.(*terminalError); ok {
err = e.wrapped
}
return err
}
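
As a standalone illustration of the wrap/unwrap pattern errors.go implements (a
minimal sketch that does not use the cockroach packages; it leans on the Go 1.13
errors package in place of the hand-rolled cause walk in IsTerminalError):

    package main

    import (
        "errors"
        "fmt"
    )

    // terminalError mirrors the marker type above: it wraps a cause and exposes
    // it via Unwrap so callers can walk the chain.
    type terminalError struct{ wrapped error }

    func (e *terminalError) Error() string { return "terminal changefeed error: " + e.wrapped.Error() }
    func (e *terminalError) Unwrap() error { return e.wrapped }

    func main() {
        permanent := &terminalError{wrapped: errors.New("descriptor not found")}
        transient := errors.New("rpc error: node unavailable")

        var t *terminalError
        fmt.Println(errors.As(permanent, &t)) // true: marked terminal, do not retry
        fmt.Println(errors.As(transient, &t)) // false: retry the distributed flow
    }
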
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/metrics.go
@@ -95,9 +95,9 @@ var (
Measurement: "Flushes",
Unit: metric.Unit_COUNT,
}
metaChangefeedSinkErrorRetries = metric.Metadata{
Name: "changefeed.sink_error_retries",
Help: "Total retryable errors encountered while emitting to sinks",
metaChangefeedErrorRetries = metric.Metadata{
Name: "changefeed.error_retries",
Help: "Total retryable errors encountered by all changefeeds",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
@@ -174,7 +174,7 @@ type Metrics struct {
EmittedMessages *metric.Counter
EmittedBytes *metric.Counter
Flushes *metric.Counter
SinkErrorRetries *metric.Counter
ErrorRetries *metric.Counter
BufferEntriesIn *metric.Counter
BufferEntriesOut *metric.Counter

@@ -202,7 +202,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
EmittedMessages: metric.NewCounter(metaChangefeedEmittedMessages),
EmittedBytes: metric.NewCounter(metaChangefeedEmittedBytes),
Flushes: metric.NewCounter(metaChangefeedFlushes),
SinkErrorRetries: metric.NewCounter(metaChangefeedSinkErrorRetries),
ErrorRetries: metric.NewCounter(metaChangefeedErrorRetries),
BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn),
BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut),

15 changes: 12 additions & 3 deletions pkg/ccl/changefeedccl/poller.go
@@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
"sort"
"strings"
"sync/atomic"
"time"

@@ -288,7 +289,11 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
}
frontier.Forward(span, lastHighwater)
g.GoCtx(func(ctx context.Context) error {
return ds.RangeFeed(ctx, req, eventC)
err := ds.RangeFeed(ctx, req, eventC)
if err != nil && strings.Contains(err.Error(), "must be after replica GC threshold") {
err = MarkTerminalError(err)
}
return err
})
}
g.GoCtx(func(ctx context.Context) error {
@@ -497,7 +502,11 @@ func (p *poller) exportSpan(
}

if pErr != nil {
return pgerror.Wrapf(pErr.GoError(), pgerror.CodeDataExceptionError,
err := pErr.GoError()
if strings.Contains(err.Error(), "must be after replica GC threshold") {
err = MarkTerminalError(err)
}
return pgerror.Wrapf(err, pgerror.CodeDataExceptionError,
`fetching changes for %s`, span)
}
p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds())
@@ -642,7 +651,7 @@ func clusterNodeCount(g *gossip.Gossip) int {

func (p *poller) validateTable(ctx context.Context, desc *sqlbase.TableDescriptor) error {
if err := validateChangefeedTable(p.details.Targets, desc); err != nil {
return err
return MarkTerminalError(err)
}
p.mu.Lock()
defer p.mu.Unlock()
