diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index 8b407dbcb0c..fb0c4e9d734 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -92,18 +92,18 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M logger := logutil.NewConsoleLogger() require.NoError(t, topoServ.RebuildSrvVSchema(ctx, []string{"cell"})) - tabletID := 100 + tabletID := startingSourceTabletUID sourceShardsMap := make(map[string]any) for _, shard := range sourceShards { sourceShardsMap[shard] = nil require.NoError(t, topoServ.CreateShard(ctx, ms.SourceKeyspace, shard)) _ = env.addTablet(t, tabletID, env.ms.SourceKeyspace, shard, topodatapb.TabletType_PRIMARY) - tabletID += 10 + tabletID += tabletUIDStep } require.NoError(t, topotools.RebuildKeyspace(ctx, logger, topoServ, ms.SourceKeyspace, []string{"cell"}, false)) - tabletID = 200 + tabletID = startingTargetTabletUID for _, shard := range targetShards { if ms.SourceKeyspace == ms.TargetKeyspace { if _, ok := sourceShardsMap[shard]; ok { @@ -112,7 +112,7 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M } require.NoError(t, topoServ.CreateShard(ctx, ms.TargetKeyspace, shard)) _ = env.addTablet(t, tabletID, env.ms.TargetKeyspace, shard, topodatapb.TabletType_PRIMARY) - tabletID += 10 + tabletID += tabletUIDStep } for _, ts := range ms.TableSettings { @@ -230,6 +230,9 @@ type testMaterializerTMClient struct { // Used to override the response to ReadVReplicationWorkflow. readVReplicationWorkflow readVReplicationWorkflowFunc + + // Responses to GetSchema RPCs for individual tablets. + getSchemaResponses map[uint32]*tabletmanagerdatapb.SchemaDefinition } func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSettings []*vtctldatapb.TableMaterializeSettings) *testMaterializerTMClient { @@ -240,6 +243,7 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe tableSettings: tableSettings, vrQueries: make(map[int][]*queryResult), createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), + getSchemaResponses: make(map[uint32]*tabletmanagerdatapb.SchemaDefinition), } } @@ -314,10 +318,24 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont }, nil } +func (tmc *testMaterializerTMClient) SetGetSchemaResponse(tabletUID int, res *tabletmanagerdatapb.SchemaDefinition) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + if tmc.getSchemaResponses == nil { + tmc.getSchemaResponses = make(map[uint32]*tabletmanagerdatapb.SchemaDefinition) + } + tmc.getSchemaResponses[uint32(tabletUID)] = res +} + func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) { tmc.mu.Lock() defer tmc.mu.Unlock() + if tmc.getSchemaResponses != nil && tmc.getSchemaResponses[tablet.Alias.Uid] != nil { + return tmc.getSchemaResponses[tablet.Alias.Uid], nil + } + schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} for _, table := range request.Tables { if table == "/.*/" { diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index bc33a81633e..c84388128a6 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -601,6 +601,289 @@ func TestMoveTablesDDLFlag(t *testing.T) { } } +func TestShardedAutoIncHandling(t *testing.T) { + t1DDL := "create table t1 (id int not null auto_increment primary key, c1 varchar(10))" + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + CreateDdl: t1DDL, + SourceExpression: "select * from t1", + }}, + WorkflowOptions: &vtctldatapb.WorkflowOptions{}, + } + + type testcase struct { + name string + value vtctldatapb.ShardedAutoIncrementHandling + globalKeyspace string + targetShards []string + targetVSchema *vschemapb.Keyspace + wantTargetVSchema *vschemapb.Keyspace + expectQueries []string + expectErr string + } + testcases := []testcase{ + { + name: "global-keyspace does not exist", + globalKeyspace: "foo", + expectErr: "global-keyspace foo does not exist", + }, + { + name: "leave", + globalKeyspace: "sourceks", + targetShards: []string{"-80", "80-"}, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + value: vtctldatapb.ShardedAutoIncrementHandling_LEAVE, + wantTargetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + expectQueries: []string{ + t1DDL, // Unchanged + }, + }, + { + name: "remove", + globalKeyspace: "sourceks", + targetShards: []string{"-80", "80-"}, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + value: vtctldatapb.ShardedAutoIncrementHandling_REMOVE, + wantTargetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + expectQueries: []string{ // auto_increment clause removed + `create table t1 ( + id int not null primary key, + c1 varchar(10) +)`, + }, + }, + { + name: "replace, but vschema AutoIncrement already in place", + globalKeyspace: "sourceks", + targetShards: []string{"-80", "80-"}, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition exists + Column: "id", + Sequence: "t1_non_default_seq_name", + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + value: vtctldatapb.ShardedAutoIncrementHandling_REPLACE, + wantTargetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition left alone + Column: "id", + Sequence: "t1_non_default_seq_name", + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + expectQueries: []string{ // auto_increment clause removed + `create table t1 ( + id int not null primary key, + c1 varchar(10) +)`, + }, + }, + { + name: "replace", + globalKeyspace: "sourceks", + targetShards: []string{"-80", "80-"}, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + value: vtctldatapb.ShardedAutoIncrementHandling_REPLACE, + wantTargetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added + Column: "id", + Sequence: "t1_seq", + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + expectQueries: []string{ // auto_increment clause removed + `create table t1 ( + id int not null primary key, + c1 varchar(10) +)`, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if tc.targetShards == nil { + tc.targetShards = []string{"0"} + } + env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, tc.targetShards) + defer env.close() + + env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) + for i := range tc.targetShards { + uid := startingTargetTabletUID + (i * tabletUIDStep) + for _, query := range tc.expectQueries { + env.tmc.expectVRQuery(uid, query, &sqltypes.Result{}) + } + env.tmc.expectVRQuery(uid, mzGetCopyState, &sqltypes.Result{}) + env.tmc.expectVRQuery(uid, mzGetLatestCopyState, &sqltypes.Result{}) + env.tmc.SetGetSchemaResponse(uid, &tabletmanagerdatapb.SchemaDefinition{}) // So that the schema is copied from the source + } + + if tc.targetVSchema != nil { + err := env.ws.ts.SaveVSchema(ctx, ms.TargetKeyspace, tc.targetVSchema) + require.NoError(t, err) + } + + _, err := env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ + Workflow: ms.Workflow, + SourceKeyspace: ms.SourceKeyspace, + TargetKeyspace: ms.TargetKeyspace, + IncludeTables: []string{"t1"}, + WorkflowOptions: &vtctldatapb.WorkflowOptions{ + StripShardedAutoIncrement: tc.value, + GlobalKeyspace: tc.globalKeyspace, + }, + }) + if tc.expectErr != "" { + require.EqualError(t, err, tc.expectErr) + } else { + require.NoError(t, err) + if tc.wantTargetVSchema != nil { + targetVSchema, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) + require.NoError(t, err) + require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema)) + } + } + }) + } +} + // TestMoveTablesNoRoutingRules confirms that MoveTables does not create routing rules if --no-routing-rules is specified. func TestMoveTablesNoRoutingRules(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 2b5489a797e..c247ed91e9a 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1440,8 +1440,8 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl s.Logger().Infof("Found tables to move: %s", strings.Join(tables, ",")) if !vschema.Sharded { - // Save the original in case we need to restore it for a late failure - // in the defer(). + // Save the original in case we need to restore it for a late failure in + // the defer(). origVSchema = vschema.CloneVT() if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil { return nil, err