Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (#16217) #16222

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import (
"context"
"fmt"
"strings"

"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -49,6 +50,8 @@
sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a"
// Update the configuration values for a workflow's vreplication stream.
sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
// Check if workflow is still copying.
sqlGetVReplicationCopyStatus = "select distinct vrepl_id from %s.copy_state where vrepl_id = %d"
)

func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
Expand Down Expand Up @@ -227,6 +230,18 @@
return resp, nil
}

func isStreamCopying(tm *TabletManager, id int64) (bool, error) {
query := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), id)
res, err := tm.VREngine.Exec(query)
if err != nil {
return false, err

Check warning on line 237 in go/vt/vttablet/tabletmanager/rpc_vreplication.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/rpc_vreplication.go#L237

Added line #L237 was not covered by tests
}
if res != nil && len(res.Rows) > 0 {
return true, nil
}
return false, nil
}

// UpdateVReplicationWorkflow updates the sidecar databases's vreplication
// record(s) for this tablet's vreplication workflow stream(s). If there
// are no streams for the given workflow on the tablet then a nil result
Expand Down Expand Up @@ -302,6 +317,17 @@
if !textutil.ValueIsSimulatedNull(req.State) {
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
}
if state == binlogdatapb.VReplicationWorkflowState_Running.String() {
// `Workflow Start` sets the new state to Running. However, if stream is still copying tables, we should set
// the state as Copying.
isCopying, err := isStreamCopying(tm, id)
if err != nil {
return nil, err

Check warning on line 325 in go/vt/vttablet/tabletmanager/rpc_vreplication.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/rpc_vreplication.go#L325

Added line #L325 was not covered by tests
}
if isCopying {
state = binlogdatapb.VReplicationWorkflowState_Copying.String()
}
}
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"sc": sqltypes.StringBindVariable(string(source)),
Expand Down
38 changes: 34 additions & 4 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,18 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
),
fmt.Sprintf("%d", vreplID),
)

getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), int64(vreplID))
copyStatusFields := sqltypes.MakeTestFields(
"id",
"int64",
)
notCopying := sqltypes.MakeTestResult(copyStatusFields)
copying := sqltypes.MakeTestResult(copyStatusFields, "1")
tests := []struct {
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
query string
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
query string
isCopying bool
}{
{
name: "update cells",
Expand Down Expand Up @@ -579,6 +586,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
{
name: "update to running while copying",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState_Running,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
},
isCopying: true,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
}

for _, tt := range tests {
Expand All @@ -597,6 +617,16 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
// These are the same for each RPC call.
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
if tt.request.State == binlogdatapb.VReplicationWorkflowState_Running ||
tt.request.State == binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt) {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
if tt.isCopying {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, copying, nil)
} else {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, notCopying, nil)

}
}
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil)

Expand Down
Loading