schedule: fix a thread-safe bug and improve code (tikv#1719) (tikv#1734)
Luffbee authored and sre-bot committed Sep 11, 2019
1 parent 0945672 commit e07854f
Showing 6 changed files with 105 additions and 57 deletions.
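
The thread-safety bug behind this commit (the race exercised by the new TestConcurrentRemoveOperator, issue #1716) is a race on the operator map: Dispatch could decide an operator was finished or timed out and then remove it by region ID, while another goroutine had already replaced that region's operator with a newer one, so the newer operator was silently dropped and the cleanup bookkeeping could run on the wrong operator. The fix makes RemoveOperator / removeOperatorLocked report whether removal actually happened and deletes only when the operator registered for the region is still the same instance; the per-tick checkOperators scan in cluster.go becomes redundant because the controller now cleans up operators for vanished regions and timeouts itself. Below is a minimal sketch of the guarded-removal idea, using simplified stand-in types (Controller, Operator, Remove are illustrative names, not pd's API):

    package main

    import (
        "fmt"
        "sync"
    )

    // Operator stands in for a scheduling operator; only its identity matters here.
    type Operator struct{ RegionID uint64 }

    // Controller is a trimmed-down stand-in for the operator controller.
    type Controller struct {
        sync.Mutex
        operators map[uint64]*Operator
    }

    // Remove deletes op only while it is still the operator registered for its
    // region, and reports whether this caller actually performed the removal.
    func (c *Controller) Remove(op *Operator) bool {
        c.Lock()
        defer c.Unlock()
        if cur := c.operators[op.RegionID]; cur == op {
            delete(c.operators, op.RegionID)
            return true
        }
        return false // already removed, or replaced by a newer operator
    }

    func main() {
        old := &Operator{RegionID: 1}
        c := &Controller{operators: map[uint64]*Operator{1: old}}

        // Simulate another goroutine having replaced the region's operator
        // between the "is it finished?" check and the removal.
        replacement := &Operator{RegionID: 1}
        c.operators[1] = replacement

        fmt.Println(c.Remove(old))                 // false: a different operator holds the slot now
        fmt.Println(c.operators[1] == replacement) // true: the replacement survives the cleanup
    }

Before the fix, the unconditional delete by region ID in removeOperatorLocked would have discarded the replacement in this scenario.
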
23 changes: 0 additions & 23 deletions server/cluster.go
@@ -634,28 +634,6 @@ func (c *RaftCluster) RemoveTombStoneRecords() error {
return nil
}

- func (c *RaftCluster) checkOperators() {
- opController := c.coordinator.opController
- for _, op := range opController.GetOperators() {
- // after region is merged, it will not heartbeat anymore
- // the operator of merged region will not timeout actively
- if c.cachedCluster.GetRegion(op.RegionID()) == nil {
- log.Debug("remove operator cause region is merged",
- zap.Uint64("region-id", op.RegionID()),
- zap.Stringer("operator", op))
- opController.RemoveOperator(op)
- continue
- }
-
- if op.IsTimeout() {
- log.Info("operator timeout",
- zap.Uint64("region-id", op.RegionID()),
- zap.Stringer("operator", op))
- opController.RemoveTimeoutOperator(op)
- }
- }
- }
-
func (c *RaftCluster) collectMetrics() {
cluster := c.cachedCluster
statsMap := statistics.NewStoreStatisticsMap(c.cachedCluster.opt, c.GetNamespaceClassifier())
@@ -699,7 +677,6 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
log.Info("background jobs has been stopped")
return
case <-ticker.C:
- c.checkOperators()
c.checkStores()
c.collectMetrics()
c.coordinator.opController.PruneHistory()
16 changes: 8 additions & 8 deletions server/coordinator_test.go
@@ -167,7 +167,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(0))

// Remove the operator manually, then we can add a new operator.
- oc.RemoveOperator(op1)
+ c.Assert(oc.RemoveOperator(op1), IsTrue)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op2.RegionID())
@@ -752,7 +752,7 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) {
op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), schedule.OpLeader)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(2)) // 1:leader, 2:leader
- oc.RemoveOperator(op1)
+ c.Assert(oc.RemoveOperator(op1), IsTrue)
c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 2:leader

op1 = newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion)
@@ -787,7 +787,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
for i := 0; i < 10; i++ {
c.Assert(lb.Schedule(tc), IsNil)
}
- oc.RemoveOperator(op1)
+ c.Assert(oc.RemoveOperator(op1), IsTrue)
time.Sleep(1 * time.Second)
for i := 0; i < 100; i++ {
c.Assert(lb.Schedule(tc), NotNil)
@@ -876,7 +876,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
c.Assert(oc.AddWaitingOperator(op2), IsTrue)
// count = 2
c.Assert(sc.AllowSchedule(), IsFalse)
- oc.RemoveOperator(op1)
+ c.Assert(oc.RemoveOperator(op1), IsTrue)
// count = 1
c.Assert(sc.AllowSchedule(), IsTrue)

@@ -887,7 +887,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
c.Assert(sc.AllowSchedule(), IsFalse)
c.Assert(oc.AddWaitingOperator(op3), IsTrue)
c.Assert(sc.AllowSchedule(), IsTrue)
- oc.RemoveOperator(op3)
+ c.Assert(oc.RemoveOperator(op3), IsTrue)

// add a admin operator will remove old operator
c.Assert(oc.AddWaitingOperator(op2), IsTrue)
@@ -896,14 +896,14 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
op4.SetPriorityLevel(core.HighPriority)
c.Assert(oc.AddWaitingOperator(op4), IsTrue)
c.Assert(sc.AllowSchedule(), IsTrue)
- oc.RemoveOperator(op4)
+ c.Assert(oc.RemoveOperator(op4), IsTrue)

// test wrong region id.
op5 := newTestOperator(3, &metapb.RegionEpoch{}, schedule.OpHotRegion)
c.Assert(oc.AddWaitingOperator(op5), IsFalse)

// test wrong region epoch.
- oc.RemoveOperator(op1)
+ c.Assert(oc.RemoveOperator(op1), IsTrue)
epoch := &metapb.RegionEpoch{
Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
@@ -913,7 +913,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
epoch.Version--
op6 = newTestOperator(1, epoch, schedule.OpLeader)
c.Assert(oc.AddWaitingOperator(op6), IsTrue)
- oc.RemoveOperator(op6)
+ c.Assert(oc.RemoveOperator(op6), IsTrue)
}

func (s *testScheduleControllerSuite) TestInterval(c *C) {
2 changes: 1 addition & 1 deletion server/handler.go
@@ -305,7 +305,7 @@ func (h *Handler) RemoveOperator(regionID uint64) error {
return ErrOperatorNotFound
}

- c.opController.RemoveOperator(op)
+ _ = c.opController.RemoveOperator(op)
return nil
}

4 changes: 2 additions & 2 deletions server/schedule/merge_checker_test.go
@@ -295,7 +295,7 @@ func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
for i := 0; i < 50; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
- oc.RemoveOperator(op)
+ c.Assert(oc.RemoveOperator(op), IsTrue)
}
}
s.regions[2] = s.regions[2].Clone(
@@ -309,7 +309,7 @@ func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
for i := 0; i < 5; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
- oc.RemoveOperator(op)
+ c.Assert(oc.RemoveOperator(op), IsTrue)
}
}
c.Assert(oc.AddOperator(ops...), IsFalse)
45 changes: 25 additions & 20 deletions server/schedule/operator_controller.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/juju/ratelimit"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/eraftpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
@@ -89,23 +90,25 @@ func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *OperatorController {
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := oc.GetOperator(region.GetID()); op != nil {
+ failpoint.Inject("concurrentRemoveOperator", func() {
+ time.Sleep(500 * time.Millisecond)
+ })
timeout := op.IsTimeout()
if step := op.Check(region); step != nil && !timeout {
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
oc.SendScheduleCommand(region, step, source)
return
}
- if op.IsFinish() {
+ if op.IsFinish() && oc.RemoveOperator(op) {
log.Info("operator finish", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds())
oc.pushHistory(op)
oc.opRecords.Put(op, pdpb.OperatorStatus_SUCCESS)
- oc.RemoveOperator(op)
oc.PromoteWaitingOperator()
- } else if timeout {
+ } else if timeout && oc.RemoveOperator(op) {
log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
- oc.RemoveTimeoutOperator(op)
+ operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_TIMEOUT)
oc.PromoteWaitingOperator()
}
@@ -138,11 +141,17 @@ func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
+ _ = oc.removeOperatorLocked(op)
+ log.Debug("remove operator because region disappeared",
+ zap.Uint64("region-id", op.RegionID()),
+ zap.Stringer("operator", op))
+ operatorCounter.WithLabelValues(op.Desc(), "disappear").Inc()
+ oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
return nil, true
}
step := op.Check(r)
if step == nil {
- return nil, true
+ return r, true
}
now := time.Now()
if now.Before(item.time) {
@@ -286,10 +295,10 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
// If there is an old operator, replace it. The priority should be checked
// already.
if old, ok := oc.operators[regionID]; ok {
+ _ = oc.removeOperatorLocked(old)
log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", old))
operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
oc.opRecords.Put(old, pdpb.OperatorStatus_REPLACE)
- oc.removeOperatorLocked(old)
}

oc.operators[regionID] = op
@@ -320,18 +329,10 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
}

// RemoveOperator removes a operator from the running operators.
func (oc *OperatorController) RemoveOperator(op *Operator) {
func (oc *OperatorController) RemoveOperator(op *Operator) (found bool) {
oc.Lock()
defer oc.Unlock()
oc.removeOperatorLocked(op)
}

// RemoveTimeoutOperator removes a operator which is timeout from the running operators.
func (oc *OperatorController) RemoveTimeoutOperator(op *Operator) {
oc.Lock()
defer oc.Unlock()
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
oc.removeOperatorLocked(op)
return oc.removeOperatorLocked(op)
}

// GetOperatorStatus gets the operator and its status with the specify id.
@@ -347,11 +348,15 @@ func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
return oc.opRecords.Get(id)
}

- func (oc *OperatorController) removeOperatorLocked(op *Operator) {
+ func (oc *OperatorController) removeOperatorLocked(op *Operator) bool {
regionID := op.RegionID()
- delete(oc.operators, regionID)
- oc.updateCounts(oc.operators)
- operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
+ if cur := oc.operators[regionID]; cur == op {
+ delete(oc.operators, regionID)
+ oc.updateCounts(oc.operators)
+ operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
+ return true
+ }
+ return false
}

// GetOperator gets a operator from the given region.
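
Because removeOperatorLocked now reports success, callers gate their follow-up work on the result: Dispatch only logs, updates metrics, records the operator status, and promotes waiting operators when op.IsFinish() && oc.RemoveOperator(op) (or the timeout equivalent) holds, so concurrent cleanup paths cannot both claim the same operator. A small self-contained sketch of that exactly-once behaviour, again with stand-in types rather than pd's API:

    package main

    import (
        "fmt"
        "sync"
    )

    type operator struct{ regionID uint64 }

    type controller struct {
        mu  sync.Mutex
        ops map[uint64]*operator
    }

    // remove mirrors the guarded removal: it succeeds for exactly one caller.
    func (c *controller) remove(o *operator) bool {
        c.mu.Lock()
        defer c.mu.Unlock()
        if c.ops[o.regionID] == o {
            delete(c.ops, o.regionID)
            return true
        }
        return false
    }

    func main() {
        o := &operator{regionID: 1}
        c := &controller{ops: map[uint64]*operator{1: o}}

        var wg sync.WaitGroup
        var mu sync.Mutex
        cleanups := 0
        // Two paths (think "finished" and "timed out") race to clean up the same operator.
        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                if c.remove(o) {
                    mu.Lock()
                    cleanups++ // metrics, history, promoting waiting operators, ...
                    mu.Unlock()
                }
            }()
        }
        wg.Wait()
        fmt.Println("cleanup ran", cleanups, "time(s)") // always exactly 1
    }
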
72 changes: 69 additions & 3 deletions server/schedule/operator_controller_test.go
@@ -15,14 +15,17 @@ package schedule

import (
"container/heap"
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
)

var _ = Suite(&testOperatorControllerSuite{})
@@ -45,8 +48,9 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) {
oc.SetOperator(op1)
oc.SetOperator(op2)
go func() {
+ c.Assert(oc.RemoveOperator(op1), IsTrue)
for {
- oc.RemoveOperator(op1)
+ c.Assert(oc.RemoveOperator(op1), IsFalse)
}
}()
go func() {
@@ -92,27 +96,76 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS)
}

+ // issue #1716
+ func (t *testOperatorControllerSuite) TestConcurrentRemoveOperator(c *C) {
+ opt := mockoption.NewScheduleOptions()
+ tc := mockcluster.NewCluster(opt)
+ oc := NewOperatorController(tc, mockhbstream.NewHeartbeatStream())
+ tc.AddLeaderStore(1, 0)
+ tc.AddLeaderStore(2, 1)
+ tc.AddLeaderRegion(1, 2, 1)
+ region1 := tc.GetRegion(1)
+ steps := []OperatorStep{
+ RemovePeer{FromStore: 1},
+ AddPeer{ToStore: 1, PeerID: 4},
+ }
+ // finished op with normal priority
+ op1 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
+ // unfinished op with high priority
+ op2 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion|OpAdmin, steps...)
+ op2.SetPriorityLevel(core.HighPriority)
+
+ oc.SetOperator(op1)
+
+ c.Assert(failpoint.Enable("github.com/pingcap/pd/server/schedule/concurrentRemoveOperator", "return(true)"), IsNil)
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ oc.Dispatch(region1, "test")
+ wg.Done()
+ }()
+ go func() {
+ time.Sleep(50 * time.Millisecond)
+ success := oc.AddOperator(op2)
+ // If the assert failed before wg.Done, the test will be blocked.
+ defer c.Assert(success, IsTrue)
+ wg.Done()
+ }()
+ wg.Wait()
+
+ c.Assert(oc.GetOperator(1), Equals, op2)
+ }
+
func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) {
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
oc := NewOperatorController(tc, mockhbstream.NewHeartbeatStream())
tc.AddLeaderStore(1, 2)
- tc.AddLeaderStore(2, 0)
+ tc.AddLeaderStore(2, 1)
tc.AddLeaderRegion(1, 1, 2)
tc.AddLeaderRegion(2, 1, 2)
+ tc.AddLeaderRegion(4, 2, 1)
steps := []OperatorStep{
RemovePeer{FromStore: 2},
AddPeer{ToStore: 2, PeerID: 4},
}
op1 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
op2 := NewOperator("test", 2, &metapb.RegionEpoch{}, OpRegion, steps...)
+ op3 := NewOperator("test", 3, &metapb.RegionEpoch{}, OpRegion, steps...)
+ op4 := NewOperator("test", 4, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
region1 := tc.GetRegion(1)
region2 := tc.GetRegion(2)
+ region4 := tc.GetRegion(4)
// Adds operator and pushes to the notifier queue.
{
oc.SetOperator(op1)
+ oc.SetOperator(op3)
+ oc.SetOperator(op4)
oc.SetOperator(op2)
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op1, time: time.Now().Add(100 * time.Millisecond)})
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op3, time: time.Now().Add(300 * time.Millisecond)})
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op4, time: time.Now().Add(499 * time.Millisecond)})
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op2, time: time.Now().Add(500 * time.Millisecond)})
}
// fisrt poll got nil
@@ -126,14 +179,27 @@ func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) {
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region1.GetID())
+
+ // find op3 with nil region, remove it
+ c.Assert(oc.GetOperator(3), NotNil)
+ r, next = oc.pollNeedDispatchRegion()
+ c.Assert(r, IsNil)
- c.Assert(next, IsFalse)
+ c.Assert(next, IsTrue)
+ c.Assert(oc.GetOperator(3), IsNil)
+
+ // find op4 finished
+ r, next = oc.pollNeedDispatchRegion()
+ c.Assert(r, NotNil)
+ c.Assert(next, IsTrue)
+ c.Assert(r.GetID(), Equals, region4.GetID())
+
// after waiting 500 millseconds, the region2 need to dispatch
time.Sleep(400 * time.Millisecond)
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region2.GetID())
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, IsNil)
c.Assert(next, IsFalse)
}
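
The new TestConcurrentRemoveOperator reproduces the race deterministically with a failpoint: the failpoint.Inject marker added to Dispatch sleeps for 500 milliseconds while the named failpoint is enabled, holding Dispatch inside the existing-operator branch long enough for the second goroutine to replace op1 with the high-priority op2; with the guarded removal, op2 survives and the final assertion passes. One caveat: failpoint.Inject is an inert stub in a normal build; the callback only fires after the failpoint rewriter (failpoint-ctl) has transformed the marker and the failpoint has been switched on with failpoint.Enable. A rough sketch of the pattern follows; the package path and failpoint name are illustrative, whereas the real test enables github.com/pingcap/pd/server/schedule/concurrentRemoveOperator.

    package main

    import (
        "fmt"
        "time"

        "github.com/pingcap/failpoint"
    )

    // criticalSection stands in for Dispatch. The Inject call is an empty stub in a
    // normal build; only after failpoint-ctl has rewritten the marker does the
    // callback run, and then only while the named failpoint is enabled.
    func criticalSection() {
        failpoint.Inject("widenRaceWindow", func() {
            time.Sleep(500 * time.Millisecond) // hold this goroutine inside the racy window
        })
        fmt.Println("critical section done")
    }

    func main() {
        // The name passed to Enable is the marker's full failpoint name; the path
        // here is illustrative.
        const fp = "example.com/demo/widenRaceWindow"
        if err := failpoint.Enable(fp, "return(true)"); err != nil {
            panic(err)
        }
        defer func() { _ = failpoint.Disable(fp) }()

        start := time.Now()
        criticalSection()
        // Prints roughly 500ms in a failpoint-enabled build, near zero otherwise.
        fmt.Println("elapsed:", time.Since(start))
    }
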
