Skip to content

Commit

Permalink
kvserver: make AdminRelocateRange work with non-voting replicas
Browse files Browse the repository at this point in the history
This commit teaches `RelocateRange` to work with non-voters. This lets
the merge queue rebalance a range that has non-voters so the merge can
actually proceed, as range merges require that replica sets of the LHS
and RHS ranges must be aligned.

Release note: None
  • Loading branch information
aayushshah15 committed Jan 7, 2021
1 parent d2e7818 commit bfe698a
Show file tree
Hide file tree
Showing 19 changed files with 1,426 additions and 893 deletions.
7 changes: 5 additions & 2 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,9 @@ func (b *Batch) adminChangeReplicas(

// adminRelocateRange is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.ReplicationTarget) {
func (b *Batch) adminRelocateRange(
key interface{}, voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -725,7 +727,8 @@ func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.Replicatio
RequestHeader: roachpb.RequestHeader{
Key: k,
},
Targets: targets,
VoterTargets: voterTargets,
NonVoterTargets: nonVoterTargets,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,10 @@ func (db *DB) AdminChangeReplicas(
// AdminRelocateRange relocates the replicas for a range onto the specified
// list of stores.
func (db *DB) AdminRelocateRange(
ctx context.Context, key interface{}, targets []roachpb.ReplicationTarget,
ctx context.Context, key interface{}, voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
) error {
b := &Batch{}
b.adminRelocateRange(key, targets)
b.adminRelocateRange(key, voterTargets, nonVoterTargets)
return getOneErr(db.Run(ctx, b), b)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/build",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,17 +495,17 @@ func (a *Allocator) AllocateTarget(

func (a *Allocator) allocateTargetFromList(
ctx context.Context,
sl StoreList,
candidateStores StoreList,
zone *zonepb.ZoneConfig,
candidateReplicas []roachpb.ReplicaDescriptor,
existingReplicas []roachpb.ReplicaDescriptor,
options scorerOptions,
) (*roachpb.StoreDescriptor, string) {
analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
candidates := allocateCandidates(
ctx,
sl, analyzedConstraints, candidateReplicas,
a.storePool.getLocalitiesByStore(candidateReplicas),
candidateStores, analyzedConstraints, existingReplicas,
a.storePool.getLocalitiesByStore(existingReplicas),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
options,
)
Expand Down Expand Up @@ -560,17 +560,17 @@ func (a Allocator) RemoveTarget(
}

// Retrieve store descriptors for the provided candidates from the StorePool.
existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
candidateStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
for i, exist := range candidates {
existingStoreIDs[i] = exist.StoreID
candidateStoreIDs[i] = exist.StoreID
}
sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone)
candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone)

analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
options := a.scorerOptions()
rankedCandidates := removeCandidates(
sl,
candidateStoreList,
analyzedConstraints,
a.storePool.getLocalitiesByStore(existingReplicas),
options,
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,16 +409,16 @@ func (cl candidateList) removeCandidate(c candidate) candidateList {
// stores that meet the criteria are included in the list.
func allocateCandidates(
ctx context.Context,
sl StoreList,
candidateStores StoreList,
constraints constraint.AnalyzedConstraints,
existing []roachpb.ReplicaDescriptor,
existingReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
options scorerOptions,
) candidateList {
var candidates candidateList
for _, s := range sl.stores {
if nodeHasReplica(s.Node.NodeID, existing) {
for _, s := range candidateStores.stores {
if nodeHasReplica(s.Node.NodeID, existingReplicas) {
continue
}
if !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) {
Expand All @@ -433,14 +433,14 @@ func allocateCandidates(
continue
}
diversityScore := diversityAllocateScore(s, existingStoreLocalities)
balanceScore := balanceScore(sl, s.Capacity, options)
balanceScore := balanceScore(candidateStores, s.Capacity, options)
var convergesScore int
if options.qpsRebalanceThreshold > 0 {
if s.Capacity.QueriesPerSecond < underfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
if s.Capacity.QueriesPerSecond < underfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = 1
} else if s.Capacity.QueriesPerSecond < sl.candidateQueriesPerSecond.mean {
} else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean {
convergesScore = 0
} else if s.Capacity.QueriesPerSecond < overfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
} else if s.Capacity.QueriesPerSecond < overfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = -1
} else {
convergesScore = -2
Expand Down
Loading

0 comments on commit bfe698a

Please sign in to comment.