Skip to content

Commit

Permalink
Add DeleteVRWorkflow RPC
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed May 2, 2023
1 parent 7542015 commit 16fe219
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 11 deletions.
22 changes: 11 additions & 11 deletions go/cmd/vtctldclient/command/movetables.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ See the --help output for each command for more details.`,
Args: cobra.ExactArgs(1),
}

// MoveTablesCancel makes a MoveTablesCancel gRPC call to a vtctld.
MoveTablesCancel = &cobra.Command{
Use: "Cancel",
Short: "Cancel a MoveTables VReplication workflow",
Example: `vtctldclient --server=localhost:15999 MoveTables --workflow "commerce2customer" --target-keyspace "customer" Cancel`,
DisableFlagsInUseLine: true,
Aliases: []string{"cancel"},
Args: cobra.NoArgs,
RunE: commandMoveTablesCancel,
}

// MoveTablesCreate makes a MoveTablesCreate gRPC call to a vtctld.
MoveTablesCreate = &cobra.Command{
Use: "Create",
Expand Down Expand Up @@ -82,17 +93,6 @@ See the --help output for each command for more details.`,
},
RunE: commandMoveTablesCreate,
}

// MoveTablesCancel makes a MoveTablesCancel gRPC call to a vtctld.
MoveTablesCancel = &cobra.Command{
Use: "Cancel",
Short: "Cancel a MoveTables VReplication workflow",
Example: `vtctldclient --server=localhost:15999 MoveTables --workflow "commerce2customer" --target-keyspace "customer" Cancel`,
DisableFlagsInUseLine: true,
Aliases: []string{"cancel"},
Args: cobra.NoArgs,
RunE: commandMoveTablesCancel,
}
)

var (
Expand Down
35 changes: 35 additions & 0 deletions go/cmd/vtctldclient/command/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ var (
RunE: commandGetWorkflows,
}

// WorkflowDelete makes a WorkflowDelete gRPC call to a vtctld.
WorkflowDelete = &cobra.Command{
Use: "delete",
Short: "Delete a VReplication workflow",
Example: `vtctldclient --server=localhost:15999 workflow --keyspace=customer delete --workflow=commerce2customer"`,
DisableFlagsInUseLine: true,
Aliases: []string{"Delete"},
Args: cobra.NoArgs,
RunE: commandMoveTablesCancel,
}

// WorkflowUpdate makes a WorkflowUpdate gRPC call to a vtctld.
WorkflowUpdate = &cobra.Command{
Use: "update",
Expand Down Expand Up @@ -136,6 +147,28 @@ var (
}{}
)

func commandWorkflowDelete(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowDeleteRequest{
Keyspace: moveTablesOptions.TargetKeyspace,
Workflow: moveTablesOptions.Workflow,
}
resp, err := client.WorkflowDelete(commandCtx, req)
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp)
if err != nil {
return err
}

fmt.Printf("%s\n", data)

return nil
}

func commandWorkflowUpdate(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

Expand Down Expand Up @@ -190,4 +223,6 @@ func init() {
WorkflowUpdate.Flags().StringSliceVarP(&workflowUpdateOptions.TabletTypes, "tablet-types", "t", nil, "New source tablet types to replicate from (e.g. PRIMARY,REPLICA,RDONLY)")
WorkflowUpdate.Flags().StringVar(&workflowUpdateOptions.OnDDL, "on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE")
Workflow.AddCommand(WorkflowUpdate)

Workflow.AddCommand(WorkflowDelete)
}
4 changes: 4 additions & 0 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,10 @@ func (itmc *internalTabletManagerClient) CreateVRWorkflow(context.Context, *topo
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) DeleteVRWorkflow(context.Context, *topodatapb.Tablet, *tabletmanagerdatapb.DeleteVRWorkflowRequest) (*tabletmanagerdatapb.DeleteVRWorkflowResponse, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) VReplicationExec(context.Context, *topodatapb.Tablet, string) (*querypb.QueryResult, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4372,6 +4372,20 @@ func (s *VtctldServer) ValidateVSchema(ctx context.Context, req *vtctldatapb.Val
return resp, err
}

// WorkflowDelete is part of the vtctlservicepb.VtctldServer interface.
func (s *VtctldServer) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDeleteRequest) (resp *vtctldatapb.WorkflowDeleteResponse, err error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.WorkflowDelete")
defer span.Finish()

defer panicHandler(&err)

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", req.Workflow)

resp, err = s.ws.WorkflowDelete(ctx, req)
return resp, err
}

// WorkflowUpdate is part of the vtctlservicepb.VtctldServer interface.
func (s *VtctldServer) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUpdateRequest) (resp *vtctldatapb.WorkflowUpdateResponse, err error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.WorkflowUpdate")
Expand Down
44 changes: 44 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,50 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}

// WorkflowCreate is part of the vtctlservicepb.VtctldServer interface.
// It passes on the request to the target primary tablets that are
// participating in the given workflow.
func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDeleteRequest) (*vtctldatapb.WorkflowDeleteResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.WorkflowDelete")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", req.Workflow)

vx := vexec.NewVExec(req.Keyspace, req.Workflow, s.ts, s.tmc)
callback := func(ctx context.Context, tablet *topo.TabletInfo) (*querypb.QueryResult, error) {
res, err := s.tmc.DeleteVRWorkflow(ctx, tablet.Tablet, nil)
if err != nil {
return nil, err
}
return res.Result, err
}
res, err := vx.CallbackContext(ctx, callback)
if err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace)
}
return nil, err
}

if len(res) == 0 {
return nil, fmt.Errorf("the %s workflow does not exist in the %s keyspace", req.Workflow, req.Keyspace)
}

response := &vtctldatapb.WorkflowDeleteResponse{}
response.Summary = fmt.Sprintf("Successfully updated the %s workflow on (%d) target primary tablets in the %s keyspace", req.Workflow, len(res), req.Keyspace)
details := make([]*vtctldatapb.WorkflowDeleteResponse_TabletInfo, 0, len(res))
for tinfo, tres := range res {
result := &vtctldatapb.WorkflowDeleteResponse_TabletInfo{
Tablet: fmt.Sprintf("%s-%d (%s/%s)", tinfo.Alias.Cell, tinfo.Alias.Uid, tinfo.Keyspace, tinfo.Shard),
Deleted: tres.RowsAffected > 0, // Can be more than one with shard merges
}
details = append(details, result)
}
response.Details = details
return response, nil
}

// WorkflowUpdate is part of the vtctlservicepb.VtctldServer interface.
// It passes the embedded TabletRequest object to the given keyspace's
// target primary tablets that are participating in the given workflow.
Expand Down
13 changes: 13 additions & 0 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,19 @@ func (client *Client) CreateVRWorkflow(ctx context.Context, tablet *topodatapb.T
return response, nil
}

func (client *Client) DeleteVRWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.DeleteVRWorkflowRequest) (*tabletmanagerdatapb.DeleteVRWorkflowResponse, error) {
c, closer, err := client.dialer.dial(ctx, tablet)
if err != nil {
return nil, err
}
defer closer.Close()
response, err := c.DeleteVRWorkflow(ctx, request)
if err != nil {
return nil, err
}
return response, nil
}

// VReplicationExec is part of the tmclient.TabletManagerClient interface.
func (client *Client) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
c, closer, err := client.dialer.dial(ctx, tablet)
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/grpctmserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,13 @@ func (s *server) CreateVRWorkflow(ctx context.Context, request *tabletmanagerdat
return s.tm.CreateVRWorkflow(ctx, request)
}

func (s *server) DeleteVRWorkflow(ctx context.Context, request *tabletmanagerdatapb.DeleteVRWorkflowRequest) (response *tabletmanagerdatapb.DeleteVRWorkflowResponse, err error) {
defer s.tm.HandleRPCPanic(ctx, "CreateVRWorkflow", request, response, true /*verbose*/, &err)
ctx = callinfo.GRPCCallInfo(ctx)
response = &tabletmanagerdatapb.DeleteVRWorkflowResponse{}
return s.tm.DeleteVRWorkflow(ctx, request)
}

func (s *server) VReplicationExec(ctx context.Context, request *tabletmanagerdatapb.VReplicationExecRequest) (response *tabletmanagerdatapb.VReplicationExecResponse, err error) {
defer s.tm.HandleRPCPanic(ctx, "VReplicationExec", request, response, true /*verbose*/, &err)
ctx = callinfo.GRPCCallInfo(ctx)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/rpc_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type RPCTM interface {

// VReplication API
CreateVRWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVRWorkflowRequest) (*tabletmanagerdatapb.CreateVRWorkflowResponse, error)
DeleteVRWorkflow(ctx context.Context, req *tabletmanagerdatapb.DeleteVRWorkflowRequest) (*tabletmanagerdatapb.DeleteVRWorkflowResponse, error)
VReplicationExec(ctx context.Context, query string) (*querypb.QueryResult, error)
VReplicationWaitForPos(ctx context.Context, id int32, pos string) error
UpdateVRWorkflow(ctx context.Context, req *tabletmanagerdatapb.UpdateVRWorkflowRequest) (*tabletmanagerdatapb.UpdateVRWorkflowResponse, error)
Expand Down
24 changes: 24 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
const (
// Create a new MoveTables VReplication workflow record.
sqlCreateVRWorkflow = "insert into %s.vreplication(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys) values (%a, %a, '', 0, 0, %a, %a, now(), 0, %a, %a, %a, %a, %a)"
// Delete vreplication records for the given workflow.
sqlDeleteVRWorkflow = "delete from %s.vreplication where workflow = %a and db_name = %a"
// Retrieve the current configuration values for a workflow's vreplication stream.
sqlSelectVRWorkflowConfig = "select id, source, cell, tablet_types from %s.vreplication where workflow = %a"
// Update the configuration values for a workflow's vreplication stream.
Expand Down Expand Up @@ -80,6 +82,28 @@ func (tm *TabletManager) CreateVRWorkflow(ctx context.Context, req *tabletmanage
return &tabletmanagerdatapb.CreateVRWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
}

func (tm *TabletManager) DeleteVRWorkflow(ctx context.Context, req *tabletmanagerdatapb.DeleteVRWorkflowRequest) (*tabletmanagerdatapb.DeleteVRWorkflowResponse, error) {
res := &sqltypes.Result{}
bindVars := map[string]*querypb.BindVariable{
"wf": sqltypes.StringBindVariable(req.Workflow),
"db": sqltypes.StringBindVariable(tm.DBConfigs.DBName),
}
parsed := sqlparser.BuildParsedQuery(sqlDeleteVRWorkflow, sidecardb.GetIdentifier(), ":wf", ":db")
stmt, err := parsed.GenerateQuery(bindVars, nil)
if err != nil {
return nil, err
}
log.Errorf("DeleteVRWorkflow SQL: %s", stmt)
streamres, err := tm.VREngine.Exec(stmt)

if err != nil {
return nil, err
}
res.RowsAffected += streamres.RowsAffected

return &tabletmanagerdatapb.DeleteVRWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
}

// UpdateVRWorkflow updates the sidecar databases's vreplication
// record for this tablet's vreplication workflow stream(s). If there
// is no stream for the given workflow on the tablet then a nil result
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tmclient/rpc_client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type TabletManagerClient interface {
//

CreateVRWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVRWorkflowRequest) (*tabletmanagerdatapb.CreateVRWorkflowResponse, error)
DeleteVRWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.DeleteVRWorkflowRequest) (*tabletmanagerdatapb.DeleteVRWorkflowResponse, error)
UpdateVRWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.UpdateVRWorkflowRequest) (*tabletmanagerdatapb.UpdateVRWorkflowResponse, error)
// VReplicationExec executes a VReplication command
VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tmrpctest/test_tm_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (fra *fakeRPCTM) CreateVRWorkflow(ctx context.Context, req *tabletmanagerda
panic("implement me")
}

func (fra *fakeRPCTM) DeleteVRWorkflow(ctx context.Context, req *tabletmanagerdatapb.DeleteVRWorkflowRequest) (*tabletmanagerdatapb.DeleteVRWorkflowResponse, error) {
//TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) UpdateVRWorkflow(ctx context.Context, req *tabletmanagerdatapb.UpdateVRWorkflowRequest) (*tabletmanagerdatapb.UpdateVRWorkflowResponse, error) {
//TODO implement me
panic("implement me")
Expand Down

0 comments on commit 16fe219

Please sign in to comment.