[DNM] storage: introduce SST snapshot strategy
Fixes #16954.
Related to #25047.

This depends on the following two upstream changes to RocksDB:
- facebook/rocksdb#3778
- facebook/rocksdb#3779

The change introduces a new snapshot strategy called "SST". This strategy
streams SST files consisting of all keys in a range from the sender to the
receiver. These SST files are then atomically ingested directly into RocksDB.
An important property of the strategy is that the amount of memory required
by a receiver using the strategy is constant with respect to the size of
a range, instead of linear as it is with the KV_BATCH strategy. This will
be critical for increasing the default range size and potentially for
increasing the number of concurrent snapshots allowed per node. The
strategy also appears to significantly speed up snapshots once ranges grow
beyond a certain size (somewhere in the single-digit megabytes).
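
To make the streaming and constant-memory behavior concrete, here is a rough Go
sketch of what a receiver under this strategy might look like. The
snapshotStream interface, the recvSSTSnapshot function, and the temp-file
naming are illustrative assumptions rather than the code added by this change;
only the SnapshotRequest fields (SSTChunk, SSTFinal, Final) and the engine
methods (GetAuxiliaryDir, IngestExternalFiles) come from the diff below.

package storage

import (
	"context"
	"io/ioutil"
	"os"

	"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

// snapshotStream abstracts the incoming snapshot stream; illustrative only.
type snapshotStream interface {
	Recv() (*SnapshotRequest, error)
}

// recvSSTSnapshot sketches the receive path of the SST strategy. Each
// sst_chunk is appended directly to a file on disk, so the receiver's memory
// use is bounded by the chunk size rather than by the range size. When the
// stream finishes, the completed files are handed to RocksDB in a single call
// so the snapshot becomes visible atomically.
func recvSSTSnapshot(ctx context.Context, stream snapshotStream, eng engine.Engine) error {
	var paths []string
	var cur *os.File
	for {
		req, err := stream.Recv()
		if err != nil {
			return err
		}
		if len(req.SSTChunk) > 0 {
			if cur == nil {
				// Stage the file under the engine's auxiliary directory so it
				// can be ingested without an extra copy.
				f, err := ioutil.TempFile(eng.GetAuxiliaryDir(), "snapshot-sst-")
				if err != nil {
					return err
				}
				cur = f
				paths = append(paths, f.Name())
			}
			if _, err := cur.Write(req.SSTChunk); err != nil {
				return err
			}
		}
		if req.SSTFinal && cur != nil {
			// sst_final marks the end of the current SST file.
			if err := cur.Sync(); err != nil {
				return err
			}
			if err := cur.Close(); err != nil {
				return err
			}
			cur = nil
		}
		if req.Final {
			break
		}
	}
	return eng.IngestExternalFiles(ctx, paths, true /* allowFileModifications */)
}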

This is a WIP change. Before it can be merged it needs:
- to be cleaned up a bit
- more testing (unit tests, testing knobs, maybe some chaos)
- proper version handling
- heuristic tuning
- decisions on open questions such as whether to compact after ingestion

Release note: None
nvanbenschoten committed Apr 27, 2018
1 parent 46e4abb commit 2881a8f
Showing 10 changed files with 762 additions and 98 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/engineccl/mvcc.go
@@ -192,8 +192,8 @@ func (i *MVCCIncrementalIterator) UnsafeKey() engine.MVCCKey {
return i.iter.UnsafeKey()
}

-// UnsafeValue returns the same value as Value, but the memory is invalidated on
-// the next call to {Next,Reset,Close}.
+// UnsafeValue returns the same value as a byte slice, but the memory is
+// invalidated on the next call to {Next,Reset,Close}.
func (i *MVCCIncrementalIterator) UnsafeValue() []byte {
return i.iter.UnsafeValue()
}
1 change: 1 addition & 0 deletions pkg/storage/client_test.go
@@ -1037,6 +1037,7 @@ func (m *multiTestContext) changeReplicas(
// is lost. We could make this into a roachpb.Error but it seems overkill
// for this one usage.
if testutils.IsError(err, "snapshot failed: .*") {
m.t.Logf("snapshot failed with error: %v", err)
continue
}
return 0, err
2 changes: 0 additions & 2 deletions pkg/storage/engine/engine.go
@@ -243,8 +243,6 @@ type Engine interface {
GetStats() (*Stats, error)
// GetAuxiliaryDir returns a path under which files can be stored
// persistently, and from which data can be ingested by the engine.
-//
-// Not thread safe.
GetAuxiliaryDir() string
// NewBatch returns a new instance of a batched engine which wraps
// this engine. Batched engines accumulate all mutations and apply
32 changes: 31 additions & 1 deletion pkg/storage/engine/in_mem.go
@@ -14,13 +14,43 @@

package engine

import "github.com/cockroachdb/cockroach/pkg/roachpb"
import (
"context"
"io/ioutil"
"os"

"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// InMem wraps RocksDB and configures it for in-memory only storage.
type InMem struct {
*RocksDB
}

// IngestExternalFiles for an in-memory RocksDB first loads each file into
// memory, then ingests them (again, in memory). This implementation is provided
// solely to make tests work.
func (db InMem) IngestExternalFiles(
ctx context.Context, paths []string, allowFileModifications bool,
) error {
for _, file := range paths {
data, err := ioutil.ReadFile(file)
if err != nil {
if os.IsNotExist(err) {
// The file may already be in the correct in-memory env. Ignore
// the error; it will be caught by IngestExternalFiles if the
// file truly is missing.
continue
}
return err
}
if err := db.RocksDB.WriteFile(file, data); err != nil {
return err
}
}
return db.RocksDB.IngestExternalFiles(ctx, paths, allowFileModifications)
}

// NewInMem allocates and returns a new, opened InMem engine.
// The caller must call the engine's Close method when the engine is no longer
// needed.
238 changes: 164 additions & 74 deletions pkg/storage/raft.pb.go

Large diffs are not rendered by default.

22 changes: 19 additions & 3 deletions pkg/storage/raft.proto
@@ -117,6 +117,10 @@ message SnapshotRequest {
// combined into a large RocksDB WriteBatch that is atomically
// applied.
KV_BATCH = 0;
// SST snapshots stream sst files consisting of all keys in a range
// from the sender to the receiver. These sst files are then atomically
// ingested directly into RocksDB.
SST = 1;
}

message Header {
@@ -149,14 +153,26 @@ message SnapshotRequest {

optional Header header = 1;

-// A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages.
+// A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple
+// request messages. Only used by KV_BATCH snapshots.
optional bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"];

// These are really raftpb.Entry, but we model them as raw bytes to avoid
-// roundtripping through memory. They are separate from the kv_batch to
-// allow flexibility in log implementations.
+// roundtripping through memory. They are separate from the kv_batch to allow
+// flexibility in log implementations. Used by KV_BATCH and SST snapshots.
repeated bytes log_entries = 3;

// A chunk of an SST file. Multiple SST chunks may be sent across multiple
// request messages. These chunks will combine to form one or more complete
// SST files, delimited by intermittent sst_final flags. Only used by SST
// snapshots.
optional bytes sst_chunk = 5 [(gogoproto.customname) = "SSTChunk"];

// If set, the SST chunk is the last chunk in the current SST file. Only used
// by SST snapshots.
optional bool sst_final = 6 [(gogoproto.customname) = "SSTFinal", (gogoproto.nullable) = false];

// Indicates that this is the last request in the snapshot stream.
optional bool final = 4 [(gogoproto.nullable) = false];
}
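
As a sketch of how a sender might use these fields, the following hypothetical
Go helper streams one on-disk SST file in fixed-size chunks, setting sst_final
on the message that carries the end of the file. The outgoingSnapshotStream
interface and streamSST function are illustrative assumptions, not part of this
change; the caller is assumed to send the header, the log entries, and the
terminating final message itself.

package storage

import (
	"io"
	"os"
)

// outgoingSnapshotStream abstracts the outgoing stream; illustrative only.
// Send is assumed to serialize the request before returning, so the read
// buffer can be reused between calls.
type outgoingSnapshotStream interface {
	Send(*SnapshotRequest) error
}

// streamSST sends the contents of a single SST file as a sequence of
// SnapshotRequest messages. Each message carries at most chunkSize bytes in
// sst_chunk, and the last message for the file sets sst_final so the receiver
// knows to close out the current file before starting the next one.
func streamSST(stream outgoingSnapshotStream, path string, chunkSize int) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	buf := make([]byte, chunkSize)
	for {
		n, readErr := f.Read(buf)
		if readErr != nil && readErr != io.EOF {
			return readErr
		}
		done := readErr == io.EOF
		if n > 0 || done {
			req := &SnapshotRequest{SSTFinal: done}
			if n > 0 {
				req.SSTChunk = buf[:n]
			}
			if err := stream.Send(req); err != nil {
				return err
			}
		}
		if done {
			return nil
		}
	}
}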

22 changes: 22 additions & 0 deletions pkg/storage/rditer/replica_data_iter.go
@@ -142,11 +142,33 @@ func (ri *ReplicaDataIterator) advance() {
}
}

// KeyRanges returns all key ranges that the iterator will iterate over.
func (ri *ReplicaDataIterator) KeyRanges() []KeyRange {
return ri.ranges
}

// Index returns the index of the key range that the iterator points to.
func (ri *ReplicaDataIterator) Index() int {
return ri.curIndex
}

// Valid returns true if the iterator currently points to a valid value.
func (ri *ReplicaDataIterator) Valid() (bool, error) {
return ri.it.Valid()
}

// UnsafeKey returns the current key, but the memory is invalidated on the next
// call to {NextKey,Seek}.
func (ri *ReplicaDataIterator) UnsafeKey() engine.MVCCKey {
return ri.it.UnsafeKey()
}

// UnsafeValue returns the same value as a byte slice, but the memory is
// invalidated on the next call to {Next,Reset,Close}.
func (ri *ReplicaDataIterator) UnsafeValue() []byte {
return ri.it.UnsafeValue()
}

// Key returns the current key.
func (ri *ReplicaDataIterator) Key() engine.MVCCKey {
key := ri.it.UnsafeKey()
14 changes: 13 additions & 1 deletion pkg/storage/replica_command.go
@@ -1875,6 +1875,18 @@ func (r *Replica) sendSnapshot(
}
}

// TODO we should introduce a heuristic here for which snapshot strategy
// to use. An easy heuristic would be to use SST snapshots only when a
// range is over a certain size. That is what we have right now. The 8MB
// number was tuned based on how much it affected the speed of tests.
// Right now this provides a ~10% speedup when running ./pkg/storage tests.
// We should actually tune this for real.
// TODO we need to stick SST snapshots behind a version check.
strategy := SnapshotRequest_KV_BATCH
if r.GetMVCCStats().LiveBytes > (8 << 20) /* 8MB */ {
strategy = SnapshotRequest_SST
}

status := r.RaftStatus()
if status == nil {
return errors.New("raft status not initialized")
@@ -1898,7 +1910,7 @@ func (r *Replica) sendSnapshot(
// Recipients can choose to decline preemptive snapshots.
CanDecline: snapType == snapTypePreemptive,
Priority: priority,
-Strategy: SnapshotRequest_KV_BATCH,
+Strategy: strategy,
}
sent := func() {
r.store.metrics.RangeSnapshotsGenerated.Inc(1)
111 changes: 108 additions & 3 deletions pkg/storage/replica_raftstorage.go
@@ -638,7 +638,7 @@ func clearRangeData(
desc *roachpb.RangeDescriptor,
keyCount int64,
eng engine.Engine,
-batch engine.Batch,
+writer engine.Writer,
) error {
iter := eng.NewIterator(engine.IterOptions{})
defer iter.Close()
@@ -657,9 +657,9 @@
// above), but the data range's key count needs to be explicitly checked.
var err error
if i >= metadataRanges && keyCount >= clearRangeMinKeys {
-err = batch.ClearRange(keyRange.Start, keyRange.End)
+err = writer.ClearRange(keyRange.Start, keyRange.End)
} else {
-err = batch.ClearIterRange(iter, keyRange.Start, keyRange.End)
+err = writer.ClearIterRange(iter, keyRange.Start, keyRange.End)
}
if err != nil {
return err
@@ -868,6 +868,111 @@ func (r *Replica) applySnapshot(
return err
}
stats.commit = timeutil.Now()
case *sstSnapshotStrategy:
// TODO DURING REVIEW: do we need to worry about RaftTombstoneIncorrectLegacyKey here?
// TODO track stats of ingestion and log them.

unreplicatedSST, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return err
}
defer unreplicatedSST.Close()

// Clear out all existing unreplicated keys in the range.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
unreplicatedStart := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey)
unreplicatedEnd := engine.MakeMVCCMetadataKey(unreplicatedPrefixKey.PrefixEnd())
err = unreplicatedSST.ClearRange(unreplicatedStart, unreplicatedEnd)
if err != nil {
return errors.Wrapf(err, "unable to write ClearRange to unreplicated SST writer")
}
raftLogSize = 0

////////

// Note that since this snapshot comes from Raft, we don't have to synthesize
// the HardState -- Raft wouldn't ask us to update the HardState in incorrect
// ways.
hsKey := r.raftMu.stateLoader.RaftHardStateKey()
var hsValue roachpb.Value
if err := hsValue.SetProto(&hs); err != nil {
return err
}
hsValue.InitChecksum(hsKey)
err = engine.MVCCBlindPut(ctx, &unreplicatedSST, nil, hsKey, hlc.Timestamp{}, hsValue, nil)
if err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}

///////

if len(ss.LogEntries) > 0 {
logEntries := make([]raftpb.Entry, len(ss.LogEntries))
for i, bytes := range ss.LogEntries {
if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
return err
}
}
// If this replica doesn't know its ReplicaID yet, we're applying a
// preemptive snapshot. In this case, we're going to have to write the
// sideloaded proposals into the Raft log. Otherwise, sideload.
thinEntries := logEntries
if replicaID != 0 {
var err error
var sideloadedEntriesSize int64
thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
if err != nil {
return err
}
raftLogSize += sideloadedEntriesSize
}
var logDiff enginepb.MVCCStats
var logValue roachpb.Value
for i := range thinEntries {
ent := &thinEntries[i]
entKey := r.raftMu.stateLoader.RaftLogKey(ent.Index)

if err := logValue.SetProto(ent); err != nil {
return err
}
logValue.InitChecksum(entKey)
err = engine.MVCCBlindPut(ctx, &unreplicatedSST, &logDiff, entKey, hlc.Timestamp{},
logValue, nil /* txn */)
if err != nil {
return errors.Wrapf(err, "unable to write log entry to unreplicated SST writer")
}
}
raftLogSize += logDiff.SysBytes
lastTerm = thinEntries[len(thinEntries)-1].Term
} else {
lastTerm = invalidLastTerm
}

// Create a new file to write the SST into.
unreplicatedSSTFile, err := ss.createEmptySSTFile(r.RangeID)
if err != nil {
return errors.Wrapf(err, "unable to open empty SST file")
}
data, err := unreplicatedSST.Finish()
if err != nil {
unreplicatedSSTFile.Close()
return errors.Wrapf(err, "unable to Finish sst file writer")
}
_, err = unreplicatedSSTFile.Write(data)
unreplicatedSSTFile.Close()
if err != nil {
return errors.Wrapf(err, "unable to write SST data to file")
}

// Ingest all SSTs atomically.
if err := r.store.engine.IngestExternalFiles(ctx, ss.SSTs, true /* modify */); err != nil {
return errors.Wrapf(err, "while ingesting %s", ss.SSTs)
}

// TODO should we force a compaction? Only in the global keyspace? The
// SSTs will probably be living in L0 after the ingestion, which isn't
// great, especially because they each contain a large RangeDeletion
// tombstone.
default:
log.Fatalf(ctx, "unknown snapshot strategy: %v", ss)
}