release-19.1: changefeedccl: improve retryable error whitelisting #37092

Merged: 2 commits, May 6, 2019

4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -125,6 +125,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
if ca.sink, err = getSink(
ca.spec.Feed.SinkURI, nodeID, ca.spec.Feed.Opts, ca.spec.Feed.Targets, ca.flowCtx.Settings,
); err != nil {
err = MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
@@ -151,6 +152,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
// dependency cycles.
metrics := ca.flowCtx.JobRegistry.MetricsStruct().Changefeed.(*Metrics)
ca.sink = makeMetricsSink(metrics, ca.sink)
ca.sink = &errorWrapperSink{wrapped: ca.sink}

var knobs TestingKnobs
if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
@@ -418,6 +420,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
if cf.sink, err = getSink(
cf.spec.Feed.SinkURI, nodeID, cf.spec.Feed.Opts, cf.spec.Feed.Targets, cf.flowCtx.Settings,
); err != nil {
err = MarkRetryableError(err)
cf.MoveToDraining(err)
return ctx
}
@@ -431,6 +434,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
// dependency cycles.
cf.metrics = cf.flowCtx.JobRegistry.MetricsStruct().Changefeed.(*Metrics)
cf.sink = makeMetricsSink(cf.metrics, cf.sink)
cf.sink = &errorWrapperSink{wrapped: cf.sink}

if cf.spec.JobID != 0 {
job, err := cf.flowCtx.JobRegistry.LoadJob(ctx, cf.spec.JobID)
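
Both processors now wrap their sink in an errorWrapperSink. Its definition lives elsewhere in this PR and is not part of the hunks shown above; as a rough sketch of the idea, assuming a simplified stand-in for the package's Sink interface, it marks every error coming out of the underlying sink with the retryable marker added in errors.go further down:

package changefeedccl

import "context"

// Sketch only: a simplified stand-in for this package's Sink interface.
type Sink interface {
	EmitRow(ctx context.Context, key, value []byte) error
	Flush(ctx context.Context) error
	Close() error
}

// errorWrapperSink (sketch) marks every error from the wrapped sink as
// retryable, so a transient sink failure restarts the distSQL flow instead
// of failing the changefeed job.
type errorWrapperSink struct {
	wrapped Sink
}

func (s *errorWrapperSink) EmitRow(ctx context.Context, key, value []byte) error {
	if err := s.wrapped.EmitRow(ctx, key, value); err != nil {
		return MarkRetryableError(err)
	}
	return nil
}

func (s *errorWrapperSink) Flush(ctx context.Context) error {
	if err := s.wrapped.Flush(ctx); err != nil {
		return MarkRetryableError(err)
	}
	return nil
}

func (s *errorWrapperSink) Close() error {
	if err := s.wrapped.Close(); err != nil {
		return MarkRetryableError(err)
	}
	return nil
}
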
86 changes: 39 additions & 47 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -11,7 +11,6 @@ package changefeedccl
import (
"context"
"net/url"
"regexp"
"sort"
"time"

@@ -239,7 +238,8 @@ func changefeedPlanHook(
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))

if details.SinkURI == `` {
return distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
return MaybeStripRetryableErrorMarker(err)
}

settings := p.ExecCfg().Settings
@@ -258,12 +258,7 @@
nodeID := p.ExtendedEvalContext().NodeID
canarySink, err := getSink(details.SinkURI, nodeID, details.Opts, details.Targets, settings)
if err != nil {
// In this context, we don't want to retry even retryable errors from the
// sink. Unwrap any retryable errors encountered.
if rErr, ok := err.(*retryableSinkError); ok {
return rErr.cause
}
return err
return MaybeStripRetryableErrorMarker(err)
}
if err := canarySink.Close(); err != nil {
return err
@@ -423,56 +418,64 @@ func (b *changefeedResumer) Resume(
ctx context.Context, job *jobs.Job, planHookState interface{}, startedCh chan<- tree.Datums,
) error {
phs := planHookState.(sql.PlanHookState)
execCfg := phs.ExecCfg()
jobID := *job.ID()
details := job.Details().(jobspb.ChangefeedDetails)
progress := job.Progress()

// Errors encountered while emitting changes to the Sink may be transient; for
// example, a temporary network outage. When one of these errors occurs, we do
// not fail the job but rather restart the distSQL flow after a short backoff.
// TODO(dan): This is a workaround for not being able to set an initial
// progress high-water when creating a job (currently only the progress
// details can be set). I didn't want to pick off the refactor to get this
// fix in, but it'd be nice to remove this hack.
if _, ok := details.Opts[optCursor]; ok {
if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
}

// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
// or for many other reasons.
opts := retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
}
var err error
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
// TODO(dan): This is a workaround for not being able to set an initial
// progress high-water when creating a job (currently only the progress
// details can be set). I didn't want to pick off the refactor to get this
// fix in, but it'd be nice to remove this hack.
if _, ok := details.Opts[optCursor]; ok {
if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil {
return nil
}
if !IsRetryableError(err) {
log.Warningf(ctx, `CHANGEFEED job %d returning with error: %+v`, jobID, err)
return err
}

err = distChangefeedFlow(ctx, phs, *job.ID(), details, progress, startedCh)
if !isRetryableSinkError(err) && !isRetryableRPCError(err) {
break
log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %v`, jobID, err)
if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.ErrorRetries.Inc(1)
}
log.Infof(ctx, `CHANGEFEED job %d encountered retryable error: %v`, *job.ID(), err)
// Re-load the job in order to update our progress object, which may have
// been updated by the changeFrontier processor since the flow started.
reloadedJob, phsErr := phs.ExecCfg().JobRegistry.LoadJob(ctx, *job.ID())
if phsErr != nil {
err = phsErr
break
reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID)
if reloadErr != nil {
log.Warningf(ctx, `CHANGEFEED job %d could not reload job progress; `+
`continuing from last known high-water of %s: %v`,
jobID, progress.GetHighWater(), reloadErr)
} else {
progress = reloadedJob.Progress()
}
progress = reloadedJob.Progress()

// startedCh is normally used to signal back to the creator of the job that
// the job has started; however, in this case nothing will ever receive
// on the channel, causing the changefeed flow to block. Replace it with
// a dummy channel.
startedCh = make(chan tree.Datums, 1)
if metrics, ok := phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.SinkErrorRetries.Inc(1)
}
continue
}
if err != nil {
log.Infof(ctx, `CHANGEFEED job %d returning with error: %v`, *job.ID(), err)
}
return err
// We only hit this if `r.Next()` returns false, which right now only happens
// on context cancellation.
return errors.Wrap(err, `ran out of retries`)
}

func (b *changefeedResumer) OnFailOrCancel(context.Context, *client.Txn, *jobs.Job) error { return nil }
@@ -488,14 +491,3 @@ func changefeedResumeHook(typ jobspb.Type, _ *cluster.Settings) jobs.Resumer {
}
return &changefeedResumer{}
}

// Retryable RPC Error represents a gRPC error which indicates a retryable
// situation such as a connected node going down. In this case the DistSQL flow
// should be retried.
const retryableErrorStr = "rpc error|node unavailable"

var retryableErrorRegex = regexp.MustCompile(retryableErrorStr)

func isRetryableRPCError(err error) bool {
return retryableErrorRegex.MatchString(err.Error())
}
49 changes: 29 additions & 20 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -1052,61 +1052,70 @@ func TestChangefeedMonitoring(t *testing.T) {
t.Run(`poller`, pollerTest(sinklessTest, testFn))
}

func TestChangefeedRetryableSinkError(t *testing.T) {
func TestChangefeedRetryableError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
origAfterSinkFlushHook := f.Server().(*server.TestServer).Cfg.TestingKnobs.
knobs := f.Server().(*server.TestServer).Cfg.TestingKnobs.
DistSQL.(*distsqlrun.TestingKnobs).
Changefeed.(*TestingKnobs).AfterSinkFlush
Changefeed.(*TestingKnobs)
origAfterSinkFlushHook := knobs.AfterSinkFlush
var failSink int64
failSinkHook := func() error {
if atomic.LoadInt64(&failSink) != 0 {
return &retryableSinkError{cause: fmt.Errorf("synthetic retryable error")}
switch atomic.LoadInt64(&failSink) {
case 1:
return MarkRetryableError(fmt.Errorf("synthetic retryable error"))
case 2:
return fmt.Errorf("synthetic terminal error")
}
return origAfterSinkFlushHook()
}
f.Server().(*server.TestServer).Cfg.TestingKnobs.
DistSQL.(*distsqlrun.TestingKnobs).
Changefeed.(*TestingKnobs).AfterSinkFlush = failSinkHook
knobs.AfterSinkFlush = failSinkHook

// Set up a new feed and verify that the sink is started up.
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
defer closeFeed(t, foo)

// Verify that the sink is started up.
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1}}`,
})

// Set SQL Sink to return a retryable error.
// Set sink to return unique retryable errors and insert a row. Verify that
// sink is failing requests.
atomic.StoreInt64(&failSink, 1)

// Insert 1 row while the sink is failing.
sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)

// Verify that sink is failing requests.
registry := f.Server().JobRegistry().(*jobs.Registry)
retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).SinkErrorRetries
retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).ErrorRetries
testutils.SucceedsSoon(t, func() error {
if retryCounter.Counter.Count() < 3 {
return fmt.Errorf("insufficient sink error retries detected")
return fmt.Errorf("insufficient error retries detected")
}
return nil
})

// Fix the sink and insert another row.
// Fix the sink and insert another row. Check that nothing funky happened.
atomic.StoreInt64(&failSink, 0)
sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`)

// Check that nothing funky happened.
assertPayloads(t, foo, []string{
`foo: [2]->{"after": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3}}`,
})

// Set sink to return a terminal error and insert a row. Ensure that we
// eventually get the error message back out.
atomic.StoreInt64(&failSink, 2)
sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)
for {
_, err := foo.Next()
if err == nil {
continue
}
require.EqualError(t, err, `synthetic terminal error`)
break
}
}

// Only the enterprise version uses jobs.
79 changes: 79 additions & 0 deletions pkg/ccl/changefeedccl/errors.go
@@ -0,0 +1,79 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"fmt"
"strings"
)

const retryableErrorString = "retryable changefeed error"

type retryableError struct {
wrapped error
}

// MarkRetryableError wraps the given error, marking it as retryable to
// changefeeds.
func MarkRetryableError(e error) error {
return &retryableError{wrapped: e}
}

// Error implements the error interface.
func (e *retryableError) Error() string {
return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error())
}

// Cause implements the github.com/pkg/errors.causer interface.
func (e *retryableError) Cause() error { return e.wrapped }

// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is
// planned to be moved to the stdlib in go 1.13.
func (e *retryableError) Unwrap() error { return e.wrapped }

// IsRetryableError returns true if the supplied error, or any of its parent
// causes, was marked as retryable.
func IsRetryableError(err error) bool {
for {
if err == nil {
return false
}
if _, ok := err.(*retryableError); ok {
return true
}
errStr := err.Error()
if strings.Contains(errStr, retryableErrorString) {
// If a RetryableError occurs on a remote node, DistSQL serializes it such
// that we can't recover the structure and we have to rely on this
// unfortunate string comparison.
return true
}
if strings.Contains(errStr, `rpc error`) {
// When a crdb node dies, any DistSQL flows with processors scheduled on
// it get an error with "rpc error" in the message from the call to
// `(*DistSQLPlanner).Run`.
return true
}
if e, ok := err.(interface{ Unwrap() error }); ok {
err = e.Unwrap()
continue
}
return false
}
}

// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the
// RetryableError marker out. This won't do anything if the RetryableError
// itself has been wrapped, but that's okay, we'll just have an uglier string.
func MaybeStripRetryableErrorMarker(err error) error {
if e, ok := err.(*retryableError); ok {
err = e.wrapped
}
return err
}
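
A hypothetical snippet (not part of the diff) showing how the three helpers are meant to compose: an error is marked where a transient failure is detected, the marker survives further wrapping so the retry loop in Resume can see it, and it is stripped again before the error is surfaced to a user:

package changefeedccl

import "fmt"

func exampleRetryableErrorFlow() {
	// A transient failure, e.g. from a sink, gets marked as retryable.
	base := fmt.Errorf("connection refused")
	err := MarkRetryableError(base)

	// The marker is still detected after the error has been flattened into a
	// plain message (as happens when it crosses a DistSQL boundary), thanks to
	// the string-matching fallback in IsRetryableError.
	flattened := fmt.Errorf("flow error: %s", err)
	if IsRetryableError(flattened) {
		// The resumer would back off and re-run the distSQL flow here.
	}

	// Before surfacing an error to the user (e.g. the canary sink check in
	// changefeedPlanHook), the marker is stripped off again.
	cleaned := MaybeStripRetryableErrorMarker(err) // == base
	_ = cleaned
}
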
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/metrics.go
@@ -95,9 +95,9 @@ var (
Measurement: "Flushes",
Unit: metric.Unit_COUNT,
}
metaChangefeedSinkErrorRetries = metric.Metadata{
Name: "changefeed.sink_error_retries",
Help: "Total retryable errors encountered while emitting to sinks",
metaChangefeedErrorRetries = metric.Metadata{
Name: "changefeed.error_retries",
Help: "Total retryable errors encountered by all changefeeds",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
@@ -174,7 +174,7 @@ type Metrics struct {
EmittedMessages *metric.Counter
EmittedBytes *metric.Counter
Flushes *metric.Counter
SinkErrorRetries *metric.Counter
ErrorRetries *metric.Counter
BufferEntriesIn *metric.Counter
BufferEntriesOut *metric.Counter

@@ -202,7 +202,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
EmittedMessages: metric.NewCounter(metaChangefeedEmittedMessages),
EmittedBytes: metric.NewCounter(metaChangefeedEmittedBytes),
Flushes: metric.NewCounter(metaChangefeedFlushes),
SinkErrorRetries: metric.NewCounter(metaChangefeedSinkErrorRetries),
ErrorRetries: metric.NewCounter(metaChangefeedErrorRetries),
BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn),
BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut),

4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/rowfetcher_cache.go
@@ -52,7 +52,9 @@ func (c *rowFetcherCache) TableDescForKey(
// own caching.
tableDesc, _, err = c.leaseMgr.Acquire(ctx, ts, tableID)
if err != nil {
return nil, err
// LeaseManager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, MarkRetryableError(err)
}
// Immediately release the lease, since we only need it for the exact
// timestamp requested.