Merge branch 'master' into fix_run_race
HuSharp authored Jul 2, 2024
2 parents 01be4c6 + 49f9b11 commit 326091d
Showing 70 changed files with 910 additions and 305 deletions.
11 changes: 8 additions & 3 deletions .golangci.yml
@@ -198,6 +198,14 @@ linters-settings:
severity: warning
disabled: false
exclude: [""]
errcheck:
exclude-functions:
- (*github.com/unrolled/render.Render).JSON
- (*github.com/unrolled/render.Render).Data
- (*github.com/unrolled/render.Render).Text
- (net/http.ResponseWriter).Write
- github.com/pingcap/log.Sync
- (github.com/tikv/pd/pkg/ratelimit.Runner).RunTask
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
@@ -207,6 +215,3 @@ issues:
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
- path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/syncer/server.go)
linters:
- errcheck
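
The errcheck exclude-functions entries added above cover render, log, and ratelimit helpers whose returned errors are intentionally ignored at the call site; this is also why the explicit `_ =` assignments are dropped from pkg/autoscaling/handler.go later in this diff. A minimal sketch of a handler written against the same linter configuration (the route and variable names below are illustrative only, not part of this commit):

package main

import (
    "log"
    "net/http"

    "github.com/unrolled/render"
)

var rd = render.New()

func statusHandler(w http.ResponseWriter, _ *http.Request) {
    // With (*render.Render).JSON excluded from errcheck, the returned error
    // can be ignored without an explicit `_ =` assignment.
    rd.JSON(w, http.StatusOK, map[string]string{"status": "ok"})
}

func main() {
    http.HandleFunc("/status", statusHandler)
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal(err)
    }
}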
43 changes: 25 additions & 18 deletions client/resource_group/controller/controller.go
@@ -38,10 +38,11 @@ import (
)

const (
controllerConfigPath = "resource_group/controller"
maxNotificationChanLen = 200
needTokensAmplification = 1.1
trickleReserveDuration = 1250 * time.Millisecond
controllerConfigPath = "resource_group/controller"
maxNotificationChanLen = 200
needTokensAmplification = 1.1
trickleReserveDuration = 1250 * time.Millisecond
slowNotifyFilterDuration = 10 * time.Millisecond

watchRetryInterval = 30 * time.Second
)
@@ -139,7 +140,7 @@ type ResourceGroupsController struct {
calculators []ResourceCalculator

// When a signal is received, it means the number of available tokens is low.
lowTokenNotifyChan chan struct{}
lowTokenNotifyChan chan notifyMsg
// When a token bucket response is received from the server, it will be sent to the channel.
tokenResponseChan chan []*rmpb.TokenBucketResponse
// When the token bucket of a resource group is updated, it will be sent to the channel.
@@ -181,7 +182,7 @@ func NewResourceGroupController(
clientUniqueID: clientUniqueID,
provider: provider,
ruConfig: ruConfig,
lowTokenNotifyChan: make(chan struct{}, 1),
lowTokenNotifyChan: make(chan notifyMsg, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
opts: opts,
@@ -287,7 +288,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, notifyMsg{})
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
@@ -325,11 +326,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.handleTokenBucketResponse(resp)
}
c.run.currentRequests = nil
case <-c.lowTokenNotifyChan:
case notifyMsg := <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */)
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
}
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
@@ -508,7 +509,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
}
}

func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType) {
func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg notifyMsg) {
c.run.currentRequests = make([]*rmpb.TokenBucketRequest, 0)
c.groupsController.Range(func(_, value any) bool {
gc := value.(*groupCostController)
@@ -520,11 +521,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
return true
})
if len(c.run.currentRequests) > 0 {
c.sendTokenBucketRequests(ctx, c.run.currentRequests, source)
c.sendTokenBucketRequests(ctx, c.run.currentRequests, source, notifyMsg)
}
}

func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string) {
func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg notifyMsg) {
now := time.Now()
req := &rmpb.TokenBucketsRequest{
Requests: requests,
@@ -542,13 +543,16 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
if err != nil {
// Don't log any errors caused by the stopper canceling the context.
if !errors.ErrorEqual(err, context.Canceled) {
log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err)
log.Error("[resource group controller] token bucket rpc error", zap.Error(err))
}
resp = nil
failedTokenRequestDuration.Observe(latency.Seconds())
} else {
successfulTokenRequestDuration.Observe(latency.Seconds())
}
if !notifyMsg.startTime.IsZero() && time.Since(notifyMsg.startTime) > slowNotifyFilterDuration {
log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.startTime)))
}
logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
c.tokenResponseChan <- resp
}()
@@ -644,7 +648,7 @@ type groupCostController struct {
// fast path for the case where the token limit is configured with an unlimited burst.
burstable *atomic.Bool

lowRUNotifyChan chan<- struct{}
lowRUNotifyChan chan<- notifyMsg
tokenBucketUpdateChan chan<- *groupCostController

// run contains the state that is updated by the main loop.
@@ -734,7 +738,7 @@ type tokenCounter struct {
func newGroupCostController(
group *rmpb.ResourceGroup,
mainCfg *RUConfig,
lowRUNotifyChan chan struct{},
lowRUNotifyChan chan notifyMsg,
tokenBucketUpdateChan chan *groupCostController,
) (*groupCostController, error) {
switch group.Mode {
@@ -853,7 +857,7 @@ func (gc *groupCostController) updateRunState() {
}
*gc.run.consumption = *gc.mu.consumption
gc.mu.Unlock()
logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption))
gc.run.now = newTime
}

@@ -1053,7 +1057,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() {
cfg.NewRate = 99999999
})
counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess())
log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource-group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))
log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))
}
}

@@ -1107,6 +1111,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
timerDuration = (trickleDuration + trickleReserveDuration) / 2
}
counter.notify.mu.Lock()
if counter.notify.setupNotificationTimer != nil {
counter.notify.setupNotificationTimer.Stop()
}
counter.notify.setupNotificationTimer = time.NewTimer(timerDuration)
counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C
counter.notify.setupNotificationThreshold = 1
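
The guard added here stops the previously scheduled notification timer before a new one is installed, so a stale timer cannot fire later and deliver an outdated notification. A minimal stand-alone illustration of that stop-before-replace pattern (this models only the time.Timer behavior, not the controller's actual state):

package main

import (
    "fmt"
    "time"
)

func main() {
    // First timer scheduled for a notification.
    var setupTimer *time.Timer
    setupTimer = time.NewTimer(50 * time.Millisecond)

    // Before replacing it, stop the old timer so its channel cannot fire
    // later and deliver a stale notification.
    if setupTimer != nil {
        setupTimer.Stop()
    }
    setupTimer = time.NewTimer(100 * time.Millisecond)

    <-setupTimer.C
    fmt.Println("only the most recent timer fired")
}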
@@ -1279,7 +1286,7 @@ func (gc *groupCostController) onRequestWait(
sub(gc.mu.consumption, delta)
gc.mu.Unlock()
failpoint.Inject("triggerUpdate", func() {
gc.lowRUNotifyChan <- struct{}{}
gc.lowRUNotifyChan <- notifyMsg{}
})
return nil, nil, waitDuration, 0, err
}
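The core of this change is that the low-token notification now carries the time at which it was sent, so the controller can measure how long it takes from the limiter signalling low tokens until the token bucket request completes, and warn when that exceeds slowNotifyFilterDuration. A simplified, self-contained model of that flow (the field names mirror the diff, but this is a sketch rather than the actual controller code):

package main

import (
    "fmt"
    "time"
)

// notifyMsg mirrors the message type introduced in this diff: it records
// when the low-token signal was sent.
type notifyMsg struct {
    startTime time.Time
}

const slowNotifyFilterDuration = 10 * time.Millisecond

func main() {
    lowTokenNotifyChan := make(chan notifyMsg, 1)

    // Limiter side: signal low tokens and stamp the send time.
    lowTokenNotifyChan <- notifyMsg{startTime: time.Now()}

    // Controller side: handle the signal, issue the (simulated) token bucket
    // request, and warn when end-to-end handling is slow.
    msg := <-lowTokenNotifyChan
    time.Sleep(15 * time.Millisecond) // stand-in for the token bucket RPC

    if !msg.startTime.IsZero() && time.Since(msg.startTime) > slowNotifyFilterDuration {
        fmt.Printf("slow token bucket request, cost=%s\n", time.Since(msg.startTime))
    }
}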
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller_test.go
@@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
JobTypes: []string{"lightning", "br"},
},
}
ch1 := make(chan struct{})
ch1 := make(chan notifyMsg)
ch2 := make(chan *groupCostController)
gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2)
re.NoError(err)
23 changes: 17 additions & 6 deletions client/resource_group/controller/limiter.go
@@ -75,7 +75,7 @@ type Limiter struct {
// last is the last time the limiter's tokens field was updated
last time.Time
notifyThreshold float64
lowTokensNotifyChan chan<- struct{}
lowTokensNotifyChan chan<- notifyMsg
// To prevent sending to the channel too many times, the notifyThreshold is set to 0 after a notification.
// So the notifyThreshold cannot show whether the limiter is in the low token state,
// isLowProcess is used to check it.
@@ -88,6 +88,11 @@ type Limiter struct {
metrics *limiterMetricsCollection
}

// notifyMsg is a message to notify the low token state.
type notifyMsg struct {
startTime time.Time
}

// limiterMetricsCollection is a collection of metrics for a limiter.
type limiterMetricsCollection struct {
lowTokenNotifyCounter prometheus.Counter
@@ -102,7 +107,7 @@ func (lim *Limiter) Limit() Limit {

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter {
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- notifyMsg) *Limiter {
lim := &Limiter{
limit: r,
last: now,
@@ -116,7 +121,7 @@ func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotify

// NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- notifyMsg) *Limiter {
lim := &Limiter{
name: name,
limit: Limit(cfg.NewRate),
@@ -144,6 +149,7 @@ type Reservation struct {
// This is the Limit at reservation time, it can change later.
limit Limit
remainingTokens float64
err error
}

// OK returns whether the limiter can provide the requested number of tokens
@@ -218,7 +224,8 @@ func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now
select {
case <-ctx.Done():
return &Reservation{
ok: false,
ok: false,
err: ctx.Err(),
}
default:
}
@@ -255,7 +262,7 @@ func (lim *Limiter) notify() {
lim.notifyThreshold = 0
lim.isLowProcess = true
select {
case lim.lowTokensNotifyChan <- struct{}{}:
case lim.lowTokensNotifyChan <- notifyMsg{startTime: time.Now()}:
if lim.metrics != nil {
lim.metrics.lowTokenNotifyCounter.Inc()
}
@@ -414,7 +421,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
zap.Float64("notify-threshold", lim.notifyThreshold),
zap.Bool("is-low-process", lim.isLowProcess),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
zap.Int("remaining-notify-times", lim.remainingNotifyTimes),
zap.String("name", lim.name))
}
lim.last = last
if lim.limit == 0 {
@@ -495,6 +503,9 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
if res.err != nil {
return res.needWaitDuration, res.err
}
return res.needWaitDuration, errs.ErrClientResourceGroupThrottled.FastGenByArgs(res.needWaitDuration, res.limit, res.remainingTokens)
}
delay := res.DelayFrom(now)
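With the new err field on Reservation, a reservation that fails because the caller's context was canceled now surfaces ctx.Err() from WaitReservations instead of being reported as ErrClientResourceGroupThrottled. A rough, self-contained sketch of how a caller can now tell the two failure modes apart (stand-in types only, not the literal client code):

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// reservation is a stripped-down stand-in for the limiter's Reservation,
// keeping only the fields relevant to error propagation.
type reservation struct {
    ok  bool
    err error
}

var errThrottled = errors.New("resource group throttled")

// waitReservations mirrors the updated WaitReservations logic: a failed
// reservation that carries a context error returns that error directly,
// otherwise the failure is reported as throttling.
func waitReservations(res []*reservation) (time.Duration, error) {
    for _, r := range res {
        if !r.ok {
            if r.err != nil {
                return 0, r.err
            }
            return 0, errThrottled
        }
    }
    return 0, nil
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel()

    // A reservation made with an already-canceled context records ctx.Err().
    r := &reservation{ok: false, err: ctx.Err()}
    _, err := waitReservations([]*reservation{r})
    fmt.Println(errors.Is(err, context.Canceled)) // true: cancellation, not throttling
}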
21 changes: 17 additions & 4 deletions client/resource_group/controller/limiter_test.go
@@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo
}

func TestSimpleReserve(t *testing.T) {
lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))
lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1))

runReserveMax(t, lim, request{t0, 3, t1, true})
runReserveMax(t, lim, request{t0, 3, t4, true})
@@ -103,7 +103,7 @@

func TestReconfig(t *testing.T) {
re := require.New(t)
lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))
lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1))

runReserveMax(t, lim, request{t0, 4, t2, true})
args := tokenBucketReconfigureArgs{
@@ -126,7 +126,7 @@
}

func TestNotify(t *testing.T) {
nc := make(chan struct{}, 1)
nc := make(chan notifyMsg, 1)
lim := NewLimiter(t0, 1, 0, 0, nc)

args := tokenBucketReconfigureArgs{
@@ -147,7 +147,7 @@ func TestCancel(t *testing.T) {
ctx := context.Background()
ctx1, cancel1 := context.WithDeadline(ctx, t2)
re := require.New(t)
nc := make(chan struct{}, 1)
nc := make(chan notifyMsg, 1)
lim1 := NewLimiter(t0, 1, 0, 10, nc)
lim2 := NewLimiter(t0, 1, 0, 0, nc)

@@ -186,3 +186,16 @@ func TestCancel(t *testing.T) {
checkTokens(re, lim1, t5, 15)
checkTokens(re, lim2, t5, 5)
}

func TestCancelErrorOfReservation(t *testing.T) {
re := require.New(t)
nc := make(chan notifyMsg, 1)
lim := NewLimiter(t0, 10, 0, 10, nc)
ctx, cancel := context.WithCancel(context.Background())
cancel()
r := lim.Reserve(ctx, InfDuration, t0, 5)
d, err := WaitReservations(context.Background(), t0, []*Reservation{r})
re.Equal(0*time.Second, d)
re.Error(err)
re.Contains(err.Error(), "context canceled")
}
8 changes: 6 additions & 2 deletions cmd/pd-server/main.go
@@ -196,8 +196,12 @@ func start(cmd *cobra.Command, args []string, services ...string) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
flagSet.Parse(args)
err := cfg.Parse(flagSet)
err := flagSet.Parse(args)
if err != nil {
cmd.Println(err)
return
}
err = cfg.Parse(flagSet)
defer logutil.LogPanic()

if err != nil {
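The main.go change checks the parse error from the cobra/pflag flag set before building the config instead of silently discarding it. A minimal sketch of that pattern in isolation (the flag set and flag names below are illustrative, not pd-server's actual flags):

package main

import (
    "fmt"
    "os"

    "github.com/spf13/pflag"
)

func main() {
    flagSet := pflag.NewFlagSet("pd-server", pflag.ContinueOnError)
    configFile := flagSet.String("config", "", "config file path")

    // Surface parse errors (e.g. an unknown flag) instead of ignoring them.
    if err := flagSet.Parse(os.Args[1:]); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("config file:", *configFile)
}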
5 changes: 4 additions & 1 deletion conf/config.toml
@@ -33,7 +33,10 @@
# key-path = ""
## A CN which must be provided by a client
# cert-allowed-cn = ["example.com"]
## Whether or not to enable redact log.
## Whether to enable the log redaction. It can be the following values:
## - false: disable redact log.
## - true: enable redact log, which will replace the sensitive information with "?".
## - "MARKER": enable redact log, which will use single guillemets ‹› to enclose the sensitive information.
# redact-info-log = false

[security.encryption]
8 changes: 4 additions & 4 deletions pkg/autoscaling/handler.go
@@ -41,22 +41,22 @@ func NewHTTPHandler(svr *server.Server, rd *render.Render) *HTTPHandler {
func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rc := h.svr.GetRaftCluster()
if rc == nil {
_ = h.rd.JSON(w, http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error())
h.rd.JSON(w, http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error())
return
}
data, err := io.ReadAll(r.Body)
r.Body.Close()
if err != nil {
_ = h.rd.JSON(w, http.StatusInternalServerError, err.Error())
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

strategy := Strategy{}
if err := json.Unmarshal(data, &strategy); err != nil {
_ = h.rd.JSON(w, http.StatusBadRequest, err.Error())
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

plan := calculate(rc, h.svr.GetPDServerConfig(), &strategy)
_ = h.rd.JSON(w, http.StatusOK, plan)
h.rd.JSON(w, http.StatusOK, plan)
}