Skip to content

Commit

Permalink
scheduler: do not remove the operator when the step does not finish
Browse files Browse the repository at this point in the history
Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

There is a bug introduced by #1652, in some case, like adding peers or
adding learners, the step is left unfinished if the peer is in pending
state, although the conf version has changed, in these cases, the
operator will be removed because the controller thought someone has
changed the conf version(in fact, it self did). We fix that by checking
if the conf version has actually changed by current step, if it is,
the operator is not regarded as stale.

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>
  • Loading branch information
shafreeck committed Aug 29, 2019
1 parent 14b91e8 commit d7e2f64
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 44 deletions.
86 changes: 48 additions & 38 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s u64Set) String() string {
// OpStep describes the basic scheduling steps that can not be subdivided.
type OpStep interface {
fmt.Stringer
ExpectConfVerChange() bool
ConfVerChanged(region *core.RegionInfo) bool
IsFinish(region *core.RegionInfo) bool
Influence(opInfluence OpInfluence, region *core.RegionInfo)
}
Expand All @@ -137,10 +137,9 @@ type TransferLeader struct {
FromStore, ToStore uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (tl TransferLeader) ExpectConfVerChange() bool {
return false
// ConfVerChanged returns true if the conf version has been changed by this step
func (tl TransferLeader) ConfVerChanged(region *core.RegionInfo) bool {
return false // transfer leader never change the conf version
}

func (tl TransferLeader) String() string {
Expand Down Expand Up @@ -168,10 +167,12 @@ type AddPeer struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (ap AddPeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (ap AddPeer) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(ap.ToStore); p != nil {
return p.GetId() == ap.PeerID
}
return false
}
func (ap AddPeer) String() string {
return fmt.Sprintf("add peer %v on store %v", ap.PeerID, ap.ToStore)
Expand Down Expand Up @@ -208,10 +209,12 @@ type AddLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (al AddLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (al AddLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreLearner(al.ToStore); p != nil {
return p.GetId() == al.PeerID
}
return false
}

func (al AddLearner) String() string {
Expand Down Expand Up @@ -249,10 +252,12 @@ type PromoteLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (pl PromoteLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (pl PromoteLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(pl.ToStore); p != nil {
return p.GetId() == pl.PeerID
}
return false
}

func (pl PromoteLearner) String() string {
Expand All @@ -278,10 +283,9 @@ type RemovePeer struct {
FromStore uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (rp RemovePeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (rp RemovePeer) ConfVerChanged(region *core.RegionInfo) bool {
return region.GetStorePeer(rp.FromStore) == nil
}

func (rp RemovePeer) String() string {
Expand Down Expand Up @@ -314,9 +318,8 @@ type MergeRegion struct {
IsPassive bool
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (mr MergeRegion) ExpectConfVerChange() bool {
// ConfVerChanged returns true if the conf version has been changed by this step
func (mr MergeRegion) ConfVerChanged(region *core.RegionInfo) bool {
return false
}

Expand Down Expand Up @@ -351,9 +354,8 @@ type SplitRegion struct {
Policy pdpb.CheckPolicy
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (sr SplitRegion) ExpectConfVerChange() bool {
// ConfVerChanged returns true if the conf version has been changed by this step
func (sr SplitRegion) ConfVerChanged(region *core.RegionInfo) bool {
return false
}

Expand Down Expand Up @@ -382,10 +384,12 @@ type AddLightPeer struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (ap AddLightPeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (ap AddLightPeer) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(ap.ToStore); p != nil {
return p.GetId() == ap.PeerID
}
return false
}

func (ap AddLightPeer) String() string {
Expand Down Expand Up @@ -417,10 +421,12 @@ type AddLightLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (al AddLightLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (al AddLightLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreLearner(al.ToStore); p != nil {
return p.GetId() == al.PeerID
}
return false
}

func (al AddLightLearner) String() string {
Expand Down Expand Up @@ -582,11 +588,15 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep {
}

// ConfVerChanged returns the number of confver has consumed by steps
func (o *Operator) ConfVerChanged() int {
func (o *Operator) ConfVerChanged(region *core.RegionInfo) int {
total := 0
current := atomic.LoadInt32(&o.currentStep)
for _, step := range o.steps[0:current] {
if step.ExpectConfVerChange() {
if current == int32(len(o.steps)) {
current--
}
// including current step, it may has taken effects in this heartbeat
for _, step := range o.steps[0 : current+1] {
if step.ConfVerChanged(region) {
total++
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
latest := region.GetRegionEpoch()
changes := latest.GetConfVer() - origin.GetConfVer()
if source == DispatchFromHeartBeat &&
changes > uint64(op.ConfVerChanged()) {
changes > uint64(op.ConfVerChanged(region)) {
log.Info("stale operator", zap.Uint64("region-id", region.GetID()),
zap.Reflect("operator", op), zap.Uint64("diff", changes))
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
Expand Down
74 changes: 69 additions & 5 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule/operator"
)

Expand Down Expand Up @@ -206,30 +207,93 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) {
&metapb.RegionEpoch{ConfVer: 0, Version: 0})

controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 0)
c.Assert(op.ConfVerChanged(region), Equals, 0)
c.Assert(len(stream.MsgCh()), Equals, 2)

// report the result of removing peer
region = cluster.MockRegionInfo(1, 2, []uint64{2},
&metapb.RegionEpoch{ConfVer: 0, Version: 0})

controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 1)
c.Assert(op.ConfVerChanged(region), Equals, 1)
c.Assert(len(stream.MsgCh()), Equals, 2)

// add and disaptch op again, the op should be stale
op = operator.NewOperator("test", "test", 1,
&metapb.RegionEpoch{ConfVer: 0, Version: 0},
operator.OpRegion, steps...)
c.Assert(controller.AddOperator(op), Equals, true)
c.Assert(op.ConfVerChanged(), Equals, 0)
c.Assert(op.ConfVerChanged(region), Equals, 0)
c.Assert(len(stream.MsgCh()), Equals, 3)

// report region with an abnormal confver
region = cluster.MockRegionInfo(1, 1, []uint64{1, 2},
&metapb.RegionEpoch{ConfVer: 1, Version: 0})
controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 0)
// no new step sended
c.Assert(op.ConfVerChanged(region), Equals, 0)
// no new step
c.Assert(len(stream.MsgCh()), Equals, 3)
}

func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) {
cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions())
stream := mockhbstream.NewHeartbeatStreams(cluster.ID)
controller := NewOperatorController(cluster, stream)

// Create a new region with epoch(0, 0)
// the region has two peers with its peer id allocated incremently.
// so the two peers are {peerid: 1, storeid: 1}, {peerid: 2, storeid: 2}
// The peer on store 1 is the leader
epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0}
region := cluster.MockRegionInfo(1, 1, []uint64{2}, epoch)
// Put region into cluster, otherwise, AddOperator will fail because of
// missing region
cluster.PutRegion(region)

// The next allocated peer should have peerid 3, so we add this peer
// to store 3
steps := []operator.OpStep{
operator.AddPeer{3, 3},
}

// Create an operator
op := operator.NewOperator("test", "test", 1, epoch,
operator.OpRegion, steps...)
c.Assert(controller.AddOperator(op), Equals, true)
c.Assert(len(stream.MsgCh()), Equals, 1)

// Create region2 witch is cloned from the original region.
// region2 has peer 3 in pending state, so the AddPeer step
// is left unfinished
region2 := region.Clone(
core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}),
core.WithPendingPeers([]*metapb.Peer{
{Id: 3, StoreId: 3, IsLearner: false},
}),
core.WithIncConfVer(),
)
c.Assert(region2.GetPendingPeers(), NotNil)
c.Assert(steps[0].IsFinish(region2), Equals, false)
controller.Dispatch(region2, DispatchFromHeartBeat)

// In this case, the conf version has been changed, but the
// peer added is in peeding state, the operator should not be
// removed by the stale checker
c.Assert(op.ConfVerChanged(region2), Equals, 1)
c.Assert(controller.GetOperator(1), NotNil)
// The operator is valid yet, but the step should not be sent
// again, because it is in pending state, so the message channel
// should not be increased
c.Assert(len(stream.MsgCh()), Equals, 1)

// Finish the step by clearing the pending state
region3 := region.Clone(
core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}),
core.WithIncConfVer(),
)
c.Assert(steps[0].IsFinish(region3), Equals, true)
controller.Dispatch(region3, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(region3), Equals, 1)
// The Operator has finished, so no message should be sent
c.Assert(len(stream.MsgCh()), Equals, 1)
}

0 comments on commit d7e2f64

Please sign in to comment.