cluster: display support set status-timeout (#1866)
srstack authored May 9, 2022
1 parent 3b24388 commit 12b9e99
Showing 23 changed files with 137 additions and 82 deletions.
6 changes: 5 additions & 1 deletion components/cluster/command/display.go
@@ -16,6 +16,7 @@ package command
 import (
     "errors"
     "fmt"
+    "time"

     perrs "github.com/pingcap/errors"
     "github.com/pingcap/tiup/pkg/cluster/spec"
@@ -29,6 +30,7 @@ func newDisplayCmd() *cobra.Command {
         showDashboardOnly bool
         showVersionOnly   bool
         showTiKVLabels    bool
+        statusTimeout     uint64
     )
     cmd := &cobra.Command{
         Use: "display <cluster-name>",
@@ -38,6 +40,7 @@ func newDisplayCmd() *cobra.Command {
                 return cmd.Help()
             }

+            gOpt.APITimeout = statusTimeout
             clusterName = args[0]
             clusterReport.ID = scrubClusterName(clusterName)
             teleCommand = append(teleCommand, scrubClusterName(clusterName))
@@ -67,7 +70,7 @@ func newDisplayCmd() *cobra.Command {
                 if err != nil {
                     return err
                 }
-                return cm.DisplayDashboardInfo(clusterName, tlsCfg)
+                return cm.DisplayDashboardInfo(clusterName, time.Second*time.Duration(gOpt.APITimeout), tlsCfg)
             }
             if showTiKVLabels {
                 return cm.DisplayTiKVLabels(clusterName, gOpt)
@@ -82,6 +85,7 @@ func newDisplayCmd() *cobra.Command {
     cmd.Flags().BoolVar(&showDashboardOnly, "dashboard", false, "Only display TiDB Dashboard information")
     cmd.Flags().BoolVar(&showVersionOnly, "version", false, "Only display TiDB cluster version")
     cmd.Flags().BoolVar(&showTiKVLabels, "labels", false, "Only display labels of specified TiKV role or nodes")
+    cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 10, "Timeout in seconds when getting node status")

     return cmd
 }
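The flag value is copied into gOpt.APITimeout before the command runs, so a slow cluster can be probed with a longer window. An illustrative invocation (placeholder cluster name; the default remains 10 seconds):

    tiup cluster display <cluster-name> --status-timeout 30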
3 changes: 3 additions & 0 deletions components/dm/command/display.go
@@ -27,6 +27,7 @@ func newDisplayCmd() *cobra.Command {
     var (
         clusterName     string
         showVersionOnly bool
+        statusTimeout   uint64
     )
     cmd := &cobra.Command{
         Use: "display <cluster-name>",
@@ -36,6 +37,7 @@ func newDisplayCmd() *cobra.Command {
                 return cmd.Help()
             }

+            gOpt.APITimeout = statusTimeout
             clusterName = args[0]

             if showVersionOnly {
@@ -55,6 +57,7 @@ func newDisplayCmd() *cobra.Command {
     cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only display specified nodes")
     cmd.Flags().BoolVar(&showVersionOnly, "version", false, "Only display DM cluster version")
     cmd.Flags().BoolVar(&gOpt.ShowUptime, "uptime", false, "Display DM with uptime")
+    cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 10, "Timeout in seconds when getting node status")

     return cmd
 }
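The DM variant gains the same knob; for example:

    tiup dm display <dm-cluster-name> --status-timeout 30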
8 changes: 4 additions & 4 deletions components/dm/spec/logic.go
@@ -90,8 +90,8 @@ func (c *DMMasterComponent) Instances() []Instance {
                 s.DataDir,
             },
             StatusFn: s.Status,
-            UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration {
-                return spec.UptimeByHost(s.Host, s.Port, tlsCfg)
+            UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
+                return spec.UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
             },
         },
         topo: c.Topology,
@@ -268,8 +268,8 @@ func (c *DMWorkerComponent) Instances() []Instance {
                 s.DataDir,
             },
             StatusFn: s.Status,
-            UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration {
-                return spec.UptimeByHost(s.Host, s.Port, tlsCfg)
+            UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
+                return spec.UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
             },
         },
         topo: c.Topology,
16 changes: 12 additions & 4 deletions components/dm/spec/topology_dm.go
@@ -141,9 +141,13 @@ type MasterSpec struct {
 }

 // Status queries current status of the instance
-func (s *MasterSpec) Status(_ context.Context, tlsCfg *tls.Config, _ ...string) string {
+func (s *MasterSpec) Status(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string {
+    if timeout < time.Second {
+        timeout = statusQueryTimeout
+    }
+
     addr := fmt.Sprintf("%s:%d", s.Host, s.Port)
-    dc := api.NewDMMasterClient([]string{addr}, statusQueryTimeout, tlsCfg)
+    dc := api.NewDMMasterClient([]string{addr}, timeout, tlsCfg)
     isFound, isActive, isLeader, err := dc.GetMaster(s.Name)
     if err != nil {
         return "Down"
@@ -207,11 +211,15 @@ type WorkerSpec struct {
 }

 // Status queries current status of the instance
-func (s *WorkerSpec) Status(_ context.Context, tlsCfg *tls.Config, masterList ...string) string {
+func (s *WorkerSpec) Status(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, masterList ...string) string {
     if len(masterList) < 1 {
         return "N/A"
     }
-    dc := api.NewDMMasterClient(masterList, statusQueryTimeout, tlsCfg)
+
+    if timeout < time.Second {
+        timeout = statusQueryTimeout
+    }
+    dc := api.NewDMMasterClient(masterList, timeout, tlsCfg)
     stage, err := dc.GetWorker(s.Name)
     if err != nil {
         return "Down"
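Both Status implementations now take the timeout explicitly and fall back to the package-level statusQueryTimeout when the caller passes less than one second. A minimal caller sketch under the new signature (masterSpec, ctx, and tlsCfg are assumed to be in scope):

    // Probe a DM master, allowing up to 30 seconds for the status query.
    state := masterSpec.Status(ctx, 30*time.Second, tlsCfg)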
2 changes: 1 addition & 1 deletion components/playground/playground.go
@@ -137,7 +137,7 @@ func (p *Playground) binlogClient() (*api.BinlogClient, error) {
         addrs = append(addrs, inst.Addr())
     }

-    return api.NewBinlogClient(addrs, nil)
+    return api.NewBinlogClient(addrs, 5*time.Second, nil)
 }

 func (p *Playground) pdClient() *api.PDClient {
16 changes: 13 additions & 3 deletions pkg/cluster/api/binlog.go
@@ -35,10 +35,14 @@ type BinlogClient struct {
 }

 // NewBinlogClient create a BinlogClient.
-func NewBinlogClient(pdEndpoints []string, tlsConfig *tls.Config) (*BinlogClient, error) {
+func NewBinlogClient(pdEndpoints []string, timeout time.Duration, tlsConfig *tls.Config) (*BinlogClient, error) {
+    if timeout < time.Second {
+        timeout = time.Second * 5
+    }
+
     etcdClient, err := clientv3.New(clientv3.Config{
         Endpoints:   pdEndpoints,
-        DialTimeout: time.Second * 5,
+        DialTimeout: timeout,
         TLS:         tlsConfig,
     })
     if err != nil {
@@ -47,7 +51,7 @@ func NewBinlogClient(pdEndpoints []string, tlsConfig *tls.Config) (*BinlogClient

     return &BinlogClient{
         tls:        tlsConfig,
-        httpClient: utils.NewHTTPClient(5*time.Second, tlsConfig).Client(),
+        httpClient: utils.NewHTTPClient(timeout, tlsConfig).Client(),
         etcdClient: etcdClient,
     }, nil
 }
@@ -165,6 +169,9 @@ func (c *BinlogClient) UpdatePumpState(ctx context.Context, addr string, state s
 func (c *BinlogClient) updateStatus(ctx context.Context, ty string, nodeID string, state string) error {
     key := fmt.Sprintf("/tidb-binlog/v1/%s/%s", ty, nodeID)

+    // set timeout, otherwise it will keep retrying
+    ctx, f := context.WithTimeout(ctx, c.httpClient.Timeout)
+    defer f()
     resp, err := c.etcdClient.KV.Get(ctx, key)
     if err != nil {
         return errors.AddStack(err)
@@ -202,6 +209,9 @@ func (c *BinlogClient) updateStatus(ctx context.Context, ty string, nodeID strin
 func (c *BinlogClient) nodeStatus(ctx context.Context, ty string) (status []*NodeStatus, err error) {
     key := fmt.Sprintf("/tidb-binlog/v1/%s", ty)

+    // set timeout, otherwise it will keep retrying
+    ctx, f := context.WithTimeout(ctx, c.httpClient.Timeout)
+    defer f()
     resp, err := c.etcdClient.KV.Get(ctx, key, clientv3.WithPrefix())
     if err != nil {
         return nil, errors.AddStack(err)
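Callers of NewBinlogClient now supply the timeout ahead of the TLS config; values below one second are coerced to the 5-second default, and the same value bounds both the etcd dial and the HTTP client. A sketch mirroring the call sites updated below (hypothetical endpoint list):

    client, err := api.NewBinlogClient([]string{"127.0.0.1:2379"}, 5*time.Second, tlsCfg)
    if err != nil {
        return nil, err
    }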
22 changes: 14 additions & 8 deletions pkg/cluster/manager/display.go
@@ -110,6 +110,8 @@ func (m *Manager) Display(name string, opt operator.Options) error {
     topo := metadata.GetTopology()
     base := metadata.GetBaseMeta()
     cyan := color.New(color.FgCyan, color.Bold)
+
+    statusTimeout := time.Duration(opt.APITimeout) * time.Second
     // display cluster meta
     var j *JSONOutput
     if m.logger.GetDisplayMode() == logprinter.DisplayModeJSON {
@@ -202,7 +204,7 @@ func (m *Manager) Display(name string, opt operator.Options) error {
     )
     if t, ok := topo.(*spec.Specification); ok {
         var err error
-        dashboardAddr, err = t.GetDashboardAddress(ctx, tlsCfg, masterActive...)
+        dashboardAddr, err = t.GetDashboardAddress(ctx, tlsCfg, statusTimeout, masterActive...)
         if err == nil && !set.NewStringSet("", "auto", "none").Exist(dashboardAddr) {
             scheme := "http"
             if tlsCfg != nil {
@@ -289,6 +291,7 @@ func (m *Manager) DisplayTiKVLabels(name string, opt operator.Options) error {
     metadata, _ := m.meta(name)
     topo := metadata.GetTopology()
     base := metadata.GetBaseMeta()
+    statusTimeout := time.Duration(opt.APITimeout) * time.Second
     // display cluster meta
     cyan := color.New(color.FgCyan, color.Bold)

@@ -361,7 +364,7 @@ func (m *Manager) DisplayTiKVLabels(name string, opt operator.Options) error {
     }
     topo.IterInstance(func(ins spec.Instance) {
         if ins.ComponentName() == spec.ComponentPD {
-            status := ins.Status(ctx, tlsCfg, masterList...)
+            status := ins.Status(ctx, statusTimeout, tlsCfg, masterList...)
             if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") {
                 instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())
                 masterActive = append(masterActive, instAddr)
@@ -456,6 +459,8 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI
     topo := metadata.GetTopology()
     base := metadata.GetBaseMeta()

+    statusTimeout := time.Duration(opt.APITimeout) * time.Second
+
     err = SetSSHKeySet(ctx, m.specManager.Path(name, "ssh", "id_rsa"), m.specManager.Path(name, "ssh", "id_rsa.pub"))
     if err != nil {
         return nil, err
@@ -481,7 +486,8 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI
         if ins.ComponentName() != spec.ComponentPD && ins.ComponentName() != spec.ComponentDMMaster {
             return
         }
-        status := ins.Status(ctx, tlsCfg, masterList...)
+
+        status := ins.Status(ctx, statusTimeout, tlsCfg, masterList...)
         if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") {
             instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())
             masterActive = append(masterActive, instAddr)
@@ -491,7 +497,7 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI

     var dashboardAddr string
     if t, ok := topo.(*spec.Specification); ok {
-        dashboardAddr, _ = t.GetDashboardAddress(ctx, tlsCfg, masterActive...)
+        dashboardAddr, _ = t.GetDashboardAddress(ctx, tlsCfg, statusTimeout, masterActive...)
     }

     clusterInstInfos := []InstInfo{}
@@ -524,12 +530,12 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI
         case spec.ComponentDMMaster:
             status = masterStatus[ins.ID()]
         default:
-            status = ins.Status(ctx, tlsCfg, masterActive...)
+            status = ins.Status(ctx, statusTimeout, tlsCfg, masterActive...)
         }

         since := "-"
         if opt.ShowUptime {
-            since = formatInstanceSince(ins.Uptime(ctx, tlsCfg))
+            since = formatInstanceSince(ins.Uptime(ctx, statusTimeout, tlsCfg))
         }

         // Query the service status and uptime
@@ -717,7 +723,7 @@ func SetClusterSSH(ctx context.Context, topo spec.Topology, deployUser string, s
 }

 // DisplayDashboardInfo prints the dashboard address of cluster
-func (m *Manager) DisplayDashboardInfo(clusterName string, tlsCfg *tls.Config) error {
+func (m *Manager) DisplayDashboardInfo(clusterName string, timeout time.Duration, tlsCfg *tls.Config) error {
     metadata, err := spec.ClusterMetadata(clusterName)
     if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) &&
         !errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) {
@@ -730,7 +736,7 @@ func (m *Manager) DisplayDashboardInfo(clusterName string, tlsCfg *tls.Config) e
     }

     ctx := context.WithValue(context.Background(), logprinter.ContextKeyLogger, m.logger)
-    pdAPI := api.NewPDClient(ctx, pdEndpoints, 2*time.Second, tlsCfg)
+    pdAPI := api.NewPDClient(ctx, pdEndpoints, timeout, tlsCfg)
     dashboardAddr, err := pdAPI.GetDashboardAddress()
     if err != nil {
         return fmt.Errorf("failed to retrieve TiDB Dashboard instance from PD: %s", err)
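DisplayDashboardInfo now receives the PD API timeout from its caller rather than a hard-coded 2 seconds; the cluster display command above passes time.Second*time.Duration(gOpt.APITimeout). A sketch of the new call shape (cm and tlsCfg assumed in scope):

    // Query the Dashboard address with a 30-second PD API timeout.
    err := cm.DisplayDashboardInfo(clusterName, 30*time.Second, tlsCfg)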
2 changes: 1 addition & 1 deletion pkg/cluster/operation/destroy.go
@@ -491,7 +491,7 @@ func DestroyClusterTombstone(
         defer tcpProxy.Close(closeC)
         pdEndpoints = tcpProxy.GetEndpoints()
     }
-    binlogClient, err := api.NewBinlogClient(pdEndpoints, tlsCfg)
+    binlogClient, err := api.NewBinlogClient(pdEndpoints, 5*time.Second, tlsCfg)
     if err != nil {
         return nil, err
     }
2 changes: 1 addition & 1 deletion pkg/cluster/operation/scale_in.go
@@ -180,7 +180,7 @@ func ScaleInCluster(
         defer tcpProxy.Close(closeC)
         pdEndpoints = tcpProxy.GetEndpoints()
     }
-    binlogClient, err := api.NewBinlogClient(pdEndpoints, tlsCfg)
+    binlogClient, err := api.NewBinlogClient(pdEndpoints, 5*time.Second, tlsCfg)
     if err != nil {
         return err
     }
8 changes: 4 additions & 4 deletions pkg/cluster/spec/alertmanager.go
@@ -108,11 +108,11 @@ func (c *AlertManagerComponent) Instances() []Instance {
                 s.DeployDir,
                 s.DataDir,
             },
-            StatusFn: func(_ context.Context, _ *tls.Config, _ ...string) string {
-                return statusByHost(s.Host, s.WebPort, "/-/ready", nil)
+            StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string {
+                return statusByHost(s.Host, s.WebPort, "/-/ready", timeout, nil)
             },
-            UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration {
-                return UptimeByHost(s.Host, s.WebPort, tlsCfg)
+            UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
+                return UptimeByHost(s.Host, s.WebPort, timeout, tlsCfg)
             },
         },
         topo: c.Topology,
8 changes: 4 additions & 4 deletions pkg/cluster/spec/cdc.go
@@ -104,11 +104,11 @@ func (c *CDCComponent) Instances() []Instance {
             Dirs: []string{
                 s.DeployDir,
             },
-            StatusFn: func(_ context.Context, tlsCfg *tls.Config, _ ...string) string {
-                return statusByHost(s.Host, s.Port, "/status", tlsCfg)
+            StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string {
+                return statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg)
             },
-            UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration {
-                return UptimeByHost(s.Host, s.Port, tlsCfg)
+            UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
+                return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
             },
         }, c.Topology}
         if s.DataDir != "" {
14 changes: 9 additions & 5 deletions pkg/cluster/spec/drainer.go
@@ -49,11 +49,15 @@ type DrainerSpec struct {
 }

 // Status queries current status of the instance
-func (s *DrainerSpec) Status(ctx context.Context, tlsCfg *tls.Config, pdList ...string) string {
-    state := statusByHost(s.Host, s.Port, "/status", tlsCfg)
+func (s *DrainerSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string {
+    if timeout < time.Second {
+        timeout = statusQueryTimeout
+    }
+
+    state := statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg)

     if s.Offline {
-        binlogClient, err := api.NewBinlogClient(pdList, tlsCfg)
+        binlogClient, err := api.NewBinlogClient(pdList, timeout, tlsCfg)
         if err != nil {
             return state
         }
@@ -127,8 +131,8 @@ func (c *DrainerComponent) Instances() []Instance {
                 s.DataDir,
             },
             StatusFn: s.Status,
-            UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration {
-                return UptimeByHost(s.Host, s.Port, tlsCfg)
+            UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
+                return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
             },
         }, c.Topology})
     }
8 changes: 4 additions & 4 deletions pkg/cluster/spec/grafana.go
@@ -114,11 +114,11 @@ func (c *GrafanaComponent) Instances() []Instance {
             Dirs: []string{
                 s.DeployDir,
             },
-            StatusFn: func(_ context.Context, _ *tls.Config, _ ...string) string {
-                return statusByHost(s.Host, s.Port, "", nil)
+            StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string {
+                return statusByHost(s.Host, s.Port, "", timeout, nil)
             },
-            UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration {
-                return UptimeByHost(s.Host, s.Port, tlsCfg)
+            UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
+                return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
             },
         },
         topo: c.Topology,
(Diff truncated: 10 of the 23 changed files are not shown.)
