Skip to content

Commit

Permalink
Merge branch 'master' into upgrade-prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jan 18, 2024
2 parents a582435 + da58fbe commit 27cd367
Show file tree
Hide file tree
Showing 36 changed files with 1,407 additions and 496 deletions.
3 changes: 1 addition & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down Expand Up @@ -249,7 +248,7 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui
putFn,
deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithRange(tsoServiceEndKey),
true, /* withPrefix */
)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func MakeRegionBound(id uint32) *RegionBound {
}
}

// makeKeyRanges encodes keyspace ID to correct LabelRule data.
func makeKeyRanges(id uint32) []interface{} {
// MakeKeyRanges encodes keyspace ID to correct LabelRule data.
func MakeKeyRanges(id uint32) []interface{} {
regionBound := MakeRegionBound(id)
return []interface{}{
map[string]interface{}{
Expand Down Expand Up @@ -207,7 +207,7 @@ func MakeLabelRule(id uint32) *labeler.LabelRule {
},
},
RuleType: labeler.KeyRange,
Data: makeKeyRanges(id),
Data: MakeKeyRanges(id),
}
}

Expand Down
14 changes: 12 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ func getOperatorByRegion(c *gin.Context) {

// @Tags operators
// @Summary List operators.
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting)
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting)
// @Param object query bool false "Whether to return as JSON object."
// @Produce json
// @Success 200 {array} operator.Operator
// @Failure 500 {string} string "PD server failed to proceed the request."
Expand All @@ -337,6 +338,7 @@ func getOperators(c *gin.Context) {
)

kinds := c.QueryArray("kind")
_, objectFlag := c.GetQuery("object")
if len(kinds) == 0 {
results, err = handler.GetOperators()
} else {
Expand All @@ -347,7 +349,15 @@ func getOperators(c *gin.Context) {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, results)
if objectFlag {
objResults := make([]*operator.OpObject, len(results))
for i, op := range results {
objResults[i] = op.ToJSONObject()
}
c.IndentedJSON(http.StatusOK, objResults)
} else {
c.IndentedJSON(http.StatusOK, results)
}
}

// @Tags operator
Expand Down
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
false, /* withPrefix */
)
cw.configWatcher.StartWatchLoop()
return cw.configWatcher.WaitLoad()
Expand Down Expand Up @@ -176,7 +177,7 @@ func (cw *Watcher) initializeTTLConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
cw.ttlConfigWatcher.StartWatchLoop()
return cw.ttlConfigWatcher.WaitLoad()
Expand Down Expand Up @@ -217,7 +218,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
cw.schedulerConfigWatcher.StartWatchLoop()
return cw.schedulerConfigWatcher.WaitLoad()
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (w *Watcher) initializeStoreWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
w.storeWatcher.StartWatchLoop()
return w.storeWatcher.WaitLoad()
Expand Down
29 changes: 20 additions & 9 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (rw *Watcher) initializeRuleWatcher() error {
var suspectKeyRanges *core.KeyRanges

preEventsFn := func(events []*clientv3.Event) error {
// It will be locked until the postFn is finished.
// It will be locked until the postEventsFn is finished.
rw.ruleManager.Lock()
rw.patch = rw.ruleManager.BeginPatch()
suspectKeyRanges = &core.KeyRanges{}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (rw *Watcher) initializeRuleWatcher() error {
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.ruleManager.Unlock()
if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil {
if err := rw.ruleManager.TryCommitPatchLocked(rw.patch); err != nil {
log.Error("failed to commit patch", zap.Error(err))
return err
}
Expand All @@ -204,35 +204,46 @@ func (rw *Watcher) initializeRuleWatcher() error {
preEventsFn,
putFn, deleteFn,
postEventsFn,
clientv3.WithPrefix(),
true, /* withPrefix */
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
// TODO: use txn in region labeler.
preEventsFn := func(events []*clientv3.Event) error {
// It will be locked until the postEventsFn is finished.
rw.regionLabeler.Lock()
return nil
}
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
log.Debug("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
if err != nil {
return err
}
return rw.regionLabeler.SetLabelRule(rule)
return rw.regionLabeler.SetLabelRuleLocked(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete region label rule", zap.String("key", key))
return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim))
return rw.regionLabeler.DeleteLabelRuleLocked(strings.TrimPrefix(key, prefixToTrim))
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.regionLabeler.Unlock()
rw.regionLabeler.BuildRangeListLocked()
return nil
}
rw.labelWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-region-label-watcher", rw.regionLabelPathPrefix,
func([]*clientv3.Event) error { return nil },
preEventsFn,
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
postEventsFn,
true, /* withPrefix */
)
rw.labelWatcher.StartWatchLoop()
return rw.labelWatcher.WaitLoad()
Expand Down
113 changes: 113 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rule

import (
"context"
"encoding/json"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

const (
clusterID = uint64(20240117)
rulesNum = 16384
)

func TestLoadLargeRules(t *testing.T) {
re := require.New(t)
ctx, client, clean := prepare(t)
defer clean()
runWatcherLoadLabelRule(ctx, re, client)
}

func BenchmarkLoadLargeRules(b *testing.B) {
re := require.New(b)
ctx, client, clean := prepare(b)
defer clean()

b.ResetTimer() // Resets the timer to ignore initialization time in the benchmark

for n := 0; n < b.N; n++ {
runWatcherLoadLabelRule(ctx, re, client)
}
}

func runWatcherLoadLabelRule(ctx context.Context, re *require.Assertions, client *clientv3.Client) {
storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
labelerManager, err := labeler.NewRegionLabeler(ctx, storage, time.Hour)
re.NoError(err)
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: client,
ruleStorage: storage,
regionLabeler: labelerManager,
}
err = rw.initializeRegionLabelWatcher()
re.NoError(err)
re.Len(labelerManager.GetAllLabelRules(), rulesNum)
cancel()
}

func prepare(t require.TestingT) (context.Context, *clientv3.Client, func()) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
cfg := etcdutil.NewTestSingleConfig()
cfg.Dir = os.TempDir() + "/test_etcd"
os.RemoveAll(cfg.Dir)
etcd, err := embed.StartEtcd(cfg)
re.NoError(err)
client, err := etcdutil.CreateEtcdClient(nil, cfg.LCUrls)
re.NoError(err)
<-etcd.Server.ReadyNotify()

for i := 1; i < rulesNum+1; i++ {
rule := &labeler.LabelRule{
ID: "test_" + strconv.Itoa(i),
Labels: []labeler.RegionLabel{{Key: "test", Value: "test"}},
RuleType: labeler.KeyRange,
Data: keyspace.MakeKeyRanges(uint32(i)),
}
value, err := json.Marshal(rule)
re.NoError(err)
key := endpoint.RegionLabelPathPrefix(clusterID) + "/" + rule.ID
_, err = clientv3.NewKV(client).Put(ctx, key, string(value))
re.NoError(err)
}

return ctx, client, func() {
cancel()
client.Close()
etcd.Close()
os.RemoveAll(cfg.Dir)
}
}
4 changes: 2 additions & 2 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ func InitClient(s server) error {
if err != nil {
return err
}
etcdClient, httpClient, err := etcdutil.CreateClients(tlsConfig, backendUrls)
etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls)
if err != nil {
return err
}
s.SetETCDClient(etcdClient)
s.SetHTTPClient(httpClient)
s.SetHTTPClient(etcdutil.CreateHTTPClient(tlsConfig))
return nil
}

Expand Down
33 changes: 25 additions & 8 deletions pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() {
}
}
if deleted {
l.buildRangeList()
l.BuildRangeListLocked()
}
}

Expand Down Expand Up @@ -128,11 +128,12 @@ func (l *RegionLabeler) loadRules() error {
return err
}
}
l.buildRangeList()
l.BuildRangeListLocked()
return nil
}

func (l *RegionLabeler) buildRangeList() {
// BuildRangeListLocked builds the range list.
func (l *RegionLabeler) BuildRangeListLocked() {
builder := rangelist.NewBuilder()
l.minExpire = nil
for _, rule := range l.labelRules {
Expand Down Expand Up @@ -206,31 +207,47 @@ func (l *RegionLabeler) getAndCheckRule(id string, now time.Time) *LabelRule {

// SetLabelRule inserts or updates a LabelRule.
func (l *RegionLabeler) SetLabelRule(rule *LabelRule) error {
l.Lock()
defer l.Unlock()
if err := l.SetLabelRuleLocked(rule); err != nil {
return err
}
l.BuildRangeListLocked()
return nil
}

// SetLabelRuleLocked inserts or updates a LabelRule but not buildRangeList.
func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error {
if err := rule.checkAndAdjust(); err != nil {
return err
}
l.Lock()
defer l.Unlock()
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
return err
}
l.labelRules[rule.ID] = rule
l.buildRangeList()
return nil
}

// DeleteLabelRule removes a LabelRule.
func (l *RegionLabeler) DeleteLabelRule(id string) error {
l.Lock()
defer l.Unlock()
if err := l.DeleteLabelRuleLocked(id); err != nil {
return err
}
l.BuildRangeListLocked()
return nil
}

// DeleteLabelRuleLocked removes a LabelRule but not buildRangeList.
func (l *RegionLabeler) DeleteLabelRuleLocked(id string) error {
if _, ok := l.labelRules[id]; !ok {
return errs.ErrRegionRuleNotFound.FastGenByArgs(id)
}
if err := l.storage.DeleteRegionRule(id); err != nil {
return err
}
delete(l.labelRules, id)
l.buildRangeList()
return nil
}

Expand Down Expand Up @@ -264,7 +281,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
for _, rule := range patch.SetRules {
l.labelRules[rule.ID] = rule
}
l.buildRangeList()
l.BuildRangeListLocked()
return nil
}

Expand Down
Loading

0 comments on commit 27cd367

Please sign in to comment.