Skip to content

Commit

Permalink
cluster/executor: implement native SCP download instead of cat (#1382)
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis authored May 25, 2021
1 parent fdadd94 commit 876377b
Show file tree
Hide file tree
Showing 31 changed files with 202 additions and 70 deletions.
1 change: 1 addition & 0 deletions components/cluster/command/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func newPullCmd() *cobra.Command {

cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only exec on host with specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only exec on host with specified nodes")
cmd.Flags().IntVarP(&opt.Limit, "limit", "l", 0, "Limits the used bandwidth, specified in Kbit/s")

return cmd
}
Expand Down
4 changes: 2 additions & 2 deletions components/dm/ansible/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (im *Importer) fetchFile(ctx context.Context, host string, port int, fname

tmp = filepath.Join(tmp, filepath.Base(fname))

err = e.Transfer(ctx, fname, tmp, true /*download*/)
err = e.Transfer(ctx, fname, tmp, true /*download*/, 0)
if err != nil {
return nil, errors.Annotatef(err, "transfer %s from %s:%d", fname, host, port)
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func (im *Importer) ScpSourceToMaster(ctx context.Context, topo *spec.Specificat
return errors.AddStack(err)
}

err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false)
err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false, 0)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions components/dm/ansible/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ func (g *executorGetter) Get(host string) ctxt.Executor {

// Transfer implements executor interface.
// Replace the deploy directory as the local one in testdata, so we can fetch it.
func (l *localExecutor) Transfer(ctx context.Context, src string, target string, download bool) error {
func (l *localExecutor) Transfer(ctx context.Context, src, target string, download bool, limit int) error {
mydeploy, err := filepath.Abs("./testdata/deploy_dir/" + l.host)
if err != nil {
return errors.AddStack(err)
}
src = strings.Replace(src, "/home/tidb/deploy", mydeploy, 1)
return l.Local.Transfer(ctx, src, target, download)
return l.Local.Transfer(ctx, src, target, download, 0)
}

func TestParseRunScript(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (i *MasterInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_dm-master.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down Expand Up @@ -176,7 +176,7 @@ func (i *MasterInstance) ScaleConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_dm-master.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down Expand Up @@ -265,7 +265,7 @@ func (i *WorkerInstance) InitConfig(
}
dst := filepath.Join(paths.Deploy, "scripts", "run_dm-worker.sh")

if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/cluster/ansible/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
inst.GetHost(),
inst.GetPort())),
inst.GetHost(),
true).
true,
0).
Build()
copyFileTasks = append(copyFileTasks, t)
case spec.ComponentTiFlash:
Expand All @@ -71,7 +72,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
inst.GetHost(),
inst.GetPort())),
inst.GetHost(),
true).
true,
0).
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+"-learner.toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand All @@ -80,7 +82,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
inst.GetHost(),
inst.GetPort())),
inst.GetHost(),
true).
true,
0).
Build()
copyFileTasks = append(copyFileTasks, t)
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/ctxt/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type (
Execute(ctx context.Context, cmd string, sudo bool, timeout ...time.Duration) (stdout []byte, stderr []byte, err error)

// Transfer copies files from or to a target
Transfer(ctx context.Context, src string, dst string, download bool) error
Transfer(ctx context.Context, src, dst string, download bool, limit int) error
}

// ExecutorGetter get the executor by host.
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/executor/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ func (c *CheckPointExecutor) Execute(ctx context.Context, cmd string, sudo bool,
}

// Transfer implements Executer interface.
func (c *CheckPointExecutor) Transfer(ctx context.Context, src string, dst string, download bool) (err error) {
func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) (err error) {
point := checkpoint.Acquire(ctx, scpPoint, map[string]interface{}{
"host": c.config.Host,
"port": c.config.Port,
"user": c.config.User,
"src": src,
"dst": dst,
"download": download,
"limit": limit,
})
defer func() {
point.Release(err,
Expand All @@ -108,5 +109,5 @@ func (c *CheckPointExecutor) Transfer(ctx context.Context, src string, dst strin
return nil
}

return c.Executor.Transfer(ctx, src, dst, download)
return c.Executor.Transfer(ctx, src, dst, download, limit)
}
2 changes: 1 addition & 1 deletion pkg/cluster/executor/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (l *Local) Execute(ctx context.Context, cmd string, sudo bool, timeout ...t
}

// Transfer implements Executer interface.
func (l *Local) Transfer(ctx context.Context, src string, dst string, download bool) error {
func (l *Local) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
targetPath := filepath.Dir(dst)
if err := utils.CreateDir(targetPath); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/executor/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestLocal(t *testing.T) {
defer os.Remove(dst.Name())

// Transfer src to dst and check it.
err = local.Transfer(ctx, src.Name(), dst.Name(), false)
err = local.Transfer(ctx, src.Name(), dst.Name(), false, 0)
assert.Nil(err)

data, err := os.ReadFile(dst.Name())
Expand Down
129 changes: 129 additions & 0 deletions pkg/cluster/executor/scp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"bufio"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/pingcap/tiup/pkg/utils"
"golang.org/x/crypto/ssh"
)

// ScpDownload downloads a file from remote with SCP
// The implementation is partially inspired by github.com/dtylman/scp
func ScpDownload(session *ssh.Session, client *ssh.Client, src, dst string, limit int) error {
r, err := session.StdoutPipe()
if err != nil {
return err
}
bufr := bufio.NewReader(r)

w, err := session.StdinPipe()
if err != nil {
return err
}

copyF := func() error {
// parse SCP command
line, _, err := bufr.ReadLine()
if err != nil {
return err
}
if line[0] != byte('C') {
return fmt.Errorf("incorrect scp command '%b', should be 'C'", line[0])
}

mode, err := strconv.ParseUint(string(line[1:5]), 0, 32)
if err != nil {
return fmt.Errorf("error parsing file mode; %s", err)
}

// prepare dst file
targetPath := filepath.Dir(dst)
if err := utils.CreateDir(targetPath); err != nil {
return err
}
targetFile, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fs.FileMode(mode))
if err != nil {
return err
}
defer targetFile.Close()

size, err := strconv.Atoi(strings.Fields(string(line))[1])
if err != nil {
return err
}

if err := ack(w); err != nil {
return err
}

// transferring data
n, err := io.CopyN(targetFile, bufr, int64(size))
if err != nil {
return err
}
if n < int64(size) {
return fmt.Errorf("error downloading via scp, file size mismatch")
}
if err := targetFile.Sync(); err != nil {
return err
}

return ack(w)
}

copyErrC := make(chan error, 1)
go func() {
defer w.Close()
copyErrC <- copyF()
}()

remoteCmd := fmt.Sprintf("scp -f %s", src)
if limit > 0 {
remoteCmd = fmt.Sprintf("scp -l %d -f %s", limit, src)
}
err = session.Start(remoteCmd)
if err != nil {
return err
}
if err := ack(w); err != nil { // send an empty byte to start transfer
return err
}

err = <-copyErrC
if err != nil {
return err
}
return session.Wait()
}

func ack(w io.Writer) error {
msg := []byte("\x00")
n, err := w.Write(msg)
if err != nil {
return fmt.Errorf("fail to send response to remote: %s", err)
}
if n < len(msg) {
return fmt.Errorf("fail to send response to remote, size mismatch")
}
return nil
}
20 changes: 6 additions & 14 deletions pkg/cluster/executor/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (e *EasySSHExecutor) Execute(ctx context.Context, cmd string, sudo bool, ti
// This function depends on `scp` (a tool from OpenSSH or other SSH implementation)
// This function is based on easyssh.MakeConfig.Scp() but with support of copying
// file from remote to local.
func (e *EasySSHExecutor) Transfer(ctx context.Context, src string, dst string, download bool) error {
func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
if !download {
err := e.Config.Scp(src, dst)
if err != nil {
Expand All @@ -197,18 +197,7 @@ func (e *EasySSHExecutor) Transfer(ctx context.Context, src string, dst string,
defer client.Close()
defer session.Close()

targetPath := filepath.Dir(dst)
if err = utils.CreateDir(targetPath); err != nil {
return err
}
targetFile, err := os.Create(dst)
if err != nil {
return err
}

session.Stdout = targetFile

return session.Run(fmt.Sprintf("cat %s", src))
return ScpDownload(session, client, src, dst, limit)
}

func (e *NativeSSHExecutor) prompt(def string) string {
Expand Down Expand Up @@ -319,7 +308,7 @@ func (e *NativeSSHExecutor) Execute(ctx context.Context, cmd string, sudo bool,

// Transfer copies files via SCP
// This function depends on `scp` (a tool from OpenSSH or other SSH implementation)
func (e *NativeSSHExecutor) Transfer(ctx context.Context, src string, dst string, download bool) error {
func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
if e.ConnectionTestResult != nil {
return e.ConnectionTestResult
}
Expand All @@ -334,6 +323,9 @@ func (e *NativeSSHExecutor) Transfer(ctx context.Context, src string, dst string
}

args := []string{scp, "-r", "-o", "StrictHostKeyChecking=no"}
if limit > 0 {
args = append(args, "-l", fmt.Sprint(limit))
}
args = e.configArgs(args) // prefix and postfix args

if download {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/manager/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type TransferOptions struct {
Local string
Remote string
Pull bool // default to push
Limit int // rate limit in Kbit/s
}

// Transfer copies files from or to host in the tidb cluster.
Expand Down Expand Up @@ -93,9 +94,9 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio
for _, p := range i.Slice() {
t := task.NewBuilder()
if opt.Pull {
t.CopyFile(p, srcPath, host, opt.Pull)
t.CopyFile(p, srcPath, host, opt.Pull, opt.Limit)
} else {
t.CopyFile(srcPath, p, host, opt.Pull)
t.CopyFile(srcPath, p, host, opt.Pull, opt.Limit)
}
shellTasks = append(shellTasks, t.Build())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/operation/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func GetNodeInfo(

dstDir := filepath.Join(dir, "bin")
dstPath := filepath.Join(dstDir, path.Base(srcPath))
err = exec.Transfer(nctx, srcPath, dstPath, false)
err = exec.Transfer(nctx, srcPath, dstPath, false, 0)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (i *AlertManagerInstance) InitConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_alertmanager.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (i *CDCInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_cdc.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (i *DrainerInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_drainer.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 876377b

Please sign in to comment.