Skip to content

Commit

Permalink
rg/controller: optimize the fallback mechanism of the controller (#8402)
Browse files Browse the repository at this point in the history
close #8388

This PR implements a more robust fallback method, which uses an independent default group controller for each tombstone group.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JmPotato and ti-chi-bot[bot] committed Jul 17, 2024
1 parent ecaef02 commit 5ec6af4
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 78 deletions.
123 changes: 84 additions & 39 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,33 +357,38 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if gc, ok := c.loadGroupController(group.Name); ok {
name := group.GetName()
gc, ok := c.loadGroupController(name)
if !ok {
continue
}
if !gc.tombstone.Load() {
gc.modifyMeta(group)
// If the resource group is marked as tombstone before, set it as active again.
if swapped := gc.tombstone.CompareAndSwap(true, false); swapped {
resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1)
log.Info("[resource group controller] mark resource group as active", zap.String("name", group.Name))
}
continue
}
// If the resource group is marked as tombstone before, re-create the resource group controller.
newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed",
zap.String("name", name), zap.Error(err))
continue
}
if c.groupsController.CompareAndSwap(name, gc, newGC) {
log.Info("[resource group controller] re-create resource group cost controller for tombstone",
zap.String("name", name))
}
case meta_storagepb.Event_DELETE:
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
// Do not delete the resource group immediately, just mark it as tombstone.
// For the requests that are still in progress, fallback to the default resource group.
if gc, ok := c.loadGroupController(group.Name); ok {
gc.tombstone.Store(true)
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1)
log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name))
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
if item.PrevKv == nil {
log.Info("[resource group controller] previous key-value pair has been compacted",
zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
continue
}
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
c.tombstoneGroupCostController(group.GetName())
}
}
case resp, ok := <-watchConfigChannel:
Expand Down Expand Up @@ -446,15 +451,23 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g
return tmp.(*groupCostController), loaded
}

// tryGetResourceGroup will try to get the resource group controller from local cache first,
// if the local cache misses, it will then call gRPC to fetch the resource group info from server.
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
// NewResourceGroupNotExistErr builds the error reported when the resource group
// called `name` does not exist.
// It is exported so that tests can construct the very same error for comparison.
func NewResourceGroupNotExistErr(name string) error {
	const format = "%s does not exist"
	return errors.Errorf(format, name)
}

// tryGetResourceGroupController will try to get the resource group controller from local cache first.
// If the local cache misses, it will then call gRPC to fetch the resource group info from the remote server.
// If `useTombstone` is true, it will return the resource group controller even if it is marked as tombstone.
func (c *ResourceGroupsController) tryGetResourceGroupController(
ctx context.Context, name string, useTombstone bool,
) (*groupCostController, error) {
// Get from the local cache first.
gc, ok := c.loadGroupController(name)
if ok {
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && name != defaultResourceGroupName {
return c.tryGetResourceGroup(ctx, defaultResourceGroupName)
if !useTombstone && gc.tombstone.Load() {
return nil, NewResourceGroupNotExistErr(name)
}
return gc, nil
}
Expand All @@ -464,7 +477,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return nil, err
}
if group == nil {
return nil, errors.Errorf("%s does not exists", name)
return nil, NewResourceGroupNotExistErr(name)
}
// Check again to prevent initializing the same resource group concurrently.
if gc, ok = c.loadGroupController(name); ok {
Expand All @@ -476,14 +489,51 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return nil, err
}
// Check again to prevent initializing the same resource group concurrently.
gc, loaded := c.loadOrStoreGroupController(group.Name, gc)
gc, loaded := c.loadOrStoreGroupController(name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
log.Info("[resource group controller] create resource group cost controller", zap.String("name", name))
}
return gc, nil
}

// tombstoneGroupCostController handles the removal of the resource group called `name`.
// Rather than dropping the group right away, which could interrupt ongoing requests,
// it replaces the cached controller with an independent controller built from the
// default resource group's meta and flags that replacement as tombstone.
// Groups that are not cached, as well as the default resource group itself, are left untouched.
// If the replacement cannot be built, the cached controller is deleted directly.
func (c *ResourceGroupsController) tombstoneGroupCostController(name string) {
	if _, cached := c.loadGroupController(name); !cached {
		// Nothing is cached under this name, so there is nothing to tombstone.
		return
	}
	if name == defaultResourceGroupName {
		// The default resource group controller should never be deleted.
		return
	}
	// The default group's meta is needed to build the replacement controller.
	defaultCtl, err := c.tryGetResourceGroupController(c.loopCtx, defaultResourceGroupName, false)
	if defaultCtl == nil || err != nil {
		log.Warn("[resource group controller] get default resource group meta for tombstone failed",
			zap.String("name", name), zap.Error(err))
		// Without the default group there is nothing to fall back to: delete the controller directly.
		c.groupsController.Delete(name)
		return
	}
	// Build a dedicated controller from the default meta for this tombstone group.
	tombstoneCtl, err := newGroupCostController(
		defaultCtl.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
	if err != nil {
		log.Warn("[resource group controller] create default resource group cost controller for tombstone failed",
			zap.String("name", name), zap.Error(err))
		// The replacement could not be created: delete the controller directly.
		c.groupsController.Delete(name)
		return
	}
	// Flag the replacement before publishing it under the group's name.
	tombstoneCtl.tombstone.Store(true)
	c.groupsController.Store(name, tombstoneCtl)
	// The metrics of this group will be deleted in the cleanup process.
	// NOTE(review): the gauge value 2 presumably denotes the tombstone state — confirm against the metric definition.
	resourceGroupStatusGauge.WithLabelValues(name, name).Set(2)
	log.Info("[resource group controller] default resource group controller cost created for tombstone",
		zap.String("name", name))
}

func (c *ResourceGroupsController) cleanUpResourceGroup() {
c.groupsController.Range(func(key, value any) bool {
resourceGroupName := key.(string)
Expand All @@ -496,7 +546,6 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
if gc.inactive || gc.tombstone.Load() {
c.groupsController.Delete(resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName)
return true
}
gc.inactive = true
Expand Down Expand Up @@ -589,7 +638,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, true)
if err != nil {
return nil, nil, time.Duration(0), 0, err
}
Expand All @@ -605,17 +654,13 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && resourceGroupName != defaultResourceGroupName {
return c.OnResponse(defaultResourceGroupName, req, resp)
}
return gc.onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
resourceGroupName, requestResource string) bool {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, false)
if err != nil {
return false
}
Expand All @@ -626,7 +671,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool {
// fallback to default resource group.
if bg == nil {
gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
if err != nil {
return false
}
Expand All @@ -646,7 +691,7 @@ func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context,

// GetResourceGroup returns the meta setting of the given resource group name.
func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) {
gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(c.loopCtx, resourceGroupName, false)
if err != nil {
return nil, err
}
Expand Down
60 changes: 36 additions & 24 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)

c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
c1, err := controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
re.Equal(defaultResourceGroup, c1.meta)

c2, err := controller.tryGetResourceGroup(ctx, "test-group")
c2, err := controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
re.Equal(testResourceGroup, c2.meta)

Expand Down Expand Up @@ -271,7 +271,7 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
}
}

func TestGetController(t *testing.T) {
func TestTryGetController(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -286,39 +286,51 @@ func TestGetController(t *testing.T) {
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil)

c, err := controller.GetResourceGroup("test-group-non-existent")
gc, err := controller.tryGetResourceGroupController(ctx, "test-group-non-existent", false)
re.Error(err)
re.Nil(c)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.Nil(gc)
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
c, err = controller.GetResourceGroup("test-group")
re.Equal(defaultResourceGroup, gc.getMeta())
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
re.Equal(testResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{})
re.Equal(testResourceGroup, gc.getMeta())
requestInfo, responseInfo := NewTestRequestInfo(true, 1, 1), NewTestResponseInfo(1, time.Millisecond, true)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo)
re.NoError(err)
_, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{})
consumption, err := controller.OnResponse("test-group", requestInfo, responseInfo)
re.NoError(err)
re.NotEmpty(consumption)
// Mark the tombstone manually to test the fallback case.
gc, err := controller.tryGetResourceGroup(ctx, "test-group")
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup("test-group")
re.NotNil(gc)
controller.tombstoneGroupCostController("test-group")
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.Error(err)
re.Nil(gc)
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", true)
re.NoError(err)
re.Equal(defaultResourceGroup, gc.getMeta())
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{})
consumption, err = controller.OnResponse("test-group", requestInfo, responseInfo)
re.NoError(err)
_, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{})
re.NotEmpty(consumption)
// Test the default group protection.
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
// Mark the default group tombstone manually to test the fallback case.
gc, err = controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
re.Equal(defaultResourceGroup, gc.getMeta())
controller.tombstoneGroupCostController(defaultResourceGroupName)
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.Equal(defaultResourceGroup, gc.getMeta())
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, true)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, &TestRequestInfo{})
re.Equal(defaultResourceGroup, gc.getMeta())
_, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, requestInfo)
re.NoError(err)
_, err = controller.OnResponse(defaultResourceGroupName, &TestRequestInfo{}, &TestResponseInfo{})
consumption, err = controller.OnResponse(defaultResourceGroupName, requestInfo, responseInfo)
re.NoError(err)
re.NotEmpty(consumption)
}
32 changes: 17 additions & 15 deletions tests/integrations/mcs/resourcemanager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
CPUMsCost: 1,
}

controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg)
controller.Start(suite.ctx)
defer controller.Stop()
rgsController, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg)
rgsController.Start(suite.ctx)
defer rgsController.Stop()

testCases := []struct {
resourceGroupName string
Expand Down Expand Up @@ -445,13 +445,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
_, _, _, _, err := rgsController.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
re.NoError(err)
_, _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
_, _, _, _, err = rgsController.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(cas.resourceGroupName, rreq, rres)
controller.OnResponse(cas.resourceGroupName, wreq, wres)
rgsController.OnResponse(cas.resourceGroupName, rreq, rres)
rgsController.OnResponse(cas.resourceGroupName, wreq, wres)
time.Sleep(1000 * time.Microsecond)
}
re.LessOrEqual(sum, buffDuration+cas.tcs[i].waitDuration)
Expand All @@ -464,31 +464,33 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)"))
tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq)
_, _, _, _, err = rgsController.OnRequestWait(suite.ctx, rg.Name, wreq)
re.Error(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate"))

group, err := controller.GetResourceGroup(rg.Name)
group, err := rgsController.GetResourceGroup(rg.Name)
re.NoError(err)
re.Equal(rg, group)
// Delete the resource group and make sure it is tombstone.
resp, err = cli.DeleteResourceGroup(suite.ctx, rg.Name)
re.NoError(err)
re.Contains(resp, "Success!")
// Make sure the resource group is watched by the controller and marked as tombstone.
expectedErr := controller.NewResourceGroupNotExistErr(rg.Name)
testutil.Eventually(re, func() bool {
gc, err := controller.GetResourceGroup(rg.Name)
re.NoError(err)
return gc.GetName() == "default"
gc, err := rgsController.GetResourceGroup(rg.Name)
return err.Error() == expectedErr.Error() && gc == nil
}, testutil.WithTickInterval(50*time.Millisecond))
// Add the resource group again.
resp, err = cli.AddResourceGroup(suite.ctx, rg)
re.NoError(err)
re.Contains(resp, "Success!")
// Make sure the resource group can be set to active again.
// Make sure the resource group can be retrieved by the controller again.
testutil.Eventually(re, func() bool {
gc, err := controller.GetResourceGroup(rg.Name)
re.NoError(err)
gc, err := rgsController.GetResourceGroup(rg.Name)
if err != nil {
re.EqualError(err, expectedErr.Error())
}
return gc.GetName() == rg.Name
}, testutil.WithTickInterval(50*time.Millisecond))
}
Expand Down

0 comments on commit 5ec6af4

Please sign in to comment.