Skip to content

Commit

Permalink
Optimize the fallback mechanism of the controller
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jul 16, 2024
1 parent ca179e6 commit bf07c02
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 65 deletions.
117 changes: 79 additions & 38 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,33 +357,38 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if gc, ok := c.loadGroupController(group.Name); ok {
name := group.GetName()
gc, ok := c.loadGroupController(name)
if !ok {
continue
}
if !gc.tombstone.Load() {
gc.modifyMeta(group)
// If the resource group is marked as tombstone before, set it as active again.
if swapped := gc.tombstone.CompareAndSwap(true, false); swapped {
resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1)
log.Info("[resource group controller] mark resource group as active", zap.String("name", group.Name))
}
continue
}
// If the resource group is marked as tombstone before, re-create the resource group controller.
newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
log.Warn("[resource group controller] create resource group cost controller for tombstone failed",
zap.String("name", name), zap.Error(err))
continue
}
if c.groupsController.CompareAndSwap(name, gc, newGC) {
log.Info("[resource group controller] re-create resource group cost controller for tombstone",
zap.String("name", name))
}
case meta_storagepb.Event_DELETE:
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
// Do not delete the resource group immediately, just mark it as tombstone.
// For the requests that are still in progress, fallback to the default resource group.
if gc, ok := c.loadGroupController(group.Name); ok {
gc.tombstone.Store(true)
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1)
log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name))
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
if item.PrevKv == nil {
log.Info("[resource group controller] previous key-value pair has been compacted",
zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
continue
}
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
c.tombstoneGroupCostController(group.GetName())
}
}
case resp, ok := <-watchConfigChannel:
Expand Down Expand Up @@ -446,15 +451,17 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g
return tmp.(*groupCostController), loaded
}

// tryGetResourceGroup will try to get the resource group controller from local cache first,
// if the local cache misses, it will then call gRPC to fetch the resource group info from server.
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
// tryGetResourceGroupController will try to get the resource group controller from local cache first.
// If the local cache misses, it will then call gRPC to fetch the resource group info from the remote server.
// If `useTombstone` is true, it will return the resource group controller even if it is marked as tombstone.
func (c *ResourceGroupsController) tryGetResourceGroupController(
ctx context.Context, name string, useTombstone bool,
) (*groupCostController, error) {
// Get from the local cache first.
gc, ok := c.loadGroupController(name)
if ok {
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && name != defaultResourceGroupName {
return c.tryGetResourceGroup(ctx, defaultResourceGroupName)
if gc.tombstone.Load() && !useTombstone {
return nil, errors.Errorf("%s does not exist", name)
}
return gc, nil
}
Expand All @@ -464,7 +471,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return nil, err
}
if group == nil {
return nil, errors.Errorf("%s does not exists", name)
return nil, errors.Errorf("%s does not exist", name)
}
// Check again to prevent initializing the same resource group concurrently.
if gc, ok = c.loadGroupController(name); ok {
Expand All @@ -476,14 +483,52 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return nil, err
}
// Check again to prevent initializing the same resource group concurrently.
gc, loaded := c.loadOrStoreGroupController(group.Name, gc)
gc, loaded := c.loadOrStoreGroupController(name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
log.Info("[resource group controller] create resource group cost controller", zap.String("name", name))
}
return gc, nil
}

// tombstoneGroupCostController handles the deletion of the resource group `name`.
// The cached controller is not removed right away, so that ongoing requests are not interrupted.
// Instead, the entry is replaced by a new controller that is built from the default resource
// group's meta and marked as tombstone. If no such controller can be built, the entry is deleted.
// It is a no-op when no controller is cached for `name`.
func (c *ResourceGroupsController) tombstoneGroupCostController(name string) {
// Nothing to do if the group has never been loaded into the local cache.
_, ok := c.loadGroupController(name)
if !ok {
return
}
// Drop the status gauge series labelled (name, name).
// NOTE(review): this runs before the default-group check below, so the series is also
// dropped when `name` is the default resource group — confirm this is intended.
resourceGroupStatusGauge.DeleteLabelValues(name, name)
// Skip if the deleted resource group is the default resource group.
// The default resource group controller should never be deleted.
if name == defaultResourceGroupName {
return
}
// Try to get the default group controller first; its meta is used to build the replacement.
// `useTombstone` is true here, so a default controller marked as tombstone is accepted as well.
defaultGC, err := c.tryGetResourceGroupController(c.loopCtx, defaultResourceGroupName, true)
if err != nil || defaultGC == nil {
log.Warn("[resource group controller] create default resource group controller for tombstone failed",
zap.String("name", name), zap.Error(err))
// Directly delete the resource group controller if the default group is not available.
c.groupsController.Delete(name)
return
}
// Create a separate controller from the default group's meta for the tombstone resource group,
// instead of reusing the default group's own controller.
gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
log.Warn("[resource group controller] create default resource group controller for tombstone failed",
zap.String("name", name), zap.Error(err))
// Directly delete the resource group controller if the default group controller cannot be created.
c.groupsController.Delete(name)
return
}
// Mark the new controller as tombstone and store it under the deleted group's name,
// overwriting the previously cached controller.
gc.tombstone.Store(true)
c.groupsController.Store(name, gc)
// Set the (name, name) status gauge series again for the replacement controller.
resourceGroupStatusGauge.WithLabelValues(name, name).Set(1)
log.Info("[resource group controller] default resource group controller created for tombstone",
zap.String("name", name))
}

func (c *ResourceGroupsController) cleanUpResourceGroup() {
c.groupsController.Range(func(key, value any) bool {
resourceGroupName := key.(string)
Expand Down Expand Up @@ -589,7 +634,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, true)
if err != nil {
return nil, nil, time.Duration(0), 0, err
}
Expand All @@ -605,17 +650,13 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && resourceGroupName != defaultResourceGroupName {
return c.OnResponse(defaultResourceGroupName, req, resp)
}
return gc.onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
resourceGroupName, requestResource string) bool {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, false)
if err != nil {
return false
}
Expand All @@ -626,7 +667,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool {
// fallback to default resource group.
if bg == nil {
gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
if err != nil {
return false
}
Expand All @@ -646,7 +687,7 @@ func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context,

// GetResourceGroup returns the meta setting of the given resource group name.
func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) {
gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(c.loopCtx, resourceGroupName, false)
if err != nil {
return nil, err
}
Expand Down
57 changes: 34 additions & 23 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)

c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
c1, err := controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
re.Equal(defaultResourceGroup, c1.meta)

c2, err := controller.tryGetResourceGroup(ctx, "test-group")
c2, err := controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
re.Equal(testResourceGroup, c2.meta)

Expand Down Expand Up @@ -271,7 +271,7 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
}
}

func TestGetController(t *testing.T) {
func TestTryGetController(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -286,39 +286,50 @@ func TestGetController(t *testing.T) {
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil)

c, err := controller.GetResourceGroup("test-group-non-existent")
gc, err := controller.tryGetResourceGroupController(ctx, "test-group-non-existent", false)
re.Error(err)
re.Nil(c)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.Nil(gc)
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
c, err = controller.GetResourceGroup("test-group")
re.Equal(defaultResourceGroup, gc.getMeta())
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
re.Equal(testResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{})
re.Equal(testResourceGroup, gc.getMeta())
requestInfo, responseInfo := NewTestRequestInfo(true, 1, 1), NewTestResponseInfo(1, time.Millisecond, true)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo)
re.NoError(err)
_, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{})
consumption, err := controller.OnResponse("test-group", requestInfo, responseInfo)
re.NoError(err)
re.NotEmpty(consumption)
// Mark the tombstone manually to test the fallback case.
gc, err := controller.tryGetResourceGroup(ctx, "test-group")
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup("test-group")
re.NotNil(gc)
controller.tombstoneGroupCostController("test-group")
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.Error(err)
re.Nil(gc)
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", true)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{})
re.Equal(defaultResourceGroup, gc.getMeta())
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo)
re.NoError(err)
_, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{})
consumption, err = controller.OnResponse("test-group", requestInfo, responseInfo)
re.NoError(err)
re.NotEmpty(consumption)
// Mark the default group tombstone manually to test the fallback case.
gc, err = controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.NotNil(gc)
controller.tombstoneGroupCostController(defaultResourceGroupName)
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.Error(err)
re.Nil(gc)
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, true)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, &TestRequestInfo{})
re.Equal(defaultResourceGroup, gc.getMeta())
_, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, requestInfo)
re.NoError(err)
_, err = controller.OnResponse(defaultResourceGroupName, &TestRequestInfo{}, &TestResponseInfo{})
_, err = controller.OnResponse(defaultResourceGroupName, requestInfo, responseInfo)
re.NoError(err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,17 +478,18 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
// Make sure the resource group is watched by the controller and marked as tombstone.
testutil.Eventually(re, func() bool {
gc, err := controller.GetResourceGroup(rg.Name)
re.NoError(err)
return gc.GetName() == "default"
return err.Error() == fmt.Sprintf("%s does not exist", rg.Name) && gc == nil
}, testutil.WithTickInterval(50*time.Millisecond))
// Add the resource group again.
resp, err = cli.AddResourceGroup(suite.ctx, rg)
re.NoError(err)
re.Contains(resp, "Success!")
// Make sure the resource group can be set to active again.
// Make sure the resource group can be retrieved by the controller again.
testutil.Eventually(re, func() bool {
gc, err := controller.GetResourceGroup(rg.Name)
re.NoError(err)
if err != nil {
re.ErrorContains(err, fmt.Sprintf("%s does not exist", rg.Name))
}
return gc.GetName() == rg.Name
}, testutil.WithTickInterval(50*time.Millisecond))
}
Expand Down

0 comments on commit bf07c02

Please sign in to comment.