Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per workload TxThrottler metrics #13526

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT

if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}

Expand All @@ -238,7 +238,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,6 @@ func (m mockTxThrottler) Open() (err error) {
func (m mockTxThrottler) Close() {
}

func (m mockTxThrottler) Throttle(priority int) (result bool) {
func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) {
return m.throttle
}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options), options.GetWorkloadName()) {
return errTxThrottled
}
var connSetting *pools.Setting
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type TxThrottler interface {
InitDBConfig(target *querypb.Target)
Open() (err error)
Close()
Throttle(priority int) (result bool)
Throttle(priority int, workload string) (result bool)
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
Expand Down Expand Up @@ -146,8 +146,8 @@ type txThrottler struct {
topoWatchers *stats.GaugesWithSingleLabel
healthChecksReadTotal *stats.CountersWithMultiLabels
healthChecksRecordedTotal *stats.CountersWithMultiLabels
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
requestsTotal *stats.CountersWithSingleLabel
requestsThrottled *stats.CountersWithSingleLabel
}

// txThrottlerConfig holds the parameters that need to be
Expand Down Expand Up @@ -219,8 +219,8 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded",
[]string{"cell", "DbType"}),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
}
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func (t *txThrottler) Close() {
// It returns true if the transaction should not proceed (the caller
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *txThrottler) Throttle(priority int) (result bool) {
func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
if !t.config.enabled {
return false
}
Expand All @@ -275,9 +275,9 @@ func (t *txThrottler) Throttle(priority int) (result bool) {
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority

t.requestsTotal.Add(1)
t.requestsTotal.Add(workload, 1)
if result {
t.requestsThrottled.Add(1)
t.requestsThrottled.Add(workload, 1)
}

return result
Expand Down
20 changes: 10 additions & 10 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestDisabledThrottler(t *testing.T) {
Shard: "shard",
})
assert.Nil(t, throttler.Open())
assert.False(t, throttler.Throttle(0))
assert.False(t, throttler.Throttle(0, "some_workload"))
throttlerImpl, _ := throttler.(*txThrottler)
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
throttler.Close()
Expand Down Expand Up @@ -129,9 +129,9 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())

assert.False(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get())
assert.Zero(t, throttlerImpl.requestsThrottled.Get())
assert.False(t, throttlerImpl.Throttle(100, "some_workload"))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some_workload"])

throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts())
Expand All @@ -148,14 +148,14 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())

// The second throttle call should reject.
assert.True(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())
assert.True(t, throttlerImpl.Throttle(100, "some_workload"))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"])

// This call should not throttle due to priority. Check that's the case and counters agree.
assert.False(t, throttlerImpl.Throttle(0))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())
assert.False(t, throttlerImpl.Throttle(0, "some_workload"))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"])
throttlerImpl.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
Expand Down