Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support replay command to enhance checkpoint #1157

Merged
merged 7 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions components/cluster/command/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"fmt"
"path"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/audit"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/spf13/cobra"
)

// newReplayCmd builds the `replay <audit-id>` sub-command.
//
// The command resolves <audit-id> to a file under spec.AuditDir(), installs it
// as the checkpoint when no checkpoint is set yet, reads the recorded command
// line from that file, asks for confirmation unless skipConfirm is set, and
// then re-executes rootCmd with the recorded arguments.
func newReplayCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "replay <audit-id>",
		Short: "Replay previous operation and skip successed steps",
		RunE: func(cmd *cobra.Command, args []string) error {
			// Exactly one audit ID is expected; otherwise just show usage.
			if len(args) != 1 {
				return cmd.Help()
			}

			file := path.Join(spec.AuditDir(), args[0])
			// Only install the checkpoint once, so an already-set checkpoint
			// is never overwritten by this command.
			if !checkpoint.HasCheckPoint() {
				if err := checkpoint.SetCheckPoint(file); err != nil {
					return errors.Annotate(err, "set checkpoint failed")
				}
			}

			recorded, err := audit.CommandArgs(file)
			if err != nil {
				return errors.Annotate(err, "read audit log failed")
			}
			// Skip the first recorded element (presumably the program name,
			// since the audit log stores os.Args) and keep the sub-command
			// with its arguments.
			replayArgs := recorded[1:]

			if !skipConfirm {
				// Hand the recorded command to the prompt as an argument
				// instead of pre-formatting it into the format string: a '%'
				// inside a replayed argument would otherwise be parsed as a
				// formatting verb and garble the prompt.
				// NOTE(review): relies on PromptForConfirmOrAbortError being
				// printf-style (format string, args...) — confirm.
				if err := cliutil.PromptForConfirmOrAbortError(
					"Will replay the command `tiup cluster %s`\nDo you want to continue? [y/N]: ",
					strings.Join(replayArgs, " "),
				); err != nil {
					return err
				}
			}

			rootCmd.SetArgs(replayArgs)
			return rootCmd.Execute()
		},
	}

	return cmd
}
12 changes: 1 addition & 11 deletions components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"strings"
"time"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/joomcode/errorx"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/cluster/flags"
Expand Down Expand Up @@ -123,12 +120,6 @@ func init() {
fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system")
}

if gOpt.CheckPoint != "" {
if err := checkpoint.SetCheckPoint(path.Join(spec.AuditDir(), gOpt.CheckPoint)); err != nil {
return errors.Annotate(err, "set checkpoint failed")
}
}

return nil
},
PersistentPostRunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -145,9 +136,7 @@ func init() {
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "(EXPERIMENTAL) Use the native SSH client installed on local system instead of the build-in one.")
rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "(EXPERIMENTAL) The executor type: 'builtin', 'system', 'none'.")
rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.")
_ = rootCmd.PersistentFlags().MarkHidden("native-ssh")
_ = rootCmd.PersistentFlags().MarkHidden("checkpoint")

rootCmd.AddCommand(
newCheckCmd(),
Expand Down Expand Up @@ -176,6 +165,7 @@ func init() {
newPushCmd(),
newTestCmd(), // hidden command for test internally
newTelemetryCmd(),
newReplayCmd(),
newTemplateCmd(),
)
}
Expand Down
64 changes: 64 additions & 0 deletions components/dm/command/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"fmt"
"path"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/audit"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/spf13/cobra"
)

// newReplayCmd builds the `replay <audit-id>` sub-command for the dm component.
//
// The command resolves <audit-id> to a file under spec.AuditDir(), installs it
// as the checkpoint when no checkpoint is set yet, reads the recorded command
// line from that file, asks for confirmation unless skipConfirm is set, and
// then re-executes rootCmd with the recorded arguments.
func newReplayCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "replay <audit-id>",
		Short: "Replay previous operation and skip successed steps",
		RunE: func(cmd *cobra.Command, args []string) error {
			// Exactly one audit ID is expected; otherwise just show usage.
			if len(args) != 1 {
				return cmd.Help()
			}

			file := path.Join(spec.AuditDir(), args[0])
			// Only install the checkpoint once, so an already-set checkpoint
			// is never overwritten by this command.
			if !checkpoint.HasCheckPoint() {
				if err := checkpoint.SetCheckPoint(file); err != nil {
					return errors.Annotate(err, "set checkpoint failed")
				}
			}

			recorded, err := audit.CommandArgs(file)
			if err != nil {
				return errors.Annotate(err, "read audit log failed")
			}
			// Skip the first recorded element (presumably the program name,
			// since the audit log stores os.Args) and keep the sub-command
			// with its arguments.
			replayArgs := recorded[1:]

			if !skipConfirm {
				// Hand the recorded command to the prompt as an argument
				// instead of pre-formatting it into the format string: a '%'
				// inside a replayed argument would otherwise be parsed as a
				// formatting verb and garble the prompt.
				// NOTE(review): relies on PromptForConfirmOrAbortError being
				// printf-style (format string, args...) — confirm.
				if err := cliutil.PromptForConfirmOrAbortError(
					"Will replay the command `tiup dm %s`\nDo you want to continue? [y/N]: ",
					strings.Join(replayArgs, " "),
				); err != nil {
					return err
				}
			}

			rootCmd.SetArgs(replayArgs)
			return rootCmd.Execute()
		},
	}

	return cmd
}
12 changes: 1 addition & 11 deletions components/dm/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@ package command
import (
"fmt"
"os"
"path"
"strings"

"github.com/fatih/color"
"github.com/joomcode/errorx"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/components/dm/spec"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/cluster/manager"
Expand Down Expand Up @@ -99,12 +96,6 @@ please backup your data before process.`,
fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system")
}

if gOpt.CheckPoint != "" {
if err := checkpoint.SetCheckPoint(path.Join(cspec.AuditDir(), gOpt.CheckPoint)); err != nil {
return errors.Annotate(err, "set checkpoint failed")
}
}

return nil
},
PersistentPostRunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -119,9 +110,7 @@ please backup your data before process.`,
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "Use the SSH client installed on local system instead of the build-in one.")
rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "The executor type: 'builtin', 'system', 'none'")
rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.")
_ = rootCmd.PersistentFlags().MarkHidden("native-ssh")
_ = rootCmd.PersistentFlags().MarkHidden("checkpoint")

rootCmd.AddCommand(
newDeployCmd(),
Expand All @@ -143,6 +132,7 @@ please backup your data before process.`,
newImportCmd(),
newEnableCmd(),
newDisableCmd(),
newReplayCmd(),
newTemplateCmd(),
)
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
DebugCheckpoint = os.Getenv("DEBUG_CHECKPOINT") == "1"
)

// SetCheckPoint set global checkpoint for executor
// SetCheckPoint set global checkpoint for replay
func SetCheckPoint(file string) error {
pointReader, err := os.Open(file)
if err != nil {
Expand All @@ -63,6 +63,11 @@ func SetCheckPoint(file string) error {
return nil
}

// HasCheckPoint reports whether the package-level checkpoint is currently
// non-nil (presumably meaning SetCheckPoint has succeeded at least once).
func HasCheckPoint() bool {
	installed := checkpoint != nil
	return installed
}

// Acquire wraps CheckPoint.Acquire
func Acquire(ctx context.Context, point map[string]interface{}) *Point {
if ctx.Value(goroutineKey) == nil || ctx.Value(semKey) == nil {
Expand Down Expand Up @@ -202,11 +207,16 @@ next_set:
if cf.eq == nil {
continue
}
if !cf.eq(ma[cf.field], mb[cf.field]) {
if !contains(ma, cf.field) || !contains(mb, cf.field) || !cf.eq(ma[cf.field], mb[cf.field]) {
continue next_set
}
}
return true
}
return false
}

// contains reports whether key f is present in m. A key that maps to a nil
// value still counts as present; a nil map contains nothing.
func contains(m map[string]interface{}, f string) (found bool) {
	_, found = m[f]
	return found
}
11 changes: 11 additions & 0 deletions pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ func setup() {
return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)
}),
)

RegisterField(
Field("host", reflect.DeepEqual),
Field("port", func(a, b interface{}) bool {
return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)
}),
Field("user", reflect.DeepEqual),
Field("src", reflect.DeepEqual),
Field("dst", reflect.DeepEqual),
Field("download", reflect.DeepEqual),
)
}

func TestCheckPointSimple(t *testing.T) {
Expand Down
72 changes: 58 additions & 14 deletions pkg/cluster/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package audit
import (
"bufio"
"fmt"
"net/url"
"os"
"path/filepath"
"sort"
Expand All @@ -30,22 +31,51 @@ import (
"github.com/pingcap/tiup/pkg/utils/rand"
)

// ShowAuditList show the audit list.
func ShowAuditList(dir string) error {
firstLine := func(fileName string) (string, error) {
file, err := os.Open(filepath.Join(dir, fileName))
if err != nil {
return "", errors.Trace(err)
}
defer file.Close()
// CommandArgs returns the original command line recorded on the first line of
// the audit log file at fp.
//
// The first line is split on single spaces and every piece is decoded by
// decodeCommandArgs. An error is returned when the file cannot be opened, when
// the first line cannot be read, when the file has no first line at all, or
// when decoding fails.
func CommandArgs(fp string) ([]string, error) {
	file, err := os.Open(fp)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	if !scanner.Scan() {
		// Scan returns false both at a clean EOF and on a read failure
		// (including a first line that exceeds the scanner's buffer). Only
		// the clean-EOF case means the file is not an audit log, so surface
		// the real error when there is one.
		if scanErr := scanner.Err(); scanErr != nil {
			return nil, errors.Trace(scanErr)
		}
		return nil, errors.New("unknown audit log format")
	}

	// Split on exactly one space (not strings.Fields): the writer joins the
	// encoded arguments with a single space, so an empty argument shows up as
	// two adjacent separators and must be kept.
	args := strings.Split(scanner.Text(), " ")
	return decodeCommandArgs(args)
}

// encodeCommandArgs returns a new slice holding every element of args passed
// through url.QueryEscape, in the same order.
//
// NOTE(review): the three scanner lines in the body reference `file`, which is
// not defined in this function — presumably removed lines left over from the
// diff view rather than part of this function; confirm against the real file.
func encodeCommandArgs(args []string) []string {
encoded := []string{}

scanner := bufio.NewScanner(file)
if scanner.Scan() {
return scanner.Text(), nil
// Escape each argument individually so the result keeps one entry per input.
for _, arg := range args {
encoded = append(encoded, url.QueryEscape(arg))
}

return encoded
}

// decodeCommandArgs returns a new slice holding every element of args passed
// through url.QueryUnescape, in the same order. It stops at the first element
// that fails to decode and returns that error, annotated, with a nil slice.
//
// NOTE(review): the `return "", errors.New("unknown audit log format")` line
// inside the loop does not match this function's result types — presumably a
// removed line left over from the diff view; confirm against the real file.
func decodeCommandArgs(args []string) ([]string, error) {
decoded := []string{}

for _, arg := range args {
a, err := url.QueryUnescape(arg)
if err != nil {
return nil, errors.Annotate(err, "failed on decode the command line of audit log")
}
return "", errors.New("unknown audit log format")
decoded = append(decoded, a)
}

return decoded, nil
}

// ShowAuditList show the audit list.
func ShowAuditList(dir string) error {
// Header
clusterTable := [][]string{{"ID", "Time", "Command"}}
fileInfos, err := os.ReadDir(dir)
Expand All @@ -60,10 +90,11 @@ func ShowAuditList(dir string) error {
if err != nil {
continue
}
cmd, err := firstLine(fi.Name())
args, err := CommandArgs(filepath.Join(dir, fi.Name()))
if err != nil {
continue
}
cmd := strings.Join(args, " ")
clusterTable = append(clusterTable, []string{
fi.Name(),
t.Format(time.RFC3339),
Expand All @@ -82,7 +113,20 @@ func ShowAuditList(dir string) error {
// OutputAuditLog writes an audit log file into dir.
//
// The file name is the base52 encoding of the current Unix time in nanoseconds
// plus a random offset below 1000. The code after the os.WriteFile return
// creates the file, writes os.Args (each argument escaped by
// encodeCommandArgs, joined by single spaces) as the first line, and then
// writes data.
//
// NOTE(review): the `return os.WriteFile(...)` line makes everything after it
// unreachable as written — presumably it is the pre-change implementation left
// over from the diff view; confirm against the real file.
func OutputAuditLog(dir string, data []byte) error {
fname := filepath.Join(dir, base52.Encode(time.Now().UnixNano()+rand.Int63n(1000)))
return os.WriteFile(fname, data, 0644)
f, err := os.Create(fname)
if err != nil {
return errors.Annotate(err, "create audit log")
}
defer f.Close()

// First line: the escaped command line, which CommandArgs later reads back.
args := encodeCommandArgs(os.Args)
if _, err := f.Write([]byte(strings.Join(args, " ") + "\n")); err != nil {
return errors.Annotate(err, "write audit log")
}
// Remainder of the file: the audit payload supplied by the caller.
if _, err := f.Write(data); err != nil {
return errors.Annotate(err, "write audit log")
}
return nil
}

// ShowAuditLog show the audit with the specified auditID
Expand Down
1 change: 0 additions & 1 deletion pkg/cluster/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type Options struct {
SSHTimeout uint64 // timeout in seconds when connecting an SSH server
OptTimeout uint64 // timeout in seconds for operations that support it, not to confuse with SSH timeout
APITimeout uint64 // timeout in seconds for API operations that support it, like transferring store leader
CheckPoint string // the audit log ID where we should recover from, this is useful when an action failed and we want to continue that action
IgnoreConfigCheck bool // should we ignore the config check result after init config
NativeSSH bool // should use native ssh client or builtin easy ssh (deprecated, shoule use SSHType)
SSHType executor.SSHType // the ssh type: 'builtin', 'system', 'none'
Expand Down
Loading