diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 2265053fdd2..9dd5e877979 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -22,6 +22,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" @@ -1463,7 +1464,10 @@ func (gc *groupCostController) onResponseImpl( } } } - + // no need to record the consumption, fast path. + if delta.RRU+delta.WRU == 0 { + return delta, nil + } gc.mu.Lock() // Record the consumption of the request add(gc.mu.consumption, delta) @@ -1504,7 +1508,10 @@ func (gc *groupCostController) onResponseWaitImpl( gc.metrics.successfulRequestDuration.Observe(d.Seconds()) waitDuration += d } - + // no need to record the consumption, fast path. + if delta.RRU+delta.WRU == 0 { + return delta, waitDuration, nil + } gc.mu.Lock() // Record the consumption of the request add(gc.mu.consumption, delta) @@ -1522,6 +1529,73 @@ func (gc *groupCostController) onResponseWaitImpl( return delta, waitDuration, nil } +func (gc *groupCostController) onResponseWaitAtomicImpl( + ctx context.Context, req RequestInfo, resp ResponseInfo, +) (*rmpb.Consumption, time.Duration, error) { + delta := &rmpb.Consumption{} + for _, calc := range gc.calculators { + calc.AfterKVRequest(delta, req, resp) + } + var waitDuration time.Duration + if !gc.burstable.Load() { + allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() + d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt) + if err != nil { + if errs.ErrClientResourceGroupThrottled.Equal(err) { + gc.metrics.failedRequestCounterWithThrottled.Inc() + gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) + } else { + gc.metrics.failedRequestCounterWithOthers.Inc() + } + return nil, waitDuration, err + } + gc.metrics.successfulRequestDuration.Observe(d.Seconds()) + waitDuration += d + } + // no need to record the consumption, fast path. + if delta.RRU+delta.WRU == 0 { + return delta, waitDuration, nil + } + // Record the consumption of the request + AtomicAddConsumption(gc.mu.consumption, delta) + // Record the consumption of the request by store + count := &rmpb.Consumption{} + *count = *delta + // As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest` + for _, calc := range gc.calculators { + calc.BeforeKVRequest(count, req) + } + AtomicAddConsumption(gc.mu.storeCounter[req.StoreID()], count) + AtomicAddConsumption(gc.mu.globalCounter, count) + return delta, waitDuration, nil +} + +// AtomicAdd applies an atomic addition on a field of Consumption. +func AtomicAdd(target *float64, delta float64) { + for { + old := atomic.LoadUint64((*uint64)(unsafe.Pointer(target))) + newValue := math.Float64bits(math.Float64frombits(old) + delta) + if atomic.CompareAndSwapUint64((*uint64)(unsafe.Pointer(target)), old, newValue) { + break + } + } +} + +// AtomicAddConsumption performs atomic addition for all fields in Consumption. +func AtomicAddConsumption(target, delta *rmpb.Consumption) { + if target == nil || delta == nil { + return + } + AtomicAdd(&target.RRU, delta.RRU) + AtomicAdd(&target.WRU, delta.WRU) + AtomicAdd(&target.ReadBytes, delta.ReadBytes) + AtomicAdd(&target.WriteBytes, delta.WriteBytes) + AtomicAdd(&target.TotalCpuTimeMs, delta.TotalCpuTimeMs) + AtomicAdd(&target.SqlLayerCpuTimeMs, delta.SqlLayerCpuTimeMs) + AtomicAdd(&target.KvReadRpcCount, delta.KvReadRpcCount) + AtomicAdd(&target.KvWriteRpcCount, delta.KvWriteRpcCount) +} + // GetActiveResourceGroup is used to get active resource group. // This is used for test only. func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup { diff --git a/client/resource_group/controller/controller.test b/client/resource_group/controller/controller.test new file mode 100755 index 00000000000..5c34783ae14 Binary files /dev/null and b/client/resource_group/controller/controller.test differ diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index f0bdc62d6d3..d7eee700958 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -382,3 +382,72 @@ func TestTryGetController(t *testing.T) { re.NoError(err) re.NotEmpty(consumption) } + +func BenchmarkRequestAndResponseConsumptionLockVer(b *testing.B) { + gc := createTestGroupCostController(require.New(b)) + testCases := []struct { + req *TestRequestInfo + resp *TestResponseInfo + }{ + { + req: &TestRequestInfo{ + isWrite: false, + writeBytes: 0, + }, + resp: &TestResponseInfo{ + readBytes: 100, + kvCPU: 100 * time.Millisecond, + succeed: true, + }, + }, + } + // exclude the token bucket locks + gc.burstable.Store(true) + + // Use b.RunParallel to simulate concurrent scenarios + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for idx, testCase := range testCases { + _, _, err := gc.onResponseWaitImpl(context.TODO(), testCase.req, testCase.resp) + if err != nil { + b.Fatalf("onResponseImpl failed: %v (%d)", err, idx) + } + } + } + }) +} + +func BenchmarkRequestAndResponseConsumptionAtomicVer(b *testing.B) { + gc := createTestGroupCostController(require.New(b)) + testCases := []struct { + req *TestRequestInfo + resp *TestResponseInfo + }{ + { + req: &TestRequestInfo{ + isWrite: false, + writeBytes: 0, + }, + resp: &TestResponseInfo{ + readBytes: 100, + kvCPU: 100 * time.Millisecond, + succeed: true, + }, + }, + } + + // exclude the token bucket locks + gc.burstable.Store(true) + + // Use b.RunParallel to simulate concurrent scenarios + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for idx, testCase := range testCases { + _, _, err := gc.onResponseWaitAtomicImpl(context.TODO(), testCase.req, testCase.resp) + if err != nil { + b.Fatalf("onResponseImpl failed: %v (%d)", err, idx) + } + } + } + }) +}