From 777c2d8258d68976472bf3c9c065427154bb3ad4 Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 5 Jul 2024 15:23:53 +0800 Subject: [PATCH] controller: fix the low_ru request missed Signed-off-by: nolouch --- .../resource_group/controller/controller.go | 9 +++ .../controller/controller_test.go | 80 +++++++++++++++++-- 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index e0f75dc746bd..960a1e379dfd 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -1177,16 +1177,25 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType switch selectTyp { case periodicReport: selected = selected || gc.shouldReportConsumption() + failpoint.Inject("triggerPeriodicReport", func(val failpoint.Value) { + selected = gc.name == val.(string) + }) fallthrough case lowToken: if counter.limiter.IsLowTokens() { selected = true } + failpoint.Inject("triggerLowRUReport", func(val failpoint.Value) { + if selectTyp == lowToken { + selected = gc.name == val.(string) + } + }) } request := &rmpb.RequestUnitItem{ Type: typ, Value: gc.calcRequest(counter), } + requests = append(requests, request) } req.Request = &rmpb.TokenBucketRequest_RuItems{ diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 94eb75edb8bd..0ceaf94a2a32 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/meta_storagepb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/stretchr/testify/mock" @@ -188,16 +189,83 @@ func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts .. func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { re := require.New(t) - mockProvider := new(MockResourceGroupProvider) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // expectedResourceGroup := &rmpb.ResourceGroup{Name: "test-group"} + mockProvider := new(MockResourceGroupProvider) + + mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil) + // LoadResourceGroups + mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil) + // Watch + mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil) + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default"))) + defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport") + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group"))) + defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport") + controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) controller.Start(ctx) - expectedResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000}}}} - mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(expectedResourceGroup, nil) - c1, err := controller.tryGetResourceGroup(ctx, "test-group") + defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) + + c1, err := controller.tryGetResourceGroup(ctx, "default") re.NoError(err) - re.Equal(expectedResourceGroup, c1.meta) + re.Equal(defaultResourceGroup, c1.meta) + + c2, err := controller.tryGetResourceGroup(ctx, "test-group") + re.NoError(err) + re.Equal(testResourceGroup, c2.meta) + + var expectResp []*rmpb.TokenBucketResponse + recTestGroupAcquireTokenRequest := make(chan bool) + mockProvider.On("AcquireTokenBuckets", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + request := args.Get(1).(*rmpb.TokenBucketsRequest) + var responses []*rmpb.TokenBucketResponse + for _, req := range request.Requests { + if req.ResourceGroupName == "default" { + // no response the default group request, that's mean `len(c.run.currentRequests) != 0` always. + time.Sleep(100 * time.Second) + responses = append(responses, &rmpb.TokenBucketResponse{ + ResourceGroupName: "default", + GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{ + { + GrantedTokens: &rmpb.TokenBucket{ + Tokens: 100000, + }, + }, + }, + }) + } else { + responses = append(responses, &rmpb.TokenBucketResponse{ + ResourceGroupName: req.ResourceGroupName, + GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{ + { + GrantedTokens: &rmpb.TokenBucket{ + Tokens: 100000, + }, + }, + }, + }) + } + } + // receive test-group request + if len(request.Requests) == 1 && request.Requests[0].ResourceGroupName == "test-group" { + recTestGroupAcquireTokenRequest <- true + } + expectResp = responses + }).Return(expectResp, nil) + // wait default group request token by PeriodicReport. + time.Sleep(2 * time.Second) + counter := c2.run.requestUnitTokens[0] + counter.limiter.notify() + select { + case <-recTestGroupAcquireTokenRequest: + //re.True(res) + case <-time.After(5 * time.Second): + re.Fail("timeout") + } }