storage: build SSTs from KV_BATCH snapshot #38932

Merged (4 commits, Aug 9, 2019)

c-deps/libroach/db.cc (26 additions, 3 deletions)
@@ -960,17 +960,23 @@ DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key) {
return kSuccess;
}

DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
rocksdb::Status status = fw->rep.Finish();
DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter *fw, DBKey start, DBKey end) {
rocksdb::Status status = fw->rep.DeleteRange(EncodeKey(start), EncodeKey(end));
if (!status.ok()) {
return ToDBStatus(status);
}
return kSuccess;
}

DBStatus DBSstFileWriterCopyData(DBSstFileWriter* fw, DBString* data) {
uint64_t file_size;
status = fw->memenv->GetFileSize("sst", &file_size);
rocksdb::Status status = fw->memenv->GetFileSize("sst", &file_size);
if (!status.ok()) {
return ToDBStatus(status);
}
if (file_size == 0) {
return kSuccess;
}

const rocksdb::EnvOptions soptions;
std::unique_ptr<rocksdb::SequentialFile> sst;
@@ -1008,6 +1014,23 @@ DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
return kSuccess;
}

DBStatus DBSstFileWriterTruncate(DBSstFileWriter* fw, DBString* data) {
DBStatus status = DBSstFileWriterCopyData(fw, data);
if (status.data != NULL) {
return status;
}
return ToDBStatus(fw->memenv->Truncate("sst", 0));
}

DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
rocksdb::Status status = fw->rep.Finish();
if (!status.ok()) {
return ToDBStatus(status);
}

return DBSstFileWriterCopyData(fw, data);
}

void DBSstFileWriterClose(DBSstFileWriter* fw) { delete fw; }

DBStatus DBLockFile(DBSlice filename, DBFileLock* lock) {
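Taken together, these changes split the old Finish path into a reusable copy step (DBSstFileWriterCopyData) and layer Truncate and Finish on top of it, which is what lets snapshot code stream an SST to disk in pieces instead of buffering it whole. A rough sketch of the resulting Go-side usage pattern follows; the wrapper names mirror the Go diffs below, but Truncate's exact Go signature is an assumption.

```go
// writeSSTInChunks sketches the chunked-flush pattern these helpers enable.
// Assumptions: engine.MakeRocksDBSstFileWriter wraps the C API above, and
// Truncate returns the bytes flushed so far (hypothetical signature).
func writeSSTInChunks(kvs []engine.MVCCKeyValue, chunkSize int64) ([][]byte, error) {
    sst, err := engine.MakeRocksDBSstFileWriter()
    if err != nil {
        return nil, err
    }
    defer sst.Close()

    var chunks [][]byte
    var flushed int64
    for i := range kvs {
        if err := sst.Put(kvs[i].Key, kvs[i].Value); err != nil {
            return nil, err
        }
        if sst.DataSize-flushed > chunkSize {
            // Truncate hands back whatever RocksDB has written to the
            // in-memory file so far and empties it, so each chunk can be
            // synced to disk without holding the entire SST in memory.
            chunk, err := sst.Truncate()
            if err != nil {
                return nil, err
            }
            chunks = append(chunks, chunk)
            flushed = sst.DataSize
        }
    }
    // Finish flushes the remainder, including the SST footer.
    last, err := sst.Finish()
    if err != nil {
        return nil, err
    }
    return append(chunks, last), nil
}
```
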
c-deps/libroach/include/libroach.h (13 additions, 0 deletions)
@@ -472,6 +472,19 @@ DBStatus DBSstFileWriterAdd(DBSstFileWriter* fw, DBKey key, DBSlice val);
// Adds a deletion tombstone to the sstable being built. See DBSstFileWriterAdd for more.
DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key);

// Adds a range deletion tombstone to the sstable being built. This function
// can be called at any time with respect to DBSstFileWriter{Put,Merge,Delete}
// (i.e., the range does not have to sort after any previously added entry).
// Range deletion tombstones do not take precedence over other Puts in the
// same SST. `Open` must have been called. `Close` cannot have been called.
DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter* fw, DBKey start, DBKey end);

// Truncates the writer and stores the constructed file's contents in *data.
// May be called multiple times. The returned data won't necessarily reflect
// the latest writes, only the keys whose underlying RocksDB blocks have been
// flushed. Close cannot have been called.
DBStatus DBSstFileWriterTruncate(DBSstFileWriter *fw, DBString* data);

// Finalizes the writer and stores the constructed file's contents in *data. At
// least one kv entry must have been added. May only be called once.
DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data);
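To make the DeleteRange contract concrete, here is a hedged Go sketch. ClearRange is the Go-side counterpart used in the merge test below; key names are illustrative. The point it demonstrates: a range tombstone and a point write can land in the same SST, and the point write survives even though it sorts inside the cleared span.

```go
// buildClearedRangeSST sketches the semantics documented above: the range
// deletion tombstone does not take precedence over a Put in the same SST.
func buildClearedRangeSST(start, end, keep engine.MVCCKey, val []byte) ([]byte, error) {
    sst, err := engine.MakeRocksDBSstFileWriter()
    if err != nil {
        return nil, err
    }
    defer sst.Close()
    // Lay down a range deletion tombstone over [start, end).
    if err := sst.ClearRange(start, end); err != nil {
        return nil, err
    }
    // keep sorts inside [start, end), yet it is not shadowed by the
    // tombstone because both live in the same SST.
    if err := sst.Put(keep, val); err != nil {
        return nil, err
    }
    return sst.Finish()
}
```
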
docs/generated/settings/settings.html (1 addition, 0 deletions)
@@ -51,6 +51,7 @@
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.snapshot_sst.sync_size</code></td><td>byte size</td><td><code>2.0 MiB</code></td><td>threshold after which snapshot SST writes must fsync</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>262144</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.parallel_commits_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional commits will be parallelized with transactional writes</td></tr>
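The new kv.snapshot_sst.sync_size setting bounds how many dirty bytes the snapshot SST writer accumulates before forcing an fsync, so a large snapshot cannot build up an arbitrarily large burst of unsynced writes. A minimal sketch of the pattern, with illustrative names (this is not the actual storage-package implementation):

```go
import "os"

// syncingWriter fsyncs every time syncSize dirty bytes accumulate,
// bounding the size of any unsynced burst of writes.
type syncingWriter struct {
    f        *os.File
    pending  int64
    syncSize int64 // e.g. 2 MiB, per kv.snapshot_sst.sync_size
}

func (w *syncingWriter) Write(p []byte) (int, error) {
    n, err := w.f.Write(p)
    if err != nil {
        return n, err
    }
    w.pending += int64(n)
    if w.pending >= w.syncSize {
        if err := w.f.Sync(); err != nil {
            return n, err
        }
        w.pending = 0
    }
    return n, nil
}
```
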
pkg/ccl/importccl/load.go (1 addition, 1 deletion)
@@ -352,7 +352,7 @@ func writeSST(
defer sst.Close()
for _, kv := range kvs {
kv.Key.Timestamp = ts
if err := sst.Add(kv); err != nil {
if err := sst.Put(kv.Key, kv.Value); err != nil {
return err
}
}
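This one-line change recurs throughout the PR: the writer's single Add(kv) entry point is replaced by operation-specific methods, making room for range tombstones alongside point writes. A hedged sketch of the migration (the method set and the wrapper type name are inferred from the diffs in this PR):

```go
// putAll shows the new calling convention; previously each element went
// through sst.Add(kv) as a single MVCCKeyValue.
func putAll(sst *engine.RocksDBSstFileWriter, kvs []engine.MVCCKeyValue) error {
    for i := range kvs {
        if err := sst.Put(kvs[i].Key, kvs[i].Value); err != nil {
            return err
        }
    }
    return nil
}
```
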
pkg/ccl/importccl/sst_writer_proc.go (12 additions, 11 deletions)
@@ -386,8 +386,9 @@ func makeSSTs(
return nil
}

var kv engine.MVCCKeyValue
kv.Key.Timestamp.WallTime = walltime
var key engine.MVCCKey
var value []byte
key.Timestamp.WallTime = walltime
// firstKey is always the first key of the span. lastKey, if nil, means the
// current SST hasn't yet filled up. Once the SST has filled up, lastKey is
// set to the key at which to stop adding KVs. We have to do this because
@@ -422,11 +423,11 @@

writtenKVs++

kv.Key.Key = it.Key()
kv.Value = it.UnsafeValue()
key.Key = it.Key()
value = it.UnsafeValue()

if lastKey != nil {
if kv.Key.Key.Compare(lastKey) >= 0 {
if key.Key.Compare(lastKey) >= 0 {
if err := writeSST(firstKey, lastKey, true); err != nil {
return err
}
@@ -441,17 +442,17 @@
}
}

if err := sst.Add(kv); err != nil {
return errors.Wrapf(err, errSSTCreationMaybeDuplicateTemplate, kv.Key.Key)
if err := sst.Put(key, value); err != nil {
return errors.Wrapf(err, errSSTCreationMaybeDuplicateTemplate, key.Key)
}
if err := counts.Count(kv.Key.Key); err != nil {
if err := counts.Count(key.Key); err != nil {
return errors.Wrapf(err, "failed to count key")
}

if sst.DataSize > sstMaxSize && lastKey == nil {
// When we would like to split the file, proceed until we aren't in the
// middle of a row. Start by finding the next safe split key.
lastKey, err = keys.EnsureSafeSplitKey(kv.Key.Key)
lastKey, err = keys.EnsureSafeSplitKey(key.Key)
if err != nil {
return err
}
@@ -463,9 +464,9 @@
// Although we don't need to avoid row splitting here because there aren't any
// more keys to read, we do still want to produce the same kind of lastKey
// argument for the span as in the case above. lastKey <= the most recent
// sst.Add call, but since we call PrefixEnd below, it will be guaranteed
// sst.Put call, but since we call PrefixEnd below, it will be guaranteed
// to be > the most recent added key.
lastKey, err = keys.EnsureSafeSplitKey(kv.Key.Key)
lastKey, err = keys.EnsureSafeSplitKey(key.Key)
if err != nil {
return err
}
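The loop above postpones SST splits until the scan clears a SQL row boundary, so no row is ever torn across two files. A condensed sketch of that per-key control flow (helper names match the diff; flushSST stands in for the real writeSST closure, and firstKey bookkeeping is elided):

```go
// addKey sketches the logic above. Once the SST exceeds its size budget,
// lastKey is set just past the current row's key prefix, and the SST is
// flushed only when the scan moves past that boundary.
func addKey(
    sst *engine.RocksDBSstFileWriter,
    key engine.MVCCKey, value []byte,
    firstKey roachpb.Key, lastKey *roachpb.Key,
    sstMaxSize int64,
    flushSST func(first, last roachpb.Key) error,
) error {
    if *lastKey != nil && key.Key.Compare(*lastKey) >= 0 {
        // The scan has cleared the row boundary: flush and start fresh.
        if err := flushSST(firstKey, *lastKey); err != nil {
            return err
        }
        *lastKey = nil
    }
    if err := sst.Put(key, value); err != nil {
        return err
    }
    if sst.DataSize > sstMaxSize && *lastKey == nil {
        // Too big, but possibly mid-row: split at the next safe key.
        splitKey, err := keys.EnsureSafeSplitKey(key.Key)
        if err != nil {
            return err
        }
        *lastKey = splitKey.PrefixEnd()
    }
    return nil
}
```
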
pkg/ccl/storageccl/bench_test.go (1 addition, 1 deletion)
@@ -65,7 +65,7 @@ func BenchmarkAddSSTable(b *testing.B) {
b.Fatalf("%+v", err)
}
for _, kv := range kvs {
if err := sst.Add(kv); err != nil {
if err := sst.Put(kv.Key, kv.Value); err != nil {
b.Fatalf("%+v", err)
}
}
pkg/ccl/storageccl/export_test.go (1 addition, 1 deletion)
@@ -248,7 +248,7 @@ func exportUsingGoIterator(
continue
}

if err := sst.Add(engine.MVCCKeyValue{Key: iter.UnsafeKey(), Value: iter.UnsafeValue()}); err != nil {
if err := sst.Put(iter.UnsafeKey(), iter.UnsafeValue()); err != nil {
return nil, err
}
}
pkg/ccl/storageccl/import_test.go (1 addition, 2 deletions)
@@ -180,8 +180,7 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
key := keys[idx]
value.ClearChecksum()
value.InitChecksum(key)
kv := engine.MVCCKeyValue{Key: engine.MVCCKey{Key: key, Timestamp: ts}, Value: value.RawBytes}
if err := sst.Add(kv); err != nil {
if err := sst.Put(engine.MVCCKey{Key: key, Timestamp: ts}, value.RawBytes); err != nil {
t.Fatalf("%+v", err)
}
}
pkg/storage/batcheval/cmd_add_sstable_test.go (3 additions, 4 deletions)
@@ -41,8 +41,7 @@ func singleKVSSTable(key engine.MVCCKey, value []byte) ([]byte, error) {
return nil, err
}
defer sst.Close()
kv := engine.MVCCKeyValue{Key: key, Value: value}
if err := sst.Add(kv); err != nil {
if err := sst.Put(key, value); err != nil {
return nil, err
}
return sst.Finish()
@@ -332,7 +331,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
}
defer sst.Close()
for _, kv := range kvs {
if err := sst.Add(kv); err != nil {
if err := sst.Put(kv.Key, kv.Value); err != nil {
t.Fatalf("%+v", err)
}
}
@@ -436,7 +435,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}
defer sst.Close()
for _, kv := range sstKVs {
if err := sst.Add(kv); err != nil {
if err := sst.Put(kv.Key, kv.Value); err != nil {
t.Fatalf("%+v", err)
}
}
pkg/storage/bulk/sst_writer_test.go (1 addition, 1 deletion)
@@ -56,7 +56,7 @@ func makeRocksSST(t testing.TB, kvs []engine.MVCCKeyValue) []byte {
defer w.Close()

for i := range kvs {
if err := w.Add(kvs[i]); err != nil {
if err := w.Put(kvs[i].Key, kvs[i].Value); err != nil {
t.Fatal(err)
}
}
pkg/storage/client_merge_test.go (110 additions, 0 deletions)
@@ -18,6 +18,7 @@ import (
"math/rand"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -37,6 +38,7 @@
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
@@ -3046,10 +3048,105 @@ func (h *unreliableRaftHandler) HandleRaftResponse(
func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

// We will be testing the SSTs written on store2's engine.
var eng engine.Engine
ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
storeCfg.TestingKnobs.BeforeSnapshotSSTIngestion = func(
inSnap storage.IncomingSnapshot,
snapType storage.SnapshotRequest_Type,
sstNames []string,
) error {
// Only verify snapshots of type RAFT and on the range under exercise
// (range 2). Note that the keys of range 2 aren't verified in this
// function. Unreplicated range-id local keys are not verified because
// there are too many of them, and the other replicated keys are verified
// later on in the test. This function verifies that the subsumed replicas
// have been handled properly.
if snapType != storage.SnapshotRequest_RAFT || inSnap.State.Desc.RangeID != roachpb.RangeID(2) {
return nil
}
// The seven SSTs we are expecting to ingest are in the following order:
// 1. Replicated range-id local keys of the range in the snapshot.
// 2. Range-local keys of the range in the snapshot.
// 3. User keys of the range in the snapshot.
// 4. Unreplicated range-id local keys of the range in the snapshot.
// 5. SST to clear range-id local keys of the subsumed replica with
// RangeID 3.
// 6. SST to clear range-id local keys of the subsumed replica with
// RangeID 4.
// 7. SST to clear the user keys of the subsumed replicas.
//
// NOTE: There are no range-local keys in [d, /Max) in the store we're
// sending a snapshot to, so we aren't expecting an SST to clear those
// keys.
if len(sstNames) != 7 {
return errors.Errorf("expected to ingest 7 SSTs, got %d SSTs", len(sstNames))
}

// Only verify the SSTs of the subsumed replicas (the last three SSTs) by
// constructing the expected SST and ensuring that they are byte-by-byte
// equal. This verification ensures that the SSTs have the same tombstones
// and range deletion tombstones.
var expectedSSTs [][]byte
sstNames = sstNames[4:]

// Range-id local range of subsumed replicas.
for _, rangeID := range []roachpb.RangeID{roachpb.RangeID(3), roachpb.RangeID(4)} {
sst, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}
defer sst.Close()
r := rditer.MakeRangeIDLocalKeyRange(rangeID, false)
if err := sst.ClearRange(r.Start, r.End); err != nil {
return err
}
tombstoneKey := keys.RaftTombstoneKey(rangeID)
tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32}
if err := engine.MVCCBlindPutProto(context.TODO(), &sst, nil, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil); err != nil {
return err
}
expectedSST, err := sst.Finish()
if err != nil {
return err
}
expectedSSTs = append(expectedSSTs, expectedSST)
}

// User key range of subsumed replicas.
sst, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}
defer sst.Close()
desc := roachpb.RangeDescriptor{
StartKey: roachpb.RKey("d"),
EndKey: roachpb.RKeyMax,
}
r := rditer.MakeUserKeyRange(&desc)
if err := engine.ClearRangeWithHeuristic(eng, &sst, r.Start, r.End); err != nil {
return err
}
expectedSST, err := sst.Finish()
if err != nil {
return err
}
expectedSSTs = append(expectedSSTs, expectedSST)

for i := range sstNames {
actualSST, err := eng.ReadFile(sstNames[i])
if err != nil {
return err
}
if !bytes.Equal(actualSST, expectedSSTs[i]) {
return errors.Errorf("contents of %s were unexpected", sstNames[i])
}
}
return nil
}
mtc := &multiTestContext{
storeConfig: &storeCfg,
// This test was written before the multiTestContext started creating many
@@ -3060,6 +3157,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)
eng = store2.Engine()
distSender := mtc.distSenders[0]

// Create three fully-caught-up, adjacent ranges on all three stores.
@@ -3074,6 +3172,18 @@
mtc.waitForValues(key, []int64{1, 1, 1})
}

// Put some keys in [d, /Max) so the subsumed replica of [c, /Max) with range
// ID 4 has tombstones. We will clear the uncontained key ranges of the
// subsumed replicas, so when we receive a snapshot for [a, d), we expect the
// keys in [d, /Max) to be cleared.
for i := 0; i < 10; i++ {
key := roachpb.Key("d" + strconv.Itoa(i))
if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(key, []int64{1, 1, 1})
}

aRepl0 := store0.LookupReplica(roachpb.RKey("a"))

// Start dropping all Raft traffic to the first range on store1.
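For reference, the testing knob wired up at the top of this test has roughly the following shape. This is a sketch inferred from the call site above, not the authoritative definition in the storage package:

```go
// Sketch of the hook used above: it runs after the snapshot's SSTs have been
// written to disk but before ingestion, receiving the file names in
// ingestion order so a test can read and verify them.
type StoreTestingKnobs struct {
    BeforeSnapshotSSTIngestion func(
        storage.IncomingSnapshot,
        storage.SnapshotRequest_Type,
        []string,
    ) error
}
```
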
pkg/storage/client_raft_test.go (2 additions, 0 deletions)
@@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -1072,6 +1073,7 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
RangeSize: 100,
State: storagepb.ReplicaState{Desc: rep.Desc()},
}
header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes()
// Cause this stream to return an error as soon as we ask it for something.
// This injects an error into HandleSnapshotStream when we try to send the
// "snapshot accepted" message.
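A hedged note on the one-line fix above: with this PR, the receiver appears to derive names for the scratch SSTs from the snapshot's UUID, which travels in the raft snapshot's Data field, so even a synthetic snapshot header needs a parseable UUID. A sketch of the assumed receiver-side decode step (not verbatim from this PR):

```go
// Assumed decoding on the receiving side: Snapshot.Data must parse as a
// UUID, which is why the test sets it explicitly above.
snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
if err != nil {
    return errors.Wrap(err, "invalid snapshot UUID")
}
_ = snapUUID // hypothetically used to name the scratch SSTs on disk
```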