Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: switch retryable errors back to a whitelist #36852

Merged
merged 2 commits into from
Apr 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pkg/ccl/changefeedccl/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -206,9 +205,6 @@ func (b *memBuffer) addRow(ctx context.Context, row tree.Datums) error {
_, err := b.mu.entries.AddRow(ctx, row)
b.mu.Unlock()
b.metrics.BufferEntriesIn.Inc(1)
if e, ok := pgerror.GetPGCause(err); ok && e.Code == pgerror.CodeOutOfMemoryError {
err = MarkTerminalError(err)
}
return err
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,6 @@ func checkpointResolvedTimestamp(
return resolved
}
if err := jobProgressedFn(ctx, progressedClosure); err != nil {
if _, ok := err.(*jobs.InvalidStatusError); ok {
err = MarkTerminalError(err)
}
return err
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
if ca.sink, err = getSink(
ca.spec.Feed.SinkURI, nodeID, ca.spec.Feed.Opts, ca.spec.Feed.Targets, ca.flowCtx.Settings,
); err != nil {
err = MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
Expand All @@ -151,6 +152,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
// dependency cycles.
metrics := ca.flowCtx.JobRegistry.MetricsStruct().Changefeed.(*Metrics)
ca.sink = makeMetricsSink(metrics, ca.sink)
ca.sink = &errorWrapperSink{wrapped: ca.sink}

var knobs TestingKnobs
if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
Expand Down Expand Up @@ -418,6 +420,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
if cf.sink, err = getSink(
cf.spec.Feed.SinkURI, nodeID, cf.spec.Feed.Opts, cf.spec.Feed.Targets, cf.flowCtx.Settings,
); err != nil {
err = MarkRetryableError(err)
cf.MoveToDraining(err)
return ctx
}
Expand All @@ -431,6 +434,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
// dependency cycles.
cf.metrics = cf.flowCtx.JobRegistry.MetricsStruct().Changefeed.(*Metrics)
cf.sink = makeMetricsSink(cf.metrics, cf.sink)
cf.sink = &errorWrapperSink{wrapped: cf.sink}

if cf.spec.JobID != 0 {
job, err := cf.flowCtx.JobRegistry.LoadJob(ctx, cf.spec.JobID)
Expand Down
45 changes: 4 additions & 41 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -245,7 +244,7 @@ func changefeedPlanHook(

if details.SinkURI == `` {
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
return MaybeStripTerminalErrorMarker(err)
return MaybeStripRetryableErrorMarker(err)
}

settings := p.ExecCfg().Settings
Expand All @@ -264,7 +263,7 @@ func changefeedPlanHook(
nodeID := p.ExtendedEvalContext().NodeID
canarySink, err := getSink(details.SinkURI, nodeID, details.Opts, details.Targets, settings)
if err != nil {
return MaybeStripTerminalErrorMarker(err)
return MaybeStripRetryableErrorMarker(err)
}
if err := canarySink.Close(); err != nil {
return err
Expand Down Expand Up @@ -445,58 +444,22 @@ func (b *changefeedResumer) Resume(
// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
// or for many other reasons. We initially tried to whitelist which errors
// should cause the changefeed to retry, but this turns out to be brittle, so
// we switched to a blacklist. Any error that is expected to be permanent is
// now marked with `MarkTerminalError` by the time it comes out of
// `distChangefeedFlow`. Everything else should be logged loudly and retried.
// or for many other reasons.
opts := retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
}
const consecutiveIdenticalErrorsWindow = time.Minute
const consecutiveIdenticalErrorsDefault = int(5)
var consecutiveErrors struct {
message string
count int
firstSeen time.Time
}
var err error
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil {
return nil
}
if IsTerminalError(err) {
err = MaybeStripTerminalErrorMarker(err)
if !IsRetryableError(err) {
log.Warningf(ctx, `CHANGEFEED job %d returning with error: %v`, jobID, err)
return err
}

// If we receive an identical error message more than some number of times
// in a time period, bail. This is a safety net in case we miss marking
// something as terminal.
{
m, d := err.Error(), timeutil.Since(consecutiveErrors.firstSeen)
if d > consecutiveIdenticalErrorsWindow || m != consecutiveErrors.message {
consecutiveErrors.message = m
consecutiveErrors.count = 0
consecutiveErrors.firstSeen = timeutil.Now()
}
consecutiveErrors.count++
consecutiveIdenticalErrorsThreshold := consecutiveIdenticalErrorsDefault
if cfKnobs, ok := execCfg.DistSQLRunTestingKnobs.Changefeed.(*TestingKnobs); ok {
if c := cfKnobs.ConsecutiveIdenticalErrorBailoutCount; c != 0 {
consecutiveIdenticalErrorsThreshold = c
}
}
if consecutiveErrors.count >= consecutiveIdenticalErrorsThreshold {
log.Warningf(ctx, `CHANGEFEED job %d saw the same non-terminal error %d times in %s: %+v`,
jobID, consecutiveErrors.count, d, err)
return err
}
}

log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %+v`, jobID, err)
if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.ErrorRetries.Inc(1)
Expand Down
40 changes: 2 additions & 38 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,11 +1073,9 @@ func TestChangefeedRetryableError(t *testing.T) {
failSinkHook := func() error {
switch atomic.LoadInt64(&failSink) {
case 1:
return fmt.Errorf("unique synthetic retryable error: %s", timeutil.Now())
return MarkRetryableError(fmt.Errorf("synthetic retryable error"))
case 2:
return MarkTerminalError(fmt.Errorf("synthetic terminal error"))
case 3:
return fmt.Errorf("should be terminal but isn't")
return fmt.Errorf("synthetic terminal error")
}
return origAfterSinkFlushHook()
}
Expand Down Expand Up @@ -1126,40 +1124,6 @@ func TestChangefeedRetryableError(t *testing.T) {
require.EqualError(t, err, `synthetic terminal error`)
break
}

// The above tests an error that is correctly _not marked_ as terminal and
// one that is correctly _marked_ as terminal. We're also concerned about a
// terminal error that is not marked but should be. This would result in an
// indefinite retry loop, so we have a safety net that bails if we
// consecutively receive an identical error message some number of times.
// By default this safety net is disabled in tests, because we want to
// catch this. The following line enables the safety net (by setting the
// knob to the 0 value) so that we can test the safety net.
knobs.ConsecutiveIdenticalErrorBailoutCount = 0

// Set up a new feed and verify that the sink is started up.
atomic.StoreInt64(&failSink, 0)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
bar := feed(t, f, `CREATE CHANGEFEED FOR bar`)
defer closeFeed(t, bar)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1)`)
assertPayloads(t, bar, []string{
`bar: [1]->{"after": {"a": 1}}`,
})

// Set sink to return a non-unique non-terminal error and insert a row.
// This simulates an error we should blacklist but have missed. The feed
// should eventually fail.
atomic.StoreInt64(&failSink, 3)
sqlDB.Exec(t, `INSERT INTO bar VALUES (2)`)
for {
_, err := bar.Next()
if err == nil {
continue
}
require.EqualError(t, err, `should be terminal but isn't`)
break
}
}

// Only the enterprise version uses jobs.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (e *confluentAvroEncoder) EncodeKey(row encodeRow) ([]byte, error) {
var err error
registered.schema, err = indexToAvroSchema(row.tableDesc, &row.tableDesc.PrimaryIndex)
if err != nil {
return nil, MarkTerminalError(err)
return nil, err
}

// NB: This uses the kafka name escaper because it has to match the name
Expand Down Expand Up @@ -296,7 +296,7 @@ func (e *confluentAvroEncoder) EncodeValue(row encodeRow) ([]byte, error) {
if !ok {
afterDataSchema, err := tableToAvroSchema(row.tableDesc)
if err != nil {
return nil, MarkTerminalError(err)
return nil, err
}

opts := avroEnvelopeOpts{afterField: true, updatedField: e.updatedField}
Expand Down
52 changes: 30 additions & 22 deletions pkg/ccl/changefeedccl/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,46 +11,54 @@ package changefeedccl
import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
)

const terminalErrorString = "terminal changefeed error"
const retryableErrorString = "retryable changefeed error"

type terminalError struct {
type retryableError struct {
wrapped error
}

// MarkTerminalError wraps the given error, marking it as non-retryable to
// MarkRetryableError wraps the given error, marking it as retryable to
// changefeeds.
func MarkTerminalError(e error) error {
return &terminalError{wrapped: e}
func MarkRetryableError(e error) error {
return &retryableError{wrapped: e}
}

// Error implements the error interface.
func (e *terminalError) Error() string {
return fmt.Sprintf("%s: %s", terminalErrorString, e.wrapped.Error())
func (e *retryableError) Error() string {
return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error())
}

// Cause implements the github.com/pkg/errors.causer interface.
func (e *terminalError) Cause() error { return e.wrapped }
func (e *retryableError) Cause() error { return e.wrapped }

// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is
// planned to be moved to the stdlib in go 1.13.
func (e *terminalError) Unwrap() error { return e.wrapped }
func (e *retryableError) Unwrap() error { return e.wrapped }

// IsTerminalError returns true if the supplied error, or any of its parent
// causes, is a IsTerminalError.
func IsTerminalError(err error) bool {
// IsRetryableError returns true if the supplied error, or any of its parent
// causes, is a RetryableError.
func IsRetryableError(err error) bool {
for {
if err == nil {
return false
}
if _, ok := err.(*terminalError); ok {
if _, ok := err.(*retryableError); ok {
return true
}
errStr := err.Error()
if strings.Contains(errStr, retryableErrorString) {
// If a RetryableError occurs on a remote node, DistSQL serializes it such
// that we can't recover the structure and we have to rely on this
// unfortunate string comparison.
return true
}
if _, ok := err.(*pgerror.Error); ok {
return strings.Contains(err.Error(), terminalErrorString)
if strings.Contains(errStr, `rpc error`) {
// When a crdb node dies, any DistSQL flows with processors scheduled on
// it get an error with "rpc error" in the message from the call to
// `(*DistSQLPlanner).Run`.
return true
}
if e, ok := err.(interface{ Unwrap() error }); ok {
err = e.Unwrap()
Expand All @@ -60,11 +68,11 @@ func IsTerminalError(err error) bool {
}
}

// MaybeStripTerminalErrorMarker performs some minimal attempt to clean the
// TerminalError marker out. This won't do anything if the TerminalError itself
// has been wrapped, but that's okay, we'll just have an uglier string.
func MaybeStripTerminalErrorMarker(err error) error {
if e, ok := err.(*terminalError); ok {
// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the
// RetryableError marker out. This won't do anything if the RetryableError
// itself has been wrapped, but that's okay, we'll just have an uglier string.
func MaybeStripRetryableErrorMarker(err error) error {
if e, ok := err.(*retryableError); ok {
err = e.wrapped
}
return err
Expand Down
10 changes: 1 addition & 9 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
gosql "database/sql"
gojson "encoding/json"
"fmt"
"math"
"net/url"
"reflect"
"sort"
Expand Down Expand Up @@ -209,11 +208,7 @@ func expectResolvedTimestampAvro(
func sinklessTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()
knobs := base.TestingKnobs{DistSQL: &distsqlrun.TestingKnobs{Changefeed: &TestingKnobs{
// Disable the safety net for errors that should be marked as terminal but
// weren't. We want to catch these in tests.
ConsecutiveIdenticalErrorBailoutCount: math.MaxInt32,
}}}
knobs := base.TestingKnobs{DistSQL: &distsqlrun.TestingKnobs{Changefeed: &TestingKnobs{}}}
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: knobs,
UseDatabase: `d`,
Expand Down Expand Up @@ -254,9 +249,6 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory))
}
return nil
},
// Disable the safety net for errors that should be marked as terminal but
// weren't. We want to catch these in tests.
ConsecutiveIdenticalErrorBailoutCount: math.MaxInt32,
}}}

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Expand Down
15 changes: 2 additions & 13 deletions pkg/ccl/changefeedccl/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package changefeedccl
import (
"context"
"sort"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -289,11 +288,7 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
}
frontier.Forward(span, lastHighwater)
g.GoCtx(func(ctx context.Context) error {
err := ds.RangeFeed(ctx, req, eventC)
if _, ok := err.(*roachpb.BatchTimestampBeforeGCError); ok {
err = MarkTerminalError(err)
}
return err
return ds.RangeFeed(ctx, req, eventC)
})
}
g.GoCtx(func(ctx context.Context) error {
Expand Down Expand Up @@ -504,12 +499,6 @@ func (p *poller) exportSpan(

if pErr != nil {
err := pErr.GoError()
// TODO(dan): It'd be nice to avoid this string sniffing, if possible, but
// pErr doesn't seem to have `Details` set, which would rehydrate the
// BatchTimestampBeforeGCError.
if strings.Contains(err.Error(), "must be after replica GC threshold") {
err = MarkTerminalError(err)
}
return pgerror.Wrapf(err, pgerror.CodeDataExceptionError,
`fetching changes for %s`, span)
}
Expand Down Expand Up @@ -655,7 +644,7 @@ func clusterNodeCount(g *gossip.Gossip) int {

func (p *poller) validateTable(ctx context.Context, desc *sqlbase.TableDescriptor) error {
if err := validateChangefeedTable(p.details.Targets, desc); err != nil {
return MarkTerminalError(err)
return err
}
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func (c *rowFetcherCache) TableDescForKey(
// own caching.
tableDesc, _, err = c.leaseMgr.Acquire(ctx, ts, tableID)
if err != nil {
return nil, err
// LeaseManager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, MarkRetryableError(err)
}
// Immediately release the lease, since we only need it for the exact
// timestamp requested.
Expand Down
Loading