From 78c132e6bc812c9ec24aaf69edada535a6f9b4a0 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Mon, 29 Aug 2022 14:19:35 -0400
Subject: [PATCH] [DNM] kv: prototype async Raft log writes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

See #17500.

This commit polishes and flushes a prototype I've had lying around that
demonstrates the async Raft log appends component of #17500. I'm not actively
planning to productionize this, but it sounds like we may work on this project
in v23.1, so this prototype might help. It also demonstrates the kind of
performance wins we can expect to see on write-heavy workloads. To this point,
we had only [demonstrated the potential speedup](https://github.com/cockroachdb/cockroach/issues/17500#issuecomment-757344603)
in a simulated environment with [rafttoy](https://github.com/nvanbenschoten/rafttoy).

Half of the change here is to `etcd/raft` itself, which needs to be adapted to
support asynchronous log writes. These changes are presented in
https://github.com/nvanbenschoten/etcd/commit/1d1fa32ec22715f6e21a6d6486f865356c3eb4fc.

The other half of the change is extracting a Raft log writer component that
asynchronously appends to a collection of Raft logs and notifies individual
replicas once those writes become durable. A condensed sketch of this
interaction pattern follows the benchmark results below. The component is
pretty basic and should probably be entirely rewritten, but it gets the job
done for the prototype.

The Raft log writer reveals an interesting dynamic: concurrency at this level
actually hurts performance, because it leads to concurrent calls to sync
Pebble's WAL. That is slower than funneling all syncs through a single caller,
since Pebble only exposes a synchronous Sync API and coalesces all Sync
requests onto a single thread. An async Pebble Sync API would be valuable
here. See the comment in NewWriter for more details.

### Benchmarks

```
name                          old ops/s   new ops/s   delta
kv0/enc=false/nodes=3/cpu=32  36.4k ± 5%  46.5k ± 5%  +27.64%  (p=0.000 n=10+10)

name                          old avg(ms)  new avg(ms)  delta
kv0/enc=false/nodes=3/cpu=32  5.26 ± 3%    4.14 ± 6%    -21.33%  (p=0.000 n=8+10)

name                          old p99(ms)  new p99(ms)  delta
kv0/enc=false/nodes=3/cpu=32  10.9 ± 8%    9.1 ±10%     -15.94%  (p=0.000 n=10+10)
```

These are compelling results. I haven't pushed on this enough to know whether
there's actually a throughput win here, or whether the fixed concurrency and
reduced average latency are just making it look like there is. `kv0bench`
should help answer that question.
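### Sketch: async append with a durability callback

To make the shape of the new component concrete, here is a minimal,
self-contained sketch of the enqueue-then-notify pattern the Writer
implements: replicas enqueue log writes without blocking, a single goroutine
stages the queued work into one batched write-and-sync, and durability is
reported back through a callback (the role `RaftRange.LogStableTo` plays in
this patch). The names and types below are simplified stand-ins, not the
actual `raftlog.Writer` API — see `pkg/kv/kvserver/raftlog/writer.go` in the
diff for the real interface.

```
package main

import (
	"fmt"
	"sync"
)

// appendReq is one range's log write, handed to the writer asynchronously.
type appendReq struct {
	rangeID int
	entries []string          // stand-in for []raftpb.Entry
	onSync  func(rangeID int) // durability callback, cf. RaftRange.LogStableTo
}

// asyncWriter drains a queue of appends with a single background goroutine.
type asyncWriter struct {
	mu    sync.Mutex
	cond  *sync.Cond
	queue []appendReq
}

func newAsyncWriter() *asyncWriter {
	w := &asyncWriter{}
	w.cond = sync.NewCond(&w.mu)
	go w.loop()
	return w
}

// Append enqueues a log write and returns without waiting for durability.
func (w *asyncWriter) Append(req appendReq) {
	w.mu.Lock()
	w.queue = append(w.queue, req)
	w.mu.Unlock()
	w.cond.Signal()
}

func (w *asyncWriter) loop() {
	for {
		w.mu.Lock()
		for len(w.queue) == 0 {
			w.cond.Wait()
		}
		batch := w.queue
		w.queue = nil
		w.mu.Unlock()

		// Stage every queued append into one write and pay for a single
		// sync, however many requests piled up while the previous sync
		// was in flight (cf. batch.Commit(true) in commitAppends).
		for _, req := range batch {
			fmt.Printf("staged %d entries for r%d\n", len(req.entries), req.rangeID)
		}
		// ... a real writer would sync the storage engine here ...

		// Only after the (simulated) sync do we report durability.
		for _, req := range batch {
			req.onSync(req.rangeID)
		}
	}
}

func main() {
	w := newAsyncWriter()
	done := make(chan struct{})
	w.Append(appendReq{
		rangeID: 1,
		entries: []string{"put k=v"},
		onSync: func(id int) {
			fmt.Printf("r%d: log stable\n", id)
			close(done)
		},
	})
	<-done // the caller stays free to do other work until this fires
}
```

The real Writer layers range-ID sharding, duplicate-entry removal across
batches, and a synchronous Sync escape hatch on top of this skeleton, but the
control flow is the same: one goroutine absorbs all appends that arrive while
a sync is in flight, so each WAL sync is amortized across every queued write.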
--- DEPS.bzl | 6 +- build/bazelutil/distdir_files.bzl | 2 +- go.mod | 6 +- go.sum | 4 +- pkg/BUILD.bazel | 2 + pkg/kv/kvserver/BUILD.bazel | 2 + pkg/kv/kvserver/raftlog/BUILD.bazel | 23 ++ pkg/kv/kvserver/raftlog/writer.go | 466 ++++++++++++++++++++++ pkg/kv/kvserver/replica.go | 3 + pkg/kv/kvserver/replica_destroy.go | 4 + pkg/kv/kvserver/replica_init.go | 1 + pkg/kv/kvserver/replica_raft.go | 120 +----- pkg/kv/kvserver/replica_raftlog_writer.go | 55 +++ pkg/kv/kvserver/replica_raftstorage.go | 78 ---- pkg/kv/kvserver/store.go | 5 + pkg/kv/kvserver/store_raft.go | 2 + vendor | 2 +- 17 files changed, 586 insertions(+), 195 deletions(-) create mode 100644 pkg/kv/kvserver/raftlog/BUILD.bazel create mode 100644 pkg/kv/kvserver/raftlog/writer.go create mode 100644 pkg/kv/kvserver/replica_raftlog_writer.go diff --git a/DEPS.bzl b/DEPS.bzl index d9b2cb31bd5a..cf50f6afb5fd 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -8747,10 +8747,10 @@ def go_deps(): ], build_file_proto_mode = "default", importpath = "go.etcd.io/etcd/raft/v3", - sha256 = "62faedd81e10061a4e0d7476865a62b84121ea462514afeaa1b9d66cc53b5a4b", - strip_prefix = "go.etcd.io/etcd/raft/v3@v3.0.0-20210320072418-e51c697ec6e8", + sha256 = "eb8a7668eab08f0a7845ba34c7de6b00e7caf83eb26b67dd79366c8302c38b50", + strip_prefix = "github.com/nvanbenschoten/etcd/raft/v3@v3.0.0-20220826205122-68e4f4f51143", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/raft/v3/io_etcd_go_etcd_raft_v3-v3.0.0-20210320072418-e51c697ec6e8.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nvanbenschoten/etcd/raft/v3/com_github_nvanbenschoten_etcd_raft_v3-v3.0.0-20220826205122-68e4f4f51143.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index a4173cded00e..a0c29495a60e 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -643,6 +643,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nightlyone/lockfile/com_github_nightlyone_lockfile-v1.0.0.zip": "0abd22d55b704c18426167732414806b2a70d99bce65fa9f943cb88c185689ad", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nishanths/predeclared/com_github_nishanths_predeclared-v0.0.0-20200524104333-86fad755b4d3.zip": "f3a40ab7d3e0570570e7bc41a6cc7b08b3e23df5ef5f08553ef622a3752d6e03", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nkovacs/streamquote/com_github_nkovacs_streamquote-v0.0.0-20170412213628-49af9bddb229.zip": "679a789b4b1409ea81054cb12e5f8441199f5fb17d4a2d3510c51f3aa5f3f0cc", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nvanbenschoten/etcd/raft/v3/com_github_nvanbenschoten_etcd_raft_v3-v3.0.0-20220826205122-68e4f4f51143.zip": "eb8a7668eab08f0a7845ba34c7de6b00e7caf83eb26b67dd79366c8302c38b50", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nxadm/tail/com_github_nxadm_tail-v1.4.4.zip": "c9bb9d05b3afd1bacc35e7d305a22b07cd7db38f5fabd4ccd95a9227c5709890", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/oklog/oklog/com_github_oklog_oklog-v0.3.2.zip": "b37d032de5b0dd5e96063c06b77fcb29a692a07bd52a4d99a361f2fef68822ec", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/oklog/run/com_github_oklog_run-v1.1.0.zip": "d6f69fc71aa155043f926c2a98fc1e5b3a8ebab422f2f36d785cfba38a7ebee4", @@ -838,7 +839,6 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/client/v2/io_etcd_go_etcd_client_v2-v2.305.0.zip": 
"91fcb507fe8c193844b56bfb6c8741aaeb6ffa11ee9043de2af0f141173679f3", "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/io_etcd_go_etcd-v0.5.0-alpha.5.0.20200910180754-dd1b699fc489.zip": "d982ee501979b41b68625693bad77d15e4ae79ab9d0eae5f6028205f96a74e49", "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/pkg/v3/io_etcd_go_etcd_pkg_v3-v3.0.0-20201109164711-01844fd28560.zip": "1700dfed48becf82ccfe6865fe59daac2121d48f60b7c4bf090f0ff2320d33d4", - "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/raft/v3/io_etcd_go_etcd_raft_v3-v3.0.0-20210320072418-e51c697ec6e8.zip": "62faedd81e10061a4e0d7476865a62b84121ea462514afeaa1b9d66cc53b5a4b", "https://storage.googleapis.com/cockroach-godeps/gomod/go.mongodb.org/mongo-driver/org_mongodb_go_mongo_driver-v1.5.1.zip": "446cff132e82c64af7ffcf48e268eb16ec81f694914aa6baecb06cbbae1be0d7", "https://storage.googleapis.com/cockroach-godeps/gomod/go.mozilla.org/pkcs7/org_mozilla_go_pkcs7-v0.0.0-20200128120323-432b2356ecb1.zip": "3c4c1667907ff3127e371d44696326bad9e965216d4257917ae28e8b82a9e08d", "https://storage.googleapis.com/cockroach-godeps/gomod/go.opencensus.io/io_opencensus_go-v0.23.0.zip": "81c78beb84872084d6d5ddc0a0bffc47294412898472c891a29cfcb66f3fa2d8", diff --git a/go.mod b/go.mod index 4e59392db834..a1a699eac8c1 100644 --- a/go.mod +++ b/go.mod @@ -125,6 +125,7 @@ require ( github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 github.com/pierrre/geohash v1.0.0 github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 + github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 github.com/pressly/goose/v3 v3.5.3 github.com/prometheus/client_golang v1.12.0 @@ -147,7 +148,7 @@ require ( github.com/xdg-go/scram v1.0.2 github.com/xdg-go/stringprep v1.0.2 github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292 - go.etcd.io/etcd/raft/v3 v3.0.0-20210320072418-e51c697ec6e8 + go.etcd.io/etcd/raft/v3 v3.5.4 go.opentelemetry.io/otel v1.0.0-RC3 go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC3 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.0-RC3 @@ -296,7 +297,6 @@ require ( github.com/openzipkin/zipkin-go v0.2.5 // indirect github.com/pelletier/go-toml v1.9.3 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pkg/profile v1.6.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/pquerna/cachecontrol v0.0.0-20200921180117-858c6e7e6b7e // indirect @@ -366,6 +366,8 @@ replace gopkg.in/yaml.v2 => github.com/cockroachdb/yaml v0.0.0-20210825132133-2d replace github.com/knz/go-libedit => github.com/otan-cockroach/go-libedit v1.10.2-0.20201030151939-7cced08450e7 +replace go.etcd.io/etcd/raft/v3 => github.com/nvanbenschoten/etcd/raft/v3 v3.0.0-20220826205122-68e4f4f51143 + // At the time of writing (i.e. as of this version below) the `etcd` repo is in the process of properly introducing // modules, and as part of that uses an unsatisfiable version for this dependency (v3.0.0-00010101000000-000000000000). // We just force it to the same SHA as the `go.etcd.io/etcd/raft/v3` module (they live in the same VCS root). 
diff --git a/go.sum b/go.sum index 973c1c7ba756..b66d7193bde7 100644 --- a/go.sum +++ b/go.sum @@ -1736,6 +1736,8 @@ github.com/nightlyone/lockfile v1.0.0 h1:RHep2cFKK4PonZJDdEl4GmkabuhbsRMgk/k3uAm github.com/nightlyone/lockfile v1.0.0/go.mod h1:rywoIealpdNse2r832aiD9jRk8ErCatROs6LzC841CI= github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= +github.com/nvanbenschoten/etcd/raft/v3 v3.0.0-20220826205122-68e4f4f51143 h1:pe+4QxsScT+K8cpDOJEF8olYpNc0LQI4X1Wk1xruJTw= +github.com/nvanbenschoten/etcd/raft/v3 v3.0.0-20220826205122-68e4f4f51143/go.mod h1:i+srOieUHQl4y/EwlGOpuYtoKG7nb2uhtA/hrFsFTsc= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= @@ -2244,8 +2246,6 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3 go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= go.etcd.io/etcd/pkg/v3 v3.0.0-20201109164711-01844fd28560 h1:U/PIBuOTa8JXLPKF81Xh7xhIjA0jbpyqFWUPIiT4Ilc= go.etcd.io/etcd/pkg/v3 v3.0.0-20201109164711-01844fd28560/go.mod h1:0HiXlybqS+XtfgnNkiEZWwGXYYEhWsWL8fDVdZzb7is= -go.etcd.io/etcd/raft/v3 v3.0.0-20210320072418-e51c697ec6e8 h1:aRP8pJvbOsFy8SaZ0tcWeV4RYTlp8ZIWS49usJjO4Ac= -go.etcd.io/etcd/raft/v3 v3.0.0-20210320072418-e51c697ec6e8/go.mod h1:i+srOieUHQl4y/EwlGOpuYtoKG7nb2uhtA/hrFsFTsc= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 4427a90cf0e8..abcd486d5fb5 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1126,6 +1126,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/protectedts:protectedts_test", "//pkg/kv/kvserver/raftentry:raftentry", "//pkg/kv/kvserver/raftentry:raftentry_test", + "//pkg/kv/kvserver/raftlog:raftlog", "//pkg/kv/kvserver/raftutil:raftutil", "//pkg/kv/kvserver/raftutil:raftutil_test", "//pkg/kv/kvserver/rangefeed:rangefeed", @@ -2382,6 +2383,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/protectedts/ptstorage:get_x_data", "//pkg/kv/kvserver/protectedts/ptutil:get_x_data", "//pkg/kv/kvserver/raftentry:get_x_data", + "//pkg/kv/kvserver/raftlog:get_x_data", "//pkg/kv/kvserver/raftutil:get_x_data", "//pkg/kv/kvserver/rangefeed:get_x_data", "//pkg/kv/kvserver/rditer:get_x_data", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 6986e9a4182a..e673bd387e85 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -59,6 +59,7 @@ go_library( "replica_raft.go", "replica_raft_overload.go", "replica_raft_quiesce.go", + "replica_raftlog_writer.go", "replica_raftstorage.go", "replica_range_lease.go", "replica_rangefeed.go", @@ -139,6 +140,7 @@ go_library( "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/raftentry", + "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", diff --git a/pkg/kv/kvserver/raftlog/BUILD.bazel b/pkg/kv/kvserver/raftlog/BUILD.bazel new file mode 100644 index 000000000000..96fd7f02e422 
--- /dev/null
+++ b/pkg/kv/kvserver/raftlog/BUILD.bazel
@@ -0,0 +1,23 @@
+load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "raftlog",
+    srcs = ["writer.go"],
+    importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/keys",
+        "//pkg/kv/kvserver/stateloader",
+        "//pkg/roachpb",
+        "//pkg/storage",
+        "//pkg/storage/enginepb",
+        "//pkg/util/hlc",
+        "//pkg/util/stop",
+        "//pkg/util/syncutil",
+        "@io_etcd_go_etcd_raft_v3//:raft",
+        "@io_etcd_go_etcd_raft_v3//raftpb",
+    ],
+)
+
+get_x_data(name = "get_x_data")
diff --git a/pkg/kv/kvserver/raftlog/writer.go b/pkg/kv/kvserver/raftlog/writer.go
new file mode 100644
index 000000000000..5791e0916a11
--- /dev/null
+++ b/pkg/kv/kvserver/raftlog/writer.go
@@ -0,0 +1,466 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package raftlog
+
+import (
+	"context"
+	"sort"
+	"sync"
+	"sync/atomic"
+
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"go.etcd.io/etcd/raft/v3"
+	"go.etcd.io/etcd/raft/v3/raftpb"
+)
+
+// Writer is responsible for performing log writes to a collection of replicas'
+// raft logs. It exposes an asynchronous interface so that replicas can enqueue
+// log writes without waiting for their completion. Instead, completion is
+// signalled using a callback interface.
+type Writer struct {
+	eng     storage.Engine
+	cache   RaftEntryCache
+	shards  []writerShard
+	stopped int32
+}
+
+// writerShard is responsible for a subset of ranges, sharded by range ID.
+type writerShard struct {
+	w          *Writer
+	eventsMu   syncutil.Mutex
+	eventsCond sync.Cond
+	events     []event
+}
+
+// event is a union of the different event types that the Writer goroutines
+// need to be informed of. It is used so that all events can be sent through
+// the same queue, which is necessary to prevent reordering.
+type event struct {
+	app   appendEvent
+	syncC chan struct{}
+}
+
+type appendEvent struct {
+	rr        RaftRange
+	rangeID   roachpb.RangeID
+	entries   []raftpb.Entry
+	hardState raftpb.HardState
+}
+
+func NewWriter(eng storage.Engine, cache RaftEntryCache) *Writer {
+	// NOTE: The optimal number of shards is currently 1. With any value greater
+	// than one, most calls to sync end up waiting for another shard to complete a
+	// sync before they are able to sync, so average end-to-end latency doubles.
+	//
+	// This demonstrates a limitation of the Pebble sync API, which mandates that
+	// some goroutine wait on a sync to be notified of durability. An asynchronous
+	// variant of Pebble's API (similar to Pebble's own syncQueue) that is hooked
+	// directly into Pebble's LogWriter.flushLoop would help.
+	const shards = 1
+	w := &Writer{
+		eng:    eng,
+		cache:  cache,
+		shards: make([]writerShard, shards),
+	}
+	for i := range w.shards {
+		s := &w.shards[i]
+		s.w = w
+		s.eventsCond.L = &s.eventsMu
+	}
+	return w
+}
+
+func (w *Writer) Start(stopper *stop.Stopper) {
+	ctx := context.Background()
+	waitQuiesce := func(context.Context) {
+		<-stopper.ShouldQuiesce()
+		w.stop()
+	}
+	// TODO: hook up to scheduler.
+	//_ = stopper.RunAsyncTaskEx(ctx,
+	//	stop.TaskOpts{
+	//		TaskName: "raftlog-writer-wait-quiesce",
+	//		// This task doesn't reference a parent because it runs for the server's
+	//		// lifetime.
+	//		SpanOpt: stop.SterileRootSpan,
+	//	},
+	//	waitQuiesce)
+	//
+	//for i := 0; i < len(w.pipeline.stages); i++ {
+	//	_ = stopper.RunAsyncTaskEx(ctx,
+	//		stop.TaskOpts{
+	//			TaskName: "raftlog-writer-worker",
+	//			// This task doesn't reference a parent because it runs for the server's
+	//			// lifetime.
+	//			SpanOpt: stop.SterileRootSpan,
+	//		},
+	//		w.writerLoop)
+	//}
+	go waitQuiesce(ctx)
+	for i := range w.shards {
+		s := &w.shards[i]
+		go s.writerLoop(ctx)
+	}
+}
+
+func (w *Writer) pushEvent(rangeID roachpb.RangeID, ev event) {
+	shard := &w.shards[int(rangeID)%len(w.shards)]
+	shard.eventsMu.Lock()
+	wasEmpty := len(shard.events) == 0
+	shard.events = append(shard.events, ev)
+	shard.eventsMu.Unlock()
+	if wasEmpty {
+		shard.eventsCond.Signal()
+	}
+}
+
+func (w *Writer) Append(rangeID roachpb.RangeID, rr RaftRange, rd raft.Ready) {
+	w.pushEvent(rangeID, event{app: appendEvent{
+		rr:        rr,
+		rangeID:   rangeID,
+		entries:   rd.Entries,
+		hardState: rd.HardState,
+	}})
+}
+
+func (w *Writer) Sync(rangeID roachpb.RangeID) {
+	ch := make(chan struct{})
+	w.pushEvent(rangeID, event{syncC: ch})
+	// TODO: handle shutdown?
+	<-ch
+}
+
+func (s *writerShard) writerLoop(ctx context.Context) {
+	var recycled []event
+	var rangeIDs []roachpb.RangeID
+	workByRangeID := make(map[roachpb.RangeID]work)
+	for {
+		events, ok := s.waitForEvents(recycled)
+		if !ok {
+			return
+		}
+		appends, syncs := s.splitEvents(events)
+
+		s.prepareAppends(appends, &rangeIDs, workByRangeID)
+		batch := s.w.eng.NewUnindexedBatch(false /* writeOnly */)
+		s.stageAppends(ctx, rangeIDs, workByRangeID, batch)
+		s.commitAppends(rangeIDs, workByRangeID, batch)
+		s.processSyncEvents(syncs)
+
+		// Recycle data structures.
+		for i := range events {
+			events[i] = event{}
+		}
+		recycled = events[:0]
+		rangeIDs = rangeIDs[:0]
+		for i := range workByRangeID {
+			delete(workByRangeID, i)
+		}
+	}
+}
+
+func (s *writerShard) waitForEvents(recycled []event) ([]event, bool) {
+	s.eventsMu.Lock()
+	defer s.eventsMu.Unlock()
+	for {
+		if s.w.isStopped() {
+			return nil, false
+		}
+		if len(s.events) > 0 {
+			events := s.events
+			s.events = recycled
+			return events, true
+		}
+		s.eventsCond.Wait()
+	}
+}
+
+func (s *writerShard) splitEvents(events []event) (appends, syncs []event) {
+	// Stably sort the events: append events first, ordered by range ID, with
+	// sync events at the end.
+	sort.SliceStable(events, func(i, j int) bool {
+		// Sync events sort last.
+		if events[i].syncC != nil {
+			return false
+		}
+		if events[j].syncC != nil {
+			return true
+		}
+
+		// Append events sort by range ID.
+		return events[i].app.rangeID < events[j].app.rangeID
+	})
+	i := sort.Search(len(events), func(i int) bool {
+		return events[i].syncC != nil
+	})
+	return events[:i], events[i:]
+}
+
+type work struct {
+	entSlices []event
+	meta      RaftLogMetadata
+}
+
+func (s *writerShard) prepareAppends(
+	appends []event, rangeIDs *[]roachpb.RangeID, workByRangeID map[roachpb.RangeID]work,
+) {
+	for i := 0; i < len(appends); {
+		rangeID := appends[i].app.rangeID
+		j := i + 1
+		for j < len(appends) {
+			if appends[j].app.rangeID != rangeID {
+				break
+			}
+			j++
+		}
+		*rangeIDs = append(*rangeIDs, rangeID)
+		workByRangeID[rangeID] = work{
+			entSlices: appends[i:j],
+			meta:      appends[i].app.rr.GetRaftLogMetadata(),
+		}
+		i = j
+	}
+
+	// Remove duplicate entries. Entries with an index from later batches replace
+	// entries with the same index from earlier batches.
+	for _, rangeID := range *rangeIDs {
+		entrySlices := workByRangeID[rangeID].entSlices
+		for i := len(entrySlices) - 1; i > 0; i-- {
+			laterEnts := entrySlices[i].app.entries
+			if len(laterEnts) == 0 {
+				continue
+			}
+			for j := i - 1; j >= 0; j-- {
+				earlierEnts := entrySlices[j].app.entries
+				if len(earlierEnts) == 0 {
+					continue
+				}
+				idxOffset := int(laterEnts[0].Index) - int(earlierEnts[0].Index)
+				if idxOffset <= 0 {
+					entrySlices[j].app.entries = nil
+					continue
+				}
+				if idxOffset < len(earlierEnts) {
+					entrySlices[j].app.entries = entrySlices[j].app.entries[:idxOffset]
+				}
+				break
+			}
+		}
+	}
+}
+
+func (s *writerShard) stageAppends(
+	ctx context.Context,
+	rangeIDs []roachpb.RangeID,
+	workByRangeID map[roachpb.RangeID]work,
+	batch storage.Batch,
+) {
+	for _, rangeID := range rangeIDs {
+		rangeWork := workByRangeID[rangeID]
+		for _, app := range rangeWork.entSlices {
+			var err error
+			rangeWork.meta, err = s.processPreAppend(ctx, &app.app, rangeWork.meta, batch)
+			if err != nil {
+				panic(err)
+			}
+		}
+		workByRangeID[rangeID] = rangeWork
+	}
+}
+
+func (s *writerShard) processPreAppend(
+	ctx context.Context, app *appendEvent, meta RaftLogMetadata, batch storage.Batch,
+) (RaftLogMetadata, error) {
+	if !raft.IsEmptyHardState(app.hardState) {
+		// NB: Note that without additional safeguards, it's incorrect to write
+		// the HardState before appending rd.Entries. When catching up, a follower
+		// will receive Entries that are immediately Committed in the same
+		// Ready. If we persist the HardState but happen to lose the Entries,
+		// assertions can be tripped.
+		//
+		// We have both in the same batch, so there's no problem. If that ever
+		// changes, we must write and sync the Entries before the HardState.
+		if err := app.rr.StateLoader().SetHardState(ctx, batch, app.hardState); err != nil {
+			return RaftLogMetadata{}, err
+		}
+	}
+
+	thinEntries, sideLoadedSize, err := app.rr.MaybeSideloadEntries(ctx, app.entries)
+	if err != nil {
+		return RaftLogMetadata{}, err
+	}
+	meta.LogSize += sideLoadedSize
+
+	meta.LastIndex, meta.LastTerm, meta.LogSize, err = appendEntries(
+		ctx, batch, app.rr.StateLoader(), meta.LastIndex, meta.LastTerm, meta.LogSize, thinEntries)
+	if err != nil {
+		return RaftLogMetadata{}, err
+	}
+
+	// Update raft log entry cache. We clear any older, uncommitted log entries
+	// and cache the latest ones.
+	s.w.cache.Add(app.rangeID, app.entries, true /* truncate */)
+
+	return meta, nil
+}
+
+func (s *writerShard) commitAppends(
+	rangeIDs []roachpb.RangeID, workByRangeID map[roachpb.RangeID]work, batch storage.Batch,
+) {
+	if err := batch.Commit(true); err != nil {
+		panic(err)
+	}
+	batch.Close()
+
+	// Notify each range about the sync. Walk backwards through the range's
+	// append events to find the most recent one that carried entries.
+	for _, rangeID := range rangeIDs {
+		rangeWork := workByRangeID[rangeID]
+		for i := len(rangeWork.entSlices) - 1; i >= 0; i-- {
+			app := &rangeWork.entSlices[i].app
+			if len(app.entries) > 0 {
+				rd := raft.Ready{Entries: app.entries}
+				app.rr.LogStableTo(rd, rangeWork.meta)
+				break
+			}
+		}
+	}
+}
+
+// TODO: how does this work?
+//func (w *Writer) processPostAppend(app *appendEvent) {
+//	//// We may have just overwritten parts of the log which contain
+//	//// sideloaded SSTables from a previous term (and perhaps discarded some
+//	//// entries that we didn't overwrite). Remove any such leftover on-disk
+//	//// payloads (we can do that now because we've committed the deletion
+//	//// just above).
+//	//firstPurge := app.entries[0].Index // first new entry written
+//	//purgeTerm := app.entries[0].Term - 1
+//	//lastPurge := prevLastIndex // old end of the log, include in deletion
+//	//purgedSize, err := app.rr.MaybePurgeSideloaded(ctx, firstPurge, lastPurge, purgeTerm)
+//	//if err != nil {
+//	//	return RaftLogMetadata{}, err
+//	//}
+//	//meta.LogSize -= purgedSize
+//
+//	// Update raft log entry cache. We clear any older, uncommitted log entries
+//	// and cache the latest ones.
+//	w.cache.Add(app.rangeID, app.entries, true /* truncate */)
+//
+//	return
+//}
+
+func (s *writerShard) processSyncEvents(events []event) {
+	for i := range events {
+		if c := events[i].syncC; c != nil {
+			close(c)
+		}
+	}
+}
+
+func (w *Writer) stop() {
+	atomic.StoreInt32(&w.stopped, 1)
+	for i := range w.shards {
+		w.shards[i].eventsCond.Broadcast()
+	}
+}
+
+func (w *Writer) isStopped() bool {
+	return atomic.LoadInt32(&w.stopped) > 0
+}
+
+type RaftLogMetadata struct {
+	LastIndex uint64
+	LastTerm  uint64
+	LogSize   int64
+}
+
+// RaftRange is a handle to a Replica.
+type RaftRange interface {
+	StateLoader() stateloader.StateLoader
+	GetRaftLogMetadata() RaftLogMetadata
+	LogStableTo(raft.Ready, RaftLogMetadata)
+	MaybeSideloadEntries(context.Context, []raftpb.Entry) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error)
+	//MaybePurgeSideloaded(_ context.Context, firstIndex, LastIndex, term uint64) (size int64, _ error)
+}
+
+// RaftEntryCache is a specialized data structure for storing deserialized
+// raftpb.Entry values tailored to the access patterns of the storage package.
+type RaftEntryCache interface {
+	Add(id roachpb.RangeID, ents []raftpb.Entry, truncate bool)
+}
+
+// appendEntries appends the given entries to the raft log. It takes the
+// previous values of the range's last index, last term, and log size, and
+// returns new values. We do this rather than modifying them directly because
+// these modifications need to be atomic with the commit of the batch. Mutual
+// exclusion per range is provided by the Writer, which routes all events for
+// a given range to a single shard.
+//
+// appendEntries is intentionally oblivious to the existence of sideloaded
+// proposals. They are managed by the caller, including cleaning up obsolete
+// on-disk payloads in case the log tail is replaced.
+func appendEntries(
+	ctx context.Context,
+	batch storage.Batch,
+	stateLoader stateloader.StateLoader,
+	prevLastIndex uint64,
+	prevLastTerm uint64,
+	prevRaftLogSize int64,
+	entries []raftpb.Entry,
+) (uint64, uint64, int64, error) {
+	if len(entries) == 0 {
+		return prevLastIndex, prevLastTerm, prevRaftLogSize, nil
+	}
+	prefix := stateLoader.RaftLogPrefix()
+	var diff enginepb.MVCCStats
+	var value roachpb.Value
+	for i := range entries {
+		ent := &entries[i]
+		key := keys.RaftLogKeyFromPrefix(prefix, ent.Index)
+
+		if err := value.SetProto(ent); err != nil {
+			return 0, 0, 0, err
+		}
+		value.InitChecksum(key)
+		var err error
+		if ent.Index > prevLastIndex {
+			err = storage.MVCCBlindPut(ctx, batch, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */)
+		} else {
+			err = storage.MVCCPut(ctx, batch, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */)
+		}
+		if err != nil {
+			return 0, 0, 0, err
+		}
+	}
+
+	lastIndex := entries[len(entries)-1].Index
+	lastTerm := entries[len(entries)-1].Term
+	// Delete any previously appended log entries which never committed.
+	if prevLastIndex > 0 {
+		for i := lastIndex + 1; i <= prevLastIndex; i++ {
+			// Note that the caller is in charge of deleting any sideloaded payloads
+			// (which they must only do *after* the batch has committed).
+			key := keys.RaftLogKeyFromPrefix(prefix, i)
+			_, err := storage.MVCCDelete(ctx, batch, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil)
+			if err != nil {
+				return 0, 0, 0, err
+			}
+		}
+	}
+
+	raftLogSize := prevRaftLogSize + diff.SysBytes
+	return lastIndex, lastTerm, raftLogSize, nil
+}
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 5617a9d19add..48821695fb2f 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -327,6 +327,9 @@ type Replica struct {
 		// range descriptor. When it is temporarily dropped and recreated, the
 		// newly recreated replica will have a complete range descriptor.
 		lastToReplica, lastFromReplica roachpb.ReplicaDescriptor
+
+		logWriterStateLoader stateloader.StateLoader
+		logWriterReady       raft.Ready
 	}
 
 	// Contains the lease history when enabled.
diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go
index ba9518b08d3f..8c5276614510 100644
--- a/pkg/kv/kvserver/replica_destroy.go
+++ b/pkg/kv/kvserver/replica_destroy.go
@@ -90,6 +90,10 @@ func (r *Replica) preDestroyRaftMuLocked(
 		log.Fatalf(ctx, "replica not marked as destroyed before call to preDestroyRaftMuLocked: %v", r)
 	}
 
+	// Sync the async raft log writer so that nothing new will be written to the
+	// range-local keyspace after we clear the range's data.
+	r.store.raftLogWriter.Sync(r.RangeID)
+
 	err := clearRangeData(desc, reader, writer, clearRangeIDLocalOnly, mustUseClearRange)
 	if err != nil {
 		return err
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index 18555e0761b1..0de71fd04cd2 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -123,6 +123,7 @@ func newUnloadedReplica(
 	// replica GC issues, but is a distraction at the moment.
// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r))) r.raftMu.stateLoader = stateloader.Make(desc.RangeID) + r.raftMu.logWriterStateLoader = stateloader.Make(desc.RangeID) r.splitQueueThrottle = util.Every(splitQueueThrottleDuration) r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index cb5aa863c4d9..9a4ea09ef74c 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -689,11 +689,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked( var rd raft.Ready r.mu.Lock() lastIndex := r.mu.lastIndex // used for append below - lastTerm := r.mu.lastTerm - raftLogSize := r.mu.raftLogSize leaderID := r.mu.leaderID lastLeaderID := leaderID err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + if len(r.raftMu.logWriterReady.Entries) != 0 { + raftGroup.StableTo(r.raftMu.logWriterReady) + r.raftMu.logWriterReady = raft.Ready{} + } + numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup) if err != nil { return false, err @@ -773,6 +776,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked( log.Fatalf(ctx, "incoming snapshot id doesn't match raft snapshot id: %s != %s", snapUUID, inSnap.SnapUUID) } + r.store.raftLogWriter.Sync(r.RangeID) + // Applying this snapshot may require us to subsume one or more of our right // neighbors. This occurs if this replica is informed about the merges via a // Raft snapshot instead of a MsgApp containing the merge commits, e.g., @@ -789,15 +794,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( stats.tSnapEnd = timeutil.Now() stats.snap.applied = true - // r.mu.lastIndex, r.mu.lastTerm and r.mu.raftLogSize were updated in - // applySnapshot, but we also want to make sure we reflect these changes in - // the local variables we're tracking here. - r.mu.RLock() - lastIndex = r.mu.lastIndex - lastTerm = r.mu.lastTerm - raftLogSize = r.mu.raftLogSize - r.mu.RUnlock() - // We refresh pending commands after applying a snapshot because this // replica may have been temporarily partitioned from the Raft group and // missed leadership changes that occurred. Suppose node A is the leader, @@ -906,111 +902,22 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // durably synced. // See: // https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 + // TODO: remove this splitting logic. It's not needed anymore. msgApps, otherMsgs := splitMsgApps(rd.Messages) r.traceMessageSends(msgApps, "sending msgApp") r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers) - // Use a more efficient write-only batch because we don't need to do any - // reads from the batch. Any reads are performed on the underlying DB. - batch := r.store.Engine().NewUnindexedBatch(false /* writeOnly */) - defer batch.Close() - - prevLastIndex := lastIndex - if len(rd.Entries) > 0 { - stats.tAppendBegin = timeutil.Now() - // All of the entries are appended to distinct keys, returning a new - // last index. 
- thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := r.maybeSideloadEntriesRaftMuLocked(ctx, rd.Entries) - if err != nil { - const expl = "during sideloading" - return stats, expl, errors.Wrap(err, expl) - } - raftLogSize += sideLoadedEntriesSize - if lastIndex, lastTerm, raftLogSize, err = r.append( - ctx, batch, lastIndex, lastTerm, raftLogSize, thinEntries, - ); err != nil { - const expl = "during append" - return stats, expl, errors.Wrap(err, expl) - } - stats.appendedRegularCount += len(thinEntries) - numSideloaded - stats.appendedRegularBytes += otherEntriesSize - stats.appendedSideloadedCount += numSideloaded - stats.appendedSideloadedBytes += sideLoadedEntriesSize - stats.tAppendEnd = timeutil.Now() - } - - if !raft.IsEmptyHardState(rd.HardState) { - if !r.IsInitialized() && rd.HardState.Commit != 0 { - log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) - } - // NB: Note that without additional safeguards, it's incorrect to write - // the HardState before appending rd.Entries. When catching up, a follower - // will receive Entries that are immediately Committed in the same - // Ready. If we persist the HardState but happen to lose the Entries, - // assertions can be tripped. - // - // We have both in the same batch, so there's no problem. If that ever - // changes, we must write and sync the Entries before the HardState. - if err := r.raftMu.stateLoader.SetHardState(ctx, batch, rd.HardState); err != nil { - const expl = "during setHardState" - return stats, expl, errors.Wrap(err, expl) - } - } - // Synchronously commit the batch with the Raft log entries and Raft hard - // state as we're promising not to lose this data. - // - // Note that the data is visible to other goroutines before it is synced to - // disk. This is fine. The important constraints are that these syncs happen - // before Raft messages are sent and before the call to RawNode.Advance. Our - // regular locking is sufficient for this and if other goroutines can see the - // data early, that's fine. In particular, snapshots are not a problem (I - // think they're the only thing that might access log entries or HardState - // from other goroutines). Snapshots do not include either the HardState or - // uncommitted log entries, and even if they did include log entries that - // were not persisted to disk, it wouldn't be a problem because raft does not - // infer the that entries are persisted on the node that sends a snapshot. - stats.tPebbleCommitBegin = timeutil.Now() - stats.pebbleBatchBytes = int64(batch.Len()) - sync := rd.MustSync && !disableSyncRaftLog.Get(&r.store.cfg.Settings.SV) - if err := batch.Commit(sync); err != nil { - const expl = "while committing batch" - return stats, expl, errors.Wrap(err, expl) - } - stats.sync = sync - stats.tPebbleCommitEnd = timeutil.Now() - if rd.MustSync { - r.store.metrics.RaftLogCommitLatency.RecordValue( - stats.tPebbleCommitEnd.Sub(stats.tPebbleCommitBegin).Nanoseconds()) - } - - if len(rd.Entries) > 0 { - // We may have just overwritten parts of the log which contain - // sideloaded SSTables from a previous term (and perhaps discarded some - // entries that we didn't overwrite). Remove any such leftover on-disk - // payloads (we can do that now because we've committed the deletion - // just above). 
- firstPurge := rd.Entries[0].Index // first new entry written - purgeTerm := rd.Entries[0].Term - 1 - lastPurge := prevLastIndex // old end of the log, include in deletion - purgedSize, err := maybePurgeSideloaded(ctx, r.raftMu.sideloaded, firstPurge, lastPurge, purgeTerm) - if err != nil { - const expl = "while purging sideloaded storage" - return stats, expl, err - } - raftLogSize -= purgedSize - if raftLogSize < 0 { - // Might have gone negative if node was recently restarted. - raftLogSize = 0 + if len(rd.Entries) > 0 || !raft.IsEmptyHardState(rd.HardState) { + r.store.raftLogWriter.Append(r.RangeID, (*replicaRaftLog)(r), rd) + if rd.MustSync { + r.store.raftLogWriter.Sync(r.RangeID) } } // Update protected state - last index, last term, raft log size, and raft // leader ID. r.mu.Lock() - r.mu.lastIndex = lastIndex - r.mu.lastTerm = lastTerm - r.mu.raftLogSize = raftLogSize var becameLeader bool if r.mu.leaderID != leaderID { r.mu.leaderID = leaderID @@ -1027,9 +934,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) } - // Update raft log entry cache. We clear any older, uncommitted log entries - // and cache the latest ones. - r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */) r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs, nil /* blocked */) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") diff --git a/pkg/kv/kvserver/replica_raftlog_writer.go b/pkg/kv/kvserver/replica_raftlog_writer.go new file mode 100644 index 000000000000..485017d10330 --- /dev/null +++ b/pkg/kv/kvserver/replica_raftlog_writer.go @@ -0,0 +1,55 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvserver + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +type replicaRaftLog Replica + +var _ raftlog.RaftRange = &replicaRaftLog{} + +func (r *replicaRaftLog) StateLoader() stateloader.StateLoader { + return r.raftMu.logWriterStateLoader +} + +func (r *replicaRaftLog) MaybeSideloadEntries( + ctx context.Context, entries []raftpb.Entry, +) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) { + thinEntries, _, sideloadedEntriesSize, _, err := maybeSideloadEntriesImpl(ctx, entries, r.raftMu.sideloaded) + return thinEntries, sideloadedEntriesSize, err +} + +func (r *replicaRaftLog) GetRaftLogMetadata() raftlog.RaftLogMetadata { + r.mu.RLock() + defer r.mu.RUnlock() + return raftlog.RaftLogMetadata{ + LastIndex: r.mu.lastIndex, + LastTerm: r.mu.lastTerm, + LogSize: r.mu.raftLogSize, + } +} + +func (r *replicaRaftLog) LogStableTo(rd raft.Ready, meta raftlog.RaftLogMetadata) { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.lastIndex = meta.LastIndex + r.mu.lastTerm = meta.LastTerm + r.mu.raftLogSize = meta.LogSize + r.raftMu.logWriterReady = rd + r.store.enqueueRaftUpdateCheck(r.RangeID) +} diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 788ae283ba35..2b018e6e4efb 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -635,84 +635,6 @@ func snapshot( }, nil } -// append the given entries to the raft log. Takes the previous values of -// r.mu.lastIndex, r.mu.lastTerm, and r.mu.raftLogSize, and returns new values. -// We do this rather than modifying them directly because these modifications -// need to be atomic with the commit of the batch. This method requires that -// r.raftMu is held. -// -// append is intentionally oblivious to the existence of sideloaded proposals. -// They are managed by the caller, including cleaning up obsolete on-disk -// payloads in case the log tail is replaced. -// -// NOTE: This method takes a engine.Writer because reads are unnecessary when -// prevLastIndex is 0 and prevLastTerm is invalidLastTerm. In the case where -// reading is necessary (I.E. entries are getting overwritten or deleted), a -// engine.ReadWriter must be passed in. -func (r *Replica) append( - ctx context.Context, - writer storage.Writer, - prevLastIndex uint64, - prevLastTerm uint64, - prevRaftLogSize int64, - entries []raftpb.Entry, -) (uint64, uint64, int64, error) { - if len(entries) == 0 { - return prevLastIndex, prevLastTerm, prevRaftLogSize, nil - } - prefix := r.raftMu.stateLoader.RaftLogPrefix() - var diff enginepb.MVCCStats - var value roachpb.Value - for i := range entries { - ent := &entries[i] - key := keys.RaftLogKeyFromPrefix(prefix, ent.Index) - - if err := value.SetProto(ent); err != nil { - return 0, 0, 0, err - } - value.InitChecksum(key) - var err error - if ent.Index > prevLastIndex { - err = storage.MVCCBlindPut(ctx, writer, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */) - } else { - // We type assert `writer` to also be an engine.ReadWriter only in - // the case where we're replacing existing entries. 
- eng, ok := writer.(storage.ReadWriter) - if !ok { - panic("expected writer to be a engine.ReadWriter when overwriting log entries") - } - err = storage.MVCCPut(ctx, eng, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */) - } - if err != nil { - return 0, 0, 0, err - } - } - - lastIndex := entries[len(entries)-1].Index - lastTerm := entries[len(entries)-1].Term - // Delete any previously appended log entries which never committed. - if prevLastIndex > 0 { - // We type assert `writer` to also be an engine.ReadWriter only in the - // case where we're deleting existing entries. - eng, ok := writer.(storage.ReadWriter) - if !ok { - panic("expected writer to be a engine.ReadWriter when deleting log entries") - } - for i := lastIndex + 1; i <= prevLastIndex; i++ { - // Note that the caller is in charge of deleting any sideloaded payloads - // (which they must only do *after* the batch has committed). - _, err := storage.MVCCDelete(ctx, eng, &diff, keys.RaftLogKeyFromPrefix(prefix, i), - hlc.Timestamp{}, hlc.ClockTimestamp{}, nil) - if err != nil { - return 0, 0, 0, err - } - } - } - - raftLogSize := prevRaftLogSize + diff.SysBytes - return lastIndex, lastTerm, raftLogSize, nil -} - // updateRangeInfo is called whenever a range is updated by ApplySnapshot // or is created by range splitting to setup the fields which are // uninitialized or need updating. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b69d7c639a22..5b43d0b1873b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" @@ -928,6 +929,8 @@ type Store struct { scheduler *raftScheduler + raftLogWriter *raftlog.Writer + // livenessMap is a map from nodeID to a bool indicating // liveness. It is updated periodically in raftTickLoop() // and reactively in nodeIsLiveCallback() on liveness updates. @@ -1237,6 +1240,8 @@ func NewStore( s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize) s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) + s.raftLogWriter = raftlog.NewWriter(s.engine, s.raftEntryCache) + s.coalescedMu.Lock() s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat{} s.coalescedMu.heartbeatResponses = map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat{} diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index eefd61f34f95..651a8b242872 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -709,6 +709,8 @@ func (s *Store) processRaft(ctx context.Context) { s.scheduler.Wait(ctx) } + s.raftLogWriter.Start(s.stopper) + _ = s.stopper.RunAsyncTask(ctx, "sched-tick-loop", s.raftTickLoop) _ = s.stopper.RunAsyncTask(ctx, "coalesced-hb-loop", s.coalescedHeartbeatsLoop) s.stopper.AddCloser(stop.CloserFn(func() { diff --git a/vendor b/vendor index de32bf616c1b..13d49285f530 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit de32bf616c1ba93bbcbecf6691698ed081d0c72b +Subproject commit 13d49285f530221a7c22d86809d0ab9dd28f1da9