Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Handle multiple streams in UpdateVReplicationWorkflow RPC #14447

Merged
merged 5 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
continue
}
}
restartWorkflow(t, ksWorkflow)
vdiffSideBySide(t, ksWorkflow, "")
if dryRunResultSwitchReads != nil {
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", allCellNames, "rdonly,replica", "--dry-run")
Expand Down Expand Up @@ -1578,6 +1579,19 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
validateDryRunResults(t, output, dryRunResults)
}

// restartWorkflow confirms that a workflow can be successfully
// stopped and started.
func restartWorkflow(t *testing.T, ksWorkflow string) {
keyspace, workflow, found := strings.Cut(ksWorkflow, ".")
require.True(t, found, "unexpected ksWorkflow value: %s", ksWorkflow)
err := vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", keyspace, "stop", "--workflow", workflow)
require.NoError(t, err, "failed to stop workflow: %v", err)
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", keyspace, "start", "--workflow", workflow)
require.NoError(t, err, "failed to start workflow: %v", err)
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
}

func printSwitchWritesExtraDebug(t *testing.T, ksWorkflow, msg string) {
// Temporary code: print lots of info for debugging occasional flaky failures in customer reshard in CI for multicell test
debug := true
Expand Down
6 changes: 4 additions & 2 deletions go/textutil/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"unicode"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/binlogdata"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -94,7 +94,7 @@ func ValueIsSimulatedNull(val any) bool {
return cval == SimulatedNullString
case []string:
return len(cval) == 1 && cval[0] == sqltypes.NULL.String()
case binlogdata.OnDDLAction:
case binlogdatapb.OnDDLAction:
return int32(cval) == int32(SimulatedNullInt)
case int:
return cval == SimulatedNullInt
Expand All @@ -104,6 +104,8 @@ func ValueIsSimulatedNull(val any) bool {
return int64(cval) == int64(SimulatedNullInt)
case []topodatapb.TabletType:
return len(cval) == 1 && cval[0] == topodatapb.TabletType(SimulatedNullInt)
case binlogdatapb.VReplicationWorkflowState:
return int32(cval) == int32(SimulatedNullInt)
default:
return false
}
Expand Down
133 changes: 69 additions & 64 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ const (
sqlReadVReplicationWorkflow = "select id, source, pos, stop_pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, message, db_name, rows_copied, tags, time_heartbeat, workflow_type, time_throttled, component_throttled, workflow_sub_type, defer_secondary_keys from %s.vreplication where workflow = %a and db_name = %a"
// Delete VReplication records for the given workflow.
sqlDeleteVReplicationWorkflow = "delete from %s.vreplication where workflow = %a and db_name = %a"
// Retrieve the current configuration values for a workflow's vreplication stream.
// Retrieve the current configuration values for a workflow's vreplication stream(s).
sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a"
// Update the configuration values for a workflow's vreplication stream.
sqlUpdateVReplicationWorkflowConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
)

func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
Expand Down Expand Up @@ -228,8 +228,8 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl
}

// UpdateVReplicationWorkflow updates the sidecar databases's vreplication
// record for this tablet's vreplication workflow stream(s). If there
// is no stream for the given workflow on the tablet then a nil result
// record(s) for this tablet's vreplication workflow stream(s). If there
// are no streams for the given workflow on the tablet then a nil result
// is returned as this is expected e.g. on source tablets of a
// Reshard workflow (source and target are the same keyspace). The
// caller can consider this case an error if they choose to.
Expand Down Expand Up @@ -257,68 +257,73 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil}, nil
}

row := res.Named().Row()
id := row.AsInt64("id", 0)
cells := strings.Split(row.AsString("cell", ""), ",")
tabletTypes, inorder, err := discovery.ParseTabletTypesAndOrder(row.AsString("tablet_types", ""))
if err != nil {
return nil, err
}
bls := &binlogdatapb.BinlogSource{}
source := row.AsBytes("source", []byte{})
state := row.AsString("state", "")
message := row.AsString("message", "")
if req.State == binlogdatapb.VReplicationWorkflowState_Running && strings.ToUpper(message) == workflow.Frozen {
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil},
vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "cannot start a workflow when it is frozen")
}
// For the string based values, we use NULL to differentiate
// from an empty string. The NULL value indicates that we
// should keep the existing value.
if !textutil.ValueIsSimulatedNull(req.Cells) {
cells = req.Cells
}
if !textutil.ValueIsSimulatedNull(req.TabletTypes) {
tabletTypes = req.TabletTypes
}
tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes)
if inorder && req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN ||
req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = discovery.InOrderHint + tabletTypesStr
}
if err = prototext.Unmarshal(source, bls); err != nil {
return nil, err
}
// If we don't want to update the existing value then pass
// the simulated NULL value of -1.
if !textutil.ValueIsSimulatedNull(req.OnDdl) {
bls.OnDdl = req.OnDdl
}
source, err = prototext.Marshal(bls)
if err != nil {
return nil, err
}
if !textutil.ValueIsSimulatedNull(req.State) {
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
}
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"sc": sqltypes.StringBindVariable(string(source)),
"cl": sqltypes.StringBindVariable(strings.Join(cells, ",")),
"tt": sqltypes.StringBindVariable(tabletTypesStr),
"id": sqltypes.Int64BindVariable(id),
}
parsed = sqlparser.BuildParsedQuery(sqlUpdateVReplicationWorkflowConfig, sidecar.GetIdentifier(), ":st", ":sc", ":cl", ":tt", ":id")
stmt, err = parsed.GenerateQuery(bindVars, nil)
if err != nil {
return nil, err
for _, row := range res.Named().Rows {
id := row.AsInt64("id", 0)
cells := strings.Split(row.AsString("cell", ""), ",")
tabletTypes, inorder, err := discovery.ParseTabletTypesAndOrder(row.AsString("tablet_types", ""))
if err != nil {
return nil, err
}
bls := &binlogdatapb.BinlogSource{}
source := row.AsBytes("source", []byte{})
state := row.AsString("state", "")
message := row.AsString("message", "")
if req.State == binlogdatapb.VReplicationWorkflowState_Running && strings.ToUpper(message) == workflow.Frozen {
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil},
vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "cannot start a workflow when it is frozen")
}
// For the string based values, we use NULL to differentiate
// from an empty string. The NULL value indicates that we
// should keep the existing value.
if !textutil.ValueIsSimulatedNull(req.Cells) {
cells = req.Cells
}
if !textutil.ValueIsSimulatedNull(req.TabletTypes) {
tabletTypes = req.TabletTypes
}
tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes)
if inorder && req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN ||
req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = discovery.InOrderHint + tabletTypesStr
}
if err = prototext.Unmarshal(source, bls); err != nil {
return nil, err
}
// If we don't want to update the existing value then pass
// the simulated NULL value of -1.
if !textutil.ValueIsSimulatedNull(req.OnDdl) {
bls.OnDdl = req.OnDdl
}
source, err = prototext.Marshal(bls)
if err != nil {
return nil, err
}
if !textutil.ValueIsSimulatedNull(req.State) {
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
}
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"sc": sqltypes.StringBindVariable(string(source)),
"cl": sqltypes.StringBindVariable(strings.Join(cells, ",")),
"tt": sqltypes.StringBindVariable(tabletTypesStr),
"id": sqltypes.Int64BindVariable(id),
}
parsed = sqlparser.BuildParsedQuery(sqlUpdateVReplicationWorkflowStreamConfig, sidecar.GetIdentifier(), ":st", ":sc", ":cl", ":tt", ":id")
stmt, err = parsed.GenerateQuery(bindVars, nil)
if err != nil {
return nil, err
}
res, err = tm.VREngine.Exec(stmt)
if err != nil {
return nil, err
}
}
res, err = tm.VREngine.Exec(stmt)

if err != nil {
return nil, err
}
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{
Result: &querypb.QueryResult{
RowsAffected: uint64(len(res.Rows)),
},
}, nil
}

// VReplicationExec executes a vreplication command.
Expand Down
39 changes: 28 additions & 11 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,11 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
keyspace, shard)
selectRes := sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|cell|tablet_types",
"int64|varchar|varchar|varchar",
"id|source|cell|tablet_types|state|message",
"int64|varchar|varchar|varchar|varchar|varbinary",
),
fmt.Sprintf("%d|%s|%s|%s", vreplID, blsStr, cells[0], tabletTypes[0]),
fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID, blsStr, cells[0], tabletTypes[0]),
fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID+1, blsStr, cells[0], tabletTypes[0]),
)
idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a",
sqltypes.Int64BindVariable(int64(vreplID)))
Expand All @@ -504,62 +505,80 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
name: "update cells",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone2"},
// TabletTypes is an empty value, so the current value should be cleared
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`,
keyspace, shard, "zone2", vreplID),
},
{
name: "update cells, NULL tablet_types",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone3"},
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, // So keep the current value of replica
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, "zone3", tabletTypes[0], vreplID),
},
{
name: "update tablet_types",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
TabletSelectionPreference: tabletmanagerdatapb.TabletSelectionPreference_INORDER,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`,
keyspace, shard, "in_order:rdonly,replica", vreplID),
},
{
name: "update tablet_types, NULL cells",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: textutil.SimulatedNullStringSlice, // So keep the current value of zone1
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, cells[0], "rdonly", vreplID),
},
{
name: "update on_ddl",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
OnDdl: binlogdatapb.OnDDLAction_EXEC,
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite understand why the state has changed in all these tests.

Copy link
Contributor Author

@mattlord mattlord Nov 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of adding a test case for updating only the state...

I removed the hardcoded setting of it to stopped for each test case here: https://github.com/vitessio/vitess/pull/14447/files#diff-bd819322de3abbac526383516d0baf39644eeb28bfb3c79db46507d3cef2bbebL578

And added the state field to the test query results here with a more typical value of running (as the new test case stops it): https://github.com/vitessio/vitess/pull/14447/files#diff-bd819322de3abbac526383516d0baf39644eeb28bfb3c79db46507d3cef2bbebR482-R485

keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID),
},
{
name: "update cell,tablet_types,on_ddl",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone1", "zone2", "zone3"},
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_PRIMARY},
OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE,
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
},
{
name: "update state",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState_Stopped,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
},
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
}

for _, tt := range tests {
Expand All @@ -575,8 +594,6 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
require.NotNil(t, tt.request, "No request provided")
require.NotEqual(t, "", tt.query, "No expected query provided")

tt.request.State = binlogdatapb.VReplicationWorkflowState_Stopped

// These are the same for each RPC call.
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
Expand Down
Loading