Skip to content

Commit

Permalink
VReplication: Prevent Orphaned VDiff2 Jobs (vitessio#11768)
Browse files Browse the repository at this point in the history
* Prevent orphaned VDiffs in two ways...

1. When opening the engine, restart any vdiffs that are in the
   started state, as this indicates that they did not complete and
   were unable to save their final state, so they must be restarted.
2. When a vdiff run fails, retry saving the error state with an
   exponential backoff until the engine shuts down. This way
   the normal retry mechanism will kick in OR #1 will kick in
   when the engine is next opened on the primary tablet.

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Handle failures before vdiff_table records are created

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add more ephemeral client errors

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Show vdiff state of error even if no vdiff_table records

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Minor cleanup

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add vdiff2 unit tests

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add unit test for retry

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Small cleanup

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Addressing review comments and other improvements

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Use warning log for ... warnings :-)

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Minor touch ups

Signed-off-by: Matt Lord <mattalord@gmail.com>

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 4, 2022
1 parent 22067c7 commit 39cbe7d
Show file tree
Hide file tree
Showing 8 changed files with 775 additions and 55 deletions.
16 changes: 14 additions & 2 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ const (

// Error codes for client-side errors.
// Originally found in include/mysql/errmsg.h and
// https://dev.mysql.com/doc/refman/5.7/en/error-messages-client.html
// https://dev.mysql.com/doc/mysql-errors/en/client-error-reference.html
const (
// CRUnknownError is CR_UNKNOWN_ERROR
CRUnknownError = 2000
Expand All @@ -286,6 +286,10 @@ const (
// This is returned if a connection via a TCP socket fails.
CRConnHostError = 2003

// CRUnknownHost is CR_UNKNOWN_HOST
// This is returned if the host name cannot be resolved.
CRUnknownHost = 2005

// CRServerGone is CR_SERVER_GONE_ERROR.
// This is returned if the client tries to send a command but it fails.
CRServerGone = 2006
Expand Down Expand Up @@ -325,7 +329,7 @@ const (

// Error codes for server-side errors.
// Originally found in include/mysql/mysqld_error.h and
// https://dev.mysql.com/doc/refman/5.7/en/error-messages-server.html
// https://dev.mysql.com/doc/mysql-errors/en/server-error-reference.html
// The below are in sorted order by value, grouped by vterror code they should be bucketed into.
// See above reference for more information on each code.
const (
Expand Down Expand Up @@ -543,6 +547,9 @@ const (
ERJSONDocumentTooDeep = 3157
ERWrongValue = 1525

// max execution time exceeded
ERQueryTimeout = 3024

ErrCantCreateGeometryObject = 1416
ErrGISDataWrongEndianess = 3055
ErrNotImplementedForCartesianSRS = 3704
Expand Down Expand Up @@ -677,8 +684,12 @@ func IsEphemeralError(err error) bool {
CRConnHostError,
CRMalformedPacket,
CRNamedPipeStateError,
CRServerHandshakeErr,
CRServerGone,
CRServerLost,
CRSSLConnectionError,
CRUnknownError,
CRUnknownHost,
ERCantCreateThread,
ERDiskFull,
ERForcingClose,
Expand All @@ -689,6 +700,7 @@ func IsEphemeralError(err error) bool {
ERInternalError,
ERLockDeadlock,
ERLockWaitTimeout,
ERQueryTimeout,
EROutOfMemory,
EROutOfResources,
EROutOfSortMemory,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/vdiff2.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
// on every shard.
if shardStateCounts[vdiff.StoppedState] > 0 {
summary.State = vdiff.StoppedState
} else if tableStateCounts[vdiff.ErrorState] > 0 {
} else if shardStateCounts[vdiff.ErrorState] > 0 || tableStateCounts[vdiff.ErrorState] > 0 {
summary.State = vdiff.ErrorState
} else if tableStateCounts[vdiff.StartedState] > 0 {
summary.State = vdiff.StartedState
Expand Down
67 changes: 60 additions & 7 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -127,18 +128,20 @@ func (ct *controller) run(ctx context.Context) {
row := qr.Named().Row()
state := VDiffState(strings.ToLower(row["state"].ToString()))
switch state {
case PendingState:
log.Infof("Starting vdiff %s", ct.uuid)
case PendingState, StartedState:
action := "Starting"
if state == StartedState {
action = "Restarting"
}
log.Infof("%s vdiff %s", action, ct.uuid)
if err := ct.start(ctx, dbClient); err != nil {
log.Errorf("Encountered an error for vdiff %s: %s", ct.uuid, err)
insertVDiffLog(ctx, dbClient, ct.id, fmt.Sprintf("Error: %s", err))
if err = ct.updateState(dbClient, ErrorState, err); err != nil {
log.Errorf("Encountered an error marking vdiff %s as errored: %v", ct.uuid, err)
if err := ct.saveErrorState(ctx, err); err != nil {
log.Errorf("Unable to save error state for vdiff %s; giving up because %s", ct.uuid, err.Error())
}
return
}
default:
log.Infof("VDiff %s was not marked as pending, doing nothing", state)
log.Infof("VDiff %s was not marked as runnable (state: %s), doing nothing", ct.uuid, state)
}
}

Expand Down Expand Up @@ -271,3 +274,53 @@ func (ct *controller) validate() error {
// TODO: check if vreplication workflow has errors, what else?
return nil
}

// saveErrorState records the error state for this vdiff in the database
// and adds a vdiff log entry describing saveErr. Each attempt uses its own
// freshly opened database connection, which is closed when the attempt ends.
//
// A failed attempt is retried without limit. The wait between attempts
// starts at 100ms and is multiplied by 1.5 after every wait, up to a cap of
// 60s, so that repeated failures do not create too many database
// connections.
//
// It returns nil as soon as an attempt succeeds. A non-nil error is returned
// only when, while waiting to retry, ctx is cancelled (the engine is
// closing) or ct.done fires (the vdiff has been explicitly stopped). Note
// that when the engine is later opened, a vdiff still in the started state
// is restarted even though its error state could not be saved here.
func (ct *controller) saveErrorState(ctx context.Context, saveErr error) error {
	const (
		backoffFactor = 1.5
		maxDelay      = 60 * time.Second
	)
	delay := 100 * time.Millisecond

	// persist makes exactly one attempt at writing the error state and the
	// accompanying log entry.
	persist := func() error {
		conn := ct.vde.dbClientFactoryFiltered()
		err := conn.Connect()
		if err != nil {
			return err
		}
		defer conn.Close()

		if err = ct.updateState(conn, ErrorState, saveErr); err != nil {
			return err
		}
		insertVDiffLog(ctx, conn, ct.id, fmt.Sprintf("Error: %s", saveErr))
		return nil
	}

	for {
		err := persist()
		if err == nil {
			// Success
			return nil
		}
		log.Warningf("Failed to persist vdiff error state: %v. Will retry in %s", err, delay.String())

		// Wait out the current delay, bailing out early if we are told to stop.
		select {
		case <-ctx.Done():
			return fmt.Errorf("engine is shutting down")
		case <-ct.done:
			return fmt.Errorf("vdiff was stopped")
		case <-time.After(delay):
		}

		// Grow the delay for the next round, never exceeding the cap.
		if delay < maxDelay {
			delay = time.Duration(float64(delay) * backoffFactor)
			if delay > maxDelay {
				delay = maxDelay
			}
		}
	}
}
35 changes: 20 additions & 15 deletions go/vt/vttablet/tabletmanager/vdiff/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,16 @@ func (vde *Engine) Open(ctx context.Context, vre *vreplication.Engine) {
}

func (vde *Engine) openLocked(ctx context.Context) error {
// Start any pending VDiffs
rows, err := vde.getPendingVDiffs(ctx)
// This should never happen
if len(vde.controllers) > 0 {
log.Warningf("VDiff Engine invalid state detected: %d controllers existed when opening; resetting state", len(vde.controllers))
vde.resetControllers()
}

// At this point the tablet has no controllers running. So
// we want to start any VDiffs that have not been explicitly
// stopped or otherwise finished.
rows, err := vde.getVDiffsToRun(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -219,10 +227,7 @@ func (vde *Engine) Close() {
vde.cancel()

// We still have to wait for all controllers to stop.
for _, ct := range vde.controllers {
ct.Stop()
}
vde.controllers = make(map[int64]*controller)
vde.resetControllers()

// Wait for long-running functions to exit.
vde.wg.Wait()
Expand All @@ -232,14 +237,7 @@ func (vde *Engine) Close() {
log.Infof("VDiff Engine: closed")
}

func (vde *Engine) getDBClient(isAdmin bool) binlogplayer.DBClient {
if isAdmin {
return vde.dbClientFactoryDba()
}
return vde.dbClientFactoryFiltered()
}

func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, error) {
func (vde *Engine) getVDiffsToRun(ctx context.Context) (*sqltypes.Result, error) {
dbClient := vde.dbClientFactoryFiltered()
if err := dbClient.Connect(); err != nil {
return nil, err
Expand All @@ -248,7 +246,7 @@ func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, erro

// We have to use ExecIgnore here so as not to block quick tablet state
// transitions from primary to non-primary when starting the engine
qr, err := withDDL.ExecIgnore(ctx, sqlGetPendingVDiffs, dbClient.ExecuteFetch)
qr, err := withDDL.ExecIgnore(ctx, sqlGetVDiffsToRun, dbClient.ExecuteFetch)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -343,3 +341,10 @@ func (vde *Engine) retryErroredVDiffs() {
}
}
}

// resetControllers calls Stop on every controller currently held in
// vde.controllers and then replaces the map with a new, empty one.
func (vde *Engine) resetControllers() {
	for id := range vde.controllers {
		vde.controllers[id].Stop()
	}
	vde.controllers = make(map[int64]*controller)
}
Loading

0 comments on commit 39cbe7d

Please sign in to comment.