Skip to content

Commit

Permalink
Add ut for etcd (pingcap#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored and WangXiangUSTC committed Jan 13, 2020
1 parent d51d2af commit 4f0b875
Show file tree
Hide file tree
Showing 36 changed files with 1,118 additions and 327 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high],"
ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high],"waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s"
ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high],"relay log purger is purging, cannot start sub task %s, please try again later"
ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high],"host:port '%s' not valid"
ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high],"source of request does not match with source in worker"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium],"parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium],"config toml transform"
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium],"'%s' is an invalid flag"
Expand Down
4 changes: 2 additions & 2 deletions dm/ctl/common/operate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
)

// OperateTask does operation on task
func OperateTask(op pb.TaskOp, name string, workers []string) (*pb.OperateTaskResponse, error) {
func OperateTask(op pb.TaskOp, name string, sources []string) (*pb.OperateTaskResponse, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := MasterClient()
return cli.OperateTask(ctx, &pb.OperateTaskRequest{
Op: op,
Name: name,
Sources: workers,
Sources: sources,
})
}
6 changes: 3 additions & 3 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ func GetFileContent(fpath string) ([]byte, error) {
return content, nil
}

// GetWorkerArgs extracts workers from cmd
func GetWorkerArgs(cmd *cobra.Command) ([]string, error) {
return cmd.Flags().GetStringSlice("worker")
// GetSourceArgs extracts sources from cmd
func GetSourceArgs(cmd *cobra.Command) ([]string, error) {
return cmd.Flags().GetStringSlice("source")
}

// ExtractSQLsFromArgs extract multiple sql from args.
Expand Down
3 changes: 2 additions & 1 deletion dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewRootCmd() *cobra.Command {
Short: "DM control",
}
// --worker worker1 -w worker2 --worker=worker3,worker4 -w=worker5,worker6
cmd.PersistentFlags().StringSliceVarP(&commandMasterFlags.workers, "worker", "w", []string{}, "DM-worker ID")
cmd.PersistentFlags().StringSliceVarP(&commandMasterFlags.workers, "source", "s", []string{}, "MySQL Source ID")
cmd.AddCommand(
master.NewStartTaskCmd(),
master.NewStopTaskCmd(),
Expand All @@ -76,6 +76,7 @@ func NewRootCmd() *cobra.Command {
master.NewPurgeRelayCmd(),
master.NewMigrateRelayCmd(),
master.NewOperateMysqlWorkerCmd(),
master.NewOfflineWorkerCmd(),
)
return cmd
}
Expand Down
12 changes: 6 additions & 6 deletions dm/ctl/master/break_ddl_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// NewBreakDDLLockCmd creates a BreakDDLLock command
func NewBreakDDLLockCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "break-ddl-lock <-w worker ...> <task-name> [--remove-id] [--exec] [--skip]",
Use: "break-ddl-lock <-s source ...> <task-name> [--remove-id] [--exec] [--skip]",
Short: "forcefully break DM-worker's DDL lock",
Run: breakDDLLockFunc,
}
Expand All @@ -47,13 +47,13 @@ func breakDDLLockFunc(cmd *cobra.Command, _ []string) {
}
taskName := cmd.Flags().Arg(0)

workers, err := common.GetWorkerArgs(cmd)
sources, err := common.GetSourceArgs(cmd)
if err != nil {
fmt.Println(errors.ErrorStack(err))
return
}
if len(workers) == 0 {
fmt.Println("must specify at least one DM-worker (`-w` / `--worker`)")
if len(sources) == 0 {
fmt.Println("must specify at least one DM-worker (`-s` / `--source`)")
return
}

Expand Down Expand Up @@ -89,14 +89,14 @@ func breakDDLLockFunc(cmd *cobra.Command, _ []string) {
defer cancel()
cli := common.MasterClient()
resp, err := cli.BreakWorkerDDLLock(ctx, &pb.BreakWorkerDDLLockRequest{
Sources: workers,
Sources: sources,
Task: taskName,
RemoveLockID: removeLockID,
ExecDDL: exec,
SkipDDL: skip,
})
if err != nil {
common.PrintLines("can not break DDL lock (in workers %v):\n%s", workers, errors.ErrorStack(err))
common.PrintLines("can not break DDL lock (in sources %v):\n%s", sources, errors.ErrorStack(err))
return
}

Expand Down
6 changes: 3 additions & 3 deletions dm/ctl/master/migrate_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// NewMigrateRelayCmd creates a MigrateRelay command
func NewMigrateRelayCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate-relay <worker> <binlogName> <binlogPos>",
Use: "migrate-relay <source> <binlogName> <binlogPos>",
Short: "migrate DM-worker's relay unit",
Run: migrateRelayFunc,
}
Expand All @@ -45,7 +45,7 @@ func migrateRelayFunc(cmd *cobra.Command, _ []string) {
return
}

worker := cmd.Flags().Arg(0)
source := cmd.Flags().Arg(0)
binlogName := cmd.Flags().Arg(1)
binlogPos, err := strconv.Atoi(cmd.Flags().Arg(2))
if err != nil {
Expand All @@ -59,7 +59,7 @@ func migrateRelayFunc(cmd *cobra.Command, _ []string) {
resp, err := cli.MigrateWorkerRelay(ctx, &pb.MigrateWorkerRelayRequest{
BinlogName: binlogName,
BinlogPos: uint32(binlogPos),
Source: worker,
Source: source,
})
if err != nil {
log.L().Error("can not migrate relay", zap.Error(err))
Expand Down
60 changes: 60 additions & 0 deletions dm/ctl/master/offline_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
"context"
"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/errors"
"github.com/spf13/cobra"
"os"
)

// NewOfflineWorkerCmd creates an OfflineWorker command
func NewOfflineWorkerCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "offline-worker <name> <address>",
Short: "offline worker which has been closed",
Run: offlineWorkerFunc,
}
return cmd
}

// offlineWorkerFunc does migrate relay request
func offlineWorkerFunc(cmd *cobra.Command, _ []string) {
if len(cmd.Flags().Args()) != 2 {
cmd.SetOut(os.Stdout)
cmd.Usage()
return
}

name := cmd.Flags().Arg(0)
addr := cmd.Flags().Arg(1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cli := common.MasterClient()
resp, err := cli.OfflineWorker(ctx, &pb.OfflineWorkerRequest{
Name: name,
Address: addr,
})
if err != nil {
common.PrintLines("offline worker failed, error:\n%v", errors.ErrorStack(err))
return
}
if !resp.Result {
common.PrintLines("offline worker failed:\n%v", resp.Msg)
}
}
7 changes: 3 additions & 4 deletions dm/ctl/master/operate_mysql_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package master

import (
"context"
"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -70,8 +69,8 @@ func operateMysqlWorkerFunc(cmd *cobra.Command, _ []string) {
common.PrintLines("can not update task:\n%v", errors.ErrorStack(err))
return
}
if !common.PrettyPrintResponseWithCheckTask(resp, checker.ErrorMsgHeader) {
common.PrettyPrintResponse(resp)
}

if !resp.Result {
common.PrintLines("operate worker failed:\n%v", resp.Msg)
}
}
10 changes: 5 additions & 5 deletions dm/ctl/master/pause_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// NewPauseRelayCmd creates a PauseRelay command
func NewPauseRelayCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "pause-relay <-w worker ...>",
Use: "pause-relay <-s source ...>",
Short: "pause DM-worker's relay unit",
Run: pauseRelayFunc,
}
Expand All @@ -42,17 +42,17 @@ func pauseRelayFunc(cmd *cobra.Command, _ []string) {
return
}

workers, err := common.GetWorkerArgs(cmd)
sources, err := common.GetSourceArgs(cmd)
if err != nil {
common.PrintLines("%s", errors.ErrorStack(err))
return
}
if len(workers) == 0 {
fmt.Println("must specify at least one DM-worker (`-w` / `--worker`)")
if len(sources) == 0 {
fmt.Println("must specify at least one DM-worker (`-s` / `--source`)")
return
}

resp, err := common.OperateRelay(pb.RelayOp_PauseRelay, workers)
resp, err := common.OperateRelay(pb.RelayOp_PauseRelay, sources)
if err != nil {
common.PrintLines("can not pause relay unit:\n%v", errors.ErrorStack(err))
return
Expand Down
6 changes: 3 additions & 3 deletions dm/ctl/master/pause_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// NewPauseTaskCmd creates a PauseTask command
func NewPauseTaskCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "pause-task [-w worker ...] <task-name>",
Use: "pause-task [-s source ...] <task-name>",
Short: "pause a specified running task",
Run: pauseTaskFunc,
}
Expand All @@ -42,13 +42,13 @@ func pauseTaskFunc(cmd *cobra.Command, _ []string) {
}
name := cmd.Flags().Arg(0)

workers, err := common.GetWorkerArgs(cmd)
sources, err := common.GetSourceArgs(cmd)
if err != nil {
common.PrintLines("%s", errors.ErrorStack(err))
return
}

resp, err := common.OperateTask(pb.TaskOp_Pause, name, workers)
resp, err := common.OperateTask(pb.TaskOp_Pause, name, sources)
if err != nil {
common.PrintLines("can not pause task %s:\n%v", name, errors.ErrorStack(err))
return
Expand Down
12 changes: 6 additions & 6 deletions dm/ctl/master/purge_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewPurgeRelayCmd() *cobra.Command {
cmd := &cobra.Command{
//Use: "purge-relay <-w worker> [--inactive] [--time] [--filename] [--sub-dir]",
//Short: "purge dm-worker's relay log files, choose 1 of 2 methods",
Use: "purge-relay <-w worker> [--filename] [--sub-dir]",
Use: "purge-relay <-s source> [--filename] [--sub-dir]",
Short: "purge relay log files of the DM-worker according to the specified filename",
Run: purgeRelayFunc,
}
Expand All @@ -61,13 +61,13 @@ func purgeRelayFunc(cmd *cobra.Command, _ []string) {
return
}

workers, err := common.GetWorkerArgs(cmd)
sources, err := common.GetSourceArgs(cmd)
if err != nil {
fmt.Println(errors.ErrorStack(err))
return
}
if len(workers) == 0 {
fmt.Println("must specify at least one DM-worker (`-w` / `--worker`)")
if len(sources) == 0 {
fmt.Println("must specify at least one DM-worker (`-s` / `--source`)")
return
}

Expand Down Expand Up @@ -122,7 +122,7 @@ func purgeRelayFunc(cmd *cobra.Command, _ []string) {
// }
//}

if len(filename) > 0 && len(workers) > 1 {
if len(filename) > 0 && len(sources) > 1 {
fmt.Println("for --filename, can only specify one DM-worker per time")
return
}
Expand All @@ -138,7 +138,7 @@ func purgeRelayFunc(cmd *cobra.Command, _ []string) {
cli := common.MasterClient()

resp, err := cli.PurgeWorkerRelay(ctx, &pb.PurgeWorkerRelayRequest{
Sources: workers,
Sources: sources,
//Inactive: inactive,
//Time: time2.Unix(),
Filename: filename,
Expand Down
10 changes: 5 additions & 5 deletions dm/ctl/master/query_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// NewQueryErrorCmd creates a QueryError command
func NewQueryErrorCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "query-error [-w worker ...] [task-name]",
Use: "query-error [-s source ...] [task-name]",
Short: "query task error",
Run: queryErrorFunc,
}
Expand All @@ -43,7 +43,7 @@ func queryErrorFunc(cmd *cobra.Command, _ []string) {
}
taskName := cmd.Flags().Arg(0) // maybe empty

workers, err := common.GetWorkerArgs(cmd)
sources, err := common.GetSourceArgs(cmd)
if err != nil {
common.PrintLines("%s", errors.ErrorStack(err))
return
Expand All @@ -54,15 +54,15 @@ func queryErrorFunc(cmd *cobra.Command, _ []string) {
cli := common.MasterClient()
resp, err := cli.QueryError(ctx, &pb.QueryErrorListRequest{
Name: taskName,
Sources: workers,
Sources: sources,
})
if err != nil {
common.PrintLines("dmctl query error failed")
if taskName != "" {
common.PrintLines("taskname: %s", taskName)
}
if len(workers) > 0 {
common.PrintLines("workers: %v", workers)
if len(sources) > 0 {
common.PrintLines("sources: %v", sources)
}
common.PrintLines("error: %s", errors.ErrorStack(err))
return
Expand Down
10 changes: 5 additions & 5 deletions dm/ctl/master/query_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type taskInfo struct {
// NewQueryStatusCmd creates a QueryStatus command
func NewQueryStatusCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "query-status [-w worker ...] [task-name]",
Use: "query-status [-s source ...] [task-name]",
Short: "query task status",
Run: queryStatusFunc,
}
Expand All @@ -58,7 +58,7 @@ func queryStatusFunc(cmd *cobra.Command, _ []string) {
}
taskName := cmd.Flags().Arg(0) // maybe empty

workers, err := common.GetWorkerArgs(cmd)
sources, err := common.GetSourceArgs(cmd)
if err != nil {
common.PrintLines("%s", errors.ErrorStack(err))
return
Expand All @@ -69,14 +69,14 @@ func queryStatusFunc(cmd *cobra.Command, _ []string) {
defer cancel()
resp, err := cli.QueryStatus(ctx, &pb.QueryStatusListRequest{
Name: taskName,
Sources: workers,
Sources: sources,
})
if err != nil {
common.PrintLines("can not query %s task's status(in workers %v):\n%s", taskName, workers, errors.ErrorStack(err))
common.PrintLines("can not query %s task's status(in sources %v):\n%s", taskName, sources, errors.ErrorStack(err))
return
}

if resp.Result && taskName == "" && len(workers) == 0 {
if resp.Result && taskName == "" && len(sources) == 0 {
result := wrapTaskResult(resp)
common.PrettyPrintInterface(result)
} else {
Expand Down
Loading

0 comments on commit 4f0b875

Please sign in to comment.