From 9ef56470af1126b3f869c322e7e0aca55e632bf6 Mon Sep 17 00:00:00 2001 From: lucklove Date: Thu, 25 Feb 2021 12:38:51 +0800 Subject: [PATCH 1/5] Support replay command to enhance checkpoint --- components/cluster/command/replay.go | 53 ++++++++++++++++++++++ components/cluster/command/root.go | 3 +- components/dm/command/replay.go | 53 ++++++++++++++++++++++ components/dm/command/root.go | 3 +- pkg/checkpoint/checkpoint.go | 7 ++- pkg/cluster/audit/audit.go | 57 ++++++++++++++++++------ pkg/cluster/operation/operation.go | 1 - pkg/logger/audit.go | 3 +- tests/tiup-cluster/script/cmd_subtest.sh | 2 +- 9 files changed, 161 insertions(+), 21 deletions(-) create mode 100644 components/cluster/command/replay.go create mode 100644 components/dm/command/replay.go diff --git a/components/cluster/command/replay.go b/components/cluster/command/replay.go new file mode 100644 index 0000000000..a9d9409649 --- /dev/null +++ b/components/cluster/command/replay.go @@ -0,0 +1,53 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "path" + + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/checkpoint" + "github.com/pingcap/tiup/pkg/cluster/audit" + "github.com/pingcap/tiup/pkg/cluster/spec" + "github.com/spf13/cobra" +) + +func newReplayCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "replay ", + Short: "Replay previous operation and skip successed steps", + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) != 1 { + return cmd.Help() + } + + file := path.Join(spec.AuditDir(), args[0]) + if !checkpoint.HasCheckPoint() { + if err := checkpoint.SetCheckPoint(file); err != nil { + return errors.Annotate(err, "set checkpoint failed") + } + } + + args, err := audit.CommandArgs(file) + if err != nil { + return errors.Annotate(err, "read audit log failed") + } + + rootCmd.SetArgs(args[1:]) + return rootCmd.Execute() + }, + } + + return cmd +} diff --git a/components/cluster/command/root.go b/components/cluster/command/root.go index eed6d6ffd7..6fa46e6384 100644 --- a/components/cluster/command/root.go +++ b/components/cluster/command/root.go @@ -145,9 +145,7 @@ func init() { rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'") rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "(EXPERIMENTAL) Use the native SSH client installed on local system instead of the build-in one.") rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "(EXPERIMENTAL) The executor type: 'builtin', 'system', 'none'.") - rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.") _ = rootCmd.PersistentFlags().MarkHidden("native-ssh") - _ = rootCmd.PersistentFlags().MarkHidden("checkpoint") rootCmd.AddCommand( newCheckCmd(), @@ -176,6 +174,7 @@ func init() { newPushCmd(), newTestCmd(), // hidden command for test internally newTelemetryCmd(), + newReplayCmd(), ) } diff --git a/components/dm/command/replay.go b/components/dm/command/replay.go new file mode 100644 index 0000000000..a9d9409649 --- /dev/null +++ b/components/dm/command/replay.go @@ -0,0 +1,53 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "path" + + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/checkpoint" + "github.com/pingcap/tiup/pkg/cluster/audit" + "github.com/pingcap/tiup/pkg/cluster/spec" + "github.com/spf13/cobra" +) + +func newReplayCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "replay ", + Short: "Replay previous operation and skip successed steps", + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) != 1 { + return cmd.Help() + } + + file := path.Join(spec.AuditDir(), args[0]) + if !checkpoint.HasCheckPoint() { + if err := checkpoint.SetCheckPoint(file); err != nil { + return errors.Annotate(err, "set checkpoint failed") + } + } + + args, err := audit.CommandArgs(file) + if err != nil { + return errors.Annotate(err, "read audit log failed") + } + + rootCmd.SetArgs(args[1:]) + return rootCmd.Execute() + }, + } + + return cmd +} diff --git a/components/dm/command/root.go b/components/dm/command/root.go index bc935c672e..d9e78b396a 100644 --- a/components/dm/command/root.go +++ b/components/dm/command/root.go @@ -119,9 +119,7 @@ please backup your data before process.`, rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'") rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "Use the SSH client installed on local system instead of the build-in one.") rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "The executor type: 'builtin', 'system', 'none'") - rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.") _ = rootCmd.PersistentFlags().MarkHidden("native-ssh") - _ = rootCmd.PersistentFlags().MarkHidden("checkpoint") rootCmd.AddCommand( newDeployCmd(), @@ -143,6 +141,7 @@ please backup your data before process.`, newImportCmd(), newEnableCmd(), newDisableCmd(), + newReplayCmd(), ) } diff --git a/pkg/checkpoint/checkpoint.go b/pkg/checkpoint/checkpoint.go index ac5ceac02b..c62a17fd23 100644 --- a/pkg/checkpoint/checkpoint.go +++ b/pkg/checkpoint/checkpoint.go @@ -47,7 +47,7 @@ var ( DebugCheckpoint = os.Getenv("DEBUG_CHECKPOINT") == "1" ) -// SetCheckPoint set global checkpoint for executor +// SetCheckPoint set global checkpoint for replay func SetCheckPoint(file string) error { pointReader, err := os.Open(file) if err != nil { @@ -63,6 +63,11 @@ func SetCheckPoint(file string) error { return nil } +// HasCheckPoint returns if SetCheckPoint has been called +func HasCheckPoint() bool { + return checkpoint != nil +} + // Acquire wraps CheckPoint.Acquire func Acquire(ctx context.Context, point map[string]interface{}) *Point { if ctx.Value(goroutineKey) == nil || ctx.Value(semKey) == nil { diff --git a/pkg/cluster/audit/audit.go b/pkg/cluster/audit/audit.go index 575254855b..1bea79235d 100644 --- a/pkg/cluster/audit/audit.go +++ b/pkg/cluster/audit/audit.go @@ -16,6 +16,7 @@ package audit import ( "bufio" "fmt" + "net/url" "os" "path/filepath" "sort" @@ -30,22 +31,51 @@ import ( "github.com/pingcap/tiup/pkg/utils/rand" ) -// ShowAuditList show the audit list. -func ShowAuditList(dir string) error { - firstLine := func(fileName string) (string, error) { - file, err := os.Open(filepath.Join(dir, fileName)) - if err != nil { - return "", errors.Trace(err) - } - defer file.Close() +// CommandArgs returns the original commands from the first line of a file +func CommandArgs(fp string) ([]string, error) { + file, err := os.Open(fp) + if err != nil { + return nil, errors.Trace(err) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + if !scanner.Scan() { + return nil, errors.New("unknown audit log format") + } + + args := strings.Split(scanner.Text(), " ") + return DecodeCommandArgs(args) +} + +// EncodeCommandArgs encode args with url.QueryEscape +func EncodeCommandArgs(args []string) []string { + encoded := []string{} - scanner := bufio.NewScanner(file) - if scanner.Scan() { - return scanner.Text(), nil + for _, arg := range args { + encoded = append(encoded, url.QueryEscape(arg)) + } + + return encoded +} + +// DecodeCommandArgs decode args with url.QueryUnescape +func DecodeCommandArgs(args []string) ([]string, error) { + decoded := []string{} + + for _, arg := range args { + a, err := url.QueryUnescape(arg) + if err != nil { + return nil, errors.Annotate(err, "failed on decode the command line of audit log") } - return "", errors.New("unknown audit log format") + decoded = append(decoded, a) } + return decoded, nil +} + +// ShowAuditList show the audit list. +func ShowAuditList(dir string) error { // Header clusterTable := [][]string{{"ID", "Time", "Command"}} fileInfos, err := os.ReadDir(dir) @@ -60,10 +90,11 @@ func ShowAuditList(dir string) error { if err != nil { continue } - cmd, err := firstLine(fi.Name()) + args, err := CommandArgs(filepath.Join(dir, fi.Name())) if err != nil { continue } + cmd := strings.Join(args, " ") clusterTable = append(clusterTable, []string{ fi.Name(), t.Format(time.RFC3339), diff --git a/pkg/cluster/operation/operation.go b/pkg/cluster/operation/operation.go index cfaab27f52..01c96b0239 100644 --- a/pkg/cluster/operation/operation.go +++ b/pkg/cluster/operation/operation.go @@ -29,7 +29,6 @@ type Options struct { SSHTimeout uint64 // timeout in seconds when connecting an SSH server OptTimeout uint64 // timeout in seconds for operations that support it, not to confuse with SSH timeout APITimeout uint64 // timeout in seconds for API operations that support it, like transferring store leader - CheckPoint string // the audit log ID where we should recover from, this is useful when an action failed and we want to continue that action IgnoreConfigCheck bool // should we ignore the config check result after init config NativeSSH bool // should use native ssh client or builtin easy ssh (deprecated, shoule use SSHType) SSHType executor.SSHType // the ssh type: 'builtin', 'system', 'none' diff --git a/pkg/logger/audit.go b/pkg/logger/audit.go index 01724ee534..61ea7d7974 100644 --- a/pkg/logger/audit.go +++ b/pkg/logger/audit.go @@ -41,7 +41,8 @@ func DisableAuditLog() { } func newAuditLogCore() zapcore.Core { - auditBuffer = bytes.NewBufferString(strings.Join(os.Args, " ") + "\n") + args := audit.EncodeCommandArgs(os.Args) + auditBuffer = bytes.NewBufferString(strings.Join(args, " ") + "\n") encoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) return zapcore.NewCore(encoder, zapcore.Lock(zapcore.AddSync(auditBuffer)), zapcore.DebugLevel) } diff --git a/tests/tiup-cluster/script/cmd_subtest.sh b/tests/tiup-cluster/script/cmd_subtest.sh index 8873acd2de..a72a46b81a 100755 --- a/tests/tiup-cluster/script/cmd_subtest.sh +++ b/tests/tiup-cluster/script/cmd_subtest.sh @@ -73,7 +73,7 @@ function cmd_subtest() { tiup-cluster exec $name -N n1 --command "ls /tmp/checkpoint" tiup-cluster exec $name -N n1 --command "rm -f /tmp/checkpoint" id=`tiup-cluster audit | grep "exec $name" | grep "ls /tmp/checkpoint" | awk '{print $1}'` - tiup-cluster exec $name -N n1 --command "ls /tmp/checkpoint" --checkpoint $id + tiup-cluster replay $id ! tiup-cluster exec $name -N n1 --command "ls /tmp/checkpoint" # test patch overwrite From ddaa0d38d10c9674885a615e84096f2fd7de464d Mon Sep 17 00:00:00 2001 From: lucklove Date: Thu, 25 Feb 2021 15:54:38 +0800 Subject: [PATCH 2/5] Add confirm --- components/cluster/command/replay.go | 11 +++++++++++ components/cluster/command/root.go | 9 --------- components/dm/command/replay.go | 11 +++++++++++ components/dm/command/root.go | 9 --------- tests/tiup-cluster/script/cmd_subtest.sh | 2 +- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/components/cluster/command/replay.go b/components/cluster/command/replay.go index a9d9409649..e556bb004c 100644 --- a/components/cluster/command/replay.go +++ b/components/cluster/command/replay.go @@ -14,10 +14,13 @@ package command import ( + "fmt" "path" + "strings" "github.com/pingcap/errors" "github.com/pingcap/tiup/pkg/checkpoint" + "github.com/pingcap/tiup/pkg/cliutil" "github.com/pingcap/tiup/pkg/cluster/audit" "github.com/pingcap/tiup/pkg/cluster/spec" "github.com/spf13/cobra" @@ -44,6 +47,14 @@ func newReplayCmd() *cobra.Command { return errors.Annotate(err, "read audit log failed") } + if !skipConfirm { + if err := cliutil.PromptForConfirmOrAbortError( + fmt.Sprintf("Will replay the command `tiup cluster %s`\nDo you want to continue? [y/N]: ", strings.Join(args[1:], " ")), + ); err != nil { + return err + } + } + rootCmd.SetArgs(args[1:]) return rootCmd.Execute() }, diff --git a/components/cluster/command/root.go b/components/cluster/command/root.go index 6fa46e6384..4584d77612 100644 --- a/components/cluster/command/root.go +++ b/components/cluster/command/root.go @@ -18,15 +18,12 @@ import ( "encoding/json" "fmt" "os" - "path" "strings" "time" "github.com/fatih/color" "github.com/google/uuid" "github.com/joomcode/errorx" - "github.com/pingcap/errors" - "github.com/pingcap/tiup/pkg/checkpoint" "github.com/pingcap/tiup/pkg/cliutil" "github.com/pingcap/tiup/pkg/cluster/executor" "github.com/pingcap/tiup/pkg/cluster/flags" @@ -123,12 +120,6 @@ func init() { fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system") } - if gOpt.CheckPoint != "" { - if err := checkpoint.SetCheckPoint(path.Join(spec.AuditDir(), gOpt.CheckPoint)); err != nil { - return errors.Annotate(err, "set checkpoint failed") - } - } - return nil }, PersistentPostRunE: func(cmd *cobra.Command, args []string) error { diff --git a/components/dm/command/replay.go b/components/dm/command/replay.go index a9d9409649..04e85392f1 100644 --- a/components/dm/command/replay.go +++ b/components/dm/command/replay.go @@ -14,10 +14,13 @@ package command import ( + "fmt" "path" + "strings" "github.com/pingcap/errors" "github.com/pingcap/tiup/pkg/checkpoint" + "github.com/pingcap/tiup/pkg/cliutil" "github.com/pingcap/tiup/pkg/cluster/audit" "github.com/pingcap/tiup/pkg/cluster/spec" "github.com/spf13/cobra" @@ -44,6 +47,14 @@ func newReplayCmd() *cobra.Command { return errors.Annotate(err, "read audit log failed") } + if !skipConfirm { + if err := cliutil.PromptForConfirmOrAbortError( + fmt.Sprintf("Will replay the command `tiup dm %s`\nDo you want to continue? [y/N]: ", strings.Join(args[1:], " ")), + ); err != nil { + return err + } + } + rootCmd.SetArgs(args[1:]) return rootCmd.Execute() }, diff --git a/components/dm/command/root.go b/components/dm/command/root.go index d9e78b396a..27a8234b41 100644 --- a/components/dm/command/root.go +++ b/components/dm/command/root.go @@ -16,14 +16,11 @@ package command import ( "fmt" "os" - "path" "strings" "github.com/fatih/color" "github.com/joomcode/errorx" - "github.com/pingcap/errors" "github.com/pingcap/tiup/components/dm/spec" - "github.com/pingcap/tiup/pkg/checkpoint" "github.com/pingcap/tiup/pkg/cliutil" "github.com/pingcap/tiup/pkg/cluster/executor" "github.com/pingcap/tiup/pkg/cluster/manager" @@ -99,12 +96,6 @@ please backup your data before process.`, fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system") } - if gOpt.CheckPoint != "" { - if err := checkpoint.SetCheckPoint(path.Join(cspec.AuditDir(), gOpt.CheckPoint)); err != nil { - return errors.Annotate(err, "set checkpoint failed") - } - } - return nil }, PersistentPostRunE: func(cmd *cobra.Command, args []string) error { diff --git a/tests/tiup-cluster/script/cmd_subtest.sh b/tests/tiup-cluster/script/cmd_subtest.sh index a72a46b81a..9c9202b419 100755 --- a/tests/tiup-cluster/script/cmd_subtest.sh +++ b/tests/tiup-cluster/script/cmd_subtest.sh @@ -73,7 +73,7 @@ function cmd_subtest() { tiup-cluster exec $name -N n1 --command "ls /tmp/checkpoint" tiup-cluster exec $name -N n1 --command "rm -f /tmp/checkpoint" id=`tiup-cluster audit | grep "exec $name" | grep "ls /tmp/checkpoint" | awk '{print $1}'` - tiup-cluster replay $id + tiup-cluster replay --yes $id ! tiup-cluster exec $name -N n1 --command "ls /tmp/checkpoint" # test patch overwrite From 7ed712a9af8f863decb6458a9ae732cd1a384e5e Mon Sep 17 00:00:00 2001 From: lucklove Date: Thu, 25 Feb 2021 18:06:07 +0800 Subject: [PATCH 3/5] Fix test --- pkg/cluster/audit/audit.go | 25 +++++++++++++++++++------ pkg/logger/audit.go | 5 +---- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/pkg/cluster/audit/audit.go b/pkg/cluster/audit/audit.go index 1bea79235d..8bf7f9d3e2 100644 --- a/pkg/cluster/audit/audit.go +++ b/pkg/cluster/audit/audit.go @@ -45,11 +45,11 @@ func CommandArgs(fp string) ([]string, error) { } args := strings.Split(scanner.Text(), " ") - return DecodeCommandArgs(args) + return decodeCommandArgs(args) } -// EncodeCommandArgs encode args with url.QueryEscape -func EncodeCommandArgs(args []string) []string { +// encodeCommandArgs encode args with url.QueryEscape +func encodeCommandArgs(args []string) []string { encoded := []string{} for _, arg := range args { @@ -59,8 +59,8 @@ func EncodeCommandArgs(args []string) []string { return encoded } -// DecodeCommandArgs decode args with url.QueryUnescape -func DecodeCommandArgs(args []string) ([]string, error) { +// decodeCommandArgs decode args with url.QueryUnescape +func decodeCommandArgs(args []string) ([]string, error) { decoded := []string{} for _, arg := range args { @@ -113,7 +113,20 @@ func ShowAuditList(dir string) error { // OutputAuditLog outputs audit log. func OutputAuditLog(dir string, data []byte) error { fname := filepath.Join(dir, base52.Encode(time.Now().UnixNano()+rand.Int63n(1000))) - return os.WriteFile(fname, data, 0644) + f, err := os.Create(fname) + if err != nil { + return errors.Annotate(err, "create audit log") + } + defer f.Close() + + args := encodeCommandArgs(os.Args) + if _, err := f.Write([]byte(strings.Join(args, " ") + "\n")); err != nil { + return errors.Annotate(err, "write audit log") + } + if _, err := f.Write(data); err != nil { + return errors.Annotate(err, "write audit log") + } + return nil } // ShowAuditLog show the audit with the specified auditID diff --git a/pkg/logger/audit.go b/pkg/logger/audit.go index 61ea7d7974..90c1c8de07 100644 --- a/pkg/logger/audit.go +++ b/pkg/logger/audit.go @@ -15,8 +15,6 @@ package logger import ( "bytes" - "os" - "strings" "github.com/pingcap/tiup/pkg/cluster/audit" utils2 "github.com/pingcap/tiup/pkg/utils" @@ -41,8 +39,7 @@ func DisableAuditLog() { } func newAuditLogCore() zapcore.Core { - args := audit.EncodeCommandArgs(os.Args) - auditBuffer = bytes.NewBufferString(strings.Join(args, " ") + "\n") + auditBuffer = bytes.NewBuffer([]byte{}) encoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) return zapcore.NewCore(encoder, zapcore.Lock(zapcore.AddSync(auditBuffer)), zapcore.DebugLevel) } From f96e2ca04c71edc54f9b05bd276b7b0b1945b7f9 Mon Sep 17 00:00:00 2001 From: lucklove Date: Fri, 26 Feb 2021 11:36:55 +0800 Subject: [PATCH 4/5] Fix checkpoint compare --- pkg/checkpoint/checkpoint.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/checkpoint/checkpoint.go b/pkg/checkpoint/checkpoint.go index c62a17fd23..c0751a560a 100644 --- a/pkg/checkpoint/checkpoint.go +++ b/pkg/checkpoint/checkpoint.go @@ -207,7 +207,7 @@ next_set: if cf.eq == nil { continue } - if !cf.eq(ma[cf.field], mb[cf.field]) { + if !contains(ma, cf.field) || !contains(mb, cf.field) || !cf.eq(ma[cf.field], mb[cf.field]) { continue next_set } } @@ -215,3 +215,8 @@ next_set: } return false } + +func contains(m map[string]interface{}, f string) bool { + _, ok := m[f] + return ok +} From d25cfa13160fc07e95d9e9dc0cdc68dfee41c0da Mon Sep 17 00:00:00 2001 From: lucklove Date: Fri, 26 Feb 2021 12:19:55 +0800 Subject: [PATCH 5/5] Fix test --- pkg/checkpoint/checkpoint_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/checkpoint/checkpoint_test.go b/pkg/checkpoint/checkpoint_test.go index eaa378a9bf..ba58f122bc 100644 --- a/pkg/checkpoint/checkpoint_test.go +++ b/pkg/checkpoint/checkpoint_test.go @@ -35,6 +35,17 @@ func setup() { return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b) }), ) + + RegisterField( + Field("host", reflect.DeepEqual), + Field("port", func(a, b interface{}) bool { + return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b) + }), + Field("user", reflect.DeepEqual), + Field("src", reflect.DeepEqual), + Field("dst", reflect.DeepEqual), + Field("download", reflect.DeepEqual), + ) } func TestCheckPointSimple(t *testing.T) {