diff --git a/pkg/ccl/storageccl/engineccl/mvcc.go b/pkg/ccl/storageccl/engineccl/mvcc.go index cc9eb8b5cd14..680b6a568ca8 100644 --- a/pkg/ccl/storageccl/engineccl/mvcc.go +++ b/pkg/ccl/storageccl/engineccl/mvcc.go @@ -192,8 +192,8 @@ func (i *MVCCIncrementalIterator) UnsafeKey() engine.MVCCKey { return i.iter.UnsafeKey() } -// UnsafeValue returns the same value as Value, but the memory is invalidated on -// the next call to {Next,Reset,Close}. +// UnsafeValue returns the same value as a byte slice, but the memory is +// invalidated on the next call to {Next,Reset,Close}. func (i *MVCCIncrementalIterator) UnsafeValue() []byte { return i.iter.UnsafeValue() } diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 1f2ee2fe3de5..be270361e346 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -1037,6 +1037,7 @@ func (m *multiTestContext) changeReplicas( // is lost. We could make a this into a roachpb.Error but it seems overkill // for this one usage. if testutils.IsError(err, "snapshot failed: .*") { + m.t.Logf("snapshot failed with error: %v", err) continue } return 0, err diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index 0d5e9756aad6..e035d8399ab7 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -243,8 +243,6 @@ type Engine interface { GetStats() (*Stats, error) // GetAuxiliaryDir returns a path under which files can be stored // persistently, and from which data can be ingested by the engine. - // - // Not thread safe. GetAuxiliaryDir() string // NewBatch returns a new instance of a batched engine which wraps // this engine. Batched engines accumulate all mutations and apply diff --git a/pkg/storage/engine/in_mem.go b/pkg/storage/engine/in_mem.go index e86084b3f3e4..1394a05c7820 100644 --- a/pkg/storage/engine/in_mem.go +++ b/pkg/storage/engine/in_mem.go @@ -14,13 +14,43 @@ package engine -import "github.com/cockroachdb/cockroach/pkg/roachpb" +import ( + "context" + "io/ioutil" + "os" + + "github.com/cockroachdb/cockroach/pkg/roachpb" +) // InMem wraps RocksDB and configures it for in-memory only storage. type InMem struct { *RocksDB } +// IngestExternalFiles for an in-memory RocksDB first loads each file into +// memory, then ingests them (again, in memory). This implementation is provided +// solely to make tests work. +func (db InMem) IngestExternalFiles( + ctx context.Context, paths []string, allowFileModifications bool, +) error { + for _, file := range paths { + data, err := ioutil.ReadFile(file) + if err != nil { + if os.IsNotExist(err) { + // The file may already be in the correct in-memory env. Ignore + // the error, it will be caught by IngestExternalFiles if the + // file truly is missing. + continue + } + return err + } + if err := db.RocksDB.WriteFile(file, data); err != nil { + return err + } + } + return db.RocksDB.IngestExternalFiles(ctx, paths, allowFileModifications) +} + // NewInMem allocates and returns a new, opened InMem engine. // The caller must call the engine's Close method when the engine is no longer // needed. diff --git a/pkg/storage/raft.pb.go b/pkg/storage/raft.pb.go index e0417325a451..fab6af3f58b8 100644 --- a/pkg/storage/raft.pb.go +++ b/pkg/storage/raft.pb.go @@ -74,13 +74,19 @@ const ( // combined into a large RocksDB WriteBatch that is atomically // applied. SnapshotRequest_KV_BATCH SnapshotRequest_Strategy = 0 + // SST snapshots stream sst files consisting of all keys in a range + // from the sender to the receiver. These sst files are then atomically + // ingested directly into RocksDB. + SnapshotRequest_SST SnapshotRequest_Strategy = 1 ) var SnapshotRequest_Strategy_name = map[int32]string{ 0: "KV_BATCH", + 1: "SST", } var SnapshotRequest_Strategy_value = map[string]int32{ "KV_BATCH": 0, + "SST": 1, } func (x SnapshotRequest_Strategy) Enum() *SnapshotRequest_Strategy { @@ -235,13 +241,23 @@ func (*RaftMessageResponse) Descriptor() ([]byte, []int) { return fileDescriptor // SnapshotRequest is the request used to send streaming snapshot requests. type SnapshotRequest struct { Header *SnapshotRequest_Header `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` - // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages. + // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple + // request messages. Only used by KV_BATCH snapshots. KVBatch []byte `protobuf:"bytes,2,opt,name=kv_batch,json=kvBatch" json:"kv_batch,omitempty"` // These are really raftpb.Entry, but we model them as raw bytes to avoid - // roundtripping through memory. They are separate from the kv_batch to - // allow flexibility in log implementations. + // roundtripping through memory. They are separate from the kv_batch to allow + // flexibility in log implementations. Used by KV_BATCH and SST snapshots. LogEntries [][]byte `protobuf:"bytes,3,rep,name=log_entries,json=logEntries" json:"log_entries,omitempty"` - Final bool `protobuf:"varint,4,opt,name=final" json:"final"` + // A chunk of an SST file. Multiple SST chunks may be sent across multiple + // request messages. These chunks will combine to form one or more complete + // SST files, delimited by intermittent sst_final flags. Only used by SST + // snapshots. + SSTChunk []byte `protobuf:"bytes,5,opt,name=sst_chunk,json=sstChunk" json:"sst_chunk,omitempty"` + // If set, the SST chunk is the last chunk in the current SST file. Only used + // by SST snapshots. + SSTFinal bool `protobuf:"varint,6,opt,name=sst_final,json=sstFinal" json:"sst_final"` + // Indicates that this is the last request in the snapshot stream. + Final bool `protobuf:"varint,4,opt,name=final" json:"final"` } func (m *SnapshotRequest) Reset() { *m = SnapshotRequest{} } @@ -752,6 +768,20 @@ func (m *SnapshotRequest) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0 } i++ + if m.SSTChunk != nil { + dAtA[i] = 0x2a + i++ + i = encodeVarintRaft(dAtA, i, uint64(len(m.SSTChunk))) + i += copy(dAtA[i:], m.SSTChunk) + } + dAtA[i] = 0x30 + i++ + if m.SSTFinal { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ return i, nil } @@ -967,6 +997,11 @@ func (m *SnapshotRequest) Size() (n int) { } } n += 2 + if m.SSTChunk != nil { + l = len(m.SSTChunk) + n += 1 + l + sovRaft(uint64(l)) + } + n += 2 return n } @@ -1907,6 +1942,57 @@ func (m *SnapshotRequest) Unmarshal(dAtA []byte) error { } } m.Final = bool(v != 0) + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SSTChunk", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SSTChunk = append(m.SSTChunk[:0], dAtA[iNdEx:postIndex]...) + if m.SSTChunk == nil { + m.SSTChunk = []byte{} + } + iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SSTFinal", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.SSTFinal = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) @@ -2461,74 +2547,78 @@ var ( func init() { proto.RegisterFile("storage/raft.proto", fileDescriptorRaft) } var fileDescriptorRaft = []byte{ - // 1104 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xcd, 0x6e, 0xdb, 0x46, - 0x17, 0x15, 0xad, 0xff, 0x2b, 0x29, 0x66, 0xe6, 0x33, 0xbe, 0x12, 0x6a, 0x21, 0xa9, 0x4c, 0x13, - 0xa8, 0x29, 0x40, 0x05, 0x42, 0xd0, 0x45, 0x77, 0xfa, 0x61, 0x6c, 0xc5, 0xbf, 0xa0, 0x1d, 0x17, - 0x2d, 0x10, 0x08, 0x23, 0x6a, 0x24, 0x11, 0x96, 0x38, 0x32, 0x39, 0x4a, 0xeb, 0x3c, 0x45, 0x1f, - 0x21, 0x9b, 0xee, 0xba, 0xef, 0x2b, 0x78, 0x53, 0xa0, 0xcb, 0x2c, 0x0a, 0xa3, 0x75, 0xdf, 0xa2, - 0xe8, 0xa2, 0x98, 0xe1, 0x8c, 0x4d, 0xdb, 0x6a, 0x63, 0x17, 0x45, 0x37, 0xdd, 0xd8, 0xe4, 0x9d, - 0x39, 0xe7, 0x70, 0xee, 0xb9, 0xf7, 0x8e, 0x00, 0x85, 0x8c, 0x06, 0x78, 0x4c, 0x1a, 0x01, 0x1e, - 0x31, 0x6b, 0x1e, 0x50, 0x46, 0xd1, 0x7d, 0x97, 0xba, 0x47, 0x01, 0xc5, 0xee, 0xc4, 0x92, 0xab, - 0xe5, 0x35, 0xf1, 0x3a, 0x1f, 0x34, 0x48, 0x10, 0xd0, 0x20, 0x8c, 0x36, 0x96, 0xff, 0xaf, 0xa2, - 0x33, 0xc2, 0xf0, 0x10, 0x33, 0x2c, 0xe3, 0x55, 0x45, 0x2a, 0xff, 0x0f, 0x70, 0xc8, 0x9f, 0x31, - 0x23, 0x72, 0xc3, 0xfb, 0x84, 0xb9, 0x43, 0x21, 0x29, 0xfe, 0xcc, 0x07, 0x31, 0xf9, 0xf2, 0xda, - 0x98, 0x8e, 0xa9, 0x78, 0x6c, 0xf0, 0xa7, 0x28, 0x6a, 0x7e, 0x9f, 0x84, 0x92, 0x83, 0x47, 0x6c, - 0x83, 0xe0, 0x80, 0x0d, 0x08, 0x66, 0x68, 0x00, 0xb9, 0x00, 0xfb, 0x63, 0xd2, 0xf7, 0x86, 0x86, - 0x56, 0xd3, 0xea, 0xa9, 0xf6, 0xfa, 0xe9, 0x59, 0x35, 0x71, 0x7e, 0x56, 0xcd, 0x3a, 0x3c, 0xde, - 0xeb, 0xfe, 0x76, 0x56, 0x7d, 0x3a, 0xf6, 0xd8, 0x64, 0x31, 0xb0, 0x5c, 0x3a, 0x6b, 0x5c, 0x1c, - 0x6b, 0x38, 0xb8, 0x7c, 0x6e, 0xcc, 0x8f, 0xc6, 0x0d, 0x79, 0x0e, 0x4b, 0xe2, 0x9c, 0xac, 0x20, - 0xee, 0x0d, 0xd1, 0x57, 0xb0, 0x3a, 0x0a, 0xe8, 0xac, 0x1f, 0x90, 0xf9, 0xd4, 0x73, 0x31, 0x97, - 0x5a, 0xa9, 0x69, 0xf5, 0x52, 0x7b, 0x57, 0x4a, 0x95, 0x9e, 0x05, 0x74, 0xe6, 0x44, 0xab, 0x42, - 0xf0, 0xd3, 0xbb, 0x09, 0x2a, 0xa4, 0x53, 0x1a, 0xc5, 0x88, 0x86, 0xe8, 0x18, 0x4a, 0x8c, 0xc6, - 0x65, 0x93, 0x42, 0x76, 0x5b, 0xca, 0x16, 0x0e, 0xe8, 0x3f, 0x21, 0x5a, 0x60, 0xf4, 0x52, 0xd2, - 0x80, 0x14, 0x23, 0xc1, 0xcc, 0x48, 0x89, 0x5c, 0xa6, 0xb8, 0x92, 0x23, 0x22, 0xe8, 0x03, 0xc8, - 0xb8, 0x74, 0x36, 0xf3, 0x98, 0x91, 0x8e, 0xad, 0xc9, 0x18, 0xaa, 0x40, 0xf6, 0x78, 0xe1, 0x91, - 0xd0, 0x25, 0x46, 0xa6, 0xa6, 0xd5, 0x73, 0x72, 0x59, 0x05, 0xcd, 0xdf, 0x93, 0x80, 0xb8, 0x73, - 0xdb, 0x24, 0x0c, 0xf1, 0x98, 0x38, 0xe4, 0x78, 0x41, 0xc2, 0x7f, 0xc7, 0xbe, 0x6d, 0x28, 0xc6, - 0xed, 0x13, 0xde, 0x15, 0x9a, 0x1f, 0x59, 0x97, 0x05, 0x7e, 0x2d, 0x27, 0x5d, 0x12, 0xba, 0x81, - 0x37, 0x67, 0x34, 0x90, 0xa7, 0x28, 0xc4, 0x6c, 0x41, 0x3d, 0x80, 0x4b, 0x53, 0x84, 0x23, 0x77, - 0x23, 0xcb, 0x5f, 0xa4, 0x1b, 0x35, 0x20, 0x3b, 0x8b, 0xf2, 0x21, 0xf2, 0x5d, 0x68, 0xae, 0x5a, - 0x51, 0x27, 0x58, 0x32, 0x4d, 0x2a, 0x8b, 0x72, 0x57, 0x3c, 0xcb, 0xe9, 0x25, 0x59, 0x46, 0xcf, - 0x00, 0x26, 0xaa, 0x35, 0x42, 0x23, 0x53, 0x4b, 0xd6, 0x0b, 0xcd, 0x9a, 0x75, 0xa3, 0x93, 0xad, - 0x2b, 0x3d, 0x24, 0x49, 0x62, 0x48, 0xb4, 0x0b, 0xab, 0x17, 0x6f, 0xfd, 0x80, 0x84, 0xf3, 0xd0, - 0xc8, 0xde, 0x89, 0xec, 0xde, 0x05, 0xdc, 0xe1, 0x68, 0x73, 0x00, 0xef, 0xdd, 0x74, 0xbf, 0x8d, - 0x99, 0x3b, 0x41, 0xeb, 0x90, 0x0b, 0xa2, 0xf7, 0xd0, 0xd0, 0x84, 0xc8, 0xc3, 0x3f, 0x11, 0xb9, - 0x86, 0x8e, 0x94, 0x2e, 0xc0, 0xe6, 0x1e, 0x18, 0x57, 0x76, 0x85, 0x73, 0xea, 0x87, 0xe4, 0x85, - 0xef, 0x51, 0x1f, 0x59, 0x90, 0x16, 0x43, 0x4b, 0x14, 0x59, 0xa1, 0x69, 0x2c, 0xf1, 0xcb, 0xe6, - 0xeb, 0x4e, 0xb4, 0xed, 0xb3, 0xd4, 0xe9, 0x9b, 0xaa, 0x66, 0xfe, 0xb4, 0x02, 0xff, 0x5b, 0x42, - 0xf9, 0x1f, 0xaf, 0xda, 0x75, 0x48, 0x2f, 0x78, 0x52, 0x65, 0xcd, 0x7e, 0xf2, 0x2e, 0xb7, 0x62, - 0x3e, 0x48, 0xb2, 0x08, 0x6f, 0x7e, 0x97, 0x86, 0xd5, 0x7d, 0x1f, 0xcf, 0xc3, 0x09, 0x65, 0x6a, - 0x20, 0xb4, 0x20, 0x33, 0x21, 0x78, 0x48, 0x94, 0x53, 0x1f, 0x2f, 0x61, 0xbf, 0x86, 0xb1, 0x36, - 0x04, 0xc0, 0x91, 0x40, 0xf4, 0x08, 0x72, 0x47, 0xaf, 0xfa, 0x03, 0x5e, 0x5c, 0x22, 0x6b, 0xc5, - 0x76, 0x81, 0x3b, 0xb3, 0x79, 0x28, 0xea, 0xcd, 0xc9, 0x1e, 0xbd, 0x8a, 0x0a, 0xaf, 0x0a, 0x85, - 0x29, 0x1d, 0xf7, 0x89, 0xcf, 0x02, 0x8f, 0x84, 0x46, 0xb2, 0x96, 0xac, 0x17, 0x1d, 0x98, 0xd2, - 0xb1, 0x1d, 0x45, 0x50, 0x19, 0xd2, 0x23, 0xcf, 0xc7, 0x53, 0x71, 0x50, 0xd5, 0x6b, 0x51, 0xa8, - 0xfc, 0x26, 0x09, 0x99, 0x48, 0x17, 0xbd, 0x84, 0x35, 0xde, 0xb5, 0x7d, 0xd9, 0xa4, 0x7d, 0x59, - 0x90, 0xd2, 0xb1, 0x3b, 0x15, 0x33, 0x0a, 0x6e, 0x8e, 0xc8, 0x07, 0x00, 0x51, 0xb1, 0x85, 0xde, - 0x6b, 0x22, 0x9c, 0x4b, 0x2a, 0x4f, 0x44, 0x7c, 0xdf, 0x7b, 0x4d, 0xd0, 0x43, 0x28, 0xb8, 0xd8, - 0xef, 0x0f, 0x89, 0x3b, 0xf5, 0x7c, 0x72, 0xe5, 0x83, 0xc1, 0xc5, 0x7e, 0x37, 0x8a, 0x73, 0xeb, - 0xc4, 0x0d, 0x2c, 0xa6, 0xc7, 0x72, 0xeb, 0x62, 0xb7, 0xb5, 0x2a, 0x86, 0x7d, 0x0e, 0x51, 0xc7, - 0x17, 0x78, 0xb4, 0x0d, 0xb9, 0x79, 0xe0, 0xd1, 0xc0, 0x63, 0x27, 0x62, 0xde, 0xdf, 0x5b, 0xca, - 0x75, 0xdd, 0xa8, 0x3d, 0x09, 0x51, 0xad, 0xab, 0x28, 0x38, 0x5d, 0xc8, 0x02, 0xcc, 0xc8, 0xf8, - 0xc4, 0xc8, 0xde, 0x9a, 0x6e, 0x5f, 0x42, 0x14, 0x9d, 0xa2, 0x78, 0x9e, 0xca, 0x69, 0xfa, 0x8a, - 0xf9, 0x14, 0x72, 0x4a, 0x10, 0x15, 0x20, 0xfb, 0x62, 0x67, 0x73, 0x67, 0xf7, 0xf3, 0x1d, 0x3d, - 0x81, 0x8a, 0x90, 0x73, 0xec, 0xce, 0xee, 0xa1, 0xed, 0x7c, 0xa1, 0x6b, 0xa8, 0x04, 0x79, 0xc7, - 0x6e, 0xb7, 0xb6, 0x5a, 0x3b, 0x1d, 0x5b, 0x5f, 0x31, 0x0d, 0xc8, 0x29, 0x5e, 0xbe, 0x71, 0xf3, - 0xb0, 0xdf, 0x6e, 0x1d, 0x74, 0x36, 0xf4, 0x84, 0xf9, 0x83, 0x06, 0xfa, 0xe5, 0x27, 0xc8, 0x51, - 0xb0, 0x01, 0x19, 0x9e, 0x91, 0x45, 0x28, 0xea, 0xf5, 0x5e, 0xf3, 0xf1, 0x5f, 0x7e, 0x77, 0x04, - 0xb2, 0xf6, 0x05, 0x42, 0xdd, 0xa0, 0x11, 0x9e, 0xcf, 0x76, 0x75, 0x19, 0xf0, 0xca, 0xc9, 0x5f, - 0x9b, 0xfd, 0x66, 0x0f, 0x32, 0x11, 0xee, 0xc6, 0x61, 0x5a, 0x9d, 0x8e, 0xbd, 0x77, 0x60, 0x77, - 0x75, 0x8d, 0x2f, 0xb5, 0xf6, 0xf6, 0xb6, 0x7a, 0x76, 0x57, 0x5f, 0x41, 0x79, 0x48, 0xdb, 0x8e, - 0xb3, 0xeb, 0xe8, 0x49, 0xbe, 0xab, 0x6b, 0x77, 0xb6, 0x7a, 0x3b, 0x76, 0x57, 0x4f, 0x3d, 0x4f, - 0xe5, 0x92, 0x7a, 0xca, 0xfc, 0x56, 0x83, 0xfb, 0x1d, 0xea, 0x8f, 0x3a, 0x13, 0x5e, 0x46, 0x1d, - 0xea, 0x33, 0xf2, 0x35, 0x43, 0x4f, 0x00, 0xf8, 0x95, 0x8e, 0xfd, 0xa1, 0x9a, 0x6e, 0xf9, 0xf6, - 0x7d, 0x39, 0xdd, 0xf2, 0x9d, 0x68, 0xa5, 0xd7, 0x75, 0xf2, 0x72, 0x93, 0xf8, 0xc9, 0x90, 0x9d, - 0xe3, 0x93, 0x29, 0xc5, 0xd1, 0xcf, 0xa2, 0xa2, 0xa3, 0x5e, 0x51, 0x17, 0xb2, 0x7f, 0x7f, 0xe2, - 0x28, 0x68, 0xf3, 0xad, 0x06, 0xf9, 0xed, 0xc5, 0x94, 0x79, 0xbc, 0x6d, 0xd0, 0x14, 0xf4, 0x58, - 0xfb, 0x44, 0x9d, 0xfc, 0xf8, 0x76, 0x3d, 0xc6, 0xf7, 0x96, 0x1f, 0xdd, 0x6e, 0x5c, 0x99, 0x89, - 0xba, 0xf6, 0x44, 0x43, 0x2f, 0xa1, 0xc8, 0x17, 0x95, 0x83, 0xc8, 0x7c, 0x77, 0x59, 0x96, 0x1f, - 0xdc, 0xa2, 0x04, 0x22, 0xfa, 0xf6, 0x87, 0xa7, 0xbf, 0x54, 0x12, 0xa7, 0xe7, 0x15, 0xed, 0xc7, - 0xf3, 0x8a, 0xf6, 0xf6, 0xbc, 0xa2, 0xfd, 0x7c, 0x5e, 0xd1, 0xbe, 0xf9, 0xb5, 0x92, 0xf8, 0x32, - 0x2b, 0x91, 0x7f, 0x04, 0x00, 0x00, 0xff, 0xff, 0xa4, 0xd6, 0xb6, 0x01, 0x9c, 0x0b, 0x00, 0x00, + // 1160 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xdd, 0x6e, 0x1b, 0x45, + 0x14, 0xce, 0xc6, 0x8e, 0x77, 0x7d, 0xec, 0x34, 0xdb, 0xa1, 0x82, 0x95, 0x41, 0x76, 0xba, 0xa5, + 0x55, 0x5a, 0xc4, 0xba, 0xb2, 0x2a, 0x2e, 0xb8, 0xf3, 0xcf, 0xb6, 0x71, 0xdb, 0xfc, 0x68, 0x9d, + 0x16, 0x81, 0x54, 0x59, 0xe3, 0xf5, 0xd8, 0x5e, 0xc5, 0xde, 0x71, 0x77, 0xc7, 0x85, 0xf6, 0x25, + 0xe0, 0x11, 0x7a, 0xc3, 0x33, 0xf0, 0x0a, 0xb9, 0x41, 0xe2, 0xb2, 0x17, 0x28, 0x02, 0xf3, 0x16, + 0x88, 0x0b, 0x34, 0x7f, 0x89, 0x9b, 0x18, 0x9a, 0x20, 0xc4, 0x0d, 0x37, 0xc9, 0xec, 0x99, 0xf3, + 0x7d, 0xc7, 0x73, 0xce, 0x77, 0xce, 0x0c, 0xa0, 0x94, 0xd1, 0x04, 0x0f, 0x49, 0x35, 0xc1, 0x03, + 0xe6, 0x4d, 0x13, 0xca, 0x28, 0xba, 0x1a, 0xd2, 0xf0, 0x30, 0xa1, 0x38, 0x1c, 0x79, 0x6a, 0xb7, + 0x74, 0x4d, 0x7c, 0x4e, 0x7b, 0x55, 0x92, 0x24, 0x34, 0x49, 0xa5, 0x63, 0xe9, 0x7d, 0x6d, 0x9d, + 0x10, 0x86, 0xfb, 0x98, 0x61, 0x65, 0xaf, 0x68, 0x52, 0xf5, 0xbf, 0x87, 0x53, 0xbe, 0xc6, 0x8c, + 0x28, 0x87, 0x0f, 0x09, 0x0b, 0xfb, 0x22, 0xa4, 0xf8, 0x33, 0xed, 0x2d, 0x84, 0x2f, 0x5d, 0x1b, + 0xd2, 0x21, 0x15, 0xcb, 0x2a, 0x5f, 0x49, 0xab, 0xfb, 0x43, 0x06, 0xd6, 0x03, 0x3c, 0x60, 0xdb, + 0x04, 0x27, 0xac, 0x47, 0x30, 0x43, 0x3d, 0xb0, 0x12, 0x1c, 0x0f, 0x49, 0x37, 0xea, 0x3b, 0xc6, + 0xa6, 0xb1, 0x95, 0x6d, 0x3c, 0x38, 0x3a, 0xae, 0xac, 0xcc, 0x8f, 0x2b, 0x66, 0xc0, 0xed, 0xed, + 0xd6, 0xef, 0xc7, 0x95, 0x7b, 0xc3, 0x88, 0x8d, 0x66, 0x3d, 0x2f, 0xa4, 0x93, 0xea, 0xc9, 0xb1, + 0xfa, 0xbd, 0xd3, 0x75, 0x75, 0x7a, 0x38, 0xac, 0xaa, 0x73, 0x78, 0x0a, 0x17, 0x98, 0x82, 0xb8, + 0xdd, 0x47, 0x5f, 0xc3, 0xc6, 0x20, 0xa1, 0x93, 0x6e, 0x42, 0xa6, 0xe3, 0x28, 0xc4, 0x3c, 0xd4, + 0xea, 0xa6, 0xb1, 0xb5, 0xde, 0xd8, 0x53, 0xa1, 0xd6, 0xef, 0x27, 0x74, 0x12, 0xc8, 0x5d, 0x11, + 0xf0, 0xb3, 0xcb, 0x05, 0xd4, 0xc8, 0x60, 0x7d, 0xb0, 0x40, 0xd4, 0x47, 0xcf, 0x61, 0x9d, 0xd1, + 0xc5, 0xb0, 0x19, 0x11, 0x76, 0x47, 0x85, 0x2d, 0x1c, 0xd0, 0x7f, 0x23, 0x68, 0x81, 0xd1, 0xd3, + 0x90, 0x0e, 0x64, 0x19, 0x49, 0x26, 0x4e, 0x56, 0xe4, 0x32, 0xcb, 0x23, 0x05, 0xc2, 0x82, 0x3e, + 0x82, 0x5c, 0x48, 0x27, 0x93, 0x88, 0x39, 0x6b, 0x0b, 0x7b, 0xca, 0x86, 0xca, 0x60, 0x3e, 0x9f, + 0x45, 0x24, 0x0d, 0x89, 0x93, 0xdb, 0x34, 0xb6, 0x2c, 0xb5, 0xad, 0x8d, 0xee, 0x1f, 0x19, 0x40, + 0xbc, 0x72, 0x3b, 0x24, 0x4d, 0xf1, 0x90, 0x04, 0xe4, 0xf9, 0x8c, 0xa4, 0xff, 0x4d, 0xf9, 0x76, + 0xa0, 0xb8, 0x58, 0x3e, 0x51, 0xbb, 0x42, 0xed, 0x63, 0xef, 0x54, 0xe0, 0x67, 0x72, 0xd2, 0x22, + 0x69, 0x98, 0x44, 0x53, 0x46, 0x13, 0x75, 0x8a, 0xc2, 0x42, 0x59, 0x50, 0x1b, 0xe0, 0xb4, 0x28, + 0xa2, 0x22, 0x97, 0x23, 0xcb, 0x9f, 0xa4, 0x1b, 0x55, 0xc1, 0x9c, 0xc8, 0x7c, 0x88, 0x7c, 0x17, + 0x6a, 0x1b, 0x9e, 0xec, 0x04, 0x4f, 0xa5, 0x49, 0x67, 0x51, 0x79, 0x2d, 0x66, 0x79, 0x6d, 0x49, + 0x96, 0xd1, 0x7d, 0x80, 0x91, 0x6e, 0x8d, 0xd4, 0xc9, 0x6d, 0x66, 0xb6, 0x0a, 0xb5, 0x4d, 0xef, + 0x5c, 0x27, 0x7b, 0x6f, 0xf5, 0x90, 0x22, 0x59, 0x40, 0xa2, 0x3d, 0xd8, 0x38, 0xf9, 0xea, 0x26, + 0x24, 0x9d, 0xa6, 0x8e, 0x79, 0x29, 0xb2, 0x2b, 0x27, 0xf0, 0x80, 0xa3, 0xdd, 0x1e, 0x7c, 0x70, + 0xbe, 0xfa, 0x0d, 0xcc, 0xc2, 0x11, 0x7a, 0x00, 0x56, 0x22, 0xbf, 0x53, 0xc7, 0x10, 0x41, 0x6e, + 0xfe, 0x45, 0x90, 0x33, 0x68, 0x19, 0xe9, 0x04, 0xec, 0xee, 0x83, 0xf3, 0x96, 0x57, 0x3a, 0xa5, + 0x71, 0x4a, 0x9e, 0xc4, 0x11, 0x8d, 0x91, 0x07, 0x6b, 0x62, 0x68, 0x09, 0x91, 0x15, 0x6a, 0xce, + 0x92, 0x7a, 0xf9, 0x7c, 0x3f, 0x90, 0x6e, 0x9f, 0x67, 0x8f, 0x5e, 0x57, 0x0c, 0xf7, 0xe7, 0x55, + 0x78, 0x6f, 0x09, 0xe5, 0xff, 0x5c, 0xb5, 0x0f, 0x60, 0x6d, 0xc6, 0x93, 0xaa, 0x34, 0xfb, 0xc9, + 0xbb, 0xaa, 0xb5, 0x50, 0x07, 0x45, 0x26, 0xf1, 0xee, 0xb7, 0x39, 0xd8, 0xe8, 0xc4, 0x78, 0x9a, + 0x8e, 0x28, 0xd3, 0x03, 0xa1, 0x0e, 0xb9, 0x11, 0xc1, 0x7d, 0xa2, 0x2b, 0x75, 0x7b, 0x09, 0xfb, + 0x19, 0x8c, 0xb7, 0x2d, 0x00, 0x81, 0x02, 0xa2, 0x5b, 0x60, 0x1d, 0xbe, 0xe8, 0xf6, 0xb8, 0xb8, + 0x44, 0xd6, 0x8a, 0x8d, 0x02, 0xaf, 0xcc, 0xa3, 0xa7, 0x42, 0x6f, 0x81, 0x79, 0xf8, 0x42, 0x0a, + 0xaf, 0x02, 0x85, 0x31, 0x1d, 0x76, 0x49, 0xcc, 0x92, 0x88, 0xa4, 0x4e, 0x66, 0x33, 0xb3, 0x55, + 0x0c, 0x60, 0x4c, 0x87, 0xbe, 0xb4, 0xa0, 0x12, 0xac, 0x0d, 0xa2, 0x18, 0x8f, 0xc5, 0x41, 0x75, + 0xaf, 0x49, 0x13, 0xba, 0x0d, 0xf9, 0x34, 0x65, 0xdd, 0x70, 0x34, 0x8b, 0x0f, 0x45, 0x2f, 0x16, + 0x1b, 0xc5, 0xf9, 0x71, 0xc5, 0xea, 0x74, 0x0e, 0x9a, 0xdc, 0x16, 0x58, 0x69, 0xca, 0xc4, 0x0a, + 0x7d, 0x2a, 0x5d, 0x25, 0x95, 0x1c, 0x8e, 0xb6, 0x92, 0x0b, 0x77, 0xbf, 0xcf, 0xed, 0xc2, 0x5d, + 0xac, 0x4a, 0xaf, 0x33, 0x90, 0x93, 0x27, 0x42, 0xcf, 0xe0, 0x1a, 0x9f, 0x07, 0x5d, 0xd5, 0xfe, + 0x5d, 0x25, 0x75, 0xa5, 0x85, 0x4b, 0xb5, 0x09, 0x4a, 0xce, 0x0f, 0xdf, 0x1b, 0x00, 0x52, 0xc6, + 0x69, 0xf4, 0x8a, 0x08, 0x4d, 0x64, 0x74, 0xb5, 0x85, 0xbd, 0x13, 0xbd, 0x22, 0xe8, 0x26, 0x14, + 0x42, 0x1c, 0x77, 0xfb, 0x24, 0x1c, 0x47, 0x31, 0x79, 0x2b, 0x15, 0x10, 0xe2, 0xb8, 0x25, 0xed, + 0x5c, 0x14, 0xe2, 0x6e, 0x17, 0xb9, 0x58, 0x2e, 0x8a, 0x85, 0x77, 0x80, 0x96, 0x59, 0x87, 0x43, + 0x74, 0x62, 0x05, 0x1e, 0xed, 0x80, 0x35, 0x4d, 0x22, 0x9a, 0x44, 0xec, 0xa5, 0x48, 0xd6, 0x95, + 0xa5, 0x5c, 0x67, 0x25, 0xb0, 0xaf, 0x20, 0x7a, 0x28, 0x68, 0x0a, 0x4e, 0x97, 0xb2, 0x04, 0x33, + 0x32, 0x7c, 0xe9, 0x98, 0x17, 0xa6, 0xeb, 0x28, 0x88, 0xa6, 0xd3, 0x14, 0x0f, 0xb3, 0x96, 0x61, + 0xaf, 0xba, 0xf7, 0xc0, 0xd2, 0x01, 0x51, 0x01, 0xcc, 0x27, 0xbb, 0x8f, 0x76, 0xf7, 0xbe, 0xd8, + 0xb5, 0x57, 0x50, 0x11, 0xac, 0xc0, 0x6f, 0xee, 0x3d, 0xf5, 0x83, 0x2f, 0x6d, 0x03, 0xad, 0x43, + 0x3e, 0xf0, 0x1b, 0xf5, 0xc7, 0xf5, 0xdd, 0xa6, 0x6f, 0xaf, 0xba, 0xd7, 0xc1, 0xd2, 0xbc, 0xdc, + 0xf1, 0xd1, 0xd3, 0x6e, 0xa3, 0x7e, 0xd0, 0xdc, 0xb6, 0x57, 0x90, 0x09, 0x99, 0x4e, 0xe7, 0xc0, + 0x36, 0xdc, 0x1f, 0x0d, 0xb0, 0x4f, 0x7f, 0x8b, 0x9a, 0x36, 0xdb, 0x90, 0xe3, 0xa9, 0x99, 0xa5, + 0xa2, 0x25, 0xae, 0xd4, 0xee, 0xfc, 0xed, 0x01, 0x24, 0xc8, 0xeb, 0x08, 0x84, 0xbe, 0xa4, 0x25, + 0x9e, 0x5f, 0x1f, 0xfa, 0xbe, 0xe1, 0x12, 0xca, 0x9f, 0xb9, 0x5e, 0xdc, 0x36, 0xe4, 0x24, 0xee, + 0xdc, 0xa9, 0xea, 0xcd, 0xa6, 0xbf, 0x7f, 0xe0, 0xb7, 0x6c, 0x83, 0x6f, 0xd5, 0xf7, 0xf7, 0x1f, + 0xb7, 0xfd, 0x96, 0xbd, 0x8a, 0xf2, 0xb0, 0xe6, 0x07, 0xc1, 0x5e, 0x60, 0x67, 0xb8, 0x57, 0xcb, + 0x6f, 0x3e, 0x6e, 0xef, 0xfa, 0x2d, 0x3b, 0xfb, 0x30, 0x6b, 0x65, 0xec, 0xac, 0xfb, 0xbd, 0x01, + 0x57, 0x9b, 0x34, 0x1e, 0x34, 0x47, 0x5c, 0x4f, 0x4d, 0x1a, 0x33, 0xf2, 0x0d, 0x43, 0x77, 0x01, + 0xf8, 0xab, 0x01, 0xc7, 0x7d, 0x3d, 0x40, 0xf3, 0x8d, 0xab, 0xaa, 0x23, 0xf2, 0x4d, 0xb9, 0xd3, + 0x6e, 0x05, 0x79, 0xe5, 0x24, 0x5e, 0x25, 0xe6, 0x14, 0xbf, 0x1c, 0x53, 0x2c, 0x5f, 0x5e, 0xc5, + 0x40, 0x7f, 0xa2, 0x16, 0x98, 0xff, 0x7c, 0xa8, 0x69, 0x68, 0xed, 0x8d, 0x01, 0xf9, 0x9d, 0xd9, + 0x98, 0x45, 0xbc, 0x7f, 0xd0, 0x18, 0xec, 0x85, 0x3e, 0x92, 0xc3, 0xe2, 0xce, 0xc5, 0x9a, 0x8d, + 0xfb, 0x96, 0x6e, 0x5d, 0x6c, 0x22, 0xba, 0x2b, 0x5b, 0xc6, 0x5d, 0x03, 0x3d, 0x83, 0x22, 0xdf, + 0xd4, 0x15, 0x44, 0xee, 0xbb, 0xf5, 0x59, 0xba, 0x71, 0x01, 0x09, 0x48, 0xfa, 0xc6, 0xf5, 0xa3, + 0x5f, 0xcb, 0x2b, 0x47, 0xf3, 0xb2, 0xf1, 0xd3, 0xbc, 0x6c, 0xbc, 0x99, 0x97, 0x8d, 0x5f, 0xe6, + 0x65, 0xe3, 0xbb, 0xdf, 0xca, 0x2b, 0x5f, 0x99, 0x0a, 0xf9, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, + 0x1a, 0xf3, 0x8f, 0x6d, 0xff, 0x0b, 0x00, 0x00, } diff --git a/pkg/storage/raft.proto b/pkg/storage/raft.proto index 33b56e8e1514..04d57544c1df 100644 --- a/pkg/storage/raft.proto +++ b/pkg/storage/raft.proto @@ -117,6 +117,10 @@ message SnapshotRequest { // combined into a large RocksDB WriteBatch that is atomically // applied. KV_BATCH = 0; + // SST snapshots stream sst files consisting of all keys in a range + // from the sender to the receiver. These sst files are then atomically + // ingested directly into RocksDB. + SST = 1; } message Header { @@ -149,14 +153,26 @@ message SnapshotRequest { optional Header header = 1; - // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages. + // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple + // request messages. Only used by KV_BATCH snapshots. optional bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"]; // These are really raftpb.Entry, but we model them as raw bytes to avoid - // roundtripping through memory. They are separate from the kv_batch to - // allow flexibility in log implementations. + // roundtripping through memory. They are separate from the kv_batch to allow + // flexibility in log implementations. Used by KV_BATCH and SST snapshots. repeated bytes log_entries = 3; + // A chunk of an SST file. Multiple SST chunks may be sent across multiple + // request messages. These chunks will combine to form one or more complete + // SST files, delimited by intermittent sst_final flags. Only used by SST + // snapshots. + optional bytes sst_chunk = 5 [(gogoproto.customname) = "SSTChunk"]; + + // If set, the SST chunk is the last chunk in the current SST file. Only used + // by SST snapshots. + optional bool sst_final = 6 [(gogoproto.customname) = "SSTFinal", (gogoproto.nullable) = false]; + + // Indicates that this is the last request in the snapshot stream. optional bool final = 4 [(gogoproto.nullable) = false]; } diff --git a/pkg/storage/rditer/replica_data_iter.go b/pkg/storage/rditer/replica_data_iter.go index c149a5855dae..25169eea41dd 100644 --- a/pkg/storage/rditer/replica_data_iter.go +++ b/pkg/storage/rditer/replica_data_iter.go @@ -142,11 +142,33 @@ func (ri *ReplicaDataIterator) advance() { } } +// KeyRanges returns all key ranges that the iterator will iterate over. +func (ri *ReplicaDataIterator) KeyRanges() []KeyRange { + return ri.ranges +} + +// Index returns the index of the key range that the iterator points to. +func (ri *ReplicaDataIterator) Index() int { + return ri.curIndex +} + // Valid returns true if the iterator currently points to a valid value. func (ri *ReplicaDataIterator) Valid() (bool, error) { return ri.it.Valid() } +// UnsafeKey returns the current key, but the memory is invalidated on the next +// call to {NextKey,Seek}. +func (ri *ReplicaDataIterator) UnsafeKey() engine.MVCCKey { + return ri.it.UnsafeKey() +} + +// UnsafeValue returns the same value as a byte slice, but the memory is +// invalidated on the next call to {Next,Reset,Close}. +func (ri *ReplicaDataIterator) UnsafeValue() []byte { + return ri.it.UnsafeValue() +} + // Key returns the current key. func (ri *ReplicaDataIterator) Key() engine.MVCCKey { key := ri.it.UnsafeKey() diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 921f0d93d90e..5f9c785ee69a 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1875,6 +1875,18 @@ func (r *Replica) sendSnapshot( } } + // TODO we should introduce a heuristic here for which snapshot strategy + // to use. An easy heuristic would be to use SST snapshots only when a + // range is over a certain size. That is what we have right now. The 8MB + // number was tuned based on the how much it affected the speed of tests. + // Right now this provides a ~10% speedup when running ./pkg/storage tests. + // We should actually tune this for real. + // TODO we need to stick SST snapshots behind a version check. + strategy := SnapshotRequest_KV_BATCH + if r.GetMVCCStats().LiveBytes > (8 << 20) /* 8MB */ { + strategy = SnapshotRequest_SST + } + status := r.RaftStatus() if status == nil { return errors.New("raft status not initialized") @@ -1898,7 +1910,7 @@ func (r *Replica) sendSnapshot( // Recipients can choose to decline preemptive snapshots. CanDecline: snapType == snapTypePreemptive, Priority: priority, - Strategy: SnapshotRequest_KV_BATCH, + Strategy: strategy, } sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 6492e639c5b7..79ea47ecb15a 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -638,7 +638,7 @@ func clearRangeData( desc *roachpb.RangeDescriptor, keyCount int64, eng engine.Engine, - batch engine.Batch, + writer engine.Writer, ) error { iter := eng.NewIterator(engine.IterOptions{}) defer iter.Close() @@ -657,9 +657,9 @@ func clearRangeData( // above), but the data range's key count needs to be explicitly checked. var err error if i >= metadataRanges && keyCount >= clearRangeMinKeys { - err = batch.ClearRange(keyRange.Start, keyRange.End) + err = writer.ClearRange(keyRange.Start, keyRange.End) } else { - err = batch.ClearIterRange(iter, keyRange.Start, keyRange.End) + err = writer.ClearIterRange(iter, keyRange.Start, keyRange.End) } if err != nil { return err @@ -868,6 +868,111 @@ func (r *Replica) applySnapshot( return err } stats.commit = timeutil.Now() + case *sstSnapshotStrategy: + // TODO DURING REVIEW: do we need to worry about RaftTombstoneIncorrectLegacyKey here? + // TODO track stats of ingestion and log them. + + unreplicatedSST, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer unreplicatedSST.Close() + + // Clear out all existing unreplicated keys in the range. + unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) + unreplicatedStart := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey) + unreplicatedEnd := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey.PrefixEnd()) + err = unreplicatedSST.ClearRange(unreplicatedStart, unreplicatedEnd) + if err != nil { + return errors.Wrapf(err, "unable to write ClearRange to unreplicated SST writer") + } + raftLogSize = 0 + + //////// + + // Note that since this snapshot comes from Raft, we don't have to synthesize + // the HardState -- Raft wouldn't ask us to update the HardState in incorrect + // ways. + hsKey := r.raftMu.stateLoader.RaftHardStateKey() + var hsValue roachpb.Value + if err := hsValue.SetProto(&hs); err != nil { + return err + } + hsValue.InitChecksum(hsKey) + err = engine.MVCCBlindPut(ctx, &unreplicatedSST, nil, hsKey, hlc.Timestamp{}, hsValue, nil) + if err != nil { + return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + + /////// + + if len(ss.LogEntries) > 0 { + logEntries := make([]raftpb.Entry, len(ss.LogEntries)) + for i, bytes := range ss.LogEntries { + if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { + return err + } + } + // If this replica doesn't know its ReplicaID yet, we're applying a + // preemptive snapshot. In this case, we're going to have to write the + // sideloaded proposals into the Raft log. Otherwise, sideload. + thinEntries := logEntries + if replicaID != 0 { + var err error + var sideloadedEntriesSize int64 + thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + if err != nil { + return err + } + raftLogSize += sideloadedEntriesSize + } + var logDiff enginepb.MVCCStats + var logValue roachpb.Value + for i := range thinEntries { + ent := &thinEntries[i] + entKey := r.raftMu.stateLoader.RaftLogKey(ent.Index) + + if err := logValue.SetProto(ent); err != nil { + return err + } + logValue.InitChecksum(entKey) + err = engine.MVCCBlindPut(ctx, &unreplicatedSST, &logDiff, entKey, hlc.Timestamp{}, + logValue, nil /* txn */) + if err != nil { + return errors.Wrapf(err, "unable to write log entry to unreplicated SST writer") + } + } + raftLogSize += logDiff.SysBytes + lastTerm = thinEntries[len(thinEntries)-1].Term + } else { + lastTerm = invalidLastTerm + } + + // Create a new file to write the SST into. + unreplicatedSSTFile, err := ss.createEmptySSTFile(r.RangeID) + if err != nil { + return errors.Wrapf(err, "unable to open empty SST file") + } + data, err := unreplicatedSST.Finish() + if err != nil { + unreplicatedSSTFile.Close() + return errors.Wrapf(err, "unable to Finish sst file writer") + } + _, err = unreplicatedSSTFile.Write(data) + unreplicatedSSTFile.Close() + if err != nil { + return errors.Wrapf(err, "unable to write SST data to file") + } + + // Ingest all SSTs atomically. + if err := r.store.engine.IngestExternalFiles(ctx, ss.SSTs, true /* modify */); err != nil { + return errors.Wrapf(err, "while ingesting %s", ss.SSTs) + } + + // TODO should we force a compaction? Only in the global keyspace? The + // SSTs will probably be living in L0 after the ingestion, which isn't + // great, especially because they each contain a large RangeDeletion + // tombstone. default: log.Fatalf(ctx, "unknown snapshot strategy: %v", ss) } diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 60b39d14f715..f94fb311a51b 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -18,7 +18,10 @@ import ( "context" "fmt" "io" + "io/ioutil" "math" + "os" + "path/filepath" "github.com/coreos/etcd/raft/raftpb" "github.com/pkg/errors" @@ -85,6 +88,10 @@ type snapshotStrategy interface { // Status provides a status report on the work performed during the // snapshot. Only valid if the strategy succeeded. Status() string + + // Close cleans up any resources associated with the operation of the + // snapshot strategy. + Close() } func assertStrategy( @@ -121,11 +128,11 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( for { req, err := stream.Recv() if err != nil { - return IncomingSnapshot{}, err + return noSnap, err } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(stream, err) } if req.KVBatch != nil { @@ -134,11 +141,19 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( if req.LogEntries != nil { kvSS.LogEntries = append(kvSS.LogEntries, req.LogEntries...) } + if req.SSTChunk != nil { + err := errors.New("client error: provided SST chunks for KV_BATCH strategy") + return noSnap, sendSnapshotError(stream, err) + } + if req.SSTFinal { + err := errors.New("client error: provided SST final flag for KV_BATCH strategy") + return noSnap, sendSnapshotError(stream, err) + } if req.Final { snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "invalid snapshot") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(stream, err) } inSnap := IncomingSnapshot{ @@ -188,10 +203,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } if int64(len(b.Repr())) >= kvSS.batchSize { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } b = nil @@ -202,10 +214,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } } if b != nil { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } } @@ -291,8 +300,11 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } func (kvSS *kvBatchSnapshotStrategy) sendBatch( - stream outgoingSnapshotStream, batch engine.Batch, + ctx context.Context, stream outgoingSnapshotStream, batch engine.Batch, ) error { + if err := kvSS.limiter.WaitN(ctx, 1); err != nil { + return err + } repr := batch.Repr() batch.Close() return stream.Send(&SnapshotRequest{KVBatch: repr}) @@ -301,6 +313,372 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( // Status implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Status() string { return kvSS.status } +// Close implements the snapshotStrategy interface. +func (kvSS *kvBatchSnapshotStrategy) Close() {} + +// sstSnapshotStrategy is an implementation of snapshotStrategy that streams +// sst files. +type sstSnapshotStrategy struct { + status string + + // Fields used when receiving snapshots. + auxDir string + sstDir string + SSTs []string // Paths of SSTs to ingest atomically + LogEntries [][]byte // Raft log entries + + // Fields used when sending snapshots. + batchSize int64 + limiter *rate.Limiter +} + +// Send implements the snapshotStrategy interface. +func (sstSS *sstSnapshotStrategy) Receive( + ctx context.Context, stream incomingSnapshotStream, header SnapshotRequest_Header, +) (IncomingSnapshot, error) { + assertStrategy(ctx, header, SnapshotRequest_SST) + + var curSST *os.File + defer func() { + // Clean up open sst file, if necessary. + if curSST != nil { + if err := curSST.Close(); err != nil { + log.Warningf(ctx, "error closing snapshot sst file: %v", err) + } + } + }() + + sstSS.SSTs = nil + sstSS.LogEntries = nil + for { + req, err := stream.Recv() + if err != nil { + return noSnap, err + } + if req.Header != nil { + err := errors.New("client error: provided a header mid-stream") + return noSnap, sendSnapshotError(stream, err) + } + + if req.SSTChunk != nil { + if curSST == nil { + // Create new sst file. + curSST, err = sstSS.createEmptySSTFile(header.State.Desc.RangeID) + if err != nil { + err = errors.Wrap(err, "error creating file for snapshot SSTs") + return noSnap, sendSnapshotError(stream, err) + } + } + + if _, err := curSST.Write(req.SSTChunk); err != nil { + err = errors.Wrapf(err, "error writing to %s", curSST.Name()) + return noSnap, sendSnapshotError(stream, err) + } + } + if req.SSTFinal { + if curSST == nil { + err := errors.New("client error: unexpected SSTFinal flag") + return noSnap, sendSnapshotError(stream, err) + } + + err := curSST.Close() + curSST = nil + if err != nil { + err = errors.Wrapf(err, "error closing snapshot sst") + return noSnap, sendSnapshotError(stream, err) + } + } + if req.LogEntries != nil { + sstSS.LogEntries = append(sstSS.LogEntries, req.LogEntries...) + } + if req.KVBatch != nil { + err := errors.New("client error: provided KV batch for SST strategy") + return noSnap, sendSnapshotError(stream, err) + } + if req.Final { + if curSST != nil { + err := errors.New("client error: SSTFinal flag never provided") + return noSnap, sendSnapshotError(stream, err) + } + + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) + if err != nil { + err = errors.Wrap(err, "invalid snapshot") + return noSnap, sendSnapshotError(stream, err) + } + + inSnap := IncomingSnapshot{ + SnapUUID: snapUUID, + Receiver: sstSS, + State: &header.State, + snapType: snapTypeRaft, + } + if header.RaftMessageRequest.ToReplica.ReplicaID == 0 { + inSnap.snapType = snapTypePreemptive + } + sstSS.status = fmt.Sprintf("sst files: %d, log entries: %d", + len(sstSS.SSTs), len(sstSS.LogEntries)) + return inSnap, nil + } + } +} + +func (sstSS *sstSnapshotStrategy) createDirForSSTFiles(rangeID roachpb.RangeID) error { + sstAuxDir := filepath.Join(sstSS.auxDir, "snapshotsst") + if err := os.MkdirAll(sstAuxDir, 0755); err != nil { + return err + } + dir, err := ioutil.TempDir(sstSS.auxDir, fmt.Sprintf("r%d_", rangeID)) + if err != nil { + return err + } + sstSS.sstDir = dir + return nil +} + +func (sstSS *sstSnapshotStrategy) createEmptySSTFile(rangeID roachpb.RangeID) (*os.File, error) { + // Create a directory to store sst files, if necessary. + if sstSS.sstDir == "" { + if err := sstSS.createDirForSSTFiles(rangeID); err != nil { + return nil, err + } + } + + // Create a new file with the next available index. + sstIndex := len(sstSS.SSTs) + filename := filepath.Join(sstSS.sstDir, fmt.Sprintf("%d.sst", sstIndex)) + file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return nil, err + } + + sstSS.SSTs = append(sstSS.SSTs, file.Name()) + return file, nil +} + +// Send implements the snapshotStrategy interface. +func (sstSS *sstSnapshotStrategy) Send( + ctx context.Context, + stream outgoingSnapshotStream, + header SnapshotRequest_Header, + snap *OutgoingSnapshot, +) error { + assertStrategy(ctx, header, SnapshotRequest_SST) + + keyRanges := snap.Iter.KeyRanges() + curRange := 0 + + sst, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return err + } + defer func() { + // Idemptotent - ok to call even if already called. + sst.Close() + }() + + // finishAndSend finishes the current sst and sends it out. It then + // increments the curRange counter. + finishAndSend := func() error { + // We add a DeleteRange tombstone to each file to delete any + // existing data in the desired bounds. This also has the effect of + // expanding the bounds of the sst to exactly what we want. + r := keyRanges[curRange] + if err := sst.ClearRange(r.Start, r.End); err != nil { + return err + } + chunk, err := sst.Finish() + if err != nil { + return err + } + + if err := sstSS.sendSSTData(ctx, stream, chunk, true /* final */); err != nil { + return err + } + + sst.Close() + curRange++ + return nil + } + + var lastSizeCheck int64 + for iter := snap.Iter; ; iter.Next() { + valid, err := iter.Valid() + if err != nil { + return err + } + if !valid || curRange < iter.Index() { + if err := finishAndSend(); err != nil { + return err + } + + // Send any empty ranges that the iterator may have skipped. + sendTo := iter.Index() + if !valid { + sendTo = len(keyRanges) + } + for curRange < sendTo { + if sst, err = engine.MakeRocksDBSstFileWriter(); err != nil { + return err + } + if err := finishAndSend(); err != nil { + return err + } + } + + if !valid { + break + } + if sst, err = engine.MakeRocksDBSstFileWriter(); err != nil { + return err + } + } + + if err := sst.Put(iter.UnsafeKey(), iter.UnsafeValue()); err != nil { + return err + } + + newDataSize := sst.DataSize - lastSizeCheck + if newDataSize >= sstSS.batchSize { + lastSizeCheck = sst.DataSize + + chunk, err := sst.Truncate() + if err != nil { + return err + } + if err := sstSS.sendSSTData(ctx, stream, chunk, false /* final */); err != nil { + return err + } + } + } + + // Iterate over the specified range of Raft entries and send them all out + // together. The receiver with then create an SST our of them and ingest it + // with the other SSTs that we've already sent. We could create an SST using + // these entries and send that instead, but there are a few reasons why that + // approach is difficult: + // - we would need to handle sideloaded entries separately. This would be a + // good change in the long run, but for now it doesn't get us anything. + // - the receiver will have a HardState key that it needs to include in an + // SST. We don't know what that HardState is in advance because it is + // handled inside of Raft. + // - the receiver wants to track some stats about the log entries, like + // lastTerm and raftLogSize. Putting them into an SST on the sender side + // makes this more difficult. + // + // WIP eliminate code duplication. + firstIndex := header.State.TruncatedState.Index + 1 + endIndex := snap.RaftSnap.Metadata.Index + 1 + logEntries := make([][]byte, 0, endIndex-firstIndex) + + scanFunc := func(kv roachpb.KeyValue) (bool, error) { + bytes, err := kv.Value.GetBytes() + if err == nil { + logEntries = append(logEntries, bytes) + } + return false, err + } + + rangeID := header.State.Desc.RangeID + + if err := iterateEntries(ctx, snap.EngineSnap, rangeID, firstIndex, endIndex, scanFunc); err != nil { + return err + } + + // Inline the payloads for all sideloaded proposals. + // + // TODO(tschottdorf): could also send slim proposals and attach sideloaded + // SSTables directly to the snapshot. Probably the better long-term + // solution, but let's see if it ever becomes relevant. Snapshots with + // inlined proposals are hopefully the exception. + { + var ent raftpb.Entry + for i := range logEntries { + if err := protoutil.Unmarshal(logEntries[i], &ent); err != nil { + return err + } + if !sniffSideloadedRaftCommand(ent.Data) { + continue + } + if err := snap.WithSideloaded(func(ss sideloadStorage) error { + newEnt, err := maybeInlineSideloadedRaftCommand( + ctx, rangeID, ent, ss, snap.RaftEntryCache, + ) + if err != nil { + return err + } + if newEnt != nil { + ent = *newEnt + } + return nil + }); err != nil { + if errors.Cause(err) == errSideloadedFileNotFound { + // We're creating the Raft snapshot based on a snapshot of + // the engine, but the Raft log may since have been + // truncated and corresponding on-disk sideloaded payloads + // unlinked. Luckily, we can just abort this snapshot; the + // caller can retry. + // + // TODO(tschottdorf): check how callers handle this. They + // should simply retry. In some scenarios, perhaps this can + // happen repeatedly and prevent a snapshot; not sending the + // log entries wouldn't help, though, and so we'd really + // need to make sure the entries are always here, for + // instance by pre-loading them into memory. Or we can make + // log truncation less aggressive about removing sideloaded + // files, by delaying trailing file deletion for a bit. + return &errMustRetrySnapshotDueToTruncation{ + index: ent.Index, + term: ent.Term, + } + } + return err + } + // TODO(tschottdorf): it should be possible to reuse `logEntries[i]` here. + var err error + if logEntries[i], err = protoutil.Marshal(&ent); err != nil { + return err + } + } + } + sstSS.status = fmt.Sprintf("sst files: %d, log entries: %d", len(keyRanges), len(logEntries)) + return stream.Send(&SnapshotRequest{LogEntries: logEntries}) +} + +func (sstSS *sstSnapshotStrategy) sendSSTData( + ctx context.Context, stream outgoingSnapshotStream, data []byte, final bool, +) error { + for len(data) > 0 { + var chunk []byte + if int64(len(data)) < 2*sstSS.batchSize { + chunk, data = data, nil + } else { + chunk, data = data[:int(sstSS.batchSize)], data[int(sstSS.batchSize):] + } + + if err := sstSS.limiter.WaitN(ctx, 1); err != nil { + return err + } + if err := stream.Send(&SnapshotRequest{ + SSTChunk: chunk, + SSTFinal: final, + }); err != nil { + return err + } + } + return nil +} + +// Status implements the snapshotStrategy interface. +func (sstSS *sstSnapshotStrategy) Status() string { return sstSS.status } + +// Close implements the snapshotStrategy interface. +func (sstSS *sstSnapshotStrategy) Close() { + if sstSS.sstDir != "" { + _ = os.RemoveAll(sstSS.sstDir) + } +} + // reserveSnapshot throttles incoming snapshots. The returned closure is used // to cleanup the reservation and release its resources. A nil cleanup function // and a non-empty rejectionMessage indicates the reservation was declined. @@ -447,12 +825,17 @@ func (s *Store) receiveSnapshot( switch header.Strategy { case SnapshotRequest_KV_BATCH: ss = &kvBatchSnapshotStrategy{} + case SnapshotRequest_SST: + ss = &sstSnapshotStrategy{ + auxDir: s.engine.GetAuxiliaryDir(), + } default: return sendSnapshotError(stream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", s, header.State.Desc.RangeID, header.Strategy), ) } + defer ss.Close() if err := stream.Send(&SnapshotResponse{Status: SnapshotResponse_ACCEPTED}); err != nil { return err @@ -591,11 +974,18 @@ func sendSnapshot( limiter: limiter, newBatch: newBatch, } + case SnapshotRequest_SST: + ss = &sstSnapshotStrategy{ + batchSize: batchSize, + limiter: limiter, + } default: log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy) } + defer ss.Close() if err := ss.Send(ctx, stream, header, snap); err != nil { + log.Errorf(ctx, "saw error when streaming snapshot %v", err) return err }