diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 8d57c46e855..cc18817d9c5 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -357,33 +357,38 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if err = proto.Unmarshal(item.Kv.Value, group); err != nil { continue } - if gc, ok := c.loadGroupController(group.Name); ok { + name := group.GetName() + gc, ok := c.loadGroupController(name) + if !ok { + continue + } + if !gc.tombstone.Load() { gc.modifyMeta(group) - // If the resource group is marked as tombstone before, set it as active again. - if swapped := gc.tombstone.CompareAndSwap(true, false); swapped { - resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1) - log.Info("[resource group controller] mark resource group as active", zap.String("name", group.Name)) - } + continue + } + // If the resource group is marked as tombstone before, re-create the resource group controller. + newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + if err != nil { + log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed", + zap.String("name", name), zap.Error(err)) + continue + } + if c.groupsController.CompareAndSwap(name, gc, newGC) { + log.Info("[resource group controller] re-create resource group cost controller for tombstone", + zap.String("name", name)) } case meta_storagepb.Event_DELETE: - if item.PrevKv != nil { - if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { - continue - } - // Do not delete the resource group immediately, just mark it as tombstone. - // For the requests that are still in progress, fallback to the default resource group. - if gc, ok := c.loadGroupController(group.Name); ok { - gc.tombstone.Store(true) - resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name) - resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1) - log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name)) - } - } else { - // Prev-kv is compacted means there must have been a delete event before this event, - // which means that this is just a duplicated event, so we can just ignore it. + // Prev-kv is compacted means there must have been a delete event before this event, + // which means that this is just a duplicated event, so we can just ignore it. + if item.PrevKv == nil { log.Info("[resource group controller] previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) + continue + } + if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { + continue } + c.tombstoneGroupCostController(group.GetName()) } } case resp, ok := <-watchConfigChannel: @@ -446,15 +451,23 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g return tmp.(*groupCostController), loaded } -// tryGetResourceGroup will try to get the resource group controller from local cache first, -// if the local cache misses, it will then call gRPC to fetch the resource group info from server. -func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) { +// NewResourceGroupNotExistErr returns a new error that indicates the resource group does not exist. +// It's exported for testing. +func NewResourceGroupNotExistErr(name string) error { + return errors.Errorf("%s does not exist", name) +} + +// tryGetResourceGroupController will try to get the resource group controller from local cache first. +// If the local cache misses, it will then call gRPC to fetch the resource group info from the remote server. +// If `useTombstone` is true, it will return the resource group controller even if it is marked as tombstone. +func (c *ResourceGroupsController) tryGetResourceGroupController( + ctx context.Context, name string, useTombstone bool, +) (*groupCostController, error) { // Get from the local cache first. gc, ok := c.loadGroupController(name) if ok { - // If the resource group is marked as tombstone, fallback to the default resource group. - if gc.tombstone.Load() && name != defaultResourceGroupName { - return c.tryGetResourceGroup(ctx, defaultResourceGroupName) + if !useTombstone && gc.tombstone.Load() { + return nil, NewResourceGroupNotExistErr(name) } return gc, nil } @@ -464,7 +477,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name return nil, err } if group == nil { - return nil, errors.Errorf("%s does not exists", name) + return nil, NewResourceGroupNotExistErr(name) } // Check again to prevent initializing the same resource group concurrently. if gc, ok = c.loadGroupController(name); ok { @@ -476,14 +489,51 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name return nil, err } // Check again to prevent initializing the same resource group concurrently. - gc, loaded := c.loadOrStoreGroupController(group.Name, gc) + gc, loaded := c.loadOrStoreGroupController(name, gc) if !loaded { resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1) - log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName())) + log.Info("[resource group controller] create resource group cost controller", zap.String("name", name)) } return gc, nil } +// Do not delete the resource group immediately to prevent from interrupting the ongoing request, +// mark it as tombstone and create a default resource group controller for it. +func (c *ResourceGroupsController) tombstoneGroupCostController(name string) { + _, ok := c.loadGroupController(name) + if !ok { + return + } + // The default resource group controller should never be deleted. + if name == defaultResourceGroupName { + return + } + // Try to get the default group meta first. + defaultGC, err := c.tryGetResourceGroupController(c.loopCtx, defaultResourceGroupName, false) + if err != nil || defaultGC == nil { + log.Warn("[resource group controller] get default resource group meta for tombstone failed", + zap.String("name", name), zap.Error(err)) + // Directly delete the resource group controller if the default group is not available. + c.groupsController.Delete(name) + return + } + // Create a default resource group controller for the tombstone resource group independently. + gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + if err != nil { + log.Warn("[resource group controller] create default resource group cost controller for tombstone failed", + zap.String("name", name), zap.Error(err)) + // Directly delete the resource group controller if the default group controller cannot be created. + c.groupsController.Delete(name) + return + } + gc.tombstone.Store(true) + c.groupsController.Store(name, gc) + // Its metrics will be deleted in the cleanup process. + resourceGroupStatusGauge.WithLabelValues(name, name).Set(2) + log.Info("[resource group controller] default resource group controller cost created for tombstone", + zap.String("name", name)) +} + func (c *ResourceGroupsController) cleanUpResourceGroup() { c.groupsController.Range(func(key, value any) bool { resourceGroupName := key.(string) @@ -496,7 +546,6 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() { if gc.inactive || gc.tombstone.Load() { c.groupsController.Delete(resourceGroupName) resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) - resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName) return true } gc.inactive = true @@ -589,7 +638,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, ) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) { - gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) + gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, true) if err != nil { return nil, nil, time.Duration(0), 0, err } @@ -605,17 +654,13 @@ func (c *ResourceGroupsController) OnResponse( log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) return &rmpb.Consumption{}, nil } - // If the resource group is marked as tombstone, fallback to the default resource group. - if gc.tombstone.Load() && resourceGroupName != defaultResourceGroupName { - return c.OnResponse(defaultResourceGroupName, req, resp) - } return gc.onResponse(req, resp) } // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool { - gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) + gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, false) if err != nil { return false } @@ -626,7 +671,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context, func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool { // fallback to default resource group. if bg == nil { - gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName) + gc, err := c.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) if err != nil { return false } @@ -646,7 +691,7 @@ func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, // GetResourceGroup returns the meta setting of the given resource group name. func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) { - gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName) + gc, err := c.tryGetResourceGroupController(c.loopCtx, resourceGroupName, false) if err != nil { return nil, err } diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index e198effb2d8..821364c292f 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -211,11 +211,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil) mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) - c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName) + c1, err := controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) re.NoError(err) re.Equal(defaultResourceGroup, c1.meta) - c2, err := controller.tryGetResourceGroup(ctx, "test-group") + c2, err := controller.tryGetResourceGroupController(ctx, "test-group", false) re.NoError(err) re.Equal(testResourceGroup, c2.meta) @@ -271,7 +271,7 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { } } -func TestGetController(t *testing.T) { +func TestTryGetController(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -286,39 +286,51 @@ func TestGetController(t *testing.T) { mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil) - c, err := controller.GetResourceGroup("test-group-non-existent") + gc, err := controller.tryGetResourceGroupController(ctx, "test-group-non-existent", false) re.Error(err) - re.Nil(c) - c, err = controller.GetResourceGroup(defaultResourceGroupName) + re.Nil(gc) + gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) re.NoError(err) - re.Equal(defaultResourceGroup, c) - c, err = controller.GetResourceGroup("test-group") + re.Equal(defaultResourceGroup, gc.getMeta()) + gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false) re.NoError(err) - re.Equal(testResourceGroup, c) - _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{}) + re.Equal(testResourceGroup, gc.getMeta()) + requestInfo, responseInfo := NewTestRequestInfo(true, 1, 1), NewTestResponseInfo(1, time.Millisecond, true) + _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo) re.NoError(err) - _, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{}) + consumption, err := controller.OnResponse("test-group", requestInfo, responseInfo) re.NoError(err) + re.NotEmpty(consumption) // Mark the tombstone manually to test the fallback case. - gc, err := controller.tryGetResourceGroup(ctx, "test-group") + gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false) re.NoError(err) - gc.tombstone.Store(true) - c, err = controller.GetResourceGroup("test-group") + re.NotNil(gc) + controller.tombstoneGroupCostController("test-group") + gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false) + re.Error(err) + re.Nil(gc) + gc, err = controller.tryGetResourceGroupController(ctx, "test-group", true) + re.NoError(err) + re.Equal(defaultResourceGroup, gc.getMeta()) + _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo) re.NoError(err) - re.Equal(defaultResourceGroup, c) - _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{}) + consumption, err = controller.OnResponse("test-group", requestInfo, responseInfo) re.NoError(err) - _, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{}) + re.NotEmpty(consumption) + // Test the default group protection. + gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) re.NoError(err) - // Mark the default group tombstone manually to test the fallback case. - gc, err = controller.tryGetResourceGroup(ctx, defaultResourceGroupName) + re.Equal(defaultResourceGroup, gc.getMeta()) + controller.tombstoneGroupCostController(defaultResourceGroupName) + gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) re.NoError(err) - gc.tombstone.Store(true) - c, err = controller.GetResourceGroup(defaultResourceGroupName) + re.Equal(defaultResourceGroup, gc.getMeta()) + gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, true) re.NoError(err) - re.Equal(defaultResourceGroup, c) - _, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, &TestRequestInfo{}) + re.Equal(defaultResourceGroup, gc.getMeta()) + _, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, requestInfo) re.NoError(err) - _, err = controller.OnResponse(defaultResourceGroupName, &TestRequestInfo{}, &TestResponseInfo{}) + consumption, err = controller.OnResponse(defaultResourceGroupName, requestInfo, responseInfo) re.NoError(err) + re.NotEmpty(consumption) } diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 635cb17b822..10b1a0b4520 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -403,9 +403,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { CPUMsCost: 1, } - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) - controller.Start(suite.ctx) - defer controller.Stop() + rgsController, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) + rgsController.Start(suite.ctx) + defer rgsController.Stop() testCases := []struct { resourceGroupName string @@ -445,13 +445,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) + _, _, _, _, err := rgsController.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) re.NoError(err) - _, _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) + _, _, _, _, err = rgsController.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) - controller.OnResponse(cas.resourceGroupName, rreq, rres) - controller.OnResponse(cas.resourceGroupName, wreq, wres) + rgsController.OnResponse(cas.resourceGroupName, rreq, rres) + rgsController.OnResponse(cas.resourceGroupName, wreq, wres) time.Sleep(1000 * time.Microsecond) } re.LessOrEqual(sum, buffDuration+cas.tcs[i].waitDuration) @@ -464,11 +464,11 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) + _, _, _, _, err = rgsController.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) - group, err := controller.GetResourceGroup(rg.Name) + group, err := rgsController.GetResourceGroup(rg.Name) re.NoError(err) re.Equal(rg, group) // Delete the resource group and make sure it is tombstone. @@ -476,19 +476,21 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(err) re.Contains(resp, "Success!") // Make sure the resource group is watched by the controller and marked as tombstone. + expectedErr := controller.NewResourceGroupNotExistErr(rg.Name) testutil.Eventually(re, func() bool { - gc, err := controller.GetResourceGroup(rg.Name) - re.NoError(err) - return gc.GetName() == "default" + gc, err := rgsController.GetResourceGroup(rg.Name) + return err.Error() == expectedErr.Error() && gc == nil }, testutil.WithTickInterval(50*time.Millisecond)) // Add the resource group again. resp, err = cli.AddResourceGroup(suite.ctx, rg) re.NoError(err) re.Contains(resp, "Success!") - // Make sure the resource group can be set to active again. + // Make sure the resource group can be get by the controller again. testutil.Eventually(re, func() bool { - gc, err := controller.GetResourceGroup(rg.Name) - re.NoError(err) + gc, err := rgsController.GetResourceGroup(rg.Name) + if err != nil { + re.EqualError(err, expectedErr.Error()) + } return gc.GetName() == rg.Name }, testutil.WithTickInterval(50*time.Millisecond)) }