roachtest: benchmark repeated decommissions over time
This adds new long-running configurations to the `decommissionBench`
roachtest, so that repeated decommissions over time in a cluster can be
benchmarked individually.

With these long-running tests, we've seen the following averages:
```
decommissionBench/nodes=4/cpu=16/warehouses=1000/duration=1h0m0s:
avg decommission: 16m41s, avg upreplication: 33m17s

decommissionBench/nodes=4/cpu=4/warehouses=100:
avg decommission: 2m44s, avg upreplication: 4m3s
```

Release note: None
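
The third changed file (the `decommissionBench` registration itself) is not rendered below, so here is only a rough, hedged sketch of what a long-running configuration might look like; the `benchSpec` type and its field names are assumptions for illustration, and the two entries simply mirror the test names quoted above.

```go
// Illustrative sketch only: the decommissionBench registration file is not
// shown in this diff, so the type and field names below are assumptions.
package tests

import "time"

type benchSpec struct {
	nodes      int
	cpus       int
	warehouses int
	// A non-zero duration selects the long-running mode: nodes are repeatedly
	// decommissioned and re-added until the duration elapses, and each
	// decommission/upreplication cycle is timed individually.
	duration time.Duration
}

// Two configurations corresponding to the test names quoted above.
var benchConfigs = []benchSpec{
	{nodes: 4, cpus: 4, warehouses: 100},
	{nodes: 4, cpus: 16, warehouses: 1000, duration: time.Hour},
}
```
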
AlexTalks committed Jul 26, 2022
1 parent 4ab8292 commit 89dd056
Showing 3 changed files with 369 additions and 84 deletions.
8 changes: 4 additions & 4 deletions pkg/cmd/roachtest/tests/allocator.go
@@ -117,7 +117,7 @@ func registerAllocator(r registry.Registry) {
m = c.NewMonitor(ctx, clusNodes)
m.Go(func(ctx context.Context) error {
t.Status("waiting for rebalance")
err := waitForRebalance(ctx, t.L(), db, maxStdDev)
err := waitForRebalance(ctx, t.L(), db, maxStdDev, allocatorStableSeconds)
if err != nil {
return err
}
@@ -281,14 +281,14 @@ func allocatorStats(db *gosql.DB) (s replicationStats, err error) {
}

// waitForRebalance waits until there have been no recent range adds, removes, or
// splits. We wait until the cluster is balanced or until `StableInterval`
// splits. We wait until the cluster is balanced or until `stableSeconds`
// elapses, whichever comes first. Then, it prints stats about the rebalancing
// process. If the replica count appears unbalanced, an error is returned.
//
// This method is crude but necessary. If we were to wait until range counts
// were just about even, we'd miss potential post-rebalance thrashing.
func waitForRebalance(
ctx context.Context, l *logger.Logger, db *gosql.DB, maxStdDev float64,
ctx context.Context, l *logger.Logger, db *gosql.DB, maxStdDev float64, stableSeconds int64,
) error {
const statsInterval = 2 * time.Second

@@ -307,7 +307,7 @@ func waitForRebalance(
}

l.Printf("%v\n", stats)
if allocatorStableSeconds <= stats.SecondsSinceLastEvent {
if stableSeconds <= stats.SecondsSinceLastEvent {
l.Printf("replica count stddev = %f, max allowed stddev = %f\n", stats.ReplicaCountStdDev, maxStdDev)
if stats.ReplicaCountStdDev > maxStdDev {
_ = printRebalanceStats(l, db)
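
For orientation, a hedged sketch of a call site under the widened `waitForRebalance` signature: `m`, `ctx`, `t`, `db`, and `maxStdDev` are the variables from the allocator test above, while the alternative stability window is a made-up value, not one used by the test.

```go
m.Go(func(ctx context.Context) error {
	t.Status("waiting for rebalance")
	// The existing allocator tests keep passing allocatorStableSeconds; a
	// hypothetical caller wanting a different stability window now passes its own.
	const customStableSeconds int64 = 3 * 60
	if err := waitForRebalance(ctx, t.L(), db, maxStdDev, customStableSeconds); err != nil {
		return err
	}
	return nil
})
```
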
61 changes: 40 additions & 21 deletions pkg/cmd/roachtest/tests/decommission.go
@@ -280,17 +280,6 @@ func runDecommission(
}

m.Go(func() error {
getNodeID := func(node int) (int, error) {
dbNode := c.Conn(ctx, t.L(), node)
defer dbNode.Close()

var nodeID int
if err := dbNode.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1`).Scan(&nodeID); err != nil {
return 0, err
}
return nodeID, nil
}

tBegin, whileDown := timeutil.Now(), true
node := nodes
for timeutil.Since(tBegin) <= duration {
@@ -304,7 +293,7 @@
}

t.Status(fmt.Sprintf("decommissioning %d (down=%t)", node, whileDown))
nodeID, err := getNodeID(node)
nodeID, err := h.getLogicalNodeID(ctx, node)
if err != nil {
return err
}
@@ -1169,6 +1158,20 @@ func (h *decommTestHelper) stop(ctx context.Context, node int) {
h.c.Stop(ctx, h.t.L(), opts, h.c.Node(node))
}

// getLogicalNodeID connects to the nodeIdx-th node in the roachprod cluster to
// obtain the logical CockroachDB nodeID of this node. This is because nodes can
// change ID as they are continuously decommissioned, wiped, and started anew.
func (h *decommTestHelper) getLogicalNodeID(ctx context.Context, nodeIdx int) (int, error) {
dbNode := h.c.Conn(ctx, h.t.L(), nodeIdx)
defer dbNode.Close()

var nodeID int
if err := dbNode.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1`).Scan(&nodeID); err != nil {
return 0, err
}
return nodeID, nil
}

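As a hedged usage sketch (assuming `h`, `ctx`, `node`, `runNode`, and an iteration count come from the surrounding test), this is the pattern the helper supports: the roachprod node index stays fixed across wipe/restart cycles, while the logical node ID it maps to must be re-resolved on every iteration.

```go
for i := 0; i < iterations; i++ {
	// Re-resolve the logical ID each time: after a wipe and restart, the same
	// roachprod node rejoins the cluster under a brand-new logical node ID.
	logicalID, err := h.getLogicalNodeID(ctx, node)
	if err != nil {
		return err
	}
	// ... decommission n<logicalID> here (omitted), then wait for its replicas
	// to move away before wiping and restarting the node.
	if err := h.waitReplicatedAwayFrom(ctx, logicalID, runNode); err != nil {
		return err
	}
}
```
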
// decommission decommissions the given targetNodes, running the process
// through the specified runNode.
func (h *decommTestHelper) decommission(
@@ -1207,17 +1210,21 @@ func (h *decommTestHelper) recommission(

// checkDecommissioned validates that a node has successfully decommissioned
// and has updated its state in gossip.
func (h *decommTestHelper) checkDecommissioned(ctx context.Context, downNodeID, runNode int) error {
func (h *decommTestHelper) checkDecommissioned(
ctx context.Context, targetLogicalNodeID, runNode int,
) error {
db := h.c.Conn(ctx, h.t.L(), runNode)
defer db.Close()
var membership string
if err := db.QueryRow("SELECT membership FROM crdb_internal.kv_node_liveness WHERE node_id = $1",
downNodeID).Scan(&membership); err != nil {
if err := db.QueryRow(
"SELECT membership FROM crdb_internal.kv_node_liveness WHERE node_id = $1",
targetLogicalNodeID,
).Scan(&membership); err != nil {
return err
}

if membership != "decommissioned" {
return errors.Newf("node %d not decommissioned", downNodeID)
return errors.Newf("node %d not decommissioned", targetLogicalNodeID)
}

return nil
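
A minimal, hedged fragment (not taken from the test): if a caller wants to tolerate a brief window before the decommissioned status is visible through `crdb_internal.kv_node_liveness`, it can poll `checkDecommissioned`; the retry loop and one-second interval are illustrative only.

```go
for {
	if err := h.checkDecommissioned(ctx, targetLogicalNodeID, runNode); err == nil {
		break
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(time.Second):
		// Retry until the liveness record reflects the decommissioned state.
	}
}
```
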
@@ -1226,19 +1233,25 @@ func (h *decommTestHelper) checkDecommissioned(ctx context.Context, downNodeID,
// waitReplicatedAwayFrom checks each second until there are no ranges present
// on a node and all ranges are fully replicated.
func (h *decommTestHelper) waitReplicatedAwayFrom(
ctx context.Context, downNodeID, runNode int,
ctx context.Context, targetLogicalNodeID, runNode int,
) error {
db := h.c.Conn(ctx, h.t.L(), runNode)
defer func() {
_ = db.Close()
}()

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

var count int
if err := db.QueryRow(
// Check if the down node has any replicas.
"SELECT count(*) FROM crdb_internal.ranges WHERE array_position(replicas, $1) IS NOT NULL",
downNodeID,
targetLogicalNodeID,
).Scan(&count); err != nil {
return err
}
@@ -1264,7 +1277,7 @@ func (h *decommTestHelper) waitReplicatedAwayFrom(
// the roachprod node and targetNodeID representing the logical nodeID within
// the cluster.
func (h *decommTestHelper) waitUpReplicated(
ctx context.Context, targetNodeID, targetNode int, database string,
ctx context.Context, targetLogicalNodeID, targetNode int, database string,
) error {
db := h.c.Conn(ctx, h.t.L(), targetNode)
defer func() {
@@ -1273,17 +1286,23 @@ func (h *decommTestHelper) waitUpReplicated(

var count int
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Check to see that there are no ranges where the target node is
// not part of the replica set.
stmtReplicaCount := fmt.Sprintf(
"SELECT count(*) FROM crdb_internal.ranges "+
"WHERE array_position(replicas, %d) IS NULL and database_name = '%s';",
targetNodeID, database,
targetLogicalNodeID, database,
)
if err := db.QueryRow(stmtReplicaCount).Scan(&count); err != nil {
return err
}
h.t.L().Printf("node%d (n%d) awaiting %d replica(s)", targetNode, targetNodeID, count)
h.t.L().Printf("node%d (n%d) awaiting %d replica(s)", targetNode, targetLogicalNodeID, count)
if count == 0 {
break
}
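
Putting the helpers together, a hedged sketch of how one long-running benchmark iteration could time the two phases: the `decommission` call itself is elided because its full signature is not shown in this diff, and the `tpcc` database name and local variable names are assumptions.

```go
tDecommission := timeutil.Now()
// Decommission n<logicalID> via h.decommission (full signature not shown in
// this diff), then wait for its data to drain and verify its liveness state.
if err := h.waitReplicatedAwayFrom(ctx, logicalID, runNode); err != nil {
	return err
}
if err := h.checkDecommissioned(ctx, logicalID, runNode); err != nil {
	return err
}
elapsedDecommission := timeutil.Since(tDecommission)

// After wiping and restarting the node, time full upreplication onto the
// fresh logical node.
tUpreplicate := timeutil.Now()
newLogicalID, err := h.getLogicalNodeID(ctx, node)
if err != nil {
	return err
}
if err := h.waitUpReplicated(ctx, newLogicalID, node, "tpcc"); err != nil {
	return err
}
h.t.L().Printf("decommission took %s, upreplication took %s",
	elapsedDecommission, timeutil.Since(tUpreplicate))
```
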