Skip to content

Commit

Permalink
cluster: support evict leaders on stop and restore on start (#1789)
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis authored Mar 14, 2022
1 parent 5b18753 commit b736fb4
Show file tree
Hide file tree
Showing 16 changed files with 123 additions and 27 deletions.
1 change: 0 additions & 1 deletion components/cluster/command/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func newAuditCleanupCmd() *cobra.Command {
Use: "cleanup",
Short: "cleanup cluster audit logs",
RunE: func(cmd *cobra.Command, args []string) error {

if retainDays < 0 {
return errors.Errorf("retain-days cannot be less than 0")
}
Expand Down
10 changes: 8 additions & 2 deletions components/cluster/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
)

func newStartCmd() *cobra.Command {
var initPasswd bool
var (
initPasswd bool
restoreLeader bool
)

cmd := &cobra.Command{
Use: "start <cluster-name>",
Expand All @@ -48,7 +51,7 @@ func newStartCmd() *cobra.Command {
clusterReport.ID = scrubClusterName(clusterName)
teleCommand = append(teleCommand, scrubClusterName(clusterName))

if err := cm.StartCluster(clusterName, gOpt, func(b *task.Builder, metadata spec.Metadata) {
if err := cm.StartCluster(clusterName, gOpt, restoreLeader, func(b *task.Builder, metadata spec.Metadata) {
b.UpdateTopology(
clusterName,
tidbSpec.Path(clusterName),
Expand Down Expand Up @@ -80,9 +83,12 @@ func newStartCmd() *cobra.Command {
}

cmd.Flags().BoolVar(&initPasswd, "init", false, "Initialize a secure root password for the database")
cmd.Flags().BoolVar(&restoreLeader, "restore-leaders", false, "Allow leaders to be scheduled to stores after start")
cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only start specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only start specified nodes")

_ = cmd.Flags().MarkHidden("restore-leaders")

return cmd
}

Expand Down
7 changes: 6 additions & 1 deletion components/cluster/command/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
)

func newStopCmd() *cobra.Command {
var evictLeader bool

cmd := &cobra.Command{
Use: "stop <cluster-name>",
Short: "Stop a TiDB cluster",
Expand All @@ -34,12 +36,15 @@ func newStopCmd() *cobra.Command {
clusterReport.ID = scrubClusterName(clusterName)
teleCommand = append(teleCommand, scrubClusterName(clusterName))

return cm.StopCluster(clusterName, gOpt, skipConfirm)
return cm.StopCluster(clusterName, gOpt, skipConfirm, evictLeader)
},
}

cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only stop specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only stop specified nodes")
cmd.Flags().BoolVar(&evictLeader, "evict-leaders", false, "Evict leaders on stores before stop")

_ = cmd.Flags().MarkHidden("evict-leaders")

return cmd
}
1 change: 0 additions & 1 deletion components/dm/command/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func newAuditCleanupCmd() *cobra.Command {
Use: "cleanup",
Short: "cleanup dm audit logs",
RunE: func(cmd *cobra.Command, args []string) error {

if retainDays < 0 {
return errors.Errorf("retain-days cannot be less than 0")
}
Expand Down
10 changes: 9 additions & 1 deletion components/dm/command/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,15 @@ func ScaleInDMCluster(
continue
}

if err := operator.StopComponent(ctx, []dm.Instance{instance}, noAgentHosts, options.OptTimeout); err != nil {
if err := operator.StopComponent(
ctx,
topo,
[]dm.Instance{instance},
noAgentHosts,
options.OptTimeout,
false, /* evictLeader */
&tls.Config{}, /* not used as evictLeader is false */
); err != nil {
return errors.Annotatef(err, "failed to stop %s", component.Name())
}

Expand Down
2 changes: 1 addition & 1 deletion components/dm/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newStartCmd() *cobra.Command {

clusterName := args[0]

return cm.StartCluster(clusterName, gOpt)
return cm.StartCluster(clusterName, gOpt, false)
},
}

Expand Down
2 changes: 1 addition & 1 deletion components/dm/command/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newStopCmd() *cobra.Command {

clusterName := args[0]

return cm.StopCluster(clusterName, gOpt, skipConfirm)
return cm.StopCluster(clusterName, gOpt, skipConfirm, false)
},
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/cluster/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func GetAuditList(dir string) ([]Item, error) {

// OutputAuditLog outputs audit log.
func OutputAuditLog(dir, fileSuffix string, data []byte) error {

auditID := base52.Encode(time.Now().UnixNano() + rand.Int63n(1000))
if customID := os.Getenv(EnvNameAuditID); customID != "" {
auditID = fmt.Sprintf("%s_%s", auditID, customID)
Expand Down Expand Up @@ -223,7 +222,6 @@ type deleteAuditLog struct {

// DeleteAuditLog cleanup audit log
func DeleteAuditLog(dir string, retainDays int, skipConfirm bool, displayMode string) error {

if retainDays < 0 {
return errors.Errorf("retainDays cannot be less than 0")
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/cluster/manager/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *Manager) EnableCluster(name string, gOpt operator.Options, isEnable boo
}

// StartCluster start the cluster with specified name.
func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b *task.Builder, metadata spec.Metadata)) error {
func (m *Manager) StartCluster(name string, gOpt operator.Options, restoreLeader bool, fn ...func(b *task.Builder, metadata spec.Metadata)) error {
m.logger.Infof("Starting cluster %s...", name)

// check locked
Expand All @@ -116,7 +116,7 @@ func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b
}

b.Func("StartCluster", func(ctx context.Context) error {
return operator.Start(ctx, topo, gOpt, tlsCfg)
return operator.Start(ctx, topo, gOpt, restoreLeader, tlsCfg)
})

for _, f := range fn {
Expand All @@ -143,7 +143,12 @@ func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b
}

// StopCluster stop the cluster.
func (m *Manager) StopCluster(name string, gOpt operator.Options, skipConfirm bool) error {
func (m *Manager) StopCluster(
name string,
gOpt operator.Options,
skipConfirm,
evictLeader bool,
) error {
// check locked
if err := m.specManager.ScaleOutLockedErr(name); err != nil {
return err
Expand Down Expand Up @@ -181,7 +186,7 @@ func (m *Manager) StopCluster(name string, gOpt operator.Options, skipConfirm bo

t := b.
Func("StopCluster", func(ctx context.Context) error {
return operator.Stop(ctx, topo, gOpt, tlsCfg)
return operator.Stop(ctx, topo, gOpt, evictLeader, tlsCfg)
}).
Build()

Expand Down
10 changes: 9 additions & 1 deletion pkg/cluster/manager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,15 @@ func buildScaleOutTask(
})
} else {
builder.Func("Start new instances", func(ctx context.Context) error {
return operator.Start(ctx, newPart, operator.Options{OptTimeout: gOpt.OptTimeout, Operation: operator.ScaleOutOperation}, tlsCfg)
return operator.Start(ctx,
newPart,
operator.Options{
OptTimeout: gOpt.OptTimeout,
Operation: operator.ScaleOutOperation,
},
false, /* restoreLeader */
tlsCfg,
)
}).
ParallelStep("+ Refresh components conifgs", gOpt.Force, refreshConfigTasks...).
ParallelStep("+ Reload prometheus and grafana", gOpt.Force,
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/manager/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ func (m *Manager) CleanCluster(name string, gOpt operator.Options, cleanOpt oper
}
t := b.
Func("StopCluster", func(ctx context.Context) error {
return operator.Stop(ctx, topo, operator.Options{}, tlsCfg)
return operator.Stop(
ctx,
topo,
operator.Options{},
false, /* eviceLeader */
tlsCfg,
)
}).
Func("CleanupCluster", func(ctx context.Context) error {
return operator.CleanupComponent(ctx, delFileMap)
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/manager/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ func (m *Manager) DestroyCluster(name string, gOpt operator.Options, destroyOpt
}
t := b.
Func("StopCluster", func(ctx context.Context) error {
return operator.Stop(ctx, topo, operator.Options{Force: destroyOpt.Force}, tlsCfg)
return operator.Stop(
ctx,
topo,
operator.Options{Force: destroyOpt.Force},
false, /* eviceLeader */
tlsCfg,
)
}).
Func("DestroyCluster", func(ctx context.Context) error {
return operator.Destroy(ctx, topo, destroyOpt)
Expand Down
54 changes: 50 additions & 4 deletions pkg/cluster/operation/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func Start(
ctx context.Context,
cluster spec.Topology,
options Options,
restoreLeader bool,
tlsCfg *tls.Config,
) error {
uniqueHosts := set.NewStringSet()
Expand All @@ -127,10 +128,30 @@ func Start(
if err != nil {
return errors.Annotatef(err, "failed to start %s", comp.Name())
}

errg, _ := errgroup.WithContext(ctx)
for _, inst := range insts {
if !inst.IgnoreMonitorAgent() {
uniqueHosts.Insert(inst.GetHost())
}

if restoreLeader {
rIns, ok := inst.(spec.RollingUpdateInstance)
if ok {
// checkpoint must be in a new context
nctx := checkpoint.NewContext(ctx)
errg.Go(func() error {
err := rIns.PostRestart(nctx, cluster, tlsCfg)
if err != nil && !options.Force {
return err
}
return nil
})
}
}
}
if err := errg.Wait(); err != nil {
return err
}
}

Expand All @@ -150,6 +171,7 @@ func Stop(
ctx context.Context,
cluster spec.Topology,
options Options,
evictLeader bool,
tlsCfg *tls.Config,
) error {
roleFilter := set.NewStringSet(options.Roles...)
Expand All @@ -170,7 +192,15 @@ func Stop(

for _, comp := range components {
insts := FilterInstance(comp.Instances(), nodeFilter)
err := StopComponent(ctx, insts, noAgentHosts, options.OptTimeout)
err := StopComponent(
ctx,
cluster,
insts,
noAgentHosts,
options.OptTimeout,
evictLeader,
tlsCfg,
)
if err != nil && !options.Force {
return errors.Annotatef(err, "failed to stop %s", comp.Name())
}
Expand Down Expand Up @@ -232,12 +262,12 @@ func Restart(
options Options,
tlsCfg *tls.Config,
) error {
err := Stop(ctx, cluster, options, tlsCfg)
err := Stop(ctx, cluster, options, false, tlsCfg)
if err != nil {
return errors.Annotatef(err, "failed to stop")
}

err = Start(ctx, cluster, options, tlsCfg)
err = Start(ctx, cluster, options, false, tlsCfg)
if err != nil {
return errors.Annotatef(err, "failed to start")
}
Expand Down Expand Up @@ -533,7 +563,14 @@ func stopInstance(ctx context.Context, ins spec.Instance, timeout uint64) error
}

// StopComponent stop the instances.
func StopComponent(ctx context.Context, instances []spec.Instance, noAgentHosts set.StringSet, timeout uint64) error {
func StopComponent(ctx context.Context,
topo spec.Topology,
instances []spec.Instance,
noAgentHosts set.StringSet,
timeout uint64,
evictLeader bool,
tlsCfg *tls.Config,
) error {
if len(instances) == 0 {
return nil
}
Expand All @@ -560,6 +597,15 @@ func StopComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
// of checkpoint context every time put it into a new goroutine.
nctx := checkpoint.NewContext(ctx)
errg.Go(func() error {
if evictLeader {
rIns, ok := ins.(spec.RollingUpdateInstance)
if ok {
err := rIns.PreRestart(nctx, topo, int(timeout), tlsCfg)
if err != nil {
return err
}
}
}
err := stopInstance(nctx, ins, timeout)
if err != nil {
return err
Expand Down
10 changes: 9 additions & 1 deletion pkg/cluster/operation/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,15 @@ func StopAndDestroyInstance(ctx context.Context, cluster spec.Topology, instance
})

// just try to stop and destroy
if err := StopComponent(ctx, []spec.Instance{instance}, noAgentHosts, options.OptTimeout); err != nil {
if err := StopComponent(
ctx,
cluster,
[]spec.Instance{instance},
noAgentHosts,
options.OptTimeout,
false, /* evictLeader */
&tls.Config{}, /* not used as evictLeader is false */
); err != nil {
if !ignoreErr {
return errors.Annotatef(err, "failed to stop %s", compName)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ type Operation byte

// Operation represents the kind of cluster operation
const (
StartOperation Operation = iota
StopOperation
RestartOperation
// StartOperation Operation = iota
// StopOperation
RestartOperation Operation = iota
DestroyOperation
UpgradeOperation
ScaleInOperation
Expand Down
6 changes: 4 additions & 2 deletions pkg/cluster/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func (c *ClusterOperate) Execute(ctx context.Context) error {
"failed to destroy tombstone",
}
switch c.op {
/* deprecated
case operator.StartOperation:
err = operator.Start(ctx, c.spec, c.options, c.tlsCfg)
err = operator.Start(ctx, c.spec, c.options, false, c.tlsCfg)
case operator.StopOperation:
err = operator.Stop(ctx, c.spec, c.options, c.tlsCfg)
err = operator.Stop(ctx, c.spec, c.options, false, c.tlsCfg)
*/
case operator.RestartOperation:
err = operator.Restart(ctx, c.spec, c.options, c.tlsCfg)
case operator.DestroyOperation:
Expand Down

0 comments on commit b736fb4

Please sign in to comment.