diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index ab2882022372..026d77c22bfa 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -45,7 +45,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/ts" "github.com/cockroachdb/cockroach/pkg/ts/tspb" @@ -81,13 +80,17 @@ func adminSplitArgs(splitKey roachpb.Key) *roachpb.AdminSplitRequest { func TestStoreRangeSplitAtIllegalKeys(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - cfg := kvserver.TestStoreConfig(nil) - cfg.TestingKnobs.DisableSplitQueue = true - cfg.TestingKnobs.DisableMergeQueue = true - store := createTestStoreWithConfig(t, stopper, cfg) + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + defer s.Stopper().Stop(ctx) for _, key := range []roachpb.Key{ keys.Meta1Prefix, @@ -98,7 +101,7 @@ func TestStoreRangeSplitAtIllegalKeys(t *testing.T) { keys.SystemSQLCodec.TablePrefix(10 /* system descriptor ID */), } { args := adminSplitArgs(key) - _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args) + _, pErr := kv.SendWrapped(context.Background(), s.DB().NonTransactionalSender(), args) if !testutils.IsPError(pErr, "cannot split") { t.Errorf("%q: unexpected split error %s", key, pErr) } @@ -111,16 +114,19 @@ func TestStoreSplitAbortSpan(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - manualClock := hlc.NewManualClock(2400 * time.Hour.Nanoseconds()) - clock := hlc.NewClock(manualClock.UnixNano, time.Millisecond) - storeCfg := kvserver.TestStoreConfig(clock) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) left, middle, right := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") @@ -166,10 +172,15 @@ func TestStoreSplitAbortSpan(t *testing.T) { } thresh := kvserverbase.TxnCleanupThreshold.Nanoseconds() - // Pick a non-gcable and gcable timestamp, respectively. Avoid the clock's - // exact timestamp because of unpredictable logical ticks. - tsFresh := hlc.Timestamp{WallTime: manualClock.UnixNano() - thresh + 1} - tsStale := hlc.Timestamp{WallTime: manualClock.UnixNano() - thresh - 1} + // Make sure this test doesn't run out of padding time if we significantly + // reduce TxnCleanupThreshold in the future for whatever reason. + require.Greater(t, thresh, int64(time.Minute)) + // Pick a non-gcable and gcable timestamp, respectively. + // Use the current time for some non-expired abort span records. 
+ // Note that the cleanup threshold is so large that while this test runs, + // these records won't expire. + tsFresh := hlc.Timestamp{WallTime: s.Clock().Now().WallTime} + tsStale := hlc.Timestamp{WallTime: s.Clock().Now().WallTime - thresh - 1} args := []roachpb.Request{ populateAbortSpan(key(left, 1), tsFresh), @@ -244,27 +255,35 @@ func TestStoreSplitAbortSpan(t *testing.T) { func TestStoreRangeSplitAtTablePrefix(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) key := keys.UserTableDataMin args := adminSplitArgs(key) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil { t.Fatalf("%q: split unexpected error: %s", key, pErr) } - var desc descpb.TableDescriptor + var desc descpb.Descriptor descBytes, err := protoutil.Marshal(&desc) if err != nil { t.Fatal(err) } // Update SystemConfig to trigger gossip. - if err := store.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + if err := store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { return err } @@ -301,12 +320,20 @@ func TestStoreRangeSplitAtTablePrefix(t *testing.T) { func TestStoreRangeSplitInsideRow(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) // Manually create some the column keys corresponding to the table: // @@ -324,16 +351,16 @@ func TestStoreRangeSplitInsideRow(t *testing.T) { } // We don't care about the value, so just store any old thing. - if err := store.DB().Put(context.Background(), col1Key, "column 1"); err != nil { + if err := store.DB().Put(ctx, col1Key, "column 1"); err != nil { t.Fatal(err) } - if err := store.DB().Put(context.Background(), col2Key, "column 2"); err != nil { + if err := store.DB().Put(ctx, col2Key, "column 2"); err != nil { t.Fatal(err) } // Split between col1Key and col2Key by splitting before col2Key. 
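The timestamp selection in TestStoreSplitAbortSpan above boils down to a little arithmetic against the real server clock. A compressed sketch, assuming s and t are in scope as in that test:

// Derive one abort-span timestamp that stays inside the cleanup threshold for
// the lifetime of the test and one that is already past it (and hence GC-able).
thresh := kvserverbase.TxnCleanupThreshold.Nanoseconds()
require.Greater(t, thresh, int64(time.Minute)) // ample padding even on slow runs
now := s.Clock().Now().WallTime
tsFresh := hlc.Timestamp{WallTime: now}              // not yet GC-able
tsStale := hlc.Timestamp{WallTime: now - thresh - 1} // already GC-able
_, _ = tsFresh, tsStale // passed to populateAbortSpan in the test above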
args := adminSplitArgs(col2Key) - _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), args) if pErr != nil { t.Fatalf("%s: split unexpected error: %s", col1Key, pErr) } @@ -362,27 +389,35 @@ func TestStoreRangeSplitInsideRow(t *testing.T) { func TestStoreRangeSplitIntents(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) // First, write some values left and right of the proposed split key. pArgs := putArgs([]byte("c"), []byte("foo")) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), pArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs); pErr != nil { t.Fatal(pErr) } pArgs = putArgs([]byte("x"), []byte("bar")) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), pArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs); pErr != nil { t.Fatal(pErr) } // Split the range. splitKey := roachpb.Key("m") args := adminSplitArgs(splitKey) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil { t.Fatal(pErr) } @@ -393,7 +428,7 @@ func TestStoreRangeSplitIntents(t *testing.T) { } for _, key := range []roachpb.Key{keys.RangeDescriptorKey(roachpb.RKeyMin), keys.RangeDescriptorKey(splitKeyAddr)} { if _, _, err := storage.MVCCGet( - context.Background(), store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{}, + ctx, store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{}, ); err != nil { t.Errorf("failed to read consistent range descriptor for key %s: %+v", key, err) } @@ -435,26 +470,34 @@ func TestStoreRangeSplitIntents(t *testing.T) { func TestStoreRangeSplitAtRangeBounds(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) // Split range 1 at an arbitrary key. 
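The same single-store setup now recurs in nearly every converted test; for reference it could be factored into a helper along these lines (a sketch only, relying on this file's existing imports; the helper name is hypothetical):

// startTestStore captures the conversion pattern used throughout this patch:
// start a single-node TestServer with the split and merge queues disabled and
// return the underlying Store. The caller still owns shutdown, e.g.
// defer s.Stopper().Stop(ctx).
func startTestStore(t *testing.T) (*server.TestServer, *kvserver.Store) {
	serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{
			Store: &kvserver.StoreTestingKnobs{
				DisableMergeQueue: true,
				DisableSplitQueue: true,
			},
		},
	})
	s := serv.(*server.TestServer)
	store, err := s.Stores().GetStore(s.GetFirstStoreID())
	require.NoError(t, err)
	return s, store
}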
key := roachpb.Key("a") rngID := store.LookupReplica(roachpb.RKey(key)).RangeID h := roachpb.Header{RangeID: rngID} args := adminSplitArgs(key) - if _, pErr := kv.SendWrappedWith(context.Background(), store, h, args); pErr != nil { + if _, pErr := kv.SendWrappedWith(ctx, store, h, args); pErr != nil { t.Fatal(pErr) } replCount := store.ReplicaCount() // An AdminSplit request sent to the end of the old range // should fail with a RangeKeyMismatchError. - _, pErr := kv.SendWrappedWith(context.Background(), store, h, args) + _, pErr := kv.SendWrappedWith(ctx, store, h, args) if _, ok := pErr.GetDetail().(*roachpb.RangeKeyMismatchError); !ok { t.Fatalf("expected RangeKeyMismatchError, found: %v", pErr) } @@ -463,7 +506,7 @@ func TestStoreRangeSplitAtRangeBounds(t *testing.T) { // should succeed but no new ranges should be created. newRng := store.LookupReplica(roachpb.RKey(key)) h.RangeID = newRng.RangeID - if _, pErr := kv.SendWrappedWith(context.Background(), store, h, args); pErr != nil { + if _, pErr := kv.SendWrappedWith(ctx, store, h, args); pErr != nil { t.Fatal(pErr) } @@ -544,29 +587,31 @@ SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE func TestStoreRangeSplitIdempotency(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithOpts(t, - testStoreOpts{ - // This test was written before the test stores were able to start with - // more than one range and is not prepared to handle many ranges. - dontCreateSystemRanges: true, - cfg: &storeCfg}, - stopper) - rangeID := roachpb.RangeID(1) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + splitKey := roachpb.Key("m") content := roachpb.Key("asdvb") // First, write some values left and right of the proposed split key. 
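Past the setup, the splits themselves share a uniform shape. A sketch, assuming ctx, store, and this file's adminSplitArgs helper are in scope:

// Split at an arbitrary user key, then confirm a right-hand replica exists
// whose descriptor starts at the split key.
splitKey := roachpb.Key("m")
if _, pErr := kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKey)); pErr != nil {
	t.Fatal(pErr)
}
rhsRepl := store.LookupReplica(roachpb.RKey(splitKey))
require.NotNil(t, rhsRepl)
require.Equal(t, roachpb.RKey(splitKey), rhsRepl.Desc().StartKey)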
pArgs := putArgs([]byte("c"), content) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), pArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs); pErr != nil { t.Fatal(pErr) } pArgs = putArgs([]byte("x"), content) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), pArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs); pErr != nil { t.Fatal(pErr) } @@ -578,7 +623,7 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { lTxn := txn lTxn.Sequence++ lIncArgs.Sequence = lTxn.Sequence - if _, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ + if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{ Txn: &lTxn, }, lIncArgs); pErr != nil { t.Fatal(pErr) @@ -587,14 +632,16 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { rTxn := txn rTxn.Sequence++ rIncArgs.Sequence = rTxn.Sequence - if _, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ + if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{ Txn: &rTxn, }, rIncArgs); pErr != nil { t.Fatal(pErr) } + originalRepl := store.LookupReplica(roachpb.RKey(splitKey)) + require.NotNil(t, originalRepl) // Get the original stats for key and value bytes. - ms, err := stateloader.Make(rangeID).LoadMVCCStats(context.Background(), store.Engine()) + ms, err := stateloader.Make(originalRepl.RangeID).LoadMVCCStats(ctx, store.Engine()) if err != nil { t.Fatal(err) } @@ -602,7 +649,7 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { // Split the range. args := adminSplitArgs(splitKey) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil { t.Fatal(pErr) } @@ -619,20 +666,16 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { } } - repl := store.LookupReplica(roachpb.RKeyMin) - rngDesc := repl.Desc() + rngDesc := originalRepl.Desc() newRng := store.LookupReplica([]byte("m")) newRngDesc := newRng.Desc() if !bytes.Equal(newRngDesc.StartKey, splitKey) || !bytes.Equal(splitKey, rngDesc.EndKey) { t.Errorf("ranges mismatched, wanted %q=%q=%q", newRngDesc.StartKey, splitKey, rngDesc.EndKey) } - if !bytes.Equal(newRngDesc.EndKey, roachpb.RKeyMax) || !bytes.Equal(rngDesc.StartKey, roachpb.RKeyMin) { - t.Errorf("new ranges do not cover KeyMin-KeyMax, but only %q-%q", rngDesc.StartKey, newRngDesc.EndKey) - } // Try to get values from both left and right of where the split happened. gArgs := getArgs([]byte("c")) - if reply, pErr := kv.SendWrapped(context.Background(), store.TestSender(), gArgs); pErr != nil { + if reply, pErr := kv.SendWrapped(ctx, store.TestSender(), gArgs); pErr != nil { t.Fatal(pErr) } else if replyBytes, pErr := reply.(*roachpb.GetResponse).Value.GetBytes(); pErr != nil { t.Fatal(pErr) @@ -640,7 +683,7 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { t.Fatalf("actual value %q did not match expected value %q", replyBytes, content) } gArgs = getArgs([]byte("x")) - if reply, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ + if reply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{ RangeID: newRng.RangeID, }, gArgs); pErr != nil { t.Fatal(pErr) @@ -652,7 +695,7 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { // Send out an increment request copied from above (same txn/sequence) // which remains in the old range. 
- _, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{ Txn: &lTxn, }, lIncArgs) if pErr != nil { @@ -661,7 +704,7 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { // Send out the same increment copied from above (same txn/sequence), but // now to the newly created range (which should hold that key). - _, pErr = kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ + _, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{ RangeID: newRng.RangeID, Txn: &rTxn, }, rIncArgs) @@ -671,12 +714,12 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { // Compare stats of split ranges to ensure they are non zero and // exceed the original range when summed. - left, err := stateloader.Make(rangeID).LoadMVCCStats(context.Background(), store.Engine()) + left, err := stateloader.Make(originalRepl.RangeID).LoadMVCCStats(ctx, store.Engine()) if err != nil { t.Fatal(err) } lKeyBytes, lValBytes := left.KeyBytes, left.ValBytes - right, err := stateloader.Make(newRng.RangeID).LoadMVCCStats(context.Background(), store.Engine()) + right, err := stateloader.Make(newRng.RangeID).LoadMVCCStats(ctx, store.Engine()) if err != nil { t.Fatal(err) } @@ -688,10 +731,10 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { if lValBytes == 0 || rValBytes == 0 { t.Errorf("expected non-zero val bytes; got %d, %d", lValBytes, rValBytes) } - if lKeyBytes+rKeyBytes <= keyBytes { + if lKeyBytes+rKeyBytes != keyBytes { t.Errorf("left + right key bytes don't match; %d + %d <= %d", lKeyBytes, rKeyBytes, keyBytes) } - if lValBytes+rValBytes <= valBytes { + if lValBytes+rValBytes != valBytes { t.Errorf("left + right val bytes don't match; %d + %d <= %d", lValBytes, rValBytes, valBytes) } } @@ -704,14 +747,22 @@ func TestStoreRangeSplitIdempotency(t *testing.T) { func TestStoreRangeSplitStats(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - manual := hlc.NewManualClock(123) - storeCfg := kvserver.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond)) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + + start := s.Clock().Now() // Split the range after the last table data key. keyPrefix := keys.SystemSQLCodec.TablePrefix(keys.MinUserDescID) @@ -723,7 +774,7 @@ func TestStoreRangeSplitStats(t *testing.T) { repl := store.LookupReplica(roachpb.RKey(keyPrefix)) // NOTE that this value is expected to change over time, depending on what // we store in the sys-local keyspace. Update it accordingly for this test. 
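Resolving the pre-split replica by key instead of hard-coding range ID 1, and reading its persisted stats through the state loader, is the other recurring change above. A sketch, assuming ctx, store, and a splitKey inside the original range:

// Load the on-disk MVCC stats for whichever range currently contains splitKey;
// its KeyBytes/ValBytes can later be compared against the sum of the two
// post-split ranges.
repl := store.LookupReplica(roachpb.RKey(splitKey))
require.NotNil(t, repl)
ms, err := stateloader.Make(repl.RangeID).LoadMVCCStats(ctx, store.Engine())
require.NoError(t, err)
t.Logf("r%d: keyBytes=%d valBytes=%d", repl.RangeID, ms.KeyBytes, ms.ValBytes)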
- empty := enginepb.MVCCStats{LastUpdateNanos: manual.UnixNano()} + empty := enginepb.MVCCStats{LastUpdateNanos: start.WallTime} if err := verifyRangeStats(store.Engine(), repl.RangeID, empty); err != nil { t.Fatal(err) } @@ -738,15 +789,13 @@ func TestStoreRangeSplitStats(t *testing.T) { if err != nil { t.Fatal(err) } - if err := verifyRecomputedStats(snap, repl.Desc(), ms, manual.UnixNano()); err != nil { + if err := verifyRecomputedStats(snap, repl.Desc(), ms, start.WallTime); err != nil { t.Fatalf("failed to verify range's stats before split: %+v", err) } if inMemMS := repl.GetMVCCStats(); inMemMS != ms { t.Fatalf("in-memory and on-disk diverged:\n%+v\n!=\n%+v", inMemMS, ms) } - manual.Increment(100) - // Split the range at approximate halfway point. args = adminSplitArgs(midKey) if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{ @@ -785,19 +834,18 @@ func TestStoreRangeSplitStats(t *testing.T) { } // Stats should both have the new timestamp. - now := manual.UnixNano() - if lTs := msLeft.LastUpdateNanos; lTs != now { - t.Errorf("expected left range stats to have new timestamp, want %d, got %d", now, lTs) + if lTs := msLeft.LastUpdateNanos; lTs < start.WallTime { + t.Errorf("expected left range stats to have new timestamp, want %d, got %d", start.WallTime, lTs) } - if rTs := msRight.LastUpdateNanos; rTs != now { - t.Errorf("expected right range stats to have new timestamp, want %d, got %d", now, rTs) + if rTs := msRight.LastUpdateNanos; rTs < start.WallTime { + t.Errorf("expected right range stats to have new timestamp, want %d, got %d", start.WallTime, rTs) } // Stats should agree with recomputation. - if err := verifyRecomputedStats(snap, repl.Desc(), msLeft, now); err != nil { + if err := verifyRecomputedStats(snap, repl.Desc(), msLeft, s.Clock().PhysicalNow()); err != nil { t.Fatalf("failed to verify left range's stats after split: %+v", err) } - if err := verifyRecomputedStats(snap, replRight.Desc(), msRight, now); err != nil { + if err := verifyRecomputedStats(snap, replRight.Desc(), msRight, s.Clock().PhysicalNow()); err != nil { t.Fatalf("failed to verify right range's stats after split: %+v", err) } } @@ -834,23 +882,17 @@ func TestStoreEmptyRangeSnapshotSize(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - - // Disable the replicate queue, the split queue, and the merge queue as we - // want to control both rebalancing, splits, and merges ourselves. - sc := kvserver.TestStoreConfig(nil) - sc.TestingKnobs.DisableReplicateQueue = true - sc.TestingKnobs.DisableSplitQueue = true - sc.TestingKnobs.DisableMergeQueue = true - - mtc := &multiTestContext{storeConfig: &sc} - defer mtc.Stop() - mtc.Start(t, 2) + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) // Split the range after the last table data key to get a range that contains // no user data. 
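With the manual clock gone, exact LastUpdateNanos comparisons become lower-bound checks, as in the stats assertions above. A small helper sketch (the helper name is hypothetical):

// requireStatsAtLeast asserts that a replica's in-memory stats were updated at
// or after the given wall-clock lower bound, which is all a real clock lets a
// test assert.
func requireStatsAtLeast(t *testing.T, repl *kvserver.Replica, lowerBound hlc.Timestamp) {
	t.Helper()
	ms := repl.GetMVCCStats()
	require.GreaterOrEqual(t, ms.LastUpdateNanos, lowerBound.WallTime)
}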
splitKey := keys.SystemSQLCodec.TablePrefix(keys.MinUserDescID) splitArgs := adminSplitArgs(splitKey) - if _, err := kv.SendWrapped(ctx, mtc.distSenders[0], splitArgs); err != nil { + if _, err := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), splitArgs); err != nil { t.Fatal(err) } @@ -861,7 +903,7 @@ func TestStoreEmptyRangeSnapshotSize(t *testing.T) { headers []*kvserver.SnapshotRequest_Header }{} messageHandler := RaftMessageHandlerInterceptor{ - RaftMessageHandler: mtc.stores[1], + RaftMessageHandler: tc.GetFirstStoreFromServer(t, 1), handleSnapshotFilter: func(header *kvserver.SnapshotRequest_Header) { // Each snapshot request is handled in a new goroutine, so we need // synchronization. @@ -870,12 +912,11 @@ func TestStoreEmptyRangeSnapshotSize(t *testing.T) { messageRecorder.headers = append(messageRecorder.headers, header) }, } - mtc.transport.Listen(mtc.stores[1].StoreID(), messageHandler) + tc.Servers[1].RaftTransport().Listen(tc.GetFirstStoreFromServer(t, 1).StoreID(), messageHandler) // Replicate the newly-split range to trigger a snapshot request from store 0 // to store 1. - rangeID := mtc.stores[0].LookupReplica(roachpb.RKey(splitKey)).RangeID - mtc.replicateRange(rangeID, 1) + desc := tc.AddVotersOrFatal(t, splitKey, tc.Target(1)) // Verify that we saw at least one snapshot request, messageRecorder.Lock() @@ -884,7 +925,7 @@ func TestStoreEmptyRangeSnapshotSize(t *testing.T) { t.Fatalf("expected at least one snapshot header, but got %d", a) } for i, header := range messageRecorder.headers { - if e, a := header.State.Desc.RangeID, rangeID; e != a { + if e, a := header.State.Desc.RangeID, desc.RangeID; e != a { t.Errorf("%d: expected RangeID to be %d, but got %d", i, e, a) } if header.RangeSize != 0 { @@ -907,13 +948,22 @@ func TestStoreEmptyRangeSnapshotSize(t *testing.T) { func TestStoreRangeSplitStatsWithMerges(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - manual := hlc.NewManualClock(123) - storeCfg := kvserver.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond)) - storeCfg.TestingKnobs.DisableSplitQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableSplitQueue: true, + DisableMergeQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + + start := s.Clock().Now() // Split the range after the last table data key. keyPrefix := keys.SystemSQLCodec.TablePrefix(keys.MinUserDescID) @@ -925,14 +975,13 @@ func TestStoreRangeSplitStatsWithMerges(t *testing.T) { repl := store.LookupReplica(roachpb.RKey(keyPrefix)) // NOTE that this value is expected to change over time, depending on what // we store in the sys-local keyspace. Update it accordingly for this test. - empty := enginepb.MVCCStats{LastUpdateNanos: manual.UnixNano()} + empty := enginepb.MVCCStats{LastUpdateNanos: start.WallTime} if err := verifyRangeStats(store.Engine(), repl.RangeID, empty); err != nil { t.Fatal(err) } // Write random TimeSeries data. midKey := writeRandomTimeSeriesDataToRange(t, store, repl.RangeID, keyPrefix) - manual.Increment(100) // Split the range at approximate halfway point. 
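The two-node tests follow the TestCluster shape shown above. A condensed sketch of the snapshot-interception wiring, reusing this file's adminSplitArgs helper and the RaftMessageHandlerInterceptor type from TestStoreEmptyRangeSnapshotSize:

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
	ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

// Carve off an empty range, hook the receiving store's raft handler, then
// up-replicate so the snapshot flows through the interceptor.
splitKey := keys.SystemSQLCodec.TablePrefix(keys.MinUserDescID)
if _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), adminSplitArgs(splitKey)); pErr != nil {
	t.Fatal(pErr)
}
handler := RaftMessageHandlerInterceptor{
	RaftMessageHandler: tc.GetFirstStoreFromServer(t, 1),
	handleSnapshotFilter: func(header *kvserver.SnapshotRequest_Header) {
		t.Logf("saw snapshot for r%d (size=%d)", header.State.Desc.RangeID, header.RangeSize)
	},
}
tc.Servers[1].RaftTransport().Listen(tc.GetFirstStoreFromServer(t, 1).StoreID(), handler)
desc := tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
t.Logf("replicated r%d to the second store", desc.RangeID)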
args = adminSplitArgs(midKey) @@ -955,19 +1004,18 @@ func TestStoreRangeSplitStatsWithMerges(t *testing.T) { } // Stats should both have the new timestamp. - now := manual.UnixNano() - if lTs := msLeft.LastUpdateNanos; lTs != now { - t.Errorf("expected left range stats to have new timestamp, want %d, got %d", now, lTs) + if lTs := msLeft.LastUpdateNanos; lTs < start.WallTime { + t.Errorf("expected left range stats to have new timestamp, want %d, got %d", start.WallTime, lTs) } - if rTs := msRight.LastUpdateNanos; rTs != now { - t.Errorf("expected right range stats to have new timestamp, want %d, got %d", now, rTs) + if rTs := msRight.LastUpdateNanos; rTs < start.WallTime { + t.Errorf("expected right range stats to have new timestamp, want %d, got %d", start.WallTime, rTs) } // Stats should agree with recomputation. - if err := verifyRecomputedStats(snap, repl.Desc(), msLeft, now); err != nil { + if err := verifyRecomputedStats(snap, repl.Desc(), msLeft, s.Clock().PhysicalNow()); err != nil { t.Fatalf("failed to verify left range's stats after split: %+v", err) } - if err := verifyRecomputedStats(snap, replRight.Desc(), msRight, now); err != nil { + if err := verifyRecomputedStats(snap, replRight.Desc(), msRight, s.Clock().PhysicalNow()); err != nil { t.Fatalf("failed to verify right range's stats after split: %+v", err) } } @@ -1020,12 +1068,20 @@ func fillRange( func TestStoreZoneUpdateAndRangeSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - storeCfg := kvserver.TestStoreConfig(nil /* clock */) - storeCfg.TestingKnobs.DisableMergeQueue = true - store := createTestStoreWithConfig(t, stopper, storeCfg) - config.TestingSetupZoneConfigHook(stopper) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + config.TestingSetupZoneConfigHook(s.Stopper()) const maxBytes = 1 << 16 // Set max bytes. @@ -1081,12 +1137,20 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) { func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - storeCfg := kvserver.TestStoreConfig(nil /* clock */) - storeCfg.TestingKnobs.DisableMergeQueue = true - store := createTestStoreWithConfig(t, stopper, storeCfg) - config.TestingSetupZoneConfigHook(stopper) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + config.TestingSetupZoneConfigHook(s.Stopper()) origRng := store.LookupReplica(roachpb.RKeyMin) @@ -1153,15 +1217,14 @@ func TestStoreRangeSplitBackpressureWrites(t *testing.T) { var activateSplitFilter int32 splitKey := roachpb.RKey(keys.UserTableDataMin) splitPending, blockSplits := make(chan struct{}), make(chan struct{}) - storeCfg := kvserver.TestStoreConfig(nil) + // Set maxBytes to something small so we can exceed the maximum split // size without adding 2x64MB of data. 
const maxBytes = 1 << 16 - storeCfg.DefaultZoneConfig.RangeMaxBytes = proto.Int64(maxBytes) - storeCfg.TestingKnobs.DisableGCQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.TestingRequestFilter = + zoneConfig := zonepb.DefaultZoneConfig() + zoneConfig.RangeMaxBytes = proto.Int64(maxBytes) + + testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { for _, req := range ba.Requests { if cPut, ok := req.GetInner().(*roachpb.ConditionalPutRequest); ok { @@ -1180,9 +1243,23 @@ func TestStoreRangeSplitBackpressureWrites(t *testing.T) { } ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - store := createTestStoreWithConfig(t, stopper, storeCfg) + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DefaultZoneConfigOverride: &zoneConfig, + }, + Store: &kvserver.StoreTestingKnobs{ + DisableGCQueue: true, + DisableMergeQueue: true, + DisableSplitQueue: true, + TestingRequestFilter: testingRequestFilter, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) // Split at the split key. sArgs := adminSplitArgs(splitKey.AsRawKey()) @@ -1206,7 +1283,7 @@ func TestStoreRangeSplitBackpressureWrites(t *testing.T) { // it gets blocked in the response filter. if tc.splitOngoing { atomic.StoreInt32(&activateSplitFilter, 1) - if err := stopper.RunAsyncTask(ctx, "force split", func(_ context.Context) { + if err := s.Stopper().RunAsyncTask(ctx, "force split", func(_ context.Context) { store.SetSplitQueueActive(true) if err := store.ForceSplitScanAndProcess(); err != nil { log.Fatalf(ctx, "%v", err) @@ -1288,11 +1365,11 @@ func TestStoreRangeSplitBackpressureWrites(t *testing.T) { func TestStoreRangeSystemSplits(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) // Intentionally leave the merge queue enabled. This indirectly tests that the // merge queue respects these split points. - store, _ := createTestStore(t, stopper) + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) userTableMax := keys.MinUserDescID + 4 var exceptions map[int]struct{} @@ -1304,17 +1381,17 @@ func TestStoreRangeSystemSplits(t *testing.T) { // - descriptor IDs are used to determine split keys // - the write triggers a SystemConfig update and gossip // We should end up with splits at each user table prefix. 
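The backpressure test above also shows where zone-config overrides now live: the default zone config is injected through the server knobs, while queue toggles and request filters stay on the store knobs. A condensed sketch:

// Shrink the default range size so splits trigger without writing the default
// 2x64MB of data, and disable the queues the test drives manually.
ctx := context.Background()
zoneConfig := zonepb.DefaultZoneConfig()
zoneConfig.RangeMaxBytes = proto.Int64(1 << 16)
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
	Knobs: base.TestingKnobs{
		Server: &server.TestingKnobs{
			DefaultZoneConfigOverride: &zoneConfig,
		},
		Store: &kvserver.StoreTestingKnobs{
			DisableGCQueue:    true,
			DisableMergeQueue: true,
			DisableSplitQueue: true,
		},
	},
})
defer serv.Stopper().Stop(ctx)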
- if err := store.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + if err := s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { return err } descTablePrefix := keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID) kvs, _ /* splits */ := schema.GetInitialValues() - for _, kv := range kvs { - if !bytes.HasPrefix(kv.Key, descTablePrefix) { + for _, akv := range kvs { + if !bytes.HasPrefix(akv.Key, descTablePrefix) { continue } - if err := txn.Put(ctx, kv.Key, &kv.Value); err != nil { + if err := txn.Put(ctx, akv.Key, &akv.Value); err != nil { return err } } @@ -1363,7 +1440,7 @@ func TestStoreRangeSystemSplits(t *testing.T) { expKeys = append(expKeys, testutils.MakeKey(keys.Meta2Prefix, roachpb.RKeyMax)) testutils.SucceedsSoon(t, func() error { - rows, err := store.DB().Scan(context.Background(), keys.Meta2Prefix, keys.MetaMax, 0) + rows, err := s.DB().Scan(context.Background(), keys.Meta2Prefix, keys.MetaMax, 0) if err != nil { return err } @@ -1383,7 +1460,7 @@ func TestStoreRangeSystemSplits(t *testing.T) { // Write another, disjoint (+3) descriptor for a user table. userTableMax += 3 exceptions = map[int]struct{}{userTableMax - 1: {}, userTableMax - 2: {}} - if err := store.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + if err := s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { return err } @@ -1903,12 +1980,20 @@ func TestStoreSplitOnRemovedReplica(t *testing.T) { func TestStoreSplitGCThreshold(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) leftKey := roachpb.Key("a") splitKey := roachpb.Key("b") @@ -1916,11 +2001,11 @@ func TestStoreSplitGCThreshold(t *testing.T) { content := []byte("test") pArgs := putArgs(leftKey, content) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), pArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs); pErr != nil { t.Fatal(pErr) } pArgs = putArgs(rightKey, content) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), pArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs); pErr != nil { t.Fatal(pErr) } @@ -1934,12 +2019,12 @@ func TestStoreSplitGCThreshold(t *testing.T) { }, Threshold: specifiedGCThreshold, } - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), gcArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), gcArgs); pErr != nil { t.Fatal(pErr) } args := adminSplitArgs(splitKey) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil { t.Fatal(pErr) } @@ -1950,7 +2035,7 @@ func 
TestStoreSplitGCThreshold(t *testing.T) { t.Fatalf("expected RHS's GCThreshold is equal to %v, but got %v", specifiedGCThreshold, gcThreshold) } - repl.AssertState(context.Background(), store.Engine()) + repl.AssertState(ctx, store.Engine()) } // TestStoreRangeSplitRaceUninitializedRHS reproduces #7600 (before it was @@ -1961,17 +2046,6 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Skipping as part of test-infra-team flaky test cleanup. - skip.WithIssue(t, 50809) - - mtc := &multiTestContext{} - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableMergeQueue = true - // An aggressive tick interval lets groups communicate more and thus - // triggers test failures much more reliably. We can't go too aggressive - // or race tests never make any progress. - storeCfg.RaftTickInterval = 50 * time.Millisecond - storeCfg.RaftElectionTimeoutTicks = 2 currentTrigger := make(chan *roachpb.SplitTrigger, 1) var seen struct { syncutil.Mutex @@ -1979,7 +2053,7 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { } seen.sids = make(map[kvserverbase.CmdIDKey][2]bool) - storeCfg.TestingKnobs.EvalKnobs.TestingEvalFilter = func(args kvserverbase.FilterArgs) *roachpb.Error { + testingEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { et, ok := args.Req.(*roachpb.EndTxnRequest) if !ok || et.InternalCommitTrigger == nil { return nil @@ -2013,16 +2087,37 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { return nil } - mtc.storeConfig = &storeCfg - defer mtc.Stop() - mtc.Start(t, 2) + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: testingEvalFilter, + }, + }, + }, + RaftConfig: base.RaftConfig{ + // An aggressive tick interval lets groups communicate more and thus + // triggers test failures much more reliably. We can't go too aggressive + // or race tests never make any progress. + RaftTickInterval: 100 * time.Millisecond, + RaftElectionTimeoutTicks: 2, + RaftHeartbeatIntervalTicks: 1, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) - leftRange := mtc.stores[0].LookupReplica(roachpb.RKey("a")) + leftRange := store.LookupReplica(roachpb.RKey("a")) // Replicate the left range onto the second node. We don't wait since we // don't actually care what the second node does. All we want is that the // first node isn't surprised by messages from that node. - mtc.replicateRange(leftRange.RangeID, 1) + tc.AddVotersOrFatal(t, leftRange.Desc().StartKey.AsRawKey(), tc.Target(1)) for i := 0; i < 10; i++ { errChan := make(chan *roachpb.Error) @@ -2038,7 +2133,7 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { // range). splitKey := roachpb.Key(encoding.EncodeVarintDescending([]byte("a"), int64(i))) splitArgs := adminSplitArgs(splitKey) - _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], splitArgs) + _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), splitArgs) errChan <- pErr }() go func() { @@ -2059,7 +2154,7 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { // side in the split trigger was racing with the uninitialized // version for the same group, resulting in clobbered HardState). 
for term := uint64(1); ; term++ { - if sent := mtc.transport.SendAsync(&kvserver.RaftMessageRequest{ + if sent := tc.Servers[1].RaftTransport().SendAsync(&kvserver.RaftMessageRequest{ RangeID: trigger.RightDesc.RangeID, ToReplica: replicas[0], FromReplica: replicas[1], @@ -2096,47 +2191,55 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { func TestLeaderAfterSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeConfig := kvserver.TestStoreConfig(nil) - storeConfig.TestingKnobs.DisableReplicateQueue = true - storeConfig.TestingKnobs.DisableMergeQueue = true - storeConfig.RaftElectionTimeoutTicks = 1000000 - mtc := &multiTestContext{ - storeConfig: &storeConfig, - } - defer mtc.Stop() - mtc.Start(t, 3) - mtc.replicateRange(1, 1, 2) + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + RaftConfig: base.RaftConfig{ + RaftElectionTimeoutTicks: 1000000, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) leftKey := roachpb.Key("a") splitKey := roachpb.Key("m") rightKey := roachpb.Key("z") + repl := store.LookupReplica(roachpb.RKey(leftKey)) + require.NotNil(t, repl) + tc.AddVotersOrFatal(t, repl.Desc().StartKey.AsRawKey(), tc.Targets(1, 2)...) + splitArgs := adminSplitArgs(splitKey) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], splitArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), splitArgs); pErr != nil { t.Fatal(pErr) } incArgs := incrementArgs(leftKey, 1) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } incArgs = incrementArgs(rightKey, 2) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } } func BenchmarkStoreRangeSplit(b *testing.B) { - var mtc multiTestContext - mtc.Start(b, 1) - defer mtc.Stop() - store := mtc.Store(0) + ctx := context.Background() + serv, _, _ := serverutils.StartServer(b, base.TestServerArgs{}) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(b, err) // Perform initial split of ranges. sArgs := adminSplitArgs(roachpb.Key("b")) - if _, err := kv.SendWrapped(context.Background(), store.TestSender(), sArgs); err != nil { + if _, err := kv.SendWrapped(ctx, store.TestSender(), sArgs); err != nil { b.Fatal(err) } @@ -2148,7 +2251,7 @@ func BenchmarkStoreRangeSplit(b *testing.B) { // Merge the b range back into the a range. mArgs := adminMergeArgs(roachpb.KeyMin) - if _, err := kv.SendWrapped(context.Background(), store.TestSender(), mArgs); err != nil { + if _, err := kv.SendWrapped(ctx, store.TestSender(), mArgs); err != nil { b.Fatal(err) } @@ -2156,13 +2259,13 @@ func BenchmarkStoreRangeSplit(b *testing.B) { for i := 0; i < b.N; i++ { // Split the range. b.StartTimer() - if _, err := kv.SendWrapped(context.Background(), store.TestSender(), sArgs); err != nil { + if _, err := kv.SendWrapped(ctx, store.TestSender(), sArgs); err != nil { b.Fatal(err) } // Merge the ranges. 
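Raft timing overrides that used to sit on the store config move into base.RaftConfig on the per-server args, as in the two cluster tests above. A sketch, with illustrative tick values:

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
	ReplicationMode: base.ReplicationManual,
	ServerArgs: base.TestServerArgs{
		RaftConfig: base.RaftConfig{
			// Aggressive raft timing makes scenarios like the ones above
			// reproduce more reliably without a hand-built transport.
			RaftTickInterval:           100 * time.Millisecond,
			RaftElectionTimeoutTicks:   2,
			RaftHeartbeatIntervalTicks: 1,
		},
	},
})
defer tc.Stopper().Stop(ctx)

// Up-replicate the range containing "a" to the other two nodes.
store := tc.GetFirstStoreFromServer(t, 0)
repl := store.LookupReplica(roachpb.RKey("a"))
require.NotNil(t, repl)
tc.AddVotersOrFatal(t, repl.Desc().StartKey.AsRawKey(), tc.Targets(1, 2)...)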
b.StopTimer() - if _, err := kv.SendWrapped(context.Background(), store.TestSender(), mArgs); err != nil { + if _, err := kv.SendWrapped(ctx, store.TestSender(), mArgs); err != nil { b.Fatal(err) } } @@ -2224,22 +2327,30 @@ func writeRandomTimeSeriesDataToRange( func TestStoreRangeGossipOnSplits(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.GossipWhenCapacityDeltaExceedsFraction = 0.5 // 50% for testing - // We can't properly test how frequently changes in the number of ranges - // trigger the store to gossip its capacities if we have to worry about - // changes in the number of leases also triggering store gossip. - storeCfg.TestingKnobs.DisableLeaseCapacityGossip = true - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - storeCfg.TestingKnobs.DisableScanner = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + DisableScanner: true, + GossipWhenCapacityDeltaExceedsFraction: 0.5, // 50% for testing + // We can't properly test how frequently changes in the number of ranges + // trigger the store to gossip its capacities if we have to worry about + // changes in the number of leases also triggering store gossip. + DisableLeaseCapacityGossip: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) storeKey := gossip.MakeStoreKey(store.StoreID()) // Avoid excessive logging on under-replicated ranges due to our many splits. 
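TestStoreRangeGossipOnSplits above also picks up the knob relocation made in store.go and testing_knobs.go further down: the capacity-delta gossip fraction is now a store testing knob rather than a StoreConfig field. Setting it in a test looks like this sketch:

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
	Knobs: base.TestingKnobs{
		Store: &kvserver.StoreTestingKnobs{
			DisableMergeQueue: true,
			DisableSplitQueue: true,
			DisableScanner:    true,
			// Gossip store capacity once it drifts 50% from the last gossiped
			// value instead of waiting for the periodic interval.
			GossipWhenCapacityDeltaExceedsFraction: 0.5,
			// Keep lease-count changes from also triggering store gossip so
			// that only range-count changes are measured.
			DisableLeaseCapacityGossip: true,
		},
	},
})
defer serv.Stopper().Stop(ctx)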
- config.TestingSetupZoneConfigHook(stopper) + config.TestingSetupZoneConfigHook(s.Stopper()) zoneConfig := zonepb.DefaultZoneConfig() zoneConfig.NumReplicas = proto.Int32(1) config.TestingSetZoneConfig(0, zoneConfig) @@ -2308,16 +2419,24 @@ func TestStoreRangeGossipOnSplits(t *testing.T) { func TestStoreTxnWaitQueueEnabledOnSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) key := keys.UserTableDataMin args := adminSplitArgs(key) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil { t.Fatalf("%q: split unexpected error: %s", key, pErr) } @@ -2332,17 +2451,25 @@ func TestStoreTxnWaitQueueEnabledOnSplit(t *testing.T) { func TestDistributedTxnCleanup(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) // Split at "a". lhsKey := roachpb.Key("a") args := adminSplitArgs(lhsKey) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil { t.Fatalf("split at %q: %s", lhsKey, pErr) } lhs := store.LookupReplica(roachpb.RKey("a")) @@ -2366,7 +2493,6 @@ func TestDistributedTxnCleanup(t *testing.T) { testutils.RunTrueAndFalse(t, "commit", func(t *testing.T, commit bool) { // Run a distributed transaction involving the lhsKey and rhsKey. 
var txnKey roachpb.Key - ctx := context.Background() txn := kv.NewTxn(ctx, store.DB(), 0 /* gatewayNodeID */) txnFn := func(ctx context.Context, txn *kv.Txn) error { b := txn.NewBatch() @@ -2526,10 +2652,7 @@ func TestTxnWaitQueueDependencyCycleWithRangeSplit(t *testing.T) { var pushCount int32 firstPush := make(chan struct{}) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - storeCfg.TestingKnobs.EvalKnobs.TestingEvalFilter = + testingEvalFilter := func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { if _, ok := filterArgs.Req.(*roachpb.PushTxnRequest); ok { if atomic.AddInt32(&pushCount, 1) == 1 { @@ -2538,16 +2661,29 @@ func TestTxnWaitQueueDependencyCycleWithRangeSplit(t *testing.T) { } return nil } - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - store := createTestStoreWithConfig(t, stopper, storeCfg) + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: testingEvalFilter, + }, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) lhsKey := roachpb.Key("a") rhsKey := roachpb.Key("b") // Split at "a". args := adminSplitArgs(lhsKey) - if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), args); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil { t.Fatalf("split at %q: %s", lhsKey, pErr) } lhs := store.LookupReplica(roachpb.RKey("a")) @@ -2562,7 +2698,7 @@ func TestTxnWaitQueueDependencyCycleWithRangeSplit(t *testing.T) { // Start txn to write key a. txnACh := make(chan error) go func() { - txnACh <- store.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + txnACh <- store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { if err := txn.Put(ctx, lhsKey, "value"); err != nil { return err } @@ -2579,7 +2715,7 @@ func TestTxnWaitQueueDependencyCycleWithRangeSplit(t *testing.T) { // Start txn to write key b. txnBCh := make(chan error) go func() { - txnBCh <- store.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + txnBCh <- store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { if err := txn.Put(ctx, rhsKey, "value"); err != nil { return err } @@ -2611,7 +2747,7 @@ func TestTxnWaitQueueDependencyCycleWithRangeSplit(t *testing.T) { // Split at "b". args = adminSplitArgs(rhsKey) - if _, pErr := kv.SendWrappedWith(context.Background(), store, roachpb.Header{ + if _, pErr := kv.SendWrappedWith(ctx, store, roachpb.Header{ RangeID: lhs.RangeID, }, args); pErr != nil { t.Fatalf("split at %q: %s", rhsKey, pErr) @@ -2962,12 +3098,7 @@ func TestRangeLookupAsyncResolveIntent(t *testing.T) { blockPushTxn := make(chan struct{}) defer close(blockPushTxn) - // Disable async tasks in the intent resolver. All tasks will be synchronous. 
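Filters that used to be assigned to storeCfg.TestingKnobs.EvalKnobs now travel through the same knob structs handed to StartServer, as in the dependency-cycle test above. A sketch with an illustrative filter body:

// Count PushTxn requests as they are evaluated; the filter is wired in through
// the store testing knobs rather than a hand-built store config.
ctx := context.Background()
var pushCount int32
testingEvalFilter := func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
	if _, ok := filterArgs.Req.(*roachpb.PushTxnRequest); ok {
		atomic.AddInt32(&pushCount, 1)
	}
	return nil
}
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
	Knobs: base.TestingKnobs{
		Store: &kvserver.StoreTestingKnobs{
			DisableMergeQueue: true,
			DisableSplitQueue: true,
			EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
				TestingEvalFilter: testingEvalFilter,
			},
		},
	},
})
defer serv.Stopper().Stop(ctx)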
- cfg := kvserver.TestStoreConfig(nil) - cfg.TestingKnobs.IntentResolverKnobs.ForceSyncIntentResolution = true - cfg.TestingKnobs.DisableSplitQueue = true - cfg.TestingKnobs.DisableMergeQueue = true - cfg.TestingKnobs.TestingProposalFilter = + testingProposalFilter := func(args kvserverbase.ProposalFilterArgs) *roachpb.Error { for _, union := range args.Req.Requests { if union.GetInner().Method() == roachpb.PushTxn { @@ -2978,9 +3109,23 @@ func TestRangeLookupAsyncResolveIntent(t *testing.T) { return nil } ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - store := createTestStoreWithConfig(t, stopper, cfg) + srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // Disable async tasks in the intent resolver. All tasks will be synchronous. + IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{ + ForceSyncIntentResolution: true, + }, + DisableMergeQueue: true, + DisableSplitQueue: true, + TestingProposalFilter: testingProposalFilter, + }, + }, + }) + s := srv.(*server.TestServer) + defer s.Stopper().Stop(context.Background()) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) // Split range 1 at an arbitrary key so that we're not dealing with the // first range for the rest of this test. The first range is handled @@ -3172,7 +3317,7 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) { }) close(blockPromoteCh) - if err := g.Wait(); !kvserver.IsRetriableReplicationChangeError(err) { + if err := g.Wait(); !testutils.IsError(err, `descriptor changed`) { t.Fatalf(`expected "descriptor changed" error got: %+v`, err) } @@ -3184,8 +3329,8 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) { // has not heard a raft message addressed to a later replica ID while the // "was not found on" error is expected if the store has heard that it has // a newer replica ID before receiving the snapshot. 
- if !kvserver.IsRetriableReplicationChangeError(err) { - t.Fatal(err) + if !testutils.IsError(err, `snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+`) { + t.Fatalf(`expected snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+" error got: %+v`, err) } } for i := 0; i < 5; i++ { @@ -3265,14 +3410,21 @@ func TestSplitBlocksReadsToRHS(t *testing.T) { return nil } - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - storeCfg.TestingKnobs.TestingProposalFilter = propFilter ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - store := createTestStoreWithConfig(t, stopper, storeCfg) + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + TestingProposalFilter: propFilter, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + repl := store.LookupReplica(roachpb.RKey(keySplit)) tsBefore := store.Clock().Now() diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 1149c3e73c21..e6e0aaaf517e 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -1663,6 +1663,9 @@ func verifyRangeStats( if err != nil { return err } + // When used with a real wall clock these will not be the same, since it + // takes time to load stats. + expMS.AgeTo(ms.LastUpdateNanos) // Clear system counts as these are expected to vary. ms.SysBytes, ms.SysCount, ms.AbortSpanBytes = 0, 0, 0 if ms != expMS { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 547a113df938..b56e44c3b83a 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -735,11 +735,6 @@ type StoreConfig struct { // HistogramWindowInterval is (server.Config).HistogramWindowInterval HistogramWindowInterval time.Duration - // GossipWhenCapacityDeltaExceedsFraction specifies the fraction from the last - // gossiped store capacity values which need be exceeded before the store will - // gossip immediately without waiting for the periodic gossip interval. - GossipWhenCapacityDeltaExceedsFraction float64 - // ExternalStorage creates ExternalStorage objects which allows access to external files ExternalStorage cloud.ExternalStorageFactory ExternalStorageFromURI cloud.ExternalStorageFromURIFactory @@ -791,8 +786,8 @@ func (sc *StoreConfig) SetDefaults() { envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_APPLY_LIMIT", 1) } - if sc.GossipWhenCapacityDeltaExceedsFraction == 0 { - sc.GossipWhenCapacityDeltaExceedsFraction = defaultGossipWhenCapacityDeltaExceedsFraction + if sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction == 0 { + sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction = defaultGossipWhenCapacityDeltaExceedsFraction } } @@ -1919,9 +1914,9 @@ func (s *Store) GossipStore(ctx context.Context, useCached bool) error { // the usual periodic interval. Re-gossip more rapidly for RangeCount // changes because allocators with stale information are much more // likely to make bad decisions. 
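The verifyRangeStats tweak in client_test.go above is worth spelling out: with a real clock, the expected stats must first be aged to the observed LastUpdateNanos before comparing, since loading the stats takes wall time. A sketch of the comparison, with the assertion style simplified to testify:

// Age the expected stats forward to the actual stats' last-update time so the
// age-dependent fields (GCBytesAge, IntentAge) line up, then clear the system
// counts that legitimately vary between runs.
expMS.AgeTo(ms.LastUpdateNanos)
ms.SysBytes, ms.SysCount, ms.AbortSpanBytes = 0, 0, 0
require.Equal(t, expMS, ms)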
- rangeCountdown := float64(storeDesc.Capacity.RangeCount) * s.cfg.GossipWhenCapacityDeltaExceedsFraction + rangeCountdown := float64(storeDesc.Capacity.RangeCount) * s.cfg.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction atomic.StoreInt32(&s.gossipRangeCountdown, int32(math.Ceil(math.Min(rangeCountdown, 3)))) - leaseCountdown := float64(storeDesc.Capacity.LeaseCount) * s.cfg.GossipWhenCapacityDeltaExceedsFraction + leaseCountdown := float64(storeDesc.Capacity.LeaseCount) * s.cfg.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction atomic.StoreInt32(&s.gossipLeaseCountdown, int32(math.Ceil(math.Max(leaseCountdown, 1)))) syncutil.StoreFloat64(&s.gossipQueriesPerSecondVal, storeDesc.Capacity.QueriesPerSecond) syncutil.StoreFloat64(&s.gossipWritesPerSecondVal, storeDesc.Capacity.WritesPerSecond) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 0cb375ad49de..0aca18c495d1 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -266,6 +266,10 @@ type StoreTestingKnobs struct { // If set, use the given truncated state type when bootstrapping ranges. // This is used for testing the truncated state migration. TruncatedStateTypeOverride *stateloader.TruncatedStateType + // GossipWhenCapacityDeltaExceedsFraction specifies the fraction from the last + // gossiped store capacity values which need be exceeded before the store will + // gossip immediately without waiting for the periodic gossip interval. + GossipWhenCapacityDeltaExceedsFraction float64 } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.