diff --git a/server/checker/merge_checker_test.go b/server/checker/merge_checker_test.go index 23dd6c49c0e..c52a7566d7b 100644 --- a/server/checker/merge_checker_test.go +++ b/server/checker/merge_checker_test.go @@ -407,7 +407,7 @@ func (s *testMergeCheckerSuite) TestStorelimit(c *C) { for i := 0; i < 50; i++ { c.Assert(oc.AddOperator(ops...), IsTrue) for _, op := range ops { - oc.RemoveOperator(op) + c.Assert(oc.RemoveOperator(op), IsTrue) } } s.regions[2] = s.regions[2].Clone( @@ -421,7 +421,7 @@ func (s *testMergeCheckerSuite) TestStorelimit(c *C) { for i := 0; i < 5; i++ { c.Assert(oc.AddOperator(ops...), IsTrue) for _, op := range ops { - oc.RemoveOperator(op) + c.Assert(oc.RemoveOperator(op), IsTrue) } } c.Assert(oc.AddOperator(ops...), IsFalse) diff --git a/server/cluster.go b/server/cluster.go index 39dd8d5b7ab..3c07fce7ecb 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -238,7 +238,6 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) { log.Info("background jobs has been stopped") return case <-ticker.C: - c.checkOperators() c.checkStores() c.collectMetrics() c.coordinator.opController.PruneHistory() @@ -994,29 +993,6 @@ func (c *RaftCluster) deleteStoreLocked(store *core.StoreInfo) error { return nil } -func (c *RaftCluster) checkOperators() { - opController := c.coordinator.opController - for _, op := range opController.GetOperators() { - // after region is merged, it will not heartbeat anymore - // the operator of merged region will not timeout actively - region := c.GetRegion(op.RegionID()) - if region == nil { - log.Debug("remove operator cause region is merged", - zap.Uint64("region-id", op.RegionID()), - zap.Stringer("operator", op)) - opController.RemoveOperator(op) - continue - } - - if op.IsTimeout() { - log.Info("operator timeout", - zap.Uint64("region-id", op.RegionID()), - zap.Stringer("operator", op)) - opController.RemoveTimeoutOperator(op) - } - } -} - func (c *RaftCluster) collectMetrics() { statsMap := statistics.NewStoreStatisticsMap(c.opt, c.GetNamespaceClassifier()) stores := c.GetStores() diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 2da78fd8daf..346a4cbc930 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -179,7 +179,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) { c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(0)) // Remove the operator manually, then we can add a new operator. - oc.RemoveOperator(op1) + c.Assert(oc.RemoveOperator(op1), IsTrue) oc.AddWaitingOperator(op2) c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(1)) c.Assert(oc.GetOperator(1).RegionID(), Equals, op2.RegionID()) @@ -763,7 +763,7 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) { op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) oc.AddWaitingOperator(op2) c.Assert(oc.OperatorCount(operator.OpLeader), Equals, uint64(2)) // 1:leader, 2:leader - oc.RemoveOperator(op1) + c.Assert(oc.RemoveOperator(op1), IsTrue) c.Assert(oc.OperatorCount(operator.OpLeader), Equals, uint64(1)) // 2:leader op1 = newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) @@ -798,7 +798,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) { for i := 0; i < 10; i++ { c.Assert(lb.Schedule(tc), IsNil) } - oc.RemoveOperator(op1) + c.Assert(oc.RemoveOperator(op1), IsTrue) time.Sleep(1 * time.Second) for i := 0; i < 100; i++ { c.Assert(lb.Schedule(tc), NotNil) @@ -887,7 +887,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) { c.Assert(oc.AddWaitingOperator(op2), IsTrue) // count = 2 c.Assert(sc.AllowSchedule(), IsFalse) - oc.RemoveOperator(op1) + c.Assert(oc.RemoveOperator(op1), IsTrue) // count = 1 c.Assert(sc.AllowSchedule(), IsTrue) @@ -898,7 +898,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) { c.Assert(sc.AllowSchedule(), IsFalse) c.Assert(oc.AddWaitingOperator(op3), IsTrue) c.Assert(sc.AllowSchedule(), IsTrue) - oc.RemoveOperator(op3) + c.Assert(oc.RemoveOperator(op3), IsTrue) // add a admin operator will remove old operator c.Assert(oc.AddWaitingOperator(op2), IsTrue) @@ -907,14 +907,14 @@ func (s *testScheduleControllerSuite) TestController(c *C) { op4.SetPriorityLevel(core.HighPriority) c.Assert(oc.AddWaitingOperator(op4), IsTrue) c.Assert(sc.AllowSchedule(), IsTrue) - oc.RemoveOperator(op4) + c.Assert(oc.RemoveOperator(op4), IsTrue) // test wrong region id. op5 := newTestOperator(3, &metapb.RegionEpoch{}, operator.OpHotRegion) c.Assert(oc.AddWaitingOperator(op5), IsFalse) // test wrong region epoch. - oc.RemoveOperator(op1) + c.Assert(oc.RemoveOperator(op1), IsTrue) epoch := &metapb.RegionEpoch{ Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1, ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(), @@ -924,7 +924,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) { epoch.Version-- op6 = newTestOperator(1, epoch, operator.OpLeader) c.Assert(oc.AddWaitingOperator(op6), IsTrue) - oc.RemoveOperator(op6) + c.Assert(oc.RemoveOperator(op6), IsTrue) } func (s *testScheduleControllerSuite) TestInterval(c *C) { diff --git a/server/handler.go b/server/handler.go index a820bbc0990..7077b978b92 100644 --- a/server/handler.go +++ b/server/handler.go @@ -304,7 +304,7 @@ func (h *Handler) RemoveOperator(regionID uint64) error { return ErrOperatorNotFound } - c.opController.RemoveOperator(op) + _ = c.opController.RemoveOperator(op) return nil } diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 72a6f230c33..79ca3a4b9e5 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -22,6 +22,7 @@ import ( "time" "github.com/juju/ratelimit" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/eraftpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -90,6 +91,9 @@ func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *Operato func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { // Check existed operator. if op := oc.GetOperator(region.GetID()); op != nil { + failpoint.Inject("concurrentRemoveOperator", func() { + time.Sleep(500 * time.Millisecond) + }) timeout := op.IsTimeout() if step := op.Check(region); step != nil && !timeout { operatorCounter.WithLabelValues(op.Desc(), "check").Inc() @@ -103,29 +107,31 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { changes := latest.GetConfVer() - origin.GetConfVer() if source == DispatchFromHeartBeat && changes > uint64(op.ConfVerChanged(region)) { - log.Info("stale operator", zap.Uint64("region-id", region.GetID()), - zap.Reflect("operator", op), zap.Uint64("diff", changes)) - operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() - oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL) - oc.RemoveOperator(op) - oc.PromoteWaitingOperator() + + if oc.RemoveOperator(op) { + log.Info("stale operator", zap.Uint64("region-id", region.GetID()), + zap.Reflect("operator", op), zap.Uint64("diff", changes)) + operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() + oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL) + oc.PromoteWaitingOperator() + } + return } oc.SendScheduleCommand(region, step, source) return } - if op.IsFinish() { + if op.IsFinish() && oc.RemoveOperator(op) { log.Info("operator finish", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op)) operatorCounter.WithLabelValues(op.Desc(), "finish").Inc() operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds()) oc.pushHistory(op) oc.opRecords.Put(op, pdpb.OperatorStatus_SUCCESS) - oc.RemoveOperator(op) oc.PromoteWaitingOperator() - } else if timeout { + } else if timeout && oc.RemoveOperator(op) { log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op)) - oc.RemoveTimeoutOperator(op) + operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc() oc.opRecords.Put(op, pdpb.OperatorStatus_TIMEOUT) oc.PromoteWaitingOperator() } @@ -158,11 +164,17 @@ func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next } r = oc.cluster.GetRegion(regionID) if r == nil { + _ = oc.removeOperatorLocked(op) + log.Debug("remove operator because region disappeared", + zap.Uint64("region-id", op.RegionID()), + zap.Stringer("operator", op)) + operatorCounter.WithLabelValues(op.Desc(), "disappear").Inc() + oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL) return nil, true } step := op.Check(r) if step == nil { - return nil, true + return r, true } now := time.Now() if now.Before(item.time) { @@ -230,7 +242,7 @@ func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool { if oc.exceedStoreLimit(ops...) || !oc.checkAddOperator(ops...) { for _, op := range ops { - operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc() + operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc() oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL) } return false @@ -306,10 +318,10 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { // If there is an old operator, replace it. The priority should be checked // already. if old, ok := oc.operators[regionID]; ok { + _ = oc.removeOperatorLocked(old) log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", old)) - operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc() + operatorCounter.WithLabelValues(old.Desc(), "replace").Inc() oc.opRecords.Put(old, pdpb.OperatorStatus_REPLACE) - oc.removeOperatorLocked(old) } oc.operators[regionID] = op @@ -340,18 +352,10 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { } // RemoveOperator removes a operator from the running operators. -func (oc *OperatorController) RemoveOperator(op *operator.Operator) { - oc.Lock() - defer oc.Unlock() - oc.removeOperatorLocked(op) -} - -// RemoveTimeoutOperator removes a operator which is timeout from the running operators. -func (oc *OperatorController) RemoveTimeoutOperator(op *operator.Operator) { +func (oc *OperatorController) RemoveOperator(op *operator.Operator) (found bool) { oc.Lock() defer oc.Unlock() - operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc() - oc.removeOperatorLocked(op) + return oc.removeOperatorLocked(op) } // GetOperatorStatus gets the operator and its status with the specify id. @@ -367,11 +371,15 @@ func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus { return oc.opRecords.Get(id) } -func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) { +func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool { regionID := op.RegionID() - delete(oc.operators, regionID) - oc.updateCounts(oc.operators) - operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() + if cur := oc.operators[regionID]; cur == op { + delete(oc.operators, regionID) + oc.updateCounts(oc.operators) + operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() + return true + } + return false } // GetOperator gets a operator from the given region. diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 63ee8aad1b8..195cf0c25ed 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -15,10 +15,12 @@ package schedule import ( "container/heap" + "sync" "testing" "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/mock/mockcluster" @@ -52,8 +54,9 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) { oc.SetOperator(op1) oc.SetOperator(op2) go func() { + c.Assert(oc.RemoveOperator(op1), IsTrue) for { - oc.RemoveOperator(op1) + c.Assert(oc.RemoveOperator(op1), IsFalse) } }() go func() { @@ -99,27 +102,75 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS) } +// issue #1716 +func (t *testOperatorControllerSuite) TestConcurrentRemoveOperator(c *C) { + opt := mockoption.NewScheduleOptions() + tc := mockcluster.NewCluster(opt) + oc := NewOperatorController(tc, mockhbstream.NewHeartbeatStream()) + tc.AddLeaderStore(1, 0) + tc.AddLeaderStore(2, 1) + tc.AddLeaderRegion(1, 2, 1) + region1 := tc.GetRegion(1) + steps := []operator.OpStep{ + operator.RemovePeer{FromStore: 1}, + operator.AddPeer{ToStore: 1, PeerID: 4}, + } + // finished op with normal priority + op1 := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) + // unfinished op with high priority + op2 := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion|operator.OpAdmin, steps...) + + oc.SetOperator(op1) + + c.Assert(failpoint.Enable("github.com/pingcap/pd/server/schedule/concurrentRemoveOperator", "return(true)"), IsNil) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + oc.Dispatch(region1, "test") + wg.Done() + }() + go func() { + time.Sleep(50 * time.Millisecond) + success := oc.AddOperator(op2) + // If the assert failed before wg.Done, the test will be blocked. + defer c.Assert(success, IsTrue) + wg.Done() + }() + wg.Wait() + + c.Assert(oc.GetOperator(1), Equals, op2) +} + func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) { opt := mockoption.NewScheduleOptions() tc := mockcluster.NewCluster(opt) oc := NewOperatorController(tc, mockhbstream.NewHeartbeatStream()) tc.AddLeaderStore(1, 2) - tc.AddLeaderStore(2, 0) + tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 1, 2) tc.AddLeaderRegion(2, 1, 2) + tc.AddLeaderRegion(4, 2, 1) steps := []operator.OpStep{ operator.RemovePeer{FromStore: 2}, operator.AddPeer{ToStore: 2, PeerID: 4}, } op1 := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) op2 := operator.NewOperator("test", "test", 2, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + op3 := operator.NewOperator("test", "test", 3, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + op4 := operator.NewOperator("test", "test", 4, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) region1 := tc.GetRegion(1) region2 := tc.GetRegion(2) + region4 := tc.GetRegion(4) // Adds operator and pushes to the notifier queue. { oc.SetOperator(op1) + oc.SetOperator(op3) + oc.SetOperator(op4) oc.SetOperator(op2) heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op1, time: time.Now().Add(100 * time.Millisecond)}) + heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op3, time: time.Now().Add(300 * time.Millisecond)}) + heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op4, time: time.Now().Add(499 * time.Millisecond)}) heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op2, time: time.Now().Add(500 * time.Millisecond)}) } // fisrt poll got nil @@ -133,9 +184,19 @@ func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) { c.Assert(r, NotNil) c.Assert(next, IsTrue) c.Assert(r.GetID(), Equals, region1.GetID()) + + // find op3 with nil region, remove it + c.Assert(oc.GetOperator(3), NotNil) r, next = oc.pollNeedDispatchRegion() c.Assert(r, IsNil) - c.Assert(next, IsFalse) + c.Assert(next, IsTrue) + c.Assert(oc.GetOperator(3), IsNil) + + // find op4 finished + r, next = oc.pollNeedDispatchRegion() + c.Assert(r, NotNil) + c.Assert(next, IsTrue) + c.Assert(r.GetID(), Equals, region4.GetID()) // after waiting 500 millseconds, the region2 need to dispatch time.Sleep(400 * time.Millisecond) @@ -143,6 +204,9 @@ func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) { c.Assert(r, NotNil) c.Assert(next, IsTrue) c.Assert(r.GetID(), Equals, region2.GetID()) + r, next = oc.pollNeedDispatchRegion() + c.Assert(r, IsNil) + c.Assert(next, IsFalse) } func (t *testOperatorControllerSuite) TestStorelimit(c *C) { @@ -159,27 +223,27 @@ func (t *testOperatorControllerSuite) TestStorelimit(c *C) { for i := uint64(1); i <= 5; i++ { op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) c.Assert(oc.AddOperator(op), IsTrue) - oc.RemoveOperator(op) + c.Assert(oc.RemoveOperator(op), IsTrue) } op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 1}) c.Assert(oc.AddOperator(op), IsFalse) - oc.RemoveOperator(op) + c.Assert(oc.RemoveOperator(op), IsFalse) oc.SetStoreLimit(2, 2) for i := uint64(1); i <= 10; i++ { op = operator.NewOperator("test", "test", i, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) c.Assert(oc.AddOperator(op), IsTrue) - oc.RemoveOperator(op) + c.Assert(oc.RemoveOperator(op), IsTrue) } oc.SetAllStoresLimit(1) for i := uint64(1); i <= 5; i++ { op = operator.NewOperator("test", "test", i, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) c.Assert(oc.AddOperator(op), IsTrue) - oc.RemoveOperator(op) + c.Assert(oc.RemoveOperator(op), IsTrue) } op = operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 1}) c.Assert(oc.AddOperator(op), IsFalse) - oc.RemoveOperator(op) + c.Assert(oc.RemoveOperator(op), IsFalse) } // #1652