diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 6374ecd5078..612323ca9b7 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -35,6 +35,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "hash/crc32" "html/template" @@ -98,6 +99,9 @@ var ( // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond + + // errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined. + errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time") ) // See the documentation for NewHealthCheck below for an explanation of these parameters. @@ -296,6 +300,27 @@ type HealthCheckImpl struct { healthCheckDialSem *semaphore.Weighted } +// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate. +func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) { + if len(tabletFilters) > 0 { + if len(KeyspacesToWatch) > 0 { + return nil, errKeyspacesToWatchAndTabletFilters + } + + fbs, err := NewFilterByShard(tabletFilters) + if err != nil { + return nil, fmt.Errorf("failed to parse tablet_filters value %q: %v", strings.Join(tabletFilters, ","), err) + } + filters = append(filters, fbs) + } else if len(KeyspacesToWatch) > 0 { + filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch)) + } + if len(tabletFilterTags) > 0 { + filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) + } + return filters, nil +} + // NewHealthCheck creates a new HealthCheck object. // Parameters: // retryDelay. @@ -317,10 +342,14 @@ type HealthCheckImpl struct { // // The localCell for this healthcheck // -// callback. +// cellsToWatch. // -// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering. -func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl { +// Is a list of cells to watch for tablets. +// +// filters. +// +// Is one or more filters to apply when determining what tablets we want to stream healthchecks from. +func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl { log.Infof("loading tablets for cells: %v", cellsToWatch) hc := &HealthCheckImpl{ @@ -342,27 +371,10 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } for _, c := range cells { - var filters TabletFilters log.Infof("Setting up healthcheck for cell: %v", c) if c == "" { continue } - if len(tabletFilters) > 0 { - if len(KeyspacesToWatch) > 0 { - log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time") - } - - fbs, err := NewFilterByShard(tabletFilters) - if err != nil { - log.Exitf("Cannot parse tablet_filters parameter: %v", err) - } - filters = append(filters, fbs) - } else if len(KeyspacesToWatch) > 0 { - filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch)) - } - if len(tabletFilterTags) > 0 { - filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) - } topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 35cd1f17d05..28a0dcf91fe 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -64,6 +64,77 @@ func init() { refreshInterval = time.Minute } +func TestNewVTGateHealthCheckFilters(t *testing.T) { + defer func() { + KeyspacesToWatch = nil + tabletFilters = nil + tabletFilterTags = nil + }() + + testCases := []struct { + name string + keyspacesToWatch []string + tabletFilters []string + tabletFilterTags map[string]string + expectedError string + expectedFilterTypes []any + }{ + { + name: "noFilters", + }, + { + name: "tabletFilters", + tabletFilters: []string{"ks1|-80"}, + expectedFilterTypes: []any{&FilterByShard{}}, + }, + { + name: "keyspacesToWatch", + keyspacesToWatch: []string{"ks1"}, + expectedFilterTypes: []any{&FilterByKeyspace{}}, + }, + { + name: "tabletFiltersAndTags", + tabletFilters: []string{"ks1|-80"}, + tabletFilterTags: map[string]string{"test": "true"}, + expectedFilterTypes: []any{&FilterByShard{}, &FilterByTabletTags{}}, + }, + { + name: "keyspacesToWatchAndTags", + tabletFilterTags: map[string]string{"test": "true"}, + keyspacesToWatch: []string{"ks1"}, + expectedFilterTypes: []any{&FilterByKeyspace{}, &FilterByTabletTags{}}, + }, + { + name: "failKeyspacesToWatchAndFilters", + tabletFilters: []string{"ks1|-80"}, + keyspacesToWatch: []string{"ks1"}, + expectedError: errKeyspacesToWatchAndTabletFilters.Error(), + }, + { + name: "failInvalidTabletFilters", + tabletFilters: []string{"shouldfail!@#!"}, + expectedError: "failed to parse tablet_filters value \"shouldfail!@#!\": invalid FilterByShard parameter: shouldfail!@#!", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + KeyspacesToWatch = testCase.keyspacesToWatch + tabletFilters = testCase.tabletFilters + tabletFilterTags = testCase.tabletFilterTags + + filters, err := NewVTGateHealthCheckFilters() + if testCase.expectedError != "" { + assert.EqualError(t, err, testCase.expectedError) + } + assert.Len(t, filters, len(testCase.expectedFilterTypes)) + for i, filter := range filters { + assert.IsType(t, testCase.expectedFilterTypes[i], filter) + } + }) + } +} + func TestHealthCheck(t *testing.T) { // reset error counters hcErrorCounters.ResetAll() @@ -943,7 +1014,7 @@ func TestGetHealthyTablets(t *testing.T) { func TestPrimaryInOtherCell(t *testing.T) { ts := memorytopo.NewServer("cell1", "cell2") - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() // add a tablet as primary in different cell @@ -1000,7 +1071,7 @@ func TestPrimaryInOtherCell(t *testing.T) { func TestReplicaInOtherCell(t *testing.T) { ts := memorytopo.NewServer("cell1", "cell2") - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() // add a tablet as replica @@ -1102,7 +1173,7 @@ func TestReplicaInOtherCell(t *testing.T) { func TestCellAliases(t *testing.T) { ts := memorytopo.NewServer("cell1", "cell2") - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() cellsAlias := &topodatapb.CellsAlias{ @@ -1248,7 +1319,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic } func createTestHc(ts *topo.Server) *HealthCheckImpl { - return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "") + return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "", nil) } type fakeConn struct { diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index 652e4ff7c7b..bd1b6def62f 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -39,7 +39,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) { factory.AddCell(cell) ts := faketopo.NewFakeTopoServer(factory) ts2 := &fakeTopoServer{} - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil) defer hc.Close() kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell) kss := &keyspaceState{ @@ -82,7 +82,7 @@ func TestKeyspaceEventTypes(t *testing.T) { factory.AddCell(cell) ts := faketopo.NewFakeTopoServer(factory) ts2 := &fakeTopoServer{} - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil) defer hc.Close() kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell) diff --git a/go/vt/throttler/demo/throttler_demo.go b/go/vt/throttler/demo/throttler_demo.go index 3593bc0806d..91d7e45f92c 100644 --- a/go/vt/throttler/demo/throttler_demo.go +++ b/go/vt/throttler/demo/throttler_demo.go @@ -101,7 +101,7 @@ type replica struct { // throttler is used to enforce the maximum rate at which replica applies // transactions. It must not be confused with the client's throttler. - throttler *throttler.Throttler + throttler throttler.Throttler lastHealthUpdate time.Time lagUpdateInterval time.Duration @@ -224,7 +224,7 @@ type client struct { primary *primary healthCheck discovery.HealthCheck - throttler *throttler.Throttler + throttler throttler.Throttler stopChan chan struct{} wg sync.WaitGroup @@ -237,7 +237,7 @@ func newClient(primary *primary, replica *replica, ts *topo.Server) *client { log.Fatal(err) } - healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "") + healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "", nil) c := &client{ primary: primary, healthCheck: healthCheck, diff --git a/go/vt/throttler/manager.go b/go/vt/throttler/manager.go index c2ee9f0a652..ee142190f75 100644 --- a/go/vt/throttler/manager.go +++ b/go/vt/throttler/manager.go @@ -64,16 +64,16 @@ type managerImpl struct { // mu guards all fields in this group. mu sync.Mutex // throttlers tracks all running throttlers (by their name). - throttlers map[string]*Throttler + throttlers map[string]Throttler } func newManager() *managerImpl { return &managerImpl{ - throttlers: make(map[string]*Throttler), + throttlers: make(map[string]Throttler), } } -func (m *managerImpl) registerThrottler(name string, throttler *Throttler) error { +func (m *managerImpl) registerThrottler(name string, throttler Throttler) error { m.mu.Lock() defer m.mu.Unlock() @@ -207,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string { // log returns the most recent changes of the MaxReplicationLag module. // There will be one result for each processed replication lag record. -func (m *managerImpl) log(throttlerName string) ([]result, error) { +func (m *managerImpl) log(throttlerName string) ([]Result, error) { m.mu.Lock() defer m.mu.Unlock() @@ -216,5 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) { return nil, fmt.Errorf("throttler: %v does not exist", throttlerName) } - return t.log(), nil + return t.Log(), nil } diff --git a/go/vt/throttler/manager_test.go b/go/vt/throttler/manager_test.go index 8c0e6ae4563..a483ce9dc8f 100644 --- a/go/vt/throttler/manager_test.go +++ b/go/vt/throttler/manager_test.go @@ -37,7 +37,7 @@ var ( type managerTestFixture struct { m *managerImpl - t1, t2 *Throttler + t1, t2 Throttler } func (f *managerTestFixture) setUp() error { diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go index e1a76f89c57..f94f6fabf4a 100644 --- a/go/vt/throttler/max_replication_lag_module.go +++ b/go/vt/throttler/max_replication_lag_module.go @@ -312,7 +312,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec m.memory.ageBadRate(now) - r := result{ + r := Result{ Now: now, RateChange: unchangedRate, lastRateChange: m.lastRateChange, @@ -445,7 +445,7 @@ func stateGreater(a, b state) bool { // and we should not skip the current replica ("lagRecordNow"). // Even if it's the same replica we may skip it and return false because // we want to wait longer for the propagation of the current rate change. -func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool { +func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool { if m.replicaUnderTest == nil { return true } @@ -471,7 +471,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t return true } -func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) { m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown) oldRate := m.rate.Get() @@ -559,7 +559,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa return minDuration } -func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) { // Guess replication rate based on the difference in the replication lag of this // particular replica. lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange) @@ -630,7 +630,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, // guessReplicationRate guesses the actual replication rate based on the new bac // Note that "lagDifference" can be positive (lag increased) or negative (lag // decreased). -func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) { +func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) { // avgReplicationRate is the average rate (per second) at which the replica // applied transactions from the replication stream. We infer the value // from the relative change in the replication lag. @@ -675,14 +675,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate return int64(newRate), reason } -func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) { m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown) decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec) m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason) } -func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) { +func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) { oldRate := m.rate.Get() rate := int64(float64(oldRate) - float64(oldRate)*decrease) if rate == 0 { @@ -694,7 +694,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases()) } -func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) { +func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) { oldRate := m.rate.Get() m.currentState = newState @@ -722,7 +722,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int // markCurrentRateAsBadOrGood determines the actual rate between the last rate // change and "now" and determines if that rate was bad or good. -func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) { +func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) { if m.lastRateChange.IsZero() { // Module was just started. We don't have any data points yet. r.GoodOrBad = ignoredRate @@ -796,6 +796,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time } } -func (m *MaxReplicationLagModule) log() []result { +func (m *MaxReplicationLagModule) log() []Result { return m.results.latestValues() } diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go index c9c2e94f113..ab26c0bc6b8 100644 --- a/go/vt/throttler/replication_lag_cache.go +++ b/go/vt/throttler/replication_lag_cache.go @@ -18,6 +18,7 @@ package throttler import ( "sort" + "sync" "time" "vitess.io/vitess/go/vt/discovery" @@ -30,6 +31,8 @@ type replicationLagCache struct { // The map key is replicationLagRecord.LegacyTabletStats.Key. entries map[string]*replicationLagHistory + mu sync.Mutex + // slowReplicas is a set of slow replicas. // The map key is replicationLagRecord.LegacyTabletStats.Key. // This map will always be recomputed by sortByLag() and must not be modified @@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache // add inserts or updates "r" in the cache for the replica with the key "r.Key". func (c *replicationLagCache) add(r replicationLagRecord) { + c.mu.Lock() + defer c.mu.Unlock() + if !r.Serving { // Tablet is down. Do no longer track it. delete(c.entries, discovery.TabletToMapKey(r.Tablet)) @@ -76,9 +82,35 @@ func (c *replicationLagCache) add(r replicationLagRecord) { entry.add(r) } +// maxLag returns the maximum replication lag for the entries in cache. +func (c *replicationLagCache) maxLag() (maxLag uint32) { + c.mu.Lock() + defer c.mu.Unlock() + + for key := range c.entries { + if c.isIgnored(key) { + continue + } + + entry, ok := c.entries[key] + if !ok { + continue + } + + latest := entry.latest() + if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // latest returns the current lag record for the given LegacyTabletStats.Key string. // A zero record is returned if there is no latest entry. func (c *replicationLagCache) latest(key string) replicationLagRecord { + c.mu.Lock() + defer c.mu.Unlock() entry, ok := c.entries[key] if !ok { return replicationLagRecord{} @@ -90,6 +122,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord { // or just after it. // If there is no such record, a zero record is returned. func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord { + c.mu.Lock() + defer c.mu.Unlock() entry, ok := c.entries[key] if !ok { return replicationLagRecord{} @@ -100,6 +134,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag // sortByLag sorts all replicas by their latest replication lag value and // tablet uid and updates the c.slowReplicas set. func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) { + c.mu.Lock() + defer c.mu.Unlock() + // Reset the current list of ignored replicas. c.slowReplicas = make(map[string]bool) @@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool { // this slow replica. // "key" refers to ReplicationLagRecord.LegacyTabletStats.Key. func (c *replicationLagCache) ignoreSlowReplica(key string) bool { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.slowReplicas) == 0 { // No slow replicas at all. return false diff --git a/go/vt/throttler/replication_lag_cache_test.go b/go/vt/throttler/replication_lag_cache_test.go index 312f97e1999..9b34210d096 100644 --- a/go/vt/throttler/replication_lag_cache_test.go +++ b/go/vt/throttler/replication_lag_cache_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/discovery" ) @@ -91,3 +93,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) { t.Fatal("r1 should be tracked as a slow replica") } } + +func TestReplicationLagCache_MaxLag(t *testing.T) { + c := newReplicationLagCache(2) + c.add(lagRecord(sinceZero(1*time.Second), r1, 30)) + c.add(lagRecord(sinceZero(1*time.Second), r2, 1)) + require.Equal(t, uint32(30), c.maxLag()) +} diff --git a/go/vt/throttler/result.go b/go/vt/throttler/result.go index 179711116a3..8af02e58a3b 100644 --- a/go/vt/throttler/result.go +++ b/go/vt/throttler/result.go @@ -50,10 +50,10 @@ state (old/tested/new): {{.OldState}}/{{.TestedState}}/{{.NewState}} lag before: {{.LagBefore}} ({{.AgeOfBeforeLag}} ago) rates (primary/replica): {{.PrimaryRate}}/{{.GuessedReplicationRate}} backlog (old/new): {{.GuessedReplicationBacklogOld}}/{{.GuessedReplicationBacklogNew}} reason: {{.Reason}}`)) -// result is generated by the MaxReplicationLag module for each processed +// Result is generated by the MaxReplicationLag module for each processed // "replicationLagRecord". // It captures the details and the decision of the processing. -type result struct { +type Result struct { Now time.Time RateChange rateChange lastRateChange time.Time @@ -80,7 +80,7 @@ type result struct { GuessedReplicationBacklogNew int } -func (r result) String() string { +func (r Result) String() string { var b bytes.Buffer if err := resultStringTemplate.Execute(&b, r); err != nil { panic(fmt.Sprintf("failed to Execute() template: %v", err)) @@ -88,25 +88,25 @@ func (r result) String() string { return b.String() } -func (r result) Alias() string { +func (r Result) Alias() string { return topoproto.TabletAliasString(r.LagRecordNow.Tablet.Alias) } -func (r result) TimeSinceLastRateChange() string { +func (r Result) TimeSinceLastRateChange() string { if r.lastRateChange.IsZero() { return "n/a" } return fmt.Sprintf("%.1fs", r.Now.Sub(r.lastRateChange).Seconds()) } -func (r result) LagBefore() string { +func (r Result) LagBefore() string { if r.LagRecordBefore.isZero() { return "n/a" } return fmt.Sprintf("%ds", r.LagRecordBefore.Stats.ReplicationLagSeconds) } -func (r result) AgeOfBeforeLag() string { +func (r Result) AgeOfBeforeLag() string { if r.LagRecordBefore.isZero() { return "n/a" } @@ -123,18 +123,18 @@ type resultRing struct { // started reusing entries. wrapped bool // values is the underlying ring buffer. - values []result + values []Result } // newResultRing creates a new resultRing. func newResultRing(capacity int) *resultRing { return &resultRing{ - values: make([]result, capacity), + values: make([]Result, capacity), } } // add inserts a new result into the ring buffer. -func (rr *resultRing) add(r result) { +func (rr *resultRing) add(r Result) { rr.mu.Lock() defer rr.mu.Unlock() @@ -148,7 +148,7 @@ func (rr *resultRing) add(r result) { // latestValues returns all values of the buffer. Entries are sorted in reverse // chronological order i.e. newer items come first. -func (rr *resultRing) latestValues() []result { +func (rr *resultRing) latestValues() []Result { rr.mu.Lock() defer rr.mu.Unlock() @@ -162,7 +162,7 @@ func (rr *resultRing) latestValues() []result { count = rr.position } - results := make([]result, count) + results := make([]Result, count) for i := 0; i < count; i++ { pos := start - i if pos < 0 { diff --git a/go/vt/throttler/result_test.go b/go/vt/throttler/result_test.go index 9efc7df9412..9eadab503e8 100644 --- a/go/vt/throttler/result_test.go +++ b/go/vt/throttler/result_test.go @@ -23,7 +23,7 @@ import ( ) var ( - resultIncreased = result{ + resultIncreased = Result{ Now: sinceZero(1234 * time.Millisecond), RateChange: increasedRate, lastRateChange: sinceZero(1 * time.Millisecond), @@ -45,7 +45,7 @@ var ( GuessedReplicationBacklogOld: 0, GuessedReplicationBacklogNew: 0, } - resultDecreased = result{ + resultDecreased = Result{ Now: sinceZero(5000 * time.Millisecond), RateChange: decreasedRate, lastRateChange: sinceZero(1234 * time.Millisecond), @@ -67,7 +67,7 @@ var ( GuessedReplicationBacklogOld: 10, GuessedReplicationBacklogNew: 20, } - resultEmergency = result{ + resultEmergency = Result{ Now: sinceZero(10123 * time.Millisecond), RateChange: decreasedRate, lastRateChange: sinceZero(5000 * time.Millisecond), @@ -93,7 +93,7 @@ var ( func TestResultString(t *testing.T) { testcases := []struct { - r result + r Result want string }{ { @@ -135,27 +135,27 @@ reason: emergency state decreased the rate`, func TestResultRing(t *testing.T) { // Test data. - r1 := result{Reason: "r1"} - r2 := result{Reason: "r2"} - r3 := result{Reason: "r3"} + r1 := Result{Reason: "r1"} + r2 := Result{Reason: "r2"} + r3 := Result{Reason: "r3"} rr := newResultRing(2) // Use the ring partially. rr.add(r1) - if got, want := rr.latestValues(), []result{r1}; !reflect.DeepEqual(got, want) { + if got, want := rr.latestValues(), []Result{r1}; !reflect.DeepEqual(got, want) { t.Fatalf("items not correctly added to resultRing. got = %v, want = %v", got, want) } // Use it fully. rr.add(r2) - if got, want := rr.latestValues(), []result{r2, r1}; !reflect.DeepEqual(got, want) { + if got, want := rr.latestValues(), []Result{r2, r1}; !reflect.DeepEqual(got, want) { t.Fatalf("items not correctly added to resultRing. got = %v, want = %v", got, want) } // Let it wrap. rr.add(r3) - if got, want := rr.latestValues(), []result{r3, r2}; !reflect.DeepEqual(got, want) { + if got, want := rr.latestValues(), []Result{r3, r2}; !reflect.DeepEqual(got, want) { t.Fatalf("resultRing did not wrap correctly. got = %v, want = %v", got, want) } } diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 68905db1ad5..cd237548b3b 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/proto/topodata" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -66,7 +67,22 @@ const ( InvalidMaxReplicationLag = -1 ) -// Throttler provides a client-side, thread-aware throttler. +// Throttler defines the throttler interface. +type Throttler interface { + Throttle(threadID int) time.Duration + ThreadFinished(threadID int) + Close() + MaxRate() int64 + SetMaxRate(rate int64) + RecordReplicationLag(time time.Time, th *discovery.TabletHealth) + GetConfiguration() *throttlerdatapb.Configuration + UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error + ResetConfiguration() + MaxLag(tabletType topodatapb.TabletType) uint32 + Log() []Result +} + +// ThrottlerImpl implements a client-side, thread-aware throttler. // See the package doc for more information. // // Calls of Throttle() and ThreadFinished() take threadID as parameter which is @@ -74,7 +90,7 @@ const ( // NOTE: Trottle() and ThreadFinished() assume that *per thread* calls to them // // are serialized and must not happen concurrently. -type Throttler struct { +type ThrottlerImpl struct { // name describes the Throttler instance and is used e.g. in the webinterface. name string // unit describes the entity the throttler is limiting e.g. "queries" or @@ -127,15 +143,15 @@ type Throttler struct { // unit refers to the type of entity you want to throttle e.g. "queries" or // "transactions". // name describes the Throttler instance and will be used by the webinterface. -func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (*Throttler, error) { +func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (Throttler, error) { return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, time.Now) } -func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) { +func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (Throttler, error) { return newThrottlerFromConfig(GlobalManager, name, unit, threadCount, maxRateModuleMaxRate, maxReplicationLagModuleConfig, nowFunc) } -func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) { +func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (Throttler, error) { config := NewMaxReplicationLagModuleConfig(maxReplicationLag) config.MaxReplicationLagSec = maxReplicationLag @@ -143,7 +159,7 @@ func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxR } -func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) { +func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (Throttler, error) { err := maxReplicationLagModuleConfig.Verify() if err != nil { return nil, fmt.Errorf("invalid max replication lag config: %w", err) @@ -176,7 +192,7 @@ func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount threadThrottlers[i] = newThreadThrottler(i, actualRateHistory) runningThreads[i] = true } - t := &Throttler{ + t := &ThrottlerImpl{ name: name, unit: unit, manager: manager, @@ -215,7 +231,7 @@ func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount // the backoff duration elapsed. // The maximum value for the returned backoff is 1 second since the throttler // internally operates on a per-second basis. -func (t *Throttler) Throttle(threadID int) time.Duration { +func (t *ThrottlerImpl) Throttle(threadID int) time.Duration { if t.closed { panic(fmt.Sprintf("BUG: thread with ID: %v must not access closed Throttler", threadID)) } @@ -227,30 +243,18 @@ func (t *Throttler) Throttle(threadID int) time.Duration { // MaxLag returns the max of all the last replication lag values seen across all tablets of // the provided type, excluding ignored tablets. -func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { +func (t *ThrottlerImpl) MaxLag(tabletType topodata.TabletType) uint32 { cache := t.maxReplicationLagModule.lagCacheByType(tabletType) - - var maxLag uint32 - cacheEntries := cache.entries - - for key := range cacheEntries { - if cache.isIgnored(key) { - continue - } - - lag := cache.latest(key).Stats.ReplicationLagSeconds - if lag > maxLag { - maxLag = lag - } + if cache == nil { + return 0 } - - return maxLag + return cache.maxLag() } // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. -func (t *Throttler) ThreadFinished(threadID int) { +func (t *ThrottlerImpl) ThreadFinished(threadID int) { if t.threadFinished[threadID] { panic(fmt.Sprintf("BUG: thread with ID: %v already finished", threadID)) } @@ -265,7 +269,7 @@ func (t *Throttler) ThreadFinished(threadID int) { // Close stops all modules and frees all resources. // When Close() returned, the Throttler object must not be used anymore. -func (t *Throttler) Close() { +func (t *ThrottlerImpl) Close() { for _, m := range t.modules { m.Stop() } @@ -278,7 +282,7 @@ func (t *Throttler) Close() { // threadThrottlers accordingly. // The rate changes when the number of thread changes or a module updated its // max rate. -func (t *Throttler) updateMaxRate() { +func (t *ThrottlerImpl) updateMaxRate() { // Set it to infinite initially. maxRate := int64(math.MaxInt64) @@ -319,39 +323,39 @@ func (t *Throttler) updateMaxRate() { } // MaxRate returns the current rate of the MaxRateModule. -func (t *Throttler) MaxRate() int64 { +func (t *ThrottlerImpl) MaxRate() int64 { return t.maxRateModule.MaxRate() } // SetMaxRate updates the rate of the MaxRateModule. -func (t *Throttler) SetMaxRate(rate int64) { +func (t *ThrottlerImpl) SetMaxRate(rate int64) { t.maxRateModule.SetMaxRate(rate) } // RecordReplicationLag must be called by users to report the "ts" tablet health // data observed at "time". // Note: After Close() is called, this method must not be called anymore. -func (t *Throttler) RecordReplicationLag(time time.Time, th *discovery.TabletHealth) { +func (t *ThrottlerImpl) RecordReplicationLag(time time.Time, th *discovery.TabletHealth) { t.maxReplicationLagModule.RecordReplicationLag(time, th) } // GetConfiguration returns the configuration of the MaxReplicationLag module. -func (t *Throttler) GetConfiguration() *throttlerdatapb.Configuration { +func (t *ThrottlerImpl) GetConfiguration() *throttlerdatapb.Configuration { return t.maxReplicationLagModule.getConfiguration() } // UpdateConfiguration updates the configuration of the MaxReplicationLag module. -func (t *Throttler) UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error { +func (t *ThrottlerImpl) UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error { return t.maxReplicationLagModule.updateConfiguration(configuration, copyZeroValues) } // ResetConfiguration resets the configuration of the MaxReplicationLag module // to its initial settings. -func (t *Throttler) ResetConfiguration() { +func (t *ThrottlerImpl) ResetConfiguration() { t.maxReplicationLagModule.resetConfiguration() } -// log returns the most recent changes of the MaxReplicationLag module. -func (t *Throttler) log() []result { +// Log returns the most recent changes of the MaxReplicationLag module. +func (t *ThrottlerImpl) Log() []Result { return t.maxReplicationLagModule.log() } diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index 0bb0ed0387a..e7e7c13c466 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -17,10 +17,18 @@ limitations under the License. package throttler import ( + "context" "runtime" "strings" + "sync" "testing" "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/topodata" ) // The main purpose of the benchmarks below is to demonstrate the functionality @@ -162,7 +170,7 @@ func sinceZero(sinceZero time.Duration) time.Time { // threadThrottler.newThreadThrottler() for more details. // newThrottlerWithClock should only be used for testing. -func newThrottlerWithClock(name, unit string, threadCount int, maxRate int64, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) { +func newThrottlerWithClock(name, unit string, threadCount int, maxRate int64, maxReplicationLag int64, nowFunc func() time.Time) (Throttler, error) { return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, nowFunc) } @@ -274,14 +282,16 @@ func TestThreadFinished(t *testing.T) { // Max rate update to threadThrottlers happens asynchronously. Wait for it. timer := time.NewTimer(2 * time.Second) + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) for { - if throttler.threadThrottlers[0].getMaxRate() == 2 { + if throttlerImpl.threadThrottlers[0].getMaxRate() == 2 { timer.Stop() break } select { case <-timer.C: - t.Fatalf("max rate was not propapgated to threadThrottler[0] in time: %v", throttler.threadThrottlers[0].getMaxRate()) + t.Fatalf("max rate was not propapgated to threadThrottler[0] in time: %v", throttlerImpl.threadThrottlers[0].getMaxRate()) default: // Timer not up yet. Try again. } @@ -389,7 +399,9 @@ func TestUpdateMaxRate_AllThreadsFinished(t *testing.T) { throttler.ThreadFinished(1) // Make sure that there's no division by zero error (threadsRunning == 0). - throttler.updateMaxRate() + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) + throttlerImpl.updateMaxRate() // We don't care about the Throttler state at this point. } @@ -426,3 +438,78 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) { }() throttler.ThreadFinished(0) } + +func TestThrottlerMaxLag(t *testing.T) { + fc := &fakeClock{} + throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now) + require.NoError(t, err) + defer throttler.Close() + + require.NotNil(t, throttler) + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) + require.NotNil(t, throttlerImpl.maxReplicationLagModule) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // run .add() and .MaxLag() concurrently to detect races + for _, tabletType := range []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, + } { + wg.Add(1) + go func(wg *sync.WaitGroup, ctx context.Context, t *ThrottlerImpl, tabletType topodata.TabletType) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + throttler.MaxLag(tabletType) + } + } + }(&wg, ctx, throttlerImpl, tabletType) + + wg.Add(1) + go func(wg *sync.WaitGroup, ctx context.Context, throttler *ThrottlerImpl, tabletType topodata.TabletType) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType) + require.NotNil(t, cache) + cache.add(replicationLagRecord{ + time: time.Now(), + TabletHealth: discovery.TabletHealth{ + Serving: true, + Stats: &query.RealtimeStats{ + ReplicationLagSeconds: 5, + }, + Tablet: &topodata.Tablet{ + Hostname: t.Name(), + Type: tabletType, + PortMap: map[string]int32{ + "test": 15999, + }, + }, + }, + }) + } + } + }(&wg, ctx, throttlerImpl, tabletType) + } + time.Sleep(time.Second) + cancel() + wg.Wait() + + // check .MaxLag() + for _, tabletType := range []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, + } { + require.Equal(t, uint32(5), throttler.MaxLag(tabletType)) + } +} diff --git a/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go b/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go index 38fd9d76286..99b5a40e0ca 100644 --- a/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go +++ b/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go @@ -73,7 +73,7 @@ func TestSuitePanics(t *testing.T, c throttlerclient.Client) { var throttlerNames = []string{"t1", "t2"} type testFixture struct { - throttlers []*throttler.Throttler + throttlers []throttler.Throttler } func (tf *testFixture) setUp() error { diff --git a/go/vt/throttler/throttlerlogz.go b/go/vt/throttler/throttlerlogz.go index 6952b34feec..b5ce5376108 100644 --- a/go/vt/throttler/throttlerlogz.go +++ b/go/vt/throttler/throttlerlogz.go @@ -152,7 +152,7 @@ func showThrottlerLog(w http.ResponseWriter, m *managerImpl, name string) { colorLevel = "high" } data := struct { - result + Result ColorLevel string }{r, colorLevel} diff --git a/go/vt/throttler/throttlerlogz_test.go b/go/vt/throttler/throttlerlogz_test.go index 82ebb77e7a1..d5d1ff62327 100644 --- a/go/vt/throttler/throttlerlogz_test.go +++ b/go/vt/throttler/throttlerlogz_test.go @@ -21,6 +21,8 @@ import ( "net/http/httptest" "strings" "testing" + + "github.com/stretchr/testify/require" ) func TestThrottlerlogzHandler_MissingSlash(t *testing.T) { @@ -55,7 +57,7 @@ func TestThrottlerlogzHandler(t *testing.T) { testcases := []struct { desc string - r result + r Result want string }{ { @@ -148,7 +150,9 @@ func TestThrottlerlogzHandler(t *testing.T) { request, _ := http.NewRequest("GET", "/throttlerlogz/t1", nil) response := httptest.NewRecorder() - f.t1.maxReplicationLagModule.results.add(tc.r) + throttler, ok := f.t1.(*ThrottlerImpl) + require.True(t, ok) + throttler.maxReplicationLagModule.results.add(tc.r) throttlerlogzHandler(response, request, f.m) got := response.Body.String() diff --git a/go/vt/vtctld/vtctld.go b/go/vt/vtctld/vtctld.go index a599b5a0edd..462ffdd239f 100644 --- a/go/vt/vtctld/vtctld.go +++ b/go/vt/vtctld/vtctld.go @@ -163,7 +163,7 @@ func InitVtctld(ts *topo.Server) error { if err != nil { log.Errorf("Failed to get the list of known cells, failed to instantiate the healthcheck at startup: %v", err) } else { - healthCheck = discovery.NewHealthCheck(ctx, *vtctl.HealthcheckRetryDelay, *vtctl.HealthCheckTimeout, ts, localCell, strings.Join(cells, ",")) + healthCheck = discovery.NewHealthCheck(ctx, *vtctl.HealthcheckRetryDelay, *vtctl.HealthCheckTimeout, ts, localCell, strings.Join(cells, ","), nil) } } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 6ad595b5202..89431a43212 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -100,7 +100,11 @@ type TabletGateway struct { } func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck { - return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch) + filters, err := discovery.NewVTGateHealthCheckFilters() + if err != nil { + log.Exit(err) + } + return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch, filters) } // NewTabletGateway creates and returns a new TabletGateway diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index aeb75d258a3..327a37dc43f 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler (interfaces: ThrottlerInterface) +// Source: vitess.io/vitess/go/vt/throttler (interfaces: Throttler) // Package txthrottler is a generated GoMock package. package txthrottler @@ -13,45 +13,46 @@ import ( discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" topodata "vitess.io/vitess/go/vt/proto/topodata" + throttler "vitess.io/vitess/go/vt/throttler" ) -// MockThrottlerInterface is a mock of ThrottlerInterface interface. -type MockThrottlerInterface struct { +// MockThrottler is a mock of Throttler interface. +type MockThrottler struct { ctrl *gomock.Controller - recorder *MockThrottlerInterfaceMockRecorder + recorder *MockThrottlerMockRecorder } -// MockThrottlerInterfaceMockRecorder is the mock recorder for MockThrottlerInterface. -type MockThrottlerInterfaceMockRecorder struct { - mock *MockThrottlerInterface +// MockThrottlerMockRecorder is the mock recorder for MockThrottler. +type MockThrottlerMockRecorder struct { + mock *MockThrottler } -// NewMockThrottlerInterface creates a new mock instance. -func NewMockThrottlerInterface(ctrl *gomock.Controller) *MockThrottlerInterface { - mock := &MockThrottlerInterface{ctrl: ctrl} - mock.recorder = &MockThrottlerInterfaceMockRecorder{mock} +// NewMockThrottler creates a new mock instance. +func NewMockThrottler(ctrl *gomock.Controller) *MockThrottler { + mock := &MockThrottler{ctrl: ctrl} + mock.recorder = &MockThrottlerMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockThrottlerInterface) EXPECT() *MockThrottlerInterfaceMockRecorder { +func (m *MockThrottler) EXPECT() *MockThrottlerMockRecorder { return m.recorder } // Close mocks base method. -func (m *MockThrottlerInterface) Close() { +func (m *MockThrottler) Close() { m.ctrl.T.Helper() m.ctrl.Call(m, "Close") } // Close indicates an expected call of Close. -func (mr *MockThrottlerInterfaceMockRecorder) Close() *gomock.Call { +func (mr *MockThrottlerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockThrottlerInterface)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockThrottler)(nil).Close)) } // GetConfiguration mocks base method. -func (m *MockThrottlerInterface) GetConfiguration() *throttlerdata.Configuration { +func (m *MockThrottler) GetConfiguration() *throttlerdata.Configuration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetConfiguration") ret0, _ := ret[0].(*throttlerdata.Configuration) @@ -59,27 +60,41 @@ func (m *MockThrottlerInterface) GetConfiguration() *throttlerdata.Configuration } // GetConfiguration indicates an expected call of GetConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { +func (mr *MockThrottlerMockRecorder) GetConfiguration() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottler)(nil).GetConfiguration)) +} + +// Log mocks base method. +func (m *MockThrottler) Log() []throttler.Result { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Log") + ret0, _ := ret[0].([]throttler.Result) + return ret0 +} + +// Log indicates an expected call of Log. +func (mr *MockThrottlerMockRecorder) Log() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log", reflect.TypeOf((*MockThrottler)(nil).Log)) } // MaxLag mocks base method. -func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { +func (m *MockThrottler) MaxLag(arg0 topodata.TabletType) uint32 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MaxLag", tabletType) + ret := m.ctrl.Call(m, "MaxLag", arg0) ret0, _ := ret[0].(uint32) return ret0 } -// MaxLag indicates an expected call of LastMaxLagNotIgnoredForTabletType. -func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { +// MaxLag indicates an expected call of MaxLag. +func (mr *MockThrottlerMockRecorder) MaxLag(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottler)(nil).MaxLag), arg0) } // MaxRate mocks base method. -func (m *MockThrottlerInterface) MaxRate() int64 { +func (m *MockThrottler) MaxRate() int64 { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "MaxRate") ret0, _ := ret[0].(int64) @@ -87,61 +102,61 @@ func (m *MockThrottlerInterface) MaxRate() int64 { } // MaxRate indicates an expected call of MaxRate. -func (mr *MockThrottlerInterfaceMockRecorder) MaxRate() *gomock.Call { +func (mr *MockThrottlerMockRecorder) MaxRate() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxRate", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxRate)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxRate", reflect.TypeOf((*MockThrottler)(nil).MaxRate)) } // RecordReplicationLag mocks base method. -func (m *MockThrottlerInterface) RecordReplicationLag(arg0 time.Time, arg1 *discovery.TabletHealth) { +func (m *MockThrottler) RecordReplicationLag(arg0 time.Time, arg1 *discovery.TabletHealth) { m.ctrl.T.Helper() m.ctrl.Call(m, "RecordReplicationLag", arg0, arg1) } // RecordReplicationLag indicates an expected call of RecordReplicationLag. -func (mr *MockThrottlerInterfaceMockRecorder) RecordReplicationLag(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) RecordReplicationLag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottlerInterface)(nil).RecordReplicationLag), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottler)(nil).RecordReplicationLag), arg0, arg1) } // ResetConfiguration mocks base method. -func (m *MockThrottlerInterface) ResetConfiguration() { +func (m *MockThrottler) ResetConfiguration() { m.ctrl.T.Helper() m.ctrl.Call(m, "ResetConfiguration") } // ResetConfiguration indicates an expected call of ResetConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) ResetConfiguration() *gomock.Call { +func (mr *MockThrottlerMockRecorder) ResetConfiguration() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).ResetConfiguration)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetConfiguration", reflect.TypeOf((*MockThrottler)(nil).ResetConfiguration)) } // SetMaxRate mocks base method. -func (m *MockThrottlerInterface) SetMaxRate(arg0 int64) { +func (m *MockThrottler) SetMaxRate(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetMaxRate", arg0) } // SetMaxRate indicates an expected call of SetMaxRate. -func (mr *MockThrottlerInterfaceMockRecorder) SetMaxRate(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) SetMaxRate(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottlerInterface)(nil).SetMaxRate), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottler)(nil).SetMaxRate), arg0) } // ThreadFinished mocks base method. -func (m *MockThrottlerInterface) ThreadFinished(arg0 int) { +func (m *MockThrottler) ThreadFinished(arg0 int) { m.ctrl.T.Helper() m.ctrl.Call(m, "ThreadFinished", arg0) } // ThreadFinished indicates an expected call of ThreadFinished. -func (mr *MockThrottlerInterfaceMockRecorder) ThreadFinished(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) ThreadFinished(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottlerInterface)(nil).ThreadFinished), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottler)(nil).ThreadFinished), arg0) } // Throttle mocks base method. -func (m *MockThrottlerInterface) Throttle(arg0 int) time.Duration { +func (m *MockThrottler) Throttle(arg0 int) time.Duration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Throttle", arg0) ret0, _ := ret[0].(time.Duration) @@ -149,13 +164,13 @@ func (m *MockThrottlerInterface) Throttle(arg0 int) time.Duration { } // Throttle indicates an expected call of Throttle. -func (mr *MockThrottlerInterfaceMockRecorder) Throttle(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) Throttle(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottlerInterface)(nil).Throttle), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottler)(nil).Throttle), arg0) } // UpdateConfiguration mocks base method. -func (m *MockThrottlerInterface) UpdateConfiguration(arg0 *throttlerdata.Configuration, arg1 bool) error { +func (m *MockThrottler) UpdateConfiguration(arg0 *throttlerdata.Configuration, arg1 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateConfiguration", arg0, arg1) ret0, _ := ret[0].(error) @@ -163,7 +178,7 @@ func (m *MockThrottlerInterface) UpdateConfiguration(arg0 *throttlerdata.Configu } // UpdateConfiguration indicates an expected call of UpdateConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) UpdateConfiguration(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) UpdateConfiguration(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).UpdateConfiguration), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottler)(nil).UpdateConfiguration), arg0, arg1) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 8147fcae4bc..70d92aad3a7 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -36,15 +36,14 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" - throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // These vars store the functions used to create the topo server, healthcheck, // and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. -type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) +type healthCheckFactoryFunc func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) +type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) var ( healthCheckFactory healthCheckFactoryFunc @@ -52,10 +51,15 @@ var ( ) func resetTxThrottlerFactories() { - healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { - return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) + healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) { + // discovery.NewFilterByShard expects a single-shard filter to be in "keyspace|shard" format. + filter, err := discovery.NewFilterByShard([]string{keyspace + "|" + shard}) + if err != nil { + return nil, err + } + return discovery.NewHealthCheck(ctx, discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","), filter), nil } - throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) } } @@ -72,29 +76,6 @@ type TxThrottler interface { Throttle(priority int, workload string) (result bool) } -// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler -// It is only used here to allow mocking out a throttler object. -type ThrottlerInterface interface { - Throttle(threadID int) time.Duration - ThreadFinished(threadID int) - Close() - MaxRate() int64 - SetMaxRate(rate int64) - RecordReplicationLag(time time.Time, th *discovery.TabletHealth) - GetConfiguration() *throttlerdatapb.Configuration - UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error - ResetConfiguration() - MaxLag(tabletType topodatapb.TabletType) uint32 -} - -// TopologyWatcherInterface defines the public interface that is implemented by -// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out -// go/vt/discovery.LegacyTopologyWatcher. -type TopologyWatcherInterface interface { - Start() - Stop() -} - // TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with // go/vt/throttler.GlobalManager. const TxThrottlerName = "TransactionThrottler" @@ -168,9 +149,11 @@ type txThrottlerStateImpl struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc + throttleMu sync.Mutex + throttler throttler.Throttler + + ctx context.Context + cancel context.CancelFunc healthCheck discovery.HealthCheck healthCheckChan chan *discovery.TabletHealth @@ -296,7 +279,10 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi tabletTypes[tabletType] = true } + ctx, cancel := context.WithCancel(context.Background()) state := &txThrottlerStateImpl{ + ctx: ctx, + cancel: cancel, config: config, healthCheckCells: config.TxThrottlerHealthCheckCells, tabletTypes: tabletTypes, @@ -307,38 +293,42 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi // get cells from topo if none defined in tabletenv config if len(state.healthCheckCells) == 0 { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + cellsCtx, cellsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cellsCancel() + state.healthCheckCells = fetchKnownCells(cellsCtx, txThrottler.topoServer, target) state.cellsFromTopo = true } - ctx, cancel := context.WithCancel(context.Background()) - state.stopHealthCheck = cancel - state.initHealthCheckStream(txThrottler.topoServer, target) - go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + if err := state.initHealthCheckStream(txThrottler.topoServer, target); err != nil { + return nil, err + } + state.healthCheck.RegisterStats() + go state.healthChecksProcessor(txThrottler.topoServer, target) state.waitForTermination.Add(1) go state.updateMaxLag() return state, nil } -func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { - ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) +func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) (err error) { + ts.healthCheck, err = healthCheckFactory(ts.ctx, topoServer, target.Cell, target.Keyspace, target.Shard, ts.healthCheckCells) + if err != nil { + return err + } ts.healthCheckChan = ts.healthCheck.Subscribe() - + return nil } func (ts *txThrottlerStateImpl) closeHealthCheckStream() { if ts.healthCheck == nil { return } - ts.stopHealthCheck() + ts.cancel() ts.healthCheck.Close() } -func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { - fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) +func (ts *txThrottlerStateImpl) updateHealthCheckCells(topoServer *topo.Server, target *querypb.Target) error { + fetchCtx, cancel := context.WithTimeout(ts.ctx, topo.RemoteOperationTimeout) defer cancel() knownCells := fetchKnownCells(fetchCtx, topoServer, target) @@ -346,11 +336,12 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo log.Info("txThrottler: restarting healthcheck stream due to topology cells update") ts.healthCheckCells = knownCells ts.closeHealthCheckStream() - ts.initHealthCheckStream(topoServer, target) + return ts.initHealthCheckStream(topoServer, target) } + return nil } -func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { +func (ts *txThrottlerStateImpl) healthChecksProcessor(topoServer *topo.Server, target *querypb.Target) { var cellsUpdateTicks <-chan time.Time if ts.cellsFromTopo { ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) @@ -359,10 +350,12 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS } for { select { - case <-ctx.Done(): + case <-ts.ctx.Done(): return case <-cellsUpdateTicks: - ts.updateHealthCheckCells(ctx, topoServer, target) + if err := ts.updateHealthCheckCells(topoServer, target); err != nil { + log.Errorf("txThrottler: failed to update cell list: %+v", err) + } case th := <-ts.healthCheckChan: ts.StatsUpdate(th) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index de50f32378d..c595224cb81 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -18,7 +18,7 @@ package txthrottler // Commands to generate the mocks for this test. //go:generate mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names "HealthCheck=MockHealthCheck" vitess.io/vitess/go/vt/discovery HealthCheck -//go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler ThrottlerInterface +//go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/throttler Throttler import ( "context" @@ -66,14 +66,17 @@ func TestEnabledThrottler(t *testing.T) { mockHealthCheck := NewMockHealthCheck(mockCtrl) hcCall1 := mockHealthCheck.EXPECT().Subscribe() hcCall1.Do(func() {}) - hcCall2 := mockHealthCheck.EXPECT().Close() + hcCall2 := mockHealthCheck.EXPECT().RegisterStats() + hcCall2.Do(func() {}) hcCall2.After(hcCall1) - healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { - return mockHealthCheck + hcCall3 := mockHealthCheck.EXPECT().Close() + hcCall3.After(hcCall2) + healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) { + return mockHealthCheck, nil } - mockThrottler := NewMockThrottlerInterface(mockCtrl) - throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + mockThrottler := NewMockThrottler(mockCtrl) + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) { assert.Equal(t, 1, threadCount) return mockThrottler, nil }