From 107b8d17f405f30760af9558b214424cc49bd6fd Mon Sep 17 00:00:00 2001
From: Jeffrey Xiao
Date: Wed, 17 Jul 2019 10:47:03 -0400
Subject: [PATCH] storage: build SSTable from KV_BATCH snapshot

Release note: None
---
 pkg/keys/constants.go                       |   4 +
 pkg/keys/keys.go                            |   6 +
 pkg/keys/printer.go                         |   1 +
 pkg/roachpb/internal_raft.pb.go             | 233 +++++++++++++++++---
 pkg/roachpb/internal_raft.proto             |  10 +
 pkg/storage/client_raft_test.go             |   2 +
 pkg/storage/engine/in_mem.go                |  32 ++-
 pkg/storage/replica_init.go                 |  29 +++
 pkg/storage/replica_raftstorage.go          |  65 ++++--
 pkg/storage/replica_sst_snapshot_storage.go | 147 ++++++++++++
 pkg/storage/stateloader/stateloader.go      |  29 +++
 pkg/storage/store_snapshot.go               | 136 ++++++++++--
 pkg/storage/store_test.go                   |  92 ++++++++
 13 files changed, 714 insertions(+), 72 deletions(-)
 create mode 100644 pkg/storage/replica_sst_snapshot_storage.go

diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go
index 88cd1b41fc2e..85d9a2ed0477 100644
--- a/pkg/keys/constants.go
+++ b/pkg/keys/constants.go
@@ -150,6 +150,10 @@ var (
 	// last verification timestamp (for checking integrity of on-disk data).
 	// Note: DEPRECATED.
 	LocalRangeLastVerificationTimestampSuffixDeprecated = []byte("rlvt")
+	// LocalRangeSSTSnapshotInProgressSuffix is the suffix for the UUID of the
+	// snapshot in progress, if any. If this key is set, the replica must finish
+	// processing the snapshot by ingesting the SSTs of the snapshot on startup.
+	LocalRangeSSTSnapshotInProgressSuffix = []byte("rssp")
 	// LocalRangePrefix is the prefix identifying per-range data indexed
 	// by range key (either start key, or some key in the range). The
 	// key is appended to this prefix, encoded using EncodeBytes. The
diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go
index 56e00cff1f19..fb95a1fc75ba 100644
--- a/pkg/keys/keys.go
+++ b/pkg/keys/keys.go
@@ -974,3 +974,9 @@ func (b RangeIDPrefixBuf) RangeLastReplicaGCTimestampKey() roachpb.Key {
 func (b RangeIDPrefixBuf) RangeLastVerificationTimestampKeyDeprecated() roachpb.Key {
 	return append(b.unreplicatedPrefix(), LocalRangeLastVerificationTimestampSuffixDeprecated...)
 }
+
+// RangeSSTSnapshotInProgress returns a range-local key for the snapshot in
+// progress, if any.
+func (b RangeIDPrefixBuf) RangeSSTSnapshotInProgress() roachpb.Key {
+	return append(b.unreplicatedPrefix(), LocalRangeSSTSnapshotInProgressSuffix...)
+}
diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go
index 63c5c4d1ec5a..55c71e24ce11 100644
--- a/pkg/keys/printer.go
+++ b/pkg/keys/printer.go
@@ -140,6 +140,7 @@ var (
 		{name: "RaftTombstone", suffix: LocalRaftTombstoneSuffix},
 		{name: "RaftHardState", suffix: LocalRaftHardStateSuffix},
 		{name: "RangeAppliedState", suffix: LocalRangeAppliedStateSuffix},
+		{name: "RangeSSTSnapshotInProgress", suffix: LocalRangeSSTSnapshotInProgressSuffix},
 		{name: "RaftAppliedIndex", suffix: LocalRaftAppliedIndexLegacySuffix},
 		{name: "LeaseAppliedIndex", suffix: LocalLeaseAppliedIndexLegacySuffix},
 		{name: "RaftLog", suffix: LocalRaftLogSuffix,
diff --git a/pkg/roachpb/internal_raft.pb.go b/pkg/roachpb/internal_raft.pb.go
index 82e8a51341a5..b4523d05ed9d 100644
--- a/pkg/roachpb/internal_raft.pb.go
+++ b/pkg/roachpb/internal_raft.pb.go
@@ -8,6 +8,8 @@ import fmt "fmt"
 import math "math"
 import hlc "github.com/cockroachdb/cockroach/pkg/util/hlc"
 
+import github_com_cockroachdb_cockroach_pkg_util_uuid "github.com/cockroachdb/cockroach/pkg/util/uuid"
+
 import io "io"
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -35,7 +37,7 @@ func (m *RaftTruncatedState) Reset()         { *m = RaftTruncatedState{} }
 func (m *RaftTruncatedState) String() string { return proto.CompactTextString(m) }
 func (*RaftTruncatedState) ProtoMessage()    {}
 func (*RaftTruncatedState) Descriptor() ([]byte, []int) {
-	return fileDescriptor_internal_raft_7877dd9fc6c34240, []int{0}
+	return fileDescriptor_internal_raft_f618e845b567f051, []int{0}
 }
 func (m *RaftTruncatedState) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -69,7 +71,7 @@ func (m *RaftTombstone) Reset()         { *m = RaftTombstone{} }
 func (m *RaftTombstone) String() string { return proto.CompactTextString(m) }
 func (*RaftTombstone) ProtoMessage()    {}
 func (*RaftTombstone) Descriptor() ([]byte, []int) {
-	return fileDescriptor_internal_raft_7877dd9fc6c34240, []int{1}
+	return fileDescriptor_internal_raft_f618e845b567f051, []int{1}
 }
 func (m *RaftTombstone) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -109,7 +111,7 @@ func (m *RaftSnapshotData) Reset()         { *m = RaftSnapshotData{} }
 func (m *RaftSnapshotData) String() string { return proto.CompactTextString(m) }
 func (*RaftSnapshotData) ProtoMessage()    {}
 func (*RaftSnapshotData) Descriptor() ([]byte, []int) {
-	return fileDescriptor_internal_raft_7877dd9fc6c34240, []int{2}
+	return fileDescriptor_internal_raft_f618e845b567f051, []int{2}
 }
 func (m *RaftSnapshotData) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -144,7 +146,7 @@ func (m *RaftSnapshotData_KeyValue) Reset()         { *m = RaftSnapshotData_KeyV
 func (m *RaftSnapshotData_KeyValue) String() string { return proto.CompactTextString(m) }
 func (*RaftSnapshotData_KeyValue) ProtoMessage()    {}
 func (*RaftSnapshotData_KeyValue) Descriptor() ([]byte, []int) {
-	return fileDescriptor_internal_raft_7877dd9fc6c34240, []int{2, 0}
+	return fileDescriptor_internal_raft_f618e845b567f051, []int{2, 0}
 }
 func (m *RaftSnapshotData_KeyValue) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -169,11 +171,49 @@ func (m *RaftSnapshotData_KeyValue) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_RaftSnapshotData_KeyValue proto.InternalMessageInfo
 
+// SSTSnapshotInProgressData is the durably written record that a snapshot is
+// in progress; it coordinates recovery from an untimely crash.
+type SSTSnapshotInProgressData struct {
+	// The uuid of the in-progress snapshot.
+ ID github_com_cockroachdb_cockroach_pkg_util_uuid.UUID `protobuf:"bytes,1,opt,name=id,customtype=github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" json:"id"` +} + +func (m *SSTSnapshotInProgressData) Reset() { *m = SSTSnapshotInProgressData{} } +func (m *SSTSnapshotInProgressData) String() string { return proto.CompactTextString(m) } +func (*SSTSnapshotInProgressData) ProtoMessage() {} +func (*SSTSnapshotInProgressData) Descriptor() ([]byte, []int) { + return fileDescriptor_internal_raft_f618e845b567f051, []int{3} +} +func (m *SSTSnapshotInProgressData) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SSTSnapshotInProgressData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *SSTSnapshotInProgressData) XXX_Merge(src proto.Message) { + xxx_messageInfo_SSTSnapshotInProgressData.Merge(dst, src) +} +func (m *SSTSnapshotInProgressData) XXX_Size() int { + return m.Size() +} +func (m *SSTSnapshotInProgressData) XXX_DiscardUnknown() { + xxx_messageInfo_SSTSnapshotInProgressData.DiscardUnknown(m) +} + +var xxx_messageInfo_SSTSnapshotInProgressData proto.InternalMessageInfo + func init() { proto.RegisterType((*RaftTruncatedState)(nil), "cockroach.roachpb.RaftTruncatedState") proto.RegisterType((*RaftTombstone)(nil), "cockroach.roachpb.RaftTombstone") proto.RegisterType((*RaftSnapshotData)(nil), "cockroach.roachpb.RaftSnapshotData") proto.RegisterType((*RaftSnapshotData_KeyValue)(nil), "cockroach.roachpb.RaftSnapshotData.KeyValue") + proto.RegisterType((*SSTSnapshotInProgressData)(nil), "cockroach.roachpb.SSTSnapshotInProgressData") } func (this *RaftTruncatedState) Equal(that interface{}) bool { if that == nil { @@ -331,6 +371,32 @@ func (m *RaftSnapshotData_KeyValue) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *SSTSnapshotInProgressData) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SSTSnapshotInProgressData) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintInternalRaft(dAtA, i, uint64(m.ID.Size())) + n3, err := m.ID.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n3 + return i, nil +} + func encodeVarintInternalRaft(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -484,6 +550,17 @@ func (m *RaftSnapshotData_KeyValue) Size() (n int) { return n } +func (m *SSTSnapshotInProgressData) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.ID.Size() + n += 1 + l + sovInternalRaft(uint64(l)) + return n +} + func sovInternalRaft(x uint64) (n int) { for { n++ @@ -936,6 +1013,86 @@ func (m *RaftSnapshotData_KeyValue) Unmarshal(dAtA []byte) error { } return nil } +func (m *SSTSnapshotInProgressData) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInternalRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SSTSnapshotInProgressData: wiretype end group for non-group") + } + if 
fieldNum <= 0 { + return fmt.Errorf("proto: SSTSnapshotInProgressData: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInternalRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthInternalRaft + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.ID.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipInternalRaft(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthInternalRaft + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipInternalRaft(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -1042,38 +1199,42 @@ var ( ) func init() { - proto.RegisterFile("roachpb/internal_raft.proto", fileDescriptor_internal_raft_7877dd9fc6c34240) + proto.RegisterFile("roachpb/internal_raft.proto", fileDescriptor_internal_raft_f618e845b567f051) } -var fileDescriptor_internal_raft_7877dd9fc6c34240 = []byte{ - // 454 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xf6, 0x4f, 0x23, 0xda, 0x4d, 0xa2, 0x86, 0x55, 0x85, 0xac, 0x20, 0xec, 0xc8, 0xa7, 0x20, - 0x21, 0x47, 0xea, 0x91, 0x1b, 0x51, 0x90, 0x80, 0x48, 0x1c, 0x9c, 0x28, 0x07, 0x38, 0x58, 0x5b, - 0x7b, 0xea, 0x58, 0xb5, 0x77, 0xad, 0xf5, 0x04, 0xa5, 0x6f, 0xc1, 0x23, 0xf4, 0x31, 0x78, 0x84, - 0x70, 0xeb, 0xb1, 0xa7, 0x08, 0x9c, 0x0b, 0xcf, 0xc0, 0x09, 0x79, 0xed, 0x84, 0x54, 0x70, 0x9b, - 0xf9, 0xbe, 0x6f, 0x3e, 0xcf, 0x37, 0x5e, 0xf2, 0x5c, 0x0a, 0x16, 0x2e, 0xf3, 0xab, 0x51, 0xc2, - 0x11, 0x24, 0x67, 0x69, 0x20, 0xd9, 0x35, 0x7a, 0xb9, 0x14, 0x28, 0xe8, 0xd3, 0x50, 0x84, 0x37, - 0x4a, 0xe0, 0x35, 0xb2, 0xfe, 0xb3, 0xbd, 0x3e, 0x03, 0x64, 0x11, 0x43, 0x56, 0x4b, 0xfb, 0xd6, - 0x0a, 0x93, 0x74, 0xb4, 0x4c, 0xc3, 0x11, 0x26, 0x19, 0x14, 0xc8, 0xb2, 0xbc, 0x61, 0x2e, 0x62, - 0x11, 0x0b, 0x55, 0x8e, 0xaa, 0xaa, 0x46, 0xdd, 0x39, 0xa1, 0x3e, 0xbb, 0xc6, 0xb9, 0x5c, 0xf1, - 0x90, 0x21, 0x44, 0x33, 0x64, 0x08, 0xb4, 0x4f, 0x5a, 0x09, 0x8f, 0x60, 0x6d, 0xe9, 0x03, 0x7d, - 0x78, 0x32, 0x3e, 0xd9, 0x6c, 0x1d, 0xcd, 0xaf, 0x21, 0x6a, 0x91, 0x13, 0x04, 0x99, 0x59, 0xc6, - 0x11, 0xa5, 0x90, 0xd7, 0xa7, 0xdf, 0xee, 0x1c, 0xfd, 0xd7, 0x9d, 0xa3, 0xbb, 0x9f, 0x49, 0x57, - 0xb9, 0x8a, 0xec, 0xaa, 0x40, 0xc1, 0x81, 0x7e, 0x20, 0xe7, 0x1c, 0xd6, 0x18, 0x48, 0xc8, 0xd3, - 0x24, 0x64, 0x41, 0x12, 0x29, 0xeb, 0xd6, 0xd8, 0xad, 0xe6, 0xcb, 0xad, 0xd3, 0xfd, 0x08, 0x6b, - 0xf4, 0x6b, 0xf6, 0xfd, 0xe4, 0xf7, 0xd6, 0x39, 0x3b, 0x34, 0x7e, 0x97, 0x1f, 0x71, 0x91, 0xfb, - 0xdd, 0x20, 0xbd, 0xca, 0x7d, 0xc6, 0x59, 0x5e, 0x2c, 0x05, 0x4e, 0x18, 0x32, 0x3a, 0x23, 0x3d, - 0xc9, 0x78, 0x0c, 0x41, 0x04, 0x45, 0x28, 0x93, 0x1c, 0x85, 0x54, 0x5f, 0x68, 0x5f, 0xba, 0xde, - 0x3f, 0xd7, 0xf3, 0xfc, 0x4a, 0x3a, 0x39, 0x28, 0x9b, 0x14, 0xe7, 0xf2, 0x31, 0x4c, 0xdf, 0x11, - 0x63, 0xba, 0xb0, 0x8c, 0x81, 0x39, 0x6c, 0x5f, 0xbe, 0xfa, 0xaf, 0xcd, 0xe3, 0x2d, 0xbc, 0x29, - 0xdc, 0x2e, 0x58, 0xba, 0x82, 0x31, 0x69, 0x62, 0x19, 0xd3, 
0x85, 0x6f, 0x4c, 0x17, 0xd4, 0x21, - 0xed, 0x54, 0xc4, 0x01, 0x70, 0x94, 0x09, 0x14, 0x96, 0x39, 0x30, 0x87, 0x1d, 0x9f, 0xa4, 0x22, - 0x7e, 0x5b, 0x23, 0xfd, 0x15, 0x39, 0xdd, 0x0f, 0xd3, 0x1e, 0x31, 0x6f, 0xe0, 0x56, 0xad, 0xdf, - 0xf1, 0xab, 0x92, 0x5e, 0x90, 0xd6, 0x97, 0x8a, 0x52, 0x47, 0xef, 0xf8, 0x75, 0x43, 0xdf, 0x90, - 0xb3, 0xc3, 0x4f, 0xb6, 0x4c, 0x15, 0xf6, 0xc5, 0xd1, 0x96, 0xd5, 0x4b, 0xf0, 0x96, 0x69, 0xe8, - 0xcd, 0xf7, 0xa2, 0x26, 0xe7, 0xdf, 0xa9, 0xf1, 0xcb, 0xcd, 0x4f, 0x5b, 0xdb, 0x94, 0xb6, 0x7e, - 0x5f, 0xda, 0xfa, 0x43, 0x69, 0xeb, 0x3f, 0x4a, 0x5b, 0xff, 0xba, 0xb3, 0xb5, 0xfb, 0x9d, 0xad, - 0x3d, 0xec, 0x6c, 0xed, 0xd3, 0x93, 0x26, 0xec, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa5, 0x84, - 0xa0, 0xae, 0xa2, 0x02, 0x00, 0x00, +var fileDescriptor_internal_raft_f618e845b567f051 = []byte{ + // 521 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0x41, 0x6e, 0xd3, 0x40, + 0x14, 0x8d, 0x9d, 0x46, 0xb4, 0x93, 0x44, 0x0d, 0xa3, 0x0a, 0x99, 0x20, 0xec, 0xc8, 0xab, 0x20, + 0x21, 0x5b, 0x2a, 0x3b, 0x76, 0x8d, 0x82, 0x44, 0x88, 0x84, 0xc0, 0x49, 0xb3, 0x80, 0x45, 0x34, + 0xb1, 0xa7, 0xce, 0x28, 0xf6, 0x8c, 0x35, 0xfe, 0x46, 0xe9, 0x2d, 0x38, 0x42, 0x8f, 0xc1, 0x11, + 0xc2, 0xae, 0xcb, 0x8a, 0x45, 0x04, 0xc9, 0x86, 0x33, 0xb0, 0x42, 0x1e, 0x3b, 0x69, 0x2a, 0xd8, + 0xfd, 0x79, 0xef, 0xfd, 0x3f, 0xff, 0xbd, 0x19, 0xf4, 0x4c, 0x0a, 0xe2, 0xcf, 0x93, 0x99, 0xcb, + 0x38, 0x50, 0xc9, 0x49, 0x34, 0x95, 0xe4, 0x0a, 0x9c, 0x44, 0x0a, 0x10, 0xf8, 0xb1, 0x2f, 0xfc, + 0x85, 0x12, 0x38, 0xa5, 0xac, 0xfd, 0x64, 0xa7, 0x8f, 0x29, 0x90, 0x80, 0x00, 0x29, 0xa4, 0x6d, + 0x23, 0x03, 0x16, 0xb9, 0xf3, 0xc8, 0x77, 0x81, 0xc5, 0x34, 0x05, 0x12, 0x27, 0x25, 0x73, 0x16, + 0x8a, 0x50, 0xa8, 0xd2, 0xcd, 0xab, 0x02, 0xb5, 0xc7, 0x08, 0x7b, 0xe4, 0x0a, 0xc6, 0x32, 0xe3, + 0x3e, 0x01, 0x1a, 0x8c, 0x80, 0x00, 0xc5, 0x6d, 0x54, 0x63, 0x3c, 0xa0, 0x4b, 0x43, 0xeb, 0x68, + 0xdd, 0xa3, 0xde, 0xd1, 0x6a, 0x6d, 0x55, 0xbc, 0x02, 0xc2, 0x06, 0x3a, 0x02, 0x2a, 0x63, 0x43, + 0x3f, 0xa0, 0x14, 0xf2, 0xfa, 0xf8, 0xdb, 0x8d, 0xa5, 0xfd, 0xbe, 0xb1, 0x34, 0xfb, 0x33, 0x6a, + 0xaa, 0xa9, 0x22, 0x9e, 0xa5, 0x20, 0x38, 0xc5, 0xef, 0xd0, 0x29, 0xa7, 0x4b, 0x98, 0x4a, 0x9a, + 0x44, 0xcc, 0x27, 0x53, 0x16, 0xa8, 0xd1, 0xb5, 0x9e, 0x9d, 0xf7, 0x6f, 0xd6, 0x56, 0xf3, 0x3d, + 0x5d, 0x82, 0x57, 0xb0, 0x83, 0xfe, 0x9f, 0xb5, 0x75, 0xb2, 0x3f, 0x78, 0x4d, 0x7e, 0xc0, 0x05, + 0xf6, 0x77, 0x1d, 0xb5, 0xf2, 0xe9, 0x23, 0x4e, 0x92, 0x74, 0x2e, 0xa0, 0x4f, 0x80, 0xe0, 0x11, + 0x6a, 0x49, 0xc2, 0x43, 0x3a, 0x0d, 0x68, 0xea, 0x4b, 0x96, 0x80, 0x90, 0xea, 0x86, 0xfa, 0xb9, + 0xed, 0xfc, 0x93, 0x9e, 0xe3, 0xe5, 0xd2, 0xfe, 0x5e, 0x59, 0xba, 0x38, 0x95, 0x0f, 0x61, 0xfc, + 0x16, 0xe9, 0xc3, 0x89, 0xa1, 0x77, 0xaa, 0xdd, 0xfa, 0xf9, 0xcb, 0xff, 0x8e, 0x79, 0xb8, 0x85, + 0x33, 0xa4, 0xd7, 0x13, 0x12, 0x65, 0xb4, 0x87, 0x4a, 0x5b, 0xfa, 0x70, 0xe2, 0xe9, 0xc3, 0x09, + 0xb6, 0x50, 0x3d, 0x12, 0xe1, 0x94, 0x72, 0x90, 0x8c, 0xa6, 0x46, 0xb5, 0x53, 0xed, 0x36, 0x3c, + 0x14, 0x89, 0xf0, 0x4d, 0x81, 0xb4, 0x33, 0x74, 0xbc, 0x6b, 0xc6, 0x2d, 0x54, 0x5d, 0xd0, 0x6b, + 0xb5, 0x7e, 0xc3, 0xcb, 0x4b, 0x7c, 0x86, 0x6a, 0x5f, 0x72, 0x4a, 0x85, 0xde, 0xf0, 0x8a, 0x03, + 0xbe, 0x40, 0x27, 0xfb, 0x47, 0x36, 0xaa, 0xca, 0xec, 0xf3, 0x83, 0x2d, 0xf3, 0x9f, 0xe0, 0xcc, + 0x23, 0xdf, 0x19, 0xef, 0x44, 0xa5, 0xcf, 0xfb, 0x2e, 0x9b, 0xa3, 0xa7, 0xa3, 0xd1, 0x78, 0xe7, + 0x61, 0xc0, 0x3f, 0x48, 0x11, 0x4a, 0x9a, 0xa6, 0x2a, 0xd3, 0x8f, 0x48, 0x2f, 0xdf, 0xa9, 0xd1, + 0xbb, 0xc8, 0x3b, 0x7f, 0xac, 0xad, 
0x57, 0x21, 0x83, 0x79, 0x36, 0x73, 0x7c, 0x11, 0xbb, 0xfb,
+	0xab, 0x82, 0xd9, 0x7d, 0xed, 0x26, 0x8b, 0xd0, 0x55, 0x9f, 0x30, 0xcb, 0x58, 0xe0, 0x5c, 0x5e,
+	0x0e, 0xfa, 0x79, 0x0e, 0x83, 0xbe, 0xa7, 0xb3, 0xa0, 0xf7, 0x62, 0xf5, 0xcb, 0xac, 0xac, 0x36,
+	0xa6, 0x76, 0xbb, 0x31, 0xb5, 0xbb, 0x8d, 0xa9, 0xfd, 0xdc, 0x98, 0xda, 0xd7, 0xad, 0x59, 0xb9,
+	0xdd, 0x9a, 0x95, 0xbb, 0xad, 0x59, 0xf9, 0xf4, 0xa8, 0x0c, 0xf7, 0x6f, 0x00, 0x00, 0x00, 0xff,
+	0xff, 0x26, 0x41, 0xe6, 0xed, 0x12, 0x03, 0x00, 0x00,
 }
diff --git a/pkg/roachpb/internal_raft.proto b/pkg/roachpb/internal_raft.proto
index 78bddd5e4e6f..0fa34eed7515 100644
--- a/pkg/roachpb/internal_raft.proto
+++ b/pkg/roachpb/internal_raft.proto
@@ -51,3 +51,13 @@ message RaftSnapshotData {
   // roundtripping through memory.
   repeated bytes log_entries = 3;
 }
+
+// SSTSnapshotInProgressData is the durably written record that a snapshot is
+// in progress; it coordinates recovery from an untimely crash.
+message SSTSnapshotInProgressData {
+  // The uuid of the in-progress snapshot.
+  optional bytes id = 1 [(gogoproto.nullable) = false,
+    (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
+    (gogoproto.customname) = "ID"];
+}
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index b6aef43fd271..7fa3698e61d2 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -44,6 +44,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -1070,6 +1071,7 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
 		RangeSize: 100,
 		State:     storagepb.ReplicaState{Desc: rep.Desc()},
 	}
+	header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes()
 	// Cause this stream to return an error as soon as we ask it for something.
 	// This injects an error into HandleSnapshotStream when we try to send the
 	// "snapshot accepted" message.
diff --git a/pkg/storage/engine/in_mem.go b/pkg/storage/engine/in_mem.go
index d0d39b894513..d85b469b600f 100644
--- a/pkg/storage/engine/in_mem.go
+++ b/pkg/storage/engine/in_mem.go
@@ -10,7 +10,13 @@
 
 package engine
 
-import "github.com/cockroachdb/cockroach/pkg/roachpb"
+import (
+	"context"
+	"io/ioutil"
+	"os"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+)
 
 // InMem wraps RocksDB and configures it for in-memory only storage.
 type InMem struct {
@@ -38,4 +44,28 @@ func NewInMem(attrs roachpb.Attributes, cacheSize int64) InMem {
 	return db
 }
 
+// IngestExternalFiles for an in-memory RocksDB first loads each file into
+// memory, then ingests them (again, in memory). This implementation is
+// provided solely to make tests work.
+func (db InMem) IngestExternalFiles(
+	ctx context.Context, paths []string, skipWritingSeqNo, allowFileModifications bool,
+) error {
+	for _, file := range paths {
+		data, err := ioutil.ReadFile(file)
+		if err != nil {
+			if os.IsNotExist(err) {
+				// The file may already be in the correct in-memory env. Ignore
+				// the error; it will be caught by IngestExternalFiles if the
+				// file truly is missing.
+				continue
+			}
+			return err
+		}
+		if err := db.RocksDB.WriteFile(file, data); err != nil {
+			return err
+		}
+	}
+	return db.RocksDB.IngestExternalFiles(ctx, paths, skipWritingSeqNo, allowFileModifications)
+}
+
 var _ Engine = InMem{}
diff --git a/pkg/storage/replica_init.go b/pkg/storage/replica_init.go
index b67ff73755ba..a5beb46d0c43 100644
--- a/pkg/storage/replica_init.go
+++ b/pkg/storage/replica_init.go
@@ -152,6 +152,35 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
 		return err
 	}
 
+	// If we crashed before fully applying a snapshot's disk changes, we must
+	// ingest the SSTs. Note that ingesting the same set of SSTs is idempotent,
+	// so removing SSTSnapshotInProgressData and performing the ingestion does
+	// not have to be atomic to be safe.
+	sstSnapshotInProgressData, found, err := r.mu.stateLoader.LoadSSTSnapshotInProgressData(ctx, r.store.Engine())
+	if err != nil {
+		return err
+	}
+	// TODO(jeffreyxiao): Test an untimely crash here. Since
+	// SSTSnapshotInProgressData still exists, the same set of SSTs should be
+	// ingested again. This is safe because ingesting the same set of SSTs is
+	// idempotent.
+	if found {
+		sss, err := newSSTSnapshotStorage(r.store.cfg.Settings, desc.RangeID, sstSnapshotInProgressData.ID,
+			r.store.engine.GetAuxiliaryDir(), r.store.limiters.BulkIOWriteRate, r.store.Engine())
+		if err != nil {
+			return err
+		}
+		if err := r.store.Engine().IngestExternalFiles(ctx, sss.ssts, true /* skipWritingSeqNo */, true /* modify */); err != nil {
+			return err
+		}
+		if err := r.mu.stateLoader.DeleteSSTSnapshotInProgressData(ctx, r.store.Engine()); err != nil {
+			return err
+		}
+		if err := sss.Clear(); err != nil {
+			return err
+		}
+	}
+
 	r.assertStateLocked(ctx, r.store.Engine())
 	return nil
 }
diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go
index 7d6793b4e288..d81288e87cb2 100644
--- a/pkg/storage/replica_raftstorage.go
+++ b/pkg/storage/replica_raftstorage.go
@@ -482,8 +482,10 @@ func (s *OutgoingSnapshot) Close() {
 // IncomingSnapshot contains the data for an incoming streaming snapshot message.
 type IncomingSnapshot struct {
 	SnapUUID uuid.UUID
-	// The RocksDB BatchReprs that make up this snapshot.
-	Batches [][]byte
+	// SSTables that make up this snapshot.
+	SSTs []string
+	// The RocksDB BatchRepr that makes up this snapshot.
+	Batch []byte
 	// The Raft log entries for this snapshot.
 	LogEntries [][]byte
 	// The replica state at the time the snapshot was generated (never nil).
@@ -817,17 +819,15 @@ func (r *Replica) applySnapshot(
 	}
 
 	var size int
-	for _, b := range inSnap.Batches {
-		size += len(b)
-	}
+	size += len(inSnap.Batch)
 	for _, e := range inSnap.LogEntries {
 		size += len(e)
 	}
 
 	log.Infof(ctx, "applying %s snapshot at index %d "+
-		"(id=%s, encoded size=%d, %d rocksdb batches, %d log entries)",
+		"(id=%s, encoded size=%d, %d log entries, %d SSTs)",
 		snapType, snap.Metadata.Index, inSnap.SnapUUID.Short(),
-		size, len(inSnap.Batches), len(inSnap.LogEntries))
+		size, len(inSnap.LogEntries), len(inSnap.SSTs))
 	defer func(start time.Time) {
 		now := timeutil.Now()
 		log.Infof(ctx, "applied %s snapshot in %0.0fms [clear=%0.0fms batch=%0.0fms entries=%0.0fms commit=%0.0fms]",
@@ -866,6 +866,10 @@ func (r *Replica) applySnapshot(
 	// Delete everything in the range and recreate it from the snapshot.
 	// We need to delete any old Raft log entries here because any log entries
 	// that predate the snapshot will be orphaned and never truncated or GC'd.
+	// TODO(jeffreyxiao): The downside of putting the data range deletions in
+	// the batch is that it guarantees that something in the memtable overlaps
+	// with the SST, causing the SST to get stuck in a higher level of the LSM.
+	// We need to determine whether adding a range deletion tombstone is better.
 	if err := clearRangeData(ctx, s.Desc, r.store.Engine(), batch, true /* destroyData */); err != nil {
 		return err
 	}
@@ -875,10 +879,8 @@ func (r *Replica) applySnapshot(
 	stats.clear = timeutil.Now()
 
 	// Write the snapshot into the range.
-	for _, batchRepr := range inSnap.Batches {
-		if err := batch.ApplyBatchRepr(batchRepr, false); err != nil {
-			return err
-		}
+	if err := batch.ApplyBatchRepr(inSnap.Batch, false); err != nil {
+		return err
 	}
 
 	// The log entries are all written to distinct keys so we can use a
@@ -886,18 +888,28 @@ func (r *Replica) applySnapshot(
 	distinctBatch := batch.Distinct()
 	stats.batch = timeutil.Now()
 
+	rsl := stateloader.Make(s.Desc.RangeID)
 	if inSnap.UsesUnreplicatedTruncatedState {
 		// We're using the unreplicated truncated state, which we need to
 		// manually persist to disk. If we're not taking this branch, the
 		// snapshot contains a legacy TruncatedState and we don't need to do
 		// anything (in fact, must not -- the invariant is that exactly one of
 		// them exists at any given point in the state machine).
-		if err := stateloader.Make(s.Desc.RangeID).SetRaftTruncatedState(
+		if err := rsl.SetRaftTruncatedState(
 			ctx, distinctBatch, s.TruncatedState,
 		); err != nil {
 			return err
 		}
 	}
+	// There is a possibility that the number of SSTs is zero because it is an
+	// error to ingest an empty SST. In this case, there won't be any in-progress
+	// snapshots after we commit this batch because there are no SSTs to ingest.
+	if len(inSnap.SSTs) > 0 {
+		sstSnapshotInProgressData := roachpb.SSTSnapshotInProgressData{ID: inSnap.SnapUUID}
+		if err := rsl.SetSSTSnapshotInProgressData(ctx, distinctBatch, sstSnapshotInProgressData); err != nil {
+			return err
+		}
+	}
 
 	logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
 	for i, bytes := range inSnap.LogEntries {
@@ -955,8 +967,33 @@ func (r *Replica) applySnapshot(
 	stats.commit = timeutil.Now()
 
 	// The on-disk state is now committed, but the corresponding in-memory state
-	// has not yet been updated. Any errors past this point must therefore be
-	// treated as fatal.
+	// has not yet been updated and the data SST has not been ingested. Any
+	// errors past this point must therefore be treated as fatal. If the node
+	// crashes before the data SST is ingested, the unreplicated range-ID local
+	// key LocalRangeSSTSnapshotInProgress will indicate that the data SST must
+	// be ingested on replica startup.
+
+	// There is a possibility that the number of SSTs is zero because it is an
+	// error to ingest an empty SST.
+	if len(inSnap.SSTs) > 0 {
+		// TODO(jeffreyxiao): Test an untimely crash here.
+		// Since SSTSnapshotInProgressData is persisted as part of the batch, a
+		// crash here is safe because the SSTs will be ingested on replica
+		// startup.
+		if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSTs, true /* skipWritingSeqNo */, true /* modify */); err != nil {
+			log.Fatalf(ctx, "unable to ingest SSTs %s while applying snapshot: %+v", inSnap.SSTs, err)
+		}
+
+		// TODO(jeffreyxiao): Test an untimely crash here.
+		// If deleting SSTSnapshotInProgressData fails, we will ingest the same
+		// set of SSTs on replica startup. A crash here is safe because ingesting
+		// the same set of SSTs is idempotent.
+		// Therefore, removing SSTSnapshotInProgressData and performing the
+		// ingestion does not have to be atomic to be safe.
+		if err := rsl.DeleteSSTSnapshotInProgressData(ctx, r.store.engine); err != nil {
+			log.Fatalf(ctx, "unable to remove RangeSSTSnapshotInProgress: %+v", err)
+		}
+	}
 
 	for _, sr := range subsumedRepls {
 		// We removed sr's data when we committed the batch. Finish subsumption by
diff --git a/pkg/storage/replica_sst_snapshot_storage.go b/pkg/storage/replica_sst_snapshot_storage.go
new file mode 100644
index 000000000000..5d68d6ed0327
--- /dev/null
+++ b/pkg/storage/replica_sst_snapshot_storage.go
@@ -0,0 +1,147 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/pkg/errors"
+	"golang.org/x/time/rate"
+)
+
+// SSTSnapshotStorage keeps track of the SST files created when receiving a
+// KV_BATCH snapshot.
+type SSTSnapshotStorage struct {
+	st         *cluster.Settings
+	limiter    *rate.Limiter
+	activeFile bool
+	currFile   *os.File
+	ssts       []string
+	rangeDir   string
+	snapDir    string
+	dirCreated bool
+	eng        engine.Engine
+}
+
+func newSSTSnapshotStorage(
+	st *cluster.Settings,
+	rangeID roachpb.RangeID,
+	snapUUID uuid.UUID,
+	baseDir string,
+	limiter *rate.Limiter,
+	eng engine.Engine,
+) (*SSTSnapshotStorage, error) {
+	rangeDir := filepath.Join(baseDir, "sstsnapshot")
+	snapDir := filepath.Join(rangeDir, snapUUID.String())
+	var ssts []string
+	matches, err := filepath.Glob(filepath.Join(snapDir, "*.sst"))
+	if err != nil {
+		return nil, err
+	}
+	ssts = append(ssts, matches...)
+	sss := &SSTSnapshotStorage{
+		st:         st,
+		limiter:    limiter,
+		activeFile: false,
+		currFile:   nil,
+		ssts:       ssts,
+		rangeDir:   rangeDir,
+		snapDir:    snapDir,
+		eng:        eng,
+	}
+	return sss, nil
+}
+
+func (sss *SSTSnapshotStorage) filename(index int) string {
+	return filepath.Join(sss.snapDir, fmt.Sprintf("%d.sst", index))
+}
+
+func (sss *SSTSnapshotStorage) createDir() error {
+	err := os.MkdirAll(sss.snapDir, 0755)
+	sss.dirCreated = sss.dirCreated || err == nil
+	return err
+}
+
+func (sss *SSTSnapshotStorage) createFile(index int) error {
+	if !sss.dirCreated {
+		if err := sss.createDir(); err != nil {
+			return err
+		}
+	}
+	filename := sss.filename(index)
+	// Use 0644 since that's what RocksDB uses:
+	// https://github.com/facebook/rocksdb/blob/56656e12d67d8a63f1e4c4214da9feeec2bd442b/env/env_posix.cc#L171
+	currFile, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	sss.currFile = currFile
+	return err
+}
+
+// NewFile adds another file to SSTSnapshotStorage. The file is created lazily
+// on the first write, and it must be closed before NewFile is called again.
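+// Files are named sequentially under the snapshot's directory (0.sst, 1.sst,
+// ...), in the order in which they are created.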
+func (sss *SSTSnapshotStorage) NewFile() error {
+	if sss.activeFile {
+		return errors.New("an active file already exists and has not been closed")
+	}
+	sss.ssts = append(sss.ssts, sss.filename(len(sss.ssts)))
+	sss.activeFile = true
+	return nil
+}
+
+// Write writes contents to the current file while respecting the limiter
+// passed into SSTSnapshotStorage.
+func (sss *SSTSnapshotStorage) Write(ctx context.Context, contents []byte) error {
+	if !sss.activeFile {
+		return errors.New("no active file")
+	}
+	if sss.currFile == nil {
+		if err := sss.createFile(len(sss.ssts) - 1); err != nil {
+			return err
+		}
+	}
+	// TODO(jeffreyxiao): We should limit the size of a single SST, but right
+	// now we don't need to because ranges aren't large enough for this to
+	// matter.
+	limitBulkIOWrite(ctx, sss.limiter, len(contents))
+	_, err := sss.currFile.Write(contents)
+	return err
+}
+
+// SSTs returns the names of the files created.
+func (sss *SSTSnapshotStorage) SSTs() []string {
+	return sss.ssts
+}
+
+// Close closes the current file, if any. Calling this function multiple times
+// is idempotent.
+func (sss *SSTSnapshotStorage) Close() error {
+	// We return an error for empty files because ingesting an empty SST is
+	// itself an error, so we catch it as early as possible.
+	if sss.activeFile && sss.currFile == nil {
+		return errors.New("closing an empty file")
+	}
+	sss.activeFile = false
+	if sss.currFile != nil {
+		err := sss.currFile.Close()
+		sss.currFile = nil
+		return err
+	}
+	return nil
+}
+
+// Clear removes the directory and all SST files created.
+func (sss *SSTSnapshotStorage) Clear() error {
+	return os.RemoveAll(sss.rangeDir)
+}
diff --git a/pkg/storage/stateloader/stateloader.go b/pkg/storage/stateloader/stateloader.go
index 4d0b2c7b27bc..04553b7c4baa 100644
--- a/pkg/storage/stateloader/stateloader.go
+++ b/pkg/storage/stateloader/stateloader.go
@@ -629,3 +629,32 @@ func (rsl StateLoader) SynthesizeHardState(
 	err := rsl.SetHardState(ctx, eng, newHS)
 	return errors.Wrapf(err, "writing HardState %+v", &newHS)
 }
+
+// LoadSSTSnapshotInProgressData returns the record that indicates an
+// in-progress snapshot, if any.
+func (rsl StateLoader) LoadSSTSnapshotInProgressData(
+	ctx context.Context, reader engine.Reader,
+) (roachpb.SSTSnapshotInProgressData, bool, error) {
+	var sstSnapshotInProgressData roachpb.SSTSnapshotInProgressData
+	found, err := engine.MVCCGetProto(ctx, reader, rsl.RangeSSTSnapshotInProgress(),
+		hlc.Timestamp{}, &sstSnapshotInProgressData, engine.MVCCGetOptions{})
+	return sstSnapshotInProgressData, found, err
+}
+
+// SetSSTSnapshotInProgressData persists the record that indicates an
+// in-progress snapshot.
+func (rsl StateLoader) SetSSTSnapshotInProgressData(
+	ctx context.Context,
+	eng engine.ReadWriter,
+	sstSnapshotInProgressData roachpb.SSTSnapshotInProgressData,
+) error {
+	return engine.MVCCPutProto(ctx, eng, nil /* ms */, rsl.RangeSSTSnapshotInProgress(), hlc.Timestamp{}, nil, &sstSnapshotInProgressData)
+}
+
+// DeleteSSTSnapshotInProgressData deletes the record that indicates an
+// in-progress snapshot.
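+// It is called once the snapshot's SSTs have been ingested, at which point
+// crash recovery no longer needs to re-ingest them.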
+func (rsl StateLoader) DeleteSSTSnapshotInProgressData( + ctx context.Context, eng engine.ReadWriter, +) error { + return engine.MVCCDelete(ctx, eng, nil, rsl.RangeSSTSnapshotInProgress(), hlc.Timestamp{}, nil) +} diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 7aafe83985a4..325f5bc9922f 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -78,6 +79,9 @@ type snapshotStrategy interface { // Status provides a status report on the work performed during the // snapshot. Only valid if the strategy succeeded. Status() string + + // Close cleans up any resources associated with the snapshot strategy. + Close(context.Context) } func assertStrategy( @@ -95,9 +99,11 @@ type kvBatchSnapshotStrategy struct { status string // Fields used when sending snapshots. - batchSize int64 - limiter *rate.Limiter - newBatch func() engine.Batch + batchSize int64 + sstChunkSize int64 + limiter *rate.Limiter + sss *SSTSnapshotStorage + newBatch func() engine.Batch } // Receive implements the snapshotStrategy interface. @@ -106,35 +112,99 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( ) (IncomingSnapshot, error) { assertStrategy(ctx, header, SnapshotRequest_KV_BATCH) - var batches [][]byte + var ssts []string var logEntries [][]byte + var err error + + emptySST := true + b := kvSS.newBatch() + defer b.Close() + + if err := kvSS.sss.NewFile(); err != nil { + return noSnap, errors.Wrap(err, "failed to create sst file") + } + + lastSizeCheck := int64(0) + sst, err := engine.MakeRocksDBSstFileWriter() + if err != nil { + return noSnap, errors.Wrap(err, "failed to create sst file writer") + } + for { req, err := stream.Recv() if err != nil { - return IncomingSnapshot{}, err + return noSnap, err } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(stream, err) } if req.KVBatch != nil { - batches = append(batches, req.KVBatch) + batchReader, err := engine.NewRocksDBBatchReader(req.KVBatch) + if err != nil { + return noSnap, errors.Wrap(err, "failed to decode batch") + } + // All operations in the batch are guaranteed to be puts. + for batchReader.Next() { + if batchReader.BatchType() != engine.BatchTypeValue { + log.Fatalf(ctx, "expected type %s, found type %s", engine.BatchTypeValue, batchReader.BatchType()) + } + key, err := batchReader.MVCCKey() + if err != nil { + return noSnap, errors.Wrap(err, "failed to decode mvcc key") + } + // Add the key to the sst table if it is not a part of the local key + // range. 
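+				// Local keys are instead written to the regular batch below, so
+				// they never end up in the ingested SST.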
+ if key.Key.Compare(keys.LocalMax) >= 0 { + if err := sst.Put(key, batchReader.Value()); err != nil { + return noSnap, errors.Wrap(err, "failed to put in sst") + } + emptySST = false + if sst.DataSize-lastSizeCheck > kvSS.sstChunkSize { + lastSizeCheck = sst.DataSize + chunk, err := sst.Truncate() + if err != nil { + return noSnap, errors.Wrap(err, "failed to truncate sst") + } + if err := kvSS.sss.Write(ctx, chunk); err != nil { + return noSnap, errors.Wrap(err, "failed to write to sst file") + } + } + } else if err := b.Put(key, batchReader.Value()); err != nil { + return noSnap, errors.Wrap(err, "failed to put in batch") + } + } } if req.LogEntries != nil { logEntries = append(logEntries, req.LogEntries...) } if req.Final { + if !emptySST { + chunk, err := sst.Finish() + if err != nil { + return noSnap, errors.Wrap(err, "failed to finish sst") + } + if err := kvSS.sss.Write(ctx, chunk); err != nil { + return noSnap, errors.Wrap(err, "failed to write to sst file") + } + if err := kvSS.sss.Close(); err != nil { + return noSnap, errors.Wrap(err, "failed to close sst file") + } + ssts = append(ssts, kvSS.sss.SSTs()...) + } + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { - err = errors.Wrap(err, "invalid snapshot") - return IncomingSnapshot{}, sendSnapshotError(stream, err) + err = errors.Wrap(err, "client error: invalid snapshot") + return noSnap, sendSnapshotError(stream, err) } inSnap := IncomingSnapshot{ UsesUnreplicatedTruncatedState: header.UnreplicatedTruncatedState, SnapUUID: snapUUID, - Batches: batches, + Batch: b.Repr(), + SSTs: ssts, LogEntries: logEntries, State: &header.State, snapType: header.Type, @@ -153,7 +223,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( inSnap.snapType = SnapshotRequest_PREEMPTIVE } - kvSS.status = fmt.Sprintf("kv batches: %d, log entries: %d", len(batches), len(logEntries)) + kvSS.status = fmt.Sprintf("log entries: %d, ssts: %d", len(logEntries), len(ssts)) return inSnap, nil } } @@ -190,10 +260,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } if int64(b.Len()) >= kvSS.batchSize { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } b = nil @@ -204,10 +271,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } } if b != nil { - if err := kvSS.limiter.WaitN(ctx, 1); err != nil { - return err - } - if err := kvSS.sendBatch(stream, b); err != nil { + if err := kvSS.sendBatch(ctx, stream, b); err != nil { return err } } @@ -330,8 +394,11 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } func (kvSS *kvBatchSnapshotStrategy) sendBatch( - stream outgoingSnapshotStream, batch engine.Batch, + ctx context.Context, stream outgoingSnapshotStream, batch engine.Batch, ) error { + if err := kvSS.limiter.WaitN(ctx, 1); err != nil { + return err + } repr := batch.Repr() batch.Close() return stream.Send(&SnapshotRequest{KVBatch: repr}) @@ -340,6 +407,16 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( // Status implements the snapshotStrategy interface. func (kvSS *kvBatchSnapshotStrategy) Status() string { return kvSS.status } +// Close implements the snapshotStrategy interface. +func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { + if kvSS.sss != nil { + // Nothing actionable to do when removing directory fails. 
+		if err := kvSS.sss.Clear(); err != nil {
+			log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err)
+		}
+	}
+}
+
 // reserveSnapshot throttles incoming snapshots. The returned closure is used
 // to cleanup the reservation and release its resources. A nil cleanup function
 // and a non-empty rejectionMessage indicates the reservation was declined.
@@ -631,8 +708,25 @@ func (s *Store) receiveSnapshot(
 	var ss snapshotStrategy
 	switch header.Strategy {
 	case SnapshotRequest_KV_BATCH:
+		snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
+		if err != nil {
+			err = errors.Wrap(err, "invalid snapshot")
+			return sendSnapshotError(stream, err)
+		}
+		sss, err := newSSTSnapshotStorage(s.cfg.Settings, header.State.Desc.RangeID, snapUUID,
+			s.engine.GetAuxiliaryDir(), s.limiters.BulkIOWriteRate, s.engine)
+		if err != nil {
+			return sendSnapshotError(stream,
+				errors.Wrapf(err, "%s,r%d: error opening sst snapshot storage",
+					s, header.State.Desc.RangeID),
+			)
+		}
+
 		ss = &kvBatchSnapshotStrategy{
-			raftCfg: &s.cfg.RaftConfig,
+			raftCfg:      &s.cfg.RaftConfig,
+			sss:          sss,
+			sstChunkSize: 256 << 10, /* 256 KB */
+			newBatch:     s.Engine().NewBatch,
 		}
 	default:
 		return sendSnapshotError(stream,
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 51e248c7d5b3..77bb8f139fa9 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -3365,6 +3365,98 @@ func TestSnapshotRateLimit(t *testing.T) {
 	}
 }
 
+// TestSnapshotSSTRecovery simulates a node crash just before the SSTs of a
+// snapshot are ingested and verifies that the SSTs are properly ingested
+// when the replica starts back up.
+func TestSnapshotSSTRecovery(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	testKey := testutils.MakeKey(keys.MakeTablePrefix(50), roachpb.RKey("a"))
+	testTimestamp := hlc.MinTimestamp
+	testValue := roachpb.MakeValueFromBytesAndTimestamp([]byte("foo"), testTimestamp)
+	// The test is set up so that testRangeID corresponds to the range that
+	// starts at testKey.
+	testRangeID := roachpb.RangeID(7)
+	testUUID := uuid.UUID{}
+
+	stopper := stop.NewStopper()
+	ctx := context.TODO()
+	defer stopper.Stop(ctx)
+
+	cfg := TestStoreConfig(nil)
+	cfg.Transport = NewDummyRaftTransport(cfg.Settings)
+	cfg.DB = client.NewDB(cfg.AmbientCtx, &testSenderFactory{}, cfg.Clock)
+
+	// Initialize the engine.
+	eng := engine.NewInMem(roachpb.Attributes{}, 1<<20)
+	stopper.AddCloser(eng)
+	if err := InitEngine(ctx, eng, testIdent, cfg.Settings.Version.BootstrapVersion()); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create splits.
+	var splits []roachpb.RKey
+	splits = config.StaticSplits()
+	splits = append(splits, testKey)
+	sort.Slice(splits, func(i, j int) bool {
+		return splits[i].Less(splits[j])
+	})
+	if err := WriteInitialClusterData(
+		ctx, eng, nil /* initialValues */, cfg.Settings.Version.BootstrapVersion().Version,
+		1 /* numStores */, splits, cfg.Clock.PhysicalNow(),
+	); err != nil {
+		t.Fatal(err)
+	}
+
+	rsl := stateloader.Make(testRangeID)
+	sstSnapshotInProgressData := roachpb.SSTSnapshotInProgressData{ID: testUUID}
+	if err := rsl.SetSSTSnapshotInProgressData(ctx, eng, sstSnapshotInProgressData); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create a test SST to ingest.
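+	// This mirrors the on-disk state left behind by a crash that occurs after
+	// the snapshot's SST has been written but before it has been ingested.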
+	sst, err := engine.MakeRocksDBSstFileWriter()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := sst.Put(engine.MVCCKey{Key: roachpb.Key(testKey), Timestamp: testTimestamp}, testValue.RawBytes); err != nil {
+		t.Fatal(err)
+	}
+	sss, err := newSSTSnapshotStorage(cfg.Settings, testRangeID, testUUID,
+		eng.GetAuxiliaryDir(), rate.NewLimiter(rate.Inf, 0), eng)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := sss.NewFile(); err != nil {
+		t.Fatal(err)
+	}
+	data, err := sst.Finish()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := sss.Write(ctx, data); err != nil {
+		t.Fatal(err)
+	}
+	if err := sss.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Start the store.
+	store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
+	if err := store.Start(ctx, stopper); err != nil {
+		t.Fatalf("failure initializing bootstrapped store: %+v", err)
+	}
+
+	// Verify that the SST has been ingested.
+	if value, _, err := engine.MVCCGet(
+		ctx, store.Engine(), testKey, testTimestamp, engine.MVCCGetOptions{},
+	); err != nil {
+		t.Fatal(err)
+	} else if !reflect.DeepEqual(value, &testValue) {
+		t.Fatalf("expected key '%s' to exist with value '%s', but found '%s'", testKey, testValue, value)
+	}
+}
+
 func BenchmarkStoreGetReplica(b *testing.B) {
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.TODO())