diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index e3495a21ff1..01011c2c30a 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -57,7 +57,7 @@ const ( // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. - OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error) + OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) // OnResponse is used to consume tokens after receiving response. OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. @@ -171,12 +171,13 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con if err != nil { return nil, err } - if len(resp.Kvs) == 0 { + kvs := resp.GetKvs() + if len(kvs) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") return DefaultConfig(), nil } config := &Config{} - err = json.Unmarshal(resp.Kvs[0].GetValue(), config) + err = json.Unmarshal(kvs[0].GetValue(), config) if err != nil { return nil, err } @@ -525,10 +526,10 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, -) (*rmpb.Consumption, *rmpb.Consumption, error) { +) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { - return nil, nil, err + return nil, nil, 0, err } return gc.onRequestWait(ctx, info) } @@ -1175,7 +1176,7 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 { func (gc *groupCostController) onRequestWait( ctx context.Context, info RequestInfo, -) (*rmpb.Consumption, *rmpb.Consumption, error) { +) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) { delta := &rmpb.Consumption{} for _, calc := range gc.calculators { calc.BeforeKVRequest(delta, info) @@ -1225,7 +1226,7 @@ func (gc *groupCostController) onRequestWait( failpoint.Inject("triggerUpdate", func() { gc.lowRUNotifyChan <- struct{}{} }) - return nil, nil, err + return nil, nil, 0, err } gc.successfulRequestDuration.Observe(d.Seconds()) } @@ -1244,7 +1245,7 @@ func (gc *groupCostController) onRequestWait( *gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter gc.mu.Unlock() - return delta, penalty, nil + return delta, penalty, gc.meta.Priority, nil } func (gc *groupCostController) onResponse( diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 6877f8206f3..1db19787a81 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -30,8 +30,9 @@ import ( func createTestGroupCostController(re *require.Assertions) *groupCostController { group := &rmpb.ResourceGroup{ - Name: "test", - Mode: rmpb.GroupMode_RUMode, + Name: "test", + Mode: rmpb.GroupMode_RUMode, + Priority: 1, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ @@ -100,8 +101,9 @@ func TestRequestAndResponseConsumption(t *testing.T) { kvCalculator := gc.getKVCalculator() for idx, testCase := range testCases { caseNum := fmt.Sprintf("case %d", idx) - consumption, _, err := gc.onRequestWait(context.TODO(), testCase.req) + consumption, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req) re.NoError(err, caseNum) + re.Equal(priority, gc.meta.Priority) expectedConsumption := &rmpb.Consumption{} if testCase.req.IsWrite() { kvCalculator.calculateWriteCost(expectedConsumption, testCase.req) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index ed6a3ee501c..91a21caf91b 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -438,9 +438,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) + _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) re.NoError(err) - _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) + _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) controller.OnResponse(cas.resourceGroupName, rreq, rres) @@ -457,7 +457,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) + _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) @@ -512,9 +512,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { wreq := tcs.makeWriteRequest() rres := tcs.makeReadResponse() wres := tcs.makeWriteResponse() - _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) re.NoError(err) - _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) re.NoError(err) controller.OnResponse(resourceGroupName, rreq, rres) controller.OnResponse(resourceGroupName, wreq, wres) @@ -551,9 +551,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) re.NoError(err) - _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) controller.OnResponse(resourceGroupName, rreq, rres) @@ -571,14 +571,14 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { resourceGroupName2 := suite.initGroups[2].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) + _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) re.NoError(err) re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)")) resourceGroupName3 := suite.initGroups[3].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0} wreq = tcs.makeWriteRequest() - _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) + _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) re.NoError(err) time.Sleep(110 * time.Millisecond) tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0} @@ -586,7 +586,7 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { for i := 0; i < tcs.times; i++ { wreq = tcs.makeWriteRequest() startTime := time.Now() - _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) + _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) duration += time.Since(startTime) re.NoError(err) } @@ -635,7 +635,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // init req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) - _, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -644,7 +644,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -654,7 +654,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // failed request, shouldn't be counted in penalty req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(0), false) - _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -664,7 +664,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from same store, should be zero req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */) resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) + _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req1, resp1) @@ -673,7 +673,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from different store, should be non-zero req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */) resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) + _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) re.NoError(err) re.Equal(penalty.WriteBytes, 60.0) re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6) @@ -683,7 +683,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from new store, should be zero req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */) resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) + _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req3, resp3) @@ -693,7 +693,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resourceGroupName = groupNames[1] req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) + _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req4, resp4)