Skip to content

Commit

Permalink
Workflow Status: change logic to determine whether MoveTables write…
Browse files Browse the repository at this point in the history
…s are switched (#16731)

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps authored Sep 20, 2024
1 parent e68bdd5 commit 490bb0c
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 43 deletions.
43 changes: 28 additions & 15 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ const (
tabletUIDStep = 10
)

var defaultTabletTypes = []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}

type testKeyspace struct {
KeyspaceName string
ShardNames []string
Expand Down Expand Up @@ -199,30 +205,38 @@ func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspac
return tablet
}

// addTableRoutingRules adds routing rules from the test env's source keyspace to
// its target keyspace for the given tablet types and tables.
func (env *testEnv) addTableRoutingRules(t *testing.T, ctx context.Context, tabletTypes []topodatapb.TabletType, tables []string) {
ks := env.targetKeyspace.KeyspaceName
rules := make(map[string][]string, len(tables)*(len(tabletTypes)*3))
func (env *testEnv) saveRoutingRules(t *testing.T, rules map[string][]string) {
err := topotools.SaveRoutingRules(context.Background(), env.ts, rules)
require.NoError(t, err)
err = env.ts.RebuildSrvVSchema(context.Background(), nil)
require.NoError(t, err)
}

func (env *testEnv) updateTableRoutingRules(t *testing.T, ctx context.Context,
tabletTypes []topodatapb.TabletType, tables []string, sourceKeyspace, targetKeyspace, toKeyspace string) {

if len(tabletTypes) == 0 {
tabletTypes = defaultTabletTypes
}
rr, err := env.ts.GetRoutingRules(ctx)
require.NoError(t, err)
rules := topotools.GetRoutingRulesMap(rr)
for _, tabletType := range tabletTypes {
for _, tableName := range tables {
toTarget := []string{ks + "." + tableName}
toTarget := []string{toKeyspace + "." + tableName}
tt := strings.ToLower(tabletType.String())
if tabletType == topodatapb.TabletType_PRIMARY {
rules[tableName] = toTarget
rules[ks+"."+tableName] = toTarget
rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget
rules[targetKeyspace+"."+tableName] = toTarget
rules[sourceKeyspace+"."+tableName] = toTarget
} else {
rules[tableName+"@"+tt] = toTarget
rules[ks+"."+tableName+"@"+tt] = toTarget
rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget
rules[targetKeyspace+"."+tableName+"@"+tt] = toTarget
rules[sourceKeyspace+"."+tableName+"@"+tt] = toTarget
}
}
}
err := topotools.SaveRoutingRules(ctx, env.ts, rules)
require.NoError(t, err)
err = env.ts.RebuildSrvVSchema(ctx, nil)
require.NoError(t, err)
env.saveRoutingRules(t, rules)
}

func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) {
Expand Down Expand Up @@ -305,7 +319,6 @@ func (tmc *testTMClient) GetWorkflowKey(keyspace, shard string) string {
func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if expect := tmc.readVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request: got %+v, want %+v", req, expect)
Expand Down
24 changes: 15 additions & 9 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ func (s *Server) GetCellsWithShardReadsSwitched(
// keyspace.
func (s *Server) GetCellsWithTableReadsSwitched(
ctx context.Context,
keyspace string,
sourceKeyspace string,
targetKeyspace string,
table string,
tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error) {
Expand Down Expand Up @@ -330,7 +331,7 @@ func (s *Server) GetCellsWithTableReadsSwitched(
)

for _, rule := range srvVSchema.RoutingRules.Rules {
ruleName := fmt.Sprintf("%s.%s@%s", keyspace, table, strings.ToLower(tabletType.String()))
ruleName := fmt.Sprintf("%s.%s@%s", sourceKeyspace, table, strings.ToLower(tabletType.String()))
if rule.FromTable == ruleName {
found = true

Expand All @@ -341,7 +342,7 @@ func (s *Server) GetCellsWithTableReadsSwitched(
return nil, nil, err
}

if ks == keyspace {
if ks != sourceKeyspace {
switched = true
break // if one table in the workflow switched, we are done.
}
Expand Down Expand Up @@ -945,6 +946,10 @@ ORDER BY
}, nil
}

func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) {
return s.getWorkflowState(ctx, targetKeyspace, workflowName)
}

func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) {
ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName)
if err != nil {
Expand Down Expand Up @@ -1014,12 +1019,12 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
}
}
} else {
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY)
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}

state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA)
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA)
if err != nil {
return nil, nil, err
}
Expand All @@ -1028,10 +1033,11 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
return nil, nil, err
}
for _, table := range ts.Tables() {
rr := globalRules[table]
// If a rule exists for the table and points to the target keyspace, then
// writes have been switched.
if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) {
// If a rule for the primary tablet type exists for any table and points to the target keyspace,
// then writes have been switched.
ruleKey := fmt.Sprintf("%s.%s", sourceKeyspace, table)
rr := globalRules[ruleKey]
if len(rr) > 0 && rr[0] != ruleKey {
state.WritesSwitched = true
break
}
Expand Down
38 changes: 24 additions & 14 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,6 @@ func TestMoveTablesComplete(t *testing.T) {
tableTemplate := "CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"
tabletTypes := []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
lockName := fmt.Sprintf("%s/%s", targetKeyspaceName, workflowName)
schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
table1Name: {
Expand Down Expand Up @@ -641,7 +636,8 @@ func TestMoveTablesComplete(t *testing.T) {
tc.preFunc(t, env)
}
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, []string{table1Name, table2Name, table3Name})
env.updateTableRoutingRules(t, ctx, nil, []string{table1Name, table2Name, table3Name},
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
got, err := env.ws.MoveTablesComplete(ctx, tc.req)
if tc.wantErr != "" {
require.EqualError(t, err, tc.wantErr)
Expand Down Expand Up @@ -1154,7 +1150,8 @@ func TestMoveTablesTrafficSwitching(t *testing.T) {
} else {
env.tmc.reverse.Store(true)
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, []string{tableName})
env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName},
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR)
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR)
Expand Down Expand Up @@ -1372,7 +1369,8 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) {
} else {
env.tmc.reverse.Store(true)
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, tables)
env.updateTableRoutingRules(t, ctx, tabletTypes, tables,
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR)
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR)
Expand Down Expand Up @@ -1411,6 +1409,15 @@ func TestMirrorTraffic(t *testing.T) {
topodatapb.TabletType_RDONLY,
}

initialRoutingRules := map[string][]string{
fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", sourceKs, table1)},
fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", sourceKs, table2)},
fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", sourceKs, table1)},
fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", sourceKs, table2)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table1)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table2)},
}

tests := []struct {
name string

Expand Down Expand Up @@ -1498,8 +1505,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
fmt.Sprintf("%s.%s@rdonly", targetKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)},
fmt.Sprintf("%s.%s@rdonly", targetKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)},
},
wantErr: "cannot mirror [rdonly] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand All @@ -1513,8 +1520,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
fmt.Sprintf("%s.%s@replica", targetKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)},
fmt.Sprintf("%s.%s@replica", targetKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)},
fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)},
fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)},
},
wantErr: "cannot mirror [replica] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand All @@ -1528,8 +1535,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
table1: {fmt.Sprintf("%s.%s", targetKs, table1)},
table2: {fmt.Sprintf("%s.%s", targetKs, table2)},
fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", targetKs, table1)},
fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", targetKs, table2)},
},
wantErr: "cannot mirror [primary] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand Down Expand Up @@ -1610,6 +1617,7 @@ func TestMirrorTraffic(t *testing.T) {
TabletTypes: tabletTypes,
Percent: 50.0,
},
routingRules: initialRoutingRules,
wantMirrorRules: map[string]map[string]float32{
fmt.Sprintf("%s.%s", sourceKs, table1): {
fmt.Sprintf("%s.%s", targetKs, table1): 50.0,
Expand Down Expand Up @@ -1644,6 +1652,7 @@ func TestMirrorTraffic(t *testing.T) {
TabletTypes: tabletTypes,
Percent: 50.0,
},
routingRules: initialRoutingRules,
wantMirrorRules: map[string]map[string]float32{
fmt.Sprintf("%s.%s", sourceKs, table1): {
fmt.Sprintf("%s.%s", targetKs, table1): 50.0,
Expand Down Expand Up @@ -1681,6 +1690,7 @@ func TestMirrorTraffic(t *testing.T) {
fmt.Sprintf("%s.%s", targetKs, table1): 25.0,
},
},
routingRules: initialRoutingRules,
req: &vtctldatapb.WorkflowMirrorTrafficRequest{
Keyspace: targetKs,
Workflow: workflow,
Expand Down
Loading

0 comments on commit 490bb0c

Please sign in to comment.