-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
VtctldClient Reshard: add e2e tests to confirm CLI options and fix discovered issues. #15353
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,6 @@ import ( | |
"google.golang.org/protobuf/encoding/protojson" | ||
|
||
"vitess.io/vitess/go/test/endtoend/cluster" | ||
|
||
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | ||
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" | ||
topodatapb "vitess.io/vitess/go/vt/proto/topodata" | ||
|
@@ -54,20 +53,33 @@ func TestVtctldclientCLI(t *testing.T) { | |
require.NotNil(t, zone2) | ||
defer vc.TearDown() | ||
|
||
sourceKeyspace := "product" | ||
targetKeyspace := "customer" | ||
sourceKeyspaceName := "product" | ||
targetKeyspaceName := "customer" | ||
var mt iMoveTables | ||
workflowName := "wf1" | ||
targetTabs := setupMinimalCustomerKeyspace(t) | ||
|
||
t.Run("MoveTablesCreateFlags1", func(t *testing.T) { | ||
testMoveTablesFlags1(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs) | ||
testMoveTablesFlags1(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs) | ||
}) | ||
t.Run("MoveTablesCreateFlags2", func(t *testing.T) { | ||
testMoveTablesFlags2(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs) | ||
testMoveTablesFlags2(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs) | ||
}) | ||
t.Run("MoveTablesCompleteFlags3", func(t *testing.T) { | ||
testMoveTablesFlags3(t, sourceKeyspaceName, targetKeyspaceName, targetTabs) | ||
}) | ||
t.Run("MoveTablesCompleteFlags", func(t *testing.T) { | ||
testMoveTablesFlags3(t, sourceKeyspace, targetKeyspace, targetTabs) | ||
t.Run("Reshard", func(t *testing.T) { | ||
cell := vc.Cells["zone1"] | ||
targetKeyspace := cell.Keyspaces[targetKeyspaceName] | ||
sourceShard := "-80" | ||
newShards := "-40,40-80" | ||
require.NoError(t, vc.AddShards(t, []*Cell{cell}, targetKeyspace, newShards, 1, 0, 400, nil)) | ||
reshardWorkflowName := "reshard" | ||
tablets := map[string]*cluster.VttabletProcess{ | ||
"-40": targetKeyspace.Shards["-40"].Tablets["zone1-400"].Vttablet, | ||
"40-80": targetKeyspace.Shards["40-80"].Tablets["zone1-500"].Vttablet, | ||
} | ||
splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets) | ||
}) | ||
} | ||
|
||
|
@@ -81,34 +93,31 @@ func testMoveTablesFlags1(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK | |
} | ||
completeFlags := []string{"--keep-routing-rules", "--keep-data"} | ||
switchFlags := []string{} | ||
// Test one set of MoveTable flags. | ||
*mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, tables, createFlags, completeFlags, switchFlags) | ||
(*mt).Show() | ||
moveTablesOutput := (*mt).GetLastOutput() | ||
// Test one set of MoveTable flags. | ||
moveTablesResponse := getMoveTablesShowResponse(mt) | ||
workflowResponse := getWorkflow(targetKeyspace, workflowName) | ||
|
||
workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "customer", "show", "--workflow", "wf1") | ||
require.NoError(t, err) | ||
var moveTablesResponse vtctldatapb.GetWorkflowsResponse | ||
err = protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse) | ||
require.NoError(t, err) | ||
|
||
var workflowResponse vtctldatapb.GetWorkflowsResponse | ||
err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse) | ||
require.NoError(t, err) | ||
|
||
moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0 | ||
moveTablesResponse.Workflows[0].MaxVReplicationLag = 0 | ||
workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0 | ||
workflowResponse.Workflows[0].MaxVReplicationLag = 0 | ||
// also validates that MoveTables Show and Workflow Show return the same output. | ||
require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse.CloneVT()) | ||
require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse) | ||
|
||
// Validate that the flags are set correctly in the database. | ||
validateWorkflow1(t, workflowResponse.Workflows) | ||
validateMoveTablesWorkflow(t, workflowResponse.Workflows) | ||
// Since we used --no-routing-rules, there should be no routing rules. | ||
confirmNoRoutingRules(t) | ||
} | ||
|
||
func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsResponse { | ||
moveTablesOutput := (*mt).GetLastOutput() | ||
var moveTablesResponse vtctldatapb.GetWorkflowsResponse | ||
err := protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse) | ||
require.NoError(vc.t, err) | ||
moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0 | ||
moveTablesResponse.Workflows[0].MaxVReplicationLag = 0 | ||
return moveTablesResponse.CloneVT() | ||
} | ||
|
||
// Validates some of the flags created from the previous test. | ||
func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) { | ||
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName) | ||
|
@@ -184,6 +193,135 @@ func createMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName | |
return mt | ||
} | ||
|
||
// reshard helpers | ||
|
||
func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards string, targetTabs map[string]*cluster.VttabletProcess) { | ||
createFlags := []string{"--auto-start=false", "--defer-secondary-keys=false", "--stop-after-copy", | ||
"--on-ddl", "STOP", "--tablet-types", "primary,rdonly", "--tablet-types-in-preference-order=true", | ||
"--all-cells", "--format=json", | ||
} | ||
rs := newReshard(vc, &reshardWorkflow{ | ||
workflowInfo: &workflowInfo{ | ||
vc: vc, | ||
workflowName: workflowName, | ||
targetKeyspace: keyspace, | ||
}, | ||
sourceShards: sourceShards, | ||
targetShards: targetShards, | ||
createFlags: createFlags, | ||
}, workflowFlavorVtctld) | ||
|
||
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName) | ||
rs.Create() | ||
validateReshardResponse(rs) | ||
workflowResponse := getWorkflow(keyspace, workflowName) | ||
reshardShowResponse := getReshardShowResponse(&rs) | ||
require.EqualValues(t, reshardShowResponse, workflowResponse) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious why we use CloneVT() in some places since we don't do it everywhere. Probably worth a comment somewhere. 🙂 |
||
validateReshardWorkflow(t, workflowResponse.Workflows) | ||
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Stopped.String()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, but we can use |
||
rs.Start() | ||
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String()) | ||
for _, tab := range targetTabs { | ||
alias := fmt.Sprintf("zone1-%d", tab.TabletUID) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, but would be nicer to make the cell name a variable if not already. And then use the variable. |
||
query := "update _vt.vreplication set source := replace(source, 'stop_after_copy:true', 'stop_after_copy:false') where db_name = 'vt_customer' and workflow = '" + workflowName + "'" | ||
output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", alias, query) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, but IMO we should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 yes please and thank you :) |
||
require.NoError(t, err, output) | ||
} | ||
rs.Start() | ||
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) | ||
rs.Stop() | ||
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String()) | ||
rs.Start() | ||
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) | ||
Comment on lines
+231
to
+235
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couple more places where we can use the |
||
for _, targetTab := range targetTabs { | ||
catchup(t, targetTab, workflowName, "Reshard") | ||
} | ||
vdiff(t, keyspace, workflowName, "zone1", false, true, nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another place where the cell as a var would be nicer in case that is changed elsewhere later. |
||
|
||
rs.SwitchReadsAndWrites() | ||
waitForLowLag(t, keyspace, workflowName+"_reverse") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, but IMO it's nicer to use |
||
vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil) | ||
|
||
rs.ReverseReadsAndWrites() | ||
waitForLowLag(t, keyspace, workflowName) | ||
vdiff(t, keyspace, workflowName, "zone1", false, true, nil) | ||
rs.SwitchReadsAndWrites() | ||
rs.Complete() | ||
} | ||
|
||
func getReshardShowResponse(rs *iReshard) *vtctldatapb.GetWorkflowsResponse { | ||
(*rs).Show() | ||
reshardOutput := (*rs).GetLastOutput() | ||
var reshardResponse vtctldatapb.GetWorkflowsResponse | ||
err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse) | ||
require.NoError(vc.t, err) | ||
reshardResponse.Workflows[0].MaxVReplicationTransactionLag = 0 | ||
reshardResponse.Workflows[0].MaxVReplicationLag = 0 | ||
Comment on lines
+258
to
+259
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another place where I'm not sure why we're doing this so a comment would be nice. |
||
return reshardResponse.CloneVT() | ||
} | ||
|
||
func validateReshardResponse(rs iReshard) { | ||
resp := getReshardResponse(rs) | ||
require.NotNil(vc.t, resp) | ||
require.NotNil(vc.t, resp.ShardStreams) | ||
require.Equal(vc.t, len(resp.ShardStreams), 2) | ||
keyspace := "customer" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use the |
||
for _, shard := range []string{"-40", "40-80"} { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use the variable here to support other kinds of reshards:
|
||
streams := resp.ShardStreams[fmt.Sprintf("%s/%s", keyspace, shard)] | ||
require.Equal(vc.t, 1, len(streams.Streams)) | ||
require.Equal(vc.t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), streams.Streams[0].Status) | ||
} | ||
} | ||
|
||
func validateReshardWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO we should pass |
||
require.Equal(t, 1, len(workflows)) | ||
wf := workflows[0] | ||
require.Equal(t, "reshard", wf.Name) | ||
require.Equal(t, binlogdatapb.VReplicationWorkflowType_Reshard.String(), wf.WorkflowType) | ||
require.Equal(t, "None", wf.WorkflowSubType) | ||
require.Equal(t, "customer", wf.Target.Keyspace) | ||
require.Equal(t, 2, len(wf.Target.Shards)) | ||
require.Equal(t, "customer", wf.Source.Keyspace) | ||
require.Equal(t, 1, len(wf.Source.Shards)) | ||
require.False(t, wf.DeferSecondaryKeys) | ||
|
||
require.GreaterOrEqual(t, len(wf.ShardStreams), int(1)) | ||
oneStream := maps.Values(wf.ShardStreams)[0] | ||
require.NotNil(t, oneStream) | ||
|
||
stream := oneStream.Streams[0] | ||
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), stream.State) | ||
require.Equal(t, stream.TabletSelectionPreference, tabletmanagerdatapb.TabletSelectionPreference_INORDER) | ||
require.True(t, slices.Equal([]topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}, stream.TabletTypes)) | ||
require.True(t, slices.Equal([]string{"zone1", "zone2"}, stream.Cells)) | ||
|
||
bls := stream.BinlogSource | ||
require.Equal(t, binlogdatapb.OnDDLAction_STOP, bls.OnDdl) | ||
require.True(t, bls.StopAfterCopy) | ||
|
||
} | ||
|
||
func getReshardResponse(rs iReshard) *vtctldatapb.WorkflowStatusResponse { | ||
reshardOutput := rs.GetLastOutput() | ||
var reshardResponse vtctldatapb.WorkflowStatusResponse | ||
err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse) | ||
require.NoError(vc.t, err) | ||
return reshardResponse.CloneVT() | ||
} | ||
|
||
// helper functions | ||
|
||
func getWorkflow(targetKeyspace, workflow string) *vtctldatapb.GetWorkflowsResponse { | ||
workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKeyspace, "show", "--workflow", workflow) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, it's more compact if we also pass |
||
require.NoError(vc.t, err) | ||
var workflowResponse vtctldatapb.GetWorkflowsResponse | ||
err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse) | ||
require.NoError(vc.t, err) | ||
workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0 | ||
workflowResponse.Workflows[0].MaxVReplicationLag = 0 | ||
Comment on lines
+320
to
+321
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another spot where this catches my eye. :-) |
||
return workflowResponse.CloneVT() | ||
} | ||
|
||
func checkTablesExist(t *testing.T, tabletAlias string, tables []string) bool { | ||
tablesResponse, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", tabletAlias, "--tables", strings.Join(tables, ","), "--table-names-only") | ||
require.NoError(t, err) | ||
|
@@ -211,6 +349,7 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules { | |
require.NoError(t, err) | ||
return &routingRulesResponse | ||
} | ||
|
||
func confirmNoRoutingRules(t *testing.T) { | ||
routingRulesResponse := getRoutingRules(t) | ||
require.Zero(t, len(routingRulesResponse.Rules)) | ||
|
@@ -223,7 +362,7 @@ func confirmRoutingRulesExist(t *testing.T) { | |
|
||
// We only want to validate non-standard attributes that are set by the CLI. The other end-to-end tests validate the rest. | ||
// We also check some of the standard attributes to make sure they are set correctly. | ||
func validateWorkflow1(t *testing.T, workflows []*vtctldatapb.Workflow) { | ||
func validateMoveTablesWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) { | ||
require.Equal(t, 1, len(workflows)) | ||
wf := workflows[0] | ||
require.Equal(t, "wf1", wf.Name) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,9 +74,10 @@ type moveTablesWorkflow struct { | |
tables string | ||
atomicCopy bool | ||
sourceShards string | ||
createFlags []string // currently only used by vtctld | ||
|
||
// currently only used by vtctld | ||
lastOutput string | ||
createFlags []string | ||
completeFlags []string | ||
switchFlags []string | ||
} | ||
|
@@ -270,7 +271,12 @@ type reshardWorkflow struct { | |
targetShards string | ||
skipSchemaCopy bool | ||
|
||
lastOutput string | ||
// currently only used by vtctld | ||
lastOutput string | ||
createFlags []string | ||
completeFlags []string | ||
cancelFlags []string | ||
switchFlags []string | ||
} | ||
|
||
type iReshard interface { | ||
|
@@ -379,8 +385,9 @@ func (v VtctldReshard) Flavor() string { | |
func (v VtctldReshard) exec(args ...string) { | ||
args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} | ||
args2 = append(args2, args...) | ||
if err := vc.VtctldClient.ExecuteCommand(args2...); err != nil { | ||
v.vc.t.Fatalf("failed to create Reshard workflow: %v", err) | ||
var err error | ||
if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil { | ||
v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput) | ||
} | ||
} | ||
|
||
|
@@ -395,20 +402,22 @@ func (v VtctldReshard) Create() { | |
if v.skipSchemaCopy { | ||
args = append(args, "--skip-schema-copy="+strconv.FormatBool(v.skipSchemaCopy)) | ||
} | ||
args = append(args, v.createFlags...) | ||
v.exec(args...) | ||
} | ||
|
||
func (v VtctldReshard) SwitchReadsAndWrites() { | ||
v.exec("SwitchTraffic") | ||
args := []string{"SwitchTraffic"} | ||
args = append(args, v.switchFlags...) | ||
v.exec(args...) | ||
} | ||
|
||
func (v VtctldReshard) ReverseReadsAndWrites() { | ||
v.exec("ReverseTraffic") | ||
} | ||
|
||
func (v VtctldReshard) Show() { | ||
//TODO implement me | ||
panic("implement me") | ||
v.exec("Show") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does Show not take any other flags? Same question for ReverseTraffic above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Show does not. Reverse does, but shares the same as MoveTables and they both have the same internal code path, which are separately tested in other e2e tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does take flags, FWIW:
We might want to test |
||
} | ||
|
||
func (v VtctldReshard) SwitchReads() { | ||
|
@@ -422,11 +431,15 @@ func (v VtctldReshard) SwitchWrites() { | |
} | ||
|
||
func (v VtctldReshard) Cancel() { | ||
v.exec("Cancel") | ||
args := []string{"Cancel"} | ||
args = append(args, v.cancelFlags...) | ||
v.exec(args...) | ||
} | ||
|
||
func (v VtctldReshard) Complete() { | ||
v.exec("Complete") | ||
args := []string{"Complete"} | ||
args = append(args, v.completeFlags...) | ||
v.exec(args...) | ||
} | ||
|
||
func (v VtctldReshard) GetLastOutput() string { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1672,7 +1672,11 @@ | |
log.Errorf("%w", err2) | ||
return nil, err | ||
} | ||
rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), "") | ||
tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes) | ||
if req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the default if this is not specified? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All tablet types |
||
tabletTypesStr = discovery.InOrderHint + tabletTypesStr | ||
} | ||
rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), tabletTypesStr) | ||
if err != nil { | ||
return nil, vterrors.Wrap(err, "buildResharder") | ||
} | ||
|
@@ -1695,7 +1699,10 @@ | |
} else { | ||
log.Warningf("Streams will not be started since --auto-start is set to false") | ||
} | ||
return nil, nil | ||
return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{ | ||
Keyspace: keyspace, | ||
Workflow: req.Workflow, | ||
}) | ||
} | ||
|
||
// VDiffCreate is part of the vtctlservicepb.VtctldServer interface. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious why we're doing this here. Worth a comment.