storage: include snapshotStrategy directly in IncomingSnapshot
Instead of decomposing the results of a received snapshot and inserting
each separate piece of state into an `IncomingSnapshot`, we now store
this state directly on each `snapshotStrategy`. The `snapshotStrategy`
is then attached to an `IncomingSnapshot`. This makes more sense because
the state is specific to the snapshotStrategy, and different strategies
will receive different types of data in different representations.

Release note: None
nvanbenschoten committed Apr 27, 2018
1 parent 8a94f29 commit 46e4abb
Showing 2 changed files with 158 additions and 151 deletions.
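
For orientation, here is a minimal, self-contained Go sketch of the shape this commit moves toward. The names IncomingSnapshot, Receiver, kvBatchSnapshotStrategy, Batches, and LogEntries mirror the diff below; everything else (the Status method, the simplified apply function, main) is illustrative only and not the real CockroachDB code.

package main

import "fmt"

// snapshotStrategy stands in for the real interface in pkg/storage. Status
// exists only to give this sketch a method; the real strategy implements
// Receive (shown in store_snapshot.go below).
type snapshotStrategy interface {
	Status() string
}

// kvBatchSnapshotStrategy keeps the received state on itself, in its own
// representation: RocksDB BatchReprs and raw Raft log entries.
type kvBatchSnapshotStrategy struct {
	Batches    [][]byte // RocksDB BatchReprs
	LogEntries [][]byte // Raft log entries
}

func (kvSS *kvBatchSnapshotStrategy) Status() string {
	return fmt.Sprintf("kv batches: %d, log entries: %d",
		len(kvSS.Batches), len(kvSS.LogEntries))
}

// IncomingSnapshot no longer carries Batches/LogEntries directly; it carries
// the strategy that received the snapshot.
type IncomingSnapshot struct {
	Receiver snapshotStrategy
}

// applySnapshot type-switches on the concrete strategy so that each strategy
// can interpret its data in the representation it received it in. The real
// code calls log.Fatalf on an unknown strategy; this sketch returns an error.
func applySnapshot(inSnap IncomingSnapshot) error {
	switch ss := inSnap.Receiver.(type) {
	case *kvBatchSnapshotStrategy:
		fmt.Println("applying snapshot:", ss.Status())
		return nil
	default:
		return fmt.Errorf("unknown snapshot strategy: %v", ss)
	}
}

func main() {
	ss := &kvBatchSnapshotStrategy{
		Batches:    [][]byte{{0x01}},
		LogEntries: [][]byte{{0x02}},
	}
	if err := applySnapshot(IncomingSnapshot{Receiver: ss}); err != nil {
		panic(err)
	}
}

In the actual diff, kvBatchSnapshotStrategy.Receive populates kvSS.Batches and kvSS.LogEntries from the stream and sets itself as inSnap.Receiver, and Replica.applySnapshot dispatches on the concrete strategy type as sketched above.
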
285 changes: 144 additions & 141 deletions pkg/storage/replica_raftstorage.go
@@ -451,10 +451,8 @@ func (s *OutgoingSnapshot) Close() {
// IncomingSnapshot contains the data for an incoming streaming snapshot message.
type IncomingSnapshot struct {
SnapUUID uuid.UUID
// The RocksDB BatchReprs that make up this snapshot.
Batches [][]byte
// The Raft log entries for this snapshot.
LogEntries [][]byte
// The snapshotStrategy that received this snapshot.
Receiver snapshotStrategy
// The replica state at the time the snapshot was generated (never nil).
State *storagebase.ReplicaState
snapType string
@@ -712,162 +710,167 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "found empty HardState for non-empty Snapshot %+v", snap)
}

var stats struct {
clear time.Time
batch time.Time
entries time.Time
commit time.Time
}

var size int
for _, b := range inSnap.Batches {
size += len(b)
}
for _, e := range inSnap.LogEntries {
size += len(e)
}

log.Infof(ctx, "applying %s snapshot at index %d "+
"(id=%s, encoded size=%d, %d rocksdb batches, %d log entries)",
snapType, snap.Metadata.Index, inSnap.SnapUUID.Short(),
size, len(inSnap.Batches), len(inSnap.LogEntries))
defer func(start time.Time) {
now := timeutil.Now()
log.Infof(ctx, "applied %s snapshot in %0.0fms [clear=%0.0fms batch=%0.0fms entries=%0.0fms commit=%0.0fms]",
snapType, now.Sub(start).Seconds()*1000,
stats.clear.Sub(start).Seconds()*1000,
stats.batch.Sub(stats.clear).Seconds()*1000,
stats.entries.Sub(stats.batch).Seconds()*1000,
stats.commit.Sub(stats.entries).Seconds()*1000)
}(timeutil.Now())

// Use a more efficient write-only batch because we don't need to do any
// reads from the batch.
batch := r.store.Engine().NewWriteOnlyBatch()
defer batch.Close()

// Before clearing out the range, grab any existing legacy Raft tombstone
// since we have to write it back later.
var existingLegacyTombstone *roachpb.RaftTombstone
{
legacyTombstoneKey := keys.RaftTombstoneIncorrectLegacyKey(r.RangeID)
var tomb roachpb.RaftTombstone
// Intentionally read from the engine to avoid the write-only batch (this is
// allowed since raftMu is held).
ok, err := engine.MVCCGetProto(
ctx, r.store.Engine(), legacyTombstoneKey, hlc.Timestamp{}, true /* consistent */, nil, /* txn */
&tomb)
if err != nil {
return err
var lastTerm uint64
var raftLogSize int64
switch ss := inSnap.Receiver.(type) {
case *kvBatchSnapshotStrategy:
var stats struct {
clear time.Time
batch time.Time
entries time.Time
commit time.Time
}
if ok {
existingLegacyTombstone = &tomb

var size int
for _, b := range ss.Batches {
size += len(b)
}
for _, e := range ss.LogEntries {
size += len(e)
}
}

// Delete everything in the range and recreate it from the snapshot.
// We need to delete any old Raft log entries here because any log entries
// that predate the snapshot will be orphaned and never truncated or GC'd.
if err := clearRangeData(ctx, s.Desc, keyCount, r.store.Engine(), batch); err != nil {
return err
}
stats.clear = timeutil.Now()
log.Infof(ctx, "applying %s snapshot at index %d "+
"(id=%s, encoded size=%d, %d rocksdb batches, %d log entries)",
snapType, snap.Metadata.Index, inSnap.SnapUUID.Short(),
size, len(ss.Batches), len(ss.LogEntries))
defer func(start time.Time) {
now := timeutil.Now()
log.Infof(ctx, "applied %s snapshot in %0.0fms [clear=%0.0fms batch=%0.0fms entries=%0.0fms commit=%0.0fms]",
snapType, now.Sub(start).Seconds()*1000,
stats.clear.Sub(start).Seconds()*1000,
stats.batch.Sub(stats.clear).Seconds()*1000,
stats.entries.Sub(stats.batch).Seconds()*1000,
stats.commit.Sub(stats.entries).Seconds()*1000)
}(timeutil.Now())

// Use a more efficient write-only batch because we don't need to do any
// reads from the batch.
batch := r.store.Engine().NewWriteOnlyBatch()
defer batch.Close()

// Before clearing out the range, grab any existing legacy Raft tombstone
// since we have to write it back later.
var existingLegacyTombstone *roachpb.RaftTombstone
{
legacyTombstoneKey := keys.RaftTombstoneIncorrectLegacyKey(r.RangeID)
var tomb roachpb.RaftTombstone
// Intentionally read from the engine to avoid the write-only batch (this is
// allowed since raftMu is held).
ok, err := engine.MVCCGetProto(
ctx, r.store.Engine(), legacyTombstoneKey, hlc.Timestamp{}, true /* consistent */, nil, /* txn */
&tomb)
if err != nil {
return err
}
if ok {
existingLegacyTombstone = &tomb
}
}

// Write the snapshot into the range.
for _, batchRepr := range inSnap.Batches {
if err := batch.ApplyBatchRepr(batchRepr, false); err != nil {
// Delete everything in the range and recreate it from the snapshot.
// We need to delete any old Raft log entries here because any log entries
// that predate the snapshot will be orphaned and never truncated or GC'd.
if err := clearRangeData(ctx, s.Desc, keyCount, r.store.Engine(), batch); err != nil {
return err
}
}
stats.clear = timeutil.Now()

// Nodes before v2.0 may send an incorrect Raft tombstone (see #12154) that
// was supposed to be unreplicated. Simply remove it, but note how we go
// through some trouble to ensure that our potential own tombstone survives
// the snapshot application: we can't have a preemptive snapshot (which may
// not be followed by a corresponding replication change) wipe out our Raft
// tombstone.
//
// NB: this can be removed in v2.1. This is because when we are running a
// binary at v2.1, we know that peers are at least running v2.0, which will
// never send out these snapshots.
if err := clearLegacyTombstone(batch, r.RangeID); err != nil {
return errors.Wrap(err, "while clearing legacy tombstone key")
}

// If before this snapshot there was a legacy tombstone, it was removed and
// we must issue a replacement. Note that we *could* check the cluster version
// and write a new-style tombstone here, but that is more complicated as we'd
// need to incorporate any existing new-style tombstone in the update. It's
// more straightforward to propagate the legacy tombstone; there's no
// requirement that snapshots participate in the migration.
if existingLegacyTombstone != nil {
err := engine.MVCCPutProto(
ctx, batch, nil /* ms */, keys.RaftTombstoneIncorrectLegacyKey(r.RangeID), hlc.Timestamp{}, nil /* txn */, existingLegacyTombstone,
)
if err != nil {
return err
// Write the snapshot into the range.
for _, batchRepr := range ss.Batches {
if err := batch.ApplyBatchRepr(batchRepr, false); err != nil {
return err
}
}
}

// The log entries are all written to distinct keys so we can use a
// distinct batch.
distinctBatch := batch.Distinct()
stats.batch = timeutil.Now()
// Nodes before v2.0 may send an incorrect Raft tombstone (see #12154) that
// was supposed to be unreplicated. Simply remove it, but note how we go
// through some trouble to ensure that our potential own tombstone survives
// the snapshot application: we can't have a preemptive snapshot (which may
// not be followed by a corresponding replication change) wipe out our Raft
// tombstone.
//
// NB: this can be removed in v2.1. This is because when we are running a
// binary at v2.1, we know that peers are at least running v2.0, which will
// never send out these snapshots.
if err := clearLegacyTombstone(batch, r.RangeID); err != nil {
return errors.Wrap(err, "while clearing legacy tombstone key")
}

logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
for i, bytes := range inSnap.LogEntries {
if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
return err
// If before this snapshot there was a legacy tombstone, it was removed and
// we must issue a replacement. Note that we *could* check the cluster version
// and write a new-style tombstone here, but that is more complicated as we'd
// need to incorporate any existing new-style tombstone in the update. It's
// more straightforward to propagate the legacy tombstone; there's no
// requirement that snapshots participate in the migration.
if existingLegacyTombstone != nil {
err := engine.MVCCPutProto(
ctx, batch, nil /* ms */, keys.RaftTombstoneIncorrectLegacyKey(r.RangeID), hlc.Timestamp{}, nil /* txn */, existingLegacyTombstone,
)
if err != nil {
return err
}
}
}
// If this replica doesn't know its ReplicaID yet, we're applying a
// preemptive snapshot. In this case, we're going to have to write the
// sideloaded proposals into the Raft log. Otherwise, sideload.
var raftLogSize int64
thinEntries := logEntries
if replicaID != 0 {
var err error
var sideloadedEntriesSize int64
thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)

// The log entries are all written to distinct keys so we can use a
// distinct batch.
distinctBatch := batch.Distinct()
stats.batch = timeutil.Now()

logEntries := make([]raftpb.Entry, len(ss.LogEntries))
for i, bytes := range ss.LogEntries {
if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
return err
}
}
// If this replica doesn't know its ReplicaID yet, we're applying a
// preemptive snapshot. In this case, we're going to have to write the
// sideloaded proposals into the Raft log. Otherwise, sideload.
thinEntries := logEntries
if replicaID != 0 {
var err error
var sideloadedEntriesSize int64
thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
if err != nil {
return err
}
raftLogSize += sideloadedEntriesSize
}

// Write the snapshot's Raft log into the range.
_, lastTerm, raftLogSize, err = r.append(
ctx, distinctBatch, 0, invalidLastTerm, raftLogSize, thinEntries,
)
if err != nil {
return err
}
raftLogSize += sideloadedEntriesSize
}
stats.entries = timeutil.Now()

// Write the snapshot's Raft log into the range.
var lastTerm uint64
_, lastTerm, raftLogSize, err = r.append(
ctx, distinctBatch, 0, invalidLastTerm, raftLogSize, thinEntries,
)
if err != nil {
return err
}
stats.entries = timeutil.Now()

// Note that since this snapshot comes from Raft, we don't have to synthesize
// the HardState -- Raft wouldn't ask us to update the HardState in incorrect
// ways.
if err := r.raftMu.stateLoader.SetHardState(ctx, distinctBatch, hs); err != nil {
return errors.Wrapf(err, "unable to persist HardState %+v", &hs)
}
// Note that since this snapshot comes from Raft, we don't have to synthesize
// the HardState -- Raft wouldn't ask us to update the HardState in incorrect
// ways.
if err := r.raftMu.stateLoader.SetHardState(ctx, distinctBatch, hs); err != nil {
return errors.Wrapf(err, "unable to persist HardState %+v", &hs)
}

// We need to close the distinct batch and start using the normal batch for
// the read below.
distinctBatch.Close()
// We need to close the distinct batch and start using the normal batch for
// the read below.
distinctBatch.Close()

// As outlined above, last and applied index are the same after applying
// the snapshot (i.e. the snapshot has no uncommitted tail).
if s.RaftAppliedIndex != snap.Metadata.Index {
log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
s.RaftAppliedIndex, snap.Metadata.Index)
}
// As outlined above, last and applied index are the same after applying
// the snapshot (i.e. the snapshot has no uncommitted tail).
if s.RaftAppliedIndex != snap.Metadata.Index {
log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
s.RaftAppliedIndex, snap.Metadata.Index)
}

// We've written Raft log entries, so we need to sync the WAL.
if err := batch.Commit(syncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil {
return err
// We've written Raft log entries, so we need to sync the WAL.
if err := batch.Commit(syncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil {
return err
}
stats.commit = timeutil.Now()
default:
log.Fatalf(ctx, "unknown snapshot strategy: %v", ss)
}
stats.commit = timeutil.Now()

r.mu.Lock()
// We set the persisted last index to the last applied index. This is
24 changes: 14 additions & 10 deletions pkg/storage/store_snapshot.go
@@ -100,6 +100,10 @@ func assertStrategy(
type kvBatchSnapshotStrategy struct {
status string

// Fields used when receiving snapshots.
Batches [][]byte // RocksDB BatchReprs
LogEntries [][]byte // Raft log entries

// Fields used when sending snapshots.
batchSize int64
limiter *rate.Limiter
@@ -112,8 +116,8 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
) (IncomingSnapshot, error) {
assertStrategy(ctx, header, SnapshotRequest_KV_BATCH)

var batches [][]byte
var logEntries [][]byte
kvSS.Batches = nil
kvSS.LogEntries = nil
for {
req, err := stream.Recv()
if err != nil {
@@ -125,10 +129,10 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
}

if req.KVBatch != nil {
batches = append(batches, req.KVBatch)
kvSS.Batches = append(kvSS.Batches, req.KVBatch)
}
if req.LogEntries != nil {
logEntries = append(logEntries, req.LogEntries...)
kvSS.LogEntries = append(kvSS.LogEntries, req.LogEntries...)
}
if req.Final {
snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
@@ -138,16 +142,16 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
}

inSnap := IncomingSnapshot{
SnapUUID: snapUUID,
Batches: batches,
LogEntries: logEntries,
State: &header.State,
snapType: snapTypeRaft,
SnapUUID: snapUUID,
Receiver: kvSS,
State: &header.State,
snapType: snapTypeRaft,
}
if header.RaftMessageRequest.ToReplica.ReplicaID == 0 {
inSnap.snapType = snapTypePreemptive
}
kvSS.status = fmt.Sprintf("kv batches: %d, log entries: %d", len(batches), len(logEntries))
kvSS.status = fmt.Sprintf("kv batches: %d, log entries: %d",
len(kvSS.Batches), len(kvSS.LogEntries))
return inSnap, nil
}
}
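
The commit message's point that different strategies will receive different types of data in different representations suggests how a future strategy would slot in. Purely as a hypothetical illustration (no such strategy exists in this commit), here is a second standalone sketch in the same style as the one above:

package main

import "fmt"

// sstSnapshotStrategy is hypothetical: a strategy that received raw SST file
// contents would keep them on itself in that representation, and
// Replica.applySnapshot would grow a matching case in its type switch, e.g.:
//
//	case *sstSnapshotStrategy:
//		// ingest ss.SSTs directly instead of replaying BatchReprs
type sstSnapshotStrategy struct {
	SSTs [][]byte // hypothetical: raw SST files received from the stream
}

func (s *sstSnapshotStrategy) Status() string {
	return fmt.Sprintf("ssts: %d", len(s.SSTs))
}

func main() {
	s := &sstSnapshotStrategy{SSTs: [][]byte{{0xde, 0xad}}}
	fmt.Println(s.Status())
}
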
