Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: support evict leaders on stop and restore on start #1789

Merged
merged 2 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion components/cluster/command/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func newAuditCleanupCmd() *cobra.Command {
Use: "cleanup",
Short: "cleanup cluster audit logs",
RunE: func(cmd *cobra.Command, args []string) error {

if retainDays < 0 {
return errors.Errorf("retain-days cannot be less than 0")
}
Expand Down
10 changes: 8 additions & 2 deletions components/cluster/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
)

func newStartCmd() *cobra.Command {
var initPasswd bool
var (
initPasswd bool
restoreLeader bool
)

cmd := &cobra.Command{
Use: "start <cluster-name>",
Expand All @@ -48,7 +51,7 @@ func newStartCmd() *cobra.Command {
clusterReport.ID = scrubClusterName(clusterName)
teleCommand = append(teleCommand, scrubClusterName(clusterName))

if err := cm.StartCluster(clusterName, gOpt, func(b *task.Builder, metadata spec.Metadata) {
if err := cm.StartCluster(clusterName, gOpt, restoreLeader, func(b *task.Builder, metadata spec.Metadata) {
b.UpdateTopology(
clusterName,
tidbSpec.Path(clusterName),
Expand Down Expand Up @@ -80,9 +83,12 @@ func newStartCmd() *cobra.Command {
}

cmd.Flags().BoolVar(&initPasswd, "init", false, "Initialize a secure root password for the database")
cmd.Flags().BoolVar(&restoreLeader, "restore-leaders", false, "Allow leaders to be scheduled to stores after start")
cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only start specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only start specified nodes")

_ = cmd.Flags().MarkHidden("restore-leaders")

return cmd
}

Expand Down
7 changes: 6 additions & 1 deletion components/cluster/command/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
)

func newStopCmd() *cobra.Command {
var evictLeader bool

cmd := &cobra.Command{
Use: "stop <cluster-name>",
Short: "Stop a TiDB cluster",
Expand All @@ -34,12 +36,15 @@ func newStopCmd() *cobra.Command {
clusterReport.ID = scrubClusterName(clusterName)
teleCommand = append(teleCommand, scrubClusterName(clusterName))

return cm.StopCluster(clusterName, gOpt, skipConfirm)
return cm.StopCluster(clusterName, gOpt, skipConfirm, evictLeader)
},
}

cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only stop specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only stop specified nodes")
cmd.Flags().BoolVar(&evictLeader, "evict-leaders", false, "Evict leaders on stores before stop")

_ = cmd.Flags().MarkHidden("evict-leaders")

return cmd
}
1 change: 0 additions & 1 deletion components/dm/command/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func newAuditCleanupCmd() *cobra.Command {
Use: "cleanup",
Short: "cleanup dm audit logs",
RunE: func(cmd *cobra.Command, args []string) error {

if retainDays < 0 {
return errors.Errorf("retain-days cannot be less than 0")
}
Expand Down
10 changes: 9 additions & 1 deletion components/dm/command/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,15 @@ func ScaleInDMCluster(
continue
}

if err := operator.StopComponent(ctx, []dm.Instance{instance}, noAgentHosts, options.OptTimeout); err != nil {
if err := operator.StopComponent(
ctx,
topo,
[]dm.Instance{instance},
noAgentHosts,
options.OptTimeout,
false, /* evictLeader */
&tls.Config{}, /* not used as evictLeader is false */
); err != nil {
return errors.Annotatef(err, "failed to stop %s", component.Name())
}

Expand Down
2 changes: 1 addition & 1 deletion components/dm/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newStartCmd() *cobra.Command {

clusterName := args[0]

return cm.StartCluster(clusterName, gOpt)
return cm.StartCluster(clusterName, gOpt, false)
},
}

Expand Down
2 changes: 1 addition & 1 deletion components/dm/command/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newStopCmd() *cobra.Command {

clusterName := args[0]

return cm.StopCluster(clusterName, gOpt, skipConfirm)
return cm.StopCluster(clusterName, gOpt, skipConfirm, false)
},
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/cluster/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func GetAuditList(dir string) ([]Item, error) {

// OutputAuditLog outputs audit log.
func OutputAuditLog(dir, fileSuffix string, data []byte) error {

auditID := base52.Encode(time.Now().UnixNano() + rand.Int63n(1000))
if customID := os.Getenv(EnvNameAuditID); customID != "" {
auditID = fmt.Sprintf("%s_%s", auditID, customID)
Expand Down Expand Up @@ -223,7 +222,6 @@ type deleteAuditLog struct {

// DeleteAuditLog cleanup audit log
func DeleteAuditLog(dir string, retainDays int, skipConfirm bool, displayMode string) error {

if retainDays < 0 {
return errors.Errorf("retainDays cannot be less than 0")
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/cluster/manager/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *Manager) EnableCluster(name string, gOpt operator.Options, isEnable boo
}

// StartCluster start the cluster with specified name.
func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b *task.Builder, metadata spec.Metadata)) error {
func (m *Manager) StartCluster(name string, gOpt operator.Options, restoreLeader bool, fn ...func(b *task.Builder, metadata spec.Metadata)) error {
m.logger.Infof("Starting cluster %s...", name)

// check locked
Expand All @@ -116,7 +116,7 @@ func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b
}

b.Func("StartCluster", func(ctx context.Context) error {
return operator.Start(ctx, topo, gOpt, tlsCfg)
return operator.Start(ctx, topo, gOpt, restoreLeader, tlsCfg)
})

for _, f := range fn {
Expand All @@ -143,7 +143,12 @@ func (m *Manager) StartCluster(name string, gOpt operator.Options, fn ...func(b
}

// StopCluster stop the cluster.
func (m *Manager) StopCluster(name string, gOpt operator.Options, skipConfirm bool) error {
func (m *Manager) StopCluster(
name string,
gOpt operator.Options,
skipConfirm,
evictLeader bool,
) error {
// check locked
if err := m.specManager.ScaleOutLockedErr(name); err != nil {
return err
Expand Down Expand Up @@ -181,7 +186,7 @@ func (m *Manager) StopCluster(name string, gOpt operator.Options, skipConfirm bo

t := b.
Func("StopCluster", func(ctx context.Context) error {
return operator.Stop(ctx, topo, gOpt, tlsCfg)
return operator.Stop(ctx, topo, gOpt, evictLeader, tlsCfg)
}).
Build()

Expand Down
10 changes: 9 additions & 1 deletion pkg/cluster/manager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,15 @@ func buildScaleOutTask(
})
} else {
builder.Func("Start new instances", func(ctx context.Context) error {
return operator.Start(ctx, newPart, operator.Options{OptTimeout: gOpt.OptTimeout, Operation: operator.ScaleOutOperation}, tlsCfg)
return operator.Start(ctx,
newPart,
operator.Options{
OptTimeout: gOpt.OptTimeout,
Operation: operator.ScaleOutOperation,
},
false, /* restoreLeader */
tlsCfg,
)
}).
ParallelStep("+ Refresh components conifgs", gOpt.Force, refreshConfigTasks...).
ParallelStep("+ Reload prometheus and grafana", gOpt.Force,
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/manager/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ func (m *Manager) CleanCluster(name string, gOpt operator.Options, cleanOpt oper
}
t := b.
Func("StopCluster", func(ctx context.Context) error {
return operator.Stop(ctx, topo, operator.Options{}, tlsCfg)
return operator.Stop(
ctx,
topo,
operator.Options{},
false, /* evictLeader */
tlsCfg,
)
}).
Func("CleanupCluster", func(ctx context.Context) error {
return operator.CleanupComponent(ctx, delFileMap)
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/manager/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ func (m *Manager) DestroyCluster(name string, gOpt operator.Options, destroyOpt
}
t := b.
Func("StopCluster", func(ctx context.Context) error {
return operator.Stop(ctx, topo, operator.Options{Force: destroyOpt.Force}, tlsCfg)
return operator.Stop(
ctx,
topo,
operator.Options{Force: destroyOpt.Force},
false, /* evictLeader */
tlsCfg,
)
}).
Func("DestroyCluster", func(ctx context.Context) error {
return operator.Destroy(ctx, topo, destroyOpt)
Expand Down
54 changes: 50 additions & 4 deletions pkg/cluster/operation/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func Start(
ctx context.Context,
cluster spec.Topology,
options Options,
restoreLeader bool,
tlsCfg *tls.Config,
) error {
uniqueHosts := set.NewStringSet()
Expand All @@ -127,10 +128,30 @@ func Start(
if err != nil {
return errors.Annotatef(err, "failed to start %s", comp.Name())
}

errg, _ := errgroup.WithContext(ctx)
for _, inst := range insts {
if !inst.IgnoreMonitorAgent() {
uniqueHosts.Insert(inst.GetHost())
}

if restoreLeader {
rIns, ok := inst.(spec.RollingUpdateInstance)
if ok {
// checkpoint must be in a new context
nctx := checkpoint.NewContext(ctx)
errg.Go(func() error {
err := rIns.PostRestart(nctx, cluster, tlsCfg)
if err != nil && !options.Force {
return err
}
return nil
})
}
}
}
if err := errg.Wait(); err != nil {
return err
}
}

Expand All @@ -150,6 +171,7 @@ func Stop(
ctx context.Context,
cluster spec.Topology,
options Options,
evictLeader bool,
tlsCfg *tls.Config,
) error {
roleFilter := set.NewStringSet(options.Roles...)
Expand All @@ -170,7 +192,15 @@ func Stop(

for _, comp := range components {
insts := FilterInstance(comp.Instances(), nodeFilter)
err := StopComponent(ctx, insts, noAgentHosts, options.OptTimeout)
err := StopComponent(
ctx,
cluster,
insts,
noAgentHosts,
options.OptTimeout,
evictLeader,
tlsCfg,
)
if err != nil && !options.Force {
return errors.Annotatef(err, "failed to stop %s", comp.Name())
}
Expand Down Expand Up @@ -232,12 +262,12 @@ func Restart(
options Options,
tlsCfg *tls.Config,
) error {
err := Stop(ctx, cluster, options, tlsCfg)
err := Stop(ctx, cluster, options, false, tlsCfg)
if err != nil {
return errors.Annotatef(err, "failed to stop")
}

err = Start(ctx, cluster, options, tlsCfg)
err = Start(ctx, cluster, options, false, tlsCfg)
if err != nil {
return errors.Annotatef(err, "failed to start")
}
Expand Down Expand Up @@ -533,7 +563,14 @@ func stopInstance(ctx context.Context, ins spec.Instance, timeout uint64) error
}

// StopComponent stop the instances.
func StopComponent(ctx context.Context, instances []spec.Instance, noAgentHosts set.StringSet, timeout uint64) error {
func StopComponent(ctx context.Context,
topo spec.Topology,
instances []spec.Instance,
noAgentHosts set.StringSet,
timeout uint64,
evictLeader bool,
tlsCfg *tls.Config,
) error {
if len(instances) == 0 {
return nil
}
Expand All @@ -560,6 +597,15 @@ func StopComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
// of checkpoint context every time put it into a new goroutine.
nctx := checkpoint.NewContext(ctx)
errg.Go(func() error {
if evictLeader {
rIns, ok := ins.(spec.RollingUpdateInstance)
if ok {
err := rIns.PreRestart(nctx, topo, int(timeout), tlsCfg)
if err != nil {
return err
}
}
}
err := stopInstance(nctx, ins, timeout)
if err != nil {
return err
Expand Down
10 changes: 9 additions & 1 deletion pkg/cluster/operation/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,15 @@ func StopAndDestroyInstance(ctx context.Context, cluster spec.Topology, instance
})

// just try to stop and destroy
if err := StopComponent(ctx, []spec.Instance{instance}, noAgentHosts, options.OptTimeout); err != nil {
if err := StopComponent(
ctx,
cluster,
[]spec.Instance{instance},
noAgentHosts,
options.OptTimeout,
false, /* evictLeader */
&tls.Config{}, /* not used as evictLeader is false */
); err != nil {
if !ignoreErr {
return errors.Annotatef(err, "failed to stop %s", compName)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ type Operation byte

// Operation represents the kind of cluster operation
const (
StartOperation Operation = iota
StopOperation
RestartOperation
// StartOperation Operation = iota
// StopOperation
RestartOperation Operation = iota
DestroyOperation
UpgradeOperation
ScaleInOperation
Expand Down
6 changes: 4 additions & 2 deletions pkg/cluster/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func (c *ClusterOperate) Execute(ctx context.Context) error {
"failed to destroy tombstone",
}
switch c.op {
/* deprecated
case operator.StartOperation:
err = operator.Start(ctx, c.spec, c.options, c.tlsCfg)
err = operator.Start(ctx, c.spec, c.options, false, c.tlsCfg)
case operator.StopOperation:
err = operator.Stop(ctx, c.spec, c.options, c.tlsCfg)
err = operator.Stop(ctx, c.spec, c.options, false, c.tlsCfg)
*/
case operator.RestartOperation:
err = operator.Restart(ctx, c.spec, c.options, c.tlsCfg)
case operator.DestroyOperation:
Expand Down