Skip to content

Commit

Permalink
Merge branch 'master' into imporve-diag
Browse files Browse the repository at this point in the history
  • Loading branch information
CabinfeverB authored Dec 11, 2023
2 parents 2069df0 + 00674d0 commit 8f455c4
Show file tree
Hide file tree
Showing 21 changed files with 832 additions and 532 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (o *PersistConfig) IsLocationReplacementEnabled() bool {
return o.getTTLBoolOr(sc.EnableLocationReplacement, o.GetScheduleConfig().EnableLocationReplacement)
}

// IsTikvRegionSplitEnabled returns whether tikv split region is disabled.
// IsTikvRegionSplitEnabled returns whether tikv split region is enabled.
func (o *PersistConfig) IsTikvRegionSplitEnabled() bool {
return o.getTTLBoolOr(sc.EnableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion)
}
Expand Down
16 changes: 3 additions & 13 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,16 @@ func (m *Participant) Client() *clientv3.Client {
// IsLeader returns whether the participant is the leader or not by checking its leadership's
// lease and leader info.
func (m *Participant) IsLeader() bool {
if m.GetLeader() == nil {
return false
}
return m.leadership.Check() && m.GetLeader().GetId() == m.member.GetId() && m.campaignCheck()
}

// IsLeaderElected returns true if the leader exists; otherwise false
func (m *Participant) IsLeaderElected() bool {
return m.GetLeader() != nil
return m.GetLeader().GetId() != 0
}

// GetLeaderListenUrls returns current leader's listen urls
func (m *Participant) GetLeaderListenUrls() []string {
if m.GetLeader() == nil {
return nil
}
return m.GetLeader().GetListenUrls()
}

Expand All @@ -149,13 +143,9 @@ func (m *Participant) GetLeaderID() uint64 {
func (m *Participant) GetLeader() participant {
leader := m.leader.Load()
if leader == nil {
return nil
}
member := leader.(participant)
if member.GetId() == 0 {
return nil
return NewParticipantByService(m.serviceName)
}
return member
return leader.(participant)
}

// setLeader sets the member's leader.
Expand Down
6 changes: 6 additions & 0 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,12 @@ func (oc *Controller) GetFastOpInfluence(cluster *core.BasicCluster, influence O
}
}

// CleanAllOpRecords removes all operators' records.
// It is used in tests only.
func (oc *Controller) CleanAllOpRecords() {
oc.records.ttl.Clear()
}

// AddOpInfluence add operator influence for cluster
func AddOpInfluence(op *Operator, influence OpInfluence, cluster *core.BasicCluster) {
region := cluster.GetRegion(op.RegionID())
Expand Down
7 changes: 7 additions & 0 deletions pkg/statistics/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,10 @@ func (w *HotCache) GetThresholds(kind utils.RWType, storeID uint64) []float64 {
}
return nil
}

// CleanCache cleans the cache.
// This is used for test purpose.
func (w *HotCache) CleanCache() {
w.writeCache.removeAllItem()
w.readCache.removeAllItem()
}
12 changes: 12 additions & 0 deletions pkg/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,18 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) {
}
}

// removeAllItem removes all items of the cache.
// It is used for test.
func (f *hotPeerCache) removeAllItem() {
for _, peers := range f.peersOfStore {
for _, peer := range peers.GetAll() {
item := peer.(*HotPeerStat)
item.actionType = utils.Remove
f.updateStat(item)
}
}
}

func (f *hotPeerCache) coldItem(newItem, oldItem *HotPeerStat) {
newItem.HotDegree = oldItem.HotDegree - 1
newItem.AntiCount = oldItem.AntiCount - 1
Expand Down
11 changes: 10 additions & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
Expand Down Expand Up @@ -122,16 +123,24 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
// It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}"
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
for _, rule := range h.microserviceRedirectRules {
// Now we only support checking the scheduling service whether it is independent
if rule.targetServiceName == mcsutils.SchedulingServiceName {
if !h.s.IsServiceIndependent(mcsutils.SchedulingServiceName) {
continue
}
}
if strings.HasPrefix(r.URL.Path, rule.matchPath) &&
slice.Contains(rule.matchMethods, r.Method) {
if rule.filter != nil && !rule.filter(r) {
continue
}
// we check the service primary addr here, so no need to check independently again.
// we check the service primary addr here,
// if the service is not available, we will return ErrRedirect by returning an empty addr.
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
zap.String("path", r.URL.Path))
return true, ""
}
// If the URL contains escaped characters, use RawPath instead of Path
origin := r.URL.Path
Expand Down
33 changes: 24 additions & 9 deletions pkg/window/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
"github.com/stretchr/testify/require"
)

func GetRollingPolicy() *RollingPolicy {
w := NewWindow(Options{Size: 3})
return NewRollingPolicy(w, RollingPolicyOpts{BucketDuration: 100 * time.Millisecond})
const defaultBucketDuration = 100 * time.Millisecond
const defaultSize = 3

func getRollingPolicy() *RollingPolicy {
w := NewWindow(Options{Size: defaultSize})
return NewRollingPolicy(w, RollingPolicyOpts{BucketDuration: defaultBucketDuration})
}

func TestRollingPolicy_Add(t *testing.T) {
Expand All @@ -45,6 +48,7 @@ func TestRollingPolicy_Add(t *testing.T) {
points: []float64{1, 1},
},
{
// In CI, the actual sleep time may be more than 100 (timeSleep = 94).
timeSleep: []int{94, 250},
offset: []int{0, 0},
points: []float64{1, 1},
Expand All @@ -60,14 +64,25 @@ func TestRollingPolicy_Add(t *testing.T) {
t.Run("test policy add", func(t *testing.T) {
var totalTS, lastOffset int
timeSleep := test.timeSleep
policy := GetRollingPolicy()
beginTime := time.Now()
policy := getRollingPolicy()
points := make([]float64, defaultSize)
asExpected := true
for i, n := range timeSleep {
totalTS += n
time.Sleep(time.Duration(n) * time.Millisecond)
offset, point := test.offset[i], test.points[i]
point := test.points[i]
offset := int(time.Since(beginTime)/defaultBucketDuration) % defaultSize
points[i] += point
policy.Add(point)

re.Less(math.Abs(point-policy.window.buckets[offset].Points[0]), 1e-6,
if offset != test.offset[i] {
asExpected = false
}
if asExpected {
re.Less(math.Abs(point-policy.window.buckets[offset].Points[0]), 1e-6,
fmt.Sprintf("error, time since last append: %vms, last offset: %v", totalTS, lastOffset))
}
re.Less(math.Abs(points[i]-policy.window.buckets[offset].Points[0]), 1e-6,
fmt.Sprintf("error, time since last append: %vms, last offset: %v", totalTS, lastOffset))
lastOffset = offset
}
Expand All @@ -78,7 +93,7 @@ func TestRollingPolicy_Add(t *testing.T) {
func TestRollingPolicy_AddWithTimespan(t *testing.T) {
re := require.New(t)
t.Run("timespan < bucket number", func(t *testing.T) {
policy := GetRollingPolicy()
policy := getRollingPolicy()
// bucket 0
policy.Add(0)
// bucket 1
Expand All @@ -102,7 +117,7 @@ func TestRollingPolicy_AddWithTimespan(t *testing.T) {
})

t.Run("timespan > bucket number", func(t *testing.T) {
policy := GetRollingPolicy()
policy := getRollingPolicy()

// bucket 0
policy.Add(0)
Expand Down
2 changes: 1 addition & 1 deletion server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (o *PersistOptions) IsLocationReplacementEnabled() bool {
return o.getTTLBoolOr(sc.EnableLocationReplacement, o.GetScheduleConfig().EnableLocationReplacement)
}

// IsTikvRegionSplitEnabled returns whether tikv split region is disabled.
// IsTikvRegionSplitEnabled returns whether tikv split region is enabled.
func (o *PersistOptions) IsTikvRegionSplitEnabled() bool {
return o.getTTLBoolOr(sc.EnableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion)
}
Expand Down
Loading

0 comments on commit 8f455c4

Please sign in to comment.