diff --git a/pkg/cluster/operation/action.go b/pkg/cluster/operation/action.go
index 2ddd3f6a80..5f4a6579a8 100644
--- a/pkg/cluster/operation/action.go
+++ b/pkg/cluster/operation/action.go
@@ -310,7 +310,7 @@ func systemctlMonitor(ctx context.Context, hosts []string, noAgentHosts set.Stri
 	return nil
 }
 
-func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) error {
+func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
 	e := ctxt.GetInner(ctx).Get(ins.GetHost())
 	log.Infof("\tRestarting instance %s", ins.ID())
 
@@ -319,7 +319,7 @@ func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) err
 	}
 
 	// Check ready.
-	if err := ins.Ready(ctx, e, timeout); err != nil {
+	if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
 		return toFailedActionError(err, "restart", ins.GetHost(), ins.ServiceName(), ins.LogDir())
 	}
 
@@ -328,25 +328,6 @@ func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) err
 	return nil
 }
 
-// RestartComponent restarts the component.
-func RestartComponent(ctx context.Context, instances []spec.Instance, timeout uint64) error {
-	if len(instances) == 0 {
-		return nil
-	}
-
-	name := instances[0].ComponentName()
-	log.Infof("Restarting component %s", name)
-
-	for _, ins := range instances {
-		err := restartInstance(ctx, ins, timeout)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 func enableInstance(ctx context.Context, ins spec.Instance, timeout uint64, isEnable bool) error {
 	e := ctxt.GetInner(ctx).Get(ins.GetHost())
 
@@ -366,7 +347,7 @@ func enableInstance(ctx context.Context, ins spec.Instance, timeout uint64, isEn
 	return nil
 }
 
-func startInstance(ctx context.Context, ins spec.Instance, timeout uint64) error {
+func startInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
 	e := ctxt.GetInner(ctx).Get(ins.GetHost())
 	log.Infof("\tStarting instance %s", ins.ID())
 
@@ -375,7 +356,7 @@ func startInstance(ctx context.Context, ins spec.Instance, timeout uint64) error
 	}
 
 	// Check ready.
-	if err := ins.Ready(ctx, e, timeout); err != nil {
+	if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
 		return toFailedActionError(err, "start", ins.GetHost(), ins.ServiceName(), ins.LogDir())
 	}
 
@@ -471,8 +452,13 @@ func StartComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
 	// eg: PD has more strict restrictions on the capacity expansion process,
 	// that is, there should be only one node in the peer-join stage at most
 	// ref https://github.com/tikv/pd/blob/d38b36714ccee70480c39e07126e3456b5fb292d/server/join/join.go#L179-L191
-	if options.Operation == ScaleOutOperation && (name == spec.ComponentPD || name == spec.ComponentDMMaster) {
-		return serialStartInstances(ctx, instances, options, tlsCfg)
+	if options.Operation == ScaleOutOperation {
+		switch name {
+		case spec.ComponentPD,
+			spec.ComponentTiFlash,
+			spec.ComponentDMMaster:
+			return serialStartInstances(ctx, instances, options, tlsCfg)
+		}
 	}
 
 	errg, _ := errgroup.WithContext(ctx)
@@ -496,7 +482,7 @@ func StartComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
 			if err := ins.PrepareStart(nctx, tlsCfg); err != nil {
 				return err
 			}
-			return startInstance(nctx, ins, options.OptTimeout)
+			return startInstance(nctx, ins, options.OptTimeout, tlsCfg)
 		})
 	}
 
@@ -508,7 +494,7 @@ func serialStartInstances(ctx context.Context, instances []spec.Instance, option
 		if err := ins.PrepareStart(ctx, tlsCfg); err != nil {
 			return err
 		}
-		if err := startInstance(ctx, ins, options.OptTimeout); err != nil {
+		if err := startInstance(ctx, ins, options.OptTimeout, tlsCfg); err != nil {
 			return err
 		}
 	}
diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index 1cd89bfc9f..905568f883 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -147,7 +147,7 @@ func upgradeInstance(ctx context.Context, topo spec.Topology, instance spec.Inst
 		}
 	}
 
-	if err := restartInstance(ctx, instance, options.OptTimeout); err != nil && !options.Force {
+	if err := restartInstance(ctx, instance, options.OptTimeout, tlsCfg); err != nil && !options.Force {
 		return err
 	}
 
diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go
index 5e77b41914..80e740c94e 100644
--- a/pkg/cluster/spec/instance.go
+++ b/pkg/cluster/spec/instance.go
@@ -81,7 +81,7 @@ type RollingUpdateInstance interface {
 type Instance interface {
 	InstanceSpec
 	ID() string
-	Ready(context.Context, ctxt.Executor, uint64) error
+	Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error
 	InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
 	ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
 	PrepareStart(ctx context.Context, tlsCfg *tls.Config) error
@@ -143,7 +143,7 @@ type BaseInstance struct {
 }
 
 // Ready implements Instance interface
-func (i *BaseInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64) error {
+func (i *BaseInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64, _ *tls.Config) error {
 	return PortStarted(ctx, e, i.Port, timeout)
 }
 
diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go
index 8e406cc283..087d22617b 100644
--- a/pkg/cluster/spec/tiflash.go
+++ b/pkg/cluster/spec/tiflash.go
@@ -19,12 +19,15 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
+	"io"
+	"net/http"
 	"os"
 	"path/filepath"
 	"sort"
 	"strings"
 	"time"
 
+	"github.com/pingcap/errors"
 	perrs "github.com/pingcap/errors"
 	"github.com/pingcap/tiup/pkg/cluster/api"
 	"github.com/pingcap/tiup/pkg/cluster/ctxt"
@@ -250,6 +253,11 @@ func (i *TiFlashInstance) GetServicePort() int {
 	return i.InstanceSpec.(*TiFlashSpec).FlashServicePort
 }
 
+// GetStatusPort returns the status port of TiFlash
+func (i *TiFlashInstance) GetStatusPort() int {
+	return i.InstanceSpec.(*TiFlashSpec).FlashProxyStatusPort
+}
+
 // checkIncorrectKey checks TiFlash's key should not be set in config
 func (i *TiFlashInstance) checkIncorrectKey(key string) error {
 	errMsg := "NOTE: TiFlash `%s` should NOT be set in topo's \"%s\" config, its value will be ignored, you should set `data_dir` in each host instead, please check your topology"
@@ -730,3 +738,54 @@ func (i *TiFlashInstance) PrepareStart(ctx context.Context, tlsCfg *tls.Config)
 	pdClient := api.NewPDClient(endpoints, 10*time.Second, tlsCfg)
 	return pdClient.UpdateReplicateConfig(bytes.NewBuffer(enablePlacementRules))
 }
+
+// Ready implements Instance interface
+func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64, tlsCfg *tls.Config) error {
+	// FIXME: the timeout is applied twice in the whole `Ready()` process, so in
+	// the worst case it may take twice as long as other components
+	if err := PortStarted(ctx, e, i.Port, timeout); err != nil {
+		return err
+	}
+
+	scheme := "http"
+	if i.topo.BaseTopo().GlobalOptions.TLSEnabled {
+		scheme = "https"
+	}
+	addr := fmt.Sprintf("%s://%s:%d/tiflash/store-status", scheme, i.Host, i.GetStatusPort())
+	req, err := http.NewRequest("GET", addr, nil)
+	if err != nil {
+		return err
+	}
+	req = req.WithContext(ctx)
+
+	retryOpt := utils.RetryOption{
+		Delay:   time.Second,
+		Timeout: time.Second * time.Duration(timeout),
+	}
+	var queryErr error
+	if err := utils.Retry(func() error {
+		client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg)
+		res, err := client.Client().Do(req)
+		if err != nil {
+			queryErr = err
+			return err
+		}
+		defer res.Body.Close()
+		body, err := io.ReadAll(res.Body)
+		if err != nil {
+			queryErr = err
+			return err
+		}
+		if res.StatusCode == http.StatusNotFound || string(body) == "Running" {
+			return nil
+		}
+
+		err = fmt.Errorf("tiflash store status '%s' not ready", string(body))
+		queryErr = err
+		return err
+	}, retryOpt); err != nil {
+		return errors.Annotatef(queryErr, "timed out waiting for tiflash %s:%d to be ready after %ds",
+			i.Host, i.Port, timeout)
+	}
+	return nil
+}