Merge pull request #2648 from mrtracy/mtracy/enable_rebalancing
Add Rebalancing Command-line switch
mrtracy committed Sep 24, 2015
2 parents 99a9b54 + 559c213 commit 016136e
Showing 5 changed files with 38 additions and 4 deletions.
4 changes: 4 additions & 0 deletions cli/flags.go
@@ -140,6 +140,9 @@ var flagUsage = map[string]string{
 `,
 	"max-results": `
 Define the maximum number of results that will be retrieved.
+`,
+	"allow-rebalancing": `
+Enables this server to rebalance replicas to other stores on the cluster.
 `,
 }

@@ -178,6 +181,7 @@ func initFlags(ctx *server.Context) {
 	f.StringVar(&ctx.Stores, "stores", ctx.Stores, flagUsage["stores"])
 	f.DurationVar(&ctx.MaxOffset, "max-offset", ctx.MaxOffset, flagUsage["max-offset"])
 	f.DurationVar(&ctx.MetricsFrequency, "metrics-frequency", ctx.MetricsFrequency, flagUsage["metrics-frequency"])
+	f.BoolVar(&ctx.AllowRebalancing, "allow-rebalancing", ctx.AllowRebalancing, flagUsage["allow-rebalancing"])
 
 	// Security flags.
 	f.StringVar(&ctx.Certs, "certs", ctx.Certs, flagUsage["certs"])
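For reference, a hypothetical invocation of the new switch. Only the --allow-rebalancing flag itself is introduced by this diff; the cockroach start form and the <store-spec> placeholder are assumptions for illustration.

    # Hypothetical invocation; <store-spec> stands in for a real --stores value.
    cockroach start --stores=<store-spec> --allow-rebalancing=true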
5 changes: 5 additions & 0 deletions server/context.go
@@ -44,6 +44,7 @@ const (
 	defaultScanMaxIdleTime    = 5 * time.Second
 	defaultMetricsFrequency   = 10 * time.Second
 	defaultTimeUntilStoreDead = 5 * time.Minute
+	defaultAllowRebalancing   = false
 )
 
 // Context holds parameters needed to setup a server.
@@ -97,6 +98,9 @@ type Context struct {
 	// The value is split evenly between the stores if there are more than one.
 	CacheSize int64
 
+	// Enables this server to rebalance replicas to other servers.
+	AllowRebalancing bool
+
 	// Parsed values.
 
 	// Engines is the storage instances specified by Stores.
@@ -138,6 +142,7 @@ func NewContext() *Context {
 		ScanMaxIdleTime:    defaultScanMaxIdleTime,
 		MetricsFrequency:   defaultMetricsFrequency,
 		TimeUntilStoreDead: defaultTimeUntilStoreDead,
+		AllowRebalancing:   defaultAllowRebalancing,
 	}
 	// Initializes base context defaults.
 	ctx.InitDefaults()
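The pattern here is that the compile-time default seeds both the Context field and, via initFlags above, the flag's default value. A minimal runnable sketch of that binding, using the standard library's flag package rather than the repository's own flag wiring (the context type below is an illustrative stand-in for server.Context):

    package main

    import (
        "flag"
        "fmt"
    )

    // context mirrors the relevant slice of server.Context, for illustration only.
    type context struct {
        AllowRebalancing bool
    }

    func main() {
        // The field's current value (false, like defaultAllowRebalancing) is
        // passed as BoolVar's default, so flag and struct defaults stay in sync.
        ctx := &context{AllowRebalancing: false}
        flag.BoolVar(&ctx.AllowRebalancing, "allow-rebalancing",
            ctx.AllowRebalancing, "enable replica rebalancing")
        flag.Parse()
        fmt.Println("rebalancing enabled:", ctx.AllowRebalancing)
    }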
3 changes: 3 additions & 0 deletions server/server.go
@@ -150,6 +150,9 @@ func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
 		EventFeed: feed,
 		Tracer:    tracer,
 		StorePool: s.storePool,
+		RebalancingOptions: storage.RebalancingOptions{
+			AllowRebalance: s.ctx.AllowRebalancing,
+		},
 	}
 	s.node = NewNode(nCtx)
 	s.admin = newAdminServer(s.db, s.stopper)
11 changes: 10 additions & 1 deletion storage/allocator.go
@@ -320,9 +320,12 @@ func (a Allocator) ShouldRebalance(storeID proto.StoreID) bool {
 	if !a.options.Deterministic && a.randGen.Float32() > rebalanceShouldRebalanceChance {
 		return false
 	}
+	if log.V(2) {
+		log.Infof("Attempting to rebalance from store %d", storeID)
+	}
 	storeDesc := a.storePool.getStoreDescriptor(storeID)
 	if storeDesc == nil {
-		if log.V(1) {
+		if log.V(2) {
 			log.Warningf(
 				"shouldRebalance couldn't find store with id %d in StorePool",
 				storeID)
@@ -336,6 +339,9 @@ func (a Allocator) ShouldRebalance(storeID proto.StoreID) bool {
 	// if the number of ranges on the store is above average. This is primarily
 	// useful for distributing load in a nascent deployment.
 	if sl.used.mean < minFractionUsedThreshold {
+		if log.V(2) {
+			log.Infof("Attempting to rebalance using range counts, count = %d, mean = %f", storeDesc.Capacity.RangeCount, sl.count.mean)
+		}
 		return float64(storeDesc.Capacity.RangeCount) > math.Ceil(sl.count.mean)
 	}
 	// A store is eligible for rebalancing if its disk usage is sufficiently above
@@ -347,6 +353,9 @@ func (a Allocator) ShouldRebalance(storeID proto.StoreID) bool {
 		// usage.
 		minFractionUsed = maxFractionUsedThreshold
 	}
+	if log.V(2) {
+		log.Infof("Attempting to rebalance using total fraction used, threshold = %f, used = %f", minFractionUsed, storeDesc.Capacity.FractionUsed())
+	}
 	return storeDesc.Capacity.FractionUsed() > minFractionUsed
 }

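The log lines added above trace a two-phase decision: while mean disk usage across the store list is below minFractionUsedThreshold (a nascent cluster), rebalancing is driven by range counts; afterwards it is driven by disk usage. A minimal sketch of that branch structure follows; the constants and the storeStats aggregate are illustrative assumptions, not the repository's values, and the random gate and mean-relative margin in the real code are omitted.

    package main

    import (
        "fmt"
        "math"
    )

    // Illustrative stand-ins for the allocator's thresholds.
    const (
        minFractionUsedThreshold = 0.02
        maxFractionUsedThreshold = 0.95
    )

    // storeStats is a hypothetical summary of one store and its peers.
    type storeStats struct {
        rangeCount       int32   // ranges on the candidate store
        fractionUsed     float64 // disk fraction used on the candidate store
        meanRangeCount   float64 // mean range count across the store list
        meanFractionUsed float64 // mean disk usage across the store list
    }

    // shouldRebalance mirrors the branch structure of Allocator.ShouldRebalance:
    // count-based while the cluster is nascent, usage-based afterwards.
    func shouldRebalance(s storeStats) bool {
        if s.meanFractionUsed < minFractionUsedThreshold {
            // Nascent cluster: move ranges off stores holding more than the mean.
            return float64(s.rangeCount) > math.Ceil(s.meanRangeCount)
        }
        // Established cluster: move data off stores whose usage exceeds the
        // (capped) mean-derived threshold.
        minFractionUsed := s.meanFractionUsed
        if minFractionUsed > maxFractionUsedThreshold {
            minFractionUsed = maxFractionUsedThreshold
        }
        return s.fractionUsed > minFractionUsed
    }

    func main() {
        fmt.Println(shouldRebalance(storeStats{rangeCount: 12, meanRangeCount: 8, meanFractionUsed: 0.01})) // true: count-based
        fmt.Println(shouldRebalance(storeStats{fractionUsed: 0.60, meanFractionUsed: 0.40}))                // true: usage-based
    }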
19 changes: 16 additions & 3 deletions storage/replica.go
@@ -36,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/config"
 	"github.com/cockroachdb/cockroach/gossip"
 	"github.com/cockroachdb/cockroach/keys"
+	"github.com/cockroachdb/cockroach/multiraft"
 	"github.com/cockroachdb/cockroach/proto"
 	"github.com/cockroachdb/cockroach/storage/engine"
 	"github.com/cockroachdb/cockroach/util"
@@ -1489,9 +1490,21 @@ func (r *Replica) resolveIntents(ctx context.Context, intents []proto.Intent) {
 		if log.V(1) {
 			log.Warningc(ctx, "batch resolve failed: %s", err)
 		}
-		if _, ok := err.(*proto.RangeKeyMismatchError); !ok {
-			// TODO(tschottdorf)
-			panic(fmt.Sprintf("intent resolution failed, error: %s", err.Error()))
+		// At this point, as long as the local Replica accepts the
+		// request it should never fail. However, the replica may reject
+		// the request in certain cases (for example, if the replica has
+		// been removed from its range via a rebalancing command).
+		// Therefore, we inspect the returned error to detect cases
+		// where the command was rejected, and can safely ignore those
+		// errors.
+		if err != multiraft.ErrGroupDeleted {
+			switch err.(type) {
+			case *proto.RangeKeyMismatchError:
+			case *proto.NotLeaderError:
+			default:
+				// TODO(tschottdorf): Does this need to be a panic?
+				panic(fmt.Sprintf("intent resolution failed with unexpected error: %s", err))
+			}
 		}
 	}
 }
Expand Down
