Skip to content

Commit

Permalink
kv: integrate raft async storage writes
Browse files Browse the repository at this point in the history
Fixes cockroachdb#17500.
Waiting on github.com/cockroachdb/pebble/pull/2117.

This commit integrates with the `AsyncStorageWrites` functionality that
we added to Raft in github.com/etcd-io/raft/pull/8.

\## Approach

The commit makes the minimal changes needed to integrate with async
storage writes and pull fsyncs out of the raft state machine loop. It
does not make an effort to extract the non-durable portion of raft log
writes or raft log application onto separate goroutine pools, as was
described in cockroachdb#17500. Those changes will also be impactful, but they're
non trivial and bump into a pipelining vs. batching trade-off, so they
are left as future work items (TODO(nvanbenschoten): open new issues).

With this change, asynchronous Raft log syncs are enabled by the new
`DB.ApplyNoSyncWait` Pebble API introduced in github.com/cockroachdb/pebble/pull/2117.
The `handleRaftReady` state machine loop continues to initiate Raft log
writes, but it uses the Pebble API to offload waiting on durability to a
separate goroutine. This separate goroutine then sends the corresponding
`MsgStorageAppend`'s response messages where they need to go (locally
and/or to the Raft leader) when the fsync completes. The async storage
writes functionality in Raft makes this all safe.

\## Benchmark Results

The result of this change is reduced interference between Raft
proposals. As a result, it reduces end-to-end commit latency.

github.com/etcd-io/raft/pull/8 presented a collection of benchmark
results captured from integrating async storage writes with rafttoy.

When integrated into CockroachDB, we see similar improvements to average
and tail latency. However, it doesn't provide the throughput
improvements at the top end because log appends and state machine
application have not yet been extracted into separate goroutine pools,
which would facilitate increased opportunity for batching.

TODO: add images

----

Release note (performance improvement): The Raft proposal pipeline
has been optimized to reduce interference between Raft proposals.
This improves average and tail write latency at high concurrency.
  • Loading branch information
nvanbenschoten committed Jan 31, 2023
1 parent 2fd74fc commit a1b24ca
Show file tree
Hide file tree
Showing 11 changed files with 702 additions and 362 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/logstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"sideload.go",
"sideload_disk.go",
"stateloader.go",
"sync_waiter.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore",
visibility = ["//visibility:public"],
Expand All @@ -28,9 +29,11 @@ go_library(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_pebble//record",
"@com_github_cockroachdb_redact//:redact",
"@io_etcd_go_raft_v3//:raft",
"@io_etcd_go_raft_v3//raftpb",
Expand All @@ -43,6 +46,7 @@ go_test(
srcs = [
"logstore_bench_test.go",
"sideload_test.go",
"sync_waiter_test.go",
],
args = ["-test.timeout=295s"],
embed = [":logstore"],
Expand All @@ -60,6 +64,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
Expand Down
167 changes: 124 additions & 43 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package logstore

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -43,25 +44,24 @@ var disableSyncRaftLog = settings.RegisterBoolSetting(
envutil.EnvOrDefaultBool("COCKROACH_DISABLE_RAFT_LOG_SYNCHRONIZATION_UNSAFE", false),
)

// Ready contains the log entries and state to be saved to stable storage. This
// is a subset of raft.Ready relevant to log storage. All fields are read-only.
type Ready struct {
// The current state of a replica to be saved to stable storage. Empty if
// there is no update.
raftpb.HardState
var enableNonBlockingRaftLogSync = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.raft_log.non_blocking_synchronization.enabled",
"set to true to enable non-blocking synchronization on Raft log writes to "+
"persistent storage. Setting to true does not risk data loss or data corruption "+
"on server crashes, but can reduce write latency.",
envutil.EnvOrDefaultBool("COCKROACH_ENABLE_RAFT_LOG_NON_BLOCKING_SYNCHRONIZATION", true),
)

// Entries specifies entries to be saved to stable storage. Empty if there is
// no update.
Entries []raftpb.Entry
// MsgStorageAppend is a raftpb.Message with type MsgStorageAppend.
type MsgStorageAppend raftpb.Message

// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk, or if an asynchronous write is permissible.
MustSync bool
}

// MakeReady constructs a Ready struct from raft.Ready.
func MakeReady(from raft.Ready) Ready {
return Ready{HardState: from.HardState, Entries: from.Entries, MustSync: from.MustSync}
// MakeMsgStorageAppend constructs a MsgStorageAppend from a raftpb.Message.
func MakeMsgStorageAppend(m raftpb.Message) MsgStorageAppend {
if m.Type != raftpb.MsgStorageAppend {
panic(fmt.Sprintf("unexpected message type %s", m.Type))
}
return MsgStorageAppend(m)
}

// RaftState stores information about the last entry and the size of the log.
Expand All @@ -86,6 +86,8 @@ type AppendStats struct {
PebbleBytes int64

Sync bool
// If true, PebbleEnd-PebbleBegin does not include the sync time.
NonBlocking bool
}

// Metrics contains metrics specific to the log storage.
Expand All @@ -99,37 +101,70 @@ type LogStore struct {
Engine storage.Engine
Sideload SideloadStorage
StateLoader StateLoader
SyncWaiter *SyncWaiterLoop
EntryCache *raftentry.Cache
Settings *cluster.Settings
Metrics Metrics
}

// SyncCallback is a callback that is notified when a raft log write has been
// durably committed to disk. The function is handed the response messages that
// are associated with the MsgStorageAppend that triggered the fsync.
type SyncCallback interface {
OnLogSync(context.Context, []raftpb.Message)
}

func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
// Use an unindexed batch because we don't need to read our writes, and
// it is more efficient.
return eng.NewUnindexedBatch(false /* writeOnly */)
}

// StoreEntries persists newly appended Raft log Entries to the log storage.
// StoreEntries persists newly appended Raft log Entries to the log storage,
// then calls the provided callback with the input's response messages (if any)
// once the entries are durable. The durable log write may or may not be
// blocking (and therefore the callback may or may not be called synchronously),
// depending on the kv.raft_log.non_blocking_synchronization.enabled cluster
// setting. Either way, the effects of the log append will be immediately
// visible to readers of the Engine.
//
// Accepts the state of the log before the operation, returns the state after.
// Persists HardState atomically with, or strictly after Entries.
func (s *LogStore) StoreEntries(
ctx context.Context, state RaftState, rd Ready, stats *AppendStats,
ctx context.Context, state RaftState, m MsgStorageAppend, cb SyncCallback, stats *AppendStats,
) (RaftState, error) {
batch := newStoreEntriesBatch(s.Engine)
defer batch.Close()
return s.storeEntriesAndCommitBatch(ctx, state, rd, stats, batch)
return s.storeEntriesAndCommitBatch(ctx, state, m, cb, stats, batch)
}

// storeEntriesAndCommitBatch is like StoreEntries, but it accepts a
// storage.Batch, which it takes responsibility for committing and closing.
func (s *LogStore) storeEntriesAndCommitBatch(
ctx context.Context, state RaftState, rd Ready, stats *AppendStats, batch storage.Batch,
ctx context.Context,
state RaftState,
m MsgStorageAppend,
cb SyncCallback,
stats *AppendStats,
batch storage.Batch,
) (RaftState, error) {
// Before returning, Close the batch if we haven't handed ownership of it to a
// SyncWaiterLoop. If batch == nil, SyncWaiterLoop is responsible for closing
// it once the in-progress disk writes complete.
defer func() {
if batch != nil {
defer batch.Close()
}
}()

prevLastIndex := state.LastIndex
if len(rd.Entries) > 0 {
overwriting := false
if len(m.Entries) > 0 {
firstPurge := m.Entries[0].Index // first new entry written
overwriting = firstPurge <= prevLastIndex
stats.Begin = timeutil.Now()
// All of the entries are appended to distinct keys, returning a new
// last index.
thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, rd.Entries, s.Sideload)
thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, m.Entries, s.Sideload)
if err != nil {
const expl = "during sideloading"
return RaftState{}, errors.Wrap(err, expl)
Expand All @@ -148,16 +183,21 @@ func (s *LogStore) storeEntriesAndCommitBatch(
stats.End = timeutil.Now()
}

if !raft.IsEmptyHardState(rd.HardState) {
hs := raftpb.HardState{
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
}
if !raft.IsEmptyHardState(hs) {
// NB: Note that without additional safeguards, it's incorrect to write
// the HardState before appending rd.Entries. When catching up, a follower
// the HardState before appending m.Entries. When catching up, a follower
// will receive Entries that are immediately Committed in the same
// Ready. If we persist the HardState but happen to lose the Entries,
// assertions can be tripped.
//
// We have both in the same batch, so there's no problem. If that ever
// changes, we must write and sync the Entries before the HardState.
if err := s.StateLoader.SetHardState(ctx, batch, rd.HardState); err != nil {
if err := s.StateLoader.SetHardState(ctx, batch, hs); err != nil {
const expl = "during setHardState"
return RaftState{}, errors.Wrap(err, expl)
}
Expand All @@ -167,9 +207,9 @@ func (s *LogStore) storeEntriesAndCommitBatch(
//
// Note that the data is visible to other goroutines before it is synced to
// disk. This is fine. The important constraints are that these syncs happen
// before Raft messages are sent and before the call to RawNode.Advance. Our
// regular locking is sufficient for this and if other goroutines can see the
// data early, that's fine. In particular, snapshots are not a problem (I
// before the MsgStorageAppend's responses are delivered back to the RawNode.
// Our regular locking is sufficient for this and if other goroutines can see
// the data early, that's fine. In particular, snapshots are not a problem (I
// think they're the only thing that might access log entries or HardState
// from other goroutines). Snapshots do not include either the HardState or
// uncommitted log entries, and even if they did include log entries that
Expand All @@ -182,25 +222,57 @@ func (s *LogStore) storeEntriesAndCommitBatch(
// (Replica), so this comment might need to move.
stats.PebbleBegin = timeutil.Now()
stats.PebbleBytes = int64(batch.Len())
sync := rd.MustSync && !disableSyncRaftLog.Get(&s.Settings.SV)
if err := batch.Commit(sync); err != nil {
const expl = "while committing batch"
return RaftState{}, errors.Wrap(err, expl)
}
stats.Sync = sync
stats.PebbleEnd = timeutil.Now()
if rd.MustSync {
s.Metrics.RaftLogCommitLatency.RecordValue(stats.PebbleEnd.Sub(stats.PebbleBegin).Nanoseconds())
wantsSync := len(m.Responses) > 0
willSync := wantsSync && !disableSyncRaftLog.Get(&s.Settings.SV)
// Use the non-blocking log sync path if we are performing a log sync, the
// cluster setting to do so is enabled, and we are not overwriting any
// previous log entries. If we are overwriting, we may need to purge the
// sideloaded SSTables associated with overwritten entries. This must be
// performed after the corresponding entries are durably replaced and it's
// easier to do ensure proper ordering using a blocking log sync. This is a
// rare case, so it's not worth optimizing for.
nonBlockingSync := willSync && enableNonBlockingRaftLogSync.Get(&s.Settings.SV) && !overwriting
if nonBlockingSync {
// If non-blocking synchronization is enabled, apply the batched updates to
// the engine and initiate a synchronous disk write, but don't wait for the
// write to complete. Instead, enqueue that waiting on the SyncWaiterLoop,
// who will signal the callback when the write completes.
if err := batch.CommitNoSyncWait(); err != nil {
const expl = "while committing batch without sync wait"
return RaftState{}, errors.Wrap(err, expl)
}
stats.PebbleEnd = timeutil.Now()
s.SyncWaiter.enqueue(ctx, batch, func() {
// NOTE: run on the SyncWaiterLoop goroutine.
logCommitEnd := timeutil.Now()
s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
cb.OnLogSync(ctx, m.Responses)
})
// Do not Close batch on return. Will be Closed by SyncWaiterLoop.
batch = nil
} else {
if err := batch.Commit(willSync); err != nil {
const expl = "while committing batch"
return RaftState{}, errors.Wrap(err, expl)
}
stats.PebbleEnd = timeutil.Now()
if wantsSync {
logCommitEnd := stats.PebbleEnd
s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
cb.OnLogSync(ctx, m.Responses)
}
}
stats.Sync = wantsSync
stats.NonBlocking = nonBlockingSync

if len(rd.Entries) > 0 {
if overwriting {
// We may have just overwritten parts of the log which contain
// sideloaded SSTables from a previous term (and perhaps discarded some
// entries that we didn't overwrite). Remove any such leftover on-disk
// payloads (we can do that now because we've committed the deletion
// just above).
firstPurge := rd.Entries[0].Index // first new entry written
purgeTerm := rd.Entries[0].Term - 1
firstPurge := m.Entries[0].Index // first new entry written
purgeTerm := m.Entries[0].Term - 1
lastPurge := prevLastIndex // old end of the log, include in deletion
purgedSize, err := maybePurgeSideloaded(ctx, s.Sideload, firstPurge, lastPurge, purgeTerm)
if err != nil {
Expand All @@ -216,7 +288,16 @@ func (s *LogStore) storeEntriesAndCommitBatch(

// Update raft log entry cache. We clear any older, uncommitted log entries
// and cache the latest ones.
s.EntryCache.Add(s.RangeID, rd.Entries, true /* truncate */)
//
// In the blocking log sync case, these entries are already durable. In the
// non-blocking case, these entries have been written to the pebble engine (so
// reads of the engine will see them), but they are not yet be durable. This
// means that the entry cache can lead the durable log. This is allowed by
// etcd/raft, which maintains its own tracking of entry durability by
// splitting its log into an unstable portion for entries that are not known
// to be durable and a stable portion for entries that are known to be
// durable.
s.EntryCache.Add(s.RangeID, m.Entries, true /* truncate */)

return state, nil
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/kv/kvserver/logstore/logstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (b *discardBatch) Commit(bool) error {
return nil
}

type noopSyncCallback struct{}

func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message) {}

func BenchmarkLogStore_StoreEntries(b *testing.B) {
defer log.Scope(b).Close(b)
const kb = 1 << 10
Expand All @@ -48,23 +52,25 @@ func BenchmarkLogStore_StoreEntries(b *testing.B) {
}

func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
ctx := context.Background()
const tenMB = 10 * 1 << 20
ec := raftentry.NewCache(tenMB)
const rangeID = 1
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
st := cluster.MakeTestingClusterSettings()
enableNonBlockingRaftLogSync.Override(ctx, &st.SV, false)
s := LogStore{
RangeID: rangeID,
Engine: eng,
StateLoader: NewStateLoader(rangeID),
EntryCache: ec,
Settings: cluster.MakeTestingClusterSettings(),
Settings: st,
Metrics: Metrics{
RaftLogCommitLatency: metric.NewHistogram(metric.Metadata{}, 10*time.Second, metric.IOLatencyBuckets),
},
}

ctx := context.Background()
rs := RaftState{
LastTerm: 1,
ByteSize: 0,
Expand All @@ -89,17 +95,13 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
batch := &discardBatch{}
for i := 0; i < b.N; i++ {
batch.Batch = newStoreEntriesBatch(eng)
rd := Ready{
HardState: raftpb.HardState{},
Entries: ents,
MustSync: true,
}
m := MsgStorageAppend{Entries: ents}
cb := noopSyncCallback{}
var err error
rs, err = s.storeEntriesAndCommitBatch(ctx, rs, rd, stats, batch)
rs, err = s.storeEntriesAndCommitBatch(ctx, rs, m, cb, stats, batch)
if err != nil {
b.Fatal(err)
}
batch.Batch.Close()
ents[0].Index++
}
require.EqualValues(b, b.N, rs.LastIndex)
Expand Down
Loading

0 comments on commit a1b24ca

Please sign in to comment.