diff --git a/components/cluster/command/scale_out.go b/components/cluster/command/scale_out.go
index b29013c7a6..b25a54fa2c 100644
--- a/components/cluster/command/scale_out.go
+++ b/components/cluster/command/scale_out.go
@@ -30,19 +30,38 @@ func newScaleOutCmd() *cobra.Command {
 		IdentityFile: filepath.Join(utils.UserHome(), ".ssh", "id_rsa"),
 	}
 	cmd := &cobra.Command{
-		Use:          "scale-out <cluster-name> <topology.yaml>",
+		Use:          "scale-out <cluster-name> [topology.yaml]",
 		Short:        "Scale out a TiDB cluster",
 		SilenceUsage: true,
 		RunE: func(cmd *cobra.Command, args []string) error {
-			if len(args) != 2 {
-				return cmd.Help()
+			var (
+				clusterName string
+				topoFile    string
+			)
+
+			// tiup cluster scale-out <cluster-name> <topology.yaml> --stage1 --stage2
+			// is equivalent to
+			// tiup cluster scale-out <cluster-name> <topology.yaml>
+			if opt.Stage1 && opt.Stage2 {
+				opt.Stage1 = false
+				opt.Stage2 = false
+			}
+
+			if opt.Stage2 && len(args) == 1 {
+				clusterName = args[0]
+			} else {
+				if len(args) == 2 {
+					clusterName = args[0]
+					topoFile = args[1]
+				} else {
+					return cmd.Help()
+				}
 			}
-			clusterName := args[0]
 			clusterReport.ID = scrubClusterName(clusterName)
 			teleCommand = append(teleCommand, scrubClusterName(clusterName))
-			topoFile := args[1]
 
+			// stage2: topoFile is ""
 			if data, err := os.ReadFile(topoFile); err == nil {
 				teleTopology = string(data)
 			}
@@ -64,6 +83,8 @@ func newScaleOutCmd() *cobra.Command {
 	cmd.Flags().StringVarP(&opt.IdentityFile, "identity_file", "i", opt.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
 	cmd.Flags().BoolVarP(&opt.UsePassword, "password", "p", false, "Use password of target hosts. If specified, password authentication will be used.")
 	cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")
+	cmd.Flags().BoolVarP(&opt.Stage1, "stage1", "", false, "Don't start the new instance after scale-out; you need to manually execute 'tiup cluster scale-out --stage2' later")
+	cmd.Flags().BoolVarP(&opt.Stage2, "stage2", "", false, "Start the new instance and init config after 'scale-out --stage1'")
 
 	return cmd
 }
diff --git a/components/playground/main.go b/components/playground/main.go
index ddb49ac99d..7de793e9b9 100644
--- a/components/playground/main.go
+++ b/components/playground/main.go
@@ -96,9 +96,9 @@ const (
 
 	// hosts
 	clusterHost = "host"
-	dbHost      = "db.Host"
-	dbPort      = "db.Port"
-	pdHost      = "pd.Host"
+	dbHost      = "db.host"
+	dbPort      = "db.port"
+	pdHost      = "pd.host"
 
 	// config paths
 	dbConfig = "db.config"
diff --git a/pkg/cluster/manager/basic.go b/pkg/cluster/manager/basic.go
index 11b94a5e77..7ad95da9b2 100644
--- a/pkg/cluster/manager/basic.go
+++ b/pkg/cluster/manager/basic.go
@@ -85,6 +85,11 @@ func (m *Manager) EnableCluster(name string, gOpt operator.Options, isEnable boo
 func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b *task.Builder, metadata spec.Metadata)) error {
 	log.Infof("Starting cluster %s...", name)
 
+	// check locked
+	if err := m.specManager.ScaleOutLockedErr(name); err != nil {
+		return err
+	}
+
 	metadata, err := m.meta(name)
 	if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) {
 		return err
@@ -127,6 +132,11 @@ func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b
 
 // StopCluster stop the cluster.
 func (m *Manager) StopCluster(name string, gOpt operator.Options, skipConfirm bool) error {
+	// check locked
+	if err := m.specManager.ScaleOutLockedErr(name); err != nil {
+		return err
+	}
+
 	metadata, err := m.meta(name)
 	if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) {
 		return err
@@ -177,6 +187,11 @@ func (m *Manager) StopCluster(name string, gOpt operator.Options, skipConfirm bo
 
 // RestartCluster restart the cluster.
 func (m *Manager) RestartCluster(name string, gOpt operator.Options, skipConfirm bool) error {
+	// check locked
+	if err := m.specManager.ScaleOutLockedErr(name); err != nil {
+		return err
+	}
+
 	metadata, err := m.meta(name)
 	if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) {
 		return err
diff --git a/pkg/cluster/manager/builder.go b/pkg/cluster/manager/builder.go
index 970983caa1..8dc269eb2e 100644
--- a/pkg/cluster/manager/builder.go
+++ b/pkg/cluster/manager/builder.go
@@ -330,30 +330,48 @@ func buildScaleOutTask(
 	if err != nil {
 		return nil, err
 	}
-	builder.
-		Parallel(false, downloadCompTasks...).
-		Parallel(false, envInitTasks...).
-		Parallel(false, deployCompTasks...)
+
+	// stage2 just start and init config
+	if !opt.Stage2 {
+		builder.
+			Parallel(false, downloadCompTasks...).
+			Parallel(false, envInitTasks...).
+			Parallel(false, deployCompTasks...)
+	}
 
 	if afterDeploy != nil {
 		afterDeploy(builder, newPart, gOpt)
 	}
 
-	builder.
-		Func("Save meta", func(_ context.Context) error {
-			metadata.SetTopology(mergedTopo)
-			return m.specManager.SaveMeta(name, metadata)
-		}).
-		Func("StartCluster", func(ctx context.Context) error {
+	builder.Func("Save meta", func(_ context.Context) error {
+		metadata.SetTopology(mergedTopo)
+		return m.specManager.SaveMeta(name, metadata)
+	})
+
+	// don't start the new instance
+	if opt.Stage1 {
+		// save scale out file lock
+		builder.Func("Create Scale-Out File Lock", func(_ context.Context) error {
+			return m.specManager.NewScaleOutLock(name, newPart)
+		})
+	} else {
+		builder.Func("Start Cluster", func(ctx context.Context) error {
 			return operator.Start(ctx, newPart, operator.Options{OptTimeout: gOpt.OptTimeout, Operation: operator.ScaleOutOperation}, tlsCfg)
 		}).
-		Parallel(false, refreshConfigTasks...).
-		Parallel(false, buildReloadPromTasks(metadata.GetTopology(), gOpt)...)
+			Parallel(false, refreshConfigTasks...).
+			Parallel(false, buildReloadPromTasks(metadata.GetTopology(), gOpt)...)
+	}
+
+	// remove scale-out file lock
+	if opt.Stage2 {
+		builder.Func("Release Scale-Out File Lock", func(ctx context.Context) error {
+			return m.specManager.ReleaseScaleOutLock(name)
+		})
+	}
 
 	if final != nil {
 		final(builder, name, metadata, gOpt)
 	}
-
 	return builder.Build(), nil
 }
diff --git a/pkg/cluster/manager/deploy.go b/pkg/cluster/manager/deploy.go
index f838aa5f96..05eae3e7fa 100644
--- a/pkg/cluster/manager/deploy.go
+++ b/pkg/cluster/manager/deploy.go
@@ -49,6 +49,8 @@ type DeployOptions struct {
 	UsePassword       bool // use password instead of identity file for ssh connection
 	IgnoreConfigCheck bool // ignore config check result
 	NoLabels          bool // don't check labels for TiKV instance
+	Stage1            bool // don't start the new instance, just deploy
+	Stage2            bool // start instances and init Config after stage1
 }
 
 // DeployerInstance is a instance can deploy to a target deploy directory.
diff --git a/pkg/cluster/manager/patch.go b/pkg/cluster/manager/patch.go
index 4987b4c922..8e51f36ce1 100644
--- a/pkg/cluster/manager/patch.go
+++ b/pkg/cluster/manager/patch.go
@@ -41,6 +41,14 @@ func (m *Manager) Patch(name string, packagePath string, opt operator.Options, o
 		return err
 	}
 
+	// check locked
+	if err := m.specManager.ScaleOutLockedErr(name); err != nil {
+		if !offline {
+			return errorx.Cast(err).
+				WithProperty(tui.SuggestionFromString("Please run tiup-cluster patch --offline to try again"))
+		}
+	}
+
 	metadata, err := m.meta(name)
 	if err != nil {
 		return err
diff --git a/pkg/cluster/manager/reload.go b/pkg/cluster/manager/reload.go
index e1f48c3b19..6d9bdafa8e 100644
--- a/pkg/cluster/manager/reload.go
+++ b/pkg/cluster/manager/reload.go
@@ -37,6 +37,11 @@ func (m *Manager) Reload(name string, gOpt operator.Options, skipRestart, skipCo
 		return err
 	}
 
+	// check locked
+	if err := m.specManager.ScaleOutLockedErr(name); err != nil {
+		return err
+	}
+
 	sshTimeout := gOpt.SSHTimeout
 	exeTimeout := gOpt.OptTimeout
 
diff --git a/pkg/cluster/manager/scale_in.go b/pkg/cluster/manager/scale_in.go
index 344b96f25d..b518ed02fd 100644
--- a/pkg/cluster/manager/scale_in.go
+++ b/pkg/cluster/manager/scale_in.go
@@ -43,6 +43,11 @@ func (m *Manager) ScaleIn(
 		return err
 	}
 
+	// check locked
+	if err := m.specManager.ScaleOutLockedErr(name); err != nil {
+		return err
+	}
+
 	var (
 		force bool     = gOpt.Force
 		nodes []string = gOpt.Nodes
diff --git a/pkg/cluster/manager/scale_out.go b/pkg/cluster/manager/scale_out.go
index bd8651b404..c57b89b4fa 100644
--- a/pkg/cluster/manager/scale_out.go
+++ b/pkg/cluster/manager/scale_out.go
@@ -51,9 +51,9 @@ func (m *Manager) ScaleOut(
 		return err
 	}
 
-	// check for the input topology to let user confirm if there're any
-	// global configs set
-	if err := checkForGlobalConfigs(topoFile); err != nil {
+	// check whether the scale-out file lock exists
+	err := checkScaleOutLock(m, name, opt)
+	if err != nil {
 		return err
 	}
 
@@ -71,28 +71,43 @@ func (m *Manager) ScaleOut(
 	// because some default value rely on the global options and monitored options.
 	newPart := topo.NewPart()
 
-	// The no tispark master error is ignored, as if the tispark master is removed from the topology
-	// file for some reason (manual edit, for example), it is still possible to scale-out it to make
-	// the whole topology back to normal state.
-	if err := spec.ParseTopologyYaml(topoFile, newPart); err != nil &&
-		!errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) {
-		return err
-	}
+	// if stage2 is true, the new-part data is stored in the scale-out file lock
+	if opt.Stage2 {
+		// Acquire the scale-out file lock
+		newPart, err = m.specManager.ScaleOutLock(name)
+		if err != nil {
+			return err
+		}
+	} else { // if stage2 is true, these checks are not needed
+		// check for the input topology to let user confirm if there're any
+		// global configs set
+		if err := checkForGlobalConfigs(topoFile); err != nil {
+			return err
+		}
 
-	if clusterSpec, ok := topo.(*spec.Specification); ok {
-		if clusterSpec.GlobalOptions.TLSEnabled &&
-			semver.Compare(base.Version, "v4.0.5") < 0 &&
-			len(clusterSpec.TiFlashServers) > 0 {
-			return fmt.Errorf("TiFlash %s is not supported in TLS enabled cluster", base.Version)
+		// The no tispark master error is ignored, as if the tispark master is removed from the topology
+		// file for some reason (manual edit, for example), it is still possible to scale-out it to make
+		// the whole topology back to normal state.
+		if err := spec.ParseTopologyYaml(topoFile, newPart); err != nil &&
+			!errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) {
+			return err
 		}
-	}
 
-	if newPartTopo, ok := newPart.(*spec.Specification); ok {
-		newPartTopo.AdjustByVersion(base.Version)
-	}
+		if clusterSpec, ok := topo.(*spec.Specification); ok {
+			if clusterSpec.GlobalOptions.TLSEnabled &&
+				semver.Compare(base.Version, "v4.0.5") < 0 &&
+				len(clusterSpec.TiFlashServers) > 0 {
+				return fmt.Errorf("TiFlash %s is not supported in TLS enabled cluster", base.Version)
+			}
+		}
 
-	if err := validateNewTopo(newPart); err != nil {
-		return err
+		if newPartTopo, ok := newPart.(*spec.Specification); ok {
+			newPartTopo.AdjustByVersion(base.Version)
+		}
+
+		if err := validateNewTopo(newPart); err != nil {
+			return err
+		}
 	}
 
 	var (
@@ -115,46 +130,53 @@ func (m *Manager) ScaleOut(
 		return err
 	}
 
-	// Abort scale out operation if the merged topology is invalid
-	mergedTopo := topo.MergeTopo(newPart)
-	if err := mergedTopo.Validate(); err != nil {
-		return err
-	}
-	spec.ExpandRelativeDir(mergedTopo)
+	var mergedTopo spec.Topology
+	// in stage2, there is no need to build mergedTopo
+	if opt.Stage2 {
+		mergedTopo = topo
+	} else {
+		// Abort scale out operation if the merged topology is invalid
+		mergedTopo = topo.MergeTopo(newPart)
+		if err := mergedTopo.Validate(); err != nil {
+			return err
+		}
+		spec.ExpandRelativeDir(mergedTopo)
 
-	if topo, ok := mergedTopo.(*spec.Specification); ok {
-		// Check if TiKV's label set correctly
-		if !opt.NoLabels {
-			pdList := topo.BaseTopo().MasterList
-			tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
-			if err != nil {
-				return err
-			}
-			pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
-			lbs, placementRule, err := pdClient.GetLocationLabels()
-			if err != nil {
-				return err
-			}
-			if !placementRule {
-				if err := spec.CheckTiKVLabels(lbs, mergedTopo.(*spec.Specification)); err != nil {
-					return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
+		if topo, ok := mergedTopo.(*spec.Specification); ok {
+			// Check if TiKV's label set correctly
+			if !opt.NoLabels {
+				pdList := topo.BaseTopo().MasterList
+				tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
+				if err != nil {
+					return err
+				}
+				pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
+				lbs, placementRule, err := pdClient.GetLocationLabels()
+				if err != nil {
+					return err
+				}
+				if !placementRule {
+					if err := spec.CheckTiKVLabels(lbs, mergedTopo.(*spec.Specification)); err != nil {
+						return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
+					}
 				}
 			}
 		}
-	}
 
-	clusterList, err := m.specManager.GetAllClusters()
-	if err != nil {
-		return err
-	}
-	if err := spec.CheckClusterPortConflict(clusterList, name, mergedTopo); err != nil {
-		return err
-	}
-	if err := spec.CheckClusterDirConflict(clusterList, name, mergedTopo); err != nil {
-		return err
+		clusterList, err := m.specManager.GetAllClusters()
+		if err != nil {
+			return err
+		}
+		if err := spec.CheckClusterPortConflict(clusterList, name, mergedTopo); err != nil {
+			return err
+		}
+		if err := spec.CheckClusterDirConflict(clusterList, name, mergedTopo); err != nil {
+			return err
+		}
 	}
 
 	patchedComponents := set.NewStringSet()
+	// if stage2 is true, this check has no effect
 	newPart.IterInstance(func(instance spec.Instance) {
 		if utils.IsExist(m.specManager.Path(name, spec.PatchDirName, instance.ComponentName()+".tar.gz")) {
 			patchedComponents.Insert(instance.ComponentName())
@@ -187,6 +209,11 @@ func (m *Manager) ScaleOut(
 		return perrs.Trace(err)
 	}
 
+	if opt.Stage1 {
+		log.Infof(color.YellowString(`The new instance is not started!
+You need to execute 'tiup cluster scale-out %s --stage2' to start the new instance.`, name))
+	}
+
 	log.Infof("Scaled cluster `%s` out successfully", name)
 
 	return nil
@@ -238,3 +265,32 @@ set them in the specification fileds for each host.`))
 	}
 	return nil
 }
+
+// checkScaleOutLock checks the scale-out file lock and the stage1/stage2 options before scale-out
+func checkScaleOutLock(m *Manager, name string, opt DeployOptions) error {
+	locked, _ := m.specManager.IsScaleOutLocked(name)
+
+	if (!opt.Stage1 && !opt.Stage2) && locked {
+		return m.specManager.ScaleOutLockedErr(name)
+	}
+
+	if opt.Stage1 {
+		if locked {
+			return m.specManager.ScaleOutLockedErr(name)
+		}
+		log.Warnf(color.YellowString(`The parameter '--stage1' is set, new instance will not be started
+Please manually execute 'tiup cluster scale-out %s --stage2' to finish the process.`, name))
+		return tui.PromptForConfirmOrAbortError("Do you want to continue? [y/N]: ")
+	}
+
+	if opt.Stage2 {
+		if !locked {
+			return fmt.Errorf("the scale-out file lock does not exist, please make sure to run 'tiup-cluster scale-out %s --stage1' first", name)
+		}
+
+		log.Warnf(color.YellowString(`The parameter '--stage2' is set, only start the new instances and reload configs.`))
+		return tui.PromptForConfirmOrAbortError("Do you want to continue? [y/N]: ")
+	}
+
+	return nil
+}
diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go
index 56e16e5b31..6bb3591368 100644
--- a/pkg/cluster/manager/upgrade.go
+++ b/pkg/cluster/manager/upgrade.go
@@ -41,6 +41,11 @@ func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Optio
 		return err
 	}
 
+	// check locked
+	if err := m.specManager.ScaleOutLockedErr(name); err != nil {
+		return err
+	}
+
 	metadata, err := m.meta(name)
 	if err != nil {
 		return err
diff --git a/pkg/cluster/spec/spec_manager.go b/pkg/cluster/spec/spec_manager.go
index 006ed4a60a..0816fe7b09 100644
--- a/pkg/cluster/spec/spec_manager.go
+++ b/pkg/cluster/spec/spec_manager.go
@@ -33,6 +33,8 @@ var (
 	ErrCreateDirFailed = errNS.NewType("create_dir_failed")
 	// ErrSaveMetaFailed is ErrSaveMetaFailed
 	ErrSaveMetaFailed = errNS.NewType("save_meta_failed")
+	// ErrSaveScaleOutFileFailed is ErrSaveScaleOutFileFailed
+	ErrSaveScaleOutFileFailed = errNS.NewType("save_scale-out_lock_failed")
 )
 
@@ -42,6 +44,8 @@ const (
 	PatchDirName = "patch"
 	// BackupDirName is the directory to save backup files.
 	BackupDirName = "backup"
+	// ScaleOutLockName is the scale-out snapshot file, used as a file lock
+	ScaleOutLockName = ".scale-out.yaml"
 )
 
 //revive:disable
@@ -134,8 +138,8 @@ func (s *SpecManager) Metadata(clusterName string, meta interface{}) error {
 }
 
 // Exist check if the cluster exist by checking the meta file.
-func (s *SpecManager) Exist(name string) (exist bool, err error) {
-	fname := s.Path(name, metaFileName)
+func (s *SpecManager) Exist(clusterName string) (exist bool, err error) {
+	fname := s.Path(clusterName, metaFileName)
 
 	_, err = os.Stat(fname)
 	if err != nil {
@@ -149,12 +153,12 @@ func (s *SpecManager) Exist(name string) (exist bool, err error) {
 }
 
 // Remove remove the data with specified cluster name.
-func (s *SpecManager) Remove(name string) error {
-	return os.RemoveAll(s.Path(name))
+func (s *SpecManager) Remove(clusterName string) error {
+	return os.RemoveAll(s.Path(clusterName))
 }
 
 // List return the cluster names.
-func (s *SpecManager) List() (names []string, err error) {
+func (s *SpecManager) List() (clusterNames []string, err error) {
 	fileInfos, err := os.ReadDir(s.base)
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -167,7 +171,7 @@ func (s *SpecManager) List() (names []string, err error) {
 		if utils.IsNotExist(s.Path(info.Name(), metaFileName)) {
 			continue
 		}
-		names = append(names, info.Name())
+		clusterNames = append(clusterNames, info.Name())
 	}
 
 	return
@@ -202,3 +206,80 @@ func (s *SpecManager) ensureDir(clusterName string) error {
 	}
 	return nil
 }
+
+// ScaleOutLock tries to read the scale-out file lock of a cluster from file
+func (s *SpecManager) ScaleOutLock(clusterName string) (Topology, error) {
+	if locked, err := s.IsScaleOutLocked(clusterName); !locked {
+		return nil, ErrSaveScaleOutFileFailed.Wrap(err, "Scale-out file lock does not exist").
+			WithProperty(tui.SuggestionFromString("Please make sure to run tiup-cluster scale-out --stage1 and try again."))
+	}
+
+	fname := s.Path(clusterName, ScaleOutLockName)
+
+	// unmarshal the file lock into a topology
+	topo := &Specification{}
+	err := ParseTopologyYaml(fname, topo)
+	if err != nil {
+		return nil, err
+	}
+	return topo, nil
+}
+
+// ScaleOutLockedErr returns an error if the scale-out file lock exists
+func (s *SpecManager) ScaleOutLockedErr(clusterName string) error {
+	if locked, err := s.IsScaleOutLocked(clusterName); locked {
+		return errNS.NewType("scale-out lock").Wrap(err, "Scale-out file lock already exists").
+			WithProperty(tui.SuggestionFromString("Please run 'tiup-cluster scale-out --stage2' to continue."))
+	}
+	return nil
+}
+
+// IsScaleOutLocked checks whether the scale-out file lock of the cluster exists
+func (s *SpecManager) IsScaleOutLocked(clusterName string) (locked bool, err error) {
+	fname := s.Path(clusterName, ScaleOutLockName)
+
+	_, err = os.Stat(fname)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false, nil
+		}
+		return false, perrs.AddStack(err)
+	}
+
+	return true, nil
+}
+
+// NewScaleOutLock saves the new-part topology as the scale-out file lock of the specified cluster.
+func (s *SpecManager) NewScaleOutLock(clusterName string, topo Topology) error {
+	wrapError := func(err error) *errorx.Error {
+		return ErrSaveScaleOutFileFailed.Wrap(err, "Failed to create scale-out file lock")
+	}
+
+	if locked, err := s.IsScaleOutLocked(clusterName); locked {
+		return wrapError(err).
+			WithProperty(tui.SuggestionFromString("The scale-out file lock already exists, please run tiup-cluster scale-out --stage2 to continue."))
+	}
+
+	lockFile := s.Path(clusterName, ScaleOutLockName)
+
+	if err := s.ensureDir(clusterName); err != nil {
+		return wrapError(err)
+	}
+
+	data, err := yaml.Marshal(topo)
+	if err != nil {
+		return wrapError(err)
+	}
+
+	err = os.WriteFile(lockFile, data, 0644)
+	if err != nil {
+		return wrapError(err)
+	}
+
+	return nil
+}
+
+// ReleaseScaleOutLock removes the scale-out file lock of the specified cluster
+func (s *SpecManager) ReleaseScaleOutLock(clusterName string) error {
+	return os.Remove(s.Path(clusterName, ScaleOutLockName))
+}