merged #1

Merged · 2 commits · Mar 20, 2019

2 changes: 2 additions & 0 deletions CONTRIBUTING.md
@@ -21,6 +21,8 @@ This is a rough outline of what a contributor's workflow looks like:
- Submit a pull request.
- Your PR must receive LGTMs from two maintainers.

More specifics on the development workflow are in [development workflow](./docs/development-workflow.md).

More specifics on the coding flow are in [development](./docs/development.md).

Thanks for your contributions!
66 changes: 66 additions & 0 deletions docs/development-workflow.md
@@ -0,0 +1,66 @@
# Development Workflow


Start by forking the `pd` GitHub repository, make your changes in a branch, and then send a pull request.

## Set up your pd GitHub Repository


After forking the [PD upstream](https://github.com/pingcap/pd/fork) source repository to your personal repository, you can set up your development environment for the PD project.

```sh
$ cd $GOPATH/src/github.com/pingcap
$ git clone <your personal forked pd repo>
$ cd pd
```

## Set git remote as `upstream`


```sh
$ git remote add upstream https://github.com/pingcap/pd
$ git fetch upstream
$ git merge upstream/master
...
```

## Create your feature branch


Before making code changes, make sure you create a separate branch for them.

```sh
$ git checkout -b my-feature
```

## Test your changes


After your code changes, make sure that you have:

- Added test cases for the new code.
- Run `make test` (see the sketch below).
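
For example, a minimal sketch: `make test` is the project target named above, while running `go test` on a single package is an illustrative shortcut, not a documented step.

```sh
# Run the whole test suite from the repository root.
$ make test

# While iterating, test only the package you changed (illustrative path).
$ go test ./server/schedule/...
```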


## Commit changes


After verification, commit your changes.

```sh
$ git commit -am 'information about your feature'
```

## Push to the branch


Push your locally committed changes to the remote origin (your fork).

```sh
$ git push origin my-feature
```

## Create a Pull Request


Pull requests can be created via GitHub. Refer to [this document](https://help.github.com/articles/creating-a-pull-request/) for more details on how to create a pull request.
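
If you prefer the command line, a pull request can also be opened with the GitHub CLI. This is a sketch, assuming `gh` is installed and your feature branch has been pushed; the document above only describes the web flow.

```sh
# Open a pull request from the current branch against upstream's master.
$ gh pr create --base master --title "my feature" --body "what this change does and why"
```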
10 changes: 8 additions & 2 deletions server/handler.go
@@ -444,7 +444,10 @@ func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreI
return err
}

op := schedule.CreateMovePeerOperator("adminMovePeer", c.cluster, region, schedule.OpAdmin, fromStoreID, toStoreID, newPeer.GetId())
op, err := schedule.CreateMovePeerOperator("adminMovePeer", c.cluster, region, schedule.OpAdmin, fromStoreID, toStoreID, newPeer.GetId())
if err != nil {
return err
}
if ok := c.opController.AddOperator(op); !ok {
return errors.WithStack(errAddOperator)
}
@@ -550,7 +553,10 @@ func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) err
return errors.Errorf("region has no peer in store %v", fromStoreID)
}

op := schedule.CreateRemovePeerOperator("adminRemovePeer", c.cluster, schedule.OpAdmin, region, fromStoreID)
op, err := schedule.CreateRemovePeerOperator("adminRemovePeer", c.cluster, schedule.OpAdmin, region, fromStoreID)
if err != nil {
return err
}
if ok := c.opController.AddOperator(op); !ok {
return errors.WithStack(errAddOperator)
}
7 changes: 6 additions & 1 deletion server/schedule/namespace_checker.go
@@ -70,8 +70,13 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
checkerCounter.WithLabelValues("namespace_checker", "no_target_peer").Inc()
return nil
}
op, err := CreateMovePeerOperator("makeNamespaceRelocation", n.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
checkerCounter.WithLabelValues("namespace_checker", "create_operator_fail").Inc()
return nil
}
checkerCounter.WithLabelValues("namespace_checker", "new_operator").Inc()
return CreateMovePeerOperator("makeNamespaceRelocation", n.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
return op
}

checkerCounter.WithLabelValues("namespace_checker", "all_right").Inc()
35 changes: 27 additions & 8 deletions server/schedule/operator.go
@@ -428,14 +428,20 @@ func (o *Operator) History() []OperatorHistory {
}

// CreateRemovePeerOperator creates an Operator that removes a peer from region.
func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) *Operator {
removeKind, steps := removePeerSteps(cluster, region, storeID)
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...)
func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, storeID, getRegionFollowerIDs(region))
if err != nil {
return nil, err
}
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil
}

// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) *Operator {
removeKind, steps := removePeerSteps(cluster, region, oldStore)
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
if err != nil {
return nil, err
}
var st []OperatorStep
if cluster.IsRaftLearnerEnabled() {
st = []OperatorStep{
@@ -448,20 +454,33 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf
}
}
steps = append(st, steps...)
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...)
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...), nil
}

func getRegionFollowerIDs(region *core.RegionInfo) []uint64 {
var ids []uint64
for id := range region.GetFollowers() {
ids = append(ids, id)
}
return ids
}

// removePeerSteps returns the steps to safely remove a peer. It prevents removing the leader by transferring its leadership first.
func removePeerSteps(cluster Cluster, region *core.RegionInfo, storeID uint64) (kind OperatorKind, steps []OperatorStep) {
func removePeerSteps(cluster Cluster, region *core.RegionInfo, storeID uint64, followerIDs []uint64) (kind OperatorKind, steps []OperatorStep, err error) {
if region.GetLeader() != nil && region.GetLeader().GetStoreId() == storeID {
for id := range region.GetFollowers() {
for _, id := range followerIDs {
follower := cluster.GetStore(id)
if follower != nil && !cluster.CheckLabelProperty(RejectLeader, follower.GetLabels()) {
steps = append(steps, TransferLeader{FromStore: storeID, ToStore: id})
kind = OpLeader
break
}
}
if len(steps) == 0 {
err = errors.New("no suitable follower to become region leader")
log.Debug("fail to create remove peer operator", zap.Uint64("region-id", region.GetID()), zap.Error(err))
return
}
}
steps = append(steps, RemovePeer{FromStore: storeID})
kind |= OpRegion
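
Taken together, `CreateRemovePeerOperator` and `CreateMovePeerOperator` now report failure when no follower outside the reject-leader stores can take over leadership, instead of returning an operator unconditionally. A minimal caller sketch, condensed from the updated call sites in this diff for illustration:

```go
// Condensed from server/handler.go in this PR; illustrative only.
op, err := schedule.CreateRemovePeerOperator("adminRemovePeer", c.cluster, schedule.OpAdmin, region, fromStoreID)
if err != nil {
	// No suitable follower to take leadership, so no safe removal plan exists.
	return err
}
if ok := c.opController.AddOperator(op); !ok {
	return errors.WithStack(errAddOperator)
}
```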
5 changes: 4 additions & 1 deletion server/schedule/region_scatterer.go
@@ -116,8 +116,11 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
delete(stores, newPeer.GetStoreId())
r.selected.put(newPeer.GetStoreId())

op := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
continue
}
steps = append(steps, op.steps...)
steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()})
kind |= op.Kind()
34 changes: 29 additions & 5 deletions server/schedule/replica_checker.go
@@ -93,8 +93,13 @@ func (r *ReplicaChecker) Check(region *core.RegionInfo) *Operator {
checkerCounter.WithLabelValues("replica_checker", "no_worst_peer").Inc()
return nil
}
op, err := CreateRemovePeerOperator("removeExtraReplica", r.cluster, OpReplica, region, oldPeer.GetStoreId())
if err != nil {
checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
return nil
}
checkerCounter.WithLabelValues("replica_checker", "new_operator").Inc()
return CreateRemovePeerOperator("removeExtraReplica", r.cluster, OpReplica, region, oldPeer.GetStoreId())
return op
}

return r.checkBestReplacement(region)
@@ -233,15 +238,25 @@ func (r *ReplicaChecker) checkBestReplacement(region *core.RegionInfo) *Operator
if err != nil {
return nil
}
op, err := CreateMovePeerOperator("moveToBetterLocation", r.cluster, region, OpReplica, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
return nil
}
checkerCounter.WithLabelValues("replica_checker", "new_operator").Inc()
return CreateMovePeerOperator("moveToBetterLocation", r.cluster, region, OpReplica, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
return op
}

func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, peer *metapb.Peer, status string) *Operator {
removeExtra := fmt.Sprintf("removeExtra%sReplica", status)
// Check the number of replicas first.
if len(region.GetPeers()) > r.cluster.GetMaxReplicas() {
return CreateRemovePeerOperator(removeExtra, r.cluster, OpReplica, region, peer.GetStoreId())
op, err := CreateRemovePeerOperator(removeExtra, r.cluster, OpReplica, region, peer.GetStoreId())
if err != nil {
checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
return nil
}
return op
}

removePending := fmt.Sprintf("removePending%sReplica", status)
@@ -250,7 +265,12 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, peer *metapb.Peer, sta
// D then removes C, D will not be successfully added until C is normal again.
// So it's better to remove C directly.
if region.GetPendingPeer(peer.GetId()) != nil {
return CreateRemovePeerOperator(removePending, r.cluster, OpReplica, region, peer.GetStoreId())
op, err := CreateRemovePeerOperator(removePending, r.cluster, OpReplica, region, peer.GetStoreId())
if err != nil {
checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
return nil
}
return op
}

storeID, _ := r.SelectBestReplacementStore(region, peer, NewStorageThresholdFilter())
@@ -264,5 +284,9 @@ }
}

replace := fmt.Sprintf("replace%sReplica", status)
return CreateMovePeerOperator(replace, r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
op, err := CreateMovePeerOperator(replace, r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
return nil
}
return op
}
6 changes: 5 additions & 1 deletion server/schedulers/adjacent_region.go
@@ -314,7 +314,11 @@ func (l *balanceAdjacentRegionScheduler) dispersePeer(cluster schedule.Cluster,
// record the store id and exclude it in next time
l.cacheRegions.assignedStoreIds = append(l.cacheRegions.assignedStoreIds, newPeer.GetStoreId())

op := schedule.CreateMovePeerOperator("balance-adjacent-peer", cluster, region, schedule.OpAdjacent, leaderStoreID, newPeer.GetStoreId(), newPeer.GetId())
op, err := schedule.CreateMovePeerOperator("balance-adjacent-peer", cluster, region, schedule.OpAdjacent, leaderStoreID, newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(l.GetName(), "create_operator_fail").Inc()
return nil
}
op.SetPriorityLevel(core.LowPriority)
schedulerCounter.WithLabelValues(l.GetName(), "adjacent_peer").Inc()
return op
7 changes: 6 additions & 1 deletion server/schedulers/balance_region.go
@@ -174,7 +174,12 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region *
}
balanceRegionCounter.WithLabelValues("move_peer", source.GetAddress()+"-out").Inc()
balanceRegionCounter.WithLabelValues("move_peer", target.GetAddress()+"-in").Inc()
return schedule.CreateMovePeerOperator("balance-region", cluster, region, schedule.OpBalance, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
op, err := schedule.CreateMovePeerOperator("balance-region", cluster, region, schedule.OpBalance, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(s.GetName(), "create_operator_fail").Inc()
return nil
}
return op
}

// hasPotentialTarget is used to determine whether the specified sourceStore
4 changes: 2 additions & 2 deletions server/schedulers/balance_test.go
@@ -336,15 +336,15 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) {
tc.AddRegionStore(4, 16)
// Add region 1 with leader in store 4.
tc.AddLeaderRegion(1, 4)
testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 1)
testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 1)

// Test stateFilter.
tc.SetStoreOffline(1)
tc.UpdateRegionCount(2, 6)
cache.Remove(4)
// When store 1 is offline, it will be filtered,
// store 2 becomes the store with least regions.
testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 2)
testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 2)
opt.SetMaxReplicas(3)
c.Assert(sb.Schedule(tc), IsNil)

14 changes: 12 additions & 2 deletions server/schedulers/hot_region.go
@@ -172,8 +172,13 @@ func (h *balanceHotRegionsScheduler) balanceHotReadRegions(cluster schedule.Clus
// balance by peer
srcRegion, srcPeer, destPeer := h.balanceByPeer(cluster, h.stats.readStatAsLeader)
if srcRegion != nil {
op, err := schedule.CreateMovePeerOperator("moveHotReadRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(h.GetName(), "create_operator_fail").Inc()
return nil
}
schedulerCounter.WithLabelValues(h.GetName(), "move_peer").Inc()
return []*schedule.Operator{schedule.CreateMovePeerOperator("moveHotReadRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())}
return []*schedule.Operator{op}
}
schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
return nil
@@ -189,8 +194,13 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
// balance by peer
srcRegion, srcPeer, destPeer := h.balanceByPeer(cluster, h.stats.writeStatAsPeer)
if srcRegion != nil {
op, err := schedule.CreateMovePeerOperator("moveHotWriteRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(h.GetName(), "create_operator_fail").Inc()
return nil
}
schedulerCounter.WithLabelValues(h.GetName(), "move_peer").Inc()
return []*schedule.Operator{schedule.CreateMovePeerOperator("moveHotWriteRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())}
return []*schedule.Operator{op}
}
case 1:
// balance by leader
6 changes: 5 additions & 1 deletion server/schedulers/shuffle_region.go
@@ -70,8 +70,12 @@ func (s *shuffleRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.
return nil
}

op, err := schedule.CreateMovePeerOperator("shuffle-region", cluster, region, schedule.OpAdmin, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
schedulerCounter.WithLabelValues(s.GetName(), "create_operator_fail").Inc()
return nil
}
schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc()
op := schedule.CreateMovePeerOperator("shuffle-region", cluster, region, schedule.OpAdmin, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
op.SetPriorityLevel(core.HighPriority)
return []*schedule.Operator{op}
}