From 2881a8f91753be0b0722352fdeb05cef2e259044 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 27 Apr 2018 18:05:35 -0400 Subject: [PATCH] [DNM] storage: introduce SST snapshot strategy Fixes #16954. Related to #25047. This depends on the following two upstream changes to RocksDB: - https://github.com/facebook/rocksdb/pull/3778 - https://github.com/facebook/rocksdb/pull/3779 The change introduces a new snapshot strategy called "SST". This strategy streams sst files consisting of all keys in a range from the sender to the receiver. These sst files are then atomically ingested directly into RocksDB. An important property of the strategy is that the amount of memory required for a receiver using the strategy is constant with respect to the size of a range, instead of linear as it is with the KV_BATCH strategy. This will be critical for increasing the default range size and potentially for increasing the number of concurrent snapshots allowed per node. The strategy also seems to significantly speed up snapshots once ranges are above a certain size (somewhere in the single-digit MBs). This is a WIP change. 
Before it can be merged it needs: - to be cleaned up a bit - more testing (unit test, testing knobs, maybe some chaos) - proper version handling - heuristic tuning - decisions on questions like compactions after ingestion Release note: None --- pkg/ccl/storageccl/engineccl/mvcc.go | 4 +- pkg/storage/client_test.go | 1 + pkg/storage/engine/engine.go | 2 - pkg/storage/engine/in_mem.go | 32 +- pkg/storage/raft.pb.go | 238 +++++++++----- pkg/storage/raft.proto | 22 +- pkg/storage/rditer/replica_data_iter.go | 22 ++ pkg/storage/replica_command.go | 14 +- pkg/storage/replica_raftstorage.go | 111 ++++++- pkg/storage/store_snapshot.go | 414 +++++++++++++++++++++++- 10 files changed, 762 insertions(+), 98 deletions(-) diff --git a/pkg/ccl/storageccl/engineccl/mvcc.go b/pkg/ccl/storageccl/engineccl/mvcc.go index cc9eb8b5cd14..680b6a568ca8 100644 --- a/pkg/ccl/storageccl/engineccl/mvcc.go +++ b/pkg/ccl/storageccl/engineccl/mvcc.go @@ -192,8 +192,8 @@ func (i *MVCCIncrementalIterator) UnsafeKey() engine.MVCCKey { return i.iter.UnsafeKey() } -// UnsafeValue returns the same value as Value, but the memory is invalidated on -// the next call to {Next,Reset,Close}. +// UnsafeValue returns the same value as a byte slice, but the memory is +// invalidated on the next call to {Next,Reset,Close}. func (i *MVCCIncrementalIterator) UnsafeValue() []byte { return i.iter.UnsafeValue() } diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 1f2ee2fe3de5..be270361e346 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -1037,6 +1037,7 @@ func (m *multiTestContext) changeReplicas( // is lost. We could make a this into a roachpb.Error but it seems overkill // for this one usage. 
if testutils.IsError(err, "snapshot failed: .*") { + m.t.Logf("snapshot failed with error: %v", err) continue } return 0, err diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index 0d5e9756aad6..e035d8399ab7 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -243,8 +243,6 @@ type Engine interface { GetStats() (*Stats, error) // GetAuxiliaryDir returns a path under which files can be stored // persistently, and from which data can be ingested by the engine. - // - // Not thread safe. GetAuxiliaryDir() string // NewBatch returns a new instance of a batched engine which wraps // this engine. Batched engines accumulate all mutations and apply diff --git a/pkg/storage/engine/in_mem.go b/pkg/storage/engine/in_mem.go index e86084b3f3e4..1394a05c7820 100644 --- a/pkg/storage/engine/in_mem.go +++ b/pkg/storage/engine/in_mem.go @@ -14,13 +14,43 @@ package engine -import "github.com/cockroachdb/cockroach/pkg/roachpb" +import ( + "context" + "io/ioutil" + "os" + + "github.com/cockroachdb/cockroach/pkg/roachpb" +) // InMem wraps RocksDB and configures it for in-memory only storage. type InMem struct { *RocksDB } +// IngestExternalFiles for an in-memory RocksDB first loads each file into +// memory, then ingests them (again, in memory). This implementation is provided +// solely to make tests work. +func (db InMem) IngestExternalFiles( + ctx context.Context, paths []string, allowFileModifications bool, +) error { + for _, file := range paths { + data, err := ioutil.ReadFile(file) + if err != nil { + if os.IsNotExist(err) { + // The file may already be in the correct in-memory env. Ignore + // the error, it will be caught by IngestExternalFiles if the + // file truly is missing. 
+ continue + } + return err + } + if err := db.RocksDB.WriteFile(file, data); err != nil { + return err + } + } + return db.RocksDB.IngestExternalFiles(ctx, paths, allowFileModifications) +} + // NewInMem allocates and returns a new, opened InMem engine. // The caller must call the engine's Close method when the engine is no longer // needed. diff --git a/pkg/storage/raft.pb.go b/pkg/storage/raft.pb.go index e0417325a451..fab6af3f58b8 100644 --- a/pkg/storage/raft.pb.go +++ b/pkg/storage/raft.pb.go @@ -74,13 +74,19 @@ const ( // combined into a large RocksDB WriteBatch that is atomically // applied. SnapshotRequest_KV_BATCH SnapshotRequest_Strategy = 0 + // SST snapshots stream sst files consisting of all keys in a range + // from the sender to the receiver. These sst files are then atomically + // ingested directly into RocksDB. + SnapshotRequest_SST SnapshotRequest_Strategy = 1 ) var SnapshotRequest_Strategy_name = map[int32]string{ 0: "KV_BATCH", + 1: "SST", } var SnapshotRequest_Strategy_value = map[string]int32{ "KV_BATCH": 0, + "SST": 1, } func (x SnapshotRequest_Strategy) Enum() *SnapshotRequest_Strategy { @@ -235,13 +241,23 @@ func (*RaftMessageResponse) Descriptor() ([]byte, []int) { return fileDescriptor // SnapshotRequest is the request used to send streaming snapshot requests. type SnapshotRequest struct { Header *SnapshotRequest_Header `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` - // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages. + // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple + // request messages. Only used by KV_BATCH snapshots. KVBatch []byte `protobuf:"bytes,2,opt,name=kv_batch,json=kvBatch" json:"kv_batch,omitempty"` // These are really raftpb.Entry, but we model them as raw bytes to avoid - // roundtripping through memory. They are separate from the kv_batch to - // allow flexibility in log implementations. + // roundtripping through memory. 
They are separate from the kv_batch to allow + // flexibility in log implementations. Used by KV_BATCH and SST snapshots. LogEntries [][]byte `protobuf:"bytes,3,rep,name=log_entries,json=logEntries" json:"log_entries,omitempty"` - Final bool `protobuf:"varint,4,opt,name=final" json:"final"` + // A chunk of an SST file. Multiple SST chunks may be sent across multiple + // request messages. These chunks will combine to form one or more complete + // SST files, delimited by intermittent sst_final flags. Only used by SST + // snapshots. + SSTChunk []byte `protobuf:"bytes,5,opt,name=sst_chunk,json=sstChunk" json:"sst_chunk,omitempty"` + // If set, the SST chunk is the last chunk in the current SST file. Only used + // by SST snapshots. + SSTFinal bool `protobuf:"varint,6,opt,name=sst_final,json=sstFinal" json:"sst_final"` + // Indicates that this is the last request in the snapshot stream. + Final bool `protobuf:"varint,4,opt,name=final" json:"final"` } func (m *SnapshotRequest) Reset() { *m = SnapshotRequest{} } @@ -752,6 +768,20 @@ func (m *SnapshotRequest) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0 } i++ + if m.SSTChunk != nil { + dAtA[i] = 0x2a + i++ + i = encodeVarintRaft(dAtA, i, uint64(len(m.SSTChunk))) + i += copy(dAtA[i:], m.SSTChunk) + } + dAtA[i] = 0x30 + i++ + if m.SSTFinal { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ return i, nil } @@ -967,6 +997,11 @@ func (m *SnapshotRequest) Size() (n int) { } } n += 2 + if m.SSTChunk != nil { + l = len(m.SSTChunk) + n += 1 + l + sovRaft(uint64(l)) + } + n += 2 return n } @@ -1907,6 +1942,57 @@ func (m *SnapshotRequest) Unmarshal(dAtA []byte) error { } } m.Final = bool(v != 0) + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SSTChunk", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << 
shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SSTChunk = append(m.SSTChunk[:0], dAtA[iNdEx:postIndex]...) + if m.SSTChunk == nil { + m.SSTChunk = []byte{} + } + iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SSTFinal", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.SSTFinal = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) @@ -2461,74 +2547,78 @@ var ( func init() { proto.RegisterFile("storage/raft.proto", fileDescriptorRaft) } var fileDescriptorRaft = []byte{ - // 1104 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xcd, 0x6e, 0xdb, 0x46, - 0x17, 0x15, 0xad, 0xff, 0x2b, 0x29, 0x66, 0xe6, 0x33, 0xbe, 0x12, 0x6a, 0x21, 0xa9, 0x4c, 0x13, - 0xa8, 0x29, 0x40, 0x05, 0x42, 0xd0, 0x45, 0x77, 0xfa, 0x61, 0x6c, 0xc5, 0xbf, 0xa0, 0x1d, 0x17, - 0x2d, 0x10, 0x08, 0x23, 0x6a, 0x24, 0x11, 0x96, 0x38, 0x32, 0x39, 0x4a, 0xeb, 0x3c, 0x45, 0x1f, - 0x21, 0x9b, 0xee, 0xba, 0xef, 0x2b, 0x78, 0x53, 0xa0, 0xcb, 0x2c, 0x0a, 0xa3, 0x75, 0xdf, 0xa2, - 0xe8, 0xa2, 0x98, 0xe1, 0x8c, 0x4d, 0xdb, 0x6a, 0x63, 0x17, 0x45, 0x37, 0xdd, 0xd8, 0xe4, 0x9d, - 0x39, 0xe7, 0x70, 0xee, 0xb9, 0xf7, 0x8e, 0x00, 0x85, 0x8c, 0x06, 0x78, 0x4c, 0x1a, 0x01, 0x1e, - 0x31, 0x6b, 0x1e, 0x50, 0x46, 0xd1, 0x7d, 0x97, 0xba, 0x47, 0x01, 0xc5, 0xee, 0xc4, 0x92, 0xab, - 0xe5, 0x35, 0xf1, 0x3a, 0x1f, 0x34, 0x48, 0x10, 0xd0, 0x20, 0x8c, 0x36, 0x96, 0xff, 0xaf, 0xa2, - 0x33, 0xc2, 0xf0, 0x10, 0x33, 0x2c, 0xe3, 0x55, 0x45, 0x2a, 0xff, 0x0f, 0x70, 0xc8, 0x9f, 0x31, - 0x23, 0x72, 0xc3, 0xfb, 0x84, 0xb9, 0x43, 0x21, 0x29, 0xfe, 
0xcc, 0x07, 0x31, 0xf9, 0xf2, 0xda, - 0x98, 0x8e, 0xa9, 0x78, 0x6c, 0xf0, 0xa7, 0x28, 0x6a, 0x7e, 0x9f, 0x84, 0x92, 0x83, 0x47, 0x6c, - 0x83, 0xe0, 0x80, 0x0d, 0x08, 0x66, 0x68, 0x00, 0xb9, 0x00, 0xfb, 0x63, 0xd2, 0xf7, 0x86, 0x86, - 0x56, 0xd3, 0xea, 0xa9, 0xf6, 0xfa, 0xe9, 0x59, 0x35, 0x71, 0x7e, 0x56, 0xcd, 0x3a, 0x3c, 0xde, - 0xeb, 0xfe, 0x76, 0x56, 0x7d, 0x3a, 0xf6, 0xd8, 0x64, 0x31, 0xb0, 0x5c, 0x3a, 0x6b, 0x5c, 0x1c, - 0x6b, 0x38, 0xb8, 0x7c, 0x6e, 0xcc, 0x8f, 0xc6, 0x0d, 0x79, 0x0e, 0x4b, 0xe2, 0x9c, 0xac, 0x20, - 0xee, 0x0d, 0xd1, 0x57, 0xb0, 0x3a, 0x0a, 0xe8, 0xac, 0x1f, 0x90, 0xf9, 0xd4, 0x73, 0x31, 0x97, - 0x5a, 0xa9, 0x69, 0xf5, 0x52, 0x7b, 0x57, 0x4a, 0x95, 0x9e, 0x05, 0x74, 0xe6, 0x44, 0xab, 0x42, - 0xf0, 0xd3, 0xbb, 0x09, 0x2a, 0xa4, 0x53, 0x1a, 0xc5, 0x88, 0x86, 0xe8, 0x18, 0x4a, 0x8c, 0xc6, - 0x65, 0x93, 0x42, 0x76, 0x5b, 0xca, 0x16, 0x0e, 0xe8, 0x3f, 0x21, 0x5a, 0x60, 0xf4, 0x52, 0xd2, - 0x80, 0x14, 0x23, 0xc1, 0xcc, 0x48, 0x89, 0x5c, 0xa6, 0xb8, 0x92, 0x23, 0x22, 0xe8, 0x03, 0xc8, - 0xb8, 0x74, 0x36, 0xf3, 0x98, 0x91, 0x8e, 0xad, 0xc9, 0x18, 0xaa, 0x40, 0xf6, 0x78, 0xe1, 0x91, - 0xd0, 0x25, 0x46, 0xa6, 0xa6, 0xd5, 0x73, 0x72, 0x59, 0x05, 0xcd, 0xdf, 0x93, 0x80, 0xb8, 0x73, - 0xdb, 0x24, 0x0c, 0xf1, 0x98, 0x38, 0xe4, 0x78, 0x41, 0xc2, 0x7f, 0xc7, 0xbe, 0x6d, 0x28, 0xc6, - 0xed, 0x13, 0xde, 0x15, 0x9a, 0x1f, 0x59, 0x97, 0x05, 0x7e, 0x2d, 0x27, 0x5d, 0x12, 0xba, 0x81, - 0x37, 0x67, 0x34, 0x90, 0xa7, 0x28, 0xc4, 0x6c, 0x41, 0x3d, 0x80, 0x4b, 0x53, 0x84, 0x23, 0x77, - 0x23, 0xcb, 0x5f, 0xa4, 0x1b, 0x35, 0x20, 0x3b, 0x8b, 0xf2, 0x21, 0xf2, 0x5d, 0x68, 0xae, 0x5a, - 0x51, 0x27, 0x58, 0x32, 0x4d, 0x2a, 0x8b, 0x72, 0x57, 0x3c, 0xcb, 0xe9, 0x25, 0x59, 0x46, 0xcf, - 0x00, 0x26, 0xaa, 0x35, 0x42, 0x23, 0x53, 0x4b, 0xd6, 0x0b, 0xcd, 0x9a, 0x75, 0xa3, 0x93, 0xad, - 0x2b, 0x3d, 0x24, 0x49, 0x62, 0x48, 0xb4, 0x0b, 0xab, 0x17, 0x6f, 0xfd, 0x80, 0x84, 0xf3, 0xd0, - 0xc8, 0xde, 0x89, 0xec, 0xde, 0x05, 0xdc, 0xe1, 0x68, 0x73, 0x00, 0xef, 0xdd, 0x74, 0xbf, 0x8d, - 
0x99, 0x3b, 0x41, 0xeb, 0x90, 0x0b, 0xa2, 0xf7, 0xd0, 0xd0, 0x84, 0xc8, 0xc3, 0x3f, 0x11, 0xb9, - 0x86, 0x8e, 0x94, 0x2e, 0xc0, 0xe6, 0x1e, 0x18, 0x57, 0x76, 0x85, 0x73, 0xea, 0x87, 0xe4, 0x85, - 0xef, 0x51, 0x1f, 0x59, 0x90, 0x16, 0x43, 0x4b, 0x14, 0x59, 0xa1, 0x69, 0x2c, 0xf1, 0xcb, 0xe6, - 0xeb, 0x4e, 0xb4, 0xed, 0xb3, 0xd4, 0xe9, 0x9b, 0xaa, 0x66, 0xfe, 0xb4, 0x02, 0xff, 0x5b, 0x42, - 0xf9, 0x1f, 0xaf, 0xda, 0x75, 0x48, 0x2f, 0x78, 0x52, 0x65, 0xcd, 0x7e, 0xf2, 0x2e, 0xb7, 0x62, - 0x3e, 0x48, 0xb2, 0x08, 0x6f, 0x7e, 0x97, 0x86, 0xd5, 0x7d, 0x1f, 0xcf, 0xc3, 0x09, 0x65, 0x6a, - 0x20, 0xb4, 0x20, 0x33, 0x21, 0x78, 0x48, 0x94, 0x53, 0x1f, 0x2f, 0x61, 0xbf, 0x86, 0xb1, 0x36, - 0x04, 0xc0, 0x91, 0x40, 0xf4, 0x08, 0x72, 0x47, 0xaf, 0xfa, 0x03, 0x5e, 0x5c, 0x22, 0x6b, 0xc5, - 0x76, 0x81, 0x3b, 0xb3, 0x79, 0x28, 0xea, 0xcd, 0xc9, 0x1e, 0xbd, 0x8a, 0x0a, 0xaf, 0x0a, 0x85, - 0x29, 0x1d, 0xf7, 0x89, 0xcf, 0x02, 0x8f, 0x84, 0x46, 0xb2, 0x96, 0xac, 0x17, 0x1d, 0x98, 0xd2, - 0xb1, 0x1d, 0x45, 0x50, 0x19, 0xd2, 0x23, 0xcf, 0xc7, 0x53, 0x71, 0x50, 0xd5, 0x6b, 0x51, 0xa8, - 0xfc, 0x26, 0x09, 0x99, 0x48, 0x17, 0xbd, 0x84, 0x35, 0xde, 0xb5, 0x7d, 0xd9, 0xa4, 0x7d, 0x59, - 0x90, 0xd2, 0xb1, 0x3b, 0x15, 0x33, 0x0a, 0x6e, 0x8e, 0xc8, 0x07, 0x00, 0x51, 0xb1, 0x85, 0xde, - 0x6b, 0x22, 0x9c, 0x4b, 0x2a, 0x4f, 0x44, 0x7c, 0xdf, 0x7b, 0x4d, 0xd0, 0x43, 0x28, 0xb8, 0xd8, - 0xef, 0x0f, 0x89, 0x3b, 0xf5, 0x7c, 0x72, 0xe5, 0x83, 0xc1, 0xc5, 0x7e, 0x37, 0x8a, 0x73, 0xeb, - 0xc4, 0x0d, 0x2c, 0xa6, 0xc7, 0x72, 0xeb, 0x62, 0xb7, 0xb5, 0x2a, 0x86, 0x7d, 0x0e, 0x51, 0xc7, - 0x17, 0x78, 0xb4, 0x0d, 0xb9, 0x79, 0xe0, 0xd1, 0xc0, 0x63, 0x27, 0x62, 0xde, 0xdf, 0x5b, 0xca, - 0x75, 0xdd, 0xa8, 0x3d, 0x09, 0x51, 0xad, 0xab, 0x28, 0x38, 0x5d, 0xc8, 0x02, 0xcc, 0xc8, 0xf8, - 0xc4, 0xc8, 0xde, 0x9a, 0x6e, 0x5f, 0x42, 0x14, 0x9d, 0xa2, 0x78, 0x9e, 0xca, 0x69, 0xfa, 0x8a, - 0xf9, 0x14, 0x72, 0x4a, 0x10, 0x15, 0x20, 0xfb, 0x62, 0x67, 0x73, 0x67, 0xf7, 0xf3, 0x1d, 0x3d, - 0x81, 0x8a, 0x90, 0x73, 0xec, 0xce, 
0xee, 0xa1, 0xed, 0x7c, 0xa1, 0x6b, 0xa8, 0x04, 0x79, 0xc7, - 0x6e, 0xb7, 0xb6, 0x5a, 0x3b, 0x1d, 0x5b, 0x5f, 0x31, 0x0d, 0xc8, 0x29, 0x5e, 0xbe, 0x71, 0xf3, - 0xb0, 0xdf, 0x6e, 0x1d, 0x74, 0x36, 0xf4, 0x84, 0xf9, 0x83, 0x06, 0xfa, 0xe5, 0x27, 0xc8, 0x51, - 0xb0, 0x01, 0x19, 0x9e, 0x91, 0x45, 0x28, 0xea, 0xf5, 0x5e, 0xf3, 0xf1, 0x5f, 0x7e, 0x77, 0x04, - 0xb2, 0xf6, 0x05, 0x42, 0xdd, 0xa0, 0x11, 0x9e, 0xcf, 0x76, 0x75, 0x19, 0xf0, 0xca, 0xc9, 0x5f, - 0x9b, 0xfd, 0x66, 0x0f, 0x32, 0x11, 0xee, 0xc6, 0x61, 0x5a, 0x9d, 0x8e, 0xbd, 0x77, 0x60, 0x77, - 0x75, 0x8d, 0x2f, 0xb5, 0xf6, 0xf6, 0xb6, 0x7a, 0x76, 0x57, 0x5f, 0x41, 0x79, 0x48, 0xdb, 0x8e, - 0xb3, 0xeb, 0xe8, 0x49, 0xbe, 0xab, 0x6b, 0x77, 0xb6, 0x7a, 0x3b, 0x76, 0x57, 0x4f, 0x3d, 0x4f, - 0xe5, 0x92, 0x7a, 0xca, 0xfc, 0x56, 0x83, 0xfb, 0x1d, 0xea, 0x8f, 0x3a, 0x13, 0x5e, 0x46, 0x1d, - 0xea, 0x33, 0xf2, 0x35, 0x43, 0x4f, 0x00, 0xf8, 0x95, 0x8e, 0xfd, 0xa1, 0x9a, 0x6e, 0xf9, 0xf6, - 0x7d, 0x39, 0xdd, 0xf2, 0x9d, 0x68, 0xa5, 0xd7, 0x75, 0xf2, 0x72, 0x93, 0xf8, 0xc9, 0x90, 0x9d, - 0xe3, 0x93, 0x29, 0xc5, 0xd1, 0xcf, 0xa2, 0xa2, 0xa3, 0x5e, 0x51, 0x17, 0xb2, 0x7f, 0x7f, 0xe2, - 0x28, 0x68, 0xf3, 0xad, 0x06, 0xf9, 0xed, 0xc5, 0x94, 0x79, 0xbc, 0x6d, 0xd0, 0x14, 0xf4, 0x58, - 0xfb, 0x44, 0x9d, 0xfc, 0xf8, 0x76, 0x3d, 0xc6, 0xf7, 0x96, 0x1f, 0xdd, 0x6e, 0x5c, 0x99, 0x89, - 0xba, 0xf6, 0x44, 0x43, 0x2f, 0xa1, 0xc8, 0x17, 0x95, 0x83, 0xc8, 0x7c, 0x77, 0x59, 0x96, 0x1f, - 0xdc, 0xa2, 0x04, 0x22, 0xfa, 0xf6, 0x87, 0xa7, 0xbf, 0x54, 0x12, 0xa7, 0xe7, 0x15, 0xed, 0xc7, - 0xf3, 0x8a, 0xf6, 0xf6, 0xbc, 0xa2, 0xfd, 0x7c, 0x5e, 0xd1, 0xbe, 0xf9, 0xb5, 0x92, 0xf8, 0x32, - 0x2b, 0x91, 0x7f, 0x04, 0x00, 0x00, 0xff, 0xff, 0xa4, 0xd6, 0xb6, 0x01, 0x9c, 0x0b, 0x00, 0x00, + // 1160 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xdd, 0x6e, 0x1b, 0x45, + 0x14, 0xce, 0xc6, 0x8e, 0x77, 0x7d, 0xec, 0x34, 0xdb, 0xa1, 0x82, 0x95, 0x41, 0x76, 0xba, 0xa5, + 0x55, 0x5a, 0xc4, 0xba, 
0xb2, 0x2a, 0x2e, 0xb8, 0xf3, 0xcf, 0xb6, 0x71, 0xdb, 0xfc, 0x68, 0x9d, + 0x16, 0x81, 0x54, 0x59, 0xe3, 0xf5, 0xd8, 0x5e, 0xc5, 0xde, 0x71, 0x77, 0xc7, 0x85, 0xf6, 0x25, + 0xe0, 0x11, 0x7a, 0xc3, 0x33, 0xf0, 0x0a, 0xb9, 0x41, 0xe2, 0xb2, 0x17, 0x28, 0x02, 0xf3, 0x16, + 0x88, 0x0b, 0x34, 0x7f, 0x89, 0x9b, 0x18, 0x9a, 0x20, 0xc4, 0x0d, 0x37, 0xc9, 0xec, 0x99, 0xf3, + 0x7d, 0xc7, 0x73, 0xce, 0x77, 0xce, 0x0c, 0xa0, 0x94, 0xd1, 0x04, 0x0f, 0x49, 0x35, 0xc1, 0x03, + 0xe6, 0x4d, 0x13, 0xca, 0x28, 0xba, 0x1a, 0xd2, 0xf0, 0x30, 0xa1, 0x38, 0x1c, 0x79, 0x6a, 0xb7, + 0x74, 0x4d, 0x7c, 0x4e, 0x7b, 0x55, 0x92, 0x24, 0x34, 0x49, 0xa5, 0x63, 0xe9, 0x7d, 0x6d, 0x9d, + 0x10, 0x86, 0xfb, 0x98, 0x61, 0x65, 0xaf, 0x68, 0x52, 0xf5, 0xbf, 0x87, 0x53, 0xbe, 0xc6, 0x8c, + 0x28, 0x87, 0x0f, 0x09, 0x0b, 0xfb, 0x22, 0xa4, 0xf8, 0x33, 0xed, 0x2d, 0x84, 0x2f, 0x5d, 0x1b, + 0xd2, 0x21, 0x15, 0xcb, 0x2a, 0x5f, 0x49, 0xab, 0xfb, 0x43, 0x06, 0xd6, 0x03, 0x3c, 0x60, 0xdb, + 0x04, 0x27, 0xac, 0x47, 0x30, 0x43, 0x3d, 0xb0, 0x12, 0x1c, 0x0f, 0x49, 0x37, 0xea, 0x3b, 0xc6, + 0xa6, 0xb1, 0x95, 0x6d, 0x3c, 0x38, 0x3a, 0xae, 0xac, 0xcc, 0x8f, 0x2b, 0x66, 0xc0, 0xed, 0xed, + 0xd6, 0xef, 0xc7, 0x95, 0x7b, 0xc3, 0x88, 0x8d, 0x66, 0x3d, 0x2f, 0xa4, 0x93, 0xea, 0xc9, 0xb1, + 0xfa, 0xbd, 0xd3, 0x75, 0x75, 0x7a, 0x38, 0xac, 0xaa, 0x73, 0x78, 0x0a, 0x17, 0x98, 0x82, 0xb8, + 0xdd, 0x47, 0x5f, 0xc3, 0xc6, 0x20, 0xa1, 0x93, 0x6e, 0x42, 0xa6, 0xe3, 0x28, 0xc4, 0x3c, 0xd4, + 0xea, 0xa6, 0xb1, 0xb5, 0xde, 0xd8, 0x53, 0xa1, 0xd6, 0xef, 0x27, 0x74, 0x12, 0xc8, 0x5d, 0x11, + 0xf0, 0xb3, 0xcb, 0x05, 0xd4, 0xc8, 0x60, 0x7d, 0xb0, 0x40, 0xd4, 0x47, 0xcf, 0x61, 0x9d, 0xd1, + 0xc5, 0xb0, 0x19, 0x11, 0x76, 0x47, 0x85, 0x2d, 0x1c, 0xd0, 0x7f, 0x23, 0x68, 0x81, 0xd1, 0xd3, + 0x90, 0x0e, 0x64, 0x19, 0x49, 0x26, 0x4e, 0x56, 0xe4, 0x32, 0xcb, 0x23, 0x05, 0xc2, 0x82, 0x3e, + 0x82, 0x5c, 0x48, 0x27, 0x93, 0x88, 0x39, 0x6b, 0x0b, 0x7b, 0xca, 0x86, 0xca, 0x60, 0x3e, 0x9f, + 0x45, 0x24, 0x0d, 0x89, 0x93, 0xdb, 0x34, 0xb6, 0x2c, 0xb5, 
0xad, 0x8d, 0xee, 0x1f, 0x19, 0x40, + 0xbc, 0x72, 0x3b, 0x24, 0x4d, 0xf1, 0x90, 0x04, 0xe4, 0xf9, 0x8c, 0xa4, 0xff, 0x4d, 0xf9, 0x76, + 0xa0, 0xb8, 0x58, 0x3e, 0x51, 0xbb, 0x42, 0xed, 0x63, 0xef, 0x54, 0xe0, 0x67, 0x72, 0xd2, 0x22, + 0x69, 0x98, 0x44, 0x53, 0x46, 0x13, 0x75, 0x8a, 0xc2, 0x42, 0x59, 0x50, 0x1b, 0xe0, 0xb4, 0x28, + 0xa2, 0x22, 0x97, 0x23, 0xcb, 0x9f, 0xa4, 0x1b, 0x55, 0xc1, 0x9c, 0xc8, 0x7c, 0x88, 0x7c, 0x17, + 0x6a, 0x1b, 0x9e, 0xec, 0x04, 0x4f, 0xa5, 0x49, 0x67, 0x51, 0x79, 0x2d, 0x66, 0x79, 0x6d, 0x49, + 0x96, 0xd1, 0x7d, 0x80, 0x91, 0x6e, 0x8d, 0xd4, 0xc9, 0x6d, 0x66, 0xb6, 0x0a, 0xb5, 0x4d, 0xef, + 0x5c, 0x27, 0x7b, 0x6f, 0xf5, 0x90, 0x22, 0x59, 0x40, 0xa2, 0x3d, 0xd8, 0x38, 0xf9, 0xea, 0x26, + 0x24, 0x9d, 0xa6, 0x8e, 0x79, 0x29, 0xb2, 0x2b, 0x27, 0xf0, 0x80, 0xa3, 0xdd, 0x1e, 0x7c, 0x70, + 0xbe, 0xfa, 0x0d, 0xcc, 0xc2, 0x11, 0x7a, 0x00, 0x56, 0x22, 0xbf, 0x53, 0xc7, 0x10, 0x41, 0x6e, + 0xfe, 0x45, 0x90, 0x33, 0x68, 0x19, 0xe9, 0x04, 0xec, 0xee, 0x83, 0xf3, 0x96, 0x57, 0x3a, 0xa5, + 0x71, 0x4a, 0x9e, 0xc4, 0x11, 0x8d, 0x91, 0x07, 0x6b, 0x62, 0x68, 0x09, 0x91, 0x15, 0x6a, 0xce, + 0x92, 0x7a, 0xf9, 0x7c, 0x3f, 0x90, 0x6e, 0x9f, 0x67, 0x8f, 0x5e, 0x57, 0x0c, 0xf7, 0xe7, 0x55, + 0x78, 0x6f, 0x09, 0xe5, 0xff, 0x5c, 0xb5, 0x0f, 0x60, 0x6d, 0xc6, 0x93, 0xaa, 0x34, 0xfb, 0xc9, + 0xbb, 0xaa, 0xb5, 0x50, 0x07, 0x45, 0x26, 0xf1, 0xee, 0xb7, 0x39, 0xd8, 0xe8, 0xc4, 0x78, 0x9a, + 0x8e, 0x28, 0xd3, 0x03, 0xa1, 0x0e, 0xb9, 0x11, 0xc1, 0x7d, 0xa2, 0x2b, 0x75, 0x7b, 0x09, 0xfb, + 0x19, 0x8c, 0xb7, 0x2d, 0x00, 0x81, 0x02, 0xa2, 0x5b, 0x60, 0x1d, 0xbe, 0xe8, 0xf6, 0xb8, 0xb8, + 0x44, 0xd6, 0x8a, 0x8d, 0x02, 0xaf, 0xcc, 0xa3, 0xa7, 0x42, 0x6f, 0x81, 0x79, 0xf8, 0x42, 0x0a, + 0xaf, 0x02, 0x85, 0x31, 0x1d, 0x76, 0x49, 0xcc, 0x92, 0x88, 0xa4, 0x4e, 0x66, 0x33, 0xb3, 0x55, + 0x0c, 0x60, 0x4c, 0x87, 0xbe, 0xb4, 0xa0, 0x12, 0xac, 0x0d, 0xa2, 0x18, 0x8f, 0xc5, 0x41, 0x75, + 0xaf, 0x49, 0x13, 0xba, 0x0d, 0xf9, 0x34, 0x65, 0xdd, 0x70, 0x34, 0x8b, 0x0f, 0x45, 0x2f, 0x16, + 
0x1b, 0xc5, 0xf9, 0x71, 0xc5, 0xea, 0x74, 0x0e, 0x9a, 0xdc, 0x16, 0x58, 0x69, 0xca, 0xc4, 0x0a, + 0x7d, 0x2a, 0x5d, 0x25, 0x95, 0x1c, 0x8e, 0xb6, 0x92, 0x0b, 0x77, 0xbf, 0xcf, 0xed, 0xc2, 0x5d, + 0xac, 0x4a, 0xaf, 0x33, 0x90, 0x93, 0x27, 0x42, 0xcf, 0xe0, 0x1a, 0x9f, 0x07, 0x5d, 0xd5, 0xfe, + 0x5d, 0x25, 0x75, 0xa5, 0x85, 0x4b, 0xb5, 0x09, 0x4a, 0xce, 0x0f, 0xdf, 0x1b, 0x00, 0x52, 0xc6, + 0x69, 0xf4, 0x8a, 0x08, 0x4d, 0x64, 0x74, 0xb5, 0x85, 0xbd, 0x13, 0xbd, 0x22, 0xe8, 0x26, 0x14, + 0x42, 0x1c, 0x77, 0xfb, 0x24, 0x1c, 0x47, 0x31, 0x79, 0x2b, 0x15, 0x10, 0xe2, 0xb8, 0x25, 0xed, + 0x5c, 0x14, 0xe2, 0x6e, 0x17, 0xb9, 0x58, 0x2e, 0x8a, 0x85, 0x77, 0x80, 0x96, 0x59, 0x87, 0x43, + 0x74, 0x62, 0x05, 0x1e, 0xed, 0x80, 0x35, 0x4d, 0x22, 0x9a, 0x44, 0xec, 0xa5, 0x48, 0xd6, 0x95, + 0xa5, 0x5c, 0x67, 0x25, 0xb0, 0xaf, 0x20, 0x7a, 0x28, 0x68, 0x0a, 0x4e, 0x97, 0xb2, 0x04, 0x33, + 0x32, 0x7c, 0xe9, 0x98, 0x17, 0xa6, 0xeb, 0x28, 0x88, 0xa6, 0xd3, 0x14, 0x0f, 0xb3, 0x96, 0x61, + 0xaf, 0xba, 0xf7, 0xc0, 0xd2, 0x01, 0x51, 0x01, 0xcc, 0x27, 0xbb, 0x8f, 0x76, 0xf7, 0xbe, 0xd8, + 0xb5, 0x57, 0x50, 0x11, 0xac, 0xc0, 0x6f, 0xee, 0x3d, 0xf5, 0x83, 0x2f, 0x6d, 0x03, 0xad, 0x43, + 0x3e, 0xf0, 0x1b, 0xf5, 0xc7, 0xf5, 0xdd, 0xa6, 0x6f, 0xaf, 0xba, 0xd7, 0xc1, 0xd2, 0xbc, 0xdc, + 0xf1, 0xd1, 0xd3, 0x6e, 0xa3, 0x7e, 0xd0, 0xdc, 0xb6, 0x57, 0x90, 0x09, 0x99, 0x4e, 0xe7, 0xc0, + 0x36, 0xdc, 0x1f, 0x0d, 0xb0, 0x4f, 0x7f, 0x8b, 0x9a, 0x36, 0xdb, 0x90, 0xe3, 0xa9, 0x99, 0xa5, + 0xa2, 0x25, 0xae, 0xd4, 0xee, 0xfc, 0xed, 0x01, 0x24, 0xc8, 0xeb, 0x08, 0x84, 0xbe, 0xa4, 0x25, + 0x9e, 0x5f, 0x1f, 0xfa, 0xbe, 0xe1, 0x12, 0xca, 0x9f, 0xb9, 0x5e, 0xdc, 0x36, 0xe4, 0x24, 0xee, + 0xdc, 0xa9, 0xea, 0xcd, 0xa6, 0xbf, 0x7f, 0xe0, 0xb7, 0x6c, 0x83, 0x6f, 0xd5, 0xf7, 0xf7, 0x1f, + 0xb7, 0xfd, 0x96, 0xbd, 0x8a, 0xf2, 0xb0, 0xe6, 0x07, 0xc1, 0x5e, 0x60, 0x67, 0xb8, 0x57, 0xcb, + 0x6f, 0x3e, 0x6e, 0xef, 0xfa, 0x2d, 0x3b, 0xfb, 0x30, 0x6b, 0x65, 0xec, 0xac, 0xfb, 0xbd, 0x01, + 0x57, 0x9b, 0x34, 0x1e, 0x34, 0x47, 
0x5c, 0x4f, 0x4d, 0x1a, 0x33, 0xf2, 0x0d, 0x43, 0x77, 0x01, + 0xf8, 0xab, 0x01, 0xc7, 0x7d, 0x3d, 0x40, 0xf3, 0x8d, 0xab, 0xaa, 0x23, 0xf2, 0x4d, 0xb9, 0xd3, + 0x6e, 0x05, 0x79, 0xe5, 0x24, 0x5e, 0x25, 0xe6, 0x14, 0xbf, 0x1c, 0x53, 0x2c, 0x5f, 0x5e, 0xc5, + 0x40, 0x7f, 0xa2, 0x16, 0x98, 0xff, 0x7c, 0xa8, 0x69, 0x68, 0xed, 0x8d, 0x01, 0xf9, 0x9d, 0xd9, + 0x98, 0x45, 0xbc, 0x7f, 0xd0, 0x18, 0xec, 0x85, 0x3e, 0x92, 0xc3, 0xe2, 0xce, 0xc5, 0x9a, 0x8d, + 0xfb, 0x96, 0x6e, 0x5d, 0x6c, 0x22, 0xba, 0x2b, 0x5b, 0xc6, 0x5d, 0x03, 0x3d, 0x83, 0x22, 0xdf, + 0xd4, 0x15, 0x44, 0xee, 0xbb, 0xf5, 0x59, 0xba, 0x71, 0x01, 0x09, 0x48, 0xfa, 0xc6, 0xf5, 0xa3, + 0x5f, 0xcb, 0x2b, 0x47, 0xf3, 0xb2, 0xf1, 0xd3, 0xbc, 0x6c, 0xbc, 0x99, 0x97, 0x8d, 0x5f, 0xe6, + 0x65, 0xe3, 0xbb, 0xdf, 0xca, 0x2b, 0x5f, 0x99, 0x0a, 0xf9, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, + 0x1a, 0xf3, 0x8f, 0x6d, 0xff, 0x0b, 0x00, 0x00, } diff --git a/pkg/storage/raft.proto b/pkg/storage/raft.proto index 33b56e8e1514..04d57544c1df 100644 --- a/pkg/storage/raft.proto +++ b/pkg/storage/raft.proto @@ -117,6 +117,10 @@ message SnapshotRequest { // combined into a large RocksDB WriteBatch that is atomically // applied. KV_BATCH = 0; + // SST snapshots stream sst files consisting of all keys in a range + // from the sender to the receiver. These sst files are then atomically + // ingested directly into RocksDB. + SST = 1; } message Header { @@ -149,14 +153,26 @@ message SnapshotRequest { optional Header header = 1; - // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages. + // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple + // request messages. Only used by KV_BATCH snapshots. optional bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"]; // These are really raftpb.Entry, but we model them as raw bytes to avoid - // roundtripping through memory. They are separate from the kv_batch to - // allow flexibility in log implementations. + // roundtripping through memory. 
They are separate from the kv_batch to allow + // flexibility in log implementations. Used by KV_BATCH and SST snapshots. repeated bytes log_entries = 3; + // A chunk of an SST file. Multiple SST chunks may be sent across multiple + // request messages. These chunks will combine to form one or more complete + // SST files, delimited by intermittent sst_final flags. Only used by SST + // snapshots. + optional bytes sst_chunk = 5 [(gogoproto.customname) = "SSTChunk"]; + + // If set, the SST chunk is the last chunk in the current SST file. Only used + // by SST snapshots. + optional bool sst_final = 6 [(gogoproto.customname) = "SSTFinal", (gogoproto.nullable) = false]; + + // Indicates that this is the last request in the snapshot stream. optional bool final = 4 [(gogoproto.nullable) = false]; } diff --git a/pkg/storage/rditer/replica_data_iter.go b/pkg/storage/rditer/replica_data_iter.go index c149a5855dae..25169eea41dd 100644 --- a/pkg/storage/rditer/replica_data_iter.go +++ b/pkg/storage/rditer/replica_data_iter.go @@ -142,11 +142,33 @@ func (ri *ReplicaDataIterator) advance() { } } +// KeyRanges returns all key ranges that the iterator will iterate over. +func (ri *ReplicaDataIterator) KeyRanges() []KeyRange { + return ri.ranges +} + +// Index returns the index of the key range that the iterator points to. +func (ri *ReplicaDataIterator) Index() int { + return ri.curIndex +} + // Valid returns true if the iterator currently points to a valid value. func (ri *ReplicaDataIterator) Valid() (bool, error) { return ri.it.Valid() } +// UnsafeKey returns the current key, but the memory is invalidated on the next +// call to {NextKey,Seek}. +func (ri *ReplicaDataIterator) UnsafeKey() engine.MVCCKey { + return ri.it.UnsafeKey() +} + +// UnsafeValue returns the same value as a byte slice, but the memory is +// invalidated on the next call to {Next,Reset,Close}. 
+func (ri *ReplicaDataIterator) UnsafeValue() []byte { + return ri.it.UnsafeValue() +} + // Key returns the current key. func (ri *ReplicaDataIterator) Key() engine.MVCCKey { key := ri.it.UnsafeKey() diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 921f0d93d90e..5f9c785ee69a 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1875,6 +1875,18 @@ func (r *Replica) sendSnapshot( } } + // TODO we should introduce a heuristic here for which snapshot strategy + // to use. An easy heuristic would be to use SST snapshots only when a + // range is over a certain size. That is what we have right now. The 8MB + // number was tuned based on the how much it affected the speed of tests. + // Right now this provides a ~10% speedup when running ./pkg/storage tests. + // We should actually tune this for real. + // TODO we need to stick SST snapshots behind a version check. + strategy := SnapshotRequest_KV_BATCH + if r.GetMVCCStats().LiveBytes > (8 << 20) /* 8MB */ { + strategy = SnapshotRequest_SST + } + status := r.RaftStatus() if status == nil { return errors.New("raft status not initialized") @@ -1898,7 +1910,7 @@ func (r *Replica) sendSnapshot( // Recipients can choose to decline preemptive snapshots. 
CanDecline: snapType == snapTypePreemptive, Priority: priority, - Strategy: SnapshotRequest_KV_BATCH, + Strategy: strategy, } sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 6492e639c5b7..79ea47ecb15a 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -638,7 +638,7 @@ func clearRangeData( desc *roachpb.RangeDescriptor, keyCount int64, eng engine.Engine, - batch engine.Batch, + writer engine.Writer, ) error { iter := eng.NewIterator(engine.IterOptions{}) defer iter.Close() @@ -657,9 +657,9 @@ func clearRangeData( // above), but the data range's key count needs to be explicitly checked. var err error if i >= metadataRanges && keyCount >= clearRangeMinKeys { - err = batch.ClearRange(keyRange.Start, keyRange.End) + err = writer.ClearRange(keyRange.Start, keyRange.End) } else { - err = batch.ClearIterRange(iter, keyRange.Start, keyRange.End) + err = writer.ClearIterRange(iter, keyRange.Start, keyRange.End) } if err != nil { return err @@ -868,6 +868,111 @@ func (r *Replica) applySnapshot( return err } stats.commit = timeutil.Now() + case *sstSnapshotStrategy: + // TODO DURING REVIEW: do we need to worry about RaftTombstoneIncorrectLegacyKey here? + // TODO track stats of ingestion and log them. + + unreplicatedSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer unreplicatedSST.Close() + + // Clear out all existing unreplicated keys in the range. 
+ unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) + unreplicatedStart := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey) + unreplicatedEnd := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey.PrefixEnd()) + err = unreplicatedSST.ClearRange(unreplicatedStart, unreplicatedEnd) + if err != nil { + return errors.Wrapf(err, "unable to write ClearRange to unreplicated SST writer") + } + raftLogSize = 0 + + //////// + + // Note that since this snapshot comes from Raft, we don't have to synthesize + // the HardState -- Raft wouldn't ask us to update the HardState in incorrect + // ways. + hsKey := r.raftMu.stateLoader.RaftHardStateKey() + var hsValue roachpb.Value + if err := hsValue.SetProto(&hs); err != nil { + return err + } + hsValue.InitChecksum(hsKey) + err = engine.MVCCBlindPut(ctx, &unreplicatedSST, nil, hsKey, hlc.Timestamp{}, hsValue, nil) + if err != nil { + return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + + /////// + + if len(ss.LogEntries) > 0 { + logEntries := make([]raftpb.Entry, len(ss.LogEntries)) + for i, bytes := range ss.LogEntries { + if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { + return err + } + } + // If this replica doesn't know its ReplicaID yet, we're applying a + // preemptive snapshot. In this case, we're going to have to write the + // sideloaded proposals into the Raft log. Otherwise, sideload. 
+ thinEntries := logEntries + if replicaID != 0 { + var err error + var sideloadedEntriesSize int64 + thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + if err != nil { + return err + } + raftLogSize += sideloadedEntriesSize + } + var logDiff enginepb.MVCCStats + var logValue roachpb.Value + for i := range thinEntries { + ent := &thinEntries[i] + entKey := r.raftMu.stateLoader.RaftLogKey(ent.Index) + + if err := logValue.SetProto(ent); err != nil { + return err + } + logValue.InitChecksum(entKey) + err = engine.MVCCBlindPut(ctx, &unreplicatedSST, &logDiff, entKey, hlc.Timestamp{}, + logValue, nil /* txn */) + if err != nil { + return errors.Wrapf(err, "unable to write log entry to unreplicated SST writer") + } + } + raftLogSize += logDiff.SysBytes + lastTerm = thinEntries[len(thinEntries)-1].Term + } else { + lastTerm = invalidLastTerm + } + + // Create a new file to write the SST into. + unreplicatedSSTFile, err := ss.createEmptySSTFile(r.RangeID) + if err != nil { + return errors.Wrapf(err, "unable to open empty SST file") + } + data, err := unreplicatedSST.Finish() + if err != nil { + unreplicatedSSTFile.Close() + return errors.Wrapf(err, "unable to Finish sst file writer") + } + _, err = unreplicatedSSTFile.Write(data) + unreplicatedSSTFile.Close() + if err != nil { + return errors.Wrapf(err, "unable to write SST data to file") + } + + // Ingest all SSTs atomically. + if err := r.store.engine.IngestExternalFiles(ctx, ss.SSTs, true /* modify */); err != nil { + return errors.Wrapf(err, "while ingesting %s", ss.SSTs) + } + + // TODO should we force a compaction? Only in the global keyspace? The + // SSTs will probably be living in L0 after the ingestion, which isn't + // great, especially because they each contain a large RangeDeletion + // tombstone. 
default: log.Fatalf(ctx, "unknown snapshot strategy: %v", ss) } diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 60b39d14f715..f94fb311a51b 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -18,7 +18,10 @@ import ( "context" "fmt" "io" + "io/ioutil" "math" + "os" + "path/filepath" "github.com/coreos/etcd/raft/raftpb" "github.com/pkg/errors" @@ -85,6 +88,10 @@ type snapshotStrategy interface { // Status provides a status report on the work performed during the // snapshot. Only valid if the strategy succeeded. Status() string + + // Close cleans up any resources associated with the operation of the + // snapshot strategy. + Close() } func assertStrategy( @@ -121,11 +128,11 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( for { req, err := stream.Recv() if err != nil { - return IncomingSnapshot{}, err + return noSnap, err } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(stream, err) } if req.KVBatch != nil { @@ -134,11 +141,19 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( if req.LogEntries != nil { kvSS.LogEntries = append(kvSS.LogEntries, req.LogEntries...) 
} + if req.SSTChunk != nil { + err := errors.New("client error: provided SST chunks for KV_BATCH strategy") + return noSnap, sendSnapshotError(stream, err) + } + if req.SSTFinal { + err := errors.New("client error: provided SST final flag for KV_BATCH strategy") + return noSnap, sendSnapshotError(stream, err) + } if req.Final { snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "invalid snapshot") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(stream, err) } inSnap := IncomingSnapshot{ @@ -188,10 +203,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } if int64(len(b.Repr())) >= kvSS.batchSize { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } b = nil @@ -202,10 +214,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } } if b != nil { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } } @@ -291,8 +300,11 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } func (kvSS *kvBatchSnapshotStrategy) sendBatch( - stream outgoingSnapshotStream, batch engine.Batch, + ctx context.Context, stream outgoingSnapshotStream, batch engine.Batch, ) error { + if err := kvSS.limiter.WaitN(ctx, 1); err != nil { + return err + } repr := batch.Repr() batch.Close() return stream.Send(&SnapshotRequest{KVBatch: repr}) @@ -301,6 +313,372 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( // Status implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Status() string { return kvSS.status } +// Close implements the snapshotStrategy interface. +func (kvSS *kvBatchSnapshotStrategy) Close() {} + +// sstSnapshotStrategy is an implementation of snapshotStrategy that streams +// sst files. 
+type sstSnapshotStrategy struct {
+	// status is the human-readable summary returned by Status. It is set at
+	// the end of a successful Receive or Send.
+	status string
+
+	// Fields used when receiving snapshots.
+
+	// auxDir is the directory under which scratch space for incoming SST
+	// files is created.
+	auxDir string
+	// sstDir is the per-snapshot scratch directory holding the received SST
+	// files. It is created lazily by createEmptySSTFile and removed by Close.
+	sstDir     string
+	SSTs       []string // Paths of SSTs to ingest atomically
+	LogEntries [][]byte // Raft log entries
+
+	// Fields used when sending snapshots.
+	batchSize int64
+	limiter   *rate.Limiter
+}
+
+// Receive implements the snapshotStrategy interface.
+//
+// It consumes requests from the stream until one carries the Final flag.
+// SSTChunk payloads are appended to the current SST file on disk (a new file
+// is created when none is open), an SSTFinal flag closes the current file,
+// and LogEntries are accumulated in memory. A mid-stream header, a KVBatch
+// payload, an SSTFinal flag with no open file, or a Final flag while a file
+// is still open are reported to the sender as client errors.
+func (sstSS *sstSnapshotStrategy) Receive(
+	ctx context.Context, stream incomingSnapshotStream, header SnapshotRequest_Header,
+) (IncomingSnapshot, error) {
+	assertStrategy(ctx, header, SnapshotRequest_SST)
+
+	var curSST *os.File
+	defer func() {
+		// Clean up open sst file, if necessary.
+		if curSST != nil {
+			if err := curSST.Close(); err != nil {
+				log.Warningf(ctx, "error closing snapshot sst file: %v", err)
+			}
+		}
+	}()
+
+	sstSS.SSTs = nil
+	sstSS.LogEntries = nil
+	for {
+		req, err := stream.Recv()
+		if err != nil {
+			return noSnap, err
+		}
+		if req.Header != nil {
+			err := errors.New("client error: provided a header mid-stream")
+			return noSnap, sendSnapshotError(stream, err)
+		}
+
+		if req.SSTChunk != nil {
+			if curSST == nil {
+				// Create new sst file.
+				curSST, err = sstSS.createEmptySSTFile(header.State.Desc.RangeID)
+				if err != nil {
+					err = errors.Wrap(err, "error creating file for snapshot SSTs")
+					return noSnap, sendSnapshotError(stream, err)
+				}
+			}
+
+			if _, err := curSST.Write(req.SSTChunk); err != nil {
+				err = errors.Wrapf(err, "error writing to %s", curSST.Name())
+				return noSnap, sendSnapshotError(stream, err)
+			}
+		}
+		if req.SSTFinal {
+			if curSST == nil {
+				err := errors.New("client error: unexpected SSTFinal flag")
+				return noSnap, sendSnapshotError(stream, err)
+			}
+
+			// Clear curSST before inspecting the error so that the deferred
+			// cleanup does not close the file a second time.
+			err := curSST.Close()
+			curSST = nil
+			if err != nil {
+				err = errors.Wrapf(err, "error closing snapshot sst")
+				return noSnap, sendSnapshotError(stream, err)
+			}
+		}
+		if req.LogEntries != nil {
+			sstSS.LogEntries = append(sstSS.LogEntries, req.LogEntries...)
+		}
+		if req.KVBatch != nil {
+			err := errors.New("client error: provided KV batch for SST strategy")
+			return noSnap, sendSnapshotError(stream, err)
+		}
+		if req.Final {
+			if curSST != nil {
+				err := errors.New("client error: SSTFinal flag never provided")
+				return noSnap, sendSnapshotError(stream, err)
+			}
+
+			snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
+			if err != nil {
+				err = errors.Wrap(err, "invalid snapshot")
+				return noSnap, sendSnapshotError(stream, err)
+			}
+
+			inSnap := IncomingSnapshot{
+				SnapUUID: snapUUID,
+				Receiver: sstSS,
+				State:    &header.State,
+				snapType: snapTypeRaft,
+			}
+			if header.RaftMessageRequest.ToReplica.ReplicaID == 0 {
+				inSnap.snapType = snapTypePreemptive
+			}
+			sstSS.status = fmt.Sprintf("sst files: %d, log entries: %d",
+				len(sstSS.SSTs), len(sstSS.LogEntries))
+			return inSnap, nil
+		}
+	}
+}
+
+// createDirForSSTFiles creates a fresh scratch directory for the SST files of
+// the given range inside <auxDir>/snapshotsst and records it in sstSS.sstDir.
+func (sstSS *sstSnapshotStrategy) createDirForSSTFiles(rangeID roachpb.RangeID) error {
+	sstAuxDir := filepath.Join(sstSS.auxDir, "snapshotsst")
+	if err := os.MkdirAll(sstAuxDir, 0755); err != nil {
+		return err
+	}
+	// The temporary directory must live inside sstAuxDir, not directly in
+	// auxDir; otherwise the directory created above is never used and the
+	// per-range directories clutter the auxiliary directory itself.
+	dir, err := ioutil.TempDir(sstAuxDir, fmt.Sprintf("r%d_", rangeID))
+	if err != nil {
+		return err
+	}
+	sstSS.sstDir = dir
+	return nil
+}
+
+// createEmptySSTFile creates (or truncates) the next numbered ".sst" file in
+// the strategy's scratch directory, creating that directory first if needed.
+// The file's path is appended to sstSS.SSTs. The caller owns the returned
+// file and is responsible for closing it.
+func (sstSS *sstSnapshotStrategy) createEmptySSTFile(rangeID roachpb.RangeID) (*os.File, error) {
+	// Create a directory to store sst files, if necessary.
+	if sstSS.sstDir == "" {
+		if err := sstSS.createDirForSSTFiles(rangeID); err != nil {
+			return nil, err
+		}
+	}
+
+	// Create a new file with the next available index.
+	sstIndex := len(sstSS.SSTs)
+	filename := filepath.Join(sstSS.sstDir, fmt.Sprintf("%d.sst", sstIndex))
+	file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if err != nil {
+		return nil, err
+	}
+
+	sstSS.SSTs = append(sstSS.SSTs, file.Name())
+	return file, nil
+}
+
+// Send implements the snapshotStrategy interface.
+func (sstSS *sstSnapshotStrategy) Send(
+	ctx context.Context,
+	stream outgoingSnapshotStream,
+	header SnapshotRequest_Header,
+	snap *OutgoingSnapshot,
+) error {
+	assertStrategy(ctx, header, SnapshotRequest_SST)
+
+	keyRanges := snap.Iter.KeyRanges()
+	curRange := 0
+
+	sst, err := engine.MakeRocksDBSstFileWriter()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// Idempotent - ok to call even if already called.
+		sst.Close()
+	}()
+
+	// lastSizeCheck is the value of sst.DataSize when the current sst was last
+	// truncated and streamed. It is measured against the current writer, so it
+	// must be reset whenever a new writer is started.
+	var lastSizeCheck int64
+
+	// finishAndSend finishes the current sst and sends it out. It then
+	// increments the curRange counter.
+	finishAndSend := func() error {
+		// We add a DeleteRange tombstone to each file to delete any
+		// existing data in the desired bounds. This also has the effect of
+		// expanding the bounds of the sst to exactly what we want.
+		r := keyRanges[curRange]
+		if err := sst.ClearRange(r.Start, r.End); err != nil {
+			return err
+		}
+		chunk, err := sst.Finish()
+		if err != nil {
+			return err
+		}
+
+		if err := sstSS.sendSSTData(ctx, stream, chunk, true /* final */); err != nil {
+			return err
+		}
+
+		sst.Close()
+		// The next sst starts with a fresh writer. Without this reset the
+		// size delta computed below would be measured against the previous
+		// writer's size and chunked streaming would stall for later ranges.
+		// NOTE(review): assumes DataSize starts at zero for a new writer.
+		lastSizeCheck = 0
+		curRange++
+		return nil
+	}
+
+	for iter := snap.Iter; ; iter.Next() {
+		valid, err := iter.Valid()
+		if err != nil {
+			return err
+		}
+		if !valid || curRange < iter.Index() {
+			if err := finishAndSend(); err != nil {
+				return err
+			}
+
+			// Send any empty ranges that the iterator may have skipped.
+			sendTo := iter.Index()
+			if !valid {
+				sendTo = len(keyRanges)
+			}
+			for curRange < sendTo {
+				if sst, err = engine.MakeRocksDBSstFileWriter(); err != nil {
+					return err
+				}
+				if err := finishAndSend(); err != nil {
+					return err
+				}
+			}
+
+			if !valid {
+				break
+			}
+			if sst, err = engine.MakeRocksDBSstFileWriter(); err != nil {
+				return err
+			}
+		}
+
+		if err := sst.Put(iter.UnsafeKey(), iter.UnsafeValue()); err != nil {
+			return err
+		}
+
+		// Stream out what has been written so far once enough new data has
+		// accumulated since the last check.
+		newDataSize := sst.DataSize - lastSizeCheck
+		if newDataSize >= sstSS.batchSize {
+			lastSizeCheck = sst.DataSize
+
+			chunk, err := sst.Truncate()
+			if err != nil {
+				return err
+			}
+			if err := sstSS.sendSSTData(ctx, stream, chunk, false /* final */); err != nil {
+				return err
+			}
+		}
+	}
+
+	// Iterate over the specified range of Raft entries and send them all out
+	// together. The receiver will then create an SST out of them and ingest it
+	// with the other SSTs that we've already sent. We could create an SST using
+	// these entries and send that instead, but there are a few reasons why that
+	// approach is difficult:
+	// - we would need to handle sideloaded entries separately. This would be a
+	//   good change in the long run, but for now it doesn't get us anything.
+	// - the receiver will have a HardState key that it needs to include in an
+	//   SST. We don't know what that HardState is in advance because it is
+	//   handled inside of Raft.
+	// - the receiver wants to track some stats about the log entries, like
+	//   lastTerm and raftLogSize. Putting them into an SST on the sender side
+	//   makes this more difficult.
+	//
+	// WIP eliminate code duplication.
+	firstIndex := header.State.TruncatedState.Index + 1
+	endIndex := snap.RaftSnap.Metadata.Index + 1
+	logEntries := make([][]byte, 0, endIndex-firstIndex)
+
+	scanFunc := func(kv roachpb.KeyValue) (bool, error) {
+		bytes, err := kv.Value.GetBytes()
+		if err == nil {
+			logEntries = append(logEntries, bytes)
+		}
+		return false, err
+	}
+
+	rangeID := header.State.Desc.RangeID
+
+	if err := iterateEntries(ctx, snap.EngineSnap, rangeID, firstIndex, endIndex, scanFunc); err != nil {
+		return err
+	}
+
+	// Inline the payloads for all sideloaded proposals.
+	//
+	// TODO(tschottdorf): could also send slim proposals and attach sideloaded
+	// SSTables directly to the snapshot. Probably the better long-term
+	// solution, but let's see if it ever becomes relevant. Snapshots with
+	// inlined proposals are hopefully the exception.
+	{
+		var ent raftpb.Entry
+		for i := range logEntries {
+			if err := protoutil.Unmarshal(logEntries[i], &ent); err != nil {
+				return err
+			}
+			if !sniffSideloadedRaftCommand(ent.Data) {
+				continue
+			}
+			if err := snap.WithSideloaded(func(ss sideloadStorage) error {
+				newEnt, err := maybeInlineSideloadedRaftCommand(
+					ctx, rangeID, ent, ss, snap.RaftEntryCache,
+				)
+				if err != nil {
+					return err
+				}
+				if newEnt != nil {
+					ent = *newEnt
+				}
+				return nil
+			}); err != nil {
+				if errors.Cause(err) == errSideloadedFileNotFound {
+					// We're creating the Raft snapshot based on a snapshot of
+					// the engine, but the Raft log may since have been
+					// truncated and corresponding on-disk sideloaded payloads
+					// unlinked. Luckily, we can just abort this snapshot; the
+					// caller can retry.
+					//
+					// TODO(tschottdorf): check how callers handle this. They
+					// should simply retry. In some scenarios, perhaps this can
+					// happen repeatedly and prevent a snapshot; not sending the
+					// log entries wouldn't help, though, and so we'd really
+					// need to make sure the entries are always here, for
+					// instance by pre-loading them into memory. Or we can make
+					// log truncation less aggressive about removing sideloaded
+					// files, by delaying trailing file deletion for a bit.
+					return &errMustRetrySnapshotDueToTruncation{
+						index: ent.Index,
+						term:  ent.Term,
+					}
+				}
+				return err
+			}
+			// TODO(tschottdorf): it should be possible to reuse `logEntries[i]` here.
+			var err error
+			if logEntries[i], err = protoutil.Marshal(&ent); err != nil {
+				return err
+			}
+		}
+	}
+	sstSS.status = fmt.Sprintf("sst files: %d, log entries: %d", len(keyRanges), len(logEntries))
+	return stream.Send(&SnapshotRequest{LogEntries: logEntries})
+}
+
+// sendSSTData streams data to the receiver in one or more SnapshotRequests,
+// waiting on the rate limiter before each one. Data of at least twice the
+// batch size is split into batch-sized pieces.
+//
+// If final is true, the data completes the current sst file and the SSTFinal
+// flag is set on the last request only. The receiver closes its current file
+// when it sees SSTFinal and opens a new one for the next chunk, so flagging an
+// intermediate piece would split one sst across several files. A final call
+// with no data still sends an (empty) request carrying the flag; a non-final
+// call with no data sends nothing.
+func (sstSS *sstSnapshotStrategy) sendSSTData(
+	ctx context.Context, stream outgoingSnapshotStream, data []byte, final bool,
+) error {
+	if len(data) == 0 && !final {
+		// Nothing to stream and no file boundary to signal.
+		return nil
+	}
+	for {
+		chunk := data
+		data = nil
+		// Guard against a non-positive batch size, which would otherwise
+		// carve off empty chunks forever.
+		if sstSS.batchSize > 0 && int64(len(chunk)) >= 2*sstSS.batchSize {
+			chunk, data = chunk[:int(sstSS.batchSize)], chunk[int(sstSS.batchSize):]
+		}
+		last := len(data) == 0
+
+		if err := sstSS.limiter.WaitN(ctx, 1); err != nil {
+			return err
+		}
+		if err := stream.Send(&SnapshotRequest{
+			SSTChunk: chunk,
+			SSTFinal: final && last,
+		}); err != nil {
+			return err
+		}
+		if last {
+			return nil
+		}
+	}
+}
+
+// Status implements the snapshotStrategy interface.
+func (sstSS *sstSnapshotStrategy) Status() string { return sstSS.status }
+
+// Close implements the snapshotStrategy interface. It removes the scratch
+// directory holding received SST files, if one was created.
+func (sstSS *sstSnapshotStrategy) Close() {
+	if sstSS.sstDir != "" {
+		// Best-effort cleanup: Close has no way to report an error, so a
+		// failed removal is deliberately ignored.
+		_ = os.RemoveAll(sstSS.sstDir)
+	}
+}
+
 // reserveSnapshot throttles incoming snapshots. The returned closure is used
 // to cleanup the reservation and release its resources. A nil cleanup function
 // and a non-empty rejectionMessage indicates the reservation was declined.
@@ -447,12 +825,17 @@ func (s *Store) receiveSnapshot( switch header.Strategy { case SnapshotRequest_KV_BATCH: ss = &kvBatchSnapshotStrategy{} + case SnapshotRequest_SST: + ss = &sstSnapshotStrategy{ + auxDir: s.engine.GetAuxiliaryDir(), + } default: return sendSnapshotError(stream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", s, header.State.Desc.RangeID, header.Strategy), ) } + defer ss.Close() if err := stream.Send(&SnapshotResponse{Status: SnapshotResponse_ACCEPTED}); err != nil { return err @@ -591,11 +974,18 @@ func sendSnapshot( limiter: limiter, newBatch: newBatch, } + case SnapshotRequest_SST: + ss = &sstSnapshotStrategy{ + batchSize: batchSize, + limiter: limiter, + } default: log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy) } + defer ss.Close() if err := ss.Send(ctx, stream, header, snap); err != nil { + log.Errorf(ctx, "saw error when streaming snapshot %v", err) return err }