Skip to content

Commit

Permalink
Merge pull request #1 from pingcap/master
Browse files Browse the repository at this point in the history
merged
  • Loading branch information
bradyjoestar authored Mar 20, 2019
2 parents 59968e1 + e1c1ae4 commit b966728
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ This is a rough outline of what a contributor's workflow looks like:
- Submit a pull request.
- Your PR must receive LGTMs from two maintainers.

More specifics on the development workflow are in [development workflow](./docs/development-workflow.md).

More specifics on the coding flow are in [development](./docs/development.md).

Thanks for your contributions!
Expand Down
66 changes: 66 additions & 0 deletions docs/development-workflow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Development Workflow


Start by forking the `pd` GitHub repository, make changes in a branch and then send a pull request.

## Set up your pd GitHub Repository


After forking the [PD upstream](https://github.com/pingcap/pd/fork) source repository to your personal repository, you can set up your personal development environment for the PD project.

```sh
$ cd $GOPATH/src/github.com/pingcap
$ git clone <your personal forked pd repo>
$ cd pd
```

## Set git remote as ``upstream``


```sh
$ git remote add upstream https://github.com/pingcap/pd
$ git fetch upstream
$ git merge upstream/master
...
```

## Create your feature branch


Before making code changes, make sure you create a separate branch for them.

```
$ git checkout -b my-feature
```

## Test your changes


After your code changes, make sure that you have:

- Added test cases for the new code.
- Run `make test`.


## Commit changes


After verification, commit your changes.

```
$ git commit -am 'information about your feature'
```

## Push to the branch


Push your locally committed changes to the remote origin (your fork).

```
$ git push origin my-feature
```

## Create a Pull Request


Pull requests can be created via GitHub. Refer to [this document](https://help.github.com/articles/creating-a-pull-request/) for more details on how to create a pull request.
10 changes: 8 additions & 2 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,10 @@ func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreI
return err
}

op := schedule.CreateMovePeerOperator("adminMovePeer", c.cluster, region, schedule.OpAdmin, fromStoreID, toStoreID, newPeer.GetId())
op, err := schedule.CreateMovePeerOperator("adminMovePeer", c.cluster, region, schedule.OpAdmin, fromStoreID, toStoreID, newPeer.GetId())
if err != nil {
return err
}
if ok := c.opController.AddOperator(op); !ok {
return errors.WithStack(errAddOperator)
}
Expand Down Expand Up @@ -550,7 +553,10 @@ func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) err
return errors.Errorf("region has no peer in store %v", fromStoreID)
}

op := schedule.CreateRemovePeerOperator("adminRemovePeer", c.cluster, schedule.OpAdmin, region, fromStoreID)
op, err := schedule.CreateRemovePeerOperator("adminRemovePeer", c.cluster, schedule.OpAdmin, region, fromStoreID)
if err != nil {
return err
}
if ok := c.opController.AddOperator(op); !ok {
return errors.WithStack(errAddOperator)
}
Expand Down
7 changes: 6 additions & 1 deletion server/schedule/namespace_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,13 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
checkerCounter.WithLabelValues("namespace_checker", "no_target_peer").Inc()
return nil
}
op, err := CreateMovePeerOperator("makeNamespaceRelocation", n.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
checkerCounter.WithLabelValues("namespace_checker", "create_operator_fail").Inc()
return nil
}
checkerCounter.WithLabelValues("namespace_checker", "new_operator").Inc()
return CreateMovePeerOperator("makeNamespaceRelocation", n.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
return op
}

checkerCounter.WithLabelValues("namespace_checker", "all_right").Inc()
Expand Down
35 changes: 27 additions & 8 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,20 @@ func (o *Operator) History() []OperatorHistory {
}

// CreateRemovePeerOperator creates an Operator that removes a peer from region.
func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) *Operator {
removeKind, steps := removePeerSteps(cluster, region, storeID)
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...)
func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, storeID, getRegionFollowerIDs(region))
if err != nil {
return nil, err
}
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil
}

// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) *Operator {
removeKind, steps := removePeerSteps(cluster, region, oldStore)
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
if err != nil {
return nil, err
}
var st []OperatorStep
if cluster.IsRaftLearnerEnabled() {
st = []OperatorStep{
Expand All @@ -448,20 +454,33 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf
}
}
steps = append(st, steps...)
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...)
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...), nil
}

// getRegionFollowerIDs returns the store IDs of the region's follower peers.
// The follower set is a map, so the resulting order is not deterministic.
// Returns nil when the region has no followers.
func getRegionFollowerIDs(region *core.RegionInfo) []uint64 {
	var storeIDs []uint64
	for storeID := range region.GetFollowers() {
		storeIDs = append(storeIDs, storeID)
	}
	return storeIDs
}

// removePeerSteps returns the steps to safely remove a peer. It prevents removing leader by transfer its leadership first.
func removePeerSteps(cluster Cluster, region *core.RegionInfo, storeID uint64) (kind OperatorKind, steps []OperatorStep) {
func removePeerSteps(cluster Cluster, region *core.RegionInfo, storeID uint64, followerIDs []uint64) (kind OperatorKind, steps []OperatorStep, err error) {
if region.GetLeader() != nil && region.GetLeader().GetStoreId() == storeID {
for id := range region.GetFollowers() {
for _, id := range followerIDs {
follower := cluster.GetStore(id)
if follower != nil && !cluster.CheckLabelProperty(RejectLeader, follower.GetLabels()) {
steps = append(steps, TransferLeader{FromStore: storeID, ToStore: id})
kind = OpLeader
break
}
}
if len(steps) == 0 {
err = errors.New("no suitable follower to become region leader")
log.Debug("fail to create remove peer operator", zap.Uint64("region-id", region.GetID()), zap.Error(err))
return
}
}
steps = append(steps, RemovePeer{FromStore: storeID})
kind |= OpRegion
Expand Down
5 changes: 4 additions & 1 deletion server/schedule/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
delete(stores, newPeer.GetStoreId())
r.selected.put(newPeer.GetStoreId())

op := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
continue
}
steps = append(steps, op.steps...)
steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()})
kind |= op.Kind()
Expand Down
34 changes: 29 additions & 5 deletions server/schedule/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ func (r *ReplicaChecker) Check(region *core.RegionInfo) *Operator {
checkerCounter.WithLabelValues("replica_checker", "no_worst_peer").Inc()
return nil
}
op, err := CreateRemovePeerOperator("removeExtraReplica", r.cluster, OpReplica, region, oldPeer.GetStoreId())
if err != nil {
checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
return nil
}
checkerCounter.WithLabelValues("replica_checker", "new_operator").Inc()
return CreateRemovePeerOperator("removeExtraReplica", r.cluster, OpReplica, region, oldPeer.GetStoreId())
return op
}

return r.checkBestReplacement(region)
Expand Down Expand Up @@ -233,15 +238,25 @@ func (r *ReplicaChecker) checkBestReplacement(region *core.RegionInfo) *Operator
if err != nil {
return nil
}
op, err := CreateMovePeerOperator("moveToBetterLocation", r.cluster, region, OpReplica, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
return nil
}
checkerCounter.WithLabelValues("replica_checker", "new_operator").Inc()
return CreateMovePeerOperator("moveToBetterLocation", r.cluster, region, OpReplica, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
return op
}

func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, peer *metapb.Peer, status string) *Operator {
removeExtra := fmt.Sprintf("removeExtra%sReplica", status)
// Check the number of replicas first.
if len(region.GetPeers()) > r.cluster.GetMaxReplicas() {
return CreateRemovePeerOperator(removeExtra, r.cluster, OpReplica, region, peer.GetStoreId())
op, err := CreateRemovePeerOperator(removeExtra, r.cluster, OpReplica, region, peer.GetStoreId())
if err != nil {
checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
return nil
}
return op
}

removePending := fmt.Sprintf("removePending%sReplica", status)
Expand All @@ -250,7 +265,12 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, peer *metapb.Peer, sta
// D then removes C, D will not be successfully added util C is normal again.
// So it's better to remove C directly.
if region.GetPendingPeer(peer.GetId()) != nil {
return CreateRemovePeerOperator(removePending, r.cluster, OpReplica, region, peer.GetStoreId())
op, err := CreateRemovePeerOperator(removePending, r.cluster, OpReplica, region, peer.GetStoreId())
if err != nil {
checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
return nil
}
return op
}

storeID, _ := r.SelectBestReplacementStore(region, peer, NewStorageThresholdFilter())
Expand All @@ -264,5 +284,9 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, peer *metapb.Peer, sta
}

replace := fmt.Sprintf("replace%sReplica", status)
return CreateMovePeerOperator(replace, r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
op, err := CreateMovePeerOperator(replace, r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
return nil
}
return op
}
6 changes: 5 additions & 1 deletion server/schedulers/adjacent_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,11 @@ func (l *balanceAdjacentRegionScheduler) dispersePeer(cluster schedule.Cluster,
// record the store id and exclude it in next time
l.cacheRegions.assignedStoreIds = append(l.cacheRegions.assignedStoreIds, newPeer.GetStoreId())

op := schedule.CreateMovePeerOperator("balance-adjacent-peer", cluster, region, schedule.OpAdjacent, leaderStoreID, newPeer.GetStoreId(), newPeer.GetId())
op, err := schedule.CreateMovePeerOperator("balance-adjacent-peer", cluster, region, schedule.OpAdjacent, leaderStoreID, newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(l.GetName(), "create_operator_fail").Inc()
return nil
}
op.SetPriorityLevel(core.LowPriority)
schedulerCounter.WithLabelValues(l.GetName(), "adjacent_peer").Inc()
return op
Expand Down
7 changes: 6 additions & 1 deletion server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region *
}
balanceRegionCounter.WithLabelValues("move_peer", source.GetAddress()+"-out").Inc()
balanceRegionCounter.WithLabelValues("move_peer", target.GetAddress()+"-in").Inc()
return schedule.CreateMovePeerOperator("balance-region", cluster, region, schedule.OpBalance, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
op, err := schedule.CreateMovePeerOperator("balance-region", cluster, region, schedule.OpBalance, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(s.GetName(), "create_operator_fail").Inc()
return nil
}
return op
}

// hasPotentialTarget is used to determine whether the specified sourceStore
Expand Down
4 changes: 2 additions & 2 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,15 +336,15 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) {
tc.AddRegionStore(4, 16)
// Add region 1 with leader in store 4.
tc.AddLeaderRegion(1, 4)
testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 1)
testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 1)

// Test stateFilter.
tc.SetStoreOffline(1)
tc.UpdateRegionCount(2, 6)
cache.Remove(4)
// When store 1 is offline, it will be filtered,
// store 2 becomes the store with least regions.
testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 2)
testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 2)
opt.SetMaxReplicas(3)
c.Assert(sb.Schedule(tc), IsNil)

Expand Down
14 changes: 12 additions & 2 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,13 @@ func (h *balanceHotRegionsScheduler) balanceHotReadRegions(cluster schedule.Clus
// balance by peer
srcRegion, srcPeer, destPeer := h.balanceByPeer(cluster, h.stats.readStatAsLeader)
if srcRegion != nil {
op, err := schedule.CreateMovePeerOperator("moveHotReadRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(h.GetName(), "create_operator_fail").Inc()
return nil
}
schedulerCounter.WithLabelValues(h.GetName(), "move_peer").Inc()
return []*schedule.Operator{schedule.CreateMovePeerOperator("moveHotReadRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())}
return []*schedule.Operator{op}
}
schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
return nil
Expand All @@ -189,8 +194,13 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
// balance by peer
srcRegion, srcPeer, destPeer := h.balanceByPeer(cluster, h.stats.writeStatAsPeer)
if srcRegion != nil {
op, err := schedule.CreateMovePeerOperator("moveHotWriteRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(h.GetName(), "create_operator_fail").Inc()
return nil
}
schedulerCounter.WithLabelValues(h.GetName(), "move_peer").Inc()
return []*schedule.Operator{schedule.CreateMovePeerOperator("moveHotWriteRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())}
return []*schedule.Operator{op}
}
case 1:
// balance by leader
Expand Down
6 changes: 5 additions & 1 deletion server/schedulers/shuffle_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ func (s *shuffleRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.
return nil
}

op, err := schedule.CreateMovePeerOperator("shuffle-region", cluster, region, schedule.OpAdmin, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(s.GetName(), "create_operator_fail").Inc()
return nil
}
schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc()
op := schedule.CreateMovePeerOperator("shuffle-region", cluster, region, schedule.OpAdmin, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
op.SetPriorityLevel(core.HighPriority)
return []*schedule.Operator{op}
}
Expand Down

0 comments on commit b966728

Please sign in to comment.