Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
Signed-off-by: Shuning Chen <nolouch@ShuningdeMacBook-Pro.local>
  • Loading branch information
Shuning Chen authored and Shuning Chen committed Jul 2, 2024
1 parent a6958c9 commit 18fad8b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 23 deletions.
20 changes: 10 additions & 10 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type ResourceGroupsController struct {
calculators []ResourceCalculator

// When a signal is received, it means the number of available token is low.
lowTokenNotifyChan chan NotifyMsg
lowTokenNotifyChan chan notifyMsg
// When a token bucket response received from server, it will be sent to the channel.
tokenResponseChan chan []*rmpb.TokenBucketResponse
// When the token bucket of a resource group is updated, it will be sent to the channel.
Expand Down Expand Up @@ -182,7 +182,7 @@ func NewResourceGroupController(
clientUniqueID: clientUniqueID,
provider: provider,
ruConfig: ruConfig,
lowTokenNotifyChan: make(chan NotifyMsg, 1),
lowTokenNotifyChan: make(chan notifyMsg, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
opts: opts,
Expand Down Expand Up @@ -288,7 +288,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, NotifyMsg{})
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, notifyMsg{})
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
Expand Down Expand Up @@ -509,7 +509,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
}
}

func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg NotifyMsg) {
func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg notifyMsg) {
c.run.currentRequests = make([]*rmpb.TokenBucketRequest, 0)
c.groupsController.Range(func(_, value any) bool {
gc := value.(*groupCostController)
Expand All @@ -525,7 +525,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
}
}

func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg NotifyMsg) {
func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg notifyMsg) {
now := time.Now()
req := &rmpb.TokenBucketsRequest{
Requests: requests,
Expand All @@ -550,8 +550,8 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
} else {
successfulTokenRequestDuration.Observe(latency.Seconds())
}
if !notifyMsg.StartTime.IsZero() && time.Since(notifyMsg.StartTime) > slowNotifyFilterDuration {
log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.StartTime)))
if !notifyMsg.startTime.IsZero() && time.Since(notifyMsg.startTime) > slowNotifyFilterDuration {
log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.startTime)))
}
logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
c.tokenResponseChan <- resp
Expand Down Expand Up @@ -648,7 +648,7 @@ type groupCostController struct {
// fast path to make once token limit with un-limit burst.
burstable *atomic.Bool

lowRUNotifyChan chan<- NotifyMsg
lowRUNotifyChan chan<- notifyMsg
tokenBucketUpdateChan chan<- *groupCostController

// run contains the state that is updated by the main loop.
Expand Down Expand Up @@ -738,7 +738,7 @@ type tokenCounter struct {
func newGroupCostController(
group *rmpb.ResourceGroup,
mainCfg *RUConfig,
lowRUNotifyChan chan NotifyMsg,
lowRUNotifyChan chan notifyMsg,
tokenBucketUpdateChan chan *groupCostController,
) (*groupCostController, error) {
switch group.Mode {
Expand Down Expand Up @@ -1286,7 +1286,7 @@ func (gc *groupCostController) onRequestWait(
sub(gc.mu.consumption, delta)
gc.mu.Unlock()
failpoint.Inject("triggerUpdate", func() {
gc.lowRUNotifyChan <- NotifyMsg{}
gc.lowRUNotifyChan <- notifyMsg{}
})
return nil, nil, waitDuration, 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
JobTypes: []string{"lightning", "br"},
},
}
ch1 := make(chan NotifyMsg)
ch1 := make(chan notifyMsg)
ch2 := make(chan *groupCostController)
gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2)
re.NoError(err)
Expand Down
14 changes: 7 additions & 7 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Limiter struct {
// last is the last time the limiter's tokens field was updated
last time.Time
notifyThreshold float64
lowTokensNotifyChan chan<- NotifyMsg
lowTokensNotifyChan chan<- notifyMsg
// To prevent too many chan sent, the notifyThreshold is set to 0 after notify.
// So the notifyThreshold cannot show whether the limiter is in the low token state,
// isLowProcess is used to check it.
Expand All @@ -88,9 +88,9 @@ type Limiter struct {
metrics *limiterMetricsCollection
}

// NotifyMsg is a message to notify the low token state.
type NotifyMsg struct {
StartTime time.Time
// notifyMsg is a message to notify the low token state.
type notifyMsg struct {
startTime time.Time
}

// limiterMetricsCollection is a collection of metrics for a limiter.
Expand All @@ -107,7 +107,7 @@ func (lim *Limiter) Limit() Limit {

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- NotifyMsg) *Limiter {
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- notifyMsg) *Limiter {
lim := &Limiter{
limit: r,
last: now,
Expand All @@ -121,7 +121,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify

// NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- NotifyMsg) *Limiter {
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- notifyMsg) *Limiter {
lim := &Limiter{
name: name,
limit: Limit(cfg.NewRate),
Expand Down Expand Up @@ -262,7 +262,7 @@ func (lim *Limiter) notify() {
lim.notifyThreshold = 0
lim.isLowProcess = true
select {
case lim.lowTokensNotifyChan <- NotifyMsg{StartTime: time.Now()}:
case lim.lowTokensNotifyChan <- notifyMsg{startTime: time.Now()}:
if lim.metrics != nil {
lim.metrics.lowTokenNotifyCounter.Inc()
}
Expand Down
10 changes: 5 additions & 5 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo
}

func TestSimpleReserve(t *testing.T) {
lim := NewLimiter(t0, 1, 0, 2, make(chan NotifyMsg, 1))
lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1))

runReserveMax(t, lim, request{t0, 3, t1, true})
runReserveMax(t, lim, request{t0, 3, t4, true})
Expand All @@ -103,7 +103,7 @@ func TestSimpleReserve(t *testing.T) {

func TestReconfig(t *testing.T) {
re := require.New(t)
lim := NewLimiter(t0, 1, 0, 2, make(chan NotifyMsg, 1))
lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1))

runReserveMax(t, lim, request{t0, 4, t2, true})
args := tokenBucketReconfigureArgs{
Expand All @@ -126,7 +126,7 @@ func TestReconfig(t *testing.T) {
}

func TestNotify(t *testing.T) {
nc := make(chan NotifyMsg, 1)
nc := make(chan notifyMsg, 1)
lim := NewLimiter(t0, 1, 0, 0, nc)

args := tokenBucketReconfigureArgs{
Expand All @@ -147,7 +147,7 @@ func TestCancel(t *testing.T) {
ctx := context.Background()
ctx1, cancel1 := context.WithDeadline(ctx, t2)
re := require.New(t)
nc := make(chan NotifyMsg, 1)
nc := make(chan notifyMsg, 1)
lim1 := NewLimiter(t0, 1, 0, 10, nc)
lim2 := NewLimiter(t0, 1, 0, 0, nc)

Expand Down Expand Up @@ -189,7 +189,7 @@ func TestCancel(t *testing.T) {

func TestCancelErrorOfReservation(t *testing.T) {
re := require.New(t)
nc := make(chan NotifyMsg, 1)
nc := make(chan notifyMsg, 1)
lim := NewLimiter(t0, 10, 0, 10, nc)
ctx, cancel := context.WithCancel(context.Background())
cancel()
Expand Down

0 comments on commit 18fad8b

Please sign in to comment.