//! New compaction implementation. The algorithm itself is implemented in the
//! compaction crate. This file implements the callbacks and structs that allow
//! the algorithm to drive the process.
//!
//! The legacy algorithm is implemented directly in `timeline.rs`.

use std::ops::{Deref, Range};
use std::sync::Arc;

use async_trait::async_trait;
use fail::fail_point;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};

use pageserver_compaction::helpers::overlaps_with;
use pageserver_compaction::interface::*;
use utils::lsn::Lsn;

use crate::context::RequestContext;
use crate::keyspace::KeySpace;
use crate::repository::Key;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::timeline::{
    is_rel_fsm_block_key, is_rel_vm_block_key, DeltaEntry, DeltaLayerWriter, ImageLayerWriter,
    Layer, ResidentLayer,
};
use crate::tenant::{DeltaLayer, PageReconstructError};
use crate::ZERO_PAGE;

use super::{CompactionError, Timeline};

impl Timeline {
/// Entry point for new tiered compaction algorithm.
///
/// All the real work is in the implementation in the pageserver_compaction
/// crate. The code here would apply to any algorithm implemented by the
/// same interface, but tiered is the only one at the moment.
///
/// TODO: cancellation
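    ///
    /// A minimal invocation sketch (hypothetical caller; assumes a
    /// `timeline: Arc<Timeline>`, a `cancel: CancellationToken`, and a
    /// `ctx: RequestContext` already in scope):
    ///
    /// ```ignore
    /// timeline.compact_tiered(&cancel, &ctx).await?;
    /// ```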
pub(crate) async fn compact_tiered(
self: &Arc<Self>,
_cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
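        // The tenant's compaction threshold doubles as the tiered-compaction
        // fanout, and the checkpoint distance as the target size of the new
        // layer files.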
let fanout = self.get_compaction_threshold() as u64;
let target_file_size = self.get_checkpoint_distance();
// Find the top of the historical layers
let end_lsn = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let l0_deltas = layers.get_level0_deltas()?;
drop(guard);
// As an optimization, if we find that there are too few L0 layers,
// bail out early. We know that the compaction algorithm would do
// nothing in that case.
if l0_deltas.len() < fanout as usize {
// doesn't need compacting
return Ok(());
}
l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
};
// Is the timeline being deleted?
if self.is_stopping() {
trace!("Dropping out of compaction on timeline shutdown");
return Err(CompactionError::ShuttingDown);
}
let keyspace = self.collect_keyspace(end_lsn, ctx).await?;
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
pageserver_compaction::compact_tiered::compact_tiered(
&mut adaptor,
end_lsn,
target_file_size,
fanout,
ctx,
)
.await?;
adaptor.flush_updates().await?;
Ok(())
}
}
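
/// Adaptor that implements the `CompactionJobExecutor` interface on top of a
/// `Timeline`.
///
/// New layers and deletions are accumulated here and applied to the
/// timeline's layer map in one batch by `flush_updates`.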
struct TimelineAdaptor {
timeline: Arc<Timeline>,
keyspace: (Lsn, KeySpace),
new_deltas: Vec<ResidentLayer>,
new_images: Vec<ResidentLayer>,
layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
}
impl TimelineAdaptor {
pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
Self {
timeline: timeline.clone(),
keyspace,
new_images: Vec::new(),
new_deltas: Vec::new(),
layers_to_delete: Vec::new(),
}
}
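
    /// Apply the accumulated updates in one batch: install the new delta and
    /// image layers in the layer map and remove the replaced layers.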
pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
let layers_to_delete = {
let guard = self.timeline.layers.read().await;
self.layers_to_delete
.iter()
.map(|x| guard.get_from_desc(x))
.collect::<Vec<Layer>>()
};
self.timeline
.finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
.await?;
self.new_images.clear();
self.new_deltas.clear();
self.layers_to_delete.clear();
Ok(())
}
}
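
/// A delta layer that is known to be resident locally, for the
/// `CompactionDeltaLayer` implementation below.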
#[derive(Clone)]
struct ResidentDeltaLayer(ResidentLayer);
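
/// An image layer that is known to be resident locally, for the
/// `CompactionImageLayer` implementation below.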
#[derive(Clone)]
struct ResidentImageLayer(ResidentLayer);
#[async_trait]
impl CompactionJobExecutor for TimelineAdaptor {
type Key = crate::repository::Key;
type Layer = OwnArc<PersistentLayerDesc>;
type DeltaLayer = ResidentDeltaLayer;
type ImageLayer = ResidentImageLayer;
type RequestContext = crate::context::RequestContext;
async fn get_layers(
&mut self,
key_range: &Range<Key>,
lsn_range: &Range<Lsn>,
_ctx: &RequestContext,
) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
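        // Apply any pending updates first, so that the layer map we read below
        // already reflects the layers created and deleted earlier in this
        // compaction job.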
self.flush_updates().await?;
let guard = self.timeline.layers.read().await;
let layer_map = guard.layer_map();
let result = layer_map
.iter_historic_layers()
.filter(|l| {
overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
})
.map(OwnArc)
.collect();
Ok(result)
}
async fn get_keyspace(
&mut self,
key_range: &Range<Key>,
lsn: Lsn,
_ctx: &RequestContext,
) -> anyhow::Result<Vec<Range<Key>>> {
if lsn == self.keyspace.0 {
Ok(pageserver_compaction::helpers::intersect_keyspace(
&self.keyspace.1.ranges,
key_range,
))
} else {
            // The current compaction implementation only ever requests the
            // keyspace at the compaction end LSN.
anyhow::bail!("keyspace not available for requested lsn");
}
}
async fn downcast_delta_layer(
&self,
layer: &OwnArc<PersistentLayerDesc>,
) -> anyhow::Result<Option<ResidentDeltaLayer>> {
// this is a lot more complex than a simple downcast...
if layer.is_delta() {
let l = {
let guard = self.timeline.layers.read().await;
guard.get_from_desc(layer)
};
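            // Download the layer file if it is not already resident, and keep
            // it resident for as long as the returned guard is alive.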
let result = l.download_and_keep_resident().await?;
Ok(Some(ResidentDeltaLayer(result)))
} else {
Ok(None)
}
}
async fn create_image(
&mut self,
lsn: Lsn,
key_range: &Range<Key>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
Ok(self.create_image_impl(lsn, key_range, ctx).await?)
}
async fn create_delta(
&mut self,
lsn_range: &Range<Lsn>,
key_range: &Range<Key>,
input_layers: &[ResidentDeltaLayer],
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
let mut all_entries = Vec::new();
for dl in input_layers.iter() {
all_entries.extend(dl.load_keys(ctx).await?);
}
        // The current stdlib sorting implementation is particularly fast when
        // the slice consists of already-sorted sub-ranges, as is the case
        // here: each input layer's entries are already in (key, LSN) order.
all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
let mut writer = DeltaLayerWriter::new(
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
)
.await?;
let mut dup_values = 0;
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let mut prev: Option<(Key, Lsn)> = None;
for &DeltaEntry {
key, lsn, ref val, ..
} in all_entries.iter()
{
if prev == Some((key, lsn)) {
// This is a duplicate. Skip it.
//
// It can happen if compaction is interrupted after writing some
// layers but not all, and we are compacting the range again.
// The calculations in the algorithm assume that there are no
// duplicates, so the math on targeted file size is likely off,
// and we will create smaller files than expected.
dup_values += 1;
continue;
}
let value = val.load(ctx).await?;
writer.put_value(key, lsn, value).await?;
prev = Some((key, lsn));
}
if dup_values > 0 {
warn!("delta layer created with {} duplicate values", dup_values);
}
fail_point!("delta-layer-writer-fail-before-finish", |_| {
Err(anyhow::anyhow!(
"failpoint delta-layer-writer-fail-before-finish"
))
});
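        // Use the key after the last one we wrote as the end key of the new
        // layer. NB: `prev` is `None` only if there were no entries at all;
        // the algorithm is assumed never to request an empty delta layer here,
        // so the unwrap is expected to be safe.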
let new_delta_layer = writer
.finish(prev.unwrap().0.next(), &self.timeline)
.await?;
self.new_deltas.push(new_delta_layer);
Ok(())
}
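
    // Deletion is deferred: the layer is only removed from the layer map once
    // `flush_updates` is called.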
async fn delete_layer(
&mut self,
layer: &OwnArc<PersistentLayerDesc>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
self.layers_to_delete.push(layer.clone().0);
Ok(())
}
}
impl TimelineAdaptor {
async fn create_image_impl(
&mut self,
lsn: Lsn,
key_range: &Range<Key>,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
let timer = self.timeline.metrics.create_images_time_histo.start_timer();
let mut image_layer_writer = ImageLayerWriter::new(
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
key_range,
lsn,
)
.await?;
fail_point!("image-layer-writer-fail-before-finish", |_| {
Err(PageReconstructError::Other(anyhow::anyhow!(
"failpoint image-layer-writer-fail-before-finish"
)))
});
let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
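        // Materialize an image of every key in the keyspace at this LSN, one
        // page at a time.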
for range in &keyspace_ranges {
let mut key = range.start;
while key < range.end {
let img = match self.timeline.get(key, lsn, ctx).await {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
                        // We had a bug at one point, where we truncated the FSM and VM
                        // in the pageserver, but Postgres didn't know about that
                        // and continued to generate incremental WAL records for pages
                        // that didn't exist in the pageserver. Trying to replay those
                        // WAL records failed to find the previous image of the page.
                        // This special case allows us to recover from that situation.
                        // See https://github.com/neondatabase/neon/issues/2601.
                        //
                        // Unfortunately we cannot do this for the main fork, or for
                        // any metadata keys, as that would lead to actual data loss.
if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(err);
}
}
};
image_layer_writer.put_image(key, img).await?;
key = key.next();
}
}
let image_layer = image_layer_writer.finish(&self.timeline).await?;
self.new_images.push(image_layer);
timer.stop_and_record();
Ok(())
}
}
impl CompactionRequestContext for crate::context::RequestContext {}
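
/// An owned wrapper around `Arc<T>`, so that the compaction interface traits
/// below can be implemented on reference-counted layer descriptors.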
#[derive(Debug, Clone)]
pub struct OwnArc<T>(pub Arc<T>);
impl<T> Deref for OwnArc<T> {
type Target = <Arc<T> as Deref>::Target;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> AsRef<T> for OwnArc<T> {
fn as_ref(&self) -> &T {
self.0.as_ref()
}
}
impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
fn key_range(&self) -> &Range<Key> {
&self.key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.lsn_range
}
fn file_size(&self) -> u64 {
self.file_size
}
fn short_id(&self) -> std::string::String {
self.as_ref().short_id().to_string()
}
fn is_delta(&self) -> bool {
self.as_ref().is_delta()
}
}
impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
fn key_range(&self) -> &Range<Key> {
&self.layer_desc().key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.layer_desc().lsn_range
}
fn file_size(&self) -> u64 {
self.layer_desc().file_size
}
fn short_id(&self) -> std::string::String {
self.layer_desc().short_id().to_string()
}
fn is_delta(&self) -> bool {
true
}
}
impl CompactionLayer<Key> for ResidentDeltaLayer {
fn key_range(&self) -> &Range<Key> {
&self.0.layer_desc().key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.0.layer_desc().lsn_range
}
fn file_size(&self) -> u64 {
self.0.layer_desc().file_size
}
fn short_id(&self) -> std::string::String {
self.0.layer_desc().short_id().to_string()
}
fn is_delta(&self) -> bool {
true
}
}
#[async_trait]
impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
type DeltaEntry<'a> = DeltaEntry<'a>;
async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
self.0.load_keys(ctx).await
}
}
impl CompactionLayer<Key> for ResidentImageLayer {
fn key_range(&self) -> &Range<Key> {
&self.0.layer_desc().key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.0.layer_desc().lsn_range
}
fn file_size(&self) -> u64 {
self.0.layer_desc().file_size
}
fn short_id(&self) -> std::string::String {
self.0.layer_desc().short_id().to_string()
}
fn is_delta(&self) -> bool {
false
}
}
impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}