diff --git a/go/mysql/constants.go b/go/mysql/constants.go index adc6aeacbd3..bcbfd6ae931 100644 --- a/go/mysql/constants.go +++ b/go/mysql/constants.go @@ -273,7 +273,7 @@ const ( // Error codes for client-side errors. // Originally found in include/mysql/errmsg.h and -// https://dev.mysql.com/doc/refman/5.7/en/error-messages-client.html +// https://dev.mysql.com/doc/mysql-errors/en/client-error-reference.html const ( // CRUnknownError is CR_UNKNOWN_ERROR CRUnknownError = 2000 @@ -286,6 +286,10 @@ const ( // This is returned if a connection via a TCP socket fails. CRConnHostError = 2003 + // CRUnknownHost is CR_UNKNOWN_HOST + // This is returned if the host name cannot be resolved. + CRUnknownHost = 2005 + // CRServerGone is CR_SERVER_GONE_ERROR. // This is returned if the client tries to send a command but it fails. CRServerGone = 2006 @@ -325,7 +329,7 @@ const ( // Error codes for server-side errors. // Originally found in include/mysql/mysqld_error.h and -// https://dev.mysql.com/doc/refman/5.7/en/error-messages-server.html +// https://dev.mysql.com/doc/mysql-errors/en/server-error-reference.html // The below are in sorted order by value, grouped by vterror code they should be bucketed into. // See above reference for more information on each code. 
const ( @@ -543,6 +547,9 @@ const ( ERJSONDocumentTooDeep = 3157 ERWrongValue = 1525 + // max execution time exceeded + ERQueryTimeout = 3024 + ErrCantCreateGeometryObject = 1416 ErrGISDataWrongEndianess = 3055 ErrNotImplementedForCartesianSRS = 3704 @@ -677,8 +684,12 @@ func IsEphemeralError(err error) bool { CRConnHostError, CRMalformedPacket, CRNamedPipeStateError, + CRServerHandshakeErr, + CRServerGone, CRServerLost, CRSSLConnectionError, + CRUnknownError, + CRUnknownHost, ERCantCreateThread, ERDiskFull, ERForcingClose, @@ -689,6 +700,7 @@ func IsEphemeralError(err error) bool { ERInternalError, ERLockDeadlock, ERLockWaitTimeout, + ERQueryTimeout, EROutOfMemory, EROutOfResources, EROutOfSortMemory, diff --git a/go/vt/vtctl/vdiff2.go b/go/vt/vtctl/vdiff2.go index e97b56edb68..83aa15ef763 100644 --- a/go/vt/vtctl/vdiff2.go +++ b/go/vt/vtctl/vdiff2.go @@ -577,7 +577,7 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st // on every shard. if shardStateCounts[vdiff.StoppedState] > 0 { summary.State = vdiff.StoppedState - } else if tableStateCounts[vdiff.ErrorState] > 0 { + } else if shardStateCounts[vdiff.ErrorState] > 0 || tableStateCounts[vdiff.ErrorState] > 0 { summary.State = vdiff.ErrorState } else if tableStateCounts[vdiff.StartedState] > 0 { summary.State = vdiff.StartedState diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index e3a362d3c6b..f3ee434edcd 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "strings" + "time" "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/vterrors" @@ -127,18 +128,20 @@ func (ct *controller) run(ctx context.Context) { row := qr.Named().Row() state := VDiffState(strings.ToLower(row["state"].ToString())) switch state { - case PendingState: - log.Infof("Starting vdiff %s", ct.uuid) + case PendingState, 
StartedState: + action := "Starting" + if state == StartedState { + action = "Restarting" + } + log.Infof("%s vdiff %s", action, ct.uuid) if err := ct.start(ctx, dbClient); err != nil { log.Errorf("Encountered an error for vdiff %s: %s", ct.uuid, err) - insertVDiffLog(ctx, dbClient, ct.id, fmt.Sprintf("Error: %s", err)) - if err = ct.updateState(dbClient, ErrorState, err); err != nil { - log.Errorf("Encountered an error marking vdiff %s as errored: %v", ct.uuid, err) + if err := ct.saveErrorState(ctx, err); err != nil { + log.Errorf("Unable to save error state for vdiff %s; giving up because %s", ct.uuid, err.Error()) } - return } default: - log.Infof("VDiff %s was not marked as pending, doing nothing", state) + log.Infof("VDiff %s was not marked as runnable (state: %s), doing nothing", ct.uuid, state) } } @@ -271,3 +274,53 @@ func (ct *controller) validate() error { // TODO: check if vreplication workflow has errors, what else? return nil } + +// saveErrorState saves the error state for the vdiff in the database. +// It never gives up trying to save the error state, unless the context +// has been cancelled or the done channel has been closed -- indicating +// that the engine is closing or the vdiff has been explicitly stopped. +// Note that when the engine is later opened the started vdiff will be +// restarted even though we were unable to save the error state. +// It uses exponential backoff with a factor of 1.5 to avoid creating +// too many database connections. 
+func (ct *controller) saveErrorState(ctx context.Context, saveErr error) error { + retryDelay := 100 * time.Millisecond + maxRetryDelay := 60 * time.Second + save := func() error { + dbClient := ct.vde.dbClientFactoryFiltered() + if err := dbClient.Connect(); err != nil { + return err + } + defer dbClient.Close() + + if err := ct.updateState(dbClient, ErrorState, saveErr); err != nil { + return err + } + insertVDiffLog(ctx, dbClient, ct.id, fmt.Sprintf("Error: %s", saveErr)) + + return nil + } + + for { + if err := save(); err != nil { + log.Warningf("Failed to persist vdiff error state: %v. Will retry in %s", err, retryDelay.String()) + select { + case <-ctx.Done(): + return fmt.Errorf("engine is shutting down") + case <-ct.done: + return fmt.Errorf("vdiff was stopped") + case <-time.After(retryDelay): + if retryDelay < maxRetryDelay { + retryDelay = time.Duration(float64(retryDelay) * 1.5) + if retryDelay > maxRetryDelay { + retryDelay = maxRetryDelay + } + } + continue + } + } + + // Success + return nil + } +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index 80165ff3d87..73ed5d9deed 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -111,8 +111,16 @@ func (vde *Engine) Open(ctx context.Context, vre *vreplication.Engine) { } func (vde *Engine) openLocked(ctx context.Context) error { - // Start any pending VDiffs - rows, err := vde.getPendingVDiffs(ctx) + // This should never happen + if len(vde.controllers) > 0 { + log.Warningf("VDiff Engine invalid state detected: %d controllers existed when opening; resetting state", len(vde.controllers)) + vde.resetControllers() + } + + // At this point the tablet has no controllers running. So + // we want to start any VDiffs that have not been explicitly + // stopped or otherwise finished. 
+ rows, err := vde.getVDiffsToRun(ctx) if err != nil { return err } @@ -219,10 +227,7 @@ func (vde *Engine) Close() { vde.cancel() // We still have to wait for all controllers to stop. - for _, ct := range vde.controllers { - ct.Stop() - } - vde.controllers = make(map[int64]*controller) + vde.resetControllers() // Wait for long-running functions to exit. vde.wg.Wait() @@ -232,14 +237,7 @@ func (vde *Engine) Close() { log.Infof("VDiff Engine: closed") } -func (vde *Engine) getDBClient(isAdmin bool) binlogplayer.DBClient { - if isAdmin { - return vde.dbClientFactoryDba() - } - return vde.dbClientFactoryFiltered() -} - -func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, error) { +func (vde *Engine) getVDiffsToRun(ctx context.Context) (*sqltypes.Result, error) { dbClient := vde.dbClientFactoryFiltered() if err := dbClient.Connect(); err != nil { return nil, err @@ -248,7 +246,7 @@ func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, erro // We have to use ExecIgnore here so as not to block quick tablet state // transitions from primary to non-primary when starting the engine - qr, err := withDDL.ExecIgnore(ctx, sqlGetPendingVDiffs, dbClient.ExecuteFetch) + qr, err := withDDL.ExecIgnore(ctx, sqlGetVDiffsToRun, dbClient.ExecuteFetch) if err != nil { return nil, err } @@ -343,3 +341,10 @@ func (vde *Engine) retryErroredVDiffs() { } } } + +func (vde *Engine) resetControllers() { + for _, ct := range vde.controllers { + ct.Stop() + } + vde.controllers = make(map[int64]*controller) +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go new file mode 100644 index 00000000000..9a7014c2ab3 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go @@ -0,0 +1,252 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vdiff + +import ( + "context" + "fmt" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +var ( + wfName = "testwf" + optionsJS = `{"core_options": {"auto_retry": true}}` + vdiffTestCols = "id|vdiff_uuid|workflow|keyspace|shard|db_name|state|options|last_error" + vdiffTestColTypes = "int64|varchar|varbinary|varbinary|varchar|varbinary|varbinary|json|varbinary" + singleRowAffected = &sqltypes.Result{RowsAffected: 1} + noResults = &sqltypes.Result{} + testSchema = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }, + }, + } +) + +func TestEngineOpen(t *testing.T) { + UUID := uuid.New().String() + source := `keyspace:"testsrc" shard:"0" filter:{rules:{match:"t1" filter:"select * from t1"}}` + tests := []struct { + name string + state VDiffState + }{ + // This needs to be started, for the first time, on open + { + name: "pending vdiff", + state: PendingState, + }, + // This needs to be restarted on open as it was previously started + // but was unable to terminate normally (e.g. crash) in the previous + // engine. 
+ { + name: "started vdiff", + state: StartedState, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tablet := addTablet(100) + tablet.Type = topodatapb.TabletType_PRIMARY + defer deleteTablet(tablet) + resetBinlogClient() + dbClient := binlogplayer.NewMockDBClient(t) + dbClientFactory := func() binlogplayer.DBClient { return dbClient } + vde := &Engine{ + controllers: make(map[int64]*controller), + ts: env.TopoServ, + thisTablet: tablet, + dbClientFactoryFiltered: dbClientFactory, + dbClientFactoryDba: dbClientFactory, + dbName: vdiffdb, + } + require.False(t, vde.IsOpen()) + + initialQR := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdiffTestCols, + vdiffTestColTypes, + ), + fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, tt.state, optionsJS), + ) + + dbClient.ExpectRequest("select * from _vt.vdiff where state in ('started','pending')", initialQR, nil) + + dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdiffTestCols, + vdiffTestColTypes, + ), + fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, tt.state, optionsJS), + ), nil) + + dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vreplication where workflow = '%s' and db_name = '%s'", wfName, vdiffdb), sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|workflow|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type", + "int64|varbinary|blob|varbinary|varbinary|int64|int64|varbinary|varbinary|int64|int64|varbinary|varbinary|varbinary|int64|varbinary|int64|int64|int64|varchar|int64", + ), + 
fmt.Sprintf("1|%s|%s|MySQL56/f69ed286-6909-11ed-8342-0a50724f3211:1-110||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", wfName, source, vdiffdb), + ), nil) + + dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", singleRowAffected, nil) + dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: started')", singleRowAffected, nil) + dbClient.ExpectRequest(`select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report + from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + where vdt.vdiff_id = 1 and vdt.table_name = 't1'`, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "lastpk|mismatch|report", + "varbinary|int64|json", + ), + `fields:{name:"c1" type:INT64 table:"t1" org_table:"t1" database:"vt_customer" org_name:"c1" column_length:20 charset:63 flags:53251} rows:{lengths:1 values:"1"}|0|{}`, + ), nil) + dbClient.ExpectRequest("select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = 'vdiff_test' and table_name in ('t1')", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name|table_rows", + "varchar|int64", + ), + "t1|1", + ), nil) + dbClient.ExpectRequest(`select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report + from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + where vdt.vdiff_id = 1 and vdt.table_name = 't1'`, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "lastpk|mismatch|report", + "varbinary|int64|json", + ), + `fields:{name:"c1" type:INT64 table:"t1" org_table:"t1" database:"vt_customer" org_name:"c1" column_length:20 charset:63 flags:53251} rows:{lengths:1 values:"1"}|0|{"TableName": "t1", "MatchingRows": 1, "ProcessedRows": 1, "MismatchedRows": 0, "ExtraRowsSource": 0, "ExtraRowsTarget": 0}`, + ), nil) + + // Now let's 
short circuit the vdiff as we know that the open has worked as expected. + shortCircuitTestAfterQuery("update _vt.vdiff_table set table_rows = 1 where vdiff_id = 1 and table_name = 't1'", dbClient) + + vde.Open(context.Background(), vreplEngine) + defer vde.Close() + assert.True(t, vde.IsOpen()) + assert.Equal(t, 1, len(vde.controllers)) + dbClient.Wait() + }) + } +} + +func TestEngineRetryErroredVDiffs(t *testing.T) { + UUID := uuid.New().String() + source := `keyspace:"testsrc" shard:"0" filter:{rules:{match:"t1" filter:"select * from t1"}}` + expectedControllerCnt := 0 + tests := []struct { + name string + retryQueryResults *sqltypes.Result + expectRetry bool + }{ + { + name: "nothing to retry", + retryQueryResults: noResults, + }, + { + name: "non-ephemeral error", + retryQueryResults: sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdiffTestCols, + vdiffTestColTypes, + ), + fmt.Sprintf("1|%s|%s|%s|%s|%s|error|%s|%v", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, optionsJS, + mysql.NewSQLError(mysql.ERNoSuchTable, "42S02", "Table 'foo' doesn't exist")), + ), + }, + { + name: "ephemeral error", + retryQueryResults: sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdiffTestCols, + vdiffTestColTypes, + ), + fmt.Sprintf("1|%s|%s|%s|%s|%s|error|%s|%v", UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, optionsJS, + mysql.NewSQLError(mysql.ERLockWaitTimeout, "HY000", "Lock wait timeout exceeded; try restarting transaction")), + ), + expectRetry: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tablet := addTablet(100) + tablet.Type = topodatapb.TabletType_PRIMARY + defer deleteTablet(tablet) + resetBinlogClient() + dbClient := binlogplayer.NewMockDBClient(t) + dbClientFactory := func() binlogplayer.DBClient { return dbClient } + vde := &Engine{ + controllers: make(map[int64]*controller), + ts: env.TopoServ, + thisTablet: tablet, + dbClientFactoryFiltered: dbClientFactory, + dbClientFactoryDba: dbClientFactory, 
+ dbName: vdiffdb, + } + require.False(t, vde.IsOpen()) + + dbClient.ExpectRequest("select * from _vt.vdiff where state in ('started','pending')", noResults, nil) + vde.Open(context.Background(), vreplEngine) + defer vde.Close() + assert.True(t, vde.IsOpen()) + assert.Equal(t, 0, len(vde.controllers)) + + dbClient.ExpectRequest("select * from _vt.vdiff where state = 'error' and options->>'$.core_options.auto_retry' = 'true'", tt.retryQueryResults, nil) + // Right now this only supports a single row as with multiple rows we have + // multiple controllers in separate goroutines and the order is not + // guaranteed. If we want to support multiple rows here then we'll need to + // switch to using the queryhistory package. That will also require building + // out that package to support MockDBClient and its Expect* functions + // (query+results+err) as right now it only supports a real DBClient and + // checks for query execution. + for _, row := range tt.retryQueryResults.Rows { + id := row[0].ToString() + if tt.expectRetry { + dbClient.ExpectRequestRE("update _vt.vdiff as vd left join _vt.vdiff_table as vdt on \\(vd.id = vdt.vdiff_id\\) set vd.state = 'pending'.*", singleRowAffected, nil) + dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vdiff where id = %s", id), sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdiffTestCols, + vdiffTestColTypes, + ), + fmt.Sprintf("%s|%s|%s|%s|%s|%s|pending|%s|", id, UUID, wfName, env.KeyspaceName, env.ShardName, vdiffdb, optionsJS), + ), nil) + dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vreplication where workflow = '%s' and db_name = '%s'", wfName, vdiffdb), sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|workflow|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type", + 
"int64|varbinary|blob|varbinary|varbinary|int64|int64|varbinary|varbinary|int64|int64|varbinary|varbinary|varbinary|int64|varbinary|int64|int64|int64|varchar|int64", + ), + fmt.Sprintf("%s|%s|%s|MySQL56/f69ed286-6909-11ed-8342-0a50724f3211:1-110||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", id, wfName, source, vdiffdb), + ), nil) + + // At this point we know that we kicked off the expected retry so we can short circit the vdiff. + shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = %s", id), dbClient) + + expectedControllerCnt++ + } + } + + err := vde.retryVDiffs(vde.ctx) + assert.NoError(t, err) + assert.Equal(t, expectedControllerCnt, len(vde.controllers)) + dbClient.Wait() + }) + } + +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go new file mode 100644 index 00000000000..80fb1f4c443 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -0,0 +1,417 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vdiff + +import ( + "context" + "fmt" + "io" + "os" + "strings" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tabletconntest" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" + "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" + "vitess.io/vitess/go/vt/vttablet/tmclient" + "vitess.io/vitess/go/vt/vttablet/tmclienttest" + "vitess.io/vitess/go/vt/withddl" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +var ( + vstreamerEngine *vstreamer.Engine + vreplEngine *vreplication.Engine + env *testenv.Env + tmc = newFakeTMClient() + globalFBC = &fakeBinlogClient{} + globalDBQueries = make(chan string, 1000) + vdiffdb = "vdiff_test" + doNotLogDBQueries = false +) + +type LogExpectation struct { + Type string + Detail string +} + +func init() { + tabletconn.RegisterDialer("test", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + return &fakeTabletConn{ + QueryService: fakes.ErrorQueryService, + tablet: tablet, + }, nil + }) + tabletconntest.SetProtocol("go.vt.vttablet.tabletmanager.vdiff.framework_test", "test") + + binlogplayer.RegisterClientFactory("test", func() binlogplayer.Client { return globalFBC }) + + tmclient.RegisterTabletManagerClientFactory("test", func() tmclient.TabletManagerClient { return tmc }) + tmclienttest.SetProtocol("go.vt.vttablet.tabletmanager.vdiff.framework_test", "test") +} + +func 
TestMain(m *testing.M) { + binlogplayer.SetProtocol("vdiff_test_framework", "test") + exitCode := func() int { + var err error + env, err = testenv.Init() + if err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + return 1 + } + defer env.Close() + + vstreamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) + vstreamerEngine.InitDBConfig(env.KeyspaceName, env.ShardName) + vstreamerEngine.Open() + defer vstreamerEngine.Close() + + ddls := binlogplayer.CreateVReplicationTable() + ddls = append(ddls, binlogplayer.AlterVReplicationTable...) + ddls = append(ddls, withDDL.DDLs()...) + ddls = append(ddls, fmt.Sprintf("create database %s", vdiffdb)) + + for _, ddl := range ddls { + if err := env.Mysqld.ExecuteSuperQuery(context.Background(), ddl); err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + } + } + + vreplEngine = vreplication.NewTestEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, realDBClientFactory, vdiffdb, nil) + vreplEngine.Open(context.Background()) + defer vreplEngine.Close() + + tmc.schema = testSchema + + return m.Run() + }() + os.Exit(exitCode) +} + +func resetBinlogClient() { + globalFBC = &fakeBinlogClient{} +} + +// shortCircuitTestAfterQuery is used to short circuit a test after a specific query is executed. +// This can be used to end a vdiff, by returning an error from the specified query, once the test +// has verified the necessary behavior. 
+func shortCircuitTestAfterQuery(query string, dbClient *binlogplayer.MockDBClient) { + dbClient.ExpectRequest(query, singleRowAffected, fmt.Errorf("Short circuiting test")) + dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = 'Short circuiting test' where id = 1", singleRowAffected, nil) + dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: error')", singleRowAffected, nil) + dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'Error: Short circuiting test')", singleRowAffected, nil) +} + +//-------------------------------------- +// Topos and tablets + +func addTablet(id int) *topodatapb.Tablet { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.Cells[0], + Uid: uint32(id), + }, + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + KeyRange: &topodatapb.KeyRange{}, + Type: topodatapb.TabletType_REPLICA, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + if err := env.TopoServ.CreateTablet(context.Background(), tablet); err != nil { + panic(err) + } + env.SchemaEngine.Reload(context.Background()) + return tablet +} + +func deleteTablet(tablet *topodatapb.Tablet) { + env.TopoServ.DeleteTablet(context.Background(), tablet.Alias) + // This is not automatically removed from shard replication, which results in log spam. + topo.DeleteTabletReplicationData(context.Background(), env.TopoServ, tablet) + env.SchemaEngine.Reload(context.Background()) +} + +// fakeTabletConn implements the TabletConn interface. We only care about the +// health check part. The state reported by the tablet will depend +// on the Tag values "serving" and "healthy". +type fakeTabletConn struct { + queryservice.QueryService + tablet *topodatapb.Tablet +} + +// StreamHealth is part of queryservice.QueryService.
+func (ftc *fakeTabletConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: ftc.tablet.Keyspace, + Shard: ftc.tablet.Shard, + TabletType: ftc.tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) +} + +// vstreamHook allows you to do work just before calling VStream. +var vstreamHook func(ctx context.Context) + +// VStream directly calls into the pre-initialized engine. +func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VStreamRequest, send func([]*binlogdatapb.VEvent) error) error { + if request.Target.Keyspace != "vttest" { + <-ctx.Done() + return io.EOF + } + if vstreamHook != nil { + vstreamHook(ctx) + } + return vstreamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) +} + +// vstreamRowsHook allows you to do work just before calling VStreamRows. +var vstreamRowsHook func(ctx context.Context) + +// vstreamRowsSendHook allows you to do work just before VStreamRows calls send. +var vstreamRowsSendHook func(ctx context.Context) + +// VStreamRows directly calls into the pre-initialized engine. 
+func (ftc *fakeTabletConn) VStreamRows(ctx context.Context, request *binlogdatapb.VStreamRowsRequest, send func(*binlogdatapb.VStreamRowsResponse) error) error { + if vstreamRowsHook != nil { + vstreamRowsHook(ctx) + } + var row []sqltypes.Value + if request.Lastpk != nil { + r := sqltypes.Proto3ToResult(request.Lastpk) + if len(r.Rows) != 1 { + return fmt.Errorf("unexpected lastpk input: %v", request.Lastpk) + } + row = r.Rows[0] + } + return vstreamerEngine.StreamRows(ctx, request.Query, row, func(rows *binlogdatapb.VStreamRowsResponse) error { + if vstreamRowsSendHook != nil { + vstreamRowsSendHook(ctx) + } + return send(rows) + }) +} + +//-------------------------------------- +// Binlog Client to TabletManager + +// fakeBinlogClient satisfies binlogplayer.Client. +// Not to be used concurrently. +type fakeBinlogClient struct { + lastTablet *topodatapb.Tablet + lastPos string + lastTables []string + lastKeyRange *topodatapb.KeyRange + lastCharset *binlogdatapb.Charset +} + +func (fbc *fakeBinlogClient) Dial(tablet *topodatapb.Tablet) error { + fbc.lastTablet = tablet + return nil +} + +func (fbc *fakeBinlogClient) Close() { +} + +func (fbc *fakeBinlogClient) StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset) (binlogplayer.BinlogTransactionStream, error) { + fbc.lastPos = position + fbc.lastTables = tables + fbc.lastCharset = charset + return &btStream{ctx: ctx}, nil +} + +func (fbc *fakeBinlogClient) StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset) (binlogplayer.BinlogTransactionStream, error) { + fbc.lastPos = position + fbc.lastKeyRange = keyRange + fbc.lastCharset = charset + return &btStream{ctx: ctx}, nil +} + +// btStream satisfies binlogplayer.BinlogTransactionStream +type btStream struct { + ctx context.Context + sent bool +} + +func (bts *btStream) Recv() (*binlogdatapb.BinlogTransaction, error) { + if !bts.sent { + bts.sent = true + 
return &binlogdatapb.BinlogTransaction{ + Statements: []*binlogdatapb.BinlogTransaction_Statement{ + { + Category: binlogdatapb.BinlogTransaction_Statement_BL_INSERT, + Sql: []byte("insert into t values(1)"), + }, + }, + EventToken: &querypb.EventToken{ + Timestamp: 72, + Position: "MariaDB/0-1-1235", + }, + }, nil + } + <-bts.ctx.Done() + return nil, bts.ctx.Err() +} + +//-------------------------------------- +// DBClient wrapper + +func realDBClientFactory() binlogplayer.DBClient { + return &realDBClient{} +} + +type realDBClient struct { + conn *mysql.Conn + nolog bool +} + +func (dbc *realDBClient) DBName() string { + return vdiffdb +} + +func (dbc *realDBClient) Connect() error { + app, err := env.Dbcfgs.AppWithDB().MysqlParams() + if err != nil { + return err + } + app.DbName = vdiffdb + conn, err := mysql.Connect(context.Background(), app) + if err != nil { + return err + } + dbc.conn = conn + return nil +} + +func (dbc *realDBClient) Begin() error { + _, err := dbc.ExecuteFetch("begin", 10000) + return err +} + +func (dbc *realDBClient) Commit() error { + _, err := dbc.ExecuteFetch("commit", 10000) + return err +} + +func (dbc *realDBClient) Rollback() error { + _, err := dbc.ExecuteFetch("rollback", 10000) + return err +} + +func (dbc *realDBClient) Close() { + dbc.conn.Close() +} + +func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { + // Use Clone() because the contents of memory region referenced by + // string can change when clients (e.g. vcopier) use unsafe string methods.
+ query = strings.Clone(query) + if strings.HasPrefix(query, "use") || + query == withddl.QueryToTriggerWithDDL { // this query breaks unit tests since it errors out + return nil, nil + } + qr, err := dbc.conn.ExecuteFetch(query, 10000, true) + if doNotLogDBQueries { + return qr, err + } + if !strings.HasPrefix(query, "select") && !strings.HasPrefix(query, "set") && !dbc.nolog { + globalDBQueries <- query + } + return qr, err +} + +//---------------------------------------------- +// fakeTMClient + +type fakeTMClient struct { + tmclient.TabletManagerClient + schema *tabletmanagerdatapb.SchemaDefinition + vrQueries map[int]map[string]*querypb.QueryResult + waitpos map[int]string + vrpos map[int]string + pos map[int]string +} + +func newFakeTMClient() *fakeTMClient { + return &fakeTMClient{ + vrQueries: make(map[int]map[string]*querypb.QueryResult), + waitpos: make(map[int]string), + vrpos: make(map[int]string), + pos: make(map[int]string), + } +} + +func (tmc *fakeTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) { + return tmc.schema, nil +} + +func (tmc *fakeTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + result, ok := tmc.vrQueries[int(tablet.Alias.Uid)][query] + if !ok { + return nil, fmt.Errorf("query %q not found for tablet %d", query, tablet.Alias.Uid) + } + return result, nil +} + +func (tmc *fakeTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb.Tablet, pos string) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if pos != tmc.waitpos[int(tablet.Alias.Uid)] { + return fmt.Errorf("waitpos %s not reached for tablet %d", pos, tablet.Alias.Uid) + } + return nil +} + +func (tmc *fakeTMClient) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int, pos string) error { + select { + case <-ctx.Done(): + return 
ctx.Err() + default: + } + if pos != tmc.vrpos[int(tablet.Alias.Uid)] { + return fmt.Errorf("vrpos %s not reached for tablet %d", pos, tablet.Alias.Uid) + } + return nil +} + +func (tmc *fakeTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { + pos, ok := tmc.pos[int(tablet.Alias.Uid)] + if !ok { + return "", fmt.Errorf("no primary position for %d", tablet.Alias.Uid) + } + return pos, nil +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index a724e474a44..e272a6ac74b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -54,7 +54,7 @@ const ( db_name varbinary(1024), state varbinary(1024), options json, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + created_at timestamp DEFAULT CURRENT_TIMESTAMP, started_timestamp timestamp NULL DEFAULT NULL, liveness_timestamp timestamp NULL DEFAULT NULL, completed_timestamp timestamp NULL DEFAULT NULL, @@ -70,8 +70,8 @@ const ( rows_compared int not null default 0, mismatch bool not null default false, report json, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + created_at timestamp DEFAULT CURRENT_TIMESTAMP, + updated_at timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, primary key (vdiff_id, table_name)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4` sqlCreateVDiffLogTable = `CREATE TABLE IF NOT EXISTS _vt.vdiff_log ( @@ -85,32 +85,31 @@ const ( sqlResumeVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.options = %s, vd.started_at = NULL, vd.completed_at = NULL, vd.state = 'pending', vdt.state = 'pending' where vd.vdiff_uuid = %s and vd.id = vdt.vdiff_id and vd.state in ('completed', 'stopped') and vdt.state in ('completed', 'stopped')` - sqlRetryVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'pending', vd.last_error = '', vdt.state = 'pending' 
- where vd.id = %d and vd.id = vdt.vdiff_id and vd.state = 'error' and vdt.state = 'error'` + sqlRetryVDiff = `update _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) set vd.state = 'pending', + vd.last_error = '', vdt.state = 'pending' where vd.id = %d and (vd.state = 'error' or vdt.state = 'error')` sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %s and workflow = %s and vdiff_uuid = %s" sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit 1" sqlGetVDiffByID = "select * from _vt.vdiff where id = %d" - sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - inner join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) + sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) where vd.keyspace = %s and vd.workflow = %s` - sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) and vd.vdiff_uuid = %s` sqlVDiffSummary = `select vd.state as vdiff_state, vd.last_error as last_error, vdt.table_name as table_name, vd.vdiff_uuid as 'uuid', vdt.state as table_state, vdt.table_rows as table_rows, vd.started_at as started_at, vdt.table_rows as table_rows, vdt.rows_compared as rows_compared, vd.completed_at as completed_at, IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report - from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - where vdt.vdiff_id = %d` + from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + where vd.id = %d` // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you 
want to update, e.g. `, foo = 1` sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d" sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = '' where vd.id = vdt.vdiff_id and vd.id = %d and vd.state != 'completed'` sqlGetVReplicationEntry = "select * from _vt.vreplication %s" - sqlGetPendingVDiffs = "select * from _vt.vdiff where state = 'pending'" + sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and options->>'$.core_options.auto_retry' = 'true'" sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %s" sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc" - sqlGetTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name = %s" sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)" sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%d, %s, 'pending', %d)" diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 2617eb83831..f24d9442ac1 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -671,24 +671,6 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D return nil } -func (td *tableDiffer) updateTableRows(ctx context.Context, dbClient binlogplayer.DBClient) error { - query := fmt.Sprintf(sqlGetTableRows, encodeString(td.wd.ct.vde.dbName), encodeString(td.table.Name)) - qr, err := dbClient.ExecuteFetch(query, 1) - if err != nil { - return err - } - if len(qr.Rows) == 0 { - return 
fmt.Errorf("no information_schema status found for table %s on tablet %v", - td.table.Name, td.wd.ct.vde.thisTablet.Alias) - } - row := qr.Named().Row() - query = fmt.Sprintf(sqlUpdateTableRows, row.AsInt64("table_rows", 0), td.wd.ct.id, encodeString(td.table.Name)) - if _, err := dbClient.ExecuteFetch(query, 1); err != nil { - return err - } - return nil -} - func (td *tableDiffer) updateTableState(ctx context.Context, dbClient binlogplayer.DBClient, state VDiffState) error { query := fmt.Sprintf(sqlUpdateTableState, encodeString(string(state)), td.wd.ct.id, encodeString(td.table.Name)) if _, err := dbClient.ExecuteFetch(query, 1); err != nil {