Commit

Try #38932:

craig[bot] committed Aug 9, 2019
2 parents 7a0070a + b320ff5 commit a7ebc07
Showing 29 changed files with 1,380 additions and 326 deletions.
29 changes: 26 additions & 3 deletions c-deps/libroach/db.cc
@@ -960,17 +960,23 @@ DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key) {
   return kSuccess;
 }
 
-DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
-  rocksdb::Status status = fw->rep.Finish();
+DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter *fw, DBKey start, DBKey end) {
+  rocksdb::Status status = fw->rep.DeleteRange(EncodeKey(start), EncodeKey(end));
   if (!status.ok()) {
     return ToDBStatus(status);
   }
+  return kSuccess;
+}
 
+DBStatus DBSstFileWriterCopyData(DBSstFileWriter* fw, DBString* data) {
   uint64_t file_size;
-  status = fw->memenv->GetFileSize("sst", &file_size);
+  rocksdb::Status status = fw->memenv->GetFileSize("sst", &file_size);
   if (!status.ok()) {
     return ToDBStatus(status);
   }
+  if (file_size == 0) {
+    return kSuccess;
+  }
 
   const rocksdb::EnvOptions soptions;
   std::unique_ptr<rocksdb::SequentialFile> sst;
@@ -1008,6 +1014,23 @@ DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
   return kSuccess;
 }
 
+DBStatus DBSstFileWriterTruncate(DBSstFileWriter* fw, DBString* data) {
+  DBStatus status = DBSstFileWriterCopyData(fw, data);
+  if (status.data != NULL) {
+    return status;
+  }
+  return ToDBStatus(fw->memenv->Truncate("sst", 0));
+}
+
+DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
+  rocksdb::Status status = fw->rep.Finish();
+  if (!status.ok()) {
+    return ToDBStatus(status);
+  }
+
+  return DBSstFileWriterCopyData(fw, data);
+}
+
 void DBSstFileWriterClose(DBSstFileWriter* fw) { delete fw; }
 
 DBStatus DBLockFile(DBSlice filename, DBFileLock* lock) {
13 changes: 13 additions & 0 deletions c-deps/libroach/include/libroach.h
@@ -472,6 +472,19 @@ DBStatus DBSstFileWriterAdd(DBSstFileWriter* fw, DBKey key, DBSlice val);
 // Adds a deletion tombstone to the sstable being built. See DBSstFileWriterAdd for more.
 DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key);
 
+// Adds a range deletion tombstone to the sstable being built. This function
+// can be called at any time with respect to DBSstFileWriter{Put,Merge,Delete}
+// (i.e. does not have to be greater than any previously added entry). Range
+// deletion tombstones do not take precedence over other Puts in the same SST.
+// `Open` must have been called. `Close` cannot have been called.
+DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter* fw, DBKey start, DBKey end);
+
+// Truncates the writer and stores the constructed file's contents in *data.
+// May be called multiple times. The returned data won't necessarily reflect
+// the latest writes, only the keys whose underlying RocksDB blocks have been
+// flushed. Close cannot have been called.
+DBStatus DBSstFileWriterTruncate(DBSstFileWriter *fw, DBString* data);
+
 // Finalizes the writer and stores the constructed file's contents in *data. At
 // least one kv entry must have been added. May only be called once.
 DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data);
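For orientation, a minimal Go-side sketch of how these entry points are exercised through the engine.RocksDBSstFileWriter wrapper that appears in the test changes below. Put, ClearRange, Finish, and Close are visible in this diff; the assumption that Truncate is surfaced as a Go method returning the flushed bytes (mirroring DBSstFileWriterTruncate) is ours, not shown verbatim here.

// Sketch only (assumes the cockroach engine and roachpb packages): one
// point write, one range tombstone, an incremental drain via the assumed
// Truncate method, then a final Finish.
func buildSST() ([]byte, error) {
	sst, err := engine.MakeRocksDBSstFileWriter()
	if err != nil {
		return nil, err
	}
	defer sst.Close()

	if err := sst.Put(engine.MVCCKey{Key: roachpb.Key("a")}, []byte("val")); err != nil {
		return nil, err
	}
	// Per the header comment above, the range tombstone may be added in any
	// key order relative to Puts and does not shadow Puts in the same SST.
	start := engine.MVCCKey{Key: roachpb.Key("b")}
	end := engine.MVCCKey{Key: roachpb.Key("c")}
	if err := sst.ClearRange(start, end); err != nil {
		return nil, err
	}
	// Assumed signature: Truncate() ([]byte, error). It returns only bytes
	// whose RocksDB blocks have been flushed and resets the in-memory file.
	chunk, err := sst.Truncate()
	if err != nil {
		return nil, err
	}
	rest, err := sst.Finish() // may only be called once
	if err != nil {
		return nil, err
	}
	return append(chunk, rest...), nil
}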
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -51,6 +51,7 @@
 <tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
 <tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
 <tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
+<tr><td><code>kv.snapshot_sst.sync_size</code></td><td>byte size</td><td><code>2.0 MiB</code></td><td>threshold after which snapshot SST writes must fsync</td></tr>
 <tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>262144</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
 <tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
 <tr><td><code>kv.transaction.parallel_commits_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional commits will be parallelized with transactional writes</td></tr>
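The new kv.snapshot_sst.sync_size setting bounds how much unsynced data a snapshot SST write may accumulate before the file must be fsynced, keeping any single sync (and the OS dirty-page backlog) small. A hedged sketch of that pattern follows; the helper name and plumbing are illustrative, not this commit's code.

// Illustrative only: append SST chunks to a file, syncing whenever the
// unsynced tail reaches the threshold from kv.snapshot_sst.sync_size.
func writeSSTChunk(f *os.File, chunk []byte, unsynced *int64, syncSize int64) error {
	if _, err := f.Write(chunk); err != nil {
		return err
	}
	*unsynced += int64(len(chunk))
	if *unsynced >= syncSize {
		if err := f.Sync(); err != nil { // bound the cost of each fsync
			return err
		}
		*unsynced = 0
	}
	return nil
}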
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/load.go
@@ -352,7 +352,7 @@ func writeSST(
 	defer sst.Close()
 	for _, kv := range kvs {
 		kv.Key.Timestamp = ts
-		if err := sst.Add(kv); err != nil {
+		if err := sst.Put(kv.Key, kv.Value); err != nil {
 			return err
 		}
 	}
23 changes: 12 additions & 11 deletions pkg/ccl/importccl/sst_writer_proc.go
@@ -386,8 +386,9 @@ func makeSSTs(
 		return nil
 	}
 
-	var kv engine.MVCCKeyValue
-	kv.Key.Timestamp.WallTime = walltime
+	var key engine.MVCCKey
+	var value []byte
+	key.Timestamp.WallTime = walltime
 	// firstKey is always the first key of the span. lastKey, if nil, means the
 	// current SST hasn't yet filled up. Once the SST has filled up, lastKey is
 	// set to the key at which to stop adding KVs. We have to do this because
@@ -422,11 +423,11 @@
 
 		writtenKVs++
 
-		kv.Key.Key = it.Key()
-		kv.Value = it.UnsafeValue()
+		key.Key = it.Key()
+		value = it.UnsafeValue()
 
 		if lastKey != nil {
-			if kv.Key.Key.Compare(lastKey) >= 0 {
+			if key.Key.Compare(lastKey) >= 0 {
 				if err := writeSST(firstKey, lastKey, true); err != nil {
 					return err
 				}
@@ -441,17 +442,17 @@
 			}
 		}
 
-		if err := sst.Add(kv); err != nil {
-			return errors.Wrapf(err, errSSTCreationMaybeDuplicateTemplate, kv.Key.Key)
+		if err := sst.Put(key, value); err != nil {
+			return errors.Wrapf(err, errSSTCreationMaybeDuplicateTemplate, key.Key)
 		}
-		if err := counts.Count(kv.Key.Key); err != nil {
+		if err := counts.Count(key.Key); err != nil {
 			return errors.Wrapf(err, "failed to count key")
 		}
 
 		if sst.DataSize > sstMaxSize && lastKey == nil {
 			// When we would like to split the file, proceed until we aren't in the
 			// middle of a row. Start by finding the next safe split key.
-			lastKey, err = keys.EnsureSafeSplitKey(kv.Key.Key)
+			lastKey, err = keys.EnsureSafeSplitKey(key.Key)
 			if err != nil {
 				return err
 			}
@@ -463,9 +464,9 @@
 	// Although we don't need to avoid row splitting here because there aren't any
 	// more keys to read, we do still want to produce the same kind of lastKey
 	// argument for the span as in the case above. lastKey <= the most recent
-	// sst.Add call, but since we call PrefixEnd below, it will be guaranteed
+	// sst.Put call, but since we call PrefixEnd below, it will be guaranteed
 	// to be > the most recent added key.
-	lastKey, err = keys.EnsureSafeSplitKey(kv.Key.Key)
+	lastKey, err = keys.EnsureSafeSplitKey(key.Key)
 	if err != nil {
 		return err
 	}
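The splitting logic in this hunk is the subtle part: once the SST under construction exceeds sstMaxSize, the loop must not cut between two KVs of the same SQL row, so it derives a row-aligned boundary with keys.EnsureSafeSplitKey and keeps writing until the iterator passes it. A condensed sketch of that control flow, simplified from the hunk above (the PrefixEnd step mirrors the comment in the final-flush branch; re-creating the SST writer after a flush is elided):

// Condensed from makeSSTs: flush SSTs only at row boundaries.
for ; it.Valid(); it.Next() {
	key.Key, value = it.Key(), it.UnsafeValue()
	if lastKey != nil && key.Key.Compare(lastKey) >= 0 {
		// The row that straddled the size limit is complete; flush.
		if err := writeSST(firstKey, lastKey, true); err != nil {
			return err
		}
		firstKey = append(roachpb.Key(nil), lastKey...) // next span starts here
		lastKey = nil
	}
	if err := sst.Put(key, value); err != nil {
		return err
	}
	if sst.DataSize > sstMaxSize && lastKey == nil {
		// Oversized: pick the next safe (row-aligned) stopping point and
		// keep adding KVs until the iterator reaches it.
		splitKey, err := keys.EnsureSafeSplitKey(key.Key)
		if err != nil {
			return err
		}
		lastKey = splitKey.PrefixEnd()
	}
}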
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/bench_test.go
@@ -65,7 +65,7 @@ func BenchmarkAddSSTable(b *testing.B) {
 			b.Fatalf("%+v", err)
 		}
 		for _, kv := range kvs {
-			if err := sst.Add(kv); err != nil {
+			if err := sst.Put(kv.Key, kv.Value); err != nil {
 				b.Fatalf("%+v", err)
 			}
 		}
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/export_test.go
@@ -248,7 +248,7 @@ func exportUsingGoIterator(
 			continue
 		}
 
-		if err := sst.Add(engine.MVCCKeyValue{Key: iter.UnsafeKey(), Value: iter.UnsafeValue()}); err != nil {
+		if err := sst.Put(iter.UnsafeKey(), iter.UnsafeValue()); err != nil {
 			return nil, err
 		}
 	}
3 changes: 1 addition & 2 deletions pkg/ccl/storageccl/import_test.go
@@ -180,8 +180,7 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
 			key := keys[idx]
 			value.ClearChecksum()
 			value.InitChecksum(key)
-			kv := engine.MVCCKeyValue{Key: engine.MVCCKey{Key: key, Timestamp: ts}, Value: value.RawBytes}
-			if err := sst.Add(kv); err != nil {
+			if err := sst.Put(engine.MVCCKey{Key: key, Timestamp: ts}, value.RawBytes); err != nil {
 				t.Fatalf("%+v", err)
 			}
 		}
7 changes: 3 additions & 4 deletions pkg/storage/batcheval/cmd_add_sstable_test.go
@@ -41,8 +41,7 @@ func singleKVSSTable(key engine.MVCCKey, value []byte) ([]byte, error) {
 		return nil, err
 	}
 	defer sst.Close()
-	kv := engine.MVCCKeyValue{Key: key, Value: value}
-	if err := sst.Add(kv); err != nil {
+	if err := sst.Put(key, value); err != nil {
 		return nil, err
 	}
 	return sst.Finish()
@@ -332,7 +331,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
 	}
 	defer sst.Close()
 	for _, kv := range kvs {
-		if err := sst.Add(kv); err != nil {
+		if err := sst.Put(kv.Key, kv.Value); err != nil {
 			t.Fatalf("%+v", err)
 		}
 	}
@@ -436,7 +435,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
 	}
 	defer sst.Close()
 	for _, kv := range sstKVs {
-		if err := sst.Add(kv); err != nil {
+		if err := sst.Put(kv.Key, kv.Value); err != nil {
 			t.Fatalf("%+v", err)
 		}
 	}
2 changes: 1 addition & 1 deletion pkg/storage/bulk/sst_writer_test.go
@@ -56,7 +56,7 @@ func makeRocksSST(t testing.TB, kvs []engine.MVCCKeyValue) []byte {
 	defer w.Close()
 
 	for i := range kvs {
-		if err := w.Add(kvs[i]); err != nil {
+		if err := w.Put(kvs[i].Key, kvs[i].Value); err != nil {
 			t.Fatal(err)
 		}
 	}
110 changes: 110 additions & 0 deletions pkg/storage/client_merge_test.go
@@ -18,6 +18,7 @@ import (
 	"math/rand"
 	"reflect"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -37,6 +38,7 @@
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
 	"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
@@ -3046,10 +3048,105 @@ func (h *unreliableRaftHandler) HandleRaftResponse(
 func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
+	// We will be testing the SSTs written on store2's engine.
+	var eng engine.Engine
 	ctx := context.Background()
 	storeCfg := storage.TestStoreConfig(nil)
 	storeCfg.TestingKnobs.DisableReplicateQueue = true
 	storeCfg.TestingKnobs.DisableReplicaGCQueue = true
+	storeCfg.TestingKnobs.BeforeSnapshotSSTIngestion = func(
+		inSnap storage.IncomingSnapshot,
+		snapType storage.SnapshotRequest_Type,
+		sstNames []string,
+	) error {
+		// Only verify snapshots of type RAFT and on the range under exercise
+		// (range 2). Note that the keys of range 2 aren't verified in this
+		// function. Unreplicated range-id local keys are not verified because
+		// there are too many keys and the other replicated keys are verified later
+		// on in the test. This function verifies that the subsumed replicas have
+		// been handled properly.
+		if snapType != storage.SnapshotRequest_RAFT || inSnap.State.Desc.RangeID != roachpb.RangeID(2) {
+			return nil
+		}
+		// The seven SSTs we are expecting to ingest are in the following order:
+		// 1. Replicated range-id local keys of the range in the snapshot.
+		// 2. Range-local keys of the range in the snapshot.
+		// 3. User keys of the range in the snapshot.
+		// 4. Unreplicated range-id local keys of the range in the snapshot.
+		// 5. SST to clear range-id local keys of the subsumed replica with
+		//    RangeID 3.
+		// 6. SST to clear range-id local keys of the subsumed replica with
+		//    RangeID 4.
+		// 7. SST to clear the user keys of the subsumed replicas.
+		//
+		// NOTE: There are no range-local keys in [d, /Max) in the store we're
+		// sending a snapshot to, so we aren't expecting an SST to clear those
+		// keys.
+		if len(sstNames) != 7 {
+			return errors.Errorf("expected to ingest 7 SSTs, got %d SSTs", len(sstNames))
+		}
+
+		// Only verify the SSTs of the subsumed replicas (the last three SSTs) by
+		// constructing the expected SST and ensuring that they are byte-by-byte
+		// equal. This verification ensures that the SSTs have the same tombstones
+		// and range deletion tombstones.
+		var expectedSSTs [][]byte
+		sstNames = sstNames[4:]
+
+		// Range-id local range of subsumed replicas.
+		for _, rangeID := range []roachpb.RangeID{roachpb.RangeID(3), roachpb.RangeID(4)} {
+			sst, err := engine.MakeRocksDBSstFileWriter()
+			if err != nil {
+				return err
+			}
+			defer sst.Close()
+			r := rditer.MakeRangeIDLocalKeyRange(rangeID, false)
+			if err := sst.ClearRange(r.Start, r.End); err != nil {
+				return err
+			}
+			tombstoneKey := keys.RaftTombstoneKey(rangeID)
+			tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32}
+			if err := engine.MVCCBlindPutProto(context.TODO(), &sst, nil, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil); err != nil {
+				return err
+			}
+			expectedSST, err := sst.Finish()
+			if err != nil {
+				return err
+			}
+			expectedSSTs = append(expectedSSTs, expectedSST)
+		}
+
+		// User key range of subsumed replicas.
+		sst, err := engine.MakeRocksDBSstFileWriter()
+		if err != nil {
+			return err
+		}
+		defer sst.Close()
+		desc := roachpb.RangeDescriptor{
+			StartKey: roachpb.RKey("d"),
+			EndKey:   roachpb.RKeyMax,
+		}
+		r := rditer.MakeUserKeyRange(&desc)
+		if err := engine.ClearRangeWithHeuristic(eng, &sst, r.Start, r.End); err != nil {
+			return err
+		}
+		expectedSST, err := sst.Finish()
+		if err != nil {
+			return err
+		}
+		expectedSSTs = append(expectedSSTs, expectedSST)
+
+		for i := range sstNames {
+			actualSST, err := eng.ReadFile(sstNames[i])
+			if err != nil {
+				return err
+			}
+			if !bytes.Equal(actualSST, expectedSSTs[i]) {
+				return errors.Errorf("contents of %s were unexpected", sstNames[i])
+			}
+		}
+		return nil
+	}
 	mtc := &multiTestContext{
 		storeConfig: &storeCfg,
 		// This test was written before the multiTestContext started creating many
@@ -3060,6 +3157,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 	mtc.Start(t, 3)
 	defer mtc.Stop()
 	store0, store2 := mtc.Store(0), mtc.Store(2)
+	eng = store2.Engine()
 	distSender := mtc.distSenders[0]
 
 	// Create three fully-caught-up, adjacent ranges on all three stores.
@@ -3074,6 +3172,18 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		mtc.waitForValues(key, []int64{1, 1, 1})
 	}
 
+	// Put some keys in [d, /Max) so the subsumed replica of [c, /Max) with
+	// range ID 4 has tombstones. We will clear the uncontained key range of
+	// subsumed replicas, so when we are receiving a snapshot for [a, d), we
+	// expect to clear the keys in [d, /Max).
+	for i := 0; i < 10; i++ {
+		key := roachpb.Key("d" + strconv.Itoa(i))
+		if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
+			t.Fatal(pErr)
+		}
+		mtc.waitForValues(key, []int64{1, 1, 1})
+	}
+
 	aRepl0 := store0.LookupReplica(roachpb.RKey("a"))
 
 	// Start dropping all Raft traffic to the first range on store1.
2 changes: 2 additions & 0 deletions pkg/storage/client_raft_test.go
@@ -45,6 +45,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -1072,6 +1073,7 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
 		RangeSize: 100,
 		State:     storagepb.ReplicaState{Desc: rep.Desc()},
 	}
+	header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes()
 	// Cause this stream to return an error as soon as we ask it for something.
 	// This injects an error into HandleSnapshotStream when we try to send the
 	// "snapshot accepted" message.
(The remaining 17 of the 29 changed files are not shown.)