Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jan 29, 2024
1 parent 4cd8a23 commit c7a27d0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
8 changes: 1 addition & 7 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ var (
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
refreshKnownTablets = true

// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency int64 = 32

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

Expand All @@ -107,8 +104,6 @@ const (
DefaultHealthCheckRetryDelay = 5 * time.Second
DefaultHealthCheckTimeout = 1 * time.Minute

// DefaultTopoReadConcurrency is used as the default value for the topoReadConcurrency parameter of a TopologyWatcher.
DefaultTopoReadConcurrency int = 5
// DefaultTopologyWatcherRefreshInterval is used as the default value for
// the refresh interval of a topology watcher.
DefaultTopologyWatcherRefreshInterval = 1 * time.Minute
Expand Down Expand Up @@ -176,7 +171,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -362,7 +356,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
33 changes: 20 additions & 13 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package topo
import (
"context"
"path"
"runtime"
"sort"
"sync"

"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/event"
Expand All @@ -37,13 +38,20 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// This file contains keyspace utility functions
// This file contains keyspace utility functions.

// Default concurrency to use in order to avoid overhwelming the topo server.
// This uses a heuristic based on the number of vCPUs available -- where it's
// assumed that as larger machines are used for Vitess deployments they will
// be able to do more concurrently.
var DefaultConcurrency = runtime.NumCPU()
var DefaultConcurrency int64

func registerFlags(fs *pflag.FlagSet) {
fs.Int64Var(&DefaultConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
}

func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
// data more context and convenience. This is the main way we interact
Expand Down Expand Up @@ -183,7 +191,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
type FindAllShardsInKeyspaceOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetShard.
// If <= 0, Concurrency is set to 1.
Concurrency int
Concurrency int64
}

// FindAllShardsInKeyspace reads and returns all the existing shards in a
Expand Down Expand Up @@ -230,12 +238,11 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
}
if IsErrType(err, NoNode) {
// The path doesn't exist, let's see if the keyspace exists.
_, kerr := ts.GetKeyspace(ctx, keyspace)
if kerr == nil {
// We simply have no shards.
return make(map[string]*ShardInfo, 0), nil
if _, kerr := ts.GetKeyspace(ctx, keyspace); kerr != nil {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace)
}
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace)
// We simply have no shards.
return make(map[string]*ShardInfo, 0), nil
}
// Currently the ZooKeeper implementation does not support scans so we
// fall back to concurrently fetching the shards one by one.
Expand Down Expand Up @@ -268,7 +275,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
)

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(opt.Concurrency)
eg.SetLimit(int(opt.Concurrency))

for _, shard := range shards {
shard := shard
Expand Down

0 comments on commit c7a27d0

Please sign in to comment.