Commit
task: implement parallel concurrency limit (#1420)
AstroProfundis authored Jun 15, 2021
1 parent e5e1b46 commit 6c92a87
Showing 26 changed files with 72 additions and 45 deletions.
1 change: 1 addition & 0 deletions components/cluster/command/root.go
@@ -143,6 +143,7 @@ func init() {
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "(EXPERIMENTAL) Use the native SSH client installed on local system instead of the build-in one.")
rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "(EXPERIMENTAL) The executor type: 'builtin', 'system', 'none'.")
rootCmd.PersistentFlags().IntVarP(&gOpt.Concurrency, "concurrency", "c", 5, "max number of parallel tasks allowed")
_ = rootCmd.PersistentFlags().MarkHidden("native-ssh")

rootCmd.AddCommand(
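With the new flag registered on the root command, a hypothetical invocation such as tiup cluster start <cluster-name> --concurrency 16 (or the short form -c 16) would raise the limit above the default of 5; the cluster name here is a placeholder, not part of this change.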
2 changes: 1 addition & 1 deletion components/dm/ansible/import_test.go
@@ -142,7 +142,7 @@ func TestImportFromAnsible(t *testing.T) {
im, err := NewImporter(dir, "inventory.ini", executor.SSHTypeBuiltin, 0)
assert.Nil(err)
im.testExecutorGetter = &executorGetter{}
clusterName, meta, err := im.ImportFromAnsibleDir(ctxt.New(context.Background()))
clusterName, meta, err := im.ImportFromAnsibleDir(ctxt.New(context.Background(), 0))
assert.Nil(err, "verbose: %+v", err)
assert.Equal("test-cluster", clusterName)

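Here, as in the other updated call sites below, the second argument to ctxt.New is 0, meaning no explicit limit; as the change to pkg/cluster/ctxt/context.go further down shows, the context then falls back to runtime.NumCPU().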
2 changes: 1 addition & 1 deletion components/dm/command/import.go
@@ -49,7 +49,7 @@ func newImportCmd() *cobra.Command {
return err
}

ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), 0)
clusterName, meta, err := importer.ImportFromAnsibleDir(ctx)
if err != nil {
return err
1 change: 1 addition & 0 deletions components/dm/command/root.go
@@ -110,6 +110,7 @@ please backup your data before process.`,
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "Use the SSH client installed on local system instead of the build-in one.")
rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "The executor type: 'builtin', 'system', 'none'")
rootCmd.PersistentFlags().IntVarP(&gOpt.Concurrency, "concurrency", "c", 5, "max number of parallel tasks allowed")
_ = rootCmd.PersistentFlags().MarkHidden("native-ssh")

rootCmd.AddCommand(
2 changes: 1 addition & 1 deletion pkg/cluster/ansible/config.go
@@ -95,7 +95,7 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
Parallel(false, copyFileTasks...).
Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), 0)); err != nil {
return errors.Trace(err)
}
log.Infof("Finished copying configs.")
2 changes: 1 addition & 1 deletion pkg/cluster/ansible/service.go
@@ -36,7 +36,7 @@ var (
// parseDirs sets values of directories of component
func parseDirs(user string, ins spec.InstanceSpec, sshTimeout uint64, sshType executor.SSHType) (spec.InstanceSpec, error) {
hostName, sshPort := ins.SSH()
ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), 0)

e, err := executor.New(sshType, false, executor.SSHConfig{
Host: hostName,
46 changes: 28 additions & 18 deletions pkg/cluster/ctxt/context.go
@@ -15,6 +15,7 @@ package ctxt

import (
"context"
"runtime"
"sync"
"time"

@@ -59,10 +60,11 @@ type (
// We should use mutex to prevent concurrent R/W for some fields
// because of the same context can be shared in parallel tasks.
Context struct {
mutex sync.RWMutex

Ev EventBus

exec struct {
sync.RWMutex
executors map[string]Executor
stdouts map[string][]byte
stderrs map[string][]byte
@@ -72,16 +74,23 @@
// The private/public key is used to access remote server via the user `tidb`
PrivateKeyPath string
PublicKeyPath string

Concurrency int // max number of parallel tasks running at the same time
}
)

// New create a context instance.
func New(ctx context.Context) context.Context {
func New(ctx context.Context, limit int) context.Context {
concurrency := runtime.NumCPU()
if limit > 0 {
concurrency = limit
}

ctx = checkpoint.NewContext(ctx)
return context.WithValue(ctx, ctxKey, &Context{
Ev: NewEventBus(),
mutex: sync.RWMutex{},
Ev: NewEventBus(),
exec: struct {
sync.RWMutex
executors map[string]Executor
stdouts map[string][]byte
stderrs map[string][]byte
@@ -92,6 +101,7 @@ func New(ctx context.Context) context.Context {
stderrs: make(map[string][]byte),
checkResults: make(map[string][]interface{}),
},
Concurrency: concurrency, // default to CPU count
})
}

@@ -102,9 +112,9 @@ func GetInner(ctx context.Context) *Context {

// Get implements the operation.ExecutorGetter interface.
func (ctx *Context) Get(host string) (e Executor) {
ctx.exec.Lock()
ctx.mutex.Lock()
e, ok := ctx.exec.executors[host]
ctx.exec.Unlock()
ctx.mutex.Unlock()

if !ok {
panic("no init executor for " + host)
@@ -124,55 +134,55 @@ func (ctx *Context) GetExecutor(host string) (e Executor, ok bool) {
return e.(Executor), true
}

ctx.exec.RLock()
ctx.mutex.RLock()
e, ok = ctx.exec.executors[host]
ctx.exec.RUnlock()
ctx.mutex.RUnlock()
return
}

// SetExecutor set the executor.
func (ctx *Context) SetExecutor(host string, e Executor) {
ctx.exec.Lock()
ctx.mutex.Lock()
if e != nil {
ctx.exec.executors[host] = e
} else {
delete(ctx.exec.executors, host)
}
ctx.exec.Unlock()
ctx.mutex.Unlock()
}

// GetOutputs get the outputs of a host (if has any)
func (ctx *Context) GetOutputs(hostID string) ([]byte, []byte, bool) {
ctx.exec.RLock()
ctx.mutex.RLock()
stdout, ok1 := ctx.exec.stderrs[hostID]
stderr, ok2 := ctx.exec.stdouts[hostID]
ctx.exec.RUnlock()
ctx.mutex.RUnlock()
return stdout, stderr, ok1 && ok2
}

// SetOutputs set the outputs of a host
func (ctx *Context) SetOutputs(hostID string, stdout []byte, stderr []byte) {
ctx.exec.Lock()
ctx.mutex.Lock()
ctx.exec.stderrs[hostID] = stdout
ctx.exec.stdouts[hostID] = stderr
ctx.exec.Unlock()
ctx.mutex.Unlock()
}

// GetCheckResults get the the check result of a host (if has any)
func (ctx *Context) GetCheckResults(host string) (results []interface{}, ok bool) {
ctx.exec.RLock()
ctx.mutex.RLock()
results, ok = ctx.exec.checkResults[host]
ctx.exec.RUnlock()
ctx.mutex.RUnlock()
return
}

// SetCheckResults append the check result of a host to the list
func (ctx *Context) SetCheckResults(host string, results []interface{}) {
ctx.exec.Lock()
ctx.mutex.Lock()
if currResult, ok := ctx.exec.checkResults[host]; ok {
ctx.exec.checkResults[host] = append(currResult, results...)
} else {
ctx.exec.checkResults[host] = results
}
ctx.exec.Unlock()
ctx.mutex.Unlock()
}
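This file carries the core of the change: the executor, output and check-result maps are now guarded by a single struct-level sync.RWMutex instead of the mutex that was embedded in the anonymous exec struct, and the context gains a Concurrency field that is set from the caller's limit, falling back to runtime.NumCPU() when the limit is 0 or negative. The task runner that actually consumes Context.Concurrency is not among the hunks loaded above, so the following is only a minimal, self-contained sketch of how such a limit is commonly enforced with a buffered-channel semaphore; runParallel and its signature are illustrative, not TiUP API.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// runParallel runs the given tasks with at most `limit` of them in flight,
// using a buffered channel as a counting semaphore.
func runParallel(limit int, tasks []func() error) []error {
	if limit <= 0 {
		limit = runtime.NumCPU() // same fallback ctxt.New applies when limit <= 0
	}
	sem := make(chan struct{}, limit)
	errs := make([]error, len(tasks))

	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` tasks are already running
		go func(i int, task func() error) {
			defer wg.Done()
			defer func() { <-sem }()
			errs[i] = task()
		}(i, task)
	}
	wg.Wait()
	return errs
}

func main() {
	tasks := make([]func() error, 10)
	for i := range tasks {
		i := i // capture loop variable for the closure
		tasks[i] = func() error {
			fmt.Println("running task", i)
			return nil
		}
	}
	runParallel(5, tasks) // 5 matches the default of the new --concurrency flag
}

A buffered channel keeps the sketch small; golang.org/x/sync/semaphore or a fixed worker pool would be equivalent ways to cap the number of goroutines.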
4 changes: 2 additions & 2 deletions pkg/cluster/executor/local_test.go
@@ -25,7 +25,7 @@ import (
)

func TestLocal(t *testing.T) {
ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), 0)

assert := require.New(t)
user, err := user.Current()
@@ -72,7 +72,7 @@ func TestWrongIP(t *testing.T) {
}

func TestLocalExecuteWithQuotes(t *testing.T) {
ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), 0)

assert := require.New(t)
user, err := user.Current()
8 changes: 4 additions & 4 deletions pkg/cluster/manager/basic.go
@@ -61,7 +61,7 @@ func (m *Manager) EnableCluster(name string, options operator.Options, isEnable

t := b.Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), options.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
@@ -106,7 +106,7 @@ func (m *Manager) StartCluster(name string, options operator.Options, fn ...func

t := b.Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), options.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
@@ -151,7 +151,7 @@ func (m *Manager) StopCluster(name string, options operator.Options, skipConfirm
}).
Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), options.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
@@ -196,7 +196,7 @@ func (m *Manager) RestartCluster(name string, options operator.Options, skipConf
}).
Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), options.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
2 changes: 1 addition & 1 deletion pkg/cluster/manager/check.go
@@ -327,7 +327,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
ParallelStep("+ Cleanup check files", false, cleanTasks...).
Build()

ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), gOpt.Concurrency)
if err := t.Execute(ctx); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
2 changes: 1 addition & 1 deletion pkg/cluster/manager/cleanup.go
@@ -131,7 +131,7 @@ func (m *Manager) CleanCluster(name string, gOpt operator.Options, cleanOpt oper
}).
Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), gOpt.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
2 changes: 1 addition & 1 deletion pkg/cluster/manager/deploy.go
@@ -354,7 +354,7 @@ func (m *Manager) Deploy(

t := builder.Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), gOpt.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
4 changes: 2 additions & 2 deletions pkg/cluster/manager/destroy.go
@@ -72,7 +72,7 @@ func (m *Manager) DestroyCluster(name string, gOpt operator.Options, destroyOpt
}).
Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), gOpt.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
@@ -119,7 +119,7 @@ func (m *Manager) DestroyTombstone(

b := m.sshTaskBuilder(name, topo, base.User, gOpt)

ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), gOpt.Concurrency)
nodes, err := operator.DestroyTombstone(ctx, cluster, true /* returnNodesOnly */, gOpt, tlsCfg)
if err != nil {
return err
4 changes: 2 additions & 2 deletions pkg/cluster/manager/display.go
@@ -202,7 +202,7 @@ func (m *Manager) Display(name string, opt operator.Options) error {
cliutil.PrintTable(clusterTable, true)
fmt.Printf("Total nodes: %d\n", len(clusterTable)-1)

ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), opt.Concurrency)
if t, ok := topo.(*spec.Specification); ok {
// Check if TiKV's label set correctly
pdClient := api.NewPDClient(masterActive, 10*time.Second, tlsCfg)
@@ -227,7 +227,7 @@ func (m *Manager) Display(name string, opt operator.Options) error {

// GetClusterTopology get the topology of the cluster.
func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstInfo, error) {
ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), opt.Concurrency)
metadata, err := m.meta(name)
if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) &&
!errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) {
2 changes: 1 addition & 1 deletion pkg/cluster/manager/exec.go
@@ -94,7 +94,7 @@ func (m *Manager) Exec(name string, opt ExecOptions, gOpt operator.Options) erro
Parallel(false, shellTasks...).
Build()

execCtx := ctxt.New(context.Background())
execCtx := ctxt.New(context.Background(), gOpt.Concurrency)
if err := t.Execute(execCtx); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
2 changes: 1 addition & 1 deletion pkg/cluster/manager/patch.go
@@ -97,7 +97,7 @@ func (m *Manager) Patch(name string, packagePath string, opt operator.Options, o
}).
Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), opt.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
2 changes: 1 addition & 1 deletion pkg/cluster/manager/reload.go
@@ -113,7 +113,7 @@ func (m *Manager) Reload(name string, opt operator.Options, skipRestart, skipCon

t := tb.Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), opt.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
2 changes: 1 addition & 1 deletion pkg/cluster/manager/scale_in.go
@@ -104,7 +104,7 @@ func (m *Manager) ScaleIn(
Parallel(force, buildReloadPromTasks(metadata.GetTopology(), nodes...)...).
Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), gOpt.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
2 changes: 1 addition & 1 deletion pkg/cluster/manager/scale_out.go
@@ -154,7 +154,7 @@ func (m *Manager) ScaleOut(
return err
}

ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), gOpt.Concurrency)
ctx = context.WithValue(ctx, ctxt.CtxBaseTopo, topo)
if err := t.Execute(ctx); err != nil {
if errorx.Cast(err) != nil {
2 changes: 1 addition & 1 deletion pkg/cluster/manager/transfer.go
@@ -106,7 +106,7 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio
Parallel(false, shellTasks...).
Build()

execCtx := ctxt.New(context.Background())
execCtx := ctxt.New(context.Background(), gOpt.Concurrency)
if err := t.Execute(execCtx); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
2 changes: 1 addition & 1 deletion pkg/cluster/manager/upgrade.go
@@ -188,7 +188,7 @@ func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Optio
}).
Build()

if err := t.Execute(ctxt.New(context.Background())); err != nil {
if err := t.Execute(ctxt.New(context.Background(), opt.Concurrency)); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
1 change: 1 addition & 0 deletions pkg/cluster/operation/operation.go
@@ -32,6 +32,7 @@ type Options struct {
IgnoreConfigCheck bool // should we ignore the config check result after init config
NativeSSH bool // should use native ssh client or builtin easy ssh (deprecated, shoule use SSHType)
SSHType executor.SSHType // the ssh type: 'builtin', 'system', 'none'
Concurrency int // max number of parallel tasks to run

// What type of things should we cleanup in clean command
CleanupData bool // should we cleanup data
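This Options field is what the --concurrency flag added in the two root.go files fills in (default 5), and it is the value that the manager, check and display code above forwards into ctxt.New, so a limit chosen on the command line reaches every task-execution context.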
2 changes: 1 addition & 1 deletion pkg/cluster/spec/grafana_test.go
@@ -29,7 +29,7 @@ import (
)

func TestLocalDashboards(t *testing.T) {
ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), 0)

deployDir, err := os.MkdirTemp("", "tiup-*")
assert.Nil(t, err)
2 changes: 1 addition & 1 deletion pkg/cluster/task/init_config_test.go
@@ -73,7 +73,7 @@ func Test(t *testing.T) { check.TestingT(t) }
var _ = check.Suite(&initConfigSuite{})

func (s *initConfigSuite) TestCheckConfig(c *check.C) {
ctx := ctxt.New(context.Background())
ctx := ctxt.New(context.Background(), 0)
defer mock.With("FakeExecutor", &fakeExecutor{})()

t := &InitConfig{
(2 more changed files not loaded)
