Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: get remote tablets metrics from Realtime Stats , with auto-detection #13034

Closed
Prev Previous commit
Next Next commit
Tablet throttler: get remote tablets metrics from Realtime Stats
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed May 3, 2023
commit 10b0c52938959b2d93e9c5183c20c7ec134fd552
316 changes: 165 additions & 151 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

59 changes: 51 additions & 8 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
@@ -225,7 +225,7 @@ func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
delete(hs.clients, ch)
}

func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, terTimestamp time.Time, lag time.Duration, throttlerReplicationLag time.Duration, err error, serving bool) {
func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, terTimestamp time.Time, lag time.Duration, throttlerMetric float64, throttlerMetricsErr error, stateErr error, serving bool) {
hs.mu.Lock()
defer hs.mu.Unlock()

@@ -235,8 +235,8 @@ func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, terTimes
} else {
hs.state.TabletExternallyReparentedTimestamp = 0
}
if err != nil {
hs.state.RealtimeStats.HealthError = err.Error()
if stateErr != nil {
hs.state.RealtimeStats.HealthError = stateErr.Error()
} else {
hs.state.RealtimeStats.HealthError = ""
}
@@ -246,7 +246,12 @@ func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, terTimes
hs.state.RealtimeStats.FilteredReplicationLagSeconds, hs.state.RealtimeStats.BinlogPlayersCount = blpFunc()
hs.state.RealtimeStats.Qps = hs.stats.QPSRates.TotalRate()

hs.state.RealtimeStats.ThrottlerReplicationLagSeconds = throttlerReplicationLag.Seconds()
hs.state.RealtimeStats.ThrottlerMetric = throttlerMetric
if throttlerMetricsErr != nil {
hs.state.RealtimeStats.ThrottlerMetricError = throttlerMetricsErr.Error()
} else {
hs.state.RealtimeStats.ThrottlerMetricError = ""
}

shr := proto.Clone(hs.state).(*querypb.StreamHealthResponse)

@@ -256,7 +261,7 @@ func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, terTimes
serving: shr.Serving,
tabletType: shr.Target.TabletType,
lag: lag,
err: err,
err: stateErr,
})
}

16 changes: 8 additions & 8 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
@@ -93,7 +93,7 @@ func TestHealthStreamerBroadcast(t *testing.T) {
}
assert.Truef(t, proto.Equal(want, shr), "want: %v, got: %v", want, shr)

hs.ChangeState(topodatapb.TabletType_REPLICA, time.Time{}, 0, 0, nil, false)
hs.ChangeState(topodatapb.TabletType_REPLICA, time.Time{}, 0, 0, nil, nil, false)
shr = <-ch
want = &querypb.StreamHealthResponse{
Target: &querypb.Target{
@@ -109,7 +109,7 @@ func TestHealthStreamerBroadcast(t *testing.T) {

// Test primary and timestamp.
now := time.Now()
hs.ChangeState(topodatapb.TabletType_PRIMARY, now, 0, 0, nil, true)
hs.ChangeState(topodatapb.TabletType_PRIMARY, now, 0, 0, nil, nil, true)
shr = <-ch
want = &querypb.StreamHealthResponse{
Target: &querypb.Target{
@@ -126,24 +126,24 @@ func TestHealthStreamerBroadcast(t *testing.T) {
assert.Truef(t, proto.Equal(want, shr), "want: %v, got: %v", want, shr)

// Test non-serving, and 0 timestamp for non-primary.
hs.ChangeState(topodatapb.TabletType_REPLICA, now, 1*time.Second, 1*time.Second, nil, false)
hs.ChangeState(topodatapb.TabletType_REPLICA, now, 1*time.Second, 1.0, nil, nil, false)
shr = <-ch
want = &querypb.StreamHealthResponse{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
TabletAlias: alias,
RealtimeStats: &querypb.RealtimeStats{
ReplicationLagSeconds: 1,
FilteredReplicationLagSeconds: 1,
ThrottlerReplicationLagSeconds: 1.0,
BinlogPlayersCount: 2,
ReplicationLagSeconds: 1,
FilteredReplicationLagSeconds: 1,
ThrottlerMetric: 1.0,
BinlogPlayersCount: 2,
},
}
assert.Truef(t, proto.Equal(want, shr), "want: %v, got: %v", want, shr)

// Test Health error.
hs.ChangeState(topodatapb.TabletType_REPLICA, now, 0, 0, errors.New("repl err"), false)
hs.ChangeState(topodatapb.TabletType_REPLICA, now, 0, 0, nil, errors.New("repl err"), false)
shr = <-ch
want = &querypb.StreamHealthResponse{
Target: &querypb.Target{
16 changes: 9 additions & 7 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
@@ -181,6 +181,7 @@ type (
lagThrottler interface {
Open() error
Close()
SelfMetrics() (float64, error)
}

tableGarbageCollector interface {
@@ -618,7 +619,7 @@ func (sm *stateManager) setState(tabletType topodatapb.TabletType, state serving
if sm.state == StateNotConnected {
// If we're transitioning out of StateNotConnected, we have
// to also ensure replication status is healthy.
_, _, _ = sm.refreshReplHealthLocked()
_, _ = sm.refreshReplHealthLocked()
}
sm.state = state
// Broadcast also obtains a lock. Trigger in a goroutine to avoid a deadlock.
@@ -663,16 +664,17 @@ func (sm *stateManager) Broadcast() {
sm.mu.Lock()
defer sm.mu.Unlock()

lag, throttlerReplicationLag, err := sm.refreshReplHealthLocked()
sm.hs.ChangeState(sm.target.TabletType, sm.terTimestamp, lag, throttlerReplicationLag, err, sm.isServingLocked())
lag, stateErr := sm.refreshReplHealthLocked()
throttlerMetric, throttlerMetricsErr := sm.throttler.SelfMetrics()
sm.hs.ChangeState(sm.target.TabletType, sm.terTimestamp, lag, throttlerMetric, throttlerMetricsErr, stateErr, sm.isServingLocked())
}

func (sm *stateManager) refreshReplHealthLocked() (time.Duration, time.Duration, error) {
lag, err := sm.rt.Status()
func (sm *stateManager) refreshReplHealthLocked() (time.Duration, error) {
if sm.target.TabletType == topodatapb.TabletType_PRIMARY {
sm.replHealthy = true
return 0, lag, nil
return 0, nil
}
lag, err := sm.rt.Status()
if err != nil {
if sm.replHealthy {
log.Infof("Going unhealthy due to replication error: %v", err)
@@ -691,7 +693,7 @@ func (sm *stateManager) refreshReplHealthLocked() (time.Duration, time.Duration,
sm.replHealthy = true
}
}
return lag, lag, err
return lag, err
}

// EnterLameduck causes tabletserver to enter the lameduck state. This
16 changes: 8 additions & 8 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
@@ -663,34 +663,30 @@ func TestRefreshReplHealthLocked(t *testing.T) {

sm.target.TabletType = topodatapb.TabletType_PRIMARY
sm.replHealthy = false
lag, throttlerReplicationLag, err := sm.refreshReplHealthLocked()
lag, err := sm.refreshReplHealthLocked()
assert.Equal(t, time.Duration(0), lag)
assert.GreaterOrEqual(t, throttlerReplicationLag, time.Duration(0))
assert.NoError(t, err)
assert.True(t, sm.replHealthy)

sm.target.TabletType = topodatapb.TabletType_REPLICA
sm.replHealthy = false
lag, throttlerReplicationLag, err = sm.refreshReplHealthLocked()
lag, err = sm.refreshReplHealthLocked()
assert.Equal(t, 1*time.Second, lag)
assert.Equal(t, 1*time.Second, throttlerReplicationLag)
assert.NoError(t, err)
assert.True(t, sm.replHealthy)

rt.err = errors.New("err")
sm.replHealthy = true
lag, throttlerReplicationLag, err = sm.refreshReplHealthLocked()
lag, err = sm.refreshReplHealthLocked()
assert.Equal(t, 1*time.Second, lag)
assert.Equal(t, 1*time.Second, throttlerReplicationLag)
assert.Error(t, err)
assert.False(t, sm.replHealthy)

rt.err = nil
rt.lag = 3 * time.Hour
sm.replHealthy = true
lag, throttlerReplicationLag, err = sm.refreshReplHealthLocked()
lag, err = sm.refreshReplHealthLocked()
assert.Equal(t, 3*time.Hour, lag)
assert.Equal(t, 3*time.Hour, throttlerReplicationLag)
assert.NoError(t, err)
assert.False(t, sm.replHealthy)
}
@@ -927,6 +923,10 @@ func (te *testLagThrottler) Close() {
te.state = testStateClosed
}

func (te *testLagThrottler) SelfMetrics() (float64, error) {
return 0, nil
}

type testTableGC struct {
testOrderState
}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
@@ -176,7 +176,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
tsv.hs = newHealthStreamer(tsv, alias)
tsv.se = schema.NewEngine(tsv)
tsv.rt = repltracker.NewReplTracker(tsv, alias)
tsv.lagThrottler = throttle.NewThrottler(tsv, srvTopoServer, topoServer, alias.Cell, tsv.rt.HeartbeatWriter(), tabletTypeFunc)
tsv.lagThrottler = throttle.NewThrottler(tsv, srvTopoServer, topoServer, alias, tsv.rt.HeartbeatWriter(), tabletTypeFunc)
tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, tsv.lagThrottler, alias.Cell)
tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se)
tsv.watcher = NewBinlogWatcher(tsv, tsv.vstreamer, tsv.config)
Loading