Skip to content

Commit

Permalink
Merge #44769
Browse files Browse the repository at this point in the history
44769: kvnemesis: add a multi-node variant of the test r=nvanbenschoten a=danhhz

To make this more interesting, at the same time, add support for the
ChangeReplicas command.

Release note: None

Co-authored-by: Daniel Harrison <daniel.harrison@gmail.com>
  • Loading branch information
craig[bot] and danhhz committed Feb 21, 2020
2 parents bef461c + ecfb9a3 commit 2627ba3
Show file tree
Hide file tree
Showing 13 changed files with 834 additions and 210 deletions.
80 changes: 68 additions & 12 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,28 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// Applier executes Steps.
type Applier struct {
db *client.DB
mu struct {
dbs []*client.DB
mu struct {
dbIdx int
syncutil.Mutex
txns map[string]*client.Txn
}
}

// MakeApplier constructs an Applier that executes against the given DB.
func MakeApplier(db *client.DB) *Applier {
func MakeApplier(dbs ...*client.DB) *Applier {
a := &Applier{
db: db,
dbs: dbs,
}
a.mu.txns = make(map[string]*client.Txn)
return a
Expand All @@ -41,29 +45,46 @@ func MakeApplier(db *client.DB) *Applier {
// error is only returned from Apply if there is an internal coding error within
// Applier, errors from a Step execution are saved in the Step itself.
func (a *Applier) Apply(ctx context.Context, step *Step) (retErr error) {
step.Before = a.db.Clock().Now()
var db *client.DB
db, step.DBID = a.getNextDBRoundRobin()

step.Before = db.Clock().Now()
defer func() {
step.After = a.db.Clock().Now()
step.After = db.Clock().Now()
if p := recover(); p != nil {
retErr = errors.Errorf(`panic applying step %s: %v`, step, p)
}
}()
a.applyOp(ctx, &step.Op)
applyOp(ctx, db, &step.Op)
return nil
}

func (a *Applier) applyOp(ctx context.Context, op *Operation) {
func (a *Applier) getNextDBRoundRobin() (*client.DB, int32) {
a.mu.Lock()
dbIdx := a.mu.dbIdx
a.mu.dbIdx = (a.mu.dbIdx + 1) % len(a.dbs)
a.mu.Unlock()
return a.dbs[dbIdx], int32(dbIdx)
}

func applyOp(ctx context.Context, db *client.DB, op *Operation) {
switch o := op.GetValue().(type) {
case *GetOperation, *PutOperation, *BatchOperation:
applyClientOp(ctx, a.db, op)
applyClientOp(ctx, db, op)
case *SplitOperation:
err := a.db.AdminSplit(ctx, o.Key, o.Key, hlc.MaxTimestamp)
err := db.AdminSplit(ctx, o.Key, o.Key, hlc.MaxTimestamp)
o.Result = resultError(ctx, err)
case *MergeOperation:
err := a.db.AdminMerge(ctx, o.Key)
err := db.AdminMerge(ctx, o.Key)
o.Result = resultError(ctx, err)
case *ChangeReplicasOperation:
ctx = client.ChangeReplicasCanMixAddAndRemoveContext(ctx)
desc := getRangeDesc(ctx, o.Key, db)
_, err := db.AdminChangeReplicas(ctx, o.Key, desc, o.Changes)
// TODO(dan): Save returned desc?
o.Result = resultError(ctx, err)
case *ClosureTxnOperation:
txnErr := a.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txnErr := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
for i := range o.Ops {
op := &o.Ops[i]
applyClientOp(ctx, txn, op)
Expand Down Expand Up @@ -181,3 +202,38 @@ func resultError(ctx context.Context, err error) Result {
Err: &ee,
}
}

func getRangeDesc(ctx context.Context, key roachpb.Key, dbs ...*client.DB) roachpb.RangeDescriptor {
var dbIdx int
var opts = retry.Options{}
for r := retry.StartWithCtx(ctx, opts); r.Next(); dbIdx = (dbIdx + 1) % len(dbs) {
sender := dbs[dbIdx].NonTransactionalSender()
descs, _, err := client.RangeLookup(ctx, sender, key, roachpb.CONSISTENT, 0, false)
if err != nil {
log.Infof(ctx, "looking up descriptor for %s: %+v", key, err)
continue
}
if len(descs) != 1 {
log.Infof(ctx, "unexpected number of descriptors for %s: %d", key, len(descs))
continue
}
return descs[0]
}
panic(`unreachable`)
}

func newGetReplicasFn(dbs ...*client.DB) GetReplicasFn {
ctx := context.Background()
return func(key roachpb.Key) []roachpb.ReplicationTarget {
desc := getRangeDesc(ctx, key, dbs...)
replicas := desc.Replicas().All()
targets := make([]roachpb.ReplicationTarget, len(replicas))
for i, replica := range replicas {
targets[i] = roachpb.ReplicationTarget{
NodeID: replica.NodeID,
StoreID: replica.StoreID,
}
}
return targets
}
}
32 changes: 16 additions & 16 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestApplier(t *testing.T) {
defer tc.Stopper().Stop(ctx)
db := tc.Server(0).DB()

a := MakeApplier(db)
a := MakeApplier(db, db)
check := func(t *testing.T, s Step, expected string) {
t.Helper()
require.NoError(t, a.Apply(ctx, &s))
Expand All @@ -45,35 +45,35 @@ func TestApplier(t *testing.T) {
}

// Basic operations
check(t, step(get(`a`)), `db.Get(ctx, "a") // (nil, nil)`)
check(t, step(get(`a`)), `db0.Get(ctx, "a") // (nil, nil)`)

check(t, step(put(`a`, `1`)), `db.Put(ctx, "a", 1) // nil`)
check(t, step(get(`a`)), `db.Get(ctx, "a") // ("1", nil)`)
check(t, step(put(`a`, `1`)), `db1.Put(ctx, "a", 1) // nil`)
check(t, step(get(`a`)), `db0.Get(ctx, "a") // ("1", nil)`)

checkErr(t, step(get(`a`)), `db.Get(ctx, "a") // (nil, aborted in distSender: context canceled)`)
checkErr(t, step(put(`a`, `1`)), `db.Put(ctx, "a", 1) // aborted in distSender: context canceled`)
checkErr(t, step(get(`a`)), `db1.Get(ctx, "a") // (nil, aborted in distSender: context canceled)`)
checkErr(t, step(put(`a`, `1`)), `db0.Put(ctx, "a", 1) // aborted in distSender: context canceled`)

// Batch
check(t, step(batch(put(`b`, `2`), get(`a`))), `
{
b := &Batch{}
b.Put(ctx, "b", 2) // nil
b.Get(ctx, "a") // ("1", nil)
db.Run(ctx, b) // nil
db1.Run(ctx, b) // nil
}
`)
checkErr(t, step(batch(put(`b`, `2`), get(`a`))), `
{
b := &Batch{}
b.Put(ctx, "b", 2) // aborted in distSender: context canceled
b.Get(ctx, "a") // (nil, aborted in distSender: context canceled)
db.Run(ctx, b) // aborted in distSender: context canceled
db0.Run(ctx, b) // aborted in distSender: context canceled
}
`)

// Txn commit
check(t, step(closureTxn(ClosureTxnType_Commit, put(`e`, `5`), batch(put(`f`, `6`)))), `
db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
db1.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.Put(ctx, "e", 5) // nil
{
b := &Batch{}
Expand All @@ -86,7 +86,7 @@ db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {

// Txn commit in batch
check(t, step(closureTxnCommitInBatch(opSlice(get(`a`), put(`f`, `6`)), put(`e`, `5`))), `
db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
db0.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.Put(ctx, "e", 5) // nil
b := &Batch{}
b.Get(ctx, "a") // ("1", nil)
Expand All @@ -98,25 +98,25 @@ db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {

// Txn rollback
check(t, step(closureTxn(ClosureTxnType_Rollback, put(`e`, `5`))), `
db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
db1.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.Put(ctx, "e", 5) // nil
return errors.New("rollback")
}) // rollback
`)

// Txn error
checkErr(t, step(closureTxn(ClosureTxnType_Rollback, put(`e`, `5`))), `
db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
db0.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.Put(ctx, "e", 5)
return errors.New("rollback")
}) // context canceled
`)

// Splits and merges
check(t, step(split(`foo`)), `db.AdminSplit(ctx, "foo") // nil`)
check(t, step(merge(`foo`)), `db.AdminMerge(ctx, "foo") // nil`)
check(t, step(split(`foo`)), `db1.AdminSplit(ctx, "foo") // nil`)
check(t, step(merge(`foo`)), `db0.AdminMerge(ctx, "foo") // nil`)
checkErr(t, step(split(`foo`)),
`db.AdminSplit(ctx, "foo") // aborted in distSender: context canceled`)
`db1.AdminSplit(ctx, "foo") // aborted in distSender: context canceled`)
checkErr(t, step(merge(`foo`)),
`db.AdminMerge(ctx, "foo") // aborted in distSender: context canceled`)
`db0.AdminMerge(ctx, "foo") // aborted in distSender: context canceled`)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
// - Validate read only transactions
// - CPut/InitPut/Increment/Delete
// - DeleteRange/ClearRange/RevertRange/Scan/ReverseScan
// - ChangeReplicas/TransferLease
// - TransferLease
// - ExportRequest
// - AddSSTable
// - Root and leaf transactions
Expand Down
Loading

0 comments on commit 2627ba3

Please sign in to comment.