Skip to content

Commit

Permalink
cluster: Scale-out action can be divided into 2 (#1638)
Browse files Browse the repository at this point in the history
  • Loading branch information
srstack authored Nov 24, 2021
1 parent ca94426 commit 0ec99bb
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 77 deletions.
31 changes: 26 additions & 5 deletions components/cluster/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,38 @@ func newScaleOutCmd() *cobra.Command {
IdentityFile: filepath.Join(utils.UserHome(), ".ssh", "id_rsa"),
}
cmd := &cobra.Command{
Use: "scale-out <cluster-name> <topology.yaml>",
Use: "scale-out <cluster-name> [topology.yaml]",
Short: "Scale out a TiDB cluster",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 2 {
return cmd.Help()
var (
clusterName string
topoFile string
)

// tiup cluster scale-out --stage1 --stage2
// is equivalent to
// tiup cluster scale-out
if opt.Stage1 && opt.Stage2 {
opt.Stage1 = false
opt.Stage2 = false
}

if opt.Stage2 && len(args) == 1 {
clusterName = args[0]
} else {
if len(args) == 2 {
clusterName = args[0]
topoFile = args[1]
} else {
return cmd.Help()
}
}

clusterName := args[0]
clusterReport.ID = scrubClusterName(clusterName)
teleCommand = append(teleCommand, scrubClusterName(clusterName))

topoFile := args[1]
// stage2: topoFile is ""
if data, err := os.ReadFile(topoFile); err == nil {
teleTopology = string(data)
}
Expand All @@ -64,6 +83,8 @@ func newScaleOutCmd() *cobra.Command {
cmd.Flags().StringVarP(&opt.IdentityFile, "identity_file", "i", opt.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
cmd.Flags().BoolVarP(&opt.UsePassword, "password", "p", false, "Use password of target hosts. If specified, password authentication will be used.")
cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")
cmd.Flags().BoolVarP(&opt.Stage1, "stage1", "", false, "Don't start the new instance after scale-out, need to manually execute cluster scale-out --stage2")
cmd.Flags().BoolVarP(&opt.Stage2, "stage2", "", false, "Start the new instance and init config after scale-out --stage1")

return cmd
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/cluster/manager/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (m *Manager) EnableCluster(name string, gOpt operator.Options, isEnable boo
func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b *task.Builder, metadata spec.Metadata)) error {
log.Infof("Starting cluster %s...", name)

// check locked
if err := m.specManager.ScaleOutLockedErr(name); err != nil {
return err
}

metadata, err := m.meta(name)
if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) {
return err
Expand Down Expand Up @@ -127,6 +132,11 @@ func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b

// StopCluster stop the cluster.
func (m *Manager) StopCluster(name string, gOpt operator.Options, skipConfirm bool) error {
// check locked
if err := m.specManager.ScaleOutLockedErr(name); err != nil {
return err
}

metadata, err := m.meta(name)
if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) {
return err
Expand Down Expand Up @@ -177,6 +187,11 @@ func (m *Manager) StopCluster(name string, gOpt operator.Options, skipConfirm bo

// RestartCluster restart the cluster.
func (m *Manager) RestartCluster(name string, gOpt operator.Options, skipConfirm bool) error {
// check locked
if err := m.specManager.ScaleOutLockedErr(name); err != nil {
return err
}

metadata, err := m.meta(name)
if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) {
return err
Expand Down
44 changes: 31 additions & 13 deletions pkg/cluster/manager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,30 +330,48 @@ func buildScaleOutTask(
if err != nil {
return nil, err
}
builder.
Parallel(false, downloadCompTasks...).
Parallel(false, envInitTasks...).
Parallel(false, deployCompTasks...)

// stage2 just start and init config
if !opt.Stage2 {
builder.
Parallel(false, downloadCompTasks...).
Parallel(false, envInitTasks...).
Parallel(false, deployCompTasks...)
}

if afterDeploy != nil {
afterDeploy(builder, newPart, gOpt)
}

builder.
Func("Save meta", func(_ context.Context) error {
metadata.SetTopology(mergedTopo)
return m.specManager.SaveMeta(name, metadata)
}).
Func("StartCluster", func(ctx context.Context) error {
builder.Func("Save meta", func(_ context.Context) error {
metadata.SetTopology(mergedTopo)
return m.specManager.SaveMeta(name, metadata)
})

// don't start the new instance
if opt.Stage1 {
// save scale out file lock
builder.Func("Create Scale-Out File Lock", func(_ context.Context) error {
return m.specManager.NewScaleOutLock(name, newPart)
})
} else {
builder.Func("Start Cluster", func(ctx context.Context) error {
return operator.Start(ctx, newPart, operator.Options{OptTimeout: gOpt.OptTimeout, Operation: operator.ScaleOutOperation}, tlsCfg)
}).
Parallel(false, refreshConfigTasks...).
Parallel(false, buildReloadPromTasks(metadata.GetTopology(), gOpt)...)
Parallel(false, refreshConfigTasks...).
Parallel(false, buildReloadPromTasks(metadata.GetTopology(), gOpt)...)
}

// remove scale-out file lock
if opt.Stage2 {
builder.Func("Release Scale-Out File Lock", func(ctx context.Context) error {
return m.specManager.ReleaseScaleOutLock(name)
})
}

if final != nil {
final(builder, name, metadata, gOpt)
}

return builder.Build(), nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/cluster/manager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type DeployOptions struct {
UsePassword bool // use password instead of identity file for ssh connection
IgnoreConfigCheck bool // ignore config check result
NoLabels bool // don't check labels for TiKV instance
Stage1 bool // don't start the new instance, just deploy
Stage2 bool // start instances and init Config after stage1
}

// DeployerInstance is a instance can deploy to a target deploy directory.
Expand Down
8 changes: 8 additions & 0 deletions pkg/cluster/manager/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func (m *Manager) Patch(name string, packagePath string, opt operator.Options, o
return err
}

// check locked
if err := m.specManager.ScaleOutLockedErr(name); err != nil {
if !offline {
return errorx.Cast(err).
WithProperty(tui.SuggestionFromString("Please run tiup-cluster patch --offline to try again"))
}
}

metadata, err := m.meta(name)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/manager/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ func (m *Manager) Reload(name string, gOpt operator.Options, skipRestart, skipCo
return err
}

// check locked
if err := m.specManager.ScaleOutLockedErr(name); err != nil {
return err
}

sshTimeout := gOpt.SSHTimeout
exeTimeout := gOpt.OptTimeout

Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/manager/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func (m *Manager) ScaleIn(
return err
}

// check locked
if err := m.specManager.ScaleOutLockedErr(name); err != nil {
return err
}

var (
force bool = gOpt.Force
nodes []string = gOpt.Nodes
Expand Down
Loading

0 comments on commit 0ec99bb

Please sign in to comment.