Skip to content

Commit

Permalink
client/controller: wait for tokens to reduce the debet
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Jun 5, 2024
1 parent a674e66 commit 10a8e40
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error)
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, time.Duration, error)
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool
}
Expand Down Expand Up @@ -568,7 +568,7 @@ func (c *ResourceGroupsController) OnRequestWait(
// OnResponse is used to consume tokens after receiving response
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, time.Duration, error) {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
Expand Down Expand Up @@ -1287,11 +1287,12 @@ func (gc *groupCostController) onRequestWait(

func (gc *groupCostController) onResponse(
req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, time.Duration, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
var d time.Duration
if !gc.burstable.Load() {
switch gc.mode {
case rmpb.GroupMode_RawMode:
Expand All @@ -1303,9 +1304,25 @@ func (gc *groupCostController) onResponse(
case rmpb.GroupMode_RUMode:
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
counter.limiter.RemoveTokens(time.Now(), v)
var err error
now := time.Now()
for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ {
res := counter.limiter.Reserve(context.Background(), gc.mainCfg.LTBMaxWaitDuration/2, now, v)
if d, err = WaitReservations(context.Background(), now, []*Reservation{res}); err != nil {
time.Sleep(gc.mainCfg.WaitRetryInterval)
d+=gc.mainCfg.WaitRetryInterval
continue
} else {
gc.successfulRequestDuration.Observe(d.Seconds())
break
}
}
if err != nil {
return delta, d, err
}
}
}

}
}

Expand All @@ -1323,7 +1340,7 @@ func (gc *groupCostController) onResponse(
add(gc.mu.globalCounter, count)
gc.mu.Unlock()

return delta, nil
return delta,d, nil,
}

// GetActiveResourceGroup is used to get action resource group.
Expand Down

0 comments on commit 10a8e40

Please sign in to comment.