Skip to content

Commit

Permalink
finish full tutorial
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
  • Loading branch information
skyzh committed Jan 30, 2024
1 parent 417e81e commit a6c32e8
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 23 deletions.
45 changes: 28 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ cargo x book
If you changed public API in the reference solution, you might also need to synchronize it to the starter crate.
To do this, use `cargo x sync`.

## Structure
## Code Structure

* mini-lsm: the final solution code for <= week 2
* mini-lsm-mvcc: the final solution code for week 3 MVCC
Expand All @@ -70,27 +70,38 @@ cargo run --bin compaction-simulator-ref
cargo run --bin compaction-simulator-mvcc-ref
```

## Progress
## Tutorial Structure

We are working on chapter 3 and more test cases for all existing contents.
We have 3 weeks + 1 extra week (in progress) for this tutorial.

* Week 1: Storage Format + Engine Skeleton
* Week 2: Compaction and Persistence
* Week 3: Multi-Version Concurrency Control
* The Extra Week / Rest of Your Life: Optimizations (unlikely to be available in 2024...)

✅: Finished \
🚧: WIP and will likely be available soon

| Week + Chapter | Topic | Solution | Starter Code | Writeup |
| -------------- | ----------------------------------------------- | -------- | ------------ | ------- |
| 3.1 | Timestamp Key Encoding ||||
| 3.2 | Snapshot Read - Blocks, Memtables, and SSTs ||||
| 3.3 | Snapshot Read - Engine Read Path ||||
| 3.4 | Watermark and Garbage Collection ||||
| 3.5 | Transactions and Optimistic Concurrency Control ||||
| 3.6 | Serializable Snapshot Isolation ||||
| 3.7 | Compaction Filter | 🚧 | | |
* The Extra Week / Rest of Your Life: Optimizations (unlikely to be available in 2024...)

| Week + Chapter | Topic |
| -------------- | ----------------------------------------------------------- |
| 1.1 | Memtable |
| 1.2 | Merge Iterator |
| 1.3 | Block |
| 1.4 | Sorted String Table (SST) |
| 1.5 | Read Path |
| 1.6 | Write Path |
| 1.7 | SST Optimizations: Prefix Key Encoding + Bloom Filters |
| 2.1 | Compaction Implementation |
| 2.2 | Simple Compaction Strategy (Traditional Leveled Compaction) |
| 2.3 | Tiered Compaction Strategy (RocksDB Universal Compaction) |
| 2.4 | Leveled Compaction Strategy (RocksDB Leveled Compaction) |
| 2.5 | Manifest |
| 2.6 | Write-Ahead Log (WAL) |
| 2.7 | Batch Write and Checksums |
| 3.1 | Timestamp Key Encoding |
| 3.2 | Snapshot Read - Memtables and Timestamps |
| 3.3 | Snapshot Read - Transaction API |
| 3.4 | Watermark and Garbage Collection |
| 3.5 | Transactions and Optimistic Concurrency Control |
| 3.6 | Serializable Snapshot Isolation |
| 3.7 | Compaction Filters |

## License

Expand Down
2 changes: 1 addition & 1 deletion mini-lsm-book/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
- [Watermark and GC](./week3-04-watermark.md)
- [Transaction and OCC](./week3-05-txn-occ.md)
- [Serializable Snapshot Isolation](./week3-06-serializable.md)
- [Snack Time: Compaction Filter (WIP)](./week3-07-compaction-filter.md)
- [Snack Time: Compaction Filters](./week3-07-compaction-filter.md)
- [The Rest of Your Life (TBD)](./week4-overview.md)

---
Expand Down
44 changes: 43 additions & 1 deletion mini-lsm-book/src/week3-07-compaction-filter.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,46 @@
# Snack Time: Compaction Filter
# Snack Time: Compaction Filters

Congratulations! You made it there! In the previous chapter, you made your LSM engine multi-version capable, and the users can use transaction APIs to interact with your storage engine. At the end of this week, we will implement some easy but important features of the storage engine. Welcome to Mini-LSM's week 3 snack time!

In this chapter, we will generalize our compaction garbage collection logic to become compaction filters.

For now, our compaction will simply retain the keys above the watermark and the latest version of the keys below the watermark. We can add some magic to the compaction process to help the user collect some unused data automatically as a background job.

Consider a case that the user uses Mini-LSM to store database tables. Each row in the table are prefixed with the table name. For example,

```
table1_key1 -> row
table1_key2 -> row
table1_key3 -> row
table2_key1 -> row
table2_key2 -> row
```

Now the user executes `DROP TABLE table1`. The engine will need to clean up all the data beginning with `table1`.

There are a lot of ways to achieve the goal. The user of Mini-LSM can scan all the keys beginning with `table1` and requests the engine to delete it. However, scanning a very large database might be slow, and it will generate the same number of delete tombstones as the existing keys. Therefore, scan-and-delete will not free up the space occupied by the dropped table -- instead, it will add more data to the engine and the space can only be reclaimed when the tombstones reach the bottom level of the engine.

Or, they can create column families (we will talk about this in *rest of your life* chapter). They store each table in a column family, which is a standalone LSM state, and directly remove the SST files corresponding to the column family when the user drop the table.

In this tutorial, we will implement the third approach: compaction filters. Compaction filters can be dynamically added to the engine at runtime. During the compaction, if a key matching the compaction filter is found, we can silently remove it in the background. Therefore, the user can attach a compaction filter of `prefix=table1` to the engine, and all these keys will be removed during compaction.

## Task 1: Compaction Filter

In this task, you will need to modify:

```
src/compact.rs
```

You can iterate all compaction filters in `LsmStorageInner::compaction_filters`. If the first version of the key below watermark matches the compaction filter, simply remove it instead of keeping it in the SST file.

To run test cases,

```
cargo x copy-test --week 3 --day 7
cargo x scheck
```

You can assume that the user will not get the keys within the prefix filter range. And, they will not scan the keys in the prefix range. Therefore, it is okay to return a wrong value when a user requests the keys in the prefix filter range (i.e., undefined behavior).

{{#include copyright.md}}
23 changes: 19 additions & 4 deletions mini-lsm-mvcc/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::iterators::StorageIterator;
use crate::key::KeySlice;
use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
use crate::lsm_storage::{CompactionFilter, LsmStorageInner, LsmStorageState};
use crate::manifest::ManifestRecord;
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};

Expand Down Expand Up @@ -122,7 +122,8 @@ impl LsmStorageInner {
let watermark = self.mvcc().watermark();
let mut last_key = Vec::<u8>::new();
let mut first_key_below_watermark = false;
while iter.is_valid() {
let compaction_filters = self.compaction_filters.lock().clone();
'outer: while iter.is_valid() {
if builder.is_none() {
builder = Some(SsTableBuilder::new(self.options.block_size));
}
Expand All @@ -144,12 +145,26 @@ impl LsmStorageInner {
continue;
}

if same_as_last_key && iter.key().ts() <= watermark {
if !first_key_below_watermark {
if iter.key().ts() <= watermark {
if same_as_last_key && !first_key_below_watermark {
iter.next()?;
continue;
}

first_key_below_watermark = false;

if !compaction_filters.is_empty() {
for filter in &compaction_filters {
match filter {
CompactionFilter::Prefix(x) => {
if iter.key().key_ref().starts_with(x) {
iter.next()?;
continue 'outer;
}
}
}
}
}
}

let builder_inner = builder.as_mut().unwrap();
Expand Down
2 changes: 2 additions & 0 deletions mini-lsm-mvcc/src/iterators/two_merge_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ impl<

fn key(&self) -> A::KeyType<'_> {
if self.choose_a {
debug_assert!(self.a.is_valid());
self.a.key()
} else {
debug_assert!(self.b.is_valid());
self.b.key()
}
}
Expand Down
16 changes: 16 additions & 0 deletions mini-lsm-mvcc/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ fn key_within(user_key: &[u8], table_begin: KeySlice, table_end: KeySlice) -> bo
table_begin.key_ref() <= user_key && user_key <= table_end.key_ref()
}

#[derive(Clone, Debug)]
pub enum CompactionFilter {
Prefix(Bytes),
}

/// The storage interface of the LSM tree.
pub(crate) struct LsmStorageInner {
pub(crate) state: Arc<RwLock<Arc<LsmStorageState>>>,
Expand All @@ -160,6 +165,7 @@ pub(crate) struct LsmStorageInner {
pub(crate) compaction_controller: CompactionController,
pub(crate) manifest: Option<Manifest>,
pub(crate) mvcc: Option<LsmMvccInner>,
pub(crate) compaction_filters: Arc<Mutex<Vec<CompactionFilter>>>,
}

/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
Expand Down Expand Up @@ -243,6 +249,10 @@ impl MiniLsm {
}))
}

pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) {
self.inner.add_compaction_filter(compaction_filter)
}

pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
self.inner.get(key)
}
Expand Down Expand Up @@ -431,12 +441,18 @@ impl LsmStorageInner {
manifest: Some(manifest),
options: options.into(),
mvcc: Some(LsmMvccInner::new(last_commit_ts)),
compaction_filters: Arc::new(Mutex::new(Vec::new())),
};
storage.sync_dir()?;

Ok(storage)
}

pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) {
let mut compaction_filters = self.compaction_filters.lock();
compaction_filters.push(compaction_filter);
}

pub fn sync(&self) -> Result<()> {
self.state.read().memtable.sync_wal()
}
Expand Down
1 change: 1 addition & 0 deletions mini-lsm-mvcc/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ mod week3_day3;
mod week3_day4;
mod week3_day5;
mod week3_day6;
mod week3_day7;
70 changes: 70 additions & 0 deletions mini-lsm-mvcc/src/tests/week3_day7.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use bytes::Bytes;
use tempfile::tempdir;

use crate::{
compact::CompactionOptions,
lsm_storage::{CompactionFilter, LsmStorageOptions, MiniLsm, WriteBatchRecord},
};

use super::harness::{check_iter_result_by_key, construct_merge_iterator_over_storage};

#[test]
fn test_task3_mvcc_compaction() {
let dir = tempdir().unwrap();
let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
let storage = MiniLsm::open(&dir, options.clone()).unwrap();
storage
.write_batch(&[
WriteBatchRecord::Put("table1_a", "1"),
WriteBatchRecord::Put("table1_b", "1"),
WriteBatchRecord::Put("table1_c", "1"),
WriteBatchRecord::Put("table2_a", "1"),
WriteBatchRecord::Put("table2_b", "1"),
WriteBatchRecord::Put("table2_c", "1"),
])
.unwrap();
storage.force_flush().unwrap();
let snapshot0 = storage.new_txn().unwrap();
storage
.write_batch(&[
WriteBatchRecord::Put("table1_a", "2"),
WriteBatchRecord::Del("table1_b"),
WriteBatchRecord::Put("table1_c", "2"),
WriteBatchRecord::Put("table2_a", "2"),
WriteBatchRecord::Del("table2_b"),
WriteBatchRecord::Put("table2_c", "2"),
])
.unwrap();
storage.force_flush().unwrap();
storage.add_compaction_filter(CompactionFilter::Prefix(Bytes::from("table2_")));
storage.force_full_compaction().unwrap();

let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read());
check_iter_result_by_key(
&mut iter,
vec![
(Bytes::from("table1_a"), Bytes::from("2")),
(Bytes::from("table1_a"), Bytes::from("1")),
(Bytes::from("table1_b"), Bytes::new()),
(Bytes::from("table1_b"), Bytes::from("1")),
(Bytes::from("table1_c"), Bytes::from("2")),
(Bytes::from("table1_c"), Bytes::from("1")),
(Bytes::from("table2_a"), Bytes::from("2")),
(Bytes::from("table2_b"), Bytes::new()),
(Bytes::from("table2_c"), Bytes::from("2")),
],
);

drop(snapshot0);

storage.force_full_compaction().unwrap();

let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read());
check_iter_result_by_key(
&mut iter,
vec![
(Bytes::from("table1_a"), Bytes::from("2")),
(Bytes::from("table1_c"), Bytes::from("2")),
],
);
}
16 changes: 16 additions & 0 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ impl LsmStorageOptions {
}
}

#[derive(Clone, Debug)]
pub enum CompactionFilter {
Prefix(Bytes),
}

/// The storage interface of the LSM tree.
pub(crate) struct LsmStorageInner {
pub(crate) state: Arc<RwLock<Arc<LsmStorageState>>>,
Expand All @@ -124,6 +129,7 @@ pub(crate) struct LsmStorageInner {
pub(crate) compaction_controller: CompactionController,
pub(crate) manifest: Option<Manifest>,
pub(crate) mvcc: Option<LsmMvccInner>,
pub(crate) compaction_filters: Arc<Mutex<Vec<CompactionFilter>>>,
}

/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
Expand Down Expand Up @@ -176,6 +182,10 @@ impl MiniLsm {
self.inner.write_batch(batch)
}

pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) {
self.inner.add_compaction_filter(compaction_filter)
}

pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
self.inner.get(key)
}
Expand Down Expand Up @@ -252,6 +262,7 @@ impl LsmStorageInner {
manifest: None,
options: options.into(),
mvcc: None,
compaction_filters: Arc::new(Mutex::new(Vec::new())),
};

Ok(storage)
Expand All @@ -261,6 +272,11 @@ impl LsmStorageInner {
unimplemented!()
}

pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) {
let mut compaction_filters = self.compaction_filters.lock();
compaction_filters.push(compaction_filter);
}

/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
unimplemented!()
Expand Down
Loading

0 comments on commit a6c32e8

Please sign in to comment.