Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: fix a thread-safe bug and improve code #1719

Merged
merged 14 commits into from
Sep 6, 2019
4 changes: 2 additions & 2 deletions server/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
for i := 0; i < 50; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
c.Assert(oc.RemoveOperator(op), IsTrue)
}
}
s.regions[2] = s.regions[2].Clone(
Expand All @@ -421,7 +421,7 @@ func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
for i := 0; i < 5; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
c.Assert(oc.RemoveOperator(op), IsTrue)
}
}
c.Assert(oc.AddOperator(ops...), IsFalse)
Expand Down
24 changes: 0 additions & 24 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
log.Info("background jobs has been stopped")
return
case <-ticker.C:
c.checkOperators()
c.checkStores()
c.collectMetrics()
c.coordinator.opController.PruneHistory()
Expand Down Expand Up @@ -994,29 +993,6 @@ func (c *RaftCluster) deleteStoreLocked(store *core.StoreInfo) error {
return nil
}

// checkOperators walks every operator currently returned by the coordinator's
// operator controller and removes the ones that should no longer be kept:
//   - operators whose region can no longer be found through c.GetRegion, and
//   - operators that report IsTimeout.
//
// It takes no arguments and returns nothing; its only effects are log output
// and removals on the operator controller.
func (c *RaftCluster) checkOperators() {
opController := c.coordinator.opController
for _, op := range opController.GetOperators() {
// After a region is merged it no longer sends heartbeats, so the
// operator of a merged region will not time out on its own.
region := c.GetRegion(op.RegionID())
if region == nil {
log.Debug("remove operator cause region is merged",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
opController.RemoveOperator(op)
// The region is gone, so skip the timeout check for this operator.
continue
}

// The region still exists: drop the operator only if it has timed out.
if op.IsTimeout() {
log.Info("operator timeout",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
opController.RemoveTimeoutOperator(op)
}
}
}

func (c *RaftCluster) collectMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.opt, c.GetNamespaceClassifier())
stores := c.GetStores()
Expand Down
16 changes: 8 additions & 8 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(0))

// Remove the operator manually, then we can add a new operator.
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op2.RegionID())
Expand Down Expand Up @@ -763,7 +763,7 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) {
op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(operator.OpLeader), Equals, uint64(2)) // 1:leader, 2:leader
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
c.Assert(oc.OperatorCount(operator.OpLeader), Equals, uint64(1)) // 2:leader

op1 = newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion)
Expand Down Expand Up @@ -798,7 +798,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
for i := 0; i < 10; i++ {
c.Assert(lb.Schedule(tc), IsNil)
}
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
time.Sleep(1 * time.Second)
for i := 0; i < 100; i++ {
c.Assert(lb.Schedule(tc), NotNil)
Expand Down Expand Up @@ -887,7 +887,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
c.Assert(oc.AddWaitingOperator(op2), IsTrue)
// count = 2
c.Assert(sc.AllowSchedule(), IsFalse)
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
// count = 1
c.Assert(sc.AllowSchedule(), IsTrue)

Expand All @@ -898,7 +898,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
c.Assert(sc.AllowSchedule(), IsFalse)
c.Assert(oc.AddWaitingOperator(op3), IsTrue)
c.Assert(sc.AllowSchedule(), IsTrue)
oc.RemoveOperator(op3)
c.Assert(oc.RemoveOperator(op3), IsTrue)

// adding an admin operator will remove the old operator
c.Assert(oc.AddWaitingOperator(op2), IsTrue)
Expand All @@ -907,14 +907,14 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
op4.SetPriorityLevel(core.HighPriority)
c.Assert(oc.AddWaitingOperator(op4), IsTrue)
c.Assert(sc.AllowSchedule(), IsTrue)
oc.RemoveOperator(op4)
c.Assert(oc.RemoveOperator(op4), IsTrue)

// test wrong region id.
op5 := newTestOperator(3, &metapb.RegionEpoch{}, operator.OpHotRegion)
c.Assert(oc.AddWaitingOperator(op5), IsFalse)

// test wrong region epoch.
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
epoch := &metapb.RegionEpoch{
Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
Expand All @@ -924,7 +924,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
epoch.Version--
op6 = newTestOperator(1, epoch, operator.OpLeader)
c.Assert(oc.AddWaitingOperator(op6), IsTrue)
oc.RemoveOperator(op6)
c.Assert(oc.RemoveOperator(op6), IsTrue)
}

func (s *testScheduleControllerSuite) TestInterval(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (h *Handler) RemoveOperator(regionID uint64) error {
return ErrOperatorNotFound
}

c.opController.RemoveOperator(op)
_ = c.opController.RemoveOperator(op)
rleungx marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
64 changes: 36 additions & 28 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/juju/ratelimit"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/eraftpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -90,6 +91,9 @@ func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *Operato
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := oc.GetOperator(region.GetID()); op != nil {
failpoint.Inject("concurrentRemoveOperator", func() {
time.Sleep(500 * time.Millisecond)
})
timeout := op.IsTimeout()
if step := op.Check(region); step != nil && !timeout {
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
Expand All @@ -103,29 +107,31 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
changes := latest.GetConfVer() - origin.GetConfVer()
if source == DispatchFromHeartBeat &&
changes > uint64(op.ConfVerChanged(region)) {
log.Info("stale operator", zap.Uint64("region-id", region.GetID()),
zap.Reflect("operator", op), zap.Uint64("diff", changes))
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
oc.RemoveOperator(op)
oc.PromoteWaitingOperator()

if oc.RemoveOperator(op) {
log.Info("stale operator", zap.Uint64("region-id", region.GetID()),
zap.Reflect("operator", op), zap.Uint64("diff", changes))
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
oc.PromoteWaitingOperator()
}

return
}

oc.SendScheduleCommand(region, step, source)
return
}
if op.IsFinish() {
if op.IsFinish() && oc.RemoveOperator(op) {
log.Info("operator finish", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds())
oc.pushHistory(op)
oc.opRecords.Put(op, pdpb.OperatorStatus_SUCCESS)
oc.RemoveOperator(op)
oc.PromoteWaitingOperator()
} else if timeout {
} else if timeout && oc.RemoveOperator(op) {
log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
oc.RemoveTimeoutOperator(op)
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_TIMEOUT)
oc.PromoteWaitingOperator()
}
Expand Down Expand Up @@ -158,11 +164,17 @@ func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
_ = oc.removeOperatorLocked(op)
log.Debug("remove operator because region disappeared",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
operatorCounter.WithLabelValues(op.Desc(), "disappear").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
return nil, true
}
step := op.Check(r)
if step == nil {
return nil, true
return r, true
}
now := time.Now()
if now.Before(item.time) {
Expand Down Expand Up @@ -230,7 +242,7 @@ func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool {

if oc.exceedStoreLimit(ops...) || !oc.checkAddOperator(ops...) {
for _, op := range ops {
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
}
return false
Expand Down Expand Up @@ -306,10 +318,10 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
// If there is an old operator, replace it. The priority should be checked
// already.
if old, ok := oc.operators[regionID]; ok {
_ = oc.removeOperatorLocked(old)
rleungx marked this conversation as resolved.
Show resolved Hide resolved
log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", old))
operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
operatorCounter.WithLabelValues(old.Desc(), "replace").Inc()
oc.opRecords.Put(old, pdpb.OperatorStatus_REPLACE)
oc.removeOperatorLocked(old)
}

oc.operators[regionID] = op
Expand Down Expand Up @@ -340,18 +352,10 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
}

// RemoveOperator removes an operator from the running operators.
func (oc *OperatorController) RemoveOperator(op *operator.Operator) {
oc.Lock()
defer oc.Unlock()
oc.removeOperatorLocked(op)
}

// RemoveTimeoutOperator removes a operator which is timeout from the running operators.
func (oc *OperatorController) RemoveTimeoutOperator(op *operator.Operator) {
func (oc *OperatorController) RemoveOperator(op *operator.Operator) (found bool) {
oc.Lock()
defer oc.Unlock()
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
oc.removeOperatorLocked(op)
return oc.removeOperatorLocked(op)
}

// GetOperatorStatus gets the operator and its status with the specified id.
Expand All @@ -367,11 +371,15 @@ func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
return oc.opRecords.Get(id)
}

func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) {
func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool {
regionID := op.RegionID()
delete(oc.operators, regionID)
oc.updateCounts(oc.operators)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
if cur := oc.operators[regionID]; cur == op {
shafreeck marked this conversation as resolved.
Show resolved Hide resolved
delete(oc.operators, regionID)
oc.updateCounts(oc.operators)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
return true
}
return false
}

// GetOperator gets an operator from the given region.
Expand Down
Loading