Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use GetTabletsByCell in healthcheck #14693

Merged
merged 11 commits into from
Dec 12, 2023
Merged
2 changes: 1 addition & 1 deletion go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn

shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases, nil)
if err != nil {
log.Warningf("Error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return.
Expand Down
56 changes: 14 additions & 42 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
sem chan int
getTablets func(tw *TopologyWatcher) ([]*topo.TabletInfo, error)
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -94,7 +93,7 @@ type TopologyWatcher struct {
// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets that it is configured to watch, and reloads them periodically if needed.
// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topo.TabletInfo, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
Expand All @@ -103,7 +102,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
Expand All @@ -117,8 +115,8 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: topoReadConcurrency})
})
}

Expand Down Expand Up @@ -149,11 +147,10 @@ func (tw *TopologyWatcher) Stop() {
}

func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)

// First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
// First get the list of all tablets.
tabletInfos, err := tw.getTablets(tw)
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
Expand All @@ -168,11 +165,11 @@ func (tw *TopologyWatcher) loadTablets() {

// Accumulate a list of all known alias strings to use later
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))
tabletAliasStrs := make([]string, 0, len(tabletInfos))

tw.mu.Lock()
deepthi marked this conversation as resolved.
Show resolved Hide resolved
for _, tAlias := range tabletAliases {
aliasStr := topoproto.TabletAliasString(tAlias)
for _, tInfo := range tabletInfos {
aliasStr := topoproto.TabletAliasString(tInfo.Alias)
tabletAliasStrs = append(tabletAliasStrs, aliasStr)

if !tw.refreshKnownTablets {
Expand All @@ -182,38 +179,13 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}
}

wg.Add(1)
go func(alias *topodata.TabletAlias) {
defer wg.Done()
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
case <-tw.ctx.Done():
return
default:
}
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
return
}
tw.mu.Lock()
aliasStr := topoproto.TabletAliasString(alias)
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tablet.Tablet,
}
tw.mu.Unlock()
}(tAlias)
// There's no network call here, so we just do the tablets one at a time instead of in parallel goroutines.
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tInfo.Tablet,
}
}

tw.mu.Unlock()
wg.Wait()
tw.mu.Lock()

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
continue
Expand Down
20 changes: 10 additions & 10 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
t.Fatalf("CreateTablet failed: %v", err)
}
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check the tablet is returned by GetAllTablets().
Expand Down Expand Up @@ -178,9 +178,9 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
// If refreshKnownTablets is disabled, only the new tablet is read
// from the topo
if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
}
checkChecksum(t, tw, 2762153755)

Expand All @@ -195,7 +195,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
// only the list is read from the topo and the checksum doesn't change
tw.loadTablets()
if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0})
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1})
}
Expand All @@ -221,7 +221,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
key = TabletToMapKey(tablet)

if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 1})

if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
Expand Down Expand Up @@ -264,7 +264,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
t.Fatalf("UpdateTabletFields failed: %v", err)
}
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2})
allTablets = fhc.GetAllTablets()
key2 := TabletToMapKey(tablet2)
if _, ok := allTablets[key2]; !ok {
Expand All @@ -288,7 +288,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
t.Fatalf("UpdateTabletFields failed: %v", err)
}
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2})
}

// Remove the tablet and check that it is detected as being gone.
Expand All @@ -300,7 +300,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
}
tw.loadTablets()
if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1})
}
Expand Down Expand Up @@ -551,7 +551,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
require.NoError(t, ts.CreateTablet(context.Background(), tablet))

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check tablet is reported by HealthCheck
Expand All @@ -576,7 +576,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
require.NoError(t, ts.CreateTablet(context.Background(), tablet2))

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0})
checkChecksum(t, tw, 2762153755)

// Check the new tablet is NOT reported by HealthCheck.
Expand Down
9 changes: 5 additions & 4 deletions go/vt/topo/consultopo/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ var (
// are either application-level errors, or context errors.
func convertError(err error, nodePath string) error {
// Unwrap errors from the Go HTTP client.
if urlErr, ok := err.(*url.Error); ok {
var urlErr *url.Error
if errors.As(err, &urlErr) {
err = urlErr.Err
}

// Convert specific sentinel values.
switch err {
case context.Canceled:
switch {
case errors.Is(err, context.Canceled):
return topo.NewError(topo.Interrupted, nodePath)
case context.DeadlineExceeded:
case errors.Is(err, context.DeadlineExceeded):
return topo.NewError(topo.Timeout, nodePath)
}

Expand Down
3 changes: 3 additions & 0 deletions go/vt/topo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
NoUpdateNeeded
NoImplementation
NoReadOnlyImplementation
ResourceExhausted
)

// Error represents a topo error.
Expand Down Expand Up @@ -68,6 +69,8 @@ func NewError(code ErrorCode, node string) error {
message = fmt.Sprintf("no such topology implementation %s", node)
case NoReadOnlyImplementation:
message = fmt.Sprintf("no read-only topology implementation %s", node)
case ResourceExhausted:
message = fmt.Sprintf("server resource exhausted: %s", node)
default:
message = fmt.Sprintf("unknown code: %s", node)
}
Expand Down
13 changes: 9 additions & 4 deletions go/vt/topo/etcd2topo/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func convertError(err error, nodePath string) error {
return nil
}

if typeErr, ok := err.(rpctypes.EtcdError); ok {
var typeErr rpctypes.EtcdError
if errors.As(err, &typeErr) {
switch typeErr.Code() {
case codes.NotFound:
return topo.NewError(topo.NoNode, nodePath)
Expand All @@ -61,6 +62,8 @@ func convertError(err error, nodePath string) error {
// etcd primary election is failing, so timeout
// also sounds reasonable there.
return topo.NewError(topo.Timeout, nodePath)
case codes.ResourceExhausted:
return topo.NewError(topo.ResourceExhausted, nodePath)
}
return err
}
Expand All @@ -74,15 +77,17 @@ func convertError(err error, nodePath string) error {
return topo.NewError(topo.Interrupted, nodePath)
case codes.DeadlineExceeded:
return topo.NewError(topo.Timeout, nodePath)
case codes.ResourceExhausted:
return topo.NewError(topo.ResourceExhausted, nodePath)
default:
return err
}
}

switch err {
case context.Canceled:
switch {
case errors.Is(err, context.Canceled):
return topo.NewError(topo.Interrupted, nodePath)
case context.DeadlineExceeded:
case errors.Is(err, context.DeadlineExceeded):
return topo.NewError(topo.Timeout, nodePath)
default:
return err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (ts *Server) GetTabletMapForShardByCell(ctx context.Context, keyspace, shar

// get the tablets for the cells we were able to reach, forward
// ErrPartialResult from FindAllTabletAliasesInShard
result, gerr := ts.GetTabletMap(ctx, aliases)
result, gerr := ts.GetTabletMap(ctx, aliases, nil)
if gerr == nil && err != nil {
gerr = err
}
Expand Down
Loading
Loading