diff --git a/c-deps/libroach/db.cc b/c-deps/libroach/db.cc
index ee30b419cd7c..88c4425e2a51 100644
--- a/c-deps/libroach/db.cc
+++ b/c-deps/libroach/db.cc
@@ -960,17 +960,23 @@ DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key) {
   return kSuccess;
 }
 
-DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
-  rocksdb::Status status = fw->rep.Finish();
+DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter* fw, DBKey start, DBKey end) {
+  rocksdb::Status status = fw->rep.DeleteRange(EncodeKey(start), EncodeKey(end));
   if (!status.ok()) {
     return ToDBStatus(status);
   }
+  return kSuccess;
+}
 
+DBStatus DBSstFileWriterCopyData(DBSstFileWriter* fw, DBString* data) {
   uint64_t file_size;
-  status = fw->memenv->GetFileSize("sst", &file_size);
+  rocksdb::Status status = fw->memenv->GetFileSize("sst", &file_size);
   if (!status.ok()) {
     return ToDBStatus(status);
   }
+  if (file_size == 0) {
+    return kSuccess;
+  }
 
   const rocksdb::EnvOptions soptions;
   std::unique_ptr<rocksdb::SequentialFile> sst;
@@ -1008,6 +1014,23 @@ DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
   return kSuccess;
 }
 
+DBStatus DBSstFileWriterTruncate(DBSstFileWriter* fw, DBString* data) {
+  DBStatus status = DBSstFileWriterCopyData(fw, data);
+  if (status.data != NULL) {
+    return status;
+  }
+  return ToDBStatus(fw->memenv->Truncate("sst", 0));
+}
+
+DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) {
+  rocksdb::Status status = fw->rep.Finish();
+  if (!status.ok()) {
+    return ToDBStatus(status);
+  }
+
+  return DBSstFileWriterCopyData(fw, data);
+}
+
 void DBSstFileWriterClose(DBSstFileWriter* fw) { delete fw; }
 
 DBStatus DBLockFile(DBSlice filename, DBFileLock* lock) {
diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h
index c366a3acdf48..2550588ed6b1 100644
--- a/c-deps/libroach/include/libroach.h
+++ b/c-deps/libroach/include/libroach.h
@@ -472,6 +472,19 @@ DBStatus DBSstFileWriterAdd(DBSstFileWriter* fw, DBKey key, DBSlice val);
 // Adds a deletion tombstone to the sstable being built. See DBSstFileWriterAdd for more.
 DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key);
 
+// Adds a range deletion tombstone to the sstable being built. This function
+// can be called at any time with respect to DBSstFileWriter{Put,Merge,Delete}
+// (i.e., the range does not have to be greater than any previously added
+// entry). Range deletion tombstones do not take precedence over other Puts in
+// the same SST. `Open` must have been called. `Close` cannot have been called.
+DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter* fw, DBKey start, DBKey end);
+
+// Truncates the writer and stores the constructed file's contents in *data.
+// May be called multiple times. The returned data won't necessarily reflect
+// the latest writes, only the keys whose underlying RocksDB blocks have been
+// flushed. Close cannot have been called.
+DBStatus DBSstFileWriterTruncate(DBSstFileWriter* fw, DBString* data);
+
 // Finalizes the writer and stores the constructed file's contents in *data. At
 // least one kv entry must have been added. May only be called once.
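[Editor's note: a minimal sketch, not part of this diff, of how the Go wrapper added later in pkg/storage/engine/rocksdb.go drives these new C entry points. All names come from this patch; the flow itself is illustrative only. The DBSstFileWriterFinish declaration continues right after the sketch.]

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func main() {
	// MakeRocksDBSstFileWriter opens a DBSstFileWriter under the hood.
	sst, err := engine.MakeRocksDBSstFileWriter()
	if err != nil {
		panic(err)
	}
	defer sst.Close()

	// Point keys must be added in increasing order...
	for _, k := range []string{"a", "b", "c"} {
		if err := sst.Put(engine.MakeMVCCMetadataKey(roachpb.Key(k)), []byte("v")); err != nil {
			panic(err)
		}
	}
	// ...but a range deletion tombstone (DBSstFileWriterDeleteRange) may be
	// added at any time relative to the point keys.
	if err := sst.ClearRange(
		engine.MakeMVCCMetadataKey(roachpb.Key("x")),
		engine.MakeMVCCMetadataKey(roachpb.Key("z")),
	); err != nil {
		panic(err)
	}

	// Truncate (DBSstFileWriterTruncate) may legitimately return an empty
	// chunk: only keys in already-flushed RocksDB blocks are drained.
	chunk, err := sst.Truncate()
	if err != nil {
		panic(err)
	}
	// Finish returns everything that Truncate has not yet drained.
	rest, err := sst.Finish()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(chunk), len(rest))
}
```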
 DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data);
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 5cc133424d0a..7018b07a4586 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -51,6 +51,7 @@
 <tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
 <tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
 <tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
+<tr><td><code>kv.snapshot_sst.sync_size</code></td><td>byte size</td><td><code>2.0 MiB</code></td><td>threshold after which snapshot SST writes must fsync</td></tr>
 <tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>262144</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
 <tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
 <tr><td><code>kv.transaction.parallel_commits_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional commits will be parallelized with transactional writes</td></tr>
diff --git a/pkg/ccl/importccl/load.go b/pkg/ccl/importccl/load.go
index c03717fef5f5..6ab6019c5a46 100644
--- a/pkg/ccl/importccl/load.go
+++ b/pkg/ccl/importccl/load.go
@@ -352,7 +352,7 @@ func writeSST(
 	defer sst.Close()
 	for _, kv := range kvs {
 		kv.Key.Timestamp = ts
-		if err := sst.Add(kv); err != nil {
+		if err := sst.Put(kv.Key, kv.Value); err != nil {
 			return err
 		}
 	}
diff --git a/pkg/ccl/importccl/sst_writer_proc.go b/pkg/ccl/importccl/sst_writer_proc.go
index 225b0a31d1b7..ce27d98563a1 100644
--- a/pkg/ccl/importccl/sst_writer_proc.go
+++ b/pkg/ccl/importccl/sst_writer_proc.go
@@ -386,8 +386,9 @@ func makeSSTs(
 		return nil
 	}
 
-	var kv engine.MVCCKeyValue
-	kv.Key.Timestamp.WallTime = walltime
+	var key engine.MVCCKey
+	var value []byte
+	key.Timestamp.WallTime = walltime
 	// firstKey is always the first key of the span. lastKey, if nil, means the
 	// current SST hasn't yet filled up. Once the SST has filled up, lastKey is
 	// set to the key at which to stop adding KVs. We have to do this because
@@ -422,11 +423,11 @@ func makeSSTs(
 
 		writtenKVs++
 
-		kv.Key.Key = it.Key()
-		kv.Value = it.UnsafeValue()
+		key.Key = it.Key()
+		value = it.UnsafeValue()
 
 		if lastKey != nil {
-			if kv.Key.Key.Compare(lastKey) >= 0 {
+			if key.Key.Compare(lastKey) >= 0 {
 				if err := writeSST(firstKey, lastKey, true); err != nil {
 					return err
 				}
@@ -441,17 +442,17 @@ func makeSSTs(
 			}
 		}
 
-		if err := sst.Add(kv); err != nil {
-			return errors.Wrapf(err, errSSTCreationMaybeDuplicateTemplate, kv.Key.Key)
+		if err := sst.Put(key, value); err != nil {
+			return errors.Wrapf(err, errSSTCreationMaybeDuplicateTemplate, key.Key)
 		}
-		if err := counts.Count(kv.Key.Key); err != nil {
+		if err := counts.Count(key.Key); err != nil {
 			return errors.Wrapf(err, "failed to count key")
 		}
 
 		if sst.DataSize > sstMaxSize && lastKey == nil {
 			// When we would like to split the file, proceed until we aren't in the
 			// middle of a row. Start by finding the next safe split key.
-			lastKey, err = keys.EnsureSafeSplitKey(kv.Key.Key)
+			lastKey, err = keys.EnsureSafeSplitKey(key.Key)
 			if err != nil {
 				return err
 			}
@@ -463,9 +464,9 @@ func makeSSTs(
 	// Although we don't need to avoid row splitting here because there aren't any
 	// more keys to read, we do still want to produce the same kind of lastKey
 	// argument for the span as in the case above.
lastKey <= the most recent - // sst.Add call, but since we call PrefixEnd below, it will be guaranteed + // sst.Put call, but since we call PrefixEnd below, it will be guaranteed // to be > the most recent added key. - lastKey, err = keys.EnsureSafeSplitKey(kv.Key.Key) + lastKey, err = keys.EnsureSafeSplitKey(key.Key) if err != nil { return err } diff --git a/pkg/ccl/storageccl/bench_test.go b/pkg/ccl/storageccl/bench_test.go index faf631c9ecf2..0d0bdf28455e 100644 --- a/pkg/ccl/storageccl/bench_test.go +++ b/pkg/ccl/storageccl/bench_test.go @@ -65,7 +65,7 @@ func BenchmarkAddSSTable(b *testing.B) { b.Fatalf("%+v", err) } for _, kv := range kvs { - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { b.Fatalf("%+v", err) } } diff --git a/pkg/ccl/storageccl/export_test.go b/pkg/ccl/storageccl/export_test.go index a86950d6a5c2..b080c5b777fa 100644 --- a/pkg/ccl/storageccl/export_test.go +++ b/pkg/ccl/storageccl/export_test.go @@ -248,7 +248,7 @@ func exportUsingGoIterator( continue } - if err := sst.Add(engine.MVCCKeyValue{Key: iter.UnsafeKey(), Value: iter.UnsafeValue()}); err != nil { + if err := sst.Put(iter.UnsafeKey(), iter.UnsafeValue()); err != nil { return nil, err } } diff --git a/pkg/ccl/storageccl/import_test.go b/pkg/ccl/storageccl/import_test.go index 19f324267714..48114f128834 100644 --- a/pkg/ccl/storageccl/import_test.go +++ b/pkg/ccl/storageccl/import_test.go @@ -180,8 +180,7 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { key := keys[idx] value.ClearChecksum() value.InitChecksum(key) - kv := engine.MVCCKeyValue{Key: engine.MVCCKey{Key: key, Timestamp: ts}, Value: value.RawBytes} - if err := sst.Add(kv); err != nil { + if err := sst.Put(engine.MVCCKey{Key: key, Timestamp: ts}, value.RawBytes); err != nil { t.Fatalf("%+v", err) } } diff --git a/pkg/storage/batcheval/cmd_add_sstable_test.go b/pkg/storage/batcheval/cmd_add_sstable_test.go index 3391d167a5cc..58ea5ad13253 100644 --- a/pkg/storage/batcheval/cmd_add_sstable_test.go +++ b/pkg/storage/batcheval/cmd_add_sstable_test.go @@ -41,8 +41,7 @@ func singleKVSSTable(key engine.MVCCKey, value []byte) ([]byte, error) { return nil, err } defer sst.Close() - kv := engine.MVCCKeyValue{Key: key, Value: value} - if err := sst.Add(kv); err != nil { + if err := sst.Put(key, value); err != nil { return nil, err } return sst.Finish() @@ -332,7 +331,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { } defer sst.Close() for _, kv := range kvs { - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { t.Fatalf("%+v", err) } } @@ -436,7 +435,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) { } defer sst.Close() for _, kv := range sstKVs { - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { t.Fatalf("%+v", err) } } diff --git a/pkg/storage/bulk/sst_writer_test.go b/pkg/storage/bulk/sst_writer_test.go index 2516d7de656f..e006de2d371f 100644 --- a/pkg/storage/bulk/sst_writer_test.go +++ b/pkg/storage/bulk/sst_writer_test.go @@ -56,7 +56,7 @@ func makeRocksSST(t testing.TB, kvs []engine.MVCCKeyValue) []byte { defer w.Close() for i := range kvs { - if err := w.Add(kvs[i]); err != nil { + if err := w.Put(kvs[i].Key, kvs[i].Value); err != nil { t.Fatal(err) } } diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go index dff83d9866a0..98ddd75e011e 100644 --- a/pkg/storage/client_merge_test.go +++ b/pkg/storage/client_merge_test.go @@ -18,6 +18,7 @@ import ( "math/rand" "reflect" "regexp" + 
"strconv" "strings" "sync" "sync/atomic" @@ -37,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -3046,10 +3048,105 @@ func (h *unreliableRaftHandler) HandleRaftResponse( func TestStoreRangeMergeRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() + // We will be testing the SSTs written on store2's engine. + var eng engine.Engine ctx := context.Background() storeCfg := storage.TestStoreConfig(nil) storeCfg.TestingKnobs.DisableReplicateQueue = true storeCfg.TestingKnobs.DisableReplicaGCQueue = true + storeCfg.TestingKnobs.BeforeSnapshotSSTIngestion = func( + inSnap storage.IncomingSnapshot, + snapType storage.SnapshotRequest_Type, + sstNames []string, + ) error { + // Only verify snapshots of type RAFT and on the range under exercise + // (range 2). Note that the keys of range 2 aren't verified in this + // functions. Unreplicated range-id local keys are not verified because + // there are too many keys and the other replicated keys are verified later + // on in the test. This function verifies that the subsumed replicas have + // been handled properly. + if snapType != storage.SnapshotRequest_RAFT || inSnap.State.Desc.RangeID != roachpb.RangeID(2) { + return nil + } + // The seven SSTs we are expecting to ingest are in the following order: + // 1. Replicated range-id local keys of the range in the snapshot. + // 2. Range-local keys of the range in the snapshot. + // 3. User keys of the range in the snapshot. + // 4. Unreplicated range-id local keys of the range in the snapshot. + // 5. SST to clear range-id local keys of the subsumed replica with + // RangeID 3. + // 6. SST to clear range-id local keys of the subsumed replica with + // RangeID 4. + // 7. SST to clear the user keys of the subsumed replicas. + // + // NOTE: There are no range-local keys in [d, /Max) in the store we're + // sending a snapshot to, so we aren't expecting an SST to clear those + // keys. + if len(sstNames) != 7 { + return errors.Errorf("expected to ingest 7 SSTs, got %d SSTs", len(sstNames)) + } + + // Only verify the SSTs of the subsumed replicas (the last three SSTs) by + // constructing the expected SST and ensuring that they are byte-by-byte + // equal. This verification ensures that the SSTs have the same tombstones + // and range deletion tombstones. + var expectedSSTs [][]byte + sstNames = sstNames[4:] + + // Range-id local range of subsumed replicas. + for _, rangeID := range []roachpb.RangeID{roachpb.RangeID(3), roachpb.RangeID(4)} { + sst, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer sst.Close() + r := rditer.MakeRangeIDLocalKeyRange(rangeID, false) + if err := sst.ClearRange(r.Start, r.End); err != nil { + return err + } + tombstoneKey := keys.RaftTombstoneKey(rangeID) + tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32} + if err := engine.MVCCBlindPutProto(context.TODO(), &sst, nil, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil); err != nil { + return err + } + expectedSST, err := sst.Finish() + if err != nil { + return err + } + expectedSSTs = append(expectedSSTs, expectedSST) + } + + // User key range of subsumed replicas. 
+		sst, err := engine.MakeRocksDBSstFileWriter()
+		if err != nil {
+			return err
+		}
+		defer sst.Close()
+		desc := roachpb.RangeDescriptor{
+			StartKey: roachpb.RKey("d"),
+			EndKey:   roachpb.RKeyMax,
+		}
+		r := rditer.MakeUserKeyRange(&desc)
+		if err := engine.ClearRangeWithHeuristic(eng, &sst, r.Start, r.End); err != nil {
+			return err
+		}
+		expectedSST, err := sst.Finish()
+		if err != nil {
+			return err
+		}
+		expectedSSTs = append(expectedSSTs, expectedSST)
+
+		for i := range sstNames {
+			actualSST, err := eng.ReadFile(sstNames[i])
+			if err != nil {
+				return err
+			}
+			if !bytes.Equal(actualSST, expectedSSTs[i]) {
+				return errors.Errorf("contents of %s were unexpected", sstNames[i])
+			}
+		}
+		return nil
+	}
 	mtc := &multiTestContext{
 		storeConfig: &storeCfg,
 		// This test was written before the multiTestContext started creating many
@@ -3060,6 +3157,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 	mtc.Start(t, 3)
 	defer mtc.Stop()
 	store0, store2 := mtc.Store(0), mtc.Store(2)
+	eng = store2.Engine()
 	distSender := mtc.distSenders[0]
 
 	// Create three fully-caught-up, adjacent ranges on all three stores.
@@ -3074,6 +3172,18 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		mtc.waitForValues(key, []int64{1, 1, 1})
 	}
 
+	// Put some keys in [d, /Max) so the subsumed replica of [c, /Max) with range
+	// ID 4 has tombstones. We will clear the uncontained key ranges of the
+	// subsumed replicas, so when we receive a snapshot for [a, d), we expect to
+	// clear the keys in [d, /Max).
+	for i := 0; i < 10; i++ {
+		key := roachpb.Key("d" + strconv.Itoa(i))
+		if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
+			t.Fatal(pErr)
+		}
+		mtc.waitForValues(key, []int64{1, 1, 1})
+	}
+
 	aRepl0 := store0.LookupReplica(roachpb.RKey("a"))
 
 	// Start dropping all Raft traffic to the first range on store1.
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index e8a7b42816d8..15f487ca7fbb 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -45,6 +45,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -1072,6 +1073,7 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
 		RangeSize: 100,
 		State:     storagepb.ReplicaState{Desc: rep.Desc()},
 	}
+	header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes()
 	// Cause this stream to return an error as soon as we ask it for something.
 	// This injects an error into HandleSnapshotStream when we try to send the
 	// "snapshot accepted" message.
diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go
index 0ed91a63c105..a8addb28cea1 100644
--- a/pkg/storage/engine/engine.go
+++ b/pkg/storage/engine/engine.go
@@ -504,3 +504,53 @@ func WriteSyncNoop(ctx context.Context, eng Engine) error {
 	}
 	return nil
 }
+
+// ClearRangeWithHeuristic clears the keys from start (inclusive) to end
+// (exclusive). Depending on the number of keys, it will either use ClearRange
+// or ClearIterRange.
+func ClearRangeWithHeuristic(eng Reader, writer Writer, start, end MVCCKey) error { + iter := eng.NewIterator(IterOptions{UpperBound: end.Key}) + defer iter.Close() + + // It is expensive for there to be many range deletion tombstones in the same + // sstable because all of the tombstones in an sstable are loaded whenever the + // sstable is accessed. So we avoid using range deletion unless there is some + // minimum number of keys. The value here was pulled out of thin air. It might + // be better to make this dependent on the size of the data being deleted. Or + // perhaps we should fix RocksDB to handle large numbers of tombstones in an + // sstable better. + const clearRangeMinKeys = 64 + // Peek into the range to see whether it's large enough to justify + // ClearRange. Note that the work done here is bounded by + // clearRangeMinKeys, so it will be fairly cheap even for large + // ranges. + // + // TODO(bdarnell): Move this into ClearIterRange so we don't have + // to do this scan twice. + count := 0 + iter.Seek(start) + for { + valid, err := iter.Valid() + if err != nil { + return err + } + if !valid || !iter.Key().Less(end) { + break + } + count++ + if count > clearRangeMinKeys { + break + } + iter.Next() + } + var err error + if count > clearRangeMinKeys { + err = writer.ClearRange(start, end) + } else { + err = writer.ClearIterRange(iter, start, end) + } + if err != nil { + return err + } + return nil +} diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index 937375f81298..8d401ad0d053 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -621,6 +621,26 @@ func MVCCPutProto( return MVCCPut(ctx, engine, ms, key, timestamp, value, txn) } +// MVCCBlindPutProto sets the given key to the protobuf-serialized byte string +// of msg and the provided timestamp. See MVCCBlindPut for a discussion on this +// fast-path and when it is appropriate to use. +func MVCCBlindPutProto( + ctx context.Context, + engine Writer, + ms *enginepb.MVCCStats, + key roachpb.Key, + timestamp hlc.Timestamp, + msg protoutil.Message, + txn *roachpb.Transaction, +) error { + value := roachpb.Value{} + if err := value.SetProto(msg); err != nil { + return err + } + value.InitChecksum(key) + return MVCCBlindPut(ctx, engine, ms, key, timestamp, value, txn) +} + type getBuffer struct { meta enginepb.MVCCMetadata value roachpb.Value diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 3b8aa521903f..d2cd066d2e9e 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -2923,13 +2923,15 @@ func CheckForKeyCollisions(existingIter Iterator, sstIter Iterator) error { } // RocksDBSstFileWriter creates a file suitable for importing with -// RocksDBSstFileReader. +// RocksDBSstFileReader. It implements the Writer interface. type RocksDBSstFileWriter struct { fw *C.DBSstFileWriter // DataSize tracks the total key and value bytes added so far. DataSize int64 } +var _ Writer = &RocksDBSstFileWriter{} + // MakeRocksDBSstFileWriter creates a new RocksDBSstFileWriter with the default // configuration. func MakeRocksDBSstFileWriter() (RocksDBSstFileWriter, error) { @@ -2938,28 +2940,106 @@ func MakeRocksDBSstFileWriter() (RocksDBSstFileWriter, error) { return RocksDBSstFileWriter{fw: fw}, err } -// Add puts a kv entry into the sstable being built. An error is returned if it -// is not greater than any previously added entry (according to the comparator -// configured during writer creation). `Close` cannot have been called. 
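[Editor's note: a hedged sketch of how the two helpers added above combine with the SST-writer changes that follow in this diff. It mirrors the pattern this patch uses in client_merge_test.go and clearSubsumedReplicaDiskData; the helper name is hypothetical.]

```go
package storage

import (
	"context"
	"math"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// makeSubsumedReplicaSST is a hypothetical helper: it builds an SST that wipes
// a replica's entire range-ID local keyspace and then writes the replica
// tombstone into the cleared span. Per the libroach.h comment earlier in this
// diff, the range deletion tombstone does not shadow the later Put in the
// same SST. (rditer.MakeRangeIDLocalKeyRange is introduced later in the diff.)
func makeSubsumedReplicaSST(rangeID roachpb.RangeID) ([]byte, error) {
	sst, err := engine.MakeRocksDBSstFileWriter()
	if err != nil {
		return nil, err
	}
	defer sst.Close()

	r := rditer.MakeRangeIDLocalKeyRange(rangeID, false /* replicatedOnly */)
	if err := sst.ClearRange(r.Start, r.End); err != nil {
		return nil, err
	}
	// ms == nil and an empty timestamp make this a "blind" put: nothing is
	// read back, so a pure engine.Writer such as the SST writer suffices.
	tombstoneKey := keys.RaftTombstoneKey(rangeID)
	tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32}
	if err := engine.MVCCBlindPutProto(
		context.TODO(), &sst, nil /* ms */, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil, /* txn */
	); err != nil {
		return nil, err
	}
	return sst.Finish()
}
```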
-func (fw *RocksDBSstFileWriter) Add(kv MVCCKeyValue) error {
+// ApplyBatchRepr implements the Writer interface.
+func (fw *RocksDBSstFileWriter) ApplyBatchRepr(repr []byte, sync bool) error {
+	panic("unimplemented")
+}
+
+// Clear implements the Writer interface. Note that it inserts a tombstone
+// rather than actually removing the entry from the storage engine. An error is
+// returned if it is not greater than any previous key used in Put or Clear
+// (according to the comparator configured during writer creation). Close
+// cannot have been called.
+func (fw *RocksDBSstFileWriter) Clear(key MVCCKey) error {
+	if fw.fw == nil {
+		return errors.New("cannot call Clear on a closed writer")
+	}
+	fw.DataSize += int64(len(key.Key))
+	return statusToError(C.DBSstFileWriterDelete(fw.fw, goToCKey(key)))
+}
+
+// SingleClear implements the Writer interface.
+func (fw *RocksDBSstFileWriter) SingleClear(key MVCCKey) error {
+	panic("unimplemented")
+}
+
+// ClearRange implements the Writer interface. Note that it inserts a range
+// deletion tombstone rather than actually removing the entries from the
+// storage engine. It can be called at any time with respect to Put and Clear.
+func (fw *RocksDBSstFileWriter) ClearRange(start, end MVCCKey) error {
 	if fw.fw == nil {
-		return errors.New("cannot call Open on a closed writer")
+		return errors.New("cannot call ClearRange on a closed writer")
+	}
+	fw.DataSize += int64(len(start.Key)) + int64(len(end.Key))
+	return statusToError(C.DBSstFileWriterDeleteRange(fw.fw, goToCKey(start), goToCKey(end)))
+}
+
+// ClearIterRange implements the Writer interface.
+//
+// NOTE: This method is fairly expensive as it performs a Cgo call for every
+// key deleted.
+func (fw *RocksDBSstFileWriter) ClearIterRange(iter Iterator, start, end MVCCKey) error {
+	if fw.fw == nil {
+		return errors.New("cannot call ClearIterRange on a closed writer")
+	}
+	iter.Seek(start)
+	for {
+		valid, err := iter.Valid()
+		if err != nil {
+			return err
+		}
+		if !valid || !iter.Key().Less(end) {
+			break
+		}
+		if err := fw.Clear(iter.Key()); err != nil {
+			return err
+		}
+		iter.Next()
 	}
-	fw.DataSize += int64(len(kv.Key.Key)) + int64(len(kv.Value))
-	return statusToError(C.DBSstFileWriterAdd(fw.fw, goToCKey(kv.Key), goToCSlice(kv.Value)))
+	return nil
 }
 
-// Delete puts a deletion tombstone into the sstable being built. See
-// the Add method for more.
-func (fw *RocksDBSstFileWriter) Delete(k MVCCKey) error {
+// Merge implements the Writer interface.
+func (fw *RocksDBSstFileWriter) Merge(key MVCCKey, value []byte) error {
+	panic("unimplemented")
+}
+
+// Put implements the Writer interface. It puts a kv entry into the sstable
+// being built. An error is returned if it is not greater than any previous key
+// used in Put or Clear (according to the comparator configured during writer
+// creation). Close cannot have been called.
+func (fw *RocksDBSstFileWriter) Put(key MVCCKey, value []byte) error {
 	if fw.fw == nil {
-		return errors.New("cannot call Delete on a closed writer")
+		return errors.New("cannot call Put on a closed writer")
 	}
-	fw.DataSize += int64(len(k.Key))
-	return statusToError(C.DBSstFileWriterDelete(fw.fw, goToCKey(k)))
+	fw.DataSize += int64(len(key.Key)) + int64(len(value))
+	return statusToError(C.DBSstFileWriterAdd(fw.fw, goToCKey(key), goToCSlice(value)))
+}
+
+// LogData implements the Writer interface.
+func (fw *RocksDBSstFileWriter) LogData(data []byte) error { + panic("unimplemented") } -var _ = (*RocksDBSstFileWriter).Delete +// LogLogicalOp implements the Writer interface. +func (fw *RocksDBSstFileWriter) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails) { + // No-op. Logical logging disabled. +} + +// Truncate truncates the writer's current memory buffer and returns the +// contents it contained. May be called multiple times. The function may not +// truncate and return all keys if the underlying RocksDB blocks have not been +// flushed. Close cannot have been called. +func (fw *RocksDBSstFileWriter) Truncate() ([]byte, error) { + if fw.fw == nil { + return nil, errors.New("cannot call Truncate on a closed writer") + } + var contents C.DBString + if err := statusToError(C.DBSstFileWriterTruncate(fw.fw, &contents)); err != nil { + return nil, err + } + return cStringToGoBytes(contents), nil +} // Finish finalizes the writer and returns the constructed file's contents. At // least one kv entry must have been added. diff --git a/pkg/storage/engine/rocksdb_test.go b/pkg/storage/engine/rocksdb_test.go index 432814fd2b51..9b8d122f9957 100644 --- a/pkg/storage/engine/rocksdb_test.go +++ b/pkg/storage/engine/rocksdb_test.go @@ -715,6 +715,91 @@ func TestConcurrentBatch(t *testing.T) { } } +// TestRocksDBSstFileWriterTruncate ensures that sum of the chunks created by +// calling Truncate on a RocksDBSstFileWriter is equivalent to an SST built +// without ever calling Truncate. +func TestRocksDBSstFileWriterTruncate(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Truncate will be used on this writer. + sst1, err := MakeRocksDBSstFileWriter() + if err != nil { + t.Fatal(err) + } + defer sst1.Close() + + // Truncate will not be used on this writer. + sst2, err := MakeRocksDBSstFileWriter() + if err != nil { + t.Fatal(err) + } + defer sst2.Close() + + const keyLen = 10 + const valLen = 950 + ts := hlc.Timestamp{WallTime: 1} + key := MVCCKey{Key: roachpb.Key(make([]byte, keyLen)), Timestamp: ts} + value := make([]byte, valLen) + + var resBuf1, resBuf2 []byte + const entries = 100000 + const truncateChunk = entries / 10 + for i := 0; i < entries; i++ { + key.Key = []byte(fmt.Sprintf("%09d", i)) + copy(value, key.Key) + + if err := sst1.Put(key, value); err != nil { + t.Fatal(err) + } + if err := sst2.Put(key, value); err != nil { + t.Fatal(err) + } + + if i > 0 && i%truncateChunk == 0 { + sst1Chunk, err := sst1.Truncate() + if err != nil { + t.Fatal(err) + } + t.Logf("iteration %d, truncate chunk\tlen=%d", i, len(sst1Chunk)) + + // Even though we added keys, it is not guaranteed strictly by the + // contract of Truncate that a byte slice will be returned. This is + // because the keys may be in un-flushed blocks. This test had been tuned + // such that every other batch chunk is always large enough to require at + // least one block to be flushed. + empty := len(sst1Chunk) == 0 + if i%(2*truncateChunk) == 0 { + if empty { + t.Fatalf("expected non-empty SST chunk during iteration %d", i) + } + resBuf1 = append(resBuf1, sst1Chunk...) + } else { + if !empty { + t.Fatalf("expected empty SST chunk during iteration %d", i) + } + } + } + } + + sst1FinishBuf, err := sst1.Finish() + if err != nil { + t.Fatal(err) + } + resBuf1 = append(resBuf1, sst1FinishBuf...) 
+ t.Logf("truncated sst final chunk\t\tlen=%d", len(sst1FinishBuf)) + + resBuf2, err = sst2.Finish() + if err != nil { + t.Fatal(err) + } + t.Logf("non-truncated sst final chunk\tlen=%d", len(resBuf2)) + + if !bytes.Equal(resBuf1, resBuf2) { + t.Errorf("expected SST made up of truncate chunks (len=%d) to be equivalent to SST that "+ + "was not (len=%d)", len(sst1FinishBuf), len(resBuf2)) + } +} + func BenchmarkRocksDBSstFileWriter(b *testing.B) { dir, err := ioutil.TempDir("", "BenchmarkRocksDBSstFileWriter") if err != nil { @@ -757,7 +842,7 @@ func BenchmarkRocksDBSstFileWriter(b *testing.B) { kv.Key.Key = []byte(fmt.Sprintf("%09d", i)) copy(kv.Value, kv.Key.Key) b.StartTimer() - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { b.Fatal(err) } } @@ -800,7 +885,7 @@ func BenchmarkRocksDBSstFileReader(b *testing.B) { for i := 0; i < entries; i++ { kv.Key.Key = []byte(fmt.Sprintf("%09d", i)) copy(kv.Value, kv.Key.Key) - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { b.Fatal(err) } } @@ -1336,13 +1421,7 @@ func TestRocksDBDeleteRangeCompaction(t *testing.T) { defer sst.Close() for i := 0; i < numEntries; i++ { - kv := MVCCKeyValue{ - Key: MVCCKey{ - Key: makeKey(string(p), i), - }, - Value: randutil.RandBytes(rnd, valueSize), - } - if err := sst.Add(kv); err != nil { + if err := sst.Put(MVCCKey{Key: makeKey(string(p), i)}, randutil.RandBytes(rnd, valueSize)); err != nil { t.Fatal(err) } } @@ -1464,12 +1543,7 @@ func BenchmarkRocksDBDeleteRangeIterate(b *testing.B) { defer sst.Close() for i := 0; i < entries; i++ { - kv := MVCCKeyValue{ - Key: MVCCKey{ - Key: makeKey(i), - }, - } - if err := sst.Add(kv); err != nil { + if err := sst.Put(MVCCKey{Key: makeKey(i)}, nil); err != nil { b.Fatal(err) } } @@ -1588,10 +1662,10 @@ func TestSstFileWriterTimeBound(t *testing.T) { t.Fatal(sst) } defer sst.Close() - if err := sst.Add(MVCCKeyValue{ - Key: MVCCKey{Key: []byte("key"), Timestamp: hlc.Timestamp{WallTime: walltime}}, - Value: []byte("value"), - }); err != nil { + if err := sst.Put( + MVCCKey{Key: []byte("key"), Timestamp: hlc.Timestamp{WallTime: walltime}}, + []byte("value"), + ); err != nil { t.Fatal(err) } sstContents, err := sst.Finish() diff --git a/pkg/storage/engine/sst_iterator_test.go b/pkg/storage/engine/sst_iterator_test.go index 4070d94173d9..45989392a46b 100644 --- a/pkg/storage/engine/sst_iterator_test.go +++ b/pkg/storage/engine/sst_iterator_test.go @@ -86,7 +86,7 @@ func TestSSTIterator(t *testing.T) { }, Value: []byte{'a' + byte(i)}, } - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { t.Fatalf("%+v", err) } allKVs = append(allKVs, kv) diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index c91831dbf348..dfded040766b 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -391,7 +391,7 @@ func MakeSSTable(key, value string, ts hlc.Timestamp) ([]byte, engine.MVCCKeyVal Value: v.RawBytes, } - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { panic(errors.Wrap(err, "while finishing SSTable")) } b, err := sst.Finish() diff --git a/pkg/storage/rditer/replica_data_iter.go b/pkg/storage/rditer/replica_data_iter.go index f8a998250bc6..f63a56e61090 100644 --- a/pkg/storage/rditer/replica_data_iter.go +++ b/pkg/storage/rditer/replica_data_iter.go @@ -43,21 +43,60 @@ type ReplicaDataIterator struct { // MakeAllKeyRanges returns all key ranges for the given Range. 
 func MakeAllKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {
-	return makeReplicaKeyRanges(d, keys.MakeRangeIDPrefix)
+	return []KeyRange{
+		MakeRangeIDLocalKeyRange(d.RangeID, false /* replicatedOnly */),
+		MakeRangeLocalKeyRange(d),
+		MakeUserKeyRange(d),
+	}
 }
 
-// MakeReplicatedKeyRanges returns all key ranges that are fully Raft replicated
-// for the given Range.
+// MakeReplicatedKeyRanges returns all key ranges that are fully Raft
+// replicated for the given Range.
+//
+// NOTE: The logic for receiving snapshots relies on this function returning
+// the ranges in the following sorted order:
+//
+// 1. Replicated range-id local key range
+// 2. Range-local key range
+// 3. User key range
 func MakeReplicatedKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {
-	return makeReplicaKeyRanges(d, keys.MakeRangeIDReplicatedPrefix)
+	return []KeyRange{
+		MakeRangeIDLocalKeyRange(d.RangeID, true /* replicatedOnly */),
+		MakeRangeLocalKeyRange(d),
+		MakeUserKeyRange(d),
+	}
 }
 
-// makeReplicaKeyRanges returns a slice of 3 key ranges. The last key range in
-// the returned slice corresponds to the actual range data (i.e. not the range
-// metadata).
-func makeReplicaKeyRanges(
-	d *roachpb.RangeDescriptor, metaFunc func(roachpb.RangeID) roachpb.Key,
-) []KeyRange {
+// MakeRangeIDLocalKeyRange returns the range-id local key range. If
+// replicatedOnly is true, then it returns only the replicated keys; otherwise,
+// it returns both the replicated and unreplicated keys.
+func MakeRangeIDLocalKeyRange(rangeID roachpb.RangeID, replicatedOnly bool) KeyRange {
+	var prefixFn func(roachpb.RangeID) roachpb.Key
+	if replicatedOnly {
+		prefixFn = keys.MakeRangeIDReplicatedPrefix
+	} else {
+		prefixFn = keys.MakeRangeIDPrefix
+	}
+	sysRangeIDKey := prefixFn(rangeID)
+	return KeyRange{
+		Start: engine.MakeMVCCMetadataKey(sysRangeIDKey),
+		End:   engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()),
+	}
+}
+
+// MakeRangeLocalKeyRange returns the range local key range. Range-local keys
+// are replicated keys that do not belong to the range they would naturally
+// sort into. For example, /Local/Range/Table/1 would sort into [/Min,
+// /System), but it actually belongs to [/Table/1, /Table/2).
+func MakeRangeLocalKeyRange(d *roachpb.RangeDescriptor) KeyRange {
+	return KeyRange{
+		Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)),
+		End:   engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)),
+	}
+}
+
+// MakeUserKeyRange returns the user key range.
+func MakeUserKeyRange(d *roachpb.RangeDescriptor) KeyRange {
 	// The first range in the keyspace starts at KeyMin, which includes the
 	// node-local space. We need the original StartKey to find the range
 	// metadata, but the actual data starts at LocalMax.
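[Editor's note: an illustrative reading of the ordering contract documented above, for a hypothetical range [a, c) with range ID 7. The rest of the MakeUserKeyRange hunk continues below.]

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
)

func main() {
	desc := roachpb.RangeDescriptor{
		RangeID:  7,
		StartKey: roachpb.RKey("a"),
		EndKey:   roachpb.RKey("c"),
	}
	// Guaranteed order per the NOTE on MakeReplicatedKeyRanges:
	// 1. replicated range-ID local keys (under keys.MakeRangeIDReplicatedPrefix(7)),
	// 2. range-local keys (e.g. the range descriptor and txn records),
	// 3. user keys [a, c).
	for _, kr := range rditer.MakeReplicatedKeyRanges(&desc) {
		fmt.Printf("[%s, %s)\n", kr.Start, kr.End)
	}
}
```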
@@ -65,20 +104,9 @@ func makeReplicaKeyRanges( if d.StartKey.Equal(roachpb.RKeyMin) { dataStartKey = keys.LocalMax } - sysRangeIDKey := metaFunc(d.RangeID) - return []KeyRange{ - { - Start: engine.MakeMVCCMetadataKey(sysRangeIDKey), - End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()), - }, - { - Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)), - End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)), - }, - { - Start: engine.MakeMVCCMetadataKey(dataStartKey), - End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()), - }, + return KeyRange{ + Start: engine.MakeMVCCMetadataKey(dataStartKey), + End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()), } } diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index fc644d74320a..75829de30021 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -550,9 +550,10 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat if err != nil { return wrapWithNonDeterministicFailure(err, "unable to get replica for merge") } - const destroyData = false + const rangeIDLocalOnly = true + const mustClearRange = false if err := rhsRepl.preDestroyRaftMuLocked( - ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, destroyData, + ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, rangeIDLocalOnly, mustClearRange, ); err != nil { return wrapWithNonDeterministicFailure(err, "unable to destroy range before merge") } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index f1668813a64c..fb4ec63eb805 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1313,16 +1313,30 @@ func execChangeReplicasTxn( // waiting for a second and final response from the recipient which indicates if // the snapshot was a success. // +// `receiveSnapshot` takes the key-value pairs sent and creates three SSTs from +// them for direct ingestion: one for the replicated range-ID local keys, one +// for the range local keys, and one for the user keys. The reason it creates +// three separate SSTs is to prevent overlaps with the memtable and existing +// SSTs in RocksDB. Each of the SSTs also has a range deletion tombstone to +// delete the existing data in the range. +// // Applying the snapshot: After the recipient has received the message // indicating it has all the data, it hands it all to -// `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks the -// same things as `shouldAcceptSnapshotData` to make sure nothing has changed -// while the snapshot was being transferred. It then guarantees that there is -// either an initialized[3] replica or a `ReplicaPlaceholder`[4] to accept the -// snapshot by creating a placeholder if necessary. Finally, a *Raft snapshot* -// message is manually handed to the replica's Raft node (by calling -// `stepRaftGroup` + `handleRaftReadyRaftMuLocked`), at which point the snapshot -// has been applied. +// `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks +// the same things as `shouldAcceptSnapshotData` to make sure nothing has +// changed while the snapshot was being transferred. It then guarantees that +// there is either an initialized[3] replica or a `ReplicaPlaceholder`[4] to +// accept the snapshot by creating a placeholder if necessary. 
Finally, a *Raft +// snapshot* message is manually handed to the replica's Raft node (by calling +// `stepRaftGroup` + `handleRaftReadyRaftMuLocked`). During the application +// process, several other SSTs may be created for direct ingestion. An SST for +// the unreplicated range-ID local keys is created for the Raft entries, hard +// state, and truncated state. An SST is created for deleting each subsumed +// replica's range-ID local keys and at most two SSTs are created for deleting +// the user keys and range local keys of all subsumed replicas. All in all, a +// maximum of 6 + SR SSTs will be created for direct ingestion where SR is the +// number of subsumed replicas. In the case where there are no subsumed +// replicas, 4 SSTs will be created. // // [1]: There is a third kind of snapshot, called "preemptive", which is how we // avoided the above fragility before learner replicas were introduced in the diff --git a/pkg/storage/replica_destroy.go b/pkg/storage/replica_destroy.go index 5dac458b4311..b2b5b253275d 100644 --- a/pkg/storage/replica_destroy.go +++ b/pkg/storage/replica_destroy.go @@ -65,12 +65,13 @@ func (s destroyStatus) Removed() bool { func (r *Replica) preDestroyRaftMuLocked( ctx context.Context, reader engine.Reader, - batch engine.Batch, + writer engine.Writer, nextReplicaID roachpb.ReplicaID, - destroyData bool, + rangeIDLocalOnly bool, + mustClearRange bool, ) error { desc := r.Desc() - err := clearRangeData(ctx, desc, reader, batch, destroyData) + err := clearRangeData(desc, reader, writer, rangeIDLocalOnly, mustClearRange) if err != nil { return err } @@ -80,7 +81,7 @@ func (r *Replica) preDestroyRaftMuLocked( // NB: Legacy tombstones (which are in the replicated key space) are wiped // in clearRangeData, but that's OK since we're writing a new one in the same // batch (and in particular, sequenced *after* the wipe). - return r.setTombstoneKey(ctx, batch, nextReplicaID) + return r.setTombstoneKey(ctx, writer, nextReplicaID) } func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCStats) error { @@ -109,22 +110,27 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS if r.raftMu.sideloaded != nil { return r.raftMu.sideloaded.Clear(ctx) } + return nil } // destroyRaftMuLocked deletes data associated with a replica, leaving a -// tombstone. If `destroyData` is true, data in all of the range's keyspaces -// will be deleted. Otherwise, only data in the range-ID local keyspace will be -// deleted. Requires that Replica.raftMu is held. +// tombstone. func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb.ReplicaID) error { startTime := timeutil.Now() ms := r.GetMVCCStats() - const destroyData = true batch := r.Engine().NewWriteOnlyBatch() defer batch.Close() - if err := r.preDestroyRaftMuLocked(ctx, r.Engine(), batch, nextReplicaID, destroyData); err != nil { + if err := r.preDestroyRaftMuLocked( + ctx, + r.Engine(), + batch, + nextReplicaID, + false, /* rangeIDLocalOnly */ + false, /* mustClearRange */ + ); err != nil { return err } preTime := timeutil.Now() @@ -172,7 +178,7 @@ func (r *Replica) cancelPendingCommandsLocked() { // ID that it hasn't yet received a RangeDescriptor for if it receives raft // requests for that replica ID (as seen in #14231). 
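[Editor's note: the SST accounting in the comment block above, written out as a hypothetical helper (not in the patch); setTombstoneKey's diff continues below.]

```go
// maxIngestedSSTs is illustrative only. A snapshot contributes three SSTs for
// its replicated key ranges plus one for its unreplicated range-ID local
// keys; each subsumed replica adds one range-ID-clearing SST; and clearing
// the range-local and user key spans left over from subsumed replicas adds
// at most two more.
func maxIngestedSSTs(subsumedReplicas, spanClearingSSTs int) int {
	return 3 + 1 + subsumedReplicas + spanClearingSSTs // at most 6 + SR
}
```

TestStoreRangeMergeRaftSnapshot earlier in this diff checks exactly this bound: maxIngestedSSTs(2, 1) == 7 for a snapshot of [a, d) that subsumes the replicas with range IDs 3 and 4 and needs a single SST to clear the user keys in [d, /Max).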
 func (r *Replica) setTombstoneKey(
-	ctx context.Context, eng engine.ReadWriter, externalNextReplicaID roachpb.ReplicaID,
+	ctx context.Context, eng engine.Writer, externalNextReplicaID roachpb.ReplicaID,
 ) error {
 	r.mu.Lock()
 	nextReplicaID := r.mu.state.Desc.NextReplicaID
@@ -188,6 +194,7 @@ func (r *Replica) setTombstoneKey(
 	tombstone := &roachpb.RaftTombstone{
 		NextReplicaID: nextReplicaID,
 	}
-	return engine.MVCCPutProto(ctx, eng, nil, tombstoneKey,
-		hlc.Timestamp{}, nil, tombstone)
+	// "Blind" because ms == nil and timestamp == hlc.Timestamp{}.
+	return engine.MVCCBlindPutProto(ctx, eng, nil, tombstoneKey,
+		hlc.Timestamp{}, tombstone, nil)
 }
diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go
index 443a73cc4dd3..cded380c95a2 100644
--- a/pkg/storage/replica_raftstorage.go
+++ b/pkg/storage/replica_raftstorage.go
@@ -482,8 +482,8 @@ func (s *OutgoingSnapshot) Close() {
 // IncomingSnapshot contains the data for an incoming streaming snapshot message.
 type IncomingSnapshot struct {
 	SnapUUID uuid.UUID
-	// The RocksDB BatchReprs that make up this snapshot.
-	Batches [][]byte
+	// The storage interface for the underlying SSTs.
+	SSSS *SSTSnapshotStorageScratch
 	// The Raft log entries for this snapshot.
 	LogEntries [][]byte
 	// The replica state at the time the snapshot was generated (never nil).
@@ -593,9 +593,14 @@ func snapshot(
 // append is intentionally oblivious to the existence of sideloaded proposals.
 // They are managed by the caller, including cleaning up obsolete on-disk
 // payloads in case the log tail is replaced.
+//
+// NOTE: This method takes an engine.Writer because reads are unnecessary when
+// prevLastIndex is 0 and prevLastTerm is invalidLastTerm. In the case where
+// reading is necessary (i.e., entries are getting overwritten or deleted), an
+// engine.ReadWriter must be passed in.
 func (r *Replica) append(
 	ctx context.Context,
-	batch engine.ReadWriter,
+	eng engine.Writer,
 	prevLastIndex uint64,
 	prevLastTerm uint64,
 	prevRaftLogSize int64,
@@ -616,30 +621,43 @@ func (r *Replica) append(
 		value.InitChecksum(key)
 		var err error
 		if ent.Index > prevLastIndex {
-			err = engine.MVCCBlindPut(ctx, batch, &diff, key, hlc.Timestamp{}, value, nil /* txn */)
+			err = engine.MVCCBlindPut(ctx, eng, &diff, key, hlc.Timestamp{}, value, nil /* txn */)
 		} else {
-			err = engine.MVCCPut(ctx, batch, &diff, key, hlc.Timestamp{}, value, nil /* txn */)
+			// We type assert eng to also be an engine.Reader only in the case where
+			// we're replacing existing entries.
+			eng, ok := eng.(engine.ReadWriter)
+			if !ok {
+				return 0, 0, 0, errors.Errorf("expected eng to be an engine.ReadWriter when overwriting log entries")
+			}
+			err = engine.MVCCPut(ctx, eng, &diff, key, hlc.Timestamp{}, value, nil /* txn */)
 		}
 		if err != nil {
 			return 0, 0, 0, err
 		}
 	}
 
-	// Delete any previously appended log entries which never committed.
 	lastIndex := entries[len(entries)-1].Index
 	lastTerm := entries[len(entries)-1].Term
-	for i := lastIndex + 1; i <= prevLastIndex; i++ {
-		// Note that the caller is in charge of deleting any sideloaded payloads
-		// (which they must only do *after* the batch has committed).
-		err := engine.MVCCDelete(ctx, batch, &diff, r.raftMu.stateLoader.RaftLogKey(i),
-			hlc.Timestamp{}, nil /* txn */)
-		if err != nil {
-			return 0, 0, 0, err
+	// Delete any previously appended log entries which never committed.
+	if prevLastIndex > 0 {
+		// We type assert eng to also be an engine.Reader only in the case where
+		// we're deleting existing entries.
+		eng, ok := eng.(engine.ReadWriter)
+		if !ok {
+			return 0, 0, 0, errors.Errorf("expected eng to be an engine.ReadWriter when deleting log entries")
+		}
+		for i := lastIndex + 1; i <= prevLastIndex; i++ {
+			// Note that the caller is in charge of deleting any sideloaded payloads
+			// (which they must only do *after* the batch has committed).
+			err := engine.MVCCDelete(ctx, eng, &diff, r.raftMu.stateLoader.RaftLogKey(i),
+				hlc.Timestamp{}, nil /* txn */)
+			if err != nil {
+				return 0, 0, 0, err
+			}
 		}
 	}
 
 	raftLogSize := prevRaftLogSize + diff.SysBytes
-
 	return lastIndex, lastTerm, raftLogSize, nil
 }
 
@@ -673,62 +691,38 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error {
 	return nil
 }
 
+// clearRangeData clears the data associated with a range descriptor. If
+// rangeIDLocalOnly is true, then only the range-id local keys are deleted.
+// Otherwise, the range-id local keys, range local keys, and user keys are all
+// deleted. If mustClearRange is true, ClearRange will always be used to remove
+// the keys. Otherwise, ClearRangeWithHeuristic will be used, which chooses
+// ClearRange or ClearIterRange depending on how many keys there are in the
+// range.
 func clearRangeData(
-	ctx context.Context,
 	desc *roachpb.RangeDescriptor,
 	eng engine.Reader,
-	batch engine.Batch,
-	destroyData bool,
+	writer engine.Writer,
+	rangeIDLocalOnly bool,
+	mustClearRange bool,
 ) error {
-	iter := eng.NewIterator(engine.IterOptions{UpperBound: desc.EndKey.AsRawKey()})
-	defer iter.Close()
-
-	// It is expensive for there to be many range deletion tombstones in the same
-	// sstable because all of the tombstones in an sstable are loaded whenever the
-	// sstable is accessed. So we avoid using range deletion unless there is some
-	// minimum number of keys. The value here was pulled out of thin air. It might
-	// be better to make this dependent on the size of the data being deleted. Or
-	// perhaps we should fix RocksDB to handle large numbers of tombstones in an
-	// sstable better.
-	const clearRangeMinKeys = 64
-	keyRanges := rditer.MakeAllKeyRanges(desc)
-	if !destroyData {
-		// TODO(benesch): The fact that we hardcode the number of
-		// "metadata" ranges (i.e. non-user-keyspace) suggests that
-		// rditer.MakeAllKeyRanges has the wrong API.
-		keyRanges = keyRanges[:1]
+	var keyRanges []rditer.KeyRange
+	if rangeIDLocalOnly {
+		keyRanges = []rditer.KeyRange{rditer.MakeRangeIDLocalKeyRange(desc.RangeID, false)}
+	} else {
+		keyRanges = rditer.MakeAllKeyRanges(desc)
 	}
-	for _, keyRange := range keyRanges {
-		// Peek into the range to see whether it's large enough to justify
-		// ClearRange. Note that the work done here is bounded by
-		// clearRangeMinKeys, so it will be fairly cheap even for large
-		// ranges.
-		//
-		// TODO(bdarnell): Move this into ClearIterRange so we don't have
-		// to do this scan twice.
- count := 0 - iter.Seek(keyRange.Start) - for { - valid, err := iter.Valid() - if err != nil { - return err - } - if !valid || !iter.Key().Less(keyRange.End) { - break - } - count++ - if count > clearRangeMinKeys { - break - } - iter.Next() - } - var err error - if count > clearRangeMinKeys { - err = batch.ClearRange(keyRange.Start, keyRange.End) - } else { - err = batch.ClearIterRange(iter, keyRange.Start, keyRange.End) + + var clearRangeFn func(engine.Reader, engine.Writer, engine.MVCCKey, engine.MVCCKey) error + if mustClearRange { + clearRangeFn = func(eng engine.Reader, writer engine.Writer, start, end engine.MVCCKey) error { + return writer.ClearRange(start, end) } - if err != nil { + } else { + clearRangeFn = engine.ClearRangeWithHeuristic + } + + for _, keyRange := range keyRanges { + if err := clearRangeFn(eng, writer, keyRange.Start, keyRange.End); err != nil { return err } } @@ -806,172 +800,144 @@ func (r *Replica) applySnapshot( } var stats struct { - clear time.Time - batch time.Time - entries time.Time - commit time.Time - } - - var size int - for _, b := range inSnap.Batches { - size += len(b) - } - for _, e := range inSnap.LogEntries { - size += len(e) - } - - log.Infof(ctx, "applying %s snapshot at index %d "+ - "(id=%s, encoded size=%d, %d rocksdb batches, %d log entries)", - snapType, snap.Metadata.Index, inSnap.SnapUUID.Short(), - size, len(inSnap.Batches), len(inSnap.LogEntries)) + // Time to clear unreplicated range-ID local keys and update unreplicated + // state. + unreplicatedState time.Time + // Time to process subsumed replicas. + subsumedReplicas time.Time + // Time to ingest SSTs. + ingestion time.Time + } + log.Infof(ctx, "applying %s snapshot [id=%s index=%d]", + snapType, inSnap.SnapUUID.Short(), snap.Metadata.Index) defer func(start time.Time) { now := timeutil.Now() - log.Infof(ctx, "applied %s snapshot in %0.0fms [clear=%0.0fms batch=%0.0fms entries=%0.0fms commit=%0.0fms]", - snapType, now.Sub(start).Seconds()*1000, - stats.clear.Sub(start).Seconds()*1000, - stats.batch.Sub(stats.clear).Seconds()*1000, - stats.entries.Sub(stats.batch).Seconds()*1000, - stats.commit.Sub(stats.entries).Seconds()*1000) - }(timeutil.Now()) - - // Use a more efficient write-only batch because we don't need to do any - // reads from the batch. - batch := r.store.Engine().NewWriteOnlyBatch() - defer batch.Close() - - // If we're subsuming a replica below, we don't have its last NextReplicaID, - // nor can we obtain it. That's OK: we can just be conservative and use the - // maximum possible replica ID. preDestroyRaftMuLocked will write a replica - // tombstone using this maximum possible replica ID, which would normally be - // problematic, as it would prevent this store from ever having a new replica - // of the removed range. In this case, however, it's copacetic, as subsumed - // ranges _can't_ have new replicas. - const subsumedNextReplicaID = math.MaxInt32 - - // As part of applying the snapshot, we may need to subsume replicas that have - // been merged into this range. Destroy their data in the same batch in which - // we apply the snapshot. 
- for _, sr := range subsumedRepls { - if err := sr.preDestroyRaftMuLocked( - ctx, r.store.Engine(), batch, subsumedNextReplicaID, true, /* destroyData */ - ); err != nil { - return err + totalLog := fmt.Sprintf( + "total=%0.0fms ", + now.Sub(start).Seconds()*1000, + ) + unreplicatedStateLog := fmt.Sprintf( + "unreplicatedState=%0.0fms ", + stats.unreplicatedState.Sub(start).Seconds()*1000, + ) + var subsumedReplicasLog string + if len(subsumedRepls) > 0 { + subsumedReplicasLog = fmt.Sprintf( + "subsumedReplicas=%d@%0.0fms ", + len(subsumedRepls), + stats.subsumedReplicas.Sub(stats.unreplicatedState).Seconds()*1000, + ) } - } + ingestionLog := fmt.Sprintf( + "ingestion=%d@%0.0fms ", + len(inSnap.SSSS.SSTs()), + stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000, + ) + log.Infof(ctx, "applied %s snapshot [%s%s%s%sid=%s index=%d]", + snapType, totalLog, unreplicatedStateLog, subsumedReplicasLog, + ingestionLog, inSnap.SnapUUID.Short(), snap.Metadata.Index) + }(timeutil.Now()) - // Delete everything in the range and recreate it from the snapshot. - // We need to delete any old Raft log entries here because any log entries - // that predate the snapshot will be orphaned and never truncated or GC'd. - if err := clearRangeData(ctx, s.Desc, r.store.Engine(), batch, true /* destroyData */); err != nil { + unreplicatedSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { return err } - // Clear the cached raft log entries to ensure that old or uncommitted - // entries don't impact the in-memory state. - r.store.raftEntryCache.Drop(r.RangeID) - stats.clear = timeutil.Now() + defer unreplicatedSST.Close() - // Write the snapshot into the range. - for _, batchRepr := range inSnap.Batches { - if err := batch.ApplyBatchRepr(batchRepr, false); err != nil { - return err - } + // Clearing the unreplicated state. + unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) + unreplicatedStart := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey) + unreplicatedEnd := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey.PrefixEnd()) + if err = unreplicatedSST.ClearRange(unreplicatedStart, unreplicatedEnd); err != nil { + return errors.Wrapf(err, "error clearing range of unreplicated SST writer") } - // The log entries are all written to distinct keys so we can use a - // distinct batch. - distinctBatch := batch.Distinct() - stats.batch = timeutil.Now() + // Update HardState. + if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { + return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + // Update TruncatedState if it is unreplicated. if inSnap.UsesUnreplicatedTruncatedState { - // We're using the unreplicated truncated state, which we need to - // manually persist to disk. If we're not taking this branch, the - // snapshot contains a legacy TruncatedState and we don't need to do - // anything (in fact, must not -- the invariant is that exactly one of - // them exists at any given point in the state machine). 
- if err := stateloader.Make(s.Desc.RangeID).SetRaftTruncatedState( - ctx, distinctBatch, s.TruncatedState, + if err := r.raftMu.stateLoader.SetRaftTruncatedState( + ctx, &unreplicatedSST, s.TruncatedState, ); err != nil { - return err + return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer") } } - logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) - for i, bytes := range inSnap.LogEntries { - if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { - return err - } - } - // If this replica doesn't know its ReplicaID yet, we're applying a - // preemptive snapshot. In this case, we're going to have to write the - // sideloaded proposals into the Raft log. Otherwise, sideload. + // Update Raft entries. + var lastTerm uint64 var raftLogSize int64 - thinEntries := logEntries - if replicaID != 0 { + if len(inSnap.LogEntries) > 0 { + logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) + for i, bytes := range inSnap.LogEntries { + if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { + return err + } + } + // If this replica doesn't know its ReplicaID yet, we're applying a + // preemptive snapshot. In this case, we're going to have to write the + // sideloaded proposals into the Raft log. Otherwise, sideload. + if replicaID != 0 { + var err error + var sideloadedEntriesSize int64 + logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + if err != nil { + return err + } + raftLogSize += sideloadedEntriesSize + } var err error - var sideloadedEntriesSize int64 - thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + _, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries) if err != nil { return err } - raftLogSize += sideloadedEntriesSize + } else { + lastTerm = invalidLastTerm } + r.store.raftEntryCache.Drop(r.RangeID) - // Write the snapshot's Raft log into the range. - var lastTerm uint64 - _, lastTerm, raftLogSize, err = r.append( - ctx, distinctBatch, 0, invalidLastTerm, raftLogSize, thinEntries, - ) - if err != nil { + stats.unreplicatedState = timeutil.Now() + if err := inSnap.SSSS.WriteSST(ctx, &unreplicatedSST); err != nil { return err } - stats.entries = timeutil.Now() - - // Note that since this snapshot comes from Raft, we don't have to synthesize - // the HardState -- Raft wouldn't ask us to update the HardState in incorrect - // ways. - if err := r.raftMu.stateLoader.SetHardState(ctx, distinctBatch, hs); err != nil { - return errors.Wrapf(err, "unable to persist HardState %+v", &hs) - } - - // We need to close the distinct batch and start using the normal batch for - // the read below. - distinctBatch.Close() - // As outlined above, last and applied index are the same after applying - // the snapshot (i.e. the snapshot has no uncommitted tail). if s.RaftAppliedIndex != snap.Metadata.Index { log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d", s.RaftAppliedIndex, snap.Metadata.Index) } - // We've written Raft log entries, so we need to sync the WAL. - if err := batch.Commit(!disableSyncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil { + // If we're subsuming a replica below, we don't have its last NextReplicaID, + // nor can we obtain it. That's OK: we can just be conservative and use the + // maximum possible replica ID. 
preDestroyRaftMuLocked will write a replica
+	// tombstone using this maximum possible replica ID, which would normally be
+	// problematic, as it would prevent this store from ever having a new replica
+	// of the removed range. In this case, however, it's copacetic, as subsumed
+	// ranges _can't_ have new replicas.
+	const subsumedNextReplicaID = math.MaxInt32
+	if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSSS, s.Desc, subsumedRepls, subsumedNextReplicaID); err != nil {
 		return err
 	}
-	stats.commit = timeutil.Now()
+	stats.subsumedReplicas = timeutil.Now()
+
+	// Ingest all SSTs atomically.
+	if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
+		if err := fn(inSnap, snapType, inSnap.SSSS.SSTs()); err != nil {
+			return err
+		}
+	}
+	if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSSS.SSTs(), true /* skipWritingSeqNo */, true /* modify */); err != nil {
+		return errors.Wrapf(err, "while ingesting %s", inSnap.SSSS.SSTs())
+	}
+	stats.ingestion = timeutil.Now()
 
 	// The on-disk state is now committed, but the corresponding in-memory state
 	// has not yet been updated. Any errors past this point must therefore be
 	// treated as fatal.
-	for _, sr := range subsumedRepls {
-		// We removed sr's data when we committed the batch. Finish subsumption by
-		// updating the in-memory bookkeping.
-		if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
-			log.Fatalf(ctx, "unable to finish destroying %s while applying snapshot: %+v", sr, err)
-		}
-		// We already hold sr's raftMu, so we must call removeReplicaImpl directly.
-		// Note that it's safe to update the store's metadata for sr's removal
-		// separately from updating the store's metadata for r's new descriptor
-		// (i.e., under a different store.mu acquisition). Each store.mu acquisition
-		// leaves the store in a consistent state, and access to the replicas
-		// themselves is protected by their raftMus, which are held from start to
-		// finish.
-		if err := r.store.removeReplicaImpl(ctx, sr, subsumedNextReplicaID, RemoveOptions{
-			DestroyData: false, // data is already destroyed
-		}); err != nil {
-			log.Fatalf(ctx, "unable to remove %s while applying snapshot: %+v", sr, err)
-		}
+	if err := r.clearSubsumedReplicaInMemoryData(ctx, subsumedRepls, subsumedNextReplicaID); err != nil {
+		log.Fatalf(ctx, "failed to clear in-memory data of subsumed replicas while applying snapshot: %+v", err)
 	}
 
 	// Atomically swap the placeholder, if any, for the replica, and update the
@@ -1034,6 +1000,137 @@ func (r *Replica) applySnapshot(
 	return nil
 }
 
+// clearSubsumedReplicaDiskData clears the on-disk data of the subsumed
+// replicas by creating SSTs with range deletion tombstones. We have to be
+// careful here not to have overlapping ranges with the SSTs we have already
+// created since that will throw an error while we are ingesting them. This
+// method requires that each of the subsumed replicas' raftMu is held.
+func (r *Replica) clearSubsumedReplicaDiskData(
+	ctx context.Context,
+	ssss *SSTSnapshotStorageScratch,
+	desc *roachpb.RangeDescriptor,
+	subsumedRepls []*Replica,
+	subsumedNextReplicaID roachpb.ReplicaID,
+) error {
+	getKeyRanges := func(desc *roachpb.RangeDescriptor) [2]rditer.KeyRange {
+		return [...]rditer.KeyRange{
+			rditer.MakeRangeLocalKeyRange(desc),
+			rditer.MakeUserKeyRange(desc),
+		}
+	}
+	keyRanges := getKeyRanges(desc)
+	totalKeyRanges := append([]rditer.KeyRange(nil), keyRanges[:]...)
+ for _, sr := range subsumedRepls { + // We have to create an SST for the subsumed replica's range-id local keys. + subsumedReplSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + // NOTE: We set mustClearRange to true because we are setting + // RaftTombstoneKey. Since Clears and Puts need to be done in increasing + // order of keys, it is not safe to use ClearRangeIter. + if err := sr.preDestroyRaftMuLocked( + ctx, + r.store.Engine(), + &subsumedReplSST, + subsumedNextReplicaID, + true, /* rangeIDLocalOnly */ + true, /* mustClearRange */ + ); err != nil { + subsumedReplSST.Close() + return err + } + if err := ssss.WriteSST(ctx, &subsumedReplSST); err != nil { + return err + } + + srKeyRanges := getKeyRanges(sr.Desc()) + // Compute the total key space covered by the current replica and all + // subsumed replicas. + for i := range srKeyRanges { + if srKeyRanges[i].Start.Key.Compare(totalKeyRanges[i].Start.Key) < 0 { + totalKeyRanges[i].Start = srKeyRanges[i].Start + } + if srKeyRanges[i].End.Key.Compare(totalKeyRanges[i].End.Key) > 0 { + totalKeyRanges[i].End = srKeyRanges[i].End + } + } + } + + // We might have to create SSTs for the range-local keys and user keys if the + // subsumed replicas are not fully contained by the replica in our snapshot. + // The following is an example of this case. + // + // a b c d + // |---1---|-------2-------| S1 + // |---1-------------------| S2 + // |---1-----------|---3---| S3 + // + // Since the merge is the first operation to happen, a follower could be down + // before it completes. It is reasonable for a snapshot for r1 from S3 to + // subsume both r1 and r2 in S1. + for i := range keyRanges { + if totalKeyRanges[i].End.Key.Compare(keyRanges[i].End.Key) > 0 { + subsumedReplSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + if err := engine.ClearRangeWithHeuristic( + r.store.Engine(), + &subsumedReplSST, + keyRanges[i].End, + totalKeyRanges[i].End, + ); err != nil { + subsumedReplSST.Close() + return err + } + if err := ssss.WriteSST(ctx, &subsumedReplSST); err != nil { + return err + } + } + // The snapshot must never subsume a replica that extends the range of the + // replica to the left. This is because splits and merges (the only + // operations that change the key bounds) always leave the start key intact. + // Extending to the left implies that either we merged "to the left" (we + // don't), or that we're applying a snapshot for another range (we don't do + // that either). Something is severely wrong for this to happen. + if totalKeyRanges[i].Start.Key.Compare(keyRanges[i].Start.Key) < 0 { + log.Fatalf(ctx, "subsuming replica to our left; key range: %v; total key range %v", + keyRanges[i], totalKeyRanges[i]) + } + } + return nil +} + +// clearSubsumedReplicaInMemoryData clears the in-memory data of the subsumed +// replicas. This method requires that each of the subsumed replicas' raftMu is +// held. +func (r *Replica) clearSubsumedReplicaInMemoryData( + ctx context.Context, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID, +) error { + for _, sr := range subsumedRepls { + // We removed sr's data when we ingested the SSTs. Finish subsumption by + // updating the in-memory bookkeeping. + if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil { + return err + }
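To make the bounds arithmetic above concrete, here is a self-contained sketch of the widening that `totalKeyRanges` undergoes; `KeyRange` here is a toy stand-in for `rditer.KeyRange`:

```go
package main

import (
	"bytes"
	"fmt"
)

type KeyRange struct{ Start, End []byte }

// widen grows total (in place, index by index) until it covers sub, exactly
// like the loop over srKeyRanges in the diff above.
func widen(total, sub []KeyRange) {
	for i := range sub {
		if bytes.Compare(sub[i].Start, total[i].Start) < 0 {
			total[i].Start = sub[i].Start
		}
		if bytes.Compare(sub[i].End, total[i].End) > 0 {
			total[i].End = sub[i].End
		}
	}
}

func main() {
	// The snapshot's replica covers [a,c); a subsumed replica reaches to d.
	total := []KeyRange{{Start: []byte("a"), End: []byte("c")}}
	widen(total, []KeyRange{{Start: []byte("b"), End: []byte("d")}})
	// The tail [c,d) now exceeds the snapshot's bounds and must be cleared.
	fmt.Printf("%s-%s\n", total[0].Start, total[0].End) // a-d
}
```

In the diagram above, this is exactly how the snapshot for r1 from S3 discovers that it must also clear the `[c,d)` tail left behind by the subsumed r2 on S1.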
+ // We already hold sr's raftMu, so we must call removeReplicaImpl directly. + // Note that it's safe to update the store's metadata for sr's removal + // separately from updating the store's metadata for r's new descriptor + // (i.e., under a different store.mu acquisition). Each store.mu + // acquisition leaves the store in a consistent state, and access to the + // replicas themselves is protected by their raftMus, which are held from + // start to finish. + if err := r.store.removeReplicaImpl(ctx, sr, subsumedNextReplicaID, RemoveOptions{ + DestroyData: false, // data is already destroyed + }); err != nil { + return err + } + } + return nil +} + type raftCommandEncodingVersion byte // Raft commands are encoded with a 1-byte version (currently 0 or 1), an 8-byte diff --git a/pkg/storage/replica_sst_snapshot_storage.go b/pkg/storage/replica_sst_snapshot_storage.go new file mode 100644 index 000000000000..eecf6bd41d64 --- /dev/null +++ b/pkg/storage/replica_sst_snapshot_storage.go @@ -0,0 +1,198 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/pkg/errors" + "golang.org/x/time/rate" +) + +// SSTSnapshotStorage provides an interface to create scratches and owns the +// directory of scratches created. A scratch manages the SSTs created during a +// specific snapshot. +type SSTSnapshotStorage struct { + engine engine.Engine + limiter *rate.Limiter + dir string +} + +// NewSSTSnapshotStorage creates a new SST snapshot storage. +func NewSSTSnapshotStorage(engine engine.Engine, limiter *rate.Limiter) SSTSnapshotStorage { + return SSTSnapshotStorage{ + engine: engine, + limiter: limiter, + dir: filepath.Join(engine.GetAuxiliaryDir(), "sstsnapshot"), + } +} + +// NewSSTSnapshotStorageScratch creates a new SST snapshot storage scratch for +// a specific snapshot. +func (sss *SSTSnapshotStorage) NewSSTSnapshotStorageScratch( + rangeID roachpb.RangeID, snapUUID uuid.UUID, +) *SSTSnapshotStorageScratch { + snapDir := filepath.Join(sss.dir, strconv.Itoa(int(rangeID)), snapUUID.String()) + ssss := &SSTSnapshotStorageScratch{ + sss: sss, + snapDir: snapDir, + } + return ssss +} + +// Clear removes all created directories and SSTs. +func (sss *SSTSnapshotStorage) Clear() error { + return os.RemoveAll(sss.dir) +} + +// SSTSnapshotStorageScratch keeps track of the SST files incrementally created +// when receiving a snapshot. Each scratch is associated with a specific +// snapshot. +type SSTSnapshotStorageScratch struct { + sss *SSTSnapshotStorage + ssts []string + snapDir string + dirCreated bool +} + +func (ssss *SSTSnapshotStorageScratch) filename(id int) string { + return filepath.Join(ssss.snapDir, fmt.Sprintf("%d.sst", id)) +} + +func (ssss *SSTSnapshotStorageScratch) createDir() error { + err := os.MkdirAll(ssss.snapDir, 0755) + ssss.dirCreated = ssss.dirCreated || err == nil + return err +} +
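Since everything in the scratch is created lazily, a compact model of the file contract may help: nothing touches disk until the first write, each chunk is rate limited and synced, and cleanup is a single directory removal. Plain `os` files and a `rate.Limiter` stand in for the engine-backed `DBFile` and the store's Bulk IO limiter in this sketch:

```go
package main

import (
	"context"
	"os"
	"path/filepath"

	"golang.org/x/time/rate"
)

type lazyFile struct {
	dir, name string
	lim       *rate.Limiter
	f         *os.File
}

func (lf *lazyFile) Write(ctx context.Context, p []byte) error {
	if len(p) == 0 {
		return nil // empty writes are a no-op, as in the real contract
	}
	if lf.f == nil {
		if err := os.MkdirAll(lf.dir, 0755); err != nil { // dir appears on demand
			return err
		}
		f, err := os.Create(filepath.Join(lf.dir, lf.name))
		if err != nil {
			return err
		}
		lf.f = f
	}
	n := len(p)
	if b := lf.lim.Burst(); n > b {
		n = b // WaitN errors out if n exceeds the burst, so cap it
	}
	if err := lf.lim.WaitN(ctx, n); err != nil { // throttle bulk IO
		return err
	}
	if _, err := lf.f.Write(p); err != nil {
		return err
	}
	return lf.f.Sync() // every chunk is synced, mirroring the real Write
}

func main() {
	lf := lazyFile{dir: "scratch/r1/uuid", name: "0.sst", lim: rate.NewLimiter(rate.Inf, 0)}
	_ = lf.Write(context.Background(), []byte("data")) // dir and file exist only now
	_ = os.RemoveAll("scratch")                        // Clear() is one RemoveAll
}
```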
+// NewFile adds another file to SSTSnapshotStorageScratch. The file is lazily +// created the first time it is written to. +func (ssss *SSTSnapshotStorageScratch) NewFile() (*SSTSnapshotStorageFile, error) { + id := len(ssss.ssts) + filename := ssss.filename(id) + ssss.ssts = append(ssss.ssts, filename) + sssf := &SSTSnapshotStorageFile{ + ssss: ssss, + filename: filename, + } + return sssf, nil +} + +// WriteSST writes an entire RocksDBSstFileWriter to a file. The method closes +// the provided SST when it is finished using it. If the provided SST is empty, +// then no file will be created and nothing will be written. +func (ssss *SSTSnapshotStorageScratch) WriteSST( + ctx context.Context, sst *engine.RocksDBSstFileWriter, +) error { + defer sst.Close() + if sst.DataSize == 0 { + return nil + } + data, err := sst.Finish() + if err != nil { + return err + } + sssf, err := ssss.NewFile() + if err != nil { + return err + } + defer func() { + // Closing an SSTSnapshotStorageFile multiple times is idempotent. Nothing + // actionable if closing fails. + _ = sssf.Close() + }() + if err := sssf.Write(ctx, data); err != nil { + return err + } + return sssf.Close() +} + +// SSTs returns the names of the files created. +func (ssss *SSTSnapshotStorageScratch) SSTs() []string { + return ssss.ssts +} + +// Clear removes the directory and SSTs created for a particular snapshot. +func (ssss *SSTSnapshotStorageScratch) Clear() error { + return os.RemoveAll(ssss.snapDir) +} + +// SSTSnapshotStorageFile is an SST file managed by a +// SSTSnapshotStorageScratch. +type SSTSnapshotStorageFile struct { + ssss *SSTSnapshotStorageScratch + created bool + file engine.DBFile + filename string +} + +func (sssf *SSTSnapshotStorageFile) openFile() error { + if sssf.created { + if sssf.file == nil { + return errors.Errorf("file has already been closed") + } + return nil + } + if !sssf.ssss.dirCreated { + if err := sssf.ssss.createDir(); err != nil { + return err + } + } + file, err := sssf.ssss.sss.engine.OpenFile(sssf.filename) + if err != nil { + return err + } + sssf.file = file + sssf.created = true + return nil +} + +// Write writes contents to the file while respecting the limiter passed into +// NewSSTSnapshotStorage. Writing empty contents is okay and is treated as a +// no-op. The file must not have been closed. +func (sssf *SSTSnapshotStorageFile) Write(ctx context.Context, contents []byte) error { + if len(contents) == 0 { + return nil + } + if err := sssf.openFile(); err != nil { + return err + } + limitBulkIOWrite(ctx, sssf.ssss.sss.limiter, len(contents)) + if err := sssf.file.Append(contents); err != nil { + return err + } + return sssf.file.Sync() +} + +// Close closes the file. Calling this function multiple times is idempotent. +// The file must have been written to before being closed. +func (sssf *SSTSnapshotStorageFile) Close() error { + // We throw an error for empty files because it would be an error to ingest + // an empty SST, so we catch this error early. + if !sssf.created { + return errors.New("file is empty") + } + if sssf.file == nil { + return nil + } + if err := sssf.file.Close(); err != nil { + return err + } + sssf.file = nil + return nil +} diff --git a/pkg/storage/replica_sst_snapshot_storage_test.go b/pkg/storage/replica_sst_snapshot_storage_test.go new file mode 100644 index 000000000000..1aacdcb14259 --- /dev/null +++ b/pkg/storage/replica_sst_snapshot_storage_test.go @@ -0,0 +1,98 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt.
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package storage + +import ( + "context" + "io/ioutil" + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func TestSSTSnapshotStorage(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.TODO() + testRangeID := roachpb.RangeID(1) + testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) + testLimiter := rate.NewLimiter(rate.Inf, 0) + + cleanup, cache, eng := newRocksDB(t) + defer cleanup() + defer cache.Release() + defer eng.Close() + + sss := NewSSTSnapshotStorage(eng, testLimiter) + ssss := sss.NewSSTSnapshotStorageScratch(testRangeID, testSnapUUID) + + // Check that the storage lazily creates the directories on first write. + _, err := os.Stat(ssss.snapDir) + if !os.IsNotExist(err) { + t.Fatalf("expected %s to not exist", ssss.snapDir) + } + + sssf, err := ssss.NewFile() + require.NoError(t, err) + defer func() { + require.NoError(t, sssf.Close()) + }() + + // Check that even though the files aren't created, they are still recorded in SSTs(). + require.Equal(t, len(ssss.SSTs()), 1) + + // Check that the storage lazily creates the files on write. + for _, fileName := range ssss.SSTs() { + _, err := os.Stat(fileName) + if !os.IsNotExist(err) { + t.Fatalf("expected %s to not exist", fileName) + } + } + + require.NoError(t, sssf.Write(ctx, []byte("foo"))) + + // After writing to files, check that they have been flushed to disk. + for _, fileName := range ssss.SSTs() { + require.FileExists(t, fileName) + data, err := ioutil.ReadFile(fileName) + require.NoError(t, err) + require.Equal(t, data, []byte("foo")) + } + + // Check that closing is idempotent. + require.NoError(t, sssf.Close()) + require.NoError(t, sssf.Close()) + + // Check that writing to a closed file is an error. + require.EqualError(t, sssf.Write(ctx, []byte("foo")), "file has already been closed") + + // Check that closing an empty file is an error. + sssf, err = ssss.NewFile() + require.NoError(t, err) + require.EqualError(t, sssf.Close(), "file is empty") + require.NoError(t, sssf.Write(ctx, []byte("foo"))) + + // Check that Clear removes the directory. + require.NoError(t, ssss.Clear()) + _, err = os.Stat(ssss.snapDir) + if !os.IsNotExist(err) { + t.Fatalf("expected %s to not exist", ssss.snapDir) + } + require.NoError(t, sss.Clear()) + _, err = os.Stat(sss.dir) + if !os.IsNotExist(err) { + t.Fatalf("expected %s to not exist", sss.dir) + } +} diff --git a/pkg/storage/stateloader/stateloader.go b/pkg/storage/stateloader/stateloader.go index 9174d0a7ec7e..b401dfffa657 100644 --- a/pkg/storage/stateloader/stateloader.go +++ b/pkg/storage/stateloader/stateloader.go @@ -541,13 +541,21 @@ func (rsl StateLoader) LoadRaftTruncatedState( // SetRaftTruncatedState overwrites the truncated state. 
func (rsl StateLoader) SetRaftTruncatedState( - ctx context.Context, eng engine.ReadWriter, truncState *roachpb.RaftTruncatedState, + ctx context.Context, eng engine.Writer, truncState *roachpb.RaftTruncatedState, ) error { if (*truncState == roachpb.RaftTruncatedState{}) { return errors.New("cannot persist empty RaftTruncatedState") } - return engine.MVCCPutProto(ctx, eng, nil, /* ms */ - rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, nil, truncState) + // "Blind" because ms == nil and timestamp == hlc.Timestamp{}. + return engine.MVCCBlindPutProto( + ctx, + eng, + nil, /* ms */ + rsl.RaftTruncatedStateKey(), + hlc.Timestamp{}, /* timestamp */ + truncState, + nil, /* txn */ + ) } // LoadHardState loads the HardState. @@ -566,10 +574,18 @@ func (rsl StateLoader) LoadHardState( // SetHardState overwrites the HardState. func (rsl StateLoader) SetHardState( - ctx context.Context, batch engine.ReadWriter, st raftpb.HardState, + ctx context.Context, batch engine.Writer, st raftpb.HardState, ) error { - return engine.MVCCPutProto(ctx, batch, nil, - rsl.RaftHardStateKey(), hlc.Timestamp{}, nil, &st) + // "Blind" because ms == nil and timestamp == hlc.Timestamp{}. + return engine.MVCCBlindPutProto( + ctx, + batch, + nil, /* ms */ + rsl.RaftHardStateKey(), + hlc.Timestamp{}, /* timestamp */ + &st, + nil, /* txn */ + ) } // SynthesizeRaftState creates a Raft state which synthesizes both a HardState diff --git a/pkg/storage/store.go b/pkg/storage/store.go index befd75eeed12..05a276f6b239 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -412,6 +412,7 @@ type Store struct { raftEntryCache *raftentry.Cache limiters batcheval.Limiters txnWaitMetrics *txnwait.Metrics + sss SSTSnapshotStorage // gossipRangeCountdown and leaseRangeCountdown are countdowns of // changes to range and leaseholder counts, after which the store @@ -865,6 +866,16 @@ func NewStore( s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter( "exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)), ) + + // The snapshot storage is usually empty at this point since it is cleared + // after each snapshot application, except when the node crashed before it + // could clean it up. If this fails, it's not a correctness issue since the + // storage is also cleared before receiving a snapshot. + s.sss = NewSSTSnapshotStorage(s.engine, s.limiters.BulkIOWriteRate) + if err := s.sss.Clear(); err != nil { + log.Warningf(ctx, "failed to clear snapshot storage: %v", err) + } + // On low-CPU instances, a default limit value may still allow ExportRequests // to tie up all cores so cap limiter at cores-1 when setting value is higher. exportCores := runtime.NumCPU() - 1 diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 7aafe83985a4..b11fc4fb6596 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -78,6 +79,9 @@ type snapshotStrategy interface { // Status provides a status report on the work performed during the // snapshot. Only valid if the strategy succeeded. Status() string +
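The `engine.ReadWriter` to `engine.Writer` narrowing in the stateloader signatures above is what lets an SST writer be passed in: an SST under construction cannot serve reads, so only a blind put (no read-modify-write, no stats delta, no timestamp) can be expressed against it. A toy illustration with stand-in interfaces (not the real engine types):

```go
package main

import "fmt"

// Writer is the write-only half: SST writers and batches both satisfy it.
type Writer interface{ Put(key, val string) }

// ReadWriter additionally serves reads; an SST under construction cannot
// satisfy it, which is why the narrower interface is required.
type ReadWriter interface {
	Writer
	Get(key string) (string, bool)
}

// blindPut needs only Writer, so it works against an SST writer too.
func blindPut(w Writer, key, val string) { w.Put(key, val) }

type mem map[string]string

func (m mem) Put(k, v string) { m[k] = v }

func main() {
	m := mem{}
	blindPut(m, "raft-hardstate", "term=5") // no read-before-write required
	fmt.Println(m)
}
```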
+ // Close cleans up any resources associated with the snapshot strategy. + Close(context.Context) } func assertStrategy( @@ -94,47 +98,225 @@ type kvBatchSnapshotStrategy struct { raftCfg *base.RaftConfig status string - // Fields used when sending snapshots. + // The size of the batches of PUT operations to send to the receiver of the + // snapshot. Only used on the sender side. batchSize int64 - limiter *rate.Limiter - newBatch func() engine.Batch + // Limiter for sending KV batches. Only used on the sender side. + limiter *rate.Limiter + // Only used on the sender side. + newBatch func() engine.Batch + + // The approximate size of the SST chunk to buffer in memory on the receiver + // before flushing to disk. Only used on the receiver side. + sstChunkSize int64 + // Only used on the receiver side. + ssss *SSTSnapshotStorageScratch +} + +// multiSSTWriter is a wrapper around RocksDBSstFileWriter and +// SSTSnapshotStorageScratch that handles chunking SSTs and persisting them to +// disk. +type multiSSTWriter struct { + ssss *SSTSnapshotStorageScratch + currSST engine.RocksDBSstFileWriter + currSSTFile *SSTSnapshotStorageFile + keyRanges []rditer.KeyRange + currRange int + // The size of the SST the last time the SST file writer was truncated. This + // size is used to determine the size of the SST chunk buffered in memory. + truncatedSize int64 + // The approximate size of the SST chunk to buffer in memory on the receiver + // before flushing to disk. + sstChunkSize int64 +} + +func newMultiSSTWriter( + ssss *SSTSnapshotStorageScratch, keyRanges []rditer.KeyRange, sstChunkSize int64, +) (multiSSTWriter, error) { + msstw := multiSSTWriter{ + ssss: ssss, + keyRanges: keyRanges, + sstChunkSize: sstChunkSize, + } + if err := msstw.initSST(); err != nil { + return msstw, err + } + return msstw, nil +} + +func (msstw *multiSSTWriter) initSST() error { + newSSTFile, err := msstw.ssss.NewFile() + if err != nil { + return errors.Wrap(err, "failed to create new sst file") + } + msstw.currSSTFile = newSSTFile + newSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return errors.Wrap(err, "failed to create sst file writer") + } + msstw.currSST = newSST + if err := msstw.currSST.ClearRange(msstw.keyRanges[msstw.currRange].Start, msstw.keyRanges[msstw.currRange].End); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + msstw.truncatedSize = 0 + return nil +} + +func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { + chunk, err := msstw.currSST.Finish() + if err != nil { + return errors.Wrap(err, "failed to finish sst") + } + if err := msstw.currSSTFile.Write(ctx, chunk); err != nil { + return errors.Wrap(err, "failed to write to sst file") + } + if err := msstw.currSSTFile.Close(); err != nil { + return errors.Wrap(err, "failed to close sst file") + } + msstw.currRange++ + msstw.currSST.Close() + return nil +} +
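Before moving on to `Put`, two invariants of this writer are worth seeing end to end: each SST opens with a range deletion tombstone covering its whole key range before any puts land (puts in the same SST shadow that tombstone), and sorted incoming keys advance a cursor through the key ranges, finalizing one SST and opening the next at each boundary, exactly as `Put` does below. A compressed, self-contained model with toy types (the real writer is `engine.RocksDBSstFileWriter`):

```go
package main

import "fmt"

type keyRange struct{ start, end string }

type sst struct{ ops []string }

func (s *sst) clearRange(kr keyRange) { s.ops = append(s.ops, "del["+kr.start+","+kr.end+")") }
func (s *sst) put(k string)           { s.ops = append(s.ops, "put "+k) }

// route mimics multiSSTWriter.Put: keys must arrive sorted; a key at or past
// the current range's end finalizes that SST and opens the next one, which
// starts life already covered by its range tombstone.
func route(ranges []keyRange, keys []string) ([]sst, error) {
	ssts := make([]sst, 1, len(ranges))
	cur := 0
	ssts[0].clearRange(ranges[0])
	for _, k := range keys {
		for k >= ranges[cur].end {
			cur++ // finalize the current SST and open the next
			if cur == len(ranges) {
				return nil, fmt.Errorf("key %q past the last range", k)
			}
			ssts = append(ssts, sst{})
			ssts[cur].clearRange(ranges[cur])
		}
		if k < ranges[cur].start {
			return nil, fmt.Errorf("key %q not sorted into %v", k, ranges[cur])
		}
		ssts[cur].put(k)
	}
	return ssts, nil
}

func main() {
	out, _ := route(
		[]keyRange{{"a", "c"}, {"c", "e"}},
		[]string{"a1", "b2", "d9"}, // [c,e) opens with its tombstone before d9 lands
	)
	fmt.Println(out)
}
```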
+func (msstw *multiSSTWriter) Put(ctx context.Context, key engine.MVCCKey, value []byte) error { + for msstw.keyRanges[msstw.currRange].End.Key.Compare(key.Key) <= 0 { + // Finish the current SST, write to the file, and move to the next key + // range. + if err := msstw.finalizeSST(ctx); err != nil { + return err + } + if err := msstw.initSST(); err != nil { + return err + } + } + if msstw.keyRanges[msstw.currRange].Start.Key.Compare(key.Key) > 0 { + return crdberrors.AssertionFailedf("client error: expected %s to fall in one of %s", key.Key, msstw.keyRanges) + } + if err := msstw.currSST.Put(key, value); err != nil { + return errors.Wrap(err, "failed to put in sst") + } + if msstw.currSST.DataSize-msstw.truncatedSize > msstw.sstChunkSize { + msstw.truncatedSize = msstw.currSST.DataSize + chunk, err := msstw.currSST.Truncate() + if err != nil { + return errors.Wrap(err, "failed to truncate sst") + } + // NOTE: Chunk may be empty due to the semantics of Truncate(), but Write() + // handles an empty chunk as a no-op. + if err := msstw.currSSTFile.Write(ctx, chunk); err != nil { + return errors.Wrap(err, "failed to write to sst file") + } + } + return nil +} + +func (msstw *multiSSTWriter) Finish(ctx context.Context) error { + if msstw.currRange < len(msstw.keyRanges) { + for { + if err := msstw.finalizeSST(ctx); err != nil { + return err + } + if msstw.currRange >= len(msstw.keyRanges) { + break + } + if err := msstw.initSST(); err != nil { + return err + } + } + } + return nil +} + +func (msstw *multiSSTWriter) Close() error { + msstw.currSST.Close() + return msstw.currSSTFile.Close() } // Receive implements the snapshotStrategy interface. +// +// NOTE: This function assumes that the key-value pairs are sent in sorted +// order. The key-value pairs are sent in the following sorted order: +// +// 1. Replicated range-id local key range +// 2. Range-local key range +// 3. User key range func (kvSS *kvBatchSnapshotStrategy) Receive( ctx context.Context, stream incomingSnapshotStream, header SnapshotRequest_Header, ) (IncomingSnapshot, error) { assertStrategy(ctx, header, SnapshotRequest_KV_BATCH) - var batches [][]byte + // At the moment we'll write at most three SSTs. + // TODO(jeffreyxiao): Re-evaluate as the default range size grows. + keyRanges := rditer.MakeReplicatedKeyRanges(header.State.Desc) + msstw, err := newMultiSSTWriter(kvSS.ssss, keyRanges, kvSS.sstChunkSize) + if err != nil { + return noSnap, err + } + defer func() { + // Nothing actionable if closing the multiSSTWriter fails. Closing the same + // SST and SST file multiple times is idempotent. + if err := msstw.Close(); err != nil { + log.Warningf(ctx, "failed to close multiSSTWriter: %v", err) + } + }() var logEntries [][]byte + for { req, err := stream.Recv() if err != nil { - return IncomingSnapshot{}, err + return noSnap, err } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(stream, err) } if req.KVBatch != nil { - batches = append(batches, req.KVBatch) + batchReader, err := engine.NewRocksDBBatchReader(req.KVBatch) + if err != nil { + return noSnap, errors.Wrap(err, "failed to decode batch") + } + // All operations in the batch are guaranteed to be puts.
+ for batchReader.Next() { + if batchReader.BatchType() != engine.BatchTypeValue { + return noSnap, crdberrors.AssertionFailedf("expected type %d, found type %d", engine.BatchTypeValue, batchReader.BatchType()) + } + key, err := batchReader.MVCCKey() + if err != nil { + return noSnap, errors.Wrap(err, "failed to decode mvcc key") + } + if err := msstw.Put(ctx, key, batchReader.Value()); err != nil { + return noSnap, err + } + } } if req.LogEntries != nil { logEntries = append(logEntries, req.LogEntries...) } if req.Final { + // We finished receiving all batches and log entries. It's possible that + // we did not receive any key-value pairs for some of the key ranges, but + // we must still construct SSTs with range deletion tombstones to remove + // the data. + if err := msstw.Finish(ctx); err != nil { + return noSnap, err + } + + if err := msstw.Close(); err != nil { + return noSnap, err + } + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { - err = errors.Wrap(err, "invalid snapshot") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + err = errors.Wrap(err, "client error: invalid snapshot") + return noSnap, sendSnapshotError(stream, err) } inSnap := IncomingSnapshot{ UsesUnreplicatedTruncatedState: header.UnreplicatedTruncatedState, SnapUUID: snapUUID, - Batches: batches, + SSSS: kvSS.ssss, LogEntries: logEntries, State: &header.State, snapType: header.Type, @@ -153,7 +335,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( inSnap.snapType = SnapshotRequest_PREEMPTIVE } - kvSS.status = fmt.Sprintf("kv batches: %d, log entries: %d", len(batches), len(logEntries)) + kvSS.status = fmt.Sprintf("log entries: %d, ssts: %d", len(logEntries), len(kvSS.ssss.SSTs())) return inSnap, nil } } @@ -190,10 +372,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } if int64(b.Len()) >= kvSS.batchSize { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } b = nil @@ -204,10 +383,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } } if b != nil { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } } @@ -330,8 +506,11 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } func (kvSS *kvBatchSnapshotStrategy) sendBatch( - stream outgoingSnapshotStream, batch engine.Batch, + ctx context.Context, stream outgoingSnapshotStream, batch engine.Batch, ) error { + if err := kvSS.limiter.WaitN(ctx, 1); err != nil { + return err + } repr := batch.Repr() batch.Close() return stream.Send(&SnapshotRequest{KVBatch: repr}) @@ -340,6 +519,18 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( // Status implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Status() string { return kvSS.status } +// Close implements the snapshotStrategy interface. +func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { + if kvSS.ssss != nil { + // A failure to clean up the storage is benign except that it will leak + // disk space (which is reclaimed on node restart). It is unexpected + // though, so log a warning. + if err := kvSS.ssss.Clear(); err != nil { + log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err) + } + } +} + // reserveSnapshot throttles incoming snapshots. The returned closure is used // to cleanup the reservation and release its resources. 
A nil cleanup function // and a non-empty rejectionMessage indicates the reservation was declined. @@ -631,9 +822,18 @@ func (s *Store) receiveSnapshot( var ss snapshotStrategy switch header.Strategy { case SnapshotRequest_KV_BATCH: + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) + if err != nil { + err = errors.Wrap(err, "invalid snapshot") + return sendSnapshotError(stream, err) + } + ss = &kvBatchSnapshotStrategy{ - raftCfg: &s.cfg.RaftConfig, + raftCfg: &s.cfg.RaftConfig, + ssss: s.sss.NewSSTSnapshotStorageScratch(header.State.Desc.RangeID, snapUUID), + sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV), } + defer ss.Close(ctx) default: return sendSnapshotError(stream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", @@ -697,6 +897,15 @@ var recoverySnapshotRate = settings.RegisterByteSizeSetting( envutil.EnvOrDefaultBytes("COCKROACH_RAFT_SNAPSHOT_RATE", 8<<20), ) +// snapshotSSTWriteSyncRate is the size of chunks to write before fsync-ing. +// The default of 2 MiB was chosen to be in line with the behavior in bulk-io. +// See sstWriteSyncRate. +var snapshotSSTWriteSyncRate = settings.RegisterByteSizeSetting( + "kv.snapshot_sst.sync_size", + "threshold after which snapshot SST writes must fsync", + 2<<20, /* 2 MiB */ +) + func snapshotRateLimit( st *cluster.Settings, priority SnapshotRequest_Priority, ) (rate.Limit, error) { diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 30cf76e21458..be1e7a656063 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -195,6 +195,9 @@ type StoreTestingKnobs struct { // This ensures the `*Replica` will be materialized on the Store when it // returns. ReplicaAddStopAfterLearnerSnapshot func() bool + // BeforeSnapshotSSTIngestion is run just before the SSTs are ingested when + // applying a snapshot. + BeforeSnapshotSSTIngestion func(IncomingSnapshot, SnapshotRequest_Type, []string) error // MaxApplicationBatchSize enforces a maximum size on application batches. // This can be useful for testing conditions which require commands to be
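Finally, a sketch of how a test might exercise a `BeforeSnapshotSSTIngestion`-style hook: the knob observes the final SST list immediately before ingestion, which is the natural point to assert on how many SSTs a snapshot produced. The harness below is a toy; the real knob also receives the `IncomingSnapshot` and the `SnapshotRequest_Type`:

```go
package main

import (
	"errors"
	"fmt"
)

type testingKnobs struct {
	beforeIngest func(ssts []string) error // analogous to BeforeSnapshotSSTIngestion
}

func ingestWithKnob(k testingKnobs, ssts []string) error {
	if k.beforeIngest != nil {
		if err := k.beforeIngest(ssts); err != nil {
			return err // a failing knob aborts the apply, as in the real hook
		}
	}
	fmt.Println("ingest", ssts)
	return nil
}

func main() {
	k := testingKnobs{beforeIngest: func(ssts []string) error {
		if len(ssts) == 0 {
			return errors.New("expected at least one SST")
		}
		return nil
	}}
	_ = ingestWithKnob(k, []string{"0.sst"})
}
```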