Skip to content

Commit

Permalink
Merge branch 'master' into fix-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 12, 2021
2 parents 552eebd + 24b80e0 commit 238b56b
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 33 deletions.
6 changes: 6 additions & 0 deletions components/cluster/command/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
Shell(
inst.GetHost(),
filepath.Join(task.CheckToolsPathDir, "bin", "insight"),
"",
false,
).
BuildAsStep(fmt.Sprintf(" - Getting system info of %s:%d", inst.GetHost(), inst.GetSSHPort()))
Expand All @@ -230,6 +231,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
Shell(
inst.GetHost(),
"ss -lnt",
"",
false,
).
CheckSys(
Expand All @@ -243,6 +245,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
Shell(
inst.GetHost(),
"cat /etc/security/limits.conf",
"",
false,
).
CheckSys(
Expand All @@ -256,6 +259,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
Shell(
inst.GetHost(),
"sysctl -a",
"",
true,
).
CheckSys(
Expand Down Expand Up @@ -459,11 +463,13 @@ func fixFailedChecks(ctx *task.Context, host string, res *operator.CheckResult,
"/etc/selinux/config",
"setenforce 0",
),
"",
true)
msg = fmt.Sprintf("will try to %s, reboot might be needed", color.HiBlueString("disable SELinux"))
case operator.CheckNameTHP:
t.Shell(host,
"echo never > /sys/kernel/mm/transparent_hugepage/enabled && echo never > /sys/kernel/mm/transparent_hugepage/defrag",
"",
true)
msg = fmt.Sprintf("will try to %s, please check again after reboot", color.HiBlueString("disable THP"))
default:
Expand Down
62 changes: 41 additions & 21 deletions pkg/cluster/manager/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package manager

import (
"fmt"
"strings"

"github.com/fatih/color"
"github.com/joomcode/errorx"
perrs "github.com/pingcap/errors"
Expand Down Expand Up @@ -44,9 +47,10 @@ func (m *Manager) Exec(name string, opt ExecOptions, gOpt operator.Options) erro
filterNodes := set.NewStringSet(gOpt.Nodes...)

var shellTasks []task.Task
uniqueHosts := map[string]int{} // host -> ssh-port
uniqueHosts := map[string]set.StringSet{} // host-sshPort -> {command}
topo.IterInstance(func(inst spec.Instance) {
if _, found := uniqueHosts[inst.GetHost()]; !found {
key := fmt.Sprintf("%s-%d", inst.GetHost(), inst.GetSSHPort())
if _, found := uniqueHosts[key]; !found {
if len(gOpt.Roles) > 0 && !filterRoles.Exist(inst.Role()) {
return
}
Expand All @@ -55,15 +59,28 @@ func (m *Manager) Exec(name string, opt ExecOptions, gOpt operator.Options) erro
return
}

uniqueHosts[inst.GetHost()] = inst.GetSSHPort()
cmds, err := renderInstanceSpec(opt.Command, inst)
if err != nil {
log.Debugf("error rendering command with spec: %s", err)
return // skip
}
cmdSet := set.NewStringSet(cmds...)
if _, ok := uniqueHosts[key]; ok {
uniqueHosts[key].Join(cmdSet)
return
}
uniqueHosts[key] = cmdSet
}
})

for host := range uniqueHosts {
shellTasks = append(shellTasks,
task.NewBuilder().
Shell(host, opt.Command, opt.Sudo).
Build())
for hostKey, i := range uniqueHosts {
host := strings.Split(hostKey, "-")[0]
for _, cmd := range i.Slice() {
shellTasks = append(shellTasks,
task.NewBuilder().
Shell(host, cmd, hostKey+cmd, opt.Sudo).
Build())
}
}

t := m.sshTaskBuilder(name, topo, base.User, gOpt).
Expand All @@ -80,19 +97,22 @@ func (m *Manager) Exec(name string, opt ExecOptions, gOpt operator.Options) erro
}

// print outputs
for host := range uniqueHosts {
stdout, stderr, ok := execCtx.GetOutputs(host)
if !ok {
continue
}
log.Infof("Outputs of %s on %s:",
color.CyanString(opt.Command),
color.CyanString(host))
if len(stdout) > 0 {
log.Infof("%s:\n%s", color.GreenString("stdout"), stdout)
}
if len(stderr) > 0 {
log.Infof("%s:\n%s", color.RedString("stderr"), stderr)
for hostKey, i := range uniqueHosts {
host := strings.Split(hostKey, "-")[0]
for _, cmd := range i.Slice() {
stdout, stderr, ok := execCtx.GetOutputs(hostKey + cmd)
if !ok {
continue
}
log.Infof("Outputs of %s on %s:",
color.CyanString(cmd),
color.CyanString(host))
if len(stdout) > 0 {
log.Infof("%s:\n%s", color.GreenString("stdout"), stdout)
}
if len(stderr) > 0 {
log.Infof("%s:\n%s", color.RedString("stderr"), stderr)
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/cluster/manager/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio

var shellTasks []task.Task

uniqueHosts := map[string]set.StringSet{} // host-sshPort-port -> {remote-path}
uniqueHosts := map[string]set.StringSet{} // host-sshPort -> {remote-path}
topo.IterInstance(func(inst spec.Instance) {
key := fmt.Sprintf("%s-%d-%d", inst.GetHost(), inst.GetSSHPort(), inst.GetPort())
key := fmt.Sprintf("%s-%d", inst.GetHost(), inst.GetSSHPort())
if _, found := uniqueHosts[key]; !found {
if len(gOpt.Roles) > 0 && !filterRoles.Exist(inst.Role()) {
return
Expand All @@ -68,6 +68,7 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio
instPath := opt.Remote
paths, err := renderInstanceSpec(instPath, inst)
if err != nil {
log.Debugf("error rendering remote path with spec: %s", err)
return // skip
}
pathSet := set.NewStringSet(paths...)
Expand Down Expand Up @@ -114,8 +115,7 @@ func renderInstanceSpec(t string, inst spec.Instance) ([]string, error) {
switch inst.ComponentName() {
case spec.ComponentTiFlash:
for _, d := range strings.Split(inst.DataDir(), ",") {
tf := inst
tfs, ok := tf.(*spec.TiFlashInstance).InstanceSpec.(spec.TiFlashSpec)
tfs, ok := inst.(*spec.TiFlashInstance).InstanceSpec.(spec.TiFlashSpec)
if !ok {
return result, perrs.Errorf("instance type mismatch for %v", inst)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/cluster/task/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,12 @@ func (b *Builder) Rmdir(host string, dirs ...string) *Builder {
}

// Shell command on cluster host
func (b *Builder) Shell(host, command string, sudo bool) *Builder {
func (b *Builder) Shell(host, command, cmdID string, sudo bool) *Builder {
b.tasks = append(b.tasks, &Shell{
host: host,
command: command,
sudo: sudo,
cmdID: cmdID,
})
return b
}
Expand Down Expand Up @@ -364,6 +365,7 @@ func (b *Builder) DeploySpark(inst spec.Instance, sparkVersion, srcPath, deployD
deployDir,
filepath.Join(deployDir, sparkSubPath),
),
"",
false, // (not) sudo
).CopyComponent(
inst.ComponentName(),
Expand All @@ -380,6 +382,7 @@ func (b *Builder) DeploySpark(inst spec.Instance, sparkVersion, srcPath, deployD
filepath.Join(deployDir, "bin"),
deployDir,
),
"",
false, // (not) sudo
)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/cluster/task/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Shell struct {
host string
command string
sudo bool
cmdID string
}

// Execute implements the Task interface
Expand All @@ -37,7 +38,11 @@ func (m *Shell) Execute(ctx *Context) error {
log.Infof("Run command on %s(sudo:%v): %s", m.host, m.sudo, m.command)

stdout, stderr, err := exec.Execute(m.command, m.sudo)
ctx.SetOutputs(m.host, stdout, stderr)
outputID := m.host
if m.cmdID != "" {
outputID = m.cmdID
}
ctx.SetOutputs(outputID, stdout, stderr)
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/cluster/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ func (ctx *Context) SetExecutor(host string, e executor.Executor) {
}

// GetOutputs get the outputs of a host (if has any)
func (ctx *Context) GetOutputs(host string) ([]byte, []byte, bool) {
func (ctx *Context) GetOutputs(hostID string) ([]byte, []byte, bool) {
ctx.exec.RLock()
stdout, ok1 := ctx.exec.stderrs[host]
stderr, ok2 := ctx.exec.stdouts[host]
stdout, ok1 := ctx.exec.stderrs[hostID]
stderr, ok2 := ctx.exec.stdouts[hostID]
ctx.exec.RUnlock()
return stdout, stderr, ok1 && ok2
}

// SetOutputs set the outputs of a host
func (ctx *Context) SetOutputs(host string, stdout []byte, stderr []byte) {
func (ctx *Context) SetOutputs(hostID string, stdout []byte, stderr []byte) {
ctx.exec.Lock()
ctx.exec.stderrs[host] = stdout
ctx.exec.stdouts[host] = stderr
ctx.exec.stderrs[hostID] = stdout
ctx.exec.stdouts[hostID] = stderr
ctx.exec.Unlock()
}

Expand Down

0 comments on commit 238b56b

Please sign in to comment.