Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager/client: introduce watch resource group #6510

Merged
merged 10 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
Expand Down
31 changes: 22 additions & 9 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// MetaStorageClient is the interface for meta storage client.
Expand Down Expand Up @@ -125,7 +125,12 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
PrevKv: options.prevKv,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.metaStorageClient().Put(ctx, req)
cli := c.metaStorageClient()
if cli == nil {
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
resp, err := cli.Put(ctx, req)
cancel()

if err = c.respForMetaStorageErr(cmdFailedDurationPut, start, err, resp.GetHeader()); err != nil {
Expand Down Expand Up @@ -158,7 +163,12 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
Revision: options.revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.metaStorageClient().Get(ctx, req)
cli := c.metaStorageClient()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please help take a look here, thx. @rleungx @binshi-bing

if cli == nil {
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
resp, err := cli.Get(ctx, req)
cancel()

if err = c.respForMetaStorageErr(cmdFailedDurationGet, start, err, resp.GetHeader()); err != nil {
Expand All @@ -177,7 +187,11 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan
options.rangeEnd = getPrefix(key)
}

res, err := c.metaStorageClient().Watch(ctx, &meta_storagepb.WatchRequest{
cli := c.metaStorageClient()
if cli == nil {
return nil, errs.ErrClientGetMetaStorageClient
}
res, err := cli.Watch(ctx, &meta_storagepb.WatchRequest{
Key: key,
RangeEnd: options.rangeEnd,
StartRevision: options.revision,
Expand All @@ -190,13 +204,12 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan
go func() {
defer func() {
close(eventCh)
if r := recover(); r != nil {
log.Error("[pd] panic in client `Watch`", zap.Any("error", r))
return
}
}()
for {
resp, err := res.Recv()
failpoint.Inject("watchStreamError", func() {
err = errors.Errorf("fake error")
})
if err != nil {
return
}
Expand Down
139 changes: 98 additions & 41 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -39,6 +41,8 @@ const (
maxNotificationChanLen = 200
needTokensAmplification = 1.1
trickleReserveDuration = 1250 * time.Millisecond

watchRetryInterval = 30 * time.Second
)

type selectType int
Expand All @@ -58,13 +62,14 @@ type ResourceGroupKVInterceptor interface {

// ResourceGroupProvider provides some api to interact with resource manager server。
type ResourceGroupProvider interface {
ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error)
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error)
LoadResourcrGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resourcr -> Resource

Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep consistent with ResourceGroupClient?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we can add the Watch into ResourceGroupClient interface?

}

// ResourceControlCreateOption create a ResourceGroupsController with the optional settings.
Expand Down Expand Up @@ -203,6 +208,17 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
})

_, revision, err := c.provider.LoadResourcrGroups(ctx)
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
watchChannel, err := c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
watchRetryTimer := time.NewTimer(watchRetryInterval)
if err == nil {
watchRetryTimer.Stop()
}
defer watchRetryTimer.Stop()

for {
select {
case <-c.loopCtx.Done():
Expand All @@ -219,9 +235,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
c.run.currentRequests = nil
case <-cleanupTicker.C:
if err := c.cleanUpResourceGroup(c.loopCtx); err != nil {
log.Error("[resource group controller] clean up resource groups failed", zap.Error(err))
}
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
Expand All @@ -239,6 +253,41 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
case resp, ok := <-watchChannel:
if !ok {
watchChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
revision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
switch item.Type {
case meta_storagepb.Event_PUT:
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
gc.modifyMeta(group)
}
case meta_storagepb.Event_DELETE:
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
}
}
}
case <-watchRetryTimer.C:
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
if err != nil {
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
}
case gc := <-c.tokenBucketUpdateChan:
now := gc.run.now
go gc.handleTokenBucketUpdateEvent(c.loopCtx, now)
Expand Down Expand Up @@ -289,23 +338,9 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return tmp.(*groupCostController), nil
}

func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) error {
groups, err := c.provider.ListResourceGroups(ctx)
if err != nil {
return errs.ErrClientListResourceGroup.FastGenByArgs(err.Error())
}
latestGroups := make(map[string]struct{})
for _, group := range groups {
latestGroups[group.GetName()] = struct{}{}
}
func (c *ResourceGroupsController) cleanUpResourceGroup() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

help add a GetResourceGrouup() by the way

c.groupsController.Range(func(key, value any) bool {
resourceGroupName := key.(string)
if _, ok := latestGroups[resourceGroupName]; !ok {
c.groupsController.Delete(key)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName)
return true
}

gc := value.(*groupCostController)
// Check for stale resource groups, which will be deleted when consumption is continuously unchanged.
gc.mu.Lock()
Expand All @@ -323,7 +358,6 @@ func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) err
}
return true
})
return nil
}

func (c *ResourceGroupsController) executeOnAllGroups(f func(controller *groupCostController)) {
Expand Down Expand Up @@ -427,11 +461,15 @@ func (c *ResourceGroupsController) OnResponse(
}

type groupCostController struct {
*rmpb.ResourceGroup
mainCfg *Config
calculators []ResourceCalculator
mode rmpb.GroupMode

// invariant attributes
name string
mode rmpb.GroupMode
mainCfg *Config
// meta info
meta *rmpb.ResourceGroup
metaLock sync.RWMutex

calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
Expand Down Expand Up @@ -527,8 +565,10 @@ func newGroupCostController(
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
gc := &groupCostController{
ResourceGroup: group,
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name),
Expand All @@ -537,7 +577,6 @@ func newGroupCostController(
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
},
mode: group.GetMode(),
tokenBucketUpdateChan: tokenBucketUpdateChan,
lowRUNotifyChan: lowRUNotifyChan,
burstable: &atomic.Bool{},
Expand Down Expand Up @@ -582,35 +621,37 @@ func (gc *groupCostController) initRunState() {
return cfg
}

gc.metaLock.RLock()
defer gc.metaLock.RUnlock()
switch gc.mode {
case rmpb.GroupMode_RUMode:
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitLimitTypeList {
tb := getRUTokenBucketSetting(gc.ResourceGroup, typ)
tb := getRUTokenBucketSetting(gc.meta, typ)
cfg := cfgFunc(tb)
limiter := NewLimiterWithCfg(now, cfg, gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
avgLastTime: now,
getTokenBucketFunc: func() *rmpb.TokenBucket {
return getRUTokenBucketSetting(gc.ResourceGroup, typ)
return getRUTokenBucketSetting(gc.meta, typ)
},
}
gc.run.requestUnitTokens[typ] = counter
}
case rmpb.GroupMode_RawMode:
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceLimitTypeList {
tb := getRawResourceTokenBucketSetting(gc.ResourceGroup, typ)
tb := getRawResourceTokenBucketSetting(gc.meta, typ)
cfg := cfgFunc(tb)
limiter := NewLimiterWithCfg(now, cfg, gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
avgLastTime: now,
getTokenBucketFunc: func() *rmpb.TokenBucket {
return getRawResourceTokenBucketSetting(gc.ResourceGroup, typ)
return getRawResourceTokenBucketSetting(gc.meta, typ)
},
}
gc.run.resourceTokens[typ] = counter
Expand Down Expand Up @@ -718,7 +759,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() {
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))

Do we need to keep consistent with this https://github.com/tikv/pd/pull/6450/files#diff-ed72d626078622ff5c8e9b3f7d1876c4b3d6c648a4bc833785ca312da718aa07R319

}
gc.burstable.Store(isBurstable)
}
Expand All @@ -732,7 +773,7 @@ func (gc *groupCostController) updateAvgRUPerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
gc.burstable.Store(isBurstable)
}
Expand Down Expand Up @@ -771,7 +812,7 @@ func (gc *groupCostController) shouldReportConsumption() bool {
if timeSinceLastRequest >= extendedReportingPeriodFactor*defaultTargetPeriod {
return true
}
switch gc.Mode {
switch gc.mode {
case rmpb.GroupMode_RUMode:
for typ := range requestUnitLimitTypeList {
if getRUValueFromConsumption(gc.run.consumption, typ)-getRUValueFromConsumption(gc.run.lastRequestConsumption, typ) >= consumptionsReportingThreshold {
Expand Down Expand Up @@ -837,7 +878,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() {
cfg.NewRate = 99999999
})
counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess())
log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource group", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))
log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))
log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource-group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))

}
}

Expand Down Expand Up @@ -918,7 +959,7 @@ func initCounterNotify(counter *tokenCounter) {

func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType) *rmpb.TokenBucketRequest {
req := &rmpb.TokenBucketRequest{
ResourceGroupName: gc.ResourceGroup.GetName(),
ResourceGroupName: gc.name,
}
// collect request resource
selected := gc.run.requestInProgress
Expand Down Expand Up @@ -985,6 +1026,19 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
return req
}

// This is used for test only.
func (gc *groupCostController) getMeta() *rmpb.ResourceGroup {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()
return gc.meta
}

func (gc *groupCostController) modifyMeta(newMeta *rmpb.ResourceGroup) {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()
gc.meta = newMeta
}

func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
// `needTokensAmplification` is used to properly amplify a need. The reason is that in the current implementation,
// the token returned from the server determines the average consumption speed.
Expand Down Expand Up @@ -1110,11 +1164,14 @@ func (gc *groupCostController) onResponse(
return delta, nil
}

// CheckResourceGroupExist checks if groupsController map {rg.name -> resource group controller}
// contains name. Used for test only.
func (c *ResourceGroupsController) CheckResourceGroupExist(name string) bool {
_, ok := c.groupsController.Load(name)
return ok
// GetActiveResourceGroup is used to get action resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
return nil
}
return tmp.(*groupCostController).getMeta()
}

// This is used for test only.
Expand Down
Loading