
storage: use IngestExternalFile instead of WriteBatches for applying snapshots #16954

Closed
petermattis opened this issue Jul 10, 2017 · 33 comments
Labels: A-kv-distribution Relating to rebalancing and leasing. · C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception)

@petermattis
Collaborator

Sending a snapshot of a range currently involves generating a series of 256KB WriteBatches which are then applied on the recipient. Per the recent work to use the RocksDB IngestExternalFile functionality for restore, it would likely be significantly faster on the recipient to send an sstable instead. Using IngestExternalFile for applying snapshots should be straightforward in comparison to the usage within restore as we don't have any of the complexity of Raft involved. The sender can create the sstable locally and then stream it to the recipient. And because we send a header before streaming the snapshot, we can probe whether the recipient supports receiving an sstable snapshot which will make the migration story easier.

There is no particular urgency here as applying snapshots is not currently a bottleneck.

@danhhz Do you have benchmark numbers readily available showing the performance difference between IngestExternalFile and applying the equivalent WriteBatches?

@petermattis petermattis added this to the Later milestone Jul 10, 2017
@tbg
Member

tbg commented Jul 10, 2017

One "problem" is that we always put a range tombstone down right before the ingestion, which would force the SSTable into higher layers. That said, it's probably still faster than what we have today.

@bdarnell
Contributor

Since the receiving side could stream the sstable straight to disk, we could also avoid holding the entire snapshot in memory at once on the recipient. This would be an important step towards being able to increase the default range size from 64MB.

@tbg tbg self-assigned this Dec 6, 2017
@tbg
Member

tbg commented Dec 7, 2017

I looked into this in light of a privately reported user issue (the fundamental problem there being that at some point we added a check that prevents large snapshots from being sent).

I was initially hopeful that this would be fully straightforward, but I've ended up with what amounts to an RFC draft and think this needs some care. The whole thing seems workable and worth tackling, though it merits some more preliminary discussion here and then an RFC.

Ok, here we go:

The receiving side will always end up with an SSTable on disk which can then be ingested. On the sender, there are two options:

  1. Construct an SSTable on the sender and stream that over. Pro: less bandwidth used, because SSTable blocks can be compressed.
  2. Stream the keys in some other format and write them into an SSTable at the receiver. Pro: more closely aligned with the current code, and thus a smaller change.

I'll assume option 1 because it seems like the right thing to do™. We'll have to experiment to decide whether we need to preserve both paths, but I hope not. We can always stream an SSTable, but perhaps we a) avoid flushing it to disk unless it grows large enough and b) avoid ingesting it if it's too small, reading it into a WriteBatch instead. At the end of this, we may also consider making the default range size closer to our target L6 SSTable size (128MB?).
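
To make the adaptive part concrete, here's a minimal sketch of the receiver-side decision; the threshold value and both helpers are hypothetical placeholders rather than existing code:

```go
// Illustrative only: ingestThreshold and both callbacks are assumptions.
const ingestThreshold = 2 << 20 // 2 MB, purely illustrative

// applySnapshotData decides, per received SSTable, whether to replay its
// contents through a WriteBatch (cheap for small data, avoids an extra L0
// file) or to ingest it as a file (constant memory for large data).
func applySnapshotData(
	sstData []byte,
	applyAsWriteBatch func([]byte) error, // hypothetical: read the SST back into a WriteBatch
	ingestSST func([]byte) error, // hypothetical: flush to a temp file and ingest it
) error {
	if len(sstData) < ingestThreshold {
		return applyAsWriteBatch(sstData)
	}
	return ingestSST(sstData)
}
```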

We need to extend RocksDBSstFileWriter so that it allows us to access the SSTable as it's being written (right now, it holds it all in memory and only gives it back on Finish()). Instead, we want an API like this:

```go
// Add adds the supplied key-value pair into the SSTable. The returned
// boolean is true if the caller may retrieve the next chunk of the SSTable
// via `Fetch` if they desire to do so.
func (fw *RocksDBSstFileWriter) Add(kv MVCCKeyValue) (bool, error)

// Fetch retrieves the data that has been flushed into the SSTable
// but not retrieved yet (by an earlier invocation of Fetch). By calling `Fetch`
// every time `Add` returns `true`, the caller can retrieve the constructed SSTable
// piecemeal.
func (fw *RocksDBSstFileWriter) Fetch() ([]byte, error)

// FinishAndFetch finalizes the SSTable and calls `Fetch`. `Close` must still
// be called, but the instance must not be used other than that.
func (fw *RocksDBSstFileWriter) FinishAndFetch() ([]byte, error)

// Close idempotently releases the underlying resources attached.
func (fw *RocksDBSstFileWriter) Close()
```

Assuming we have that, the sending side uses a RocksDBSstFileWriter in its iteration over the user-space keys and streams straight to the connection (keeping the rate limiting intact), as in the sketch below.
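
For illustration, a rough sketch of that sender loop against the proposed API; the kvs slice and the send callback are stand-ins for the real ReplicaDataIterator and snapshot stream:

```go
// sendSST streams a range's user-space keys as an SSTable, sending a chunk
// whenever the writer signals that one can be fetched, so at most one chunk
// of the SSTable is held in memory at a time.
func sendSST(fw *RocksDBSstFileWriter, kvs []MVCCKeyValue, send func([]byte) error) error {
	defer fw.Close()
	for _, kv := range kvs {
		ready, err := fw.Add(kv)
		if err != nil {
			return err
		}
		if !ready {
			continue
		}
		// A chunk has been flushed internally; stream it out now.
		chunk, err := fw.Fetch()
		if err != nil {
			return err
		}
		if err := send(chunk); err != nil {
			return err
		}
	}
	// Finalize the SSTable and send whatever remains.
	last, err := fw.FinishAndFetch()
	if err != nil {
		return err
	}
	return send(last)
}
```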

There's a caveat here, which is that we use a ReplicaDataIterator but we really only want to put the main chunk of userspace keys into an SSTable (holes are bad). We should augment the iterator suitably so that we can detect when we're jumping a gap. Other chunks may be large, too, so the most straightforward solution might be to just make the code adaptive from the start: always stream an SSTable, but send a marker RPC every time the chunk changes, so that the recipient can decide whether the current chunk should be ingested or loaded via a WriteBatch.

Log entries deserve a mention. These will have to keep their special treatment, as log entries will have inlined sideloaded payloads which applySnapshot sideloads again, so we don't ever want to ingest them. Log entries are already plenty special, as they're not rate limited on the sender and could be quite large due to the inlined payloads. As a first pass, it seems reasonable to leave them alone.

As a corollary caveat (and I think the largest source of complexity with this approach), the recipient has an atomicity problem: we'll definitely end up with something to ingest (the ~64MB-and-up of user data) and something to apply via a WriteBatch. This implies two constraints: ordering (we must only make the range visible once it has been written completely, which in turn implies that the RangeDescriptor must be written last, etc.), and recoverability (we need a way to recover from an ill-timed crash or I/O error during snapshot application).

There might be a better way to address that, but one I can think of right now is to first write (and sync) a marker key to RocksDB that we delete at the end of a snapshot application; to make sure we remove all partial state potentially written by an earlier snapshot before writing our own data; and to clear that marker (atomically with, or after syncing) the final write which puts down the RangeDescriptor. I haven't thought this through, but potentially Raft tombstones could be augmented to carry a sort of "dirty bit" (which can be cleared by applying a snapshot successfully or by GC'ing the replica). Recovering from an I/O error in particular will be tricky: we have to remove the replica from the store and also make sure it isn't used (in meaningful ways) by anyone who might be holding on to it. It's easier to recover from a crash, as we'll see the dirty bit and get to clean it up eagerly.
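
To pin down the ordering, here's a rough sketch of that marker scheme, with all four helpers as hypothetical placeholders:

```go
// applySnapshotWithMarker sketches the ordering described above: put down a
// synced "dirty" marker, clear any partial state left by an earlier attempt,
// write the new data, and only clear the marker together with (or after) the
// final synced write of the RangeDescriptor.
func applySnapshotWithMarker(
	writeMarker func() error,       // hypothetical: write + sync the marker key
	clearPartialState func() error, // hypothetical: drop leftovers of a failed attempt
	writeData func() error,         // hypothetical: ingest SSTs / apply WriteBatches
	finalizeAndClear func() error,  // hypothetical: write RangeDescriptor, sync, clear marker
) error {
	if err := writeMarker(); err != nil {
		return err
	}
	if err := clearPartialState(); err != nil {
		return err
	}
	if err := writeData(); err != nil {
		// After a crash or I/O error the marker is still present, so recovery
		// code knows the replica's on-disk state is dirty.
		return err
	}
	return finalizeAndClear()
}
```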

It seems pretty bad to render a replica unusable after a failed snapshot application, but keep in mind that the "failure" (assuming no bugs) would be an I/O error, and we generally crash on those intentionally (to avoid undefined behavior). Additionally, a snapshot is either preemptive (in which case there isn't really a Replica) or Raft-triggered (in which case the Replica is so far behind that it is effectively not participating much in the Range, though it does get to vote).

@tbg
Member

tbg commented Dec 7, 2017

@petermattis am I correct in assuming that ingesting an "everything-covering" SSTable is a terrible idea? Do you think there's any chance of teaching RocksDB to ingest a set of SSTables plus WriteBatches atomically?

@petermattis
Collaborator Author

am I correct in assuming that ingesting an "everything-covering" SSTable is a terrible idea?

Yeah, I'm pretty sure that would be very bad.

Do you think there's any chance of teaching RocksDB to ingest a set of SSTables plus WriteBatches atomically?

I don't know. I think sstable ingestion is atomic due to writing the manifest, while WriteBatches are atomic due to writing to the WAL. I'm not sure whether it would be possible to connect the two atomic mechanisms.

@danhhz
Contributor

danhhz commented Dec 7, 2017

An "everything-covering" sstable effectively just means that it will be ingested at L0, which is fine if it's small, but probably bad if it's big.

RocksDB has assured us that it will be Hard(tm) to make an atomic sstable ingestion + writebatch application, but ingesting 2 sstables is already atomic. Is there any way we can break up what you're doing into a large sstable that affects only a small keyrange and a small sstable that covers everything? Perhaps that could be made to work, though it seems close enough to the margin of what RocksDB has imagined that I'd try it before committing to that plan.

@tbg
Member

tbg commented Dec 7, 2017

@danhhz using only SSTables might be one way out. I'm not clear how bad a small everything-covering SSTable would be. BTW, are you saying 2 sstables are atomic or any number of SSTables is atomic? I wonder if it'd be better to ingest ~10 small SSTables and one large one if that makes these small ones "gapless".

@danhhz
Contributor

danhhz commented Dec 7, 2017

Any number. IngestExternalFile takes a vector of paths and applies them all atomically.
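
For reference, a minimal sketch of how that could look behind a Go wrapper; the interface and method name here are assumptions, not the actual engine API:

```go
// sstIngester is a hypothetical stand-in for whatever wrapper forwards the
// paths to RocksDB's IngestExternalFile, which takes a vector of files and
// applies them as a single atomic operation.
type sstIngester interface {
	IngestExternalFiles(paths []string) error
}

// ingestAtomically makes all of the given SSTs visible at once, or none of
// them if the ingestion fails.
func ingestAtomically(eng sstIngester, paths []string) error {
	return eng.IngestExternalFiles(paths)
}
```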

@danhhz
Contributor

danhhz commented Dec 7, 2017

I'm reading over the code again and just noticed that the files you ingest must themselves be mutually non-overlapping. Dunno if that's relevant, but FYI.

It does appear to place each file individually in the lowest level where it doesn't overlap anything, so you should indeed be able to atomically have small, overlappy ones end up in L0 and big, non-overlappy ones in lower levels.

One thing you do have to consider in the snapshot case is whether the range of data you're ingesting has any recent writes. If so, you could still end up in the bad case.

@tbg
Member

tbg commented Dec 9, 2017

@danhhz using only SSTables is a valid idea. That might work! I think the log entries can be streamlined by changing the API: (receiver permitting) send slim entries and send the files separately. The receiver can first populate the sideloaded storage and then do the ingestion.

@tbg
Member

tbg commented Dec 9, 2017

We'd likely want to send multiple SSTables anyway, right? We have a target size of ~128MB in L6, I think, so that seems like the size at which we'd want to start a new one. Or is that not necessary/worse than sending a huge one (for the user data, i.e. without holes)?

Does RocksDB ever have to load SSTables into memory (I hope not)? If so, large SSTables would be problematic anyway. I'm not sure if atomically ingesting N smaller SSTables would be any better, though.

@petermattis
Collaborator Author

RocksDB streams sstables into memory while compacting, and it loads/pins sstable index blocks into memory for normal reads, but these index blocks are a small fraction of the sstable size. The only downside to large sstables that I'm aware of is that they make certain compactions slower, which ties up resources that could be used for other (higher-priority) compactions. The downside to smaller sstables is that we can either run out of file descriptors or thrash the cache of open sstables.

@tbg
Member

tbg commented Dec 9, 2017

@petermattis in light of that, sticking to ~128mb SSTables seems reasonable?

nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Dec 9, 2017
In a privately reported user issue, we've seen that [our attempts](cockroachdb#7788)
at [preventing large snapshots](cockroachdb#7581)
can result in replica unavailability. Our current approach to limiting
large snapshots assumes that it's ok to block snapshots indefinitely
while waiting for a range to first split. Unfortunately, this can create
a dependency cycle where a range requires a snapshot to split (because it
can't achieve an up-to-date quorum without it) but isn't allowed to perform
a snapshot until its size is reduced below the threshold. This can result
in unavailability even when a majority of replicas remain live.

Currently, we still need this snapshot size limit because unbounded snapshots
can result in OOM errors that crash entire nodes. However, once snapshots
are streamed from disk to disk, never needing to buffer in-memory on the
sending or receiving side, we should be able to remove any snapshot size
limit (see cockroachdb#16954).

As a holdover, this change introduces a `permitLargeSnapshots` flag on a
replica which is set when the replica is too large to snapshot but observes
splits failing. When set, the flag allows snapshots to ignore the size
limit until the snapshot goes through and splits are able to succeed
again.

Release note: None
@petermattis
Collaborator Author

@petermattis in light of that, sticking to ~128mb SSTables seems reasonable?

Yes. I'm not aware of any compelling reason to adjust that size right now.

nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Dec 12, 2017
In a privately reported user issue, we've seen that [our attempts](cockroachdb#7788)
at [preventing large snapshots](cockroachdb#7581)
can result in replica unavailability. Our current approach to limiting
large snapshots assumes that it's ok to block snapshots indefinitely
while waiting for a range to first split. Unfortunately, this can create
a dependency cycle where a range requires a snapshot to split (because it
can't achieve an up-to-date quorum without it) but isn't allowed to perform
a snapshot until its size is reduced below the threshold. This can result
in unavailability even when a majority of replicas remain live.

Currently, we still need this snapshot size limit because unbounded snapshots
can result in OOM errors that crash entire nodes. However, once snapshots
are streamed from disk to disk, never needing to buffer in-memory on the
sending or receiving side, we should be able to remove any snapshot size
limit (see cockroachdb#16954).

As a holdover, this change introduces a `permitLargeSnapshots` flag on a
replica which is set when the replica is too large to snapshot but observes
splits failing. When set, the flag allows snapshots to ignore the size
limit until the snapshot goes through and splits are able to succeed
again.

Release note (bug fix): Fixed a scenario where a range that is too big
to snapshot can lose availability even with a majority of nodes alive.
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Dec 15, 2017
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Dec 15, 2017
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Dec 20, 2017
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this issue Apr 27, 2018
Fixes cockroachdb#16954.
Related to cockroachdb#25047.

This depends on the following two upstream changes to RocksDB:
- facebook/rocksdb#3778
- facebook/rocksdb#3779

The change introduces a new snapshot strategy called "SST". This strategy
streams SST files consisting of all keys in a range from the sender to the
receiver. These SST files are then atomically ingested directly into RocksDB.
An important property of the strategy is that the amount of memory required
for a receiver using the strategy is constant with respect to the size of
a range, instead of linear as it is with the KV_BATCH strategy. This will
be critical for increasing the default range size and potentially for
increasing the number of concurrent snapshots allowed per node. The
strategy also seems to significantly speed up snapshots once ranges are
above a certain size (somewhere in the single digit MBs).

This is a WIP change. Before it can be merged it needs:
- to be cleaned up a bit
- more testing (unit test, testing knobs, maybe some chaos)
- proper version handling
- heuristic tuning
- decisions on questions like compactions after ingestion

Release note: None
@tbg tbg added the A-kv-distribution Relating to rebalancing and leasing. label May 15, 2018
@tbg tbg added the C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) label Jul 22, 2018
@nvanbenschoten nvanbenschoten modified the milestones: 2.1, 2.2 Aug 15, 2018
@petermattis petermattis removed this from the 2.2 milestone Oct 5, 2018
craig bot pushed a commit that referenced this issue Aug 9, 2019
38932: storage: build SSTs from KV_BATCH snapshot r=jeffrey-xiao a=jeffrey-xiao

Implements the SST snapshot strategy discussed in #16954 and partially implemented in #25134 and #38873, but with the logic only on the receiver side for ease of testing and compatibility. This PR also handles the complications of subsumed replicas that are not fully contained by the current replica.

The maximum number of SSTs created using this strategy is 4 + SR + 2, where SR is the number of subsumed replicas:

- Three SSTs get streamed from the sender (range local keys, replicated range-id local keys, and data keys)
- One SST is constructed for the unreplicated range-id local keys.
- One SST is constructed for every subsumed replica to clear the range-id local keys. These SSTs consist of one range deletion tombstone and one `RaftTombstone` key.
- A maximum of two SSTs for all subsumed replicas to account for subsumed replicas that are not fully contained by the current replica. Note that currently, subsumed replicas can have keys to the right of the current replica, but not to the left, so there will be at most one SST created for the range-local keys and one for the data keys. These SSTs consist of one range deletion tombstone.

This number can be further reduced to 3 + SR if we pass the file handles and SST writers from the receiving step to the application step: we could combine the SST for the unreplicated range-id keys with the replicated range-id SST, and the range-local SSTs of the subsumed replicas with their data SSTs. We probably don't want to do this optimization, since we'd have to undo it if we start constructing the SSTs on the sender or start chunking large SSTs into smaller ones.

Blocked by facebook/rocksdb#5649.

# Test Plan

- [x] Testing knob to inspect SSTs before ingestion. Ensure that expected SSTs for subsumed replicas are ingested.
- [x] Unit tests for `SSTSnapshotStorage`.
 
# Metrics and Evaluation

One way to evaluate this change is the following steps:

1. Setup 3 node cluster
2. Set default Raft log truncation threshold to some low constant:
```go
defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
    "COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 128<<10 /* 128 KB */)
```
3. Set `range_min_bytes` to 0 and `range_max_bytes` to some large number.
4. Increase `kv.snapshot_recovery.max_rate` and `kv.snapshot_rebalance.max_rate` to some large number.
5. Disable load-based splitting.
6. Stop node 2.
7. Run an insert heavy workload (kv0) on the cluster.
8. Start node 2.
9. Time how long it takes for node 2 to have all the ranges.

Roachtest: https://gist.github.com/jeffrey-xiao/e69fcad04968822d603f6807ca77ef3b

We can have two independent variables

1. Fixed total data size (4000000 ops; ~3.81 GiB), variable number of splits
- 32 splits (~121 MiB ranges)
- 64 splits (~61.0 MiB ranges)
- 128 splits (~31.2 MiB ranges)
- 256 splits (~15.7 MiB ranges)
- 512 splits (~7.9 MiB ranges)
- 1024 splits (~3.9 MiB ranges)
2. Fixed number of splits (32), variable total data size
- 125000 (~ 3.7 MiB ranges)
- 250000 (~7.5 MiB ranges)
- 500000 (~15 MiB ranges)
- 1000000 (~30 MiB ranges)
- 2000000 (60 MiB ranges)
- 4000000 (121 MiB ranges)

# Fsync Chunk Size

The size of the SST chunk that we write before fsync-ing impacts how fast node 2 gets all the ranges. I've experimented with 32 splits and a median range size of 121 MB: no fsync-ing (~27s recovery), fsync-ing in 8 MB chunks (~30s recovery), fsync-ing in 2 MB chunks (~40s recovery), and fsync-ing in 256 KB chunks (~42s recovery). The default bulk SST sync rate is 2 MB and #20352 sets `bytes_per_sync` to 512 KB, so something between those options is probably good. The reason we want to fsync at all is to prevent the OS from accumulating such a large buffer that it blocks unrelated small/fast writes for a long time when it flushes.
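
As an illustration of the chunked fsync-ing (the function and its plumbing are assumptions, not what this PR literally does):

```go
import "os"

// writeSSTChunked appends the received SST bytes to a file, syncing after
// every chunk so the OS never accumulates enough dirty data to stall
// unrelated small writes when it finally flushes.
func writeSSTChunked(f *os.File, data []byte, chunkSize int) error {
	for len(data) > 0 {
		n := chunkSize
		if n > len(data) {
			n = len(data)
		}
		if _, err := f.Write(data[:n]); err != nil {
			return err
		}
		if err := f.Sync(); err != nil {
			return err
		}
		data = data[n:]
	}
	return nil
}
```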

# Impact on Foreground Traffic

For testing the impact on foreground traffic, I ran kv0 on a four node cluster with the merge queue and split queue disabled and starting with a constant number of splits. After 5 minutes, I decommissioned node 1 so its replicas would drain to other nodes using snapshots.

Roachtest: https://gist.github.com/jeffrey-xiao/5d9443a37b0929884aca927f9c320b6c

**Average Range Size of 3 MiB**
- [Before](https://user-images.githubusercontent.com/8853434/62398633-41a2bb00-b547-11e9-9e3d-747ee724943b.png)
- [After](https://user-images.githubusercontent.com/8853434/62398634-41a2bb00-b547-11e9-85e7-445b7989d173.png)

**Average Range Size of 32 MiB**
- [Before](https://user-images.githubusercontent.com/8853434/62398631-410a2480-b547-11e9-9019-86d3bd2e6f73.png)
- [After](https://user-images.githubusercontent.com/8853434/62398632-410a2480-b547-11e9-9513-8763e132e76b.png)

**Average Range Size 128 MiB**
- [Before](https://user-images.githubusercontent.com/8853434/62398558-15873a00-b547-11e9-8ab6-2e8e9bae658c.png)
- [After](https://user-images.githubusercontent.com/8853434/62398559-15873a00-b547-11e9-9c72-b3e90fce1acc.png)

We see p99 latency wins for larger range sizes and comparable performance for smaller range sizes.

Release note (performance improvement): Snapshots sent between replicas are now applied more performantly and use less memory.

Co-authored-by: Jeffrey Xiao <jeffrey.xiao1998@gmail.com>
@jeffrey-xiao
Contributor

Closed by #38932.
