Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into sche-redirect11
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 committed Nov 21, 2023
2 parents 07e96f0 + 8919bc1 commit d2d30bc
Show file tree
Hide file tree
Showing 35 changed files with 468 additions and 143 deletions.
15 changes: 14 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
accelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
Expand All @@ -45,7 +45,10 @@ const (
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
Expand Down Expand Up @@ -123,6 +126,16 @@ func PlacementRuleByGroupAndID(group, id string) string {
return fmt.Sprintf("%s/%s/%s", PlacementRule, group, id)
}

// PlacementRuleBundleByGroup returns the path of PD HTTP API to get placement rule bundle by group.
func PlacementRuleBundleByGroup(group string) string {
return fmt.Sprintf("%s/%s", PlacementRuleBundle, group)
}

// PlacementRuleBundleWithPartialParameter returns the path of PD HTTP API to get placement rule bundle with partial parameter.
func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
153 changes: 137 additions & 16 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (

// Client is a PD (Placement Driver) HTTP client.
type Client interface {
/* Meta-related interfaces */
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
Expand All @@ -51,11 +52,28 @@ type Client interface {
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Rule-related interfaces */
GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetAllRegionLabelRules(context.Context) ([]*LabelRule, error)
GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error)
SetRegionLabelRule(context.Context, *LabelRule) error
PatchRegionLabelRules(context.Context, *LabelRulePatch) error
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, []byte, []byte) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Client-related methods */
// WithRespHandler sets and returns a new client with the given HTTP response handler.
// This allows the caller to customize how the response is handled, including error handling logic.
// Additionally, it is important for the caller to handle the content of the response body properly
// in order to ensure that it can be read and marshaled correctly into `res`.
WithRespHandler(func(resp *http.Response, res interface{}) error) Client
Close()
}

Expand All @@ -66,6 +84,8 @@ type client struct {
tlsConf *tls.Config
cli *http.Client

respHandler func(resp *http.Response, res interface{}) error

requestCounter *prometheus.CounterVec
executionDuration *prometheus.HistogramVec
}
Expand Down Expand Up @@ -143,6 +163,15 @@ func (c *client) Close() {
log.Info("[pd] http client closed")
}

// WithRespHandler sets and returns a new client with the given HTTP response handler.
func (c *client) WithRespHandler(
handler func(resp *http.Response, res interface{}) error,
) Client {
newClient := *c
newClient.respHandler = handler
return &newClient
}

func (c *client) reqCounter(name, status string) {
if c.requestCounter == nil {
return
Expand Down Expand Up @@ -204,6 +233,12 @@ func (c *client) request(
}
c.execDuration(name, time.Since(start))
c.reqCounter(name, resp.Status)

// Give away the response handling to the caller if the handler is set.
if c.respHandler != nil {
return c.respHandler(resp, res)
}

defer func() {
err = resp.Body.Close()
if err != nil {
Expand Down Expand Up @@ -345,6 +380,30 @@ func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
return &stores, nil
}

// GetAllPlacementRuleBundles gets all placement rules bundles.
func (c *client) GetAllPlacementRuleBundles(ctx context.Context) ([]*GroupBundle, error) {
var bundles []*GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundle", PlacementRuleBundle,
http.MethodGet, nil, &bundles)
if err != nil {
return nil, err
}
return bundles, nil
}

// GetPlacementRuleBundleByGroup gets the placement rules bundle by group.
func (c *client) GetPlacementRuleBundleByGroup(ctx context.Context, group string) (*GroupBundle, error) {
var bundle GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundleByGroup", PlacementRuleBundleByGroup(group),
http.MethodGet, nil, &bundle)
if err != nil {
return nil, err
}
return &bundle, nil
}

// GetPlacementRulesByGroup gets the placement rules by group.
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) {
var rules []*Rule
Expand All @@ -368,13 +427,90 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRuleBundles sets the placement rule bundles.
// If `partial` is false, all old configurations will be over-written and dropped.
func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error {
bundlesJSON, err := json.Marshal(bundles)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleBundles", PlacementRuleBundleWithPartialParameter(partial),
http.MethodPost, bytes.NewBuffer(bundlesJSON), nil)
}

// DeletePlacementRule deletes the placement rule.
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRule", PlacementRuleByGroupAndID(group, id),
http.MethodDelete, nil, nil)
}

// GetAllRegionLabelRules gets all region label rules.
func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) {
var labelRules []*LabelRule
err := c.requestWithRetry(ctx,
"GetAllRegionLabelRules", RegionLabelRules,
http.MethodGet, nil, &labelRules)
if err != nil {
return nil, err
}
return labelRules, nil
}

// GetRegionLabelRulesByIDs gets the region label rules by IDs.
func (c *client) GetRegionLabelRulesByIDs(ctx context.Context, ruleIDs []string) ([]*LabelRule, error) {
idsJSON, err := json.Marshal(ruleIDs)
if err != nil {
return nil, errors.Trace(err)
}
var labelRules []*LabelRule
err = c.requestWithRetry(ctx,
"GetRegionLabelRulesByIDs", RegionLabelRulesByIDs,
http.MethodGet, bytes.NewBuffer(idsJSON), &labelRules)
if err != nil {
return nil, err
}
return labelRules, nil
}

// SetRegionLabelRule sets the region label rule.
func (c *client) SetRegionLabelRule(ctx context.Context, labelRule *LabelRule) error {
labelRuleJSON, err := json.Marshal(labelRule)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetRegionLabelRule", RegionLabelRule,
http.MethodPost, bytes.NewBuffer(labelRuleJSON), nil)
}

// PatchRegionLabelRules patches the region label rules.
func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *LabelRulePatch) error {
labelRulePatchJSON, err := json.Marshal(labelRulePatch)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"PatchRegionLabelRules", RegionLabelRules,
http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil)
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand Down Expand Up @@ -406,18 +542,3 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", accelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}
31 changes: 31 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,34 @@ type Rule struct {
Version uint64 `json:"version,omitempty"` // only set at runtime, add 1 each time rules updated, begin from 0.
CreateTimestamp uint64 `json:"create_timestamp,omitempty"` // only set at runtime, recorded rule create timestamp
}

// GroupBundle represents a rule group and all rules belong to the group.
type GroupBundle struct {
ID string `json:"group_id"`
Index int `json:"group_index"`
Override bool `json:"group_override"`
Rules []*Rule `json:"rules"`
}

// RegionLabel is the label of a region.
type RegionLabel struct {
Key string `json:"key"`
Value string `json:"value"`
TTL string `json:"ttl,omitempty"`
StartAt string `json:"start_at,omitempty"`
}

// LabelRule is the rule to assign labels to a region.
type LabelRule struct {
ID string `json:"id"`
Index int `json:"index"`
Labels []RegionLabel `json:"labels"`
RuleType string `json:"rule_type"`
Data interface{} `json:"data"`
}

// LabelRulePatch is the patch to update the label rules.
type LabelRulePatch struct {
SetRules []*LabelRule `json:"sets"`
DeleteRules []string `json:"deletes"`
}
4 changes: 3 additions & 1 deletion client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ func (bo *BackOffer) Exec(
fn func() error,
) error {
if err := fn(); err != nil {
after := time.NewTimer(bo.nextInterval())
defer after.Stop()
select {
case <-ctx.Done():
case <-time.After(bo.nextInterval()):
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
Expand Down
20 changes: 20 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,23 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
store.lastAwakenTime = lastAwaken
}
}

// SetStoreMeta sets the meta for the store.
func SetStoreMeta(newMeta *metapb.Store) StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.Version = newMeta.GetVersion()
meta.GitHash = newMeta.GetGitHash()
meta.Address = newMeta.GetAddress()
meta.StatusAddress = newMeta.GetStatusAddress()
meta.PeerAddress = newMeta.GetPeerAddress()
meta.StartTimestamp = newMeta.GetStartTimestamp()
meta.DeployPath = newMeta.GetDeployPath()
meta.LastHeartbeat = newMeta.GetLastHeartbeat()
meta.State = newMeta.GetState()
meta.Labels = newMeta.GetLabels()
meta.NodeState = newMeta.GetNodeState()
meta.PhysicallyDestroyed = newMeta.GetPhysicallyDestroyed()
store.meta = meta
}
}
19 changes: 1 addition & 18 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
return errors.Errorf("store %v not found", storeID)
}

nowTime := time.Now()
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))
newStore := store.Clone(core.SetStoreStats(stats))

if store := c.GetStore(storeID); store != nil {
statistics.UpdateStoreHeartbeatMetrics(store)
Expand Down Expand Up @@ -486,10 +485,6 @@ func (c *Cluster) collectMetrics() {

c.coordinator.GetSchedulersController().CollectSchedulerMetrics()
c.coordinator.CollectHotSpotMetrics()
c.collectClusterMetrics()
}

func (c *Cluster) collectClusterMetrics() {
if c.regionStats == nil {
return
}
Expand All @@ -501,20 +496,8 @@ func (c *Cluster) collectClusterMetrics() {

func (c *Cluster) resetMetrics() {
statistics.Reset()

schedulers.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
c.resetClusterMetrics()
}

func (c *Cluster) resetClusterMetrics() {
if c.regionStats == nil {
return
}
c.regionStats.Reset()
c.labelStats.Reset()
// reset hot cache metrics
c.hotStat.ResetMetrics()
}

// StartBackgroundJobs starts background jobs.
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
log.Info("update scheduler config", zap.String("name", string(kv.Value)))
log.Info("update scheduler config", zap.String("name", name),
zap.String("value", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (w *Watcher) initializeStoreWatcher() error {
w.basicCluster.PutStore(core.NewStoreInfo(store))
return nil
}
w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed())))
w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store)))
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
Expand Down
Loading

0 comments on commit d2d30bc

Please sign in to comment.