diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 0f34782a43c..72b09e8fede 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -190,11 +190,11 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil) if tc.autoRetryError { - testAutoRetryError(t, tc, cells[0].Name) + testAutoRetryError(t, tc, allCellNames) } if tc.resume { - testResume(t, tc, cells[0].Name) + testResume(t, tc, allCellNames) } // These are done here so that we have a valid workflow to test the commands against diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index b602819acdc..c4d91ae9c6d 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1986,18 +1986,13 @@ func (s *Server) checkIfPreviousJournalExists(ctx context.Context, mz *materiali // deleteWorkflowVDiffData cleans up any potential VDiff related data associated // with the workflow on the given tablet. func (s *Server) deleteWorkflowVDiffData(ctx context.Context, tablet *topodatapb.Tablet, workflow string) { - sqlDeleteVDiffs := `delete from vd, vdt, vdl using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - inner join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) - where vd.keyspace = %s and vd.workflow = %s` - query := fmt.Sprintf(sqlDeleteVDiffs, encodeString(tablet.Keyspace), encodeString(workflow)) - rows := -1 - if _, err := s.tmc.ExecuteFetchAsAllPrivs(ctx, tablet, &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{ - Query: []byte(query), - MaxRows: uint64(rows), + if _, err := s.tmc.VDiff(ctx, tablet, &tabletmanagerdatapb.VDiffRequest{ + Keyspace: tablet.Keyspace, + Workflow: workflow, + Action: string(vdiff.DeleteAction), + ActionArg: vdiff.AllActionArg, }); err != nil { - if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Num != sqlerror.ERNoSuchTable { // the tables may not exist if no vdiffs have been run - log.Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err) - } + log.Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err) } } diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 7d112add1d3..4734ab9ee96 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -480,3 +480,13 @@ func (tmc *fakeTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *top RowsAffected: 1, }, nil } + +func (tmc *fakeTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) { + return &tabletmanagerdatapb.VDiffResponse{ + Id: 1, + VdiffUuid: req.VdiffUuid, + Output: &querypb.QueryResult{ + RowsAffected: 1, + }, + }, nil +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/action.go b/go/vt/vttablet/tabletmanager/vdiff/action.go index f7d68349ca0..ac9bec86990 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action.go @@ -352,12 +352,36 @@ func (vde *Engine) handleStopAction(ctx context.Context, dbClient binlogplayer.D } func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error { - var err error - query := "" + vde.mu.Lock() + defer vde.mu.Unlock() + var deleteQuery string + cleanupController := func(controller *controller) { + if controller == nil { + return + } + controller.Stop() + delete(vde.controllers, controller.id) + } switch req.ActionArg { case AllActionArg: - query, err = sqlparser.ParseAndBind(sqlDeleteVDiffs, + // We need to stop any running controllers before we delete + // the vdiff records. + query, err := sqlparser.ParseAndBind(sqlGetVDiffIDsByKeyspaceWorkflow, + sqltypes.StringBindVariable(req.Keyspace), + sqltypes.StringBindVariable(req.Workflow), + ) + if err != nil { + return err + } + res, err := dbClient.ExecuteFetch(query, -1) + if err != nil { + return err + } + for _, row := range res.Named().Rows { + cleanupController(vde.controllers[row.AsInt64("id", -1)]) + } + deleteQuery, err = sqlparser.ParseAndBind(sqlDeleteVDiffs, sqltypes.StringBindVariable(req.Keyspace), sqltypes.StringBindVariable(req.Workflow), ) @@ -369,12 +393,33 @@ func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer if err != nil { return fmt.Errorf("action argument %s not supported", req.ActionArg) } - query, err = sqlparser.ParseAndBind(sqlDeleteVDiffByUUID, sqltypes.StringBindVariable(uuid.String())) + // We need to be sure that the controller is stopped, if + // it's still running, before we delete the vdiff record. + query, err := sqlparser.ParseAndBind(sqlGetVDiffID, + sqltypes.StringBindVariable(uuid.String()), + ) + if err != nil { + return err + } + res, err := dbClient.ExecuteFetch(query, 1) + if err != nil { + return err + } + row := res.Named().Row() // Must only be one + if row == nil { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no vdiff found for UUID %s on tablet %v", + uuid, vde.thisTablet.Alias) + } + cleanupController(vde.controllers[row.AsInt64("id", -1)]) + deleteQuery, err = sqlparser.ParseAndBind(sqlDeleteVDiffByUUID, + sqltypes.StringBindVariable(uuid.String()), + ) if err != nil { return err } } - if _, err = dbClient.ExecuteFetch(query, 1); err != nil { + // Execute the query which deletes the vdiff record(s). + if _, err := dbClient.ExecuteFetch(deleteQuery, 1); err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/vdiff/action_test.go b/go/vt/vttablet/tabletmanager/vdiff/action_test.go index 6c3106f5310..9bbfbaa4d68 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action_test.go @@ -42,6 +42,10 @@ func TestPerformVDiffAction(t *testing.T) { keyspace := "ks" workflow := "wf" uuid := uuid.New().String() + type queryAndResult struct { + query string + result *sqltypes.Result // Optional if you need a non-empty result + } tests := []struct { name string vde *Engine @@ -49,7 +53,7 @@ func TestPerformVDiffAction(t *testing.T) { preFunc func() error postFunc func() error want *tabletmanagerdatapb.VDiffResponse - expectQueries []string + expectQueries []queryAndResult wantErr error }{ { @@ -72,9 +76,13 @@ func TestPerformVDiffAction(t *testing.T) { preFunc: func() error { return tstenv.TopoServ.CreateCellInfo(ctx, "zone100_test", &topodatapb.CellInfo{}) }, - expectQueries: []string{ - fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)), - fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)), + expectQueries: []queryAndResult{ + { + query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)), + }, + { + query: fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)), + }, }, postFunc: func() error { return tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true) @@ -102,9 +110,13 @@ func TestPerformVDiffAction(t *testing.T) { Cells: cells, }) }, - expectQueries: []string{ - fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)), - fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)), + expectQueries: []queryAndResult{ + { + query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)), + }, + { + query: fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)), + }, }, postFunc: func() error { if err := tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true); err != nil { @@ -119,9 +131,21 @@ func TestPerformVDiffAction(t *testing.T) { Action: string(DeleteAction), ActionArg: uuid, }, - expectQueries: []string{ - fmt.Sprintf(`delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + expectQueries: []queryAndResult{ + { + query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)), + result: sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", + ), + "1", + ), + }, + { + query: fmt.Sprintf(`delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) where vd.vdiff_uuid = %s`, encodeString(uuid)), + }, }, }, { @@ -132,10 +156,23 @@ func TestPerformVDiffAction(t *testing.T) { Keyspace: keyspace, Workflow: workflow, }, - expectQueries: []string{ - fmt.Sprintf(`delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + expectQueries: []queryAndResult{ + { + query: fmt.Sprintf("select id as id from _vt.vdiff where keyspace = %s and workflow = %s", encodeString(keyspace), encodeString(workflow)), + result: sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", + ), + "1", + "2", + ), + }, + { + query: fmt.Sprintf(`delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) where vd.keyspace = %s and vd.workflow = %s`, encodeString(keyspace), encodeString(workflow)), + }, }, }, } @@ -148,10 +185,14 @@ func TestPerformVDiffAction(t *testing.T) { if tt.vde == nil { tt.vde = vdiffenv.vde } - for _, query := range tt.expectQueries { - vdiffenv.dbClient.ExpectRequest(query, &sqltypes.Result{}, nil) + for _, queryResult := range tt.expectQueries { + if queryResult.result == nil { + queryResult.result = &sqltypes.Result{} + } + vdiffenv.dbClient.ExpectRequest(queryResult.query, queryResult.result, nil) } got, err := tt.vde.PerformVDiffAction(ctx, tt.req) + vdiffenv.dbClient.Wait() if tt.wantErr != nil && !vterrors.Equals(err, tt.wantErr) { t.Errorf("Engine.PerformVDiffAction() error = %v, wantErr %v", err, tt.wantErr) return @@ -163,6 +204,9 @@ func TestPerformVDiffAction(t *testing.T) { err := tt.postFunc() require.NoError(t, err, "post function failed: %v", err) } + // No VDiffs should be running anymore. + require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d", + len(vdiffenv.vde.controllers)) }) } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index ef8d8a6ba86..de93895a4eb 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -23,20 +23,20 @@ import ( "strings" "time" - "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vterrors" - "google.golang.org/protobuf/encoding/prototext" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) /* @@ -183,6 +183,8 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient) select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + case <-ct.done: + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped") default: } ct.workflowFilter = fmt.Sprintf("where workflow = %s and db_name = %s", encodeString(ct.workflow), @@ -197,6 +199,8 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient) select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + case <-ct.done: + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped") default: } source := newMigrationSource() @@ -315,9 +319,9 @@ func (ct *controller) saveErrorState(ctx context.Context, saveErr error) error { log.Warningf("Failed to persist vdiff error state: %v. Will retry in %s", err, retryDelay.String()) select { case <-ctx.Done(): - return fmt.Errorf("engine is shutting down") + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "engine is shutting down") case <-ct.done: - return fmt.Errorf("vdiff was stopped") + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped") case <-time.After(retryDelay): if retryDelay < maxRetryDelay { retryDelay = time.Duration(float64(retryDelay) * 1.5) diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index 2269f48d596..f9f48cc72e9 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -42,13 +42,14 @@ const ( sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d" sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = '' where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'` - sqlGetVReplicationEntry = "select * from _vt.vreplication %s" - sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed - sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'" - sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a" - sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc" - sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a" - sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)" + sqlGetVReplicationEntry = "select * from _vt.vreplication %s" + sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed + sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'" + sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a" + sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a" + sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc" + sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a" + sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)" sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%a, %a, 'pending', %a)" sqlGetVDiffTable = `select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index b703c60e087..e65a0bad253 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -407,6 +407,8 @@ func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStr case participant.result <- result: case <-ctx.Done(): return vterrors.Wrap(ctx.Err(), "VStreamRows") + case <-td.wd.ct.done: + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped") } return nil }) @@ -494,6 +496,8 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl select { case <-ctx.Done(): return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + case <-td.wd.ct.done: + return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped") default: } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 7d7af3f8a37..e27d421d398 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -137,6 +137,8 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + case <-wd.ct.done: + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped") default: } @@ -184,6 +186,8 @@ func (wd *workflowDiffer) diff(ctx context.Context) error { select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + case <-wd.ct.done: + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped") default: } @@ -203,6 +207,8 @@ func (wd *workflowDiffer) diff(ctx context.Context) error { select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + case <-wd.ct.done: + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped") default: } query, err := sqlparser.ParseAndBind(sqlGetVDiffTable, diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index 5670fb7173a..d9dbcee7291 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -12,12 +12,14 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/workflow" + vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // VReplicationWorkflowType specifies whether workflow is MoveTables or Reshard @@ -709,20 +711,16 @@ func (vrw *VReplicationWorkflow) GetCopyProgress() (*CopyProgress, error) { // region Workflow related utility functions -// deleteWorkflowVDiffData cleans up any potential VDiff related data associated with the workflow on the given tablet +// deleteWorkflowVDiffData cleans up any potential VDiff related data associated +// with the workflow on the given tablet. func (wr *Wrangler) deleteWorkflowVDiffData(ctx context.Context, tablet *topodatapb.Tablet, workflow string) { - sqlDeleteVDiffs := `delete from vd, vdt, vdl using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - inner join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) - where vd.keyspace = %s and vd.workflow = %s` - query := fmt.Sprintf(sqlDeleteVDiffs, encodeString(tablet.Keyspace), encodeString(workflow)) - rows := -1 - if _, err := wr.tmc.ExecuteFetchAsDba(ctx, tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ - Query: []byte(query), - MaxRows: uint64(rows), + if _, err := wr.tmc.VDiff(ctx, tablet, &tabletmanagerdatapb.VDiffRequest{ + Keyspace: tablet.Keyspace, + Workflow: workflow, + Action: string(vdiff2.DeleteAction), + ActionArg: vdiff2.AllActionArg, }); err != nil { - if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Num != sqlerror.ERNoSuchTable { // the tables may not exist if no vdiffs have been run - wr.Logger().Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err) - } + log.Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err) } }