From c9f88f17a681816fc5c1006513a7a2827faba852 Mon Sep 17 00:00:00 2001
From: Jeffrey Xiao
Date: Fri, 12 Jul 2019 12:16:55 -0400
Subject: [PATCH 1/4] storage: implement writer interface for RocksDBSstFileWriter

Rename Add to Put and Delete to Clear. Additionally, implement ClearRange using DBSstFileWriterDeleteRange, and ClearIterRange using a Clear on all iterated keys on the Go side.

Release note: None

--- c-deps/libroach/db.cc | 8 ++ c-deps/libroach/include/libroach.h | 7 ++ pkg/ccl/importccl/load.go | 2 +- pkg/ccl/importccl/sst_writer_proc.go | 23 ++--- pkg/ccl/storageccl/bench_test.go | 2 +- pkg/ccl/storageccl/export_test.go | 2 +- pkg/ccl/storageccl/import_test.go | 3 +- pkg/storage/batcheval/cmd_add_sstable_test.go | 7 +- pkg/storage/bulk/sst_writer_test.go | 2 +- pkg/storage/engine/rocksdb.go | 95 ++++++++++++++++--- pkg/storage/engine/rocksdb_test.go | 27 ++---- pkg/storage/engine/sst_iterator_test.go | 2 +- pkg/storage/helpers_test.go | 2 +- 13 files changed, 125 insertions(+), 57 deletions(-) diff --git a/c-deps/libroach/db.cc b/c-deps/libroach/db.cc index ee30b419cd7c..b4562cfccb62 100644 --- a/c-deps/libroach/db.cc +++ b/c-deps/libroach/db.cc @@ -960,6 +960,14 @@ DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key) { return kSuccess; } +DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter *fw, DBKey start, DBKey end) { + rocksdb::Status status = fw->rep.DeleteRange(EncodeKey(start), EncodeKey(end)); + if (!status.ok()) { + return ToDBStatus(status); + } + return kSuccess; +} + DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) { rocksdb::Status status = fw->rep.Finish(); if (!status.ok()) { diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h index c366a3acdf48..fe655bc8ef19 100644 --- a/c-deps/libroach/include/libroach.h +++ b/c-deps/libroach/include/libroach.h @@ -472,6 +472,13 @@ DBStatus DBSstFileWriterAdd(DBSstFileWriter* fw, DBKey key, DBSlice val); // Adds a deletion tombstone to the sstable being built. See DBSstFileWriterAdd for more. DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key); +// Adds a range deletion tombstone to the sstable being built. This function +// can be called at any time with respect to DBSstFileWriter{Add,Delete} +// (i.e., the range does not have to be greater than any previously added +// entry). Range deletion tombstones do not take precedence over other Puts in +// the same SST. `Open` must have been called. `Close` cannot have been called. +DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter* fw, DBKey start, DBKey end); + // Finalizes the writer and stores the constructed file's contents in *data. At // least one kv entry must have been added. May only be called once.
DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data); diff --git a/pkg/ccl/importccl/load.go b/pkg/ccl/importccl/load.go index c03717fef5f5..6ab6019c5a46 100644 --- a/pkg/ccl/importccl/load.go +++ b/pkg/ccl/importccl/load.go @@ -352,7 +352,7 @@ func writeSST( defer sst.Close() for _, kv := range kvs { kv.Key.Timestamp = ts - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { return err } } diff --git a/pkg/ccl/importccl/sst_writer_proc.go b/pkg/ccl/importccl/sst_writer_proc.go index 225b0a31d1b7..ce27d98563a1 100644 --- a/pkg/ccl/importccl/sst_writer_proc.go +++ b/pkg/ccl/importccl/sst_writer_proc.go @@ -386,8 +386,9 @@ func makeSSTs( return nil } - var kv engine.MVCCKeyValue - kv.Key.Timestamp.WallTime = walltime + var key engine.MVCCKey + var value []byte + key.Timestamp.WallTime = walltime // firstKey is always the first key of the span. lastKey, if nil, means the // current SST hasn't yet filled up. Once the SST has filled up, lastKey is // set to the key at which to stop adding KVs. We have to do this because @@ -422,11 +423,11 @@ func makeSSTs( writtenKVs++ - kv.Key.Key = it.Key() - kv.Value = it.UnsafeValue() + key.Key = it.Key() + value = it.UnsafeValue() if lastKey != nil { - if kv.Key.Key.Compare(lastKey) >= 0 { + if key.Key.Compare(lastKey) >= 0 { if err := writeSST(firstKey, lastKey, true); err != nil { return err } @@ -441,17 +442,17 @@ func makeSSTs( } } - if err := sst.Add(kv); err != nil { - return errors.Wrapf(err, errSSTCreationMaybeDuplicateTemplate, kv.Key.Key) + if err := sst.Put(key, value); err != nil { + return errors.Wrapf(err, errSSTCreationMaybeDuplicateTemplate, key.Key) } - if err := counts.Count(kv.Key.Key); err != nil { + if err := counts.Count(key.Key); err != nil { return errors.Wrapf(err, "failed to count key") } if sst.DataSize > sstMaxSize && lastKey == nil { // When we would like to split the file, proceed until we aren't in the // middle of a row. Start by finding the next safe split key. - lastKey, err = keys.EnsureSafeSplitKey(kv.Key.Key) + lastKey, err = keys.EnsureSafeSplitKey(key.Key) if err != nil { return err } @@ -463,9 +464,9 @@ func makeSSTs( // Although we don't need to avoid row splitting here because there aren't any // more keys to read, we do still want to produce the same kind of lastKey // argument for the span as in the case above. lastKey <= the most recent - // sst.Add call, but since we call PrefixEnd below, it will be guaranteed + // sst.Put call, but since we call PrefixEnd below, it will be guaranteed // to be > the most recent added key. 
- lastKey, err = keys.EnsureSafeSplitKey(kv.Key.Key) + lastKey, err = keys.EnsureSafeSplitKey(key.Key) if err != nil { return err } diff --git a/pkg/ccl/storageccl/bench_test.go b/pkg/ccl/storageccl/bench_test.go index faf631c9ecf2..0d0bdf28455e 100644 --- a/pkg/ccl/storageccl/bench_test.go +++ b/pkg/ccl/storageccl/bench_test.go @@ -65,7 +65,7 @@ func BenchmarkAddSSTable(b *testing.B) { b.Fatalf("%+v", err) } for _, kv := range kvs { - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { b.Fatalf("%+v", err) } } diff --git a/pkg/ccl/storageccl/export_test.go b/pkg/ccl/storageccl/export_test.go index a86950d6a5c2..b080c5b777fa 100644 --- a/pkg/ccl/storageccl/export_test.go +++ b/pkg/ccl/storageccl/export_test.go @@ -248,7 +248,7 @@ func exportUsingGoIterator( continue } - if err := sst.Add(engine.MVCCKeyValue{Key: iter.UnsafeKey(), Value: iter.UnsafeValue()}); err != nil { + if err := sst.Put(iter.UnsafeKey(), iter.UnsafeValue()); err != nil { return nil, err } } diff --git a/pkg/ccl/storageccl/import_test.go b/pkg/ccl/storageccl/import_test.go index 19f324267714..48114f128834 100644 --- a/pkg/ccl/storageccl/import_test.go +++ b/pkg/ccl/storageccl/import_test.go @@ -180,8 +180,7 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { key := keys[idx] value.ClearChecksum() value.InitChecksum(key) - kv := engine.MVCCKeyValue{Key: engine.MVCCKey{Key: key, Timestamp: ts}, Value: value.RawBytes} - if err := sst.Add(kv); err != nil { + if err := sst.Put(engine.MVCCKey{Key: key, Timestamp: ts}, value.RawBytes); err != nil { t.Fatalf("%+v", err) } } diff --git a/pkg/storage/batcheval/cmd_add_sstable_test.go b/pkg/storage/batcheval/cmd_add_sstable_test.go index 3391d167a5cc..58ea5ad13253 100644 --- a/pkg/storage/batcheval/cmd_add_sstable_test.go +++ b/pkg/storage/batcheval/cmd_add_sstable_test.go @@ -41,8 +41,7 @@ func singleKVSSTable(key engine.MVCCKey, value []byte) ([]byte, error) { return nil, err } defer sst.Close() - kv := engine.MVCCKeyValue{Key: key, Value: value} - if err := sst.Add(kv); err != nil { + if err := sst.Put(key, value); err != nil { return nil, err } return sst.Finish() @@ -332,7 +331,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { } defer sst.Close() for _, kv := range kvs { - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { t.Fatalf("%+v", err) } } @@ -436,7 +435,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) { } defer sst.Close() for _, kv := range sstKVs { - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { t.Fatalf("%+v", err) } } diff --git a/pkg/storage/bulk/sst_writer_test.go b/pkg/storage/bulk/sst_writer_test.go index 2516d7de656f..e006de2d371f 100644 --- a/pkg/storage/bulk/sst_writer_test.go +++ b/pkg/storage/bulk/sst_writer_test.go @@ -56,7 +56,7 @@ func makeRocksSST(t testing.TB, kvs []engine.MVCCKeyValue) []byte { defer w.Close() for i := range kvs { - if err := w.Add(kvs[i]); err != nil { + if err := w.Put(kvs[i].Key, kvs[i].Value); err != nil { t.Fatal(err) } } diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 3b8aa521903f..9a14ff7db8b4 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -2923,13 +2923,15 @@ func CheckForKeyCollisions(existingIter Iterator, sstIter Iterator) error { } // RocksDBSstFileWriter creates a file suitable for importing with -// RocksDBSstFileReader. +// RocksDBSstFileReader. It implements the Writer interface. 
type RocksDBSstFileWriter struct { fw *C.DBSstFileWriter // DataSize tracks the total key and value bytes added so far. DataSize int64 } +var _ Writer = &RocksDBSstFileWriter{} + // MakeRocksDBSstFileWriter creates a new RocksDBSstFileWriter with the default // configuration. func MakeRocksDBSstFileWriter() (RocksDBSstFileWriter, error) { @@ -2938,28 +2940,91 @@ func MakeRocksDBSstFileWriter() (RocksDBSstFileWriter, error) { return RocksDBSstFileWriter{fw: fw}, err } -// Add puts a kv entry into the sstable being built. An error is returned if it -// is not greater than any previously added entry (according to the comparator -// configured during writer creation). `Close` cannot have been called. -func (fw *RocksDBSstFileWriter) Add(kv MVCCKeyValue) error { +// ApplyBatchRepr implements the Writer interface. +func (fw *RocksDBSstFileWriter) ApplyBatchRepr(repr []byte, sync bool) error { + panic("unimplemented") +} + +// Clear implements the Writer interface. Note that it inserts a tombstone +// rather than actually removing the entry from the storage engine. An error is +// returned if the key is not greater than any previous key used in Put or Clear +// (according to the comparator configured during writer creation). Close +// cannot have been called. +func (fw *RocksDBSstFileWriter) Clear(key MVCCKey) error { if fw.fw == nil { - return errors.New("cannot call Open on a closed writer") + return errors.New("cannot call Clear on a closed writer") + } + fw.DataSize += int64(len(key.Key)) + return statusToError(C.DBSstFileWriterDelete(fw.fw, goToCKey(key))) +} + +// SingleClear implements the Writer interface. +func (fw *RocksDBSstFileWriter) SingleClear(key MVCCKey) error { + panic("unimplemented") +} + +// ClearRange implements the Writer interface. Note that it inserts a range +// deletion tombstone rather than actually removing the entries from the storage engine. +// It can be called at any time with respect to Put and Clear. +func (fw *RocksDBSstFileWriter) ClearRange(start, end MVCCKey) error { + if fw.fw == nil { + return errors.New("cannot call ClearRange on a closed writer") + } + fw.DataSize += int64(len(start.Key)) + int64(len(end.Key)) + return statusToError(C.DBSstFileWriterDeleteRange(fw.fw, goToCKey(start), goToCKey(end))) +} + +// ClearIterRange implements the Writer interface. +// +// NOTE: This method is fairly expensive as it performs a Cgo call for every +// key deleted. +func (fw *RocksDBSstFileWriter) ClearIterRange(iter Iterator, start, end MVCCKey) error { + if fw.fw == nil { + return errors.New("cannot call ClearIterRange on a closed writer") + } + iter.Seek(start) + for { + valid, err := iter.Valid() + if err != nil { + return err + } + if !valid || !iter.Key().Less(end) { + break + } + if err := fw.Clear(iter.Key()); err != nil { + return err + } + iter.Next() } - fw.DataSize += int64(len(kv.Key.Key)) + int64(len(kv.Value)) - return statusToError(C.DBSstFileWriterAdd(fw.fw, goToCKey(kv.Key), goToCSlice(kv.Value))) + return nil } -// Delete puts a deletion tombstone into the sstable being built. See -// the Add method for more. -func (fw *RocksDBSstFileWriter) Delete(k MVCCKey) error { +// Merge implements the Writer interface. +func (fw *RocksDBSstFileWriter) Merge(key MVCCKey, value []byte) error { + panic("unimplemented") +} + +// Put implements the Writer interface. It puts a kv entry into the sstable +// being built.
An error is returned if it is not greater than any previous key +// used in Put or Clear (according to the comparator configured during writer +// creation). Close cannot have been called. +func (fw *RocksDBSstFileWriter) Put(key MVCCKey, value []byte) error { if fw.fw == nil { - return errors.New("cannot call Delete on a closed writer") + return errors.New("cannot call Put on a closed writer") } - fw.DataSize += int64(len(k.Key)) - return statusToError(C.DBSstFileWriterDelete(fw.fw, goToCKey(k))) + fw.DataSize += int64(len(key.Key)) + int64(len(value)) + return statusToError(C.DBSstFileWriterAdd(fw.fw, goToCKey(key), goToCSlice(value))) +} + +// LogData implements the Writer interface. +func (fw *RocksDBSstFileWriter) LogData(data []byte) error { + panic("unimplemented") } -var _ = (*RocksDBSstFileWriter).Delete +// LogLogicalOp implements the Writer interface. +func (fw *RocksDBSstFileWriter) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails) { + // No-op. Logical logging disabled. +} // Finish finalizes the writer and returns the constructed file's contents. At // least one kv entry must have been added. diff --git a/pkg/storage/engine/rocksdb_test.go b/pkg/storage/engine/rocksdb_test.go index 432814fd2b51..31d207d295c1 100644 --- a/pkg/storage/engine/rocksdb_test.go +++ b/pkg/storage/engine/rocksdb_test.go @@ -757,7 +757,7 @@ func BenchmarkRocksDBSstFileWriter(b *testing.B) { kv.Key.Key = []byte(fmt.Sprintf("%09d", i)) copy(kv.Value, kv.Key.Key) b.StartTimer() - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { b.Fatal(err) } } @@ -800,7 +800,7 @@ func BenchmarkRocksDBSstFileReader(b *testing.B) { for i := 0; i < entries; i++ { kv.Key.Key = []byte(fmt.Sprintf("%09d", i)) copy(kv.Value, kv.Key.Key) - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { b.Fatal(err) } } @@ -1336,13 +1336,7 @@ func TestRocksDBDeleteRangeCompaction(t *testing.T) { defer sst.Close() for i := 0; i < numEntries; i++ { - kv := MVCCKeyValue{ - Key: MVCCKey{ - Key: makeKey(string(p), i), - }, - Value: randutil.RandBytes(rnd, valueSize), - } - if err := sst.Add(kv); err != nil { + if err := sst.Put(MVCCKey{Key: makeKey(string(p), i)}, randutil.RandBytes(rnd, valueSize)); err != nil { t.Fatal(err) } } @@ -1464,12 +1458,7 @@ func BenchmarkRocksDBDeleteRangeIterate(b *testing.B) { defer sst.Close() for i := 0; i < entries; i++ { - kv := MVCCKeyValue{ - Key: MVCCKey{ - Key: makeKey(i), - }, - } - if err := sst.Add(kv); err != nil { + if err := sst.Put(MVCCKey{Key: makeKey(i)}, nil); err != nil { b.Fatal(err) } } @@ -1588,10 +1577,10 @@ func TestSstFileWriterTimeBound(t *testing.T) { t.Fatal(sst) } defer sst.Close() - if err := sst.Add(MVCCKeyValue{ - Key: MVCCKey{Key: []byte("key"), Timestamp: hlc.Timestamp{WallTime: walltime}}, - Value: []byte("value"), - }); err != nil { + if err := sst.Put( + MVCCKey{Key: []byte("key"), Timestamp: hlc.Timestamp{WallTime: walltime}}, + []byte("value"), + ); err != nil { t.Fatal(err) } sstContents, err := sst.Finish() diff --git a/pkg/storage/engine/sst_iterator_test.go b/pkg/storage/engine/sst_iterator_test.go index 4070d94173d9..45989392a46b 100644 --- a/pkg/storage/engine/sst_iterator_test.go +++ b/pkg/storage/engine/sst_iterator_test.go @@ -86,7 +86,7 @@ func TestSSTIterator(t *testing.T) { }, Value: []byte{'a' + byte(i)}, } - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { t.Fatalf("%+v", err) } allKVs = append(allKVs, kv) diff --git 
a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index c91831dbf348..dfded040766b 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -391,7 +391,7 @@ func MakeSSTable(key, value string, ts hlc.Timestamp) ([]byte, engine.MVCCKeyVal Value: v.RawBytes, } - if err := sst.Add(kv); err != nil { + if err := sst.Put(kv.Key, kv.Value); err != nil { panic(errors.Wrap(err, "while finishing SSTable")) } b, err := sst.Finish()

From a24b09896627edd72931d0b56394f76ab851b010 Mon Sep 17 00:00:00 2001
From: Jeffrey Xiao
Date: Fri, 12 Jul 2019 13:45:00 -0400
Subject: [PATCH 2/4] storage: add Truncate method to RocksDBSstFileWriter

This method truncates the SST file being written and returns the data that was truncated. It can be called multiple times when writing an SST file and can be used to chunk an SST file into pieces. Since SSTs are built in an append-only manner, the concatenation of the chunks is equivalent to an SST built without using Truncate and finalized with Finish.

Release note: None

--- c-deps/libroach/db.cc | 29 +++++++--- c-deps/libroach/include/libroach.h | 6 +++ pkg/storage/engine/rocksdb.go | 15 ++++++ pkg/storage/engine/rocksdb_test.go | 85 ++++++++++++++++++++++++++++++ 4 files changed, 128 insertions(+), 7 deletions(-) diff --git a/c-deps/libroach/db.cc b/c-deps/libroach/db.cc index b4562cfccb62..88c4425e2a51 100644 --- a/c-deps/libroach/db.cc +++ b/c-deps/libroach/db.cc @@ -968,17 +968,15 @@ DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter *fw, DBKey start, DBKey end) return kSuccess; } -DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) { - rocksdb::Status status = fw->rep.Finish(); - if (!status.ok()) { - return ToDBStatus(status); - } - +DBStatus DBSstFileWriterCopyData(DBSstFileWriter* fw, DBString* data) { uint64_t file_size; - status = fw->memenv->GetFileSize("sst", &file_size); + rocksdb::Status status = fw->memenv->GetFileSize("sst", &file_size); if (!status.ok()) { return ToDBStatus(status); } + if (file_size == 0) { + return kSuccess; + } const rocksdb::EnvOptions soptions; std::unique_ptr<rocksdb::SequentialFile> sst; @@ -1016,6 +1014,23 @@ DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) { return kSuccess; } +DBStatus DBSstFileWriterTruncate(DBSstFileWriter* fw, DBString* data) { + DBStatus status = DBSstFileWriterCopyData(fw, data); + if (status.data != NULL) { + return status; + } + return ToDBStatus(fw->memenv->Truncate("sst", 0)); +} + +DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data) { + rocksdb::Status status = fw->rep.Finish(); + if (!status.ok()) { + return ToDBStatus(status); + } + + return DBSstFileWriterCopyData(fw, data); +} + void DBSstFileWriterClose(DBSstFileWriter* fw) { delete fw; } DBStatus DBLockFile(DBSlice filename, DBFileLock* lock) { diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h index fe655bc8ef19..2550588ed6b1 100644 --- a/c-deps/libroach/include/libroach.h +++ b/c-deps/libroach/include/libroach.h @@ -479,6 +479,12 @@ DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key); // `Open` must have been called. `Close` cannot have been called. DBStatus DBSstFileWriterDeleteRange(DBSstFileWriter* fw, DBKey start, DBKey end); +// Truncates the writer and stores the constructed file's contents in *data. +// May be called multiple times. The returned data won't necessarily reflect +// the latest writes, only the keys whose underlying RocksDB blocks have been +// flushed. Close cannot have been called.
+DBStatus DBSstFileWriterTruncate(DBSstFileWriter *fw, DBString* data); + // Finalizes the writer and stores the constructed file's contents in *data. At // least one kv entry must have been added. May only be called once. DBStatus DBSstFileWriterFinish(DBSstFileWriter* fw, DBString* data); diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 9a14ff7db8b4..d2cd066d2e9e 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -3026,6 +3026,21 @@ func (fw *RocksDBSstFileWriter) LogLogicalOp(op MVCCLogicalOpType, details MVCCL // No-op. Logical logging disabled. } +// Truncate truncates the writer's current memory buffer and returns the +// contents it contained. May be called multiple times. The function may not +// truncate and return all keys if the underlying RocksDB blocks have not been +// flushed. Close cannot have been called. +func (fw *RocksDBSstFileWriter) Truncate() ([]byte, error) { + if fw.fw == nil { + return nil, errors.New("cannot call Truncate on a closed writer") + } + var contents C.DBString + if err := statusToError(C.DBSstFileWriterTruncate(fw.fw, &contents)); err != nil { + return nil, err + } + return cStringToGoBytes(contents), nil +} + // Finish finalizes the writer and returns the constructed file's contents. At // least one kv entry must have been added. func (fw *RocksDBSstFileWriter) Finish() ([]byte, error) { diff --git a/pkg/storage/engine/rocksdb_test.go b/pkg/storage/engine/rocksdb_test.go index 31d207d295c1..9b8d122f9957 100644 --- a/pkg/storage/engine/rocksdb_test.go +++ b/pkg/storage/engine/rocksdb_test.go @@ -715,6 +715,91 @@ func TestConcurrentBatch(t *testing.T) { } } +// TestRocksDBSstFileWriterTruncate ensures that the concatenation of the +// chunks created by calling Truncate on a RocksDBSstFileWriter is equivalent +// to an SST built without ever calling Truncate. +func TestRocksDBSstFileWriterTruncate(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Truncate will be used on this writer. + sst1, err := MakeRocksDBSstFileWriter() + if err != nil { + t.Fatal(err) + } + defer sst1.Close() + + // Truncate will not be used on this writer. + sst2, err := MakeRocksDBSstFileWriter() + if err != nil { + t.Fatal(err) + } + defer sst2.Close() + + const keyLen = 10 + const valLen = 950 + ts := hlc.Timestamp{WallTime: 1} + key := MVCCKey{Key: roachpb.Key(make([]byte, keyLen)), Timestamp: ts} + value := make([]byte, valLen) + + var resBuf1, resBuf2 []byte + const entries = 100000 + const truncateChunk = entries / 10 + for i := 0; i < entries; i++ { + key.Key = []byte(fmt.Sprintf("%09d", i)) + copy(value, key.Key) + + if err := sst1.Put(key, value); err != nil { + t.Fatal(err) + } + if err := sst2.Put(key, value); err != nil { + t.Fatal(err) + } + + if i > 0 && i%truncateChunk == 0 { + sst1Chunk, err := sst1.Truncate() + if err != nil { + t.Fatal(err) + } + t.Logf("iteration %d, truncate chunk\tlen=%d", i, len(sst1Chunk)) + + // Even though we added keys, the contract of Truncate does not strictly + // guarantee that a non-empty byte slice will be returned. This is + // because the keys may be in un-flushed blocks. This test has been tuned + // such that every other chunk is always large enough to require at + // least one block to be flushed. + empty := len(sst1Chunk) == 0 + if i%(2*truncateChunk) == 0 { + if empty { + t.Fatalf("expected non-empty SST chunk during iteration %d", i) + } + resBuf1 = append(resBuf1, sst1Chunk...)
+ } else { + if !empty { + t.Fatalf("expected empty SST chunk during iteration %d", i) + } + } + } + } + + sst1FinishBuf, err := sst1.Finish() + if err != nil { + t.Fatal(err) + } + resBuf1 = append(resBuf1, sst1FinishBuf...) + t.Logf("truncated sst final chunk\t\tlen=%d", len(sst1FinishBuf)) + + resBuf2, err = sst2.Finish() + if err != nil { + t.Fatal(err) + } + t.Logf("non-truncated sst final chunk\tlen=%d", len(resBuf2)) + + if !bytes.Equal(resBuf1, resBuf2) { + t.Errorf("expected SST made up of truncate chunks (len=%d) to be equivalent to SST that "+ "was not (len=%d)", len(resBuf1), len(resBuf2)) + } +} + func BenchmarkRocksDBSstFileWriter(b *testing.B) { dir, err := ioutil.TempDir("", "BenchmarkRocksDBSstFileWriter") if err != nil {

From 56c7a565bff7517ec1b6af24d46daa29d32249d2 Mon Sep 17 00:00:00 2001
From: Jeffrey Xiao
Date: Fri, 2 Aug 2019 22:53:20 -0400
Subject: [PATCH 3/4] storage: add SSTSnapshotStorage

SSTSnapshotStorage is associated with a store and can be used to create SSTSnapshotStorageScratches. Each SSTSnapshotStorageScratch is associated with a snapshot and keeps track of the SSTs incrementally created when receiving a snapshot.

Release note: None

--- pkg/storage/replica_sst_snapshot_storage.go | 198 ++++++++++++++++++ .../replica_sst_snapshot_storage_test.go | 98 +++++++++ 2 files changed, 296 insertions(+) create mode 100644 pkg/storage/replica_sst_snapshot_storage.go create mode 100644 pkg/storage/replica_sst_snapshot_storage_test.go diff --git a/pkg/storage/replica_sst_snapshot_storage.go b/pkg/storage/replica_sst_snapshot_storage.go new file mode 100644 index 000000000000..eecf6bd41d64 --- /dev/null +++ b/pkg/storage/replica_sst_snapshot_storage.go @@ -0,0 +1,198 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/pkg/errors" + "golang.org/x/time/rate" +) + +// SSTSnapshotStorage provides an interface to create scratches and owns the +// directory of scratches created. A scratch manages the SSTs created during a +// specific snapshot. +type SSTSnapshotStorage struct { + engine engine.Engine + limiter *rate.Limiter + dir string +} + +// NewSSTSnapshotStorage creates a new SST snapshot storage. +func NewSSTSnapshotStorage(engine engine.Engine, limiter *rate.Limiter) SSTSnapshotStorage { + return SSTSnapshotStorage{ + engine: engine, + limiter: limiter, + dir: filepath.Join(engine.GetAuxiliaryDir(), "sstsnapshot"), + } +} + +// NewSSTSnapshotStorageScratch creates a new SST snapshot storage scratch for +// a specific snapshot. +func (sss *SSTSnapshotStorage) NewSSTSnapshotStorageScratch( + rangeID roachpb.RangeID, snapUUID uuid.UUID, +) *SSTSnapshotStorageScratch { + snapDir := filepath.Join(sss.dir, strconv.Itoa(int(rangeID)), snapUUID.String()) + ssss := &SSTSnapshotStorageScratch{ + sss: sss, + snapDir: snapDir, + } + return ssss +} + +// Clear removes all created directories and SSTs.
+func (sss *SSTSnapshotStorage) Clear() error { + return os.RemoveAll(sss.dir) +} + +// SSTSnapshotStorageScratch keeps track of the SST files incrementally created +// when receiving a snapshot. Each scratch is associated with a specific +// snapshot. +type SSTSnapshotStorageScratch struct { + sss *SSTSnapshotStorage + ssts []string + snapDir string + dirCreated bool +} + +func (ssss *SSTSnapshotStorageScratch) filename(id int) string { + return filepath.Join(ssss.snapDir, fmt.Sprintf("%d.sst", id)) +} + +func (ssss *SSTSnapshotStorageScratch) createDir() error { + err := os.MkdirAll(ssss.snapDir, 0755) + ssss.dirCreated = ssss.dirCreated || err == nil + return err +} + +// NewFile adds another file to SSTSnapshotStorageScratch. The file is lazily +// created the first time it is written to. +func (ssss *SSTSnapshotStorageScratch) NewFile() (*SSTSnapshotStorageFile, error) { + id := len(ssss.ssts) + filename := ssss.filename(id) + ssss.ssts = append(ssss.ssts, filename) + sssf := &SSTSnapshotStorageFile{ + ssss: ssss, + filename: filename, + } + return sssf, nil +} + +// WriteSST writes an entire RocksDBSstFileWriter to a file. The method closes +// the provided SST when it is finished using it. If the provided SST is empty, +// then no file will be created and nothing will be written. +func (ssss *SSTSnapshotStorageScratch) WriteSST( + ctx context.Context, sst *engine.RocksDBSstFileWriter, +) error { + defer sst.Close() + if sst.DataSize == 0 { + return nil + } + data, err := sst.Finish() + if err != nil { + return err + } + sssf, err := ssss.NewFile() + if err != nil { + return err + } + defer func() { + // Closing an SSTSnapshotStorageFile multiple times is idempotent. Nothing + // actionable if closing fails. + _ = sssf.Close() + }() + if err := sssf.Write(ctx, data); err != nil { + return err + } + return sssf.Close() +} + +// SSTs returns the names of the files created. +func (ssss *SSTSnapshotStorageScratch) SSTs() []string { + return ssss.ssts +} + +// Clear removes the directory and SSTs created for a particular snapshot. +func (ssss *SSTSnapshotStorageScratch) Clear() error { + return os.RemoveAll(ssss.snapDir) +} + +// SSTSnapshotStorageFile is an SST file managed by a +// SSTSnapshotStorageScratch. +type SSTSnapshotStorageFile struct { + ssss *SSTSnapshotStorageScratch + created bool + file engine.DBFile + filename string +} + +func (sssf *SSTSnapshotStorageFile) openFile() error { + if sssf.created { + if sssf.file == nil { + return errors.Errorf("file has already been closed") + } + return nil + } + if !sssf.ssss.dirCreated { + if err := sssf.ssss.createDir(); err != nil { + return err + } + } + file, err := sssf.ssss.sss.engine.OpenFile(sssf.filename) + if err != nil { + return err + } + sssf.file = file + sssf.created = true + return nil +} + +// Write writes contents to the file while respecting the limiter passed into +// SSTSnapshotStorageScratch. Writing empty contents is okay and is treated as +// a noop. The file must not have been closed. +func (sssf *SSTSnapshotStorageFile) Write(ctx context.Context, contents []byte) error { + if len(contents) == 0 { + return nil + } + if err := sssf.openFile(); err != nil { + return err + } + limitBulkIOWrite(ctx, sssf.ssss.sss.limiter, len(contents)) + if err := sssf.file.Append(contents); err != nil { + return err + } + return sssf.file.Sync() +} + +// Close closes the file. Calling this function multiple times is idempotent. +// The file must have been written to before being closed.
+func (sssf *SSTSnapshotStorageFile) Close() error { + // We return an error for empty files because it would be an error to ingest + // an empty SST, so we catch this error early. + if !sssf.created { + return errors.New("file is empty") + } + if sssf.file == nil { + return nil + } + if err := sssf.file.Close(); err != nil { + return err + } + sssf.file = nil + return nil +} diff --git a/pkg/storage/replica_sst_snapshot_storage_test.go b/pkg/storage/replica_sst_snapshot_storage_test.go new file mode 100644 index 000000000000..1aacdcb14259 --- /dev/null +++ b/pkg/storage/replica_sst_snapshot_storage_test.go @@ -0,0 +1,98 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package storage + +import ( + "context" + "io/ioutil" + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func TestSSTSnapshotStorage(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.TODO() + testRangeID := roachpb.RangeID(1) + testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) + testLimiter := rate.NewLimiter(rate.Inf, 0) + + cleanup, cache, eng := newRocksDB(t) + defer cleanup() + defer cache.Release() + defer eng.Close() + + sss := NewSSTSnapshotStorage(eng, testLimiter) + ssss := sss.NewSSTSnapshotStorageScratch(testRangeID, testSnapUUID) + + // Check that the storage lazily creates the directories on first write. + _, err := os.Stat(ssss.snapDir) + if !os.IsNotExist(err) { + t.Fatalf("expected %s to not exist", ssss.snapDir) + } + + sssf, err := ssss.NewFile() + require.NoError(t, err) + defer func() { + require.NoError(t, sssf.Close()) + }() + + // Check that even though the files aren't created, they are still recorded in SSTs(). + require.Equal(t, len(ssss.SSTs()), 1) + + // Check that the storage lazily creates the files on write. + for _, fileName := range ssss.SSTs() { + _, err := os.Stat(fileName) + if !os.IsNotExist(err) { + t.Fatalf("expected %s to not exist", fileName) + } + } + + require.NoError(t, sssf.Write(ctx, []byte("foo"))) + + // After writing to files, check that they have been flushed to disk. + for _, fileName := range ssss.SSTs() { + require.FileExists(t, fileName) + data, err := ioutil.ReadFile(fileName) + require.NoError(t, err) + require.Equal(t, data, []byte("foo")) + } + + // Check that closing is idempotent. + require.NoError(t, sssf.Close()) + require.NoError(t, sssf.Close()) + + // Check that writing to a closed file is an error. + require.EqualError(t, sssf.Write(ctx, []byte("foo")), "file has already been closed") + + // Check that closing an empty file is an error. + sssf, err = ssss.NewFile() + require.NoError(t, err) + require.EqualError(t, sssf.Close(), "file is empty") + require.NoError(t, sssf.Write(ctx, []byte("foo"))) + + // Check that Clear removes the directory.
+ require.NoError(t, ssss.Clear()) + _, err = os.Stat(ssss.snapDir) + if !os.IsNotExist(err) { + t.Fatalf("expected %s to not exist", ssss.snapDir) + } + require.NoError(t, sss.Clear()) + _, err = os.Stat(sss.dir) + if !os.IsNotExist(err) { + t.Fatalf("expected %s to not exist", sss.dir) + } +}

From b320ff5c5a3338e2207496c88228f781e4b711b1 Mon Sep 17 00:00:00 2001
From: Jeffrey Xiao
Date: Thu, 8 Aug 2019 21:16:37 -0400
Subject: [PATCH 4/4] storage: build SSTs from KV_BATCH snapshot

Incrementally build SSTs from the batches sent in a KV_BATCH snapshot. This logic is only on the receiver side for ease of testing and compatibility. The complications of subsumed replicas that are not fully contained by the current replica are also handled. The following diagram is an example of this case:

    a       b       c       d
    |---1---|-------2-------|  S1
    |---1-------------------|  S2
    |---1-----------|---3---|  S3

Since the merge is the first operation to happen, a follower could be down before it completes. It is reasonable for the r1-snapshot from S3 to subsume both r1 and r2 in S1. Note that it's impossible for a replica to subsume anything to its left.

The maximum number of SSTs created using this strategy is 4 + SR + 2, where SR is the number of subsumed replicas:

- Three SSTs get created when the snapshot is being received (range-local keys, replicated range-id local keys, and user keys).
- One SST is constructed for the unreplicated range-id local keys when the snapshot is being applied.
- One SST is constructed for every subsumed replica to clear the range-id local keys. These SSTs consist of one range deletion tombstone and one RaftTombstoneKey.
- A maximum of two SSTs for all subsumed replicas are constructed to account for the case of subsumed replicas that are not fully contained. We need to delete the key space of the subsumed replicas that we did not delete in the previous SSTs: one SST for the range-local keys and one for the user keys. These SSTs consist of normal tombstones, one range deletion tombstone, or they could be empty.

This commit also introduces a cluster setting "kv.snapshot_sst.sync_size", which defines the maximum SST chunk size before fsync-ing. Fsync-ing is necessary to prevent the OS from accumulating such a large buffer that it blocks unrelated small/fast writes for a long time when it flushes.

Release note (performance improvement): Snapshots sent between replicas are now applied more performantly and use less memory.
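To make the write path concrete, the following is a minimal sketch of how the pieces from the earlier patches could fit together on the receiver: keys are added to the in-memory SST writer, and once roughly a sync threshold's worth of bytes has accumulated, the buffer is truncated out of the writer and appended (with an fsync) to the scratch file. This is illustrative only, not code from this series: the helper name putAndMaybeSync and the sinceSync bookkeeping are assumptions, and the real receive loop may differ.

    // putAndMaybeSync adds a key to the SST under construction and, once more
    // than syncSize bytes have accumulated since the last flush, truncates the
    // writer's in-memory buffer and appends the returned chunk to the scratch
    // file (Write fsyncs what it appends). Note that Truncate may return an
    // empty chunk if no RocksDB blocks have been flushed yet; Write treats
    // empty contents as a no-op, so that case is harmless here.
    func putAndMaybeSync(
        ctx context.Context,
        sst *engine.RocksDBSstFileWriter,
        sssf *SSTSnapshotStorageFile,
        sinceSync *int64,
        syncSize int64,
        key engine.MVCCKey,
        value []byte,
    ) error {
        if err := sst.Put(key, value); err != nil {
            return err
        }
        *sinceSync += int64(len(key.Key)) + int64(len(value))
        if *sinceSync > syncSize {
            chunk, err := sst.Truncate()
            if err != nil {
                return err
            }
            if err := sssf.Write(ctx, chunk); err != nil {
                return err
            }
            *sinceSync = 0
        }
        return nil
    }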
--- docs/generated/settings/settings.html | 1 + pkg/storage/client_merge_test.go | 110 ++++ pkg/storage/client_raft_test.go | 2 + pkg/storage/engine/engine.go | 50 ++ pkg/storage/engine/mvcc.go | 20 + pkg/storage/rditer/replica_data_iter.go | 76 ++- .../replica_application_state_machine.go | 5 +- pkg/storage/replica_command.go | 30 +- pkg/storage/replica_destroy.go | 31 +- pkg/storage/replica_raftstorage.go | 483 +++++++++++------- pkg/storage/stateloader/stateloader.go | 28 +- pkg/storage/store.go | 11 + pkg/storage/store_snapshot.go | 251 ++++++++- pkg/storage/testing_knobs.go | 3 + 14 files changed, 835 insertions(+), 266 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 5cc133424d0a..7018b07a4586 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -51,6 +51,7 @@ kv.rangefeed.enabledbooleanfalseif set, rangefeed registration is enabled kv.snapshot_rebalance.max_ratebyte size8.0 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots kv.snapshot_recovery.max_ratebyte size8.0 MiBthe rate limit (bytes/sec) to use for recovery snapshots +kv.snapshot_sst.sync_sizebyte size2.0 MiBthreshold after which snapshot SST writes must fsync kv.transaction.max_intents_bytesinteger262144maximum number of bytes used to track write intents in transactions kv.transaction.max_refresh_spans_bytesinteger256000maximum number of bytes used to track refresh spans in serializable transactions kv.transaction.parallel_commits_enabledbooleantrueif enabled, transactional commits will be parallelized with transactional writes diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go index dff83d9866a0..98ddd75e011e 100644 --- a/pkg/storage/client_merge_test.go +++ b/pkg/storage/client_merge_test.go @@ -18,6 +18,7 @@ import ( "math/rand" "reflect" "regexp" + "strconv" "strings" "sync" "sync/atomic" @@ -37,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -3046,10 +3048,105 @@ func (h *unreliableRaftHandler) HandleRaftResponse( func TestStoreRangeMergeRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() + // We will be testing the SSTs written on store2's engine. + var eng engine.Engine ctx := context.Background() storeCfg := storage.TestStoreConfig(nil) storeCfg.TestingKnobs.DisableReplicateQueue = true storeCfg.TestingKnobs.DisableReplicaGCQueue = true + storeCfg.TestingKnobs.BeforeSnapshotSSTIngestion = func( + inSnap storage.IncomingSnapshot, + snapType storage.SnapshotRequest_Type, + sstNames []string, + ) error { + // Only verify snapshots of type RAFT and on the range under exercise + // (range 2). Note that the keys of range 2 aren't verified in this + // function. Unreplicated range-id local keys are not verified because + // there are too many keys and the other replicated keys are verified later + // on in the test. This function verifies that the subsumed replicas have + // been handled properly. + if snapType != storage.SnapshotRequest_RAFT || inSnap.State.Desc.RangeID != roachpb.RangeID(2) { + return nil + } + // The seven SSTs we are expecting to ingest are in the following order: + // 1.
Replicated range-id local keys of the range in the snapshot. + // 2. Range-local keys of the range in the snapshot. + // 3. User keys of the range in the snapshot. + // 4. Unreplicated range-id local keys of the range in the snapshot. + // 5. SST to clear range-id local keys of the subsumed replica with + // RangeID 3. + // 6. SST to clear range-id local keys of the subsumed replica with + // RangeID 4. + // 7. SST to clear the user keys of the subsumed replicas. + // + // NOTE: There are no range-local keys in [d, /Max) in the store we're + // sending a snapshot to, so we aren't expecting an SST to clear those + // keys. + if len(sstNames) != 7 { + return errors.Errorf("expected to ingest 7 SSTs, got %d SSTs", len(sstNames)) + } + + // Only verify the SSTs of the subsumed replicas (the last three SSTs) by + // constructing the expected SST and ensuring that they are byte-by-byte + // equal. This verification ensures that the SSTs have the same tombstones + // and range deletion tombstones. + var expectedSSTs [][]byte + sstNames = sstNames[4:] + + // Range-id local key range of subsumed replicas. + for _, rangeID := range []roachpb.RangeID{roachpb.RangeID(3), roachpb.RangeID(4)} { + sst, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer sst.Close() + r := rditer.MakeRangeIDLocalKeyRange(rangeID, false) + if err := sst.ClearRange(r.Start, r.End); err != nil { + return err + } + tombstoneKey := keys.RaftTombstoneKey(rangeID) + tombstoneValue := &roachpb.RaftTombstone{NextReplicaID: math.MaxInt32} + if err := engine.MVCCBlindPutProto(context.TODO(), &sst, nil, tombstoneKey, hlc.Timestamp{}, tombstoneValue, nil); err != nil { + return err + } + expectedSST, err := sst.Finish() + if err != nil { + return err + } + expectedSSTs = append(expectedSSTs, expectedSST) + } + + // User key range of subsumed replicas. + sst, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer sst.Close() + desc := roachpb.RangeDescriptor{ + StartKey: roachpb.RKey("d"), + EndKey: roachpb.RKeyMax, + } + r := rditer.MakeUserKeyRange(&desc) + if err := engine.ClearRangeWithHeuristic(eng, &sst, r.Start, r.End); err != nil { + return err + } + expectedSST, err := sst.Finish() + if err != nil { + return err + } + expectedSSTs = append(expectedSSTs, expectedSST) + + for i := range sstNames { + actualSST, err := eng.ReadFile(sstNames[i]) + if err != nil { + return err + } + if !bytes.Equal(actualSST, expectedSSTs[i]) { + return errors.Errorf("contents of %s were unexpected", sstNames[i]) + } + } + return nil + } mtc := &multiTestContext{ storeConfig: &storeCfg, // This test was written before the multiTestContext started creating many @@ -3060,6 +3157,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { mtc.Start(t, 3) defer mtc.Stop() store0, store2 := mtc.Store(0), mtc.Store(2) + eng = store2.Engine() distSender := mtc.distSenders[0] // Create three fully-caught-up, adjacent ranges on all three stores. @@ -3074,6 +3172,18 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { mtc.waitForValues(key, []int64{1, 1, 1}) } + // Put some keys in [d, /Max) so the subsumed replica of [c, /Max) with range + // ID 4 has tombstones. We will clear the uncontained key ranges of subsumed + // replicas, so when we are receiving a snapshot for [a, d), we expect to + // clear the keys in [d, /Max).
+ for i := 0; i < 10; i++ { + key := roachpb.Key("d" + strconv.Itoa(i)) + if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil { + t.Fatal(pErr) + } + mtc.waitForValues(key, []int64{1, 1, 1}) + } + aRepl0 := store0.LookupReplica(roachpb.RKey("a")) // Start dropping all Raft traffic to the first range on store1. diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index e8a7b42816d8..15f487ca7fbb 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -45,6 +45,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1072,6 +1073,7 @@ func TestFailedSnapshotFillsReservation(t *testing.T) { RangeSize: 100, State: storagepb.ReplicaState{Desc: rep.Desc()}, } + header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes() // Cause this stream to return an error as soon as we ask it for something. // This injects an error into HandleSnapshotStream when we try to send the // "snapshot accepted" message. diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index 0ed91a63c105..a8addb28cea1 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -504,3 +504,53 @@ func WriteSyncNoop(ctx context.Context, eng Engine) error { } return nil } + +// ClearRangeWithHeuristic clears the keys from start (inclusive) to end +// (exclusive). Depending on the number of keys, it will either use ClearRange +// or ClearIterRange. +func ClearRangeWithHeuristic(eng Reader, writer Writer, start, end MVCCKey) error { + iter := eng.NewIterator(IterOptions{UpperBound: end.Key}) + defer iter.Close() + + // It is expensive for there to be many range deletion tombstones in the same + // sstable because all of the tombstones in an sstable are loaded whenever the + // sstable is accessed. So we avoid using range deletion unless there is some + // minimum number of keys. The value here was pulled out of thin air. It might + // be better to make this dependent on the size of the data being deleted. Or + // perhaps we should fix RocksDB to handle large numbers of tombstones in an + // sstable better. + const clearRangeMinKeys = 64 + // Peek into the range to see whether it's large enough to justify + // ClearRange. Note that the work done here is bounded by + // clearRangeMinKeys, so it will be fairly cheap even for large + // ranges. + // + // TODO(bdarnell): Move this into ClearIterRange so we don't have + // to do this scan twice.
+ count := 0 + iter.Seek(start) + for { + valid, err := iter.Valid() + if err != nil { + return err + } + if !valid || !iter.Key().Less(end) { + break + } + count++ + if count > clearRangeMinKeys { + break + } + iter.Next() + } + var err error + if count > clearRangeMinKeys { + err = writer.ClearRange(start, end) + } else { + err = writer.ClearIterRange(iter, start, end) + } + if err != nil { + return err + } + return nil +} diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index 937375f81298..8d401ad0d053 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -621,6 +621,26 @@ func MVCCPutProto( return MVCCPut(ctx, engine, ms, key, timestamp, value, txn) } +// MVCCBlindPutProto sets the given key to the protobuf-serialized byte string +// of msg and the provided timestamp. See MVCCBlindPut for a discussion on this +// fast-path and when it is appropriate to use. +func MVCCBlindPutProto( + ctx context.Context, + engine Writer, + ms *enginepb.MVCCStats, + key roachpb.Key, + timestamp hlc.Timestamp, + msg protoutil.Message, + txn *roachpb.Transaction, +) error { + value := roachpb.Value{} + if err := value.SetProto(msg); err != nil { + return err + } + value.InitChecksum(key) + return MVCCBlindPut(ctx, engine, ms, key, timestamp, value, txn) +} + type getBuffer struct { meta enginepb.MVCCMetadata value roachpb.Value diff --git a/pkg/storage/rditer/replica_data_iter.go b/pkg/storage/rditer/replica_data_iter.go index f8a998250bc6..f63a56e61090 100644 --- a/pkg/storage/rditer/replica_data_iter.go +++ b/pkg/storage/rditer/replica_data_iter.go @@ -43,21 +43,60 @@ type ReplicaDataIterator struct { // MakeAllKeyRanges returns all key ranges for the given Range. func MakeAllKeyRanges(d *roachpb.RangeDescriptor) []KeyRange { - return makeReplicaKeyRanges(d, keys.MakeRangeIDPrefix) + return []KeyRange{ + MakeRangeIDLocalKeyRange(d.RangeID, false /* replicatedOnly */), + MakeRangeLocalKeyRange(d), + MakeUserKeyRange(d), + } } -// MakeReplicatedKeyRanges returns all key ranges that are fully Raft replicated -// for the given Range. +// MakeReplicatedKeyRanges returns all key ranges that are fully Raft +// replicated for the given Range. +// +// NOTE: The logic for receiving snapshots relies on this function returning +// the ranges in the following sorted order: +// +// 1. Replicated range-id local key range +// 2. Range-local key range +// 3. User key range func MakeReplicatedKeyRanges(d *roachpb.RangeDescriptor) []KeyRange { - return makeReplicaKeyRanges(d, keys.MakeRangeIDReplicatedPrefix) + return []KeyRange{ + MakeRangeIDLocalKeyRange(d.RangeID, true /* replicatedOnly */), + MakeRangeLocalKeyRange(d), + MakeUserKeyRange(d), + } } -// makeReplicaKeyRanges returns a slice of 3 key ranges. The last key range in -// the returned slice corresponds to the actual range data (i.e. not the range -// metadata). -func makeReplicaKeyRanges( - d *roachpb.RangeDescriptor, metaFunc func(roachpb.RangeID) roachpb.Key, -) []KeyRange { +// MakeRangeIDLocalKeyRange returns the range-id local key range. If +// replicatedOnly is true, then it returns only the replicated keys; otherwise, +// it returns both the replicated and unreplicated keys.
+func MakeRangeIDLocalKeyRange(rangeID roachpb.RangeID, replicatedOnly bool) KeyRange { + var prefixFn func(roachpb.RangeID) roachpb.Key + if replicatedOnly { + prefixFn = keys.MakeRangeIDReplicatedPrefix + } else { + prefixFn = keys.MakeRangeIDPrefix + } + sysRangeIDKey := prefixFn(rangeID) + return KeyRange{ + Start: engine.MakeMVCCMetadataKey(sysRangeIDKey), + End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()), + } +} + +// MakeRangeLocalKeyRange returns the range local key range. Range-local keys +// are replicated keys that do not belong to the range they would naturally +// sort into. For example, /Local/Range/Table/1 would sort into [/Min, +// /System), but it actually belongs to [/Table/1, /Table/2). +func MakeRangeLocalKeyRange(d *roachpb.RangeDescriptor) KeyRange { + return KeyRange{ + Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)), + End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)), + } +} + +// MakeUserKeyRange returns the user key range. +func MakeUserKeyRange(d *roachpb.RangeDescriptor) KeyRange { // The first range in the keyspace starts at KeyMin, which includes the // node-local space. We need the original StartKey to find the range // metadata, but the actual data starts at LocalMax. @@ -65,20 +104,9 @@ func makeReplicaKeyRanges( if d.StartKey.Equal(roachpb.RKeyMin) { dataStartKey = keys.LocalMax } - sysRangeIDKey := metaFunc(d.RangeID) - return []KeyRange{ - { - Start: engine.MakeMVCCMetadataKey(sysRangeIDKey), - End: engine.MakeMVCCMetadataKey(sysRangeIDKey.PrefixEnd()), - }, - { - Start: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.StartKey)), - End: engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(d.EndKey)), - }, - { - Start: engine.MakeMVCCMetadataKey(dataStartKey), - End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()), - }, + return KeyRange{ + Start: engine.MakeMVCCMetadataKey(dataStartKey), + End: engine.MakeMVCCMetadataKey(d.EndKey.AsRawKey()), } } diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index fc644d74320a..75829de30021 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -550,9 +550,10 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat if err != nil { return wrapWithNonDeterministicFailure(err, "unable to get replica for merge") } - const destroyData = false + const rangeIDLocalOnly = true + const mustClearRange = false if err := rhsRepl.preDestroyRaftMuLocked( - ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, destroyData, + ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, rangeIDLocalOnly, mustClearRange, ); err != nil { return wrapWithNonDeterministicFailure(err, "unable to destroy range before merge") } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index f1668813a64c..fb4ec63eb805 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1313,16 +1313,30 @@ func execChangeReplicasTxn( // waiting for a second and final response from the recipient which indicates if // the snapshot was a success. // +// `receiveSnapshot` takes the key-value pairs sent and creates three SSTs from +// them for direct ingestion: one for the replicated range-ID local keys, one +// for the range local keys, and one for the user keys. The reason it creates +// three separate SSTs is to prevent overlaps with the memtable and existing +// SSTs in RocksDB. 
Each of the SSTs also has a range deletion tombstone to +// delete the existing data in the range. +// // Applying the snapshot: After the recipient has received the message // indicating it has all the data, it hands it all to -// `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks the -// same things as `shouldAcceptSnapshotData` to make sure nothing has changed -// while the snapshot was being transferred. It then guarantees that there is -// either an initialized[3] replica or a `ReplicaPlaceholder`[4] to accept the -// snapshot by creating a placeholder if necessary. Finally, a *Raft snapshot* -// message is manually handed to the replica's Raft node (by calling -// `stepRaftGroup` + `handleRaftReadyRaftMuLocked`), at which point the snapshot -// has been applied. +// `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks +// the same things as `shouldAcceptSnapshotData` to make sure nothing has +// changed while the snapshot was being transferred. It then guarantees that +// there is either an initialized[3] replica or a `ReplicaPlaceholder`[4] to +// accept the snapshot by creating a placeholder if necessary. Finally, a *Raft +// snapshot* message is manually handed to the replica's Raft node (by calling +// `stepRaftGroup` + `handleRaftReadyRaftMuLocked`). During the application +// process, several other SSTs may be created for direct ingestion. An SST for +// the unreplicated range-ID local keys is created for the Raft entries, hard +// state, and truncated state. An SST is created for deleting each subsumed +// replica's range-ID local keys and at most two SSTs are created for deleting +// the user keys and range local keys of all subsumed replicas. All in all, a +// maximum of 6 + SR SSTs will be created for direct ingestion where SR is the +// number of subsumed replicas. In the case where there are no subsumed +// replicas, 4 SSTs will be created. // // [1]: There is a third kind of snapshot, called "preemptive", which is how we // avoided the above fragility before learner replicas were introduced in the diff --git a/pkg/storage/replica_destroy.go b/pkg/storage/replica_destroy.go index 5dac458b4311..b2b5b253275d 100644 --- a/pkg/storage/replica_destroy.go +++ b/pkg/storage/replica_destroy.go @@ -65,12 +65,13 @@ func (s destroyStatus) Removed() bool { func (r *Replica) preDestroyRaftMuLocked( ctx context.Context, reader engine.Reader, - batch engine.Batch, + writer engine.Writer, nextReplicaID roachpb.ReplicaID, - destroyData bool, + rangeIDLocalOnly bool, + mustClearRange bool, ) error { desc := r.Desc() - err := clearRangeData(ctx, desc, reader, batch, destroyData) + err := clearRangeData(desc, reader, writer, rangeIDLocalOnly, mustClearRange) if err != nil { return err } @@ -80,7 +81,7 @@ func (r *Replica) preDestroyRaftMuLocked( // NB: Legacy tombstones (which are in the replicated key space) are wiped // in clearRangeData, but that's OK since we're writing a new one in the same // batch (and in particular, sequenced *after* the wipe). - return r.setTombstoneKey(ctx, batch, nextReplicaID) + return r.setTombstoneKey(ctx, writer, nextReplicaID) } func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCStats) error { @@ -109,22 +110,27 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS if r.raftMu.sideloaded != nil { return r.raftMu.sideloaded.Clear(ctx) } + return nil } // destroyRaftMuLocked deletes data associated with a replica, leaving a -// tombstone. 
If `destroyData` is true, data in all of the range's keyspaces -// will be deleted. Otherwise, only data in the range-ID local keyspace will be -// deleted. Requires that Replica.raftMu is held. +// tombstone. func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb.ReplicaID) error { startTime := timeutil.Now() ms := r.GetMVCCStats() - const destroyData = true batch := r.Engine().NewWriteOnlyBatch() defer batch.Close() - if err := r.preDestroyRaftMuLocked(ctx, r.Engine(), batch, nextReplicaID, destroyData); err != nil { + if err := r.preDestroyRaftMuLocked( + ctx, + r.Engine(), + batch, + nextReplicaID, + false, /* rangeIDLocalOnly */ + false, /* mustClearRange */ + ); err != nil { return err } preTime := timeutil.Now() @@ -172,7 +178,7 @@ func (r *Replica) cancelPendingCommandsLocked() { // ID that it hasn't yet received a RangeDescriptor for if it receives raft // requests for that replica ID (as seen in #14231). func (r *Replica) setTombstoneKey( - ctx context.Context, eng engine.ReadWriter, externalNextReplicaID roachpb.ReplicaID, + ctx context.Context, eng engine.Writer, externalNextReplicaID roachpb.ReplicaID, ) error { r.mu.Lock() nextReplicaID := r.mu.state.Desc.NextReplicaID @@ -188,6 +194,7 @@ func (r *Replica) setTombstoneKey( tombstone := &roachpb.RaftTombstone{ NextReplicaID: nextReplicaID, } - return engine.MVCCPutProto(ctx, eng, nil, tombstoneKey, - hlc.Timestamp{}, nil, tombstone) + // "Blind" because ms == nil and timestamp == hlc.Timestamp{}. + return engine.MVCCBlindPutProto(ctx, eng, nil, tombstoneKey, + hlc.Timestamp{}, tombstone, nil) } diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 443a73cc4dd3..cded380c95a2 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -482,8 +482,8 @@ func (s *OutgoingSnapshot) Close() { // IncomingSnapshot contains the data for an incoming streaming snapshot message. type IncomingSnapshot struct { SnapUUID uuid.UUID - // The RocksDB BatchReprs that make up this snapshot. - Batches [][]byte + // The storage interface for the underlying SSTs. + SSSS *SSTSnapshotStorageScratch // The Raft log entries for this snapshot. LogEntries [][]byte // The replica state at the time the snapshot was generated (never nil). @@ -593,9 +593,14 @@ func snapshot( // append is intentionally oblivious to the existence of sideloaded proposals. // They are managed by the caller, including cleaning up obsolete on-disk // payloads in case the log tail is replaced. +// +// NOTE: This method takes an engine.Writer because reads are unnecessary when +// prevLastIndex is 0 and prevLastTerm is invalidLastTerm. In the case where +// reading is necessary (i.e., entries are being overwritten or deleted), an +// engine.ReadWriter must be passed in. func (r *Replica) append( ctx context.Context, - batch engine.ReadWriter, + eng engine.Writer, prevLastIndex uint64, prevLastTerm uint64, prevRaftLogSize int64, @@ -616,30 +621,43 @@ func (r *Replica) append( value.InitChecksum(key) var err error if ent.Index > prevLastIndex { - err = engine.MVCCBlindPut(ctx, batch, &diff, key, hlc.Timestamp{}, value, nil /* txn */) + err = engine.MVCCBlindPut(ctx, eng, &diff, key, hlc.Timestamp{}, value, nil /* txn */) } else { + // We type assert eng to also be an engine.Reader only in the case where + // we're replacing existing entries.
+			eng, ok := eng.(engine.ReadWriter)
+			if !ok {
+				return 0, 0, 0, errors.Errorf("expected eng to be an engine.ReadWriter when overwriting log entries")
+			}
+			err = engine.MVCCPut(ctx, eng, &diff, key, hlc.Timestamp{}, value, nil /* txn */)
 		}
 		if err != nil {
 			return 0, 0, 0, err
 		}
 	}
-	// Delete any previously appended log entries which never committed.
 	lastIndex := entries[len(entries)-1].Index
 	lastTerm := entries[len(entries)-1].Term
-	for i := lastIndex + 1; i <= prevLastIndex; i++ {
-		// Note that the caller is in charge of deleting any sideloaded payloads
-		// (which they must only do *after* the batch has committed).
-		err := engine.MVCCDelete(ctx, batch, &diff, r.raftMu.stateLoader.RaftLogKey(i),
-			hlc.Timestamp{}, nil /* txn */)
-		if err != nil {
-			return 0, 0, 0, err
+	// Delete any previously appended log entries which never committed.
+	if prevLastIndex > 0 {
+		// We type assert eng to also be an engine.Reader only in the case where
+		// we're deleting existing entries.
+		eng, ok := eng.(engine.ReadWriter)
+		if !ok {
+			return 0, 0, 0, errors.Errorf("expected eng to be an engine.ReadWriter when deleting log entries")
+		}
+		for i := lastIndex + 1; i <= prevLastIndex; i++ {
+			// Note that the caller is in charge of deleting any sideloaded payloads
+			// (which they must only do *after* the batch has committed).
+			err := engine.MVCCDelete(ctx, eng, &diff, r.raftMu.stateLoader.RaftLogKey(i),
+				hlc.Timestamp{}, nil /* txn */)
+			if err != nil {
+				return 0, 0, 0, err
+			}
 		}
 	}
 	raftLogSize := prevRaftLogSize + diff.SysBytes
-
 	return lastIndex, lastTerm, raftLogSize, nil
 }
@@ -673,62 +691,38 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error {
 	return nil
 }
 
+// clearRangeData clears the data associated with a range descriptor. If
+// rangeIDLocalOnly is true, then only the range-ID local keys are deleted.
+// Otherwise, the range-ID local keys, range local keys, and user keys are all
+// deleted. If mustClearRange is true, ClearRange will always be used to remove
+// the keys. Otherwise, ClearRangeWithHeuristic will be used, which chooses
+// ClearRange or ClearIterRange depending on how many keys there are in the
+// range.
 func clearRangeData(
-	ctx context.Context,
 	desc *roachpb.RangeDescriptor,
 	eng engine.Reader,
-	batch engine.Batch,
-	destroyData bool,
+	writer engine.Writer,
+	rangeIDLocalOnly bool,
+	mustClearRange bool,
 ) error {
-	iter := eng.NewIterator(engine.IterOptions{UpperBound: desc.EndKey.AsRawKey()})
-	defer iter.Close()
-
-	// It is expensive for there to be many range deletion tombstones in the same
-	// sstable because all of the tombstones in an sstable are loaded whenever the
-	// sstable is accessed. So we avoid using range deletion unless there is some
-	// minimum number of keys. The value here was pulled out of thin air. It might
-	// be better to make this dependent on the size of the data being deleted. Or
-	// perhaps we should fix RocksDB to handle large numbers of tombstones in an
-	// sstable better.
-	const clearRangeMinKeys = 64
-	keyRanges := rditer.MakeAllKeyRanges(desc)
-	if !destroyData {
-		// TODO(benesch): The fact that we hardcode the number of
-		// "metadata" ranges (i.e. non-user-keyspace) suggests that
-		// rditer.MakeAllKeyRanges has the wrong API.
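The engine.ClearRangeWithHeuristic helper referenced in the new comment above is not shown in this patch; the clearRangeMinKeys logic being removed here is what it factors out. A minimal sketch of the shape such a helper plausibly takes, reusing the removed threshold and the Reader/Writer interfaces from this change (the name and exact threshold are assumptions, not the shipped implementation):

    // clearRangeWithHeuristic removes [start, end) with a single range
    // deletion tombstone (ClearRange) when the span contains many keys, or
    // with point deletions (ClearIterRange) when it contains few. Range
    // tombstones are expensive to read because every tombstone in an
    // sstable is loaded whenever the sstable is accessed.
    func clearRangeWithHeuristic(eng engine.Reader, writer engine.Writer, start, end engine.MVCCKey) error {
        const clearRangeMinKeys = 64
        iter := eng.NewIterator(engine.IterOptions{UpperBound: end.Key})
        defer iter.Close()
        // Peek into the span; the scan is bounded by clearRangeMinKeys, so it
        // stays cheap even for large ranges.
        count := 0
        for iter.Seek(start); ; iter.Next() {
            valid, err := iter.Valid()
            if err != nil {
                return err
            }
            if !valid || !iter.Key().Less(end) {
                break
            }
            count++
            if count > clearRangeMinKeys {
                return writer.ClearRange(start, end)
            }
        }
        return writer.ClearIterRange(iter, start, end)
    }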
- keyRanges = keyRanges[:1] + var keyRanges []rditer.KeyRange + if rangeIDLocalOnly { + keyRanges = []rditer.KeyRange{rditer.MakeRangeIDLocalKeyRange(desc.RangeID, false)} + } else { + keyRanges = rditer.MakeAllKeyRanges(desc) } - for _, keyRange := range keyRanges { - // Peek into the range to see whether it's large enough to justify - // ClearRange. Note that the work done here is bounded by - // clearRangeMinKeys, so it will be fairly cheap even for large - // ranges. - // - // TODO(bdarnell): Move this into ClearIterRange so we don't have - // to do this scan twice. - count := 0 - iter.Seek(keyRange.Start) - for { - valid, err := iter.Valid() - if err != nil { - return err - } - if !valid || !iter.Key().Less(keyRange.End) { - break - } - count++ - if count > clearRangeMinKeys { - break - } - iter.Next() - } - var err error - if count > clearRangeMinKeys { - err = batch.ClearRange(keyRange.Start, keyRange.End) - } else { - err = batch.ClearIterRange(iter, keyRange.Start, keyRange.End) + + var clearRangeFn func(engine.Reader, engine.Writer, engine.MVCCKey, engine.MVCCKey) error + if mustClearRange { + clearRangeFn = func(eng engine.Reader, writer engine.Writer, start, end engine.MVCCKey) error { + return writer.ClearRange(start, end) } - if err != nil { + } else { + clearRangeFn = engine.ClearRangeWithHeuristic + } + + for _, keyRange := range keyRanges { + if err := clearRangeFn(eng, writer, keyRange.Start, keyRange.End); err != nil { return err } } @@ -806,172 +800,144 @@ func (r *Replica) applySnapshot( } var stats struct { - clear time.Time - batch time.Time - entries time.Time - commit time.Time - } - - var size int - for _, b := range inSnap.Batches { - size += len(b) - } - for _, e := range inSnap.LogEntries { - size += len(e) - } - - log.Infof(ctx, "applying %s snapshot at index %d "+ - "(id=%s, encoded size=%d, %d rocksdb batches, %d log entries)", - snapType, snap.Metadata.Index, inSnap.SnapUUID.Short(), - size, len(inSnap.Batches), len(inSnap.LogEntries)) + // Time to clear unreplicated range-ID local keys and update unreplicated + // state. + unreplicatedState time.Time + // Time to process subsumed replicas. + subsumedReplicas time.Time + // Time to ingest SSTs. + ingestion time.Time + } + log.Infof(ctx, "applying %s snapshot [id=%s index=%d]", + snapType, inSnap.SnapUUID.Short(), snap.Metadata.Index) defer func(start time.Time) { now := timeutil.Now() - log.Infof(ctx, "applied %s snapshot in %0.0fms [clear=%0.0fms batch=%0.0fms entries=%0.0fms commit=%0.0fms]", - snapType, now.Sub(start).Seconds()*1000, - stats.clear.Sub(start).Seconds()*1000, - stats.batch.Sub(stats.clear).Seconds()*1000, - stats.entries.Sub(stats.batch).Seconds()*1000, - stats.commit.Sub(stats.entries).Seconds()*1000) - }(timeutil.Now()) - - // Use a more efficient write-only batch because we don't need to do any - // reads from the batch. - batch := r.store.Engine().NewWriteOnlyBatch() - defer batch.Close() - - // If we're subsuming a replica below, we don't have its last NextReplicaID, - // nor can we obtain it. That's OK: we can just be conservative and use the - // maximum possible replica ID. preDestroyRaftMuLocked will write a replica - // tombstone using this maximum possible replica ID, which would normally be - // problematic, as it would prevent this store from ever having a new replica - // of the removed range. In this case, however, it's copacetic, as subsumed - // ranges _can't_ have new replicas. 
- const subsumedNextReplicaID = math.MaxInt32 - - // As part of applying the snapshot, we may need to subsume replicas that have - // been merged into this range. Destroy their data in the same batch in which - // we apply the snapshot. - for _, sr := range subsumedRepls { - if err := sr.preDestroyRaftMuLocked( - ctx, r.store.Engine(), batch, subsumedNextReplicaID, true, /* destroyData */ - ); err != nil { - return err + totalLog := fmt.Sprintf( + "total=%0.0fms ", + now.Sub(start).Seconds()*1000, + ) + unreplicatedStateLog := fmt.Sprintf( + "unreplicatedState=%0.0fms ", + stats.unreplicatedState.Sub(start).Seconds()*1000, + ) + var subsumedReplicasLog string + if len(subsumedRepls) > 0 { + subsumedReplicasLog = fmt.Sprintf( + "subsumedReplicas=%d@%0.0fms ", + len(subsumedRepls), + stats.subsumedReplicas.Sub(stats.unreplicatedState).Seconds()*1000, + ) } - } + ingestionLog := fmt.Sprintf( + "ingestion=%d@%0.0fms ", + len(inSnap.SSSS.SSTs()), + stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000, + ) + log.Infof(ctx, "applied %s snapshot [%s%s%s%sid=%s index=%d]", + snapType, totalLog, unreplicatedStateLog, subsumedReplicasLog, + ingestionLog, inSnap.SnapUUID.Short(), snap.Metadata.Index) + }(timeutil.Now()) - // Delete everything in the range and recreate it from the snapshot. - // We need to delete any old Raft log entries here because any log entries - // that predate the snapshot will be orphaned and never truncated or GC'd. - if err := clearRangeData(ctx, s.Desc, r.store.Engine(), batch, true /* destroyData */); err != nil { + unreplicatedSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { return err } - // Clear the cached raft log entries to ensure that old or uncommitted - // entries don't impact the in-memory state. - r.store.raftEntryCache.Drop(r.RangeID) - stats.clear = timeutil.Now() + defer unreplicatedSST.Close() - // Write the snapshot into the range. - for _, batchRepr := range inSnap.Batches { - if err := batch.ApplyBatchRepr(batchRepr, false); err != nil { - return err - } + // Clearing the unreplicated state. + unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) + unreplicatedStart := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey) + unreplicatedEnd := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey.PrefixEnd()) + if err = unreplicatedSST.ClearRange(unreplicatedStart, unreplicatedEnd); err != nil { + return errors.Wrapf(err, "error clearing range of unreplicated SST writer") } - // The log entries are all written to distinct keys so we can use a - // distinct batch. - distinctBatch := batch.Distinct() - stats.batch = timeutil.Now() + // Update HardState. + if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { + return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + // Update TruncatedState if it is unreplicated. if inSnap.UsesUnreplicatedTruncatedState { - // We're using the unreplicated truncated state, which we need to - // manually persist to disk. If we're not taking this branch, the - // snapshot contains a legacy TruncatedState and we don't need to do - // anything (in fact, must not -- the invariant is that exactly one of - // them exists at any given point in the state machine). 
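The unreplicated SST assembled above depends on an ordering contract of RocksDBSstFileWriter: point keys must be added in increasing key order, while a range deletion tombstone may be added at any time and does not shadow point keys written into the same SST. A condensed sketch of this clear-then-put pattern as a standalone helper (the name and parameters are illustrative, not part of the patch):

    // buildClearThenPutSST wipes [prefix, prefix.PrefixEnd()) with one range
    // tombstone and then writes the replacement point keys into the same SST.
    // On ingestion, all pre-existing data in the span is deleted while the new
    // point keys survive.
    func buildClearThenPutSST(prefix roachpb.Key, kvs []engine.MVCCKeyValue) (engine.RocksDBSstFileWriter, error) {
        sst, err := engine.MakeRocksDBSstFileWriter()
        if err != nil {
            return sst, err
        }
        start := engine.MakeMVCCMetadataKey(prefix)
        end := engine.MakeMVCCMetadataKey(prefix.PrefixEnd())
        if err := sst.ClearRange(start, end); err != nil {
            sst.Close()
            return sst, err
        }
        for _, kv := range kvs { // kvs must already be sorted by key
            if err := sst.Put(kv.Key, kv.Value); err != nil {
                sst.Close()
                return sst, err
            }
        }
        return sst, nil
    }

This property is what lets the code above clear the whole unreplicated range-ID prefix and then blind-put the HardState, TruncatedState, and log entries into the same file.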
- if err := stateloader.Make(s.Desc.RangeID).SetRaftTruncatedState( - ctx, distinctBatch, s.TruncatedState, + if err := r.raftMu.stateLoader.SetRaftTruncatedState( + ctx, &unreplicatedSST, s.TruncatedState, ); err != nil { - return err + return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer") } } - logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) - for i, bytes := range inSnap.LogEntries { - if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { - return err - } - } - // If this replica doesn't know its ReplicaID yet, we're applying a - // preemptive snapshot. In this case, we're going to have to write the - // sideloaded proposals into the Raft log. Otherwise, sideload. + // Update Raft entries. + var lastTerm uint64 var raftLogSize int64 - thinEntries := logEntries - if replicaID != 0 { + if len(inSnap.LogEntries) > 0 { + logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) + for i, bytes := range inSnap.LogEntries { + if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { + return err + } + } + // If this replica doesn't know its ReplicaID yet, we're applying a + // preemptive snapshot. In this case, we're going to have to write the + // sideloaded proposals into the Raft log. Otherwise, sideload. + if replicaID != 0 { + var err error + var sideloadedEntriesSize int64 + logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + if err != nil { + return err + } + raftLogSize += sideloadedEntriesSize + } var err error - var sideloadedEntriesSize int64 - thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + _, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries) if err != nil { return err } - raftLogSize += sideloadedEntriesSize + } else { + lastTerm = invalidLastTerm } + r.store.raftEntryCache.Drop(r.RangeID) - // Write the snapshot's Raft log into the range. - var lastTerm uint64 - _, lastTerm, raftLogSize, err = r.append( - ctx, distinctBatch, 0, invalidLastTerm, raftLogSize, thinEntries, - ) - if err != nil { + stats.unreplicatedState = timeutil.Now() + if err := inSnap.SSSS.WriteSST(ctx, &unreplicatedSST); err != nil { return err } - stats.entries = timeutil.Now() - - // Note that since this snapshot comes from Raft, we don't have to synthesize - // the HardState -- Raft wouldn't ask us to update the HardState in incorrect - // ways. - if err := r.raftMu.stateLoader.SetHardState(ctx, distinctBatch, hs); err != nil { - return errors.Wrapf(err, "unable to persist HardState %+v", &hs) - } - - // We need to close the distinct batch and start using the normal batch for - // the read below. - distinctBatch.Close() - // As outlined above, last and applied index are the same after applying - // the snapshot (i.e. the snapshot has no uncommitted tail). if s.RaftAppliedIndex != snap.Metadata.Index { log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d", s.RaftAppliedIndex, snap.Metadata.Index) } - // We've written Raft log entries, so we need to sync the WAL. - if err := batch.Commit(!disableSyncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil { + // If we're subsuming a replica below, we don't have its last NextReplicaID, + // nor can we obtain it. That's OK: we can just be conservative and use the + // maximum possible replica ID. 
preDestroyRaftMuLocked will write a replica
+	// tombstone using this maximum possible replica ID, which would normally be
+	// problematic, as it would prevent this store from ever having a new replica
+	// of the removed range. In this case, however, it's copacetic, as subsumed
+	// ranges _can't_ have new replicas.
+	const subsumedNextReplicaID = math.MaxInt32
+	if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSSS, s.Desc, subsumedRepls, subsumedNextReplicaID); err != nil {
 		return err
 	}
-	stats.commit = timeutil.Now()
+	stats.subsumedReplicas = timeutil.Now()
+
+	// Ingest all SSTs atomically.
+	if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
+		if err := fn(inSnap, snapType, inSnap.SSSS.SSTs()); err != nil {
+			return err
+		}
+	}
+	if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSSS.SSTs(), true /* skipWritingSeqNo */, true /* modify */); err != nil {
+		return errors.Wrapf(err, "while ingesting %s", inSnap.SSSS.SSTs())
+	}
+	stats.ingestion = timeutil.Now()
 
 	// The on-disk state is now committed, but the corresponding in-memory state
 	// has not yet been updated. Any errors past this point must therefore be
 	// treated as fatal.
-	for _, sr := range subsumedRepls {
-		// We removed sr's data when we committed the batch. Finish subsumption by
-		// updating the in-memory bookkeping.
-		if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
-			log.Fatalf(ctx, "unable to finish destroying %s while applying snapshot: %+v", sr, err)
-		}
-		// We already hold sr's raftMu, so we must call removeReplicaImpl directly.
-		// Note that it's safe to update the store's metadata for sr's removal
-		// separately from updating the store's metadata for r's new descriptor
-		// (i.e., under a different store.mu acquisition). Each store.mu acquisition
-		// leaves the store in a consistent state, and access to the replicas
-		// themselves is protected by their raftMus, which are held from start to
-		// finish.
-		if err := r.store.removeReplicaImpl(ctx, sr, subsumedNextReplicaID, RemoveOptions{
-			DestroyData: false, // data is already destroyed
-		}); err != nil {
-			log.Fatalf(ctx, "unable to remove %s while applying snapshot: %+v", sr, err)
-		}
+	if err := r.clearSubsumedReplicaInMemoryData(ctx, subsumedRepls, subsumedNextReplicaID); err != nil {
+		log.Fatalf(ctx, "failed to clear in-memory data of subsumed replicas while applying snapshot: %+v", err)
 	}
 
 	// Atomically swap the placeholder, if any, for the replica, and update the
@@ -1034,6 +1000,137 @@ func (r *Replica) applySnapshot(
 	return nil
 }
 
+// clearSubsumedReplicaDiskData clears the on-disk data of the subsumed
+// replicas by creating SSTs with range deletion tombstones. We have to be
+// careful here not to have overlapping ranges with the SSTs we have already
+// created, since that will throw an error while we are ingesting them. This
+// method requires that each of the subsumed replicas' raftMu is held.
+func (r *Replica) clearSubsumedReplicaDiskData(
+	ctx context.Context,
+	ssss *SSTSnapshotStorageScratch,
+	desc *roachpb.RangeDescriptor,
+	subsumedRepls []*Replica,
+	subsumedNextReplicaID roachpb.ReplicaID,
+) error {
+	getKeyRanges := func(desc *roachpb.RangeDescriptor) [2]rditer.KeyRange {
+		return [...]rditer.KeyRange{
+			rditer.MakeRangeLocalKeyRange(desc),
+			rditer.MakeUserKeyRange(desc),
+		}
+	}
+	keyRanges := getKeyRanges(desc)
+	totalKeyRanges := append([]rditer.KeyRange(nil), keyRanges[:]...)
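For concreteness, the ingestion list assembled above accounts for the SST budget cited at the top of this patch: three SSTs built by the receiver from the streamed snapshot data (one per replicated key range), one SST for the unreplicated range-ID local keys, one range-ID local SST per subsumed replica (built in the loop below), and at most two further SSTs when subsumed replicas extend past this replica's range-local and user key bounds; hence the maximum of 6 + SR, and 4 when there are no subsumed replicas.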
+	for _, sr := range subsumedRepls {
+		// We have to create an SST for the subsumed replica's range-ID local keys.
+		subsumedReplSST, err := engine.MakeRocksDBSstFileWriter()
+		if err != nil {
+			return err
+		}
+		// NOTE: We set mustClearRange to true because we are setting
+		// RaftTombstoneKey. Since Clears and Puts need to be done in increasing
+		// order of keys, it is not safe to use ClearIterRange.
+		if err := sr.preDestroyRaftMuLocked(
+			ctx,
+			r.store.Engine(),
+			&subsumedReplSST,
+			subsumedNextReplicaID,
+			true, /* rangeIDLocalOnly */
+			true, /* mustClearRange */
+		); err != nil {
+			subsumedReplSST.Close()
+			return err
+		}
+		if err := ssss.WriteSST(ctx, &subsumedReplSST); err != nil {
+			return err
+		}
+
+		srKeyRanges := getKeyRanges(sr.Desc())
+		// Compute the total key space covered by the current replica and all
+		// subsumed replicas.
+		for i := range srKeyRanges {
+			if srKeyRanges[i].Start.Key.Compare(totalKeyRanges[i].Start.Key) < 0 {
+				totalKeyRanges[i].Start = srKeyRanges[i].Start
+			}
+			if srKeyRanges[i].End.Key.Compare(totalKeyRanges[i].End.Key) > 0 {
+				totalKeyRanges[i].End = srKeyRanges[i].End
+			}
+		}
+	}
+
+	// We might have to create SSTs for the range local keys and user keys if
+	// the subsumed replicas are not fully contained by the replica in our
+	// snapshot. The following is an example of when this can happen.
+	//
+	// a       b       c       d
+	// |---1---|-------2-------| S1
+	// |---1-------------------| S2
+	// |---1-----------|---3---| S3
+	//
+	// Since the merge is the first operation to happen, a follower could be down
+	// before it completes. It is reasonable for a snapshot of r1 from S3 to
+	// subsume both r1 and r2 in S1.
+	for i := range keyRanges {
+		if totalKeyRanges[i].End.Key.Compare(keyRanges[i].End.Key) > 0 {
+			subsumedReplSST, err := engine.MakeRocksDBSstFileWriter()
+			if err != nil {
+				return err
+			}
+			if err := engine.ClearRangeWithHeuristic(
+				r.store.Engine(),
+				&subsumedReplSST,
+				keyRanges[i].End,
+				totalKeyRanges[i].End,
+			); err != nil {
+				subsumedReplSST.Close()
+				return err
+			}
+			if err := ssss.WriteSST(ctx, &subsumedReplSST); err != nil {
+				return err
+			}
+		}
+		// The snapshot must never subsume a replica that extends the range of the
+		// replica to the left. This is because splits and merges (the only
+		// operations that change the key bounds) always leave the start key intact.
+		// Extending to the left implies that either we merged "to the left" (we
+		// don't), or that we're applying a snapshot for another range (we don't do
+		// that either). Something is severely wrong for this to happen.
+		if totalKeyRanges[i].Start.Key.Compare(keyRanges[i].Start.Key) < 0 {
+			log.Fatalf(ctx, "subsuming replica to our left; key range: %v; total key range %v",
+				keyRanges[i], totalKeyRanges[i])
+		}
+	}
+	return nil
+}
+
+// clearSubsumedReplicaInMemoryData clears the in-memory data of the subsumed
+// replicas. This method requires that each of the subsumed replicas' raftMu is
+// held.
+func (r *Replica) clearSubsumedReplicaInMemoryData(
+	ctx context.Context, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID,
+) error {
+	for _, sr := range subsumedRepls {
+		// We removed sr's data when we ingested the SSTs. Finish subsumption by
+		// updating the in-memory bookkeeping.
+		if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
+			return err
+		}
+		// We already hold sr's raftMu, so we must call removeReplicaImpl directly.
+		// Note that it's safe to update the store's metadata for sr's removal
+		// separately from updating the store's metadata for r's new descriptor
+		// (i.e., under a different store.mu acquisition). Each store.mu
+		// acquisition leaves the store in a consistent state, and access to the
+		// replicas themselves is protected by their raftMus, which are held from
+		// start to finish.
+		if err := r.store.removeReplicaImpl(ctx, sr, subsumedNextReplicaID, RemoveOptions{
+			DestroyData: false, // data is already destroyed
+		}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 type raftCommandEncodingVersion byte
 
 // Raft commands are encoded with a 1-byte version (currently 0 or 1), an 8-byte
diff --git a/pkg/storage/stateloader/stateloader.go b/pkg/storage/stateloader/stateloader.go
index 9174d0a7ec7e..b401dfffa657 100644
--- a/pkg/storage/stateloader/stateloader.go
+++ b/pkg/storage/stateloader/stateloader.go
@@ -541,13 +541,21 @@ func (rsl StateLoader) LoadRaftTruncatedState(
 
 // SetRaftTruncatedState overwrites the truncated state.
 func (rsl StateLoader) SetRaftTruncatedState(
-	ctx context.Context, eng engine.ReadWriter, truncState *roachpb.RaftTruncatedState,
+	ctx context.Context, eng engine.Writer, truncState *roachpb.RaftTruncatedState,
 ) error {
 	if (*truncState == roachpb.RaftTruncatedState{}) {
 		return errors.New("cannot persist empty RaftTruncatedState")
 	}
-	return engine.MVCCPutProto(ctx, eng, nil, /* ms */
-		rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, nil, truncState)
+	// "Blind" because ms == nil and timestamp == hlc.Timestamp{}.
+	return engine.MVCCBlindPutProto(
+		ctx,
+		eng,
+		nil, /* ms */
+		rsl.RaftTruncatedStateKey(),
+		hlc.Timestamp{}, /* timestamp */
+		truncState,
+		nil, /* txn */
+	)
 }
 
 // LoadHardState loads the HardState.
@@ -566,10 +574,18 @@ func (rsl StateLoader) LoadHardState(
 
 // SetHardState overwrites the HardState.
 func (rsl StateLoader) SetHardState(
-	ctx context.Context, batch engine.ReadWriter, st raftpb.HardState,
+	ctx context.Context, batch engine.Writer, st raftpb.HardState,
 ) error {
-	return engine.MVCCPutProto(ctx, batch, nil,
-		rsl.RaftHardStateKey(), hlc.Timestamp{}, nil, &st)
+	// "Blind" because ms == nil and timestamp == hlc.Timestamp{}.
+	return engine.MVCCBlindPutProto(
+		ctx,
+		batch,
+		nil, /* ms */
+		rsl.RaftHardStateKey(),
+		hlc.Timestamp{}, /* timestamp */
+		&st,
+		nil, /* txn */
+	)
 }
 
 // SynthesizeRaftState creates a Raft state which synthesizes both a HardState
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index befd75eeed12..05a276f6b239 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -412,6 +412,7 @@ type Store struct {
 	raftEntryCache  *raftentry.Cache
 	limiters        batcheval.Limiters
 	txnWaitMetrics  *txnwait.Metrics
+	sss             SSTSnapshotStorage
 
 	// gossipRangeCountdown and leaseRangeCountdown are countdowns of
 	// changes to range and leaseholder counts, after which the store
@@ -865,6 +866,16 @@ func NewStore(
 	s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
 		"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
 	)
+
+	// The snapshot storage is usually empty at this point since it is cleared
+	// after each snapshot application, except when the node crashed before it
+	// could clean it up. If this cleanup fails, it's not a correctness issue,
+	// since the storage is also cleared before receiving a snapshot.
+ s.sss = NewSSTSnapshotStorage(s.engine, s.limiters.BulkIOWriteRate) + if err := s.sss.Clear(); err != nil { + log.Warningf(ctx, "failed to clear snapshot storage: %v", err) + } + // On low-CPU instances, a default limit value may still allow ExportRequests // to tie up all cores so cap limiter at cores-1 when setting value is higher. exportCores := runtime.NumCPU() - 1 diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 7aafe83985a4..b11fc4fb6596 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -78,6 +79,9 @@ type snapshotStrategy interface { // Status provides a status report on the work performed during the // snapshot. Only valid if the strategy succeeded. Status() string + + // Close cleans up any resources associated with the snapshot strategy. + Close(context.Context) } func assertStrategy( @@ -94,47 +98,225 @@ type kvBatchSnapshotStrategy struct { raftCfg *base.RaftConfig status string - // Fields used when sending snapshots. + // The size of the batches of PUT operations to send to the receiver of the + // snapshot. Only used on the sender side. batchSize int64 - limiter *rate.Limiter - newBatch func() engine.Batch + // Limiter for sending KV batches. Only used on the sender side. + limiter *rate.Limiter + // Only used on the sender side. + newBatch func() engine.Batch + + // The approximate size of the SST chunk to buffer in memory on the receiver + // before flushing to disk. Only used on the receiver side. + sstChunkSize int64 + // Only used on the receiver side. + ssss *SSTSnapshotStorageScratch +} + +// multiSSTWriter is a wrapper around RocksDBSstFileWriter and +// SSTSnapshotStorageScratch that handles chunking SSTs and persisting them to +// disk. +type multiSSTWriter struct { + ssss *SSTSnapshotStorageScratch + currSST engine.RocksDBSstFileWriter + currSSTFile *SSTSnapshotStorageFile + keyRanges []rditer.KeyRange + currRange int + // The size of the SST the last time the SST file writer was truncated. This + // size is used to determine the size of the SST chunk buffered in-memory. + truncatedSize int64 + // The approximate size of the SST chunk to buffer in memory on the receiver + // before flushing to disk. 
+ sstChunkSize int64 +} + +func newMultiSSTWriter( + ssss *SSTSnapshotStorageScratch, keyRanges []rditer.KeyRange, sstChunkSize int64, +) (multiSSTWriter, error) { + msstw := multiSSTWriter{ + ssss: ssss, + keyRanges: keyRanges, + sstChunkSize: sstChunkSize, + } + if err := msstw.initSST(); err != nil { + return msstw, err + } + return msstw, nil +} + +func (msstw *multiSSTWriter) initSST() error { + newSSTFile, err := msstw.ssss.NewFile() + if err != nil { + return errors.Wrap(err, "failed to create new sst file") + } + msstw.currSSTFile = newSSTFile + newSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return errors.Wrap(err, "failed to create sst file writer") + } + msstw.currSST = newSST + if err := msstw.currSST.ClearRange(msstw.keyRanges[msstw.currRange].Start, msstw.keyRanges[msstw.currRange].End); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + msstw.truncatedSize = 0 + return nil +} + +func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { + chunk, err := msstw.currSST.Finish() + if err != nil { + return errors.Wrap(err, "failed to finish sst") + } + if err := msstw.currSSTFile.Write(ctx, chunk); err != nil { + return errors.Wrap(err, "failed to write to sst file") + } + if err := msstw.currSSTFile.Close(); err != nil { + return errors.Wrap(err, "failed to close sst file") + } + msstw.currRange++ + msstw.currSST.Close() + return nil +} + +func (msstw *multiSSTWriter) Put(ctx context.Context, key engine.MVCCKey, value []byte) error { + for msstw.keyRanges[msstw.currRange].End.Key.Compare(key.Key) <= 0 { + // Finish the current SST, write to the file, and move to the next key + // range. + if err := msstw.finalizeSST(ctx); err != nil { + return err + } + if err := msstw.initSST(); err != nil { + return err + } + } + if msstw.keyRanges[msstw.currRange].Start.Key.Compare(key.Key) > 0 { + return crdberrors.AssertionFailedf("client error: expected %s to fall in one of %s", key.Key, msstw.keyRanges) + } + if err := msstw.currSST.Put(key, value); err != nil { + return errors.Wrap(err, "failed to put in sst") + } + if msstw.currSST.DataSize-msstw.truncatedSize > msstw.sstChunkSize { + msstw.truncatedSize = msstw.currSST.DataSize + chunk, err := msstw.currSST.Truncate() + if err != nil { + return errors.Wrap(err, "failed to truncate sst") + } + // NOTE: Chunk may be empty due to the semantics of Truncate(), but Write() + // handles an empty chunk as a noop. + if err := msstw.currSSTFile.Write(ctx, chunk); err != nil { + return errors.Wrap(err, "failed to write to sst file") + } + } + return nil +} + +func (msstw *multiSSTWriter) Finish(ctx context.Context) error { + if msstw.currRange < len(msstw.keyRanges) { + for { + if err := msstw.finalizeSST(ctx); err != nil { + return err + } + if msstw.currRange >= len(msstw.keyRanges) { + break + } + if err := msstw.initSST(); err != nil { + return err + } + } + } + return nil +} + +func (msstw *multiSSTWriter) Close() error { + msstw.currSST.Close() + return msstw.currSSTFile.Close() } // Receive implements the snapshotStrategy interface. +// +// NOTE: This function assumes that the key-value pairs are sent in sorted +// order. The key-value pairs are sent in the following sorted order: +// +// 1. Replicated range-id local key range +// 2. Range-local key range +// 3. 
User key range
func (kvSS *kvBatchSnapshotStrategy) Receive(
 	ctx context.Context, stream incomingSnapshotStream, header SnapshotRequest_Header,
 ) (IncomingSnapshot, error) {
 	assertStrategy(ctx, header, SnapshotRequest_KV_BATCH)
 
-	var batches [][]byte
+	// At the moment we'll write at most three SSTs.
+	// TODO(jeffreyxiao): Re-evaluate as the default range size grows.
+	keyRanges := rditer.MakeReplicatedKeyRanges(header.State.Desc)
+	msstw, err := newMultiSSTWriter(kvSS.ssss, keyRanges, kvSS.sstChunkSize)
+	if err != nil {
+		return noSnap, err
+	}
+	defer func() {
+		// Nothing actionable if closing the multiSSTWriter fails. Closing the
+		// same SST and SST file multiple times is idempotent.
+		if err := msstw.Close(); err != nil {
+			log.Warningf(ctx, "failed to close multiSSTWriter: %v", err)
+		}
+	}()
 	var logEntries [][]byte
+
 	for {
 		req, err := stream.Recv()
 		if err != nil {
-			return IncomingSnapshot{}, err
+			return noSnap, err
 		}
 		if req.Header != nil {
 			err := errors.New("client error: provided a header mid-stream")
-			return IncomingSnapshot{}, sendSnapshotError(stream, err)
+			return noSnap, sendSnapshotError(stream, err)
 		}
 
 		if req.KVBatch != nil {
-			batches = append(batches, req.KVBatch)
+			batchReader, err := engine.NewRocksDBBatchReader(req.KVBatch)
+			if err != nil {
+				return noSnap, errors.Wrap(err, "failed to decode batch")
+			}
+			// All operations in the batch are guaranteed to be puts.
+			for batchReader.Next() {
+				if batchReader.BatchType() != engine.BatchTypeValue {
+					return noSnap, crdberrors.AssertionFailedf("expected type %d, found type %d", engine.BatchTypeValue, batchReader.BatchType())
+				}
+				key, err := batchReader.MVCCKey()
+				if err != nil {
+					return noSnap, errors.Wrap(err, "failed to decode mvcc key")
+				}
+				if err := msstw.Put(ctx, key, batchReader.Value()); err != nil {
+					return noSnap, err
+				}
+			}
 		}
 		if req.LogEntries != nil {
 			logEntries = append(logEntries, req.LogEntries...)
 		}
 		if req.Final {
+			// We finished receiving all batches and log entries. It's possible that
+			// we did not receive any key-value pairs for some of the key ranges, but
+			// we must still construct SSTs with range deletion tombstones to remove
+			// the data.
+ if err := msstw.Finish(ctx); err != nil { + return noSnap, err + } + + if err := msstw.Close(); err != nil { + return noSnap, err + } + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { - err = errors.Wrap(err, "invalid snapshot") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + err = errors.Wrap(err, "client error: invalid snapshot") + return noSnap, sendSnapshotError(stream, err) } inSnap := IncomingSnapshot{ UsesUnreplicatedTruncatedState: header.UnreplicatedTruncatedState, SnapUUID: snapUUID, - Batches: batches, + SSSS: kvSS.ssss, LogEntries: logEntries, State: &header.State, snapType: header.Type, @@ -153,7 +335,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( inSnap.snapType = SnapshotRequest_PREEMPTIVE } - kvSS.status = fmt.Sprintf("kv batches: %d, log entries: %d", len(batches), len(logEntries)) + kvSS.status = fmt.Sprintf("log entries: %d, ssts: %d", len(logEntries), len(kvSS.ssss.SSTs())) return inSnap, nil } } @@ -190,10 +372,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } if int64(b.Len()) >= kvSS.batchSize { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } b = nil @@ -204,10 +383,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } } if b != nil { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } } @@ -330,8 +506,11 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } func (kvSS *kvBatchSnapshotStrategy) sendBatch( - stream outgoingSnapshotStream, batch engine.Batch, + ctx context.Context, stream outgoingSnapshotStream, batch engine.Batch, ) error { + if err := kvSS.limiter.WaitN(ctx, 1); err != nil { + return err + } repr := batch.Repr() batch.Close() return stream.Send(&SnapshotRequest{KVBatch: repr}) @@ -340,6 +519,18 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( // Status implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Status() string { return kvSS.status } +// Close implements the snapshotStrategy interface. +func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { + if kvSS.ssss != nil { + // A failure to clean up the storage is benign except that it will leak + // disk space (which is reclaimed on node restart). It is unexpected + // though, so log a warning. + if err := kvSS.ssss.Clear(); err != nil { + log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err) + } + } +} + // reserveSnapshot throttles incoming snapshots. The returned closure is used // to cleanup the reservation and release its resources. A nil cleanup function // and a non-empty rejectionMessage indicates the reservation was declined. 
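Because the per-snapshot scratch records every SST written for the snapshot, the new BeforeSnapshotSSTIngestion knob (added in testing_knobs.go at the end of this patch, and invoked just before IngestExternalFiles in applySnapshot) gives tests a window onto the exact files about to be ingested. A hypothetical hook from a test outside the storage package follows; the harness wiring is assumed, and the 4-SST assertion applies only to a snapshot with no subsumed replicas, per the applySnapshot changes above:

    knobs := &storage.StoreTestingKnobs{
        BeforeSnapshotSSTIngestion: func(
            inSnap storage.IncomingSnapshot,
            snapType storage.SnapshotRequest_Type,
            sstPaths []string,
        ) error {
            // Three SSTs for the replicated key ranges plus one for the
            // unreplicated range-ID local keys.
            if len(sstPaths) != 4 {
                return errors.Errorf("expected 4 SSTs, got %d", len(sstPaths))
            }
            return nil
        },
    }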
@@ -631,9 +822,18 @@ func (s *Store) receiveSnapshot( var ss snapshotStrategy switch header.Strategy { case SnapshotRequest_KV_BATCH: + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) + if err != nil { + err = errors.Wrap(err, "invalid snapshot") + return sendSnapshotError(stream, err) + } + ss = &kvBatchSnapshotStrategy{ - raftCfg: &s.cfg.RaftConfig, + raftCfg: &s.cfg.RaftConfig, + ssss: s.sss.NewSSTSnapshotStorageScratch(header.State.Desc.RangeID, snapUUID), + sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV), } + defer ss.Close(ctx) default: return sendSnapshotError(stream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", @@ -697,6 +897,15 @@ var recoverySnapshotRate = settings.RegisterByteSizeSetting( envutil.EnvOrDefaultBytes("COCKROACH_RAFT_SNAPSHOT_RATE", 8<<20), ) +// snapshotSSTWriteSyncRate is the size of chunks to write before fsync-ing. +// The default of 2 MiB was chosen to be in line with the behavior in bulk-io. +// See sstWriteSyncRate. +var snapshotSSTWriteSyncRate = settings.RegisterByteSizeSetting( + "kv.snapshot_sst.sync_size", + "threshold after which snapshot SST writes must fsync", + 2<<20, /* 2 MiB */ +) + func snapshotRateLimit( st *cluster.Settings, priority SnapshotRequest_Priority, ) (rate.Limit, error) { diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 30cf76e21458..be1e7a656063 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -195,6 +195,9 @@ type StoreTestingKnobs struct { // This ensures the `*Replica` will be materialized on the Store when it // returns. ReplicaAddStopAfterLearnerSnapshot func() bool + // BeforeSnapshotSSTIngestion is run just before the SSTs are ingested when + // applying a snapshot. + BeforeSnapshotSSTIngestion func(IncomingSnapshot, SnapshotRequest_Type, []string) error // MaxApplicationBatchSize enforces a maximum size on application batches. // This can be useful for testing conditions which require commands to be