Skip to content

Commit

Permalink
fix: avoid leveled compaction crash when recovering from manifest (#63)
Browse files Browse the repository at this point in the history
* Fix: Avoid leveled compaction crash when recovering from manifest

* Also sort SSTs in manifest recovery

* Add `in_recovery` flag to `apply_compaction_result`

- Don't sort the SSTs inside `apply_compaction_result` if in recovery
  • Loading branch information
YangchenYe323 committed Jul 3, 2024
1 parent 2b527fd commit 77e15ef
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 27 deletions.
6 changes: 4 additions & 2 deletions mini-lsm-mvcc/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ impl CompactionController {
snapshot: &LsmStorageState,
task: &CompactionTask,
output: &[usize],
in_recovery: bool,
) -> (LsmStorageState, Vec<usize>) {
match (self, task) {
(CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
ctrl.apply_compaction_result(snapshot, task, output)
ctrl.apply_compaction_result(snapshot, task, output, in_recovery)
}
(CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
ctrl.apply_compaction_result(snapshot, task, output)
Expand Down Expand Up @@ -381,7 +382,8 @@ impl LsmStorageInner {
}
let (mut snapshot, files_to_remove) = self
.compaction_controller
.apply_compaction_result(&snapshot, &task, &output);
.apply_compaction_result(&snapshot, &task, &output, false);

let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
for file_to_remove in &files_to_remove {
let result = snapshot.sstables.remove(file_to_remove);
Expand Down
20 changes: 12 additions & 8 deletions mini-lsm-mvcc/src/compact/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl LeveledCompactionController {
snapshot: &LsmStorageState,
task: &LeveledCompactionTask,
output: &[usize],
in_recovery: bool,
) -> (LsmStorageState, Vec<usize>) {
let mut snapshot = snapshot.clone();
let mut files_to_remove = Vec::new();
Expand Down Expand Up @@ -216,14 +217,17 @@ impl LeveledCompactionController {
.collect::<Vec<_>>();
assert!(lower_level_sst_ids_set.is_empty());
new_lower_level_ssts.extend(output);
new_lower_level_ssts.sort_by(|x, y| {
snapshot
.sstables
.get(x)
.unwrap()
.first_key()
.cmp(snapshot.sstables.get(y).unwrap().first_key())
});
// Don't sort the SST IDs during recovery because actual SSTs are not loaded at that point
if !in_recovery {
new_lower_level_ssts.sort_by(|x, y| {
snapshot
.sstables
.get(x)
.unwrap()
.first_key()
.cmp(snapshot.sstables.get(y).unwrap().first_key())
});
}
snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts;
(snapshot, files_to_remove)
}
Expand Down
16 changes: 14 additions & 2 deletions mini-lsm-mvcc/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ impl LsmStorageInner {
memtables.insert(x);
}
ManifestRecord::Compaction(task, output) => {
let (new_state, _) =
compaction_controller.apply_compaction_result(&state, &task, &output);
let (new_state, _) = compaction_controller
.apply_compaction_result(&state, &task, &output, true);
// TODO: apply remove again
state = new_state;
next_sst_id =
Expand Down Expand Up @@ -400,6 +400,18 @@ impl LsmStorageInner {

next_sst_id += 1;

// Sort SSTs on each level
for (_id, ssts) in &mut state.levels {
ssts.sort_by(|x, y| {
state
.sstables
.get(x)
.unwrap()
.first_key()
.cmp(state.sstables.get(y).unwrap().first_key())
})
}

// recover memtables
if options.enable_wal {
let mut wal_cnt = 0;
Expand Down
8 changes: 6 additions & 2 deletions mini-lsm-starter/src/bin/compaction-simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,12 @@ fn main() {
.join(", ")
);
max_space = max_space.max(storage.file_list.len());
let (snapshot, del) =
controller.apply_compaction_result(&storage.snapshot, &task, &sst_ids);
let (snapshot, del) = controller.apply_compaction_result(
&storage.snapshot,
&task,
&sst_ids,
false,
);
storage.snapshot = snapshot;
storage.remove(&del);
println!("--- After Compaction ---");
Expand Down
3 changes: 2 additions & 1 deletion mini-lsm-starter/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ impl CompactionController {
snapshot: &LsmStorageState,
task: &CompactionTask,
output: &[usize],
in_recovery: bool,
) -> (LsmStorageState, Vec<usize>) {
match (self, task) {
(CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
ctrl.apply_compaction_result(snapshot, task, output)
ctrl.apply_compaction_result(snapshot, task, output, in_recovery)
}
(CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
ctrl.apply_compaction_result(snapshot, task, output)
Expand Down
1 change: 1 addition & 0 deletions mini-lsm-starter/src/compact/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl LeveledCompactionController {
_snapshot: &LsmStorageState,
_task: &LeveledCompactionTask,
_output: &[usize],
_in_recovery: bool,
) -> (LsmStorageState, Vec<usize>) {
unimplemented!()
}
Expand Down
6 changes: 4 additions & 2 deletions mini-lsm/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ impl CompactionController {
snapshot: &LsmStorageState,
task: &CompactionTask,
output: &[usize],
in_recovery: bool,
) -> (LsmStorageState, Vec<usize>) {
match (self, task) {
(CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
ctrl.apply_compaction_result(snapshot, task, output)
ctrl.apply_compaction_result(snapshot, task, output, in_recovery)
}
(CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
ctrl.apply_compaction_result(snapshot, task, output)
Expand Down Expand Up @@ -335,7 +336,8 @@ impl LsmStorageInner {
}
let (mut snapshot, files_to_remove) = self
.compaction_controller
.apply_compaction_result(&snapshot, &task, &output);
.apply_compaction_result(&snapshot, &task, &output, false);

let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
for file_to_remove in &files_to_remove {
let result = snapshot.sstables.remove(file_to_remove);
Expand Down
20 changes: 12 additions & 8 deletions mini-lsm/src/compact/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl LeveledCompactionController {
snapshot: &LsmStorageState,
task: &LeveledCompactionTask,
output: &[usize],
in_recovery: bool,
) -> (LsmStorageState, Vec<usize>) {
let mut snapshot = snapshot.clone();
let mut files_to_remove = Vec::new();
Expand Down Expand Up @@ -215,14 +216,17 @@ impl LeveledCompactionController {
.collect::<Vec<_>>();
assert!(lower_level_sst_ids_set.is_empty());
new_lower_level_ssts.extend(output);
new_lower_level_ssts.sort_by(|x, y| {
snapshot
.sstables
.get(x)
.unwrap()
.first_key()
.cmp(snapshot.sstables.get(y).unwrap().first_key())
});
// Don't sort the SST IDs during recovery because actual SSTs are not loaded at that point
if !in_recovery {
new_lower_level_ssts.sort_by(|x, y| {
snapshot
.sstables
.get(x)
.unwrap()
.first_key()
.cmp(snapshot.sstables.get(y).unwrap().first_key())
});
}
snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts;
(snapshot, files_to_remove)
}
Expand Down
16 changes: 14 additions & 2 deletions mini-lsm/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ impl LsmStorageInner {
memtables.insert(x);
}
ManifestRecord::Compaction(task, output) => {
let (new_state, _) =
compaction_controller.apply_compaction_result(&state, &task, &output);
let (new_state, _) = compaction_controller
.apply_compaction_result(&state, &task, &output, true);
// TODO: apply remove again
state = new_state;
next_sst_id =
Expand Down Expand Up @@ -395,6 +395,18 @@ impl LsmStorageInner {

next_sst_id += 1;

// Sort SSTs on each level
for (_id, ssts) in &mut state.levels {
ssts.sort_by(|x, y| {
state
.sstables
.get(x)
.unwrap()
.first_key()
.cmp(state.sstables.get(y).unwrap().first_key())
})
}

// recover memtables
if options.enable_wal {
let mut wal_cnt = 0;
Expand Down
72 changes: 72 additions & 0 deletions mini-lsm/src/tests/week2_day5.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::time::Duration;

use bytes::BufMut;
use tempfile::tempdir;

use crate::{
Expand Down Expand Up @@ -38,6 +41,64 @@ fn test_integration_simple() {
}));
}

/// Regression test: a leveled-compaction store must reopen cleanly from its manifest when a
/// compaction wrote more than one SST into the lower level.
///
/// The options use `base_level_size_mb: 2`; the test is written on the premise that each SST is
/// about 1MB (that size comes from `default_for_week2_test`, which is not visible here), so the
/// base level ends up holding two SST files. The SSTs produced by such a compaction might not be
/// sorted by first key, and they must NOT be sorted inside `apply_compaction_result` during
/// manifest recovery, because no actual SST has been loaded at the point that function is called.
#[test]
fn test_multiple_compacted_ssts_leveled() {
    /// Number of key-value pairs written and later read back.
    const NUM_PAIRS: i32 = 500;
    /// Size in bytes of every key and of every value (20 KiB each).
    const ENTRY_SIZE_BYTES: usize = 20 * 1024;

    let compaction_options = CompactionOptions::Leveled(LeveledCompactionOptions {
        level_size_multiplier: 4,
        level0_file_num_compaction_trigger: 2,
        max_levels: 2,
        base_level_size_mb: 2,
    });

    let dir = tempdir().unwrap();
    let storage = MiniLsm::open(
        &dir,
        LsmStorageOptions::default_for_week2_test(compaction_options.clone()),
    )
    .unwrap();

    // Write NUM_PAIRS pairs whose key and value are ENTRY_SIZE_BYTES each (about 20MB in total),
    // which is meant to be enough to trigger at least one priority-based compaction.
    for seed in 0..NUM_PAIRS {
        let (key, val) = key_value_pair_with_target_size(seed, ENTRY_SIZE_BYTES);
        storage.put(&key, &val).unwrap();
    }

    // Poll once per second until two consecutive snapshots show the same L0 and level layout,
    // i.e. background compaction has stopped rearranging SSTs.
    let mut last_seen = storage.inner.state.read().clone();
    loop {
        std::thread::sleep(Duration::from_secs(1));
        let current = storage.inner.state.read().clone();
        let layout_changed = last_seen.levels != current.levels
            || last_seen.l0_sstables != current.l0_sstables;
        last_seen = current;
        if !layout_changed {
            break;
        }
        println!("waiting for compaction to converge");
    }

    storage.close().unwrap();
    {
        // After closing, nothing may be left in memory; release the guard before dumping.
        let state = storage.inner.state.read();
        assert!(state.memtable.is_empty());
        assert!(state.imm_memtables.is_empty());
    }

    storage.dump_structure();
    drop(storage);
    dump_files_in_dir(&dir);

    // Reopen from the same directory: this is the manifest-recovery path under test.
    let reopened = MiniLsm::open(
        &dir,
        LsmStorageOptions::default_for_week2_test(compaction_options.clone()),
    )
    .unwrap();

    // Every pair written before the restart must still be readable with its original value.
    for seed in 0..NUM_PAIRS {
        let (key, expected) = key_value_pair_with_target_size(seed, ENTRY_SIZE_BYTES);
        assert_eq!(&reopened.get(&key).unwrap().unwrap()[..], &expected);
    }
}

fn test_integration(compaction_options: CompactionOptions) {
let dir = tempdir().unwrap();
let storage = MiniLsm::open(
Expand Down Expand Up @@ -79,3 +140,14 @@ fn test_integration(compaction_options: CompactionOptions) {
assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"v20".as_slice());
assert_eq!(storage.get(b"2").unwrap(), None);
}

/// Create a key-value pair where the key and the value are each exactly `target_size_byte` bytes.
///
/// Both buffers are identical: zero padding followed by `seed` encoded as a 4-byte big-endian
/// `i32`, so distinct seeds yield distinct keys of the same length.
///
/// # Panics
///
/// Panics if `target_size_byte` is smaller than 4, because the encoded seed alone needs 4 bytes.
fn key_value_pair_with_target_size(seed: i32, target_size_byte: usize) -> (Vec<u8>, Vec<u8>) {
    const SEED_LEN: usize = std::mem::size_of::<i32>();
    // Fail loudly instead of underflowing `target_size_byte - 4`, which would wrap around to an
    // enormous allocation request in builds without overflow checks.
    assert!(
        target_size_byte >= SEED_LEN,
        "target_size_byte must be at least {} bytes, got {}",
        SEED_LEN,
        target_size_byte
    );

    let mut key = Vec::with_capacity(target_size_byte);
    key.resize(target_size_byte - SEED_LEN, 0u8);
    // Big-endian encoding, the same byte order `BufMut::put_i32` writes.
    key.extend_from_slice(&seed.to_be_bytes());

    // The value is built exactly like the key, so a copy is sufficient.
    let val = key.clone();

    (key, val)
}

0 comments on commit 77e15ef

Please sign in to comment.