From 5a908cf95eb9632b4e85ba1c3d6da40604120d6f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 7 Aug 2023 09:59:57 -0400 Subject: [PATCH 01/18] MoveTables: don't create routing rules until after streams Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 68 +++++++++++++++++---------------- go/vt/wrangler/materializer.go | 69 ++++++++++++++++++---------------- 2 files changed, 72 insertions(+), 65 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c55c5889b03..d6a9baffeb8 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1004,39 +1004,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } } - if externalTopo == nil { - // Save routing rules before vschema. If we save vschema first, and routing rules - // fails to save, we may generate duplicate table errors. - rules, err := topotools.GetRoutingRules(ctx, s.ts) - if err != nil { - return nil, err - } - for _, table := range tables { - toSource := []string{sourceKeyspace + "." + table} - rules[table] = toSource - rules[table+"@replica"] = toSource - rules[table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[targetKeyspace+"."+table+"@replica"] = toSource - rules[targetKeyspace+"."+table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[sourceKeyspace+"."+table+"@replica"] = toSource - rules[sourceKeyspace+"."+table+"@rdonly"] = toSource - } - if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil { - return nil, err - } - if vschema != nil { - // We added to the vschema. - if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { - return nil, err - } - } - } - if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil { - return nil, err - } ms := &vtctldatapb.MaterializeSettings{ Workflow: req.Workflow, MaterializationIntent: vtctldatapb.MaterializationIntent_MOVETABLES, @@ -1081,6 +1049,42 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } + // Now that we know the streams could be created, let's put the associated routing + // rules in place. + if externalTopo == nil { + // Save routing rules before vschema. If we save vschema first, and routing rules + // fails to save, we may generate duplicate table errors. + rules, err := topotools.GetRoutingRules(ctx, s.ts) + if err != nil { + return nil, err + } + for _, table := range tables { + toSource := []string{sourceKeyspace + "." + table} + rules[table] = toSource + rules[table+"@replica"] = toSource + rules[table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[targetKeyspace+"."+table+"@replica"] = toSource + rules[targetKeyspace+"."+table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[sourceKeyspace+"."+table+"@replica"] = toSource + rules[sourceKeyspace+"."+table+"@rdonly"] = toSource + } + if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil { + return nil, err + } + + if vschema != nil { + // We added to the vschema. + if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { + return nil, err + } + } + } + if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil { + return nil, err + } + if ms.SourceTimeZone != "" { if err := mz.checkTZConversion(ctx, ms.SourceTimeZone); err != nil { return nil, err diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 2edc06aa607..4790cab3be3 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -212,39 +212,6 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta } } } - if externalTopo == nil { - // Save routing rules before vschema. If we save vschema first, and routing rules - // fails to save, we may generate duplicate table errors. - rules, err := topotools.GetRoutingRules(ctx, wr.ts) - if err != nil { - return err - } - for _, table := range tables { - toSource := []string{sourceKeyspace + "." + table} - rules[table] = toSource - rules[table+"@replica"] = toSource - rules[table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[targetKeyspace+"."+table+"@replica"] = toSource - rules[targetKeyspace+"."+table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[sourceKeyspace+"."+table+"@replica"] = toSource - rules[sourceKeyspace+"."+table+"@rdonly"] = toSource - } - if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { - return err - } - - if vschema != nil { - // We added to the vschema. - if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { - return err - } - } - } - if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil { - return err - } tabletTypes, inorder, err := discovery.ParseTabletTypesAndOrder(tabletTypesStr) if err != nil { return err @@ -290,6 +257,42 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta return err } + // Now that we know the streams could be created, let's put the associated routing + // rules in place. + if externalTopo == nil { + // Save routing rules before vschema. If we save vschema first, and routing rules + // fails to save, we may generate duplicate table errors. + rules, err := topotools.GetRoutingRules(ctx, wr.ts) + if err != nil { + return err + } + for _, table := range tables { + toSource := []string{sourceKeyspace + "." + table} + rules[table] = toSource + rules[table+"@replica"] = toSource + rules[table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[targetKeyspace+"."+table+"@replica"] = toSource + rules[targetKeyspace+"."+table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[sourceKeyspace+"."+table+"@replica"] = toSource + rules[sourceKeyspace+"."+table+"@rdonly"] = toSource + } + if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { + return err + } + + if vschema != nil { + // We added to the vschema. + if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { + return err + } + } + } + if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil { + return err + } + if sourceTimeZone != "" { if err := mz.checkTZConversion(ctx, sourceTimeZone); err != nil { return err From e478ab2ea02d36d62a4a93980ee7607ddb2ee9b2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 7 Aug 2023 13:36:15 -0400 Subject: [PATCH 02/18] Add vtctlclient unit test (that fails on main) Signed-off-by: Matt Lord --- go/vt/wrangler/traffic_switcher_env_test.go | 21 ++++++++++++ go/vt/wrangler/traffic_switcher_test.go | 36 +++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 18b2e6d54c4..f9f2c7edf15 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vschema" vschemapb "vitess.io/vitess/go/vt/proto/vschema" @@ -180,6 +181,17 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, }, }, } + schema := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "t1", + }, + { + Name: "t2", + }, + }, + } + tme.setPrimarySchemas(schema) if len(sourceShards) != 1 { if err := tme.ts.SaveVSchema(ctx, "ks1", vs); err != nil { t.Fatal(err) @@ -678,6 +690,15 @@ func (tme *testMigraterEnv) setPrimaryPositions() { } } +func (tme *testMigraterEnv) setPrimarySchemas(schema *tabletmanagerdatapb.SchemaDefinition) { + for _, primary := range tme.sourcePrimaries { + primary.FakeMysqlDaemon.Schema = schema + } + for _, primary := range tme.targetPrimaries { + primary.FakeMysqlDaemon.Schema = schema + } +} + func (tme *testMigraterEnv) expectNoPreviousJournals() { // validate that no previous journals exist for _, dbclient := range tme.dbSourceClients { diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index f7ae7998410..b3b3748e795 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -29,12 +29,14 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/workflow" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) var ( @@ -2143,6 +2145,40 @@ func TestIsPartialMoveTables(t *testing.T) { } } +// TestNoOrphanedRoutingRulesOnFailedCreate tests that no orphaned routing rules +// are left in place when the workflow creation fails -- specifically at the point +// where we try and create the workflow streams. +func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) { + ctx := context.Background() + tme := newTestTableMigraterCustom(ctx, t, []string{"0"}, []string{"-80", "80-"}, "select * %s") + defer tme.close(t) + + // The target keyspace is sharded. Let's remove the vschema definitions so + // that we know the workflow creation will fail. + // Let's also be sure that the routing rules are empty. + if err := topotools.SaveRoutingRules(ctx, tme.wr.ts, nil); err != nil { + t.Fatal(err) + } + emptyVschema := &vschemapb.Keyspace{} + if err := tme.ts.SaveVSchema(ctx, "ks2", emptyVschema); err != nil { + t.Fatal(err) + } + if err := tme.ts.RebuildSrvVSchema(ctx, nil); err != nil { + t.Fatal(err) + } + err := topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false) + if err != nil { + t.Fatal(err) + } + + err = tme.wr.MoveTables(ctx, "testwf", "ks1", "ks2", "t1,t2", "cell1", "primary,replica", false, "", true, false, "", false, false, "", "", nil) + require.Error(t, err) + + // Check that there are no orphaned routing rules. + emptyRules := make(map[string][]string) + checkRouting(t, tme.wr, emptyRules) +} + func checkRouting(t *testing.T, wr *Wrangler, want map[string][]string) { t.Helper() ctx := context.Background() From d200fd70cc20bc79730c0a5f88d49e5a45c6e20f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 7 Aug 2023 19:29:16 -0400 Subject: [PATCH 03/18] Improve comment Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 4 ++-- go/vt/wrangler/materializer.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index d6a9baffeb8..009f53094e6 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1049,8 +1049,8 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } - // Now that we know the streams could be created, let's put the associated routing - // rules in place. + // Now that the streams have been successfully created, let's put the associated + // routing rules in place. if externalTopo == nil { // Save routing rules before vschema. If we save vschema first, and routing rules // fails to save, we may generate duplicate table errors. diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 4790cab3be3..c28af0917ec 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -257,8 +257,8 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta return err } - // Now that we know the streams could be created, let's put the associated routing - // rules in place. + // Now that the streams have been successfully created, let's put the associated + // routing rules in place. if externalTopo == nil { // Save routing rules before vschema. If we save vschema first, and routing rules // fails to save, we may generate duplicate table errors. From 637ba76ab45271589dc8359b7ef24f27ade347c1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 8 Aug 2023 00:11:35 -0400 Subject: [PATCH 04/18] Pedantic cleanup Signed-off-by: Matt Lord --- go/vt/wrangler/traffic_switcher_env_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index f9f2c7edf15..d86bbc6590a 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -36,11 +36,6 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/proto/vschema" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" @@ -49,6 +44,11 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) const ( @@ -511,26 +511,26 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe vs := &vschemapb.Keyspace{ Sharded: true, - Vindexes: map[string]*vschema.Vindex{ + Vindexes: map[string]*vschemapb.Vindex{ "thash": { Type: "hash", }, }, - Tables: map[string]*vschema.Table{ + Tables: map[string]*vschemapb.Table{ "t1": { - ColumnVindexes: []*vschema.ColumnVindex{{ + ColumnVindexes: []*vschemapb.ColumnVindex{{ Columns: []string{"c1"}, Name: "thash", }}, }, "t2": { - ColumnVindexes: []*vschema.ColumnVindex{{ + ColumnVindexes: []*vschemapb.ColumnVindex{{ Columns: []string{"c1"}, Name: "thash", }}, }, "t3": { - ColumnVindexes: []*vschema.ColumnVindex{{ + ColumnVindexes: []*vschemapb.ColumnVindex{{ Columns: []string{"c1"}, Name: "thash", }}, From a1dd8e146413db880987982a18a17e7cc7c3b40c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 8 Aug 2023 00:19:43 -0400 Subject: [PATCH 05/18] Minor changes after self review Signed-off-by: Matt Lord --- go/vt/wrangler/traffic_switcher_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index b3b3748e795..7d4c8238aa7 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -36,7 +36,6 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) var ( @@ -2159,8 +2158,7 @@ func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) { if err := topotools.SaveRoutingRules(ctx, tme.wr.ts, nil); err != nil { t.Fatal(err) } - emptyVschema := &vschemapb.Keyspace{} - if err := tme.ts.SaveVSchema(ctx, "ks2", emptyVschema); err != nil { + if err := tme.ts.SaveVSchema(ctx, "ks2", nil); err != nil { t.Fatal(err) } if err := tme.ts.RebuildSrvVSchema(ctx, nil); err != nil { From 411f2fe7c198843dd7f19b10acf5560747de1115 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 8 Aug 2023 00:23:37 -0400 Subject: [PATCH 06/18] Stop using t.Fatal in new tests Signed-off-by: Matt Lord --- go/vt/wrangler/traffic_switcher_test.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 7d4c8238aa7..e1cb3ebc59c 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -2155,19 +2155,14 @@ func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) { // The target keyspace is sharded. Let's remove the vschema definitions so // that we know the workflow creation will fail. // Let's also be sure that the routing rules are empty. - if err := topotools.SaveRoutingRules(ctx, tme.wr.ts, nil); err != nil { - t.Fatal(err) - } - if err := tme.ts.SaveVSchema(ctx, "ks2", nil); err != nil { - t.Fatal(err) - } - if err := tme.ts.RebuildSrvVSchema(ctx, nil); err != nil { - t.Fatal(err) - } - err := topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false) - if err != nil { - t.Fatal(err) - } + err := topotools.SaveRoutingRules(ctx, tme.wr.ts, nil) + require.NoError(t, err, "failed to save routing rules") + err = tme.ts.SaveVSchema(ctx, "ks2", nil) + require.NoError(t, err, "failed to save vschema") + err = tme.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err, "failed to rebuild serving vschema") + err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false) + require.NoError(t, err, "failed to rebuild keyspace") err = tme.wr.MoveTables(ctx, "testwf", "ks1", "ks2", "t1,t2", "cell1", "primary,replica", false, "", true, false, "", false, false, "", "", nil) require.Error(t, err) From b47fe100df2c702fea9e27091066598e2e1f7079 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 9 Aug 2023 22:59:18 -0400 Subject: [PATCH 07/18] Add vtctldclient unit test Signed-off-by: Matt Lord --- .../tabletmanager/rpc_vreplication_test.go | 73 +++++++++++++++++++ go/vt/wrangler/traffic_switcher_test.go | 9 ++- 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 8c8e5c0e37b..e2e931a1153 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -29,7 +29,9 @@ import ( "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -590,3 +592,74 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { }) } } + +// TestNoOrphanedRoutingRulesOnFailedCreate tests that no orphaned routing rules +// are left in place when the workflow creation fails -- specifically at the point +// where we try and create the workflow streams. +func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) { + ctx := context.Background() + sourceKs := "sourceks" + sourceTabletUID := 200 + sourceShard := "0" + targetKs := "targetks" + targetShards := make(map[string]*fakeTabletConn) + wf := "testwf" + table := defaultSchema.TableDefinitions[0].Name + tenv := newTestEnv(t, sourceKs, []string{sourceShard}) + defer tenv.close() + ws := workflow.NewServer(tenv.ts, tenv.tmc) + + sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, shard) + defer tenv.deleteTablet(sourceTablet.tablet) + targetShards["-80"] = tenv.addTablet(t, 300, targetKs, "-80") + defer tenv.deleteTablet(targetShards["-80"].tablet) + targetShards["80-"] = tenv.addTablet(t, 310, targetKs, "80-") + defer tenv.deleteTablet(targetShards["80-"].tablet) + + tenv.mysqld.Schema = defaultSchema + tenv.mysqld.Schema.DatabaseSchema = tenv.dbName + + // The target keyspace is sharded. Let's remove any vschema table + // definitions so that we know the workflow creation will fail. + // Let's also be sure that the routing rules are empty. + err := topotools.SaveRoutingRules(ctx, tenv.ts, nil) + require.NoError(t, err, "failed to save routing rules") + err = tenv.ts.SaveVSchema(ctx, targetKs, &vschemapb.Keyspace{ + Sharded: true, + }) + require.NoError(t, err, "failed to save vschema") + err = tenv.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err, "failed to rebuild serving vschema") + err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tenv.ts, sourceKs, tenv.cells, false) + require.NoError(t, err, "failed to rebuild keyspace") + + for _, tablet := range targetShards { + tenv.tmc.setVReplicationExecResults(tablet.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(tablet.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(tablet.tablet, fmt.Sprintf(getWorkflow, targetKs, wf), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", + ), + "1", + ), + ) + tablet.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + } + + _, err = ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ + Workflow: wf, + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Cells: tenv.cells, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, + IncludeTables: []string{table}, + }) + require.ErrorContains(t, err, fmt.Sprintf("table %s not found in vschema for keyspace %s", table, targetKs)) + + // Check that there are no orphaned routing rules. + rules, err := topotools.GetRoutingRules(ctx, tenv.ts) + require.NoError(t, err, "failed to get routing rules") + require.Equal(t, 0, len(rules), "expected no routing rules to be present") +} diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index c130b1cfed5..f7339bef42a 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -36,6 +36,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) var ( @@ -2153,12 +2154,14 @@ func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) { tme := newTestTableMigraterCustom(ctx, t, []string{"0"}, []string{"-80", "80-"}, "select * %s") defer tme.close(t) - // The target keyspace is sharded. Let's remove the vschema definitions so - // that we know the workflow creation will fail. + // The target keyspace is sharded. Let's remove any vschema table + // definitions so that we know the workflow creation will fail. // Let's also be sure that the routing rules are empty. err := topotools.SaveRoutingRules(ctx, tme.wr.ts, nil) require.NoError(t, err, "failed to save routing rules") - err = tme.ts.SaveVSchema(ctx, "ks2", nil) + err = tme.ts.SaveVSchema(ctx, "ks2", &vschemapb.Keyspace{ + Sharded: true, + }) require.NoError(t, err, "failed to save vschema") err = tme.ts.RebuildSrvVSchema(ctx, nil) require.NoError(t, err, "failed to rebuild serving vschema") From d7360b819b20de35edeb217a82b8a2b49dafba65 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Aug 2023 13:08:17 -0400 Subject: [PATCH 08/18] Automatically clean up on MoveTables Create error Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 5 --- go/vt/vtctl/workflow/server.go | 19 ++++++++ go/vt/wrangler/materializer.go | 65 +++++++++++++++++----------- 3 files changed, 58 insertions(+), 31 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 15da7a20107..54a6a726bcf 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -69,11 +69,6 @@ func (mz *materializer) prepareMaterializerStreams(req *vtctldatapb.MoveTablesCr if err != nil { return err } - if mz.isPartial { - if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil { - return err - } - } if err := mz.deploySchema(); err != nil { return err } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index e85ef9c2c53..d8d741573ff 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1069,11 +1069,30 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } + // If we get an error after this point, where the vreplication streams/records have been + // created, then we clean up the workflow's artifacts. + defer func() { + if err != nil { + ts, cerr := s.buildTrafficSwitcher(ctx, ms.TargetKeyspace, ms.Workflow) + if cerr != nil { + err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) + } + if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil { + err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) + } + } + }() + // Now that the streams have been successfully created, let's put the associated // routing rules in place. if externalTopo == nil { // Save routing rules before vschema. If we save vschema first, and routing rules // fails to save, we may generate duplicate table errors. + if mz.isPartial { + if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil { + return nil, err + } + } rules, err := topotools.GetRoutingRules(ctx, s.ts) if err != nil { return nil, err diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index c28af0917ec..0039b029b53 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -129,11 +129,10 @@ func shouldInclude(table string, excludes []string) bool { // MoveTables initiates moving table(s) over to another keyspace func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs, cell, tabletTypesStr string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool, - externalCluster string, dropForeignKeys, deferSecondaryKeys bool, sourceTimeZone, onDDL string, sourceShards []string) error { + externalCluster string, dropForeignKeys, deferSecondaryKeys bool, sourceTimeZone, onDDL string, sourceShards []string) (err error) { //FIXME validate tableSpecs, allTables, excludeTables var tables []string var externalTopo *topo.Server - var err error if externalCluster != "" { // when the source is an external mysql cluster mounted using the Mount command externalTopo, err = wr.ts.OpenExternalVitessClusterServer(ctx, externalCluster) @@ -257,31 +256,50 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta return err } + // If we get an error after this point, where the vreplication streams/records have been + // created, then we clean up the workflow's artifacts. + defer func() { + if err != nil { + ts, cerr := wr.buildTrafficSwitcher(ctx, ms.TargetKeyspace, ms.Workflow) + if cerr != nil { + err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) + } + if cerr := wr.dropArtifacts(ctx, false, &switcher{ts: ts, wr: wr}); cerr != nil { + err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) + } + } + }() + // Now that the streams have been successfully created, let's put the associated // routing rules in place. if externalTopo == nil { // Save routing rules before vschema. If we save vschema first, and routing rules // fails to save, we may generate duplicate table errors. - rules, err := topotools.GetRoutingRules(ctx, wr.ts) - if err != nil { - return err - } - for _, table := range tables { - toSource := []string{sourceKeyspace + "." + table} - rules[table] = toSource - rules[table+"@replica"] = toSource - rules[table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[targetKeyspace+"."+table+"@replica"] = toSource - rules[targetKeyspace+"."+table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[sourceKeyspace+"."+table+"@replica"] = toSource - rules[sourceKeyspace+"."+table+"@rdonly"] = toSource - } - if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { - return err + if mz.isPartial { + if err := wr.createDefaultShardRoutingRules(ctx, ms); err != nil { + return err + } + } else { + rules, err := topotools.GetRoutingRules(ctx, wr.ts) + if err != nil { + return err + } + for _, table := range tables { + toSource := []string{sourceKeyspace + "." + table} + rules[table] = toSource + rules[table+"@replica"] = toSource + rules[table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[targetKeyspace+"."+table+"@replica"] = toSource + rules[targetKeyspace+"."+table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[sourceKeyspace+"."+table+"@replica"] = toSource + rules[sourceKeyspace+"."+table+"@rdonly"] = toSource + } + if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { + return err + } } - if vschema != nil { // We added to the vschema. if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { @@ -964,11 +982,6 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat if err != nil { return nil, err } - if mz.isPartial { - if err := wr.createDefaultShardRoutingRules(ctx, ms); err != nil { - return nil, err - } - } if err := mz.deploySchema(ctx); err != nil { return nil, err } From 83f593870c52adb8c7cb8f3684f302e2d2cfe205 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Aug 2023 14:05:34 -0400 Subject: [PATCH 09/18] Add missing else in workflow server Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 39 +++++++++++++++++----------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index d8d741573ff..3213cf079d6 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1092,25 +1092,26 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil { return nil, err } - } - rules, err := topotools.GetRoutingRules(ctx, s.ts) - if err != nil { - return nil, err - } - for _, table := range tables { - toSource := []string{sourceKeyspace + "." + table} - rules[table] = toSource - rules[table+"@replica"] = toSource - rules[table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[targetKeyspace+"."+table+"@replica"] = toSource - rules[targetKeyspace+"."+table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[sourceKeyspace+"."+table+"@replica"] = toSource - rules[sourceKeyspace+"."+table+"@rdonly"] = toSource - } - if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil { - return nil, err + } else { + rules, err := topotools.GetRoutingRules(ctx, s.ts) + if err != nil { + return nil, err + } + for _, table := range tables { + toSource := []string{sourceKeyspace + "." + table} + rules[table] = toSource + rules[table+"@replica"] = toSource + rules[table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[targetKeyspace+"."+table+"@replica"] = toSource + rules[targetKeyspace+"."+table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[sourceKeyspace+"."+table+"@replica"] = toSource + rules[sourceKeyspace+"."+table+"@rdonly"] = toSource + } + if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil { + return nil, err + } } if vschema != nil { From 3c381c45eafdf5c98591958d1aa969f7c121216f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Aug 2023 14:42:44 -0400 Subject: [PATCH 10/18] Restore original logic Meaning that the routing rules are put in place for partial movetables as well. Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 40 ++++++++++++++++----------------- go/vt/wrangler/materializer.go | 41 +++++++++++++++++----------------- 2 files changed, 41 insertions(+), 40 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 3213cf079d6..24a3c7675f4 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1092,26 +1092,26 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil { return nil, err } - } else { - rules, err := topotools.GetRoutingRules(ctx, s.ts) - if err != nil { - return nil, err - } - for _, table := range tables { - toSource := []string{sourceKeyspace + "." + table} - rules[table] = toSource - rules[table+"@replica"] = toSource - rules[table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[targetKeyspace+"."+table+"@replica"] = toSource - rules[targetKeyspace+"."+table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[sourceKeyspace+"."+table+"@replica"] = toSource - rules[sourceKeyspace+"."+table+"@rdonly"] = toSource - } - if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil { - return nil, err - } + } + + rules, err := topotools.GetRoutingRules(ctx, s.ts) + if err != nil { + return nil, err + } + for _, table := range tables { + toSource := []string{sourceKeyspace + "." + table} + rules[table] = toSource + rules[table+"@replica"] = toSource + rules[table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[targetKeyspace+"."+table+"@replica"] = toSource + rules[targetKeyspace+"."+table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[sourceKeyspace+"."+table+"@replica"] = toSource + rules[sourceKeyspace+"."+table+"@rdonly"] = toSource + } + if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil { + return nil, err } if vschema != nil { diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 0039b029b53..ac6e6aacb71 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -279,27 +279,28 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta if err := wr.createDefaultShardRoutingRules(ctx, ms); err != nil { return err } - } else { - rules, err := topotools.GetRoutingRules(ctx, wr.ts) - if err != nil { - return err - } - for _, table := range tables { - toSource := []string{sourceKeyspace + "." + table} - rules[table] = toSource - rules[table+"@replica"] = toSource - rules[table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[targetKeyspace+"."+table+"@replica"] = toSource - rules[targetKeyspace+"."+table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[sourceKeyspace+"."+table+"@replica"] = toSource - rules[sourceKeyspace+"."+table+"@rdonly"] = toSource - } - if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { - return err - } } + + rules, err := topotools.GetRoutingRules(ctx, wr.ts) + if err != nil { + return err + } + for _, table := range tables { + toSource := []string{sourceKeyspace + "." + table} + rules[table] = toSource + rules[table+"@replica"] = toSource + rules[table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[targetKeyspace+"."+table+"@replica"] = toSource + rules[targetKeyspace+"."+table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[sourceKeyspace+"."+table+"@replica"] = toSource + rules[sourceKeyspace+"."+table+"@rdonly"] = toSource + } + if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { + return err + } + if vschema != nil { // We added to the vschema. if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { From ed1beae5b3eb9279014643a533c5398ccba4060e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Aug 2023 15:25:29 -0400 Subject: [PATCH 11/18] Add named return values for workflow case Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 24a3c7675f4..dc30011d584 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -947,7 +947,7 @@ func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletI // MoveTablesCreate is part of the vtctlservicepb.VtctldServer interface. // It passes the embedded TabletRequest object to the given keyspace's // target primary tablets that will be executing the workflow. -func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error) { +func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) (res *vtctldatapb.WorkflowStatusResponse, err error) { span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesCreate") defer span.Finish() @@ -964,7 +964,6 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl tables = req.IncludeTables externalTopo *topo.Server sourceTopo *topo.Server = s.ts - err error ) // When the source is an external cluster mounted using the Mount command. From d98416fb5e5597b97c081d3b3b37c5ee819fab63 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 15 Aug 2023 02:13:57 -0400 Subject: [PATCH 12/18] Improve vtctldclient unit test Signed-off-by: Matt Lord --- .../tabletmanager/rpc_vreplication_test.go | 159 +++++++++++++----- 1 file changed, 121 insertions(+), 38 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 56ad788daad..eb071c5202c 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "runtime/debug" + "strings" "testing" "github.com/stretchr/testify/require" @@ -29,7 +30,7 @@ import ( "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" - "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/workflow" @@ -66,6 +67,7 @@ const ( stopForCutover = "update _vt.vreplication set state='Stopped', message='stopped for cutover' where id=1" getMaxValForSequence = "select max(`id`) as maxval from `vt_%s`.`%s`" initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)" + deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'" ) var ( @@ -571,60 +573,139 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { } } -// TestNoOrphanedRoutingRulesOnFailedCreate tests that no orphaned routing rules -// are left in place when the workflow creation fails -- specifically at the point -// where we try and create the workflow streams. -func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) { +// TestFailedMoveTablesCreateCleanup tests that the workflow +// and its artifacts are cleaned up when the workflow creation +// fails -- specifically after the point where we have created +// the workflow streams. +func TestFailedMoveTablesCreateCleanup(t *testing.T) { ctx := context.Background() sourceKs := "sourceks" sourceTabletUID := 200 - sourceShard := "0" + shard := "0" + targetTabletUID := 300 targetKs := "targetks" - targetShards := make(map[string]*fakeTabletConn) wf := "testwf" table := defaultSchema.TableDefinitions[0].Name - tenv := newTestEnv(t, sourceKs, []string{sourceShard}) + invalidTimeZone := "NOPE" + bls := fmt.Sprintf("keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}}", + sourceKs, shard, table, table) + tenv := newTestEnv(t, sourceKs, []string{shard}) defer tenv.close() ws := workflow.NewServer(tenv.ts, tenv.tmc) sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, shard) defer tenv.deleteTablet(sourceTablet.tablet) - targetShards["-80"] = tenv.addTablet(t, 300, targetKs, "-80") - defer tenv.deleteTablet(targetShards["-80"].tablet) - targetShards["80-"] = tenv.addTablet(t, 310, targetKs, "80-") - defer tenv.deleteTablet(targetShards["80-"].tablet) + targetTablet := tenv.addTablet(t, targetTabletUID, targetKs, shard) tenv.mysqld.Schema = defaultSchema tenv.mysqld.Schema.DatabaseSchema = tenv.dbName + tenv.mysqld.FetchSuperQueryMap = make(map[string]*sqltypes.Result) + tenv.mysqld.FetchSuperQueryMap[`select character_set_name, collation_name, column_name, data_type, column_type, extra from information_schema.columns where .*`] = sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "character_set_name|collation_name|column_name|data_type|column_type|extra", + "varchar|varchar|varchar|varchar|varchar|varchar", + ), + "NULL|NULL|id|bigint|bigint|", + "NULL|NULL|c2|bigint|bigint|", + ) - // The target keyspace is sharded. Let's remove any vschema table - // definitions so that we know the workflow creation will fail. - // Let's also be sure that the routing rules are empty. + // Let's be sure that the routing rules are empty to start. err := topotools.SaveRoutingRules(ctx, tenv.ts, nil) require.NoError(t, err, "failed to save routing rules") - err = tenv.ts.SaveVSchema(ctx, targetKs, &vschemapb.Keyspace{ - Sharded: true, - }) - require.NoError(t, err, "failed to save vschema") - err = tenv.ts.RebuildSrvVSchema(ctx, nil) - require.NoError(t, err, "failed to rebuild serving vschema") - err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tenv.ts, sourceKs, tenv.cells, false) - require.NoError(t, err, "failed to rebuild keyspace") - - for _, tablet := range targetShards { - tenv.tmc.setVReplicationExecResults(tablet.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{}) - tenv.tmc.setVReplicationExecResults(tablet.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{}) - tenv.tmc.setVReplicationExecResults(tablet.tablet, fmt.Sprintf(getWorkflow, targetKs, wf), - sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "id", - "int64", - ), - "1", + + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(getWorkflow, targetKs, wf), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", ), - ) - tablet.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) - } + "1", + ), + ) + targetTablet.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest( + fmt.Sprintf("%s %s", + insertVReplicationPrefix, + fmt.Sprintf(`values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}} source_time_zone:\"%s\" target_time_zone:\"UTC\"', '', 0, 0, '%s', 'primary', now(), 0, 'Stopped', '%s', 1, 0, 0)`, + wf, sourceKs, shard, table, table, invalidTimeZone, strings.Join(tenv.cells, ","), tenv.dbName), + ), + &sqltypes.Result{ + RowsAffected: 1, + InsertID: 1, + }, + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(getVReplicationRecord, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source", + "int64|varchar", + ), + fmt.Sprintf("1|%s", bls), + ), nil) + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(`update _vt.vreplication set message='Picked source tablet: cell:\"zone1\" uid:%d' where id=1`, sourceTabletUID), + &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", + "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + ), + fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), + ), nil) + targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "count(distinct table_name)", + "int64", + ), + "1", + ), nil) + targetTablet.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", + "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + ), + fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), + ), nil) + targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "count(distinct table_name)", + "int64", + ), + "1", + ), nil) + targetTablet.vrdbClient.ExpectRequest(getBinlogRowImage, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "@@binlog_row_image", + "varchar", + ), + "FULL", + ), nil) + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(insertStreamsCreatedLog, bls), &sqltypes.Result{}, nil) + + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, + fmt.Sprintf("select convert_tz('2006-01-02 15:04:05', '%s', 'UTC')", invalidTimeZone), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + fmt.Sprintf("convert_tz('2006-01-02 15:04:05', '%s', 'UTC')", invalidTimeZone), + "datetime", + ), + "NULL", + ), + ) + + // We expect the workflow creation to fail due to the invalid time zone + // and thus the workflow iteslf to be cleaned up. + tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, + fmt.Sprintf(deleteWorkflow, sourceKs, workflow.ReverseWorkflowName(wf)), + &sqltypes.Result{RowsAffected: 1}) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, + fmt.Sprintf(deleteWorkflow, targetKs, wf), + &sqltypes.Result{RowsAffected: 1}) _, err = ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ Workflow: wf, @@ -633,8 +714,10 @@ func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) { Cells: tenv.cells, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, IncludeTables: []string{table}, + SourceTimeZone: invalidTimeZone, }) - require.ErrorContains(t, err, fmt.Sprintf("table %s not found in vschema for keyspace %s", table, targetKs)) + log.Errorf("Error: %v", err) + require.ErrorContains(t, err, fmt.Sprintf("unable to perform time_zone conversions from %s to UTC", invalidTimeZone)) // Check that there are no orphaned routing rules. rules, err := topotools.GetRoutingRules(ctx, tenv.ts) From 5741fcf941438499e76258e480dfc44313c00dcd Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 15 Aug 2023 02:31:56 -0400 Subject: [PATCH 13/18] Minor tweaks Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 8 ++++---- go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 2 -- go/vt/wrangler/materializer.go | 8 ++++---- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index dc30011d584..77cbfd589c5 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1068,8 +1068,8 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } - // If we get an error after this point, where the vreplication streams/records have been - // created, then we clean up the workflow's artifacts. + // If we get an error after this point, where the vreplication streams/records + // have been created, then we clean up the workflow's artifacts. defer func() { if err != nil { ts, cerr := s.buildTrafficSwitcher(ctx, ms.TargetKeyspace, ms.Workflow) @@ -1085,8 +1085,8 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl // Now that the streams have been successfully created, let's put the associated // routing rules in place. if externalTopo == nil { - // Save routing rules before vschema. If we save vschema first, and routing rules - // fails to save, we may generate duplicate table errors. + // Save routing rules before vschema. If we save vschema first, and routing + // rules fails to save, we may generate duplicate table errors. if mz.isPartial { if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index eb071c5202c..950245ad339 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -30,7 +30,6 @@ import ( "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/workflow" @@ -716,7 +715,6 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { IncludeTables: []string{table}, SourceTimeZone: invalidTimeZone, }) - log.Errorf("Error: %v", err) require.ErrorContains(t, err, fmt.Sprintf("unable to perform time_zone conversions from %s to UTC", invalidTimeZone)) // Check that there are no orphaned routing rules. diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index ac6e6aacb71..028fc29925e 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -256,8 +256,8 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta return err } - // If we get an error after this point, where the vreplication streams/records have been - // created, then we clean up the workflow's artifacts. + // If we get an error after this point, where the vreplication streams/records + // have been created, then we clean up the workflow's artifacts. defer func() { if err != nil { ts, cerr := wr.buildTrafficSwitcher(ctx, ms.TargetKeyspace, ms.Workflow) @@ -273,8 +273,8 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta // Now that the streams have been successfully created, let's put the associated // routing rules in place. if externalTopo == nil { - // Save routing rules before vschema. If we save vschema first, and routing rules - // fails to save, we may generate duplicate table errors. + // Save routing rules before vschema. If we save vschema first, and routing + // rules fails to save, we may generate duplicate table errors. if mz.isPartial { if err := wr.createDefaultShardRoutingRules(ctx, ms); err != nil { return err From 982bc2f7214255fc3442ad0fc4cf0a014bf9b840 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 15 Aug 2023 02:39:21 -0400 Subject: [PATCH 14/18] Minor formatting improvements Signed-off-by: Matt Lord --- .../tabletmanager/rpc_vreplication_test.go | 88 ++++++++++++------- 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 950245ad339..3748279f3fd 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -644,46 +644,64 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { "int64|varchar", ), fmt.Sprintf("1|%s", bls), - ), nil) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(`update _vt.vreplication set message='Picked source tablet: cell:\"zone1\" uid:%d' where id=1`, sourceTabletUID), + ), + nil, + ) + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(`update _vt.vreplication set message='Picked source tablet: cell:\"zone1\" uid:%d' where id=1`, + sourceTabletUID), &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) - targetTablet.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", - "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + targetTablet.vrdbClient.ExpectRequest(getWorkflowState, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", + "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + ), + fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), ), - fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), - ), nil) - targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable, sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "count(distinct table_name)", - "int64", + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "count(distinct table_name)", + "int64", + ), + "1", ), - "1", - ), nil) - targetTablet.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", - "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getWorkflowState, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", + "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + ), + fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), ), - fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), - ), nil) - targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable, sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "count(distinct table_name)", - "int64", + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "count(distinct table_name)", + "int64", + ), + "1", ), - "1", - ), nil) - targetTablet.vrdbClient.ExpectRequest(getBinlogRowImage, sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "@@binlog_row_image", - "varchar", + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getBinlogRowImage, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "@@binlog_row_image", + "varchar", + ), + "FULL", ), - "FULL", - ), nil) + nil, + ) targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(insertStreamsCreatedLog, bls), &sqltypes.Result{}, nil) tenv.tmc.setVReplicationExecResults(targetTablet.tablet, @@ -701,10 +719,12 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { // and thus the workflow iteslf to be cleaned up. tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, fmt.Sprintf(deleteWorkflow, sourceKs, workflow.ReverseWorkflowName(wf)), - &sqltypes.Result{RowsAffected: 1}) + &sqltypes.Result{RowsAffected: 1}, + ) tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(deleteWorkflow, targetKs, wf), - &sqltypes.Result{RowsAffected: 1}) + &sqltypes.Result{RowsAffected: 1}, + ) _, err = ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ Workflow: wf, From f0713ee9e3337ced0fec5f5764c5ceb744090af4 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 15 Aug 2023 02:52:06 -0400 Subject: [PATCH 15/18] Must stop nitting... Signed-off-by: Matt Lord --- .../vttablet/tabletmanager/rpc_vreplication_test.go | 10 +++++----- go/vt/wrangler/traffic_switcher_test.go | 13 +++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 3748279f3fd..e05d1f11be7 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -67,6 +67,7 @@ const ( getMaxValForSequence = "select max(`id`) as maxval from `vt_%s`.`%s`" initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)" deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'" + updatePickedSourceTablet = `update _vt.vreplication set message='Picked source tablet: cell:\"%s\" uid:%d' where id=1` ) var ( @@ -316,7 +317,7 @@ func TestMoveTables(t *testing.T) { ), fmt.Sprintf("1|%s", bls), ), nil) - ftc.vrdbClient.ExpectRequest(`update _vt.vreplication set message='Picked source tablet: cell:\"zone1\" uid:200' where id=1`, &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult( @@ -647,8 +648,7 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { ), nil, ) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(`update _vt.vreplication set message='Picked source tablet: cell:\"zone1\" uid:%d' where id=1`, - sourceTabletUID), + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) @@ -715,8 +715,8 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { ), ) - // We expect the workflow creation to fail due to the invalid time zone - // and thus the workflow iteslf to be cleaned up. + // We expect the workflow creation to fail due to the invalid time + // zone and thus the workflow iteslf to be cleaned up. tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, fmt.Sprintf(deleteWorkflow, sourceKs, workflow.ReverseWorkflowName(wf)), &sqltypes.Result{RowsAffected: 1}, diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index f7339bef42a..75037cc975e 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -2146,17 +2146,18 @@ func TestIsPartialMoveTables(t *testing.T) { } } -// TestNoOrphanedRoutingRulesOnFailedCreate tests that no orphaned routing rules -// are left in place when the workflow creation fails -- specifically at the point -// where we try and create the workflow streams. +// TestNoOrphanedRoutingRulesOnFailedCreate tests that no orphaned +// routing rules are left in place when the workflow creation +// fails -- specifically at the point where we try and create the +// workflow streams. func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) { ctx := context.Background() tme := newTestTableMigraterCustom(ctx, t, []string{"0"}, []string{"-80", "80-"}, "select * %s") defer tme.close(t) - // The target keyspace is sharded. Let's remove any vschema table - // definitions so that we know the workflow creation will fail. - // Let's also be sure that the routing rules are empty. + // The target keyspace is sharded. Let's remove any vschema + // table definitions so that we know the workflow creation will + // fail. Let's also be sure that the routing rules are empty. err := topotools.SaveRoutingRules(ctx, tme.wr.ts, nil) require.NoError(t, err, "failed to save routing rules") err = tme.ts.SaveVSchema(ctx, "ks2", &vschemapb.Keyspace{ From 24d8332418bf95f7dc2b3f08e118cfe7922a85c3 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 20 Aug 2023 11:27:03 -0400 Subject: [PATCH 16/18] Don't fail e2e workflow if schema tracker is slow Signed-off-by: Matt Lord --- go/test/endtoend/utils/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index ff2d97d0cc8..c0137b27066 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -227,7 +227,7 @@ func AssertMatchesWithTimeout(t *testing.T, conn *mysql.Conn, query, expected st // WaitForAuthoritative waits for a table to become authoritative func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*interface{}, error)) error { - timeout := time.After(10 * time.Second) + timeout := time.After(60 * time.Second) for { select { case <-timeout: From 80f8a0314687169298f98466e040e92ab4e461ce Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 20 Aug 2023 16:41:32 -0400 Subject: [PATCH 17/18] Turns out that WorkflowDelete isn't the right fit That assumes that all artifacts related to a successfully created workflow exist. In our case they do not (e.g. the target tables). So we add code to rollback any target vschema changes if we made any. Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 11 +++++++++++ go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 9 +++++++++ go/vt/wrangler/materializer.go | 10 ++++++++++ 3 files changed, 30 insertions(+) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 77cbfd589c5..3ce5abffed8 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -28,6 +28,7 @@ import ( "golang.org/x/sync/semaphore" "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/mysql/sqlerror" @@ -977,6 +978,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } var vschema *vschemapb.Keyspace + var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create vschema, err = s.ts.GetVSchema(ctx, targetKeyspace) if err != nil { return nil, err @@ -1019,6 +1021,9 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl log.Infof("Found tables to move: %s", strings.Join(tables, ",")) if !vschema.Sharded { + // Save the original in case we need to restore it for a late failure + // in the defer(). + origVSchema = proto.Clone(vschema).(*vschemapb.Keyspace) if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil { return nil, err } @@ -1079,6 +1084,12 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil { err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) } + if origVSchema == nil { // There's no previous version to restore + return + } + if cerr := s.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil { + err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr) + } } }() diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index e05d1f11be7..b7a76add581 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -726,6 +726,10 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { &sqltypes.Result{RowsAffected: 1}, ) + // Save the current target vschema. + vs, err := tenv.ts.GetVSchema(ctx, targetKs) + require.NoError(t, err, "failed to get target vschema") + _, err = ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ Workflow: wf, SourceKeyspace: sourceKs, @@ -741,4 +745,9 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { rules, err := topotools.GetRoutingRules(ctx, tenv.ts) require.NoError(t, err, "failed to get routing rules") require.Equal(t, 0, len(rules), "expected no routing rules to be present") + + // Check that our vschema changes were also rolled back. + vs2, err := tenv.ts.GetVSchema(ctx, targetKs) + require.NoError(t, err, "failed to get target vschema") + require.Equal(t, vs, vs2, "expected vschema to be unchanged") } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 028fc29925e..28a48f344ac 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -144,6 +144,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta } var vschema *vschemapb.Keyspace + var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create vschema, err = wr.ts.GetVSchema(ctx, targetKeyspace) if err != nil { return err @@ -206,6 +207,9 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta log.Infof("Found tables to move: %s", strings.Join(tables, ",")) if !vschema.Sharded { + // Save the original in case we need to restore it for a late failure + // in the defer(). + origVSchema = proto.Clone(vschema).(*vschemapb.Keyspace) if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil { return err } @@ -267,6 +271,12 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta if cerr := wr.dropArtifacts(ctx, false, &switcher{ts: ts, wr: wr}); cerr != nil { err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) } + if origVSchema == nil { // There's no previous version to restore + return + } + if cerr := wr.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil { + err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr) + } } }() From dc2742f17ba7fdd469b17eeb39456ed4a8ad2c9a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 20 Aug 2023 16:49:22 -0400 Subject: [PATCH 18/18] Address review comment Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index b7a76add581..942ef201dbe 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -596,6 +596,7 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, shard) defer tenv.deleteTablet(sourceTablet.tablet) targetTablet := tenv.addTablet(t, targetTabletUID, targetKs, shard) + defer tenv.deleteTablet(targetTablet.tablet) tenv.mysqld.Schema = defaultSchema tenv.mysqld.Schema.DatabaseSchema = tenv.dbName