Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: Cleanup the controller for a VDiff before deleting it #14107

Merged
merged 8 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)

if tc.autoRetryError {
testAutoRetryError(t, tc, cells[0].Name)
testAutoRetryError(t, tc, allCellNames)
}

if tc.resume {
testResume(t, tc, cells[0].Name)
testResume(t, tc, allCellNames)
Comment on lines -193 to +197
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes aren't strictly necessary (this path is vtctlclient-only, and we don't update the stored values except in the create command), but I'm changing them so that the calls match.

}

// These are done here so that we have a valid workflow to test the commands against
Expand Down
17 changes: 6 additions & 11 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1986,18 +1986,13 @@ func (s *Server) checkIfPreviousJournalExists(ctx context.Context, mz *materiali
// deleteWorkflowVDiffData cleans up any potential VDiff related data associated
// with the workflow on the given tablet.
func (s *Server) deleteWorkflowVDiffData(ctx context.Context, tablet *topodatapb.Tablet, workflow string) {
sqlDeleteVDiffs := `delete from vd, vdt, vdl using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
inner join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %s and vd.workflow = %s`
query := fmt.Sprintf(sqlDeleteVDiffs, encodeString(tablet.Keyspace), encodeString(workflow))
rows := -1
if _, err := s.tmc.ExecuteFetchAsAllPrivs(ctx, tablet, &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{
Query: []byte(query),
MaxRows: uint64(rows),
if _, err := s.tmc.VDiff(ctx, tablet, &tabletmanagerdatapb.VDiffRequest{
Keyspace: tablet.Keyspace,
Workflow: workflow,
Action: string(vdiff.DeleteAction),
ActionArg: vdiff.AllActionArg,
}); err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Num != sqlerror.ERNoSuchTable { // the tables may not exist if no vdiffs have been run
log.Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err)
}
log.Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err)
}
}

Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,13 @@ func (tmc *fakeTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *top
RowsAffected: 1,
}, nil
}

// VDiff is the fakeTMClient stub for the VDiff call. It does no real work and
// never fails: the response echoes the request's VdiffUuid, carries a fixed Id
// of 1, and reports a single affected row in its Output.
func (tmc *fakeTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) {
	output := &querypb.QueryResult{RowsAffected: 1}
	resp := &tabletmanagerdatapb.VDiffResponse{
		Id:        1,
		VdiffUuid: req.VdiffUuid,
		Output:    output,
	}
	return resp, nil
}
55 changes: 50 additions & 5 deletions go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,36 @@ func (vde *Engine) handleStopAction(ctx context.Context, dbClient binlogplayer.D
}

func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
var err error
query := ""
vde.mu.Lock()
defer vde.mu.Unlock()
var deleteQuery string
cleanupController := func(controller *controller) {
if controller == nil {
return
}
controller.Stop()
delete(vde.controllers, controller.id)
}

switch req.ActionArg {
case AllActionArg:
query, err = sqlparser.ParseAndBind(sqlDeleteVDiffs,
// We need to stop any running controllers before we delete
// the vdiff records.
query, err := sqlparser.ParseAndBind(sqlGetVDiffIDsByKeyspaceWorkflow,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
)
if err != nil {
return err
}
res, err := dbClient.ExecuteFetch(query, -1)
if err != nil {
return err
}
for _, row := range res.Named().Rows {
cleanupController(vde.controllers[row.AsInt64("id", -1)])
}
deleteQuery, err = sqlparser.ParseAndBind(sqlDeleteVDiffs,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
)
Expand All @@ -369,12 +393,33 @@ func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer
if err != nil {
return fmt.Errorf("action argument %s not supported", req.ActionArg)
}
query, err = sqlparser.ParseAndBind(sqlDeleteVDiffByUUID, sqltypes.StringBindVariable(uuid.String()))
// We need to be sure that the controller is stopped, if
// it's still running, before we delete the vdiff record.
query, err := sqlparser.ParseAndBind(sqlGetVDiffID,
sqltypes.StringBindVariable(uuid.String()),
)
if err != nil {
return err
}
res, err := dbClient.ExecuteFetch(query, 1)
if err != nil {
return err
}
row := res.Named().Row() // Must only be one
if row == nil {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no vdiff found for UUID %s on tablet %v",
uuid, vde.thisTablet.Alias)
}
cleanupController(vde.controllers[row.AsInt64("id", -1)])
deleteQuery, err = sqlparser.ParseAndBind(sqlDeleteVDiffByUUID,
sqltypes.StringBindVariable(uuid.String()),
)
if err != nil {
return err
}
}
if _, err = dbClient.ExecuteFetch(query, 1); err != nil {
// Execute the query which deletes the vdiff record(s).
if _, err := dbClient.ExecuteFetch(deleteQuery, 1); err != nil {
return err
}

Expand Down
70 changes: 57 additions & 13 deletions go/vt/vttablet/tabletmanager/vdiff/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@ func TestPerformVDiffAction(t *testing.T) {
keyspace := "ks"
workflow := "wf"
uuid := uuid.New().String()
type queryAndResult struct {
query string
result *sqltypes.Result // Optional if you need a non-empty result
}
tests := []struct {
name string
vde *Engine
req *tabletmanagerdatapb.VDiffRequest
preFunc func() error
postFunc func() error
want *tabletmanagerdatapb.VDiffResponse
expectQueries []string
expectQueries []queryAndResult
wantErr error
}{
{
Expand All @@ -72,9 +76,13 @@ func TestPerformVDiffAction(t *testing.T) {
preFunc: func() error {
return tstenv.TopoServ.CreateCellInfo(ctx, "zone100_test", &topodatapb.CellInfo{})
},
expectQueries: []string{
fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
},
{
query: fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
},
},
postFunc: func() error {
return tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true)
Expand Down Expand Up @@ -102,9 +110,13 @@ func TestPerformVDiffAction(t *testing.T) {
Cells: cells,
})
},
expectQueries: []string{
fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
},
{
query: fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
},
},
postFunc: func() error {
if err := tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true); err != nil {
Expand All @@ -119,9 +131,21 @@ func TestPerformVDiffAction(t *testing.T) {
Action: string(DeleteAction),
ActionArg: uuid,
},
expectQueries: []string{
fmt.Sprintf(`delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
result: sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
"1",
),
},
{
query: fmt.Sprintf(`delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vd.vdiff_uuid = %s`, encodeString(uuid)),
},
},
},
{
Expand All @@ -132,10 +156,23 @@ func TestPerformVDiffAction(t *testing.T) {
Keyspace: keyspace,
Workflow: workflow,
},
expectQueries: []string{
fmt.Sprintf(`delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where keyspace = %s and workflow = %s", encodeString(keyspace), encodeString(workflow)),
result: sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
"1",
"2",
),
},
{
query: fmt.Sprintf(`delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %s and vd.workflow = %s`, encodeString(keyspace), encodeString(workflow)),
},
},
},
}
Expand All @@ -148,10 +185,14 @@ func TestPerformVDiffAction(t *testing.T) {
if tt.vde == nil {
tt.vde = vdiffenv.vde
}
for _, query := range tt.expectQueries {
vdiffenv.dbClient.ExpectRequest(query, &sqltypes.Result{}, nil)
for _, queryResult := range tt.expectQueries {
if queryResult.result == nil {
queryResult.result = &sqltypes.Result{}
}
vdiffenv.dbClient.ExpectRequest(queryResult.query, queryResult.result, nil)
}
got, err := tt.vde.PerformVDiffAction(ctx, tt.req)
vdiffenv.dbClient.Wait()
if tt.wantErr != nil && !vterrors.Equals(err, tt.wantErr) {
t.Errorf("Engine.PerformVDiffAction() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -163,6 +204,9 @@ func TestPerformVDiffAction(t *testing.T) {
err := tt.postFunc()
require.NoError(t, err, "post function failed: %v", err)
}
// No VDiffs should be running anymore.
require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d",
len(vdiffenv.vde.controllers))
})
}
}
22 changes: 13 additions & 9 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ import (
"strings"
"time"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

/*
Expand Down Expand Up @@ -183,6 +183,8 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
ct.workflowFilter = fmt.Sprintf("where workflow = %s and db_name = %s", encodeString(ct.workflow),
Expand All @@ -197,6 +199,8 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
source := newMigrationSource()
Expand Down Expand Up @@ -315,9 +319,9 @@ func (ct *controller) saveErrorState(ctx context.Context, saveErr error) error {
log.Warningf("Failed to persist vdiff error state: %v. Will retry in %s", err, retryDelay.String())
select {
case <-ctx.Done():
return fmt.Errorf("engine is shutting down")
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "engine is shutting down")
case <-ct.done:
return fmt.Errorf("vdiff was stopped")
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
case <-time.After(retryDelay):
if retryDelay < maxRetryDelay {
retryDelay = time.Duration(float64(retryDelay) * 1.5)
Expand Down
15 changes: 8 additions & 7 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ const (
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a"
sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc"
sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a"
sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)"
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a"
sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a"
sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc"
sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a"
sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)"

sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%a, %a, 'pending', %a)"
sqlGetVDiffTable = `select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStr
case participant.result <- result:
case <-ctx.Done():
return vterrors.Wrap(ctx.Err(), "VStreamRows")
case <-td.wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
}
return nil
})
Expand Down Expand Up @@ -494,6 +496,8 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl
select {
case <-ctx.Done():
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-td.wd.ct.done:
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand Down Expand Up @@ -184,6 +186,8 @@ func (wd *workflowDiffer) diff(ctx context.Context) error {
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand All @@ -203,6 +207,8 @@ func (wd *workflowDiffer) diff(ctx context.Context) error {
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
query, err := sqlparser.ParseAndBind(sqlGetVDiffTable,
Expand Down
Loading
Loading