Skip to content

Commit

Permalink
implement 2.7
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <iskyzh@gmail.com>
  • Loading branch information
skyzh committed Jan 25, 2024
1 parent 8dbaf54 commit 89acc23
Show file tree
Hide file tree
Showing 16 changed files with 237 additions and 81 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ We are working on chapter 3 and more test cases for all existing contents.

| Week + Chapter | Topic | Solution | Starter Code | Writeup |
| -------------- | ----------------------------------------------- | -------- | ------------ | ------- |
| 2.7 | Batch Write + Checksum | 🚧 | | |
| 2.7 | Batch Write + Checksum | 🚧 | 🚧 | |
| 3.1 | Timestamp Key Encoding | 🚧 | | |
| 3.2 | Snapshot Read - Blocks, Memtables, and SSTs | | | |
| 3.3 | Snapshot Read - Engine Read Path | | | |
Expand Down
4 changes: 4 additions & 0 deletions mini-lsm-book/src/week2-07-snacks.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ In this chapter, you will:

We do not provide reference answers to the questions; feel free to discuss them in the Discord community.

## Bonus Tasks

* **Try Recovering**. If there is a checksum error, open the database in a safe mode so that no writes can be performed and non-corrupted data can still be retrieved.

{{#include copyright.md}}
1 change: 1 addition & 0 deletions mini-lsm-mvcc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ crossbeam-channel = "0.5.11"
serde_json = { version = "1.0" }
serde = { version = "1.0", features = ["derive"] }
farmhash = "1"
crc32fast = "1.3.2"

[dev-dependencies]
tempfile = "3"
Expand Down
68 changes: 43 additions & 25 deletions mini-lsm-mvcc/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub struct LsmStorageState {
pub sstables: HashMap<usize, Arc<SsTable>>,
}

/// One operation inside a batch handed to `write_batch`.
///
/// `T` is any byte-like type (`AsRef<[u8]>`); keys and values of a record share the same type.
pub enum WriteBatchRecord<T: AsRef<[u8]>> {
/// Store a value under a key: `Put(key, value)`.
Put(T, T),
/// Delete a key: `Del(key)`.
Del(T),
}

impl LsmStorageState {
fn create(options: &LsmStorageOptions) -> Self {
let levels = match &options.compaction_options {
Expand Down Expand Up @@ -234,6 +239,10 @@ impl MiniLsm {
self.inner.get(key)
}

/// Applies a batch of put/delete records by forwarding it unchanged to the inner
/// storage's `write_batch`, and returns that call's result.
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
self.inner.write_batch(batch)
}

/// Stores `value` under `key` by forwarding to the inner storage's `put`, and returns
/// that call's result.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.inner.put(key, value)
}
Expand Down Expand Up @@ -486,37 +495,46 @@ impl LsmStorageInner {
Ok(None)
}

/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
assert!(!value.is_empty(), "value cannot be empty");
assert!(!key.is_empty(), "key cannot be empty");

let size;
{
let guard = self.state.read();
guard.memtable.put(key, value)?;
size = guard.memtable.approximate_size();
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
for record in batch {
match record {
WriteBatchRecord::Del(key) => {
let key = key.as_ref();
assert!(!key.is_empty(), "key cannot be empty");
let size;
{
let guard = self.state.read();
guard.memtable.put(key, b"")?;
size = guard.memtable.approximate_size();
}
self.try_freeze(size)?;
}
WriteBatchRecord::Put(key, value) => {
let key = key.as_ref();
let value = value.as_ref();
assert!(!key.is_empty(), "key cannot be empty");
assert!(!value.is_empty(), "value cannot be empty");
let size;
{
let guard = self.state.read();
guard.memtable.put(key, value)?;
size = guard.memtable.approximate_size();
}
self.try_freeze(size)?;
}
}
}

self.try_freeze(size)?;

Ok(())
}

/// Stores `value` under `key` by submitting a one-record `Put` batch to `write_batch`.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
    let single_put = [WriteBatchRecord::Put(key, value)];
    self.write_batch(&single_put)
}

/// Remove a key from the storage by writing an empty value.
///
/// Submits a one-record `Del` batch to `write_batch`, which checks that the key is
/// non-empty, writes the empty value into the memtable, and runs the freeze check.
/// Returns whatever `write_batch` returns.
pub fn delete(&self, key: &[u8]) -> Result<()> {
    self.write_batch(&[WriteBatchRecord::Del(key)])
}

fn try_freeze(&self, estimated_size: usize) -> Result<()> {
Expand Down
23 changes: 17 additions & 6 deletions mini-lsm-mvcc/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::io::{Read, Write};
use std::path::Path;
use std::sync::Arc;

use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut};
use parking_lot::{Mutex, MutexGuard};
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;

use crate::compact::CompactionTask;

Expand Down Expand Up @@ -43,10 +43,18 @@ impl Manifest {
.context("failed to recover manifest")?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let stream = Deserializer::from_slice(&buf).into_iter::<ManifestRecord>();
let mut buf_ptr = buf.as_slice();
let mut records = Vec::new();
for x in stream {
records.push(x?);
while buf_ptr.has_remaining() {
let len = buf_ptr.get_u64();
let slice = &buf_ptr[..len as usize];
let json = serde_json::from_slice::<ManifestRecord>(slice)?;
buf_ptr.advance(len as usize);
let checksum = buf_ptr.get_u32();
if checksum != crc32fast::hash(slice) {
bail!("checksum mismatched!");
}
records.push(json);
}
Ok((
Self {
Expand All @@ -66,7 +74,10 @@ impl Manifest {

pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> {
let mut file = self.file.lock();
let buf = serde_json::to_vec(&record)?;
let mut buf = serde_json::to_vec(&record)?;
let hash = crc32fast::hash(&buf);
file.write_all(&(buf.len() as u64).to_be_bytes())?;
buf.put_u32(hash);
file.write_all(&buf)?;
file.sync_all()?;
Ok(())
Expand Down
31 changes: 23 additions & 8 deletions mini-lsm-mvcc/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::fs::File;
use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
pub use builder::SsTableBuilder;
use bytes::{Buf, BufMut};
pub use iterator::SsTableIterator;
Expand All @@ -30,7 +30,7 @@ pub struct BlockMeta {
impl BlockMeta {
/// Encode block meta to a buffer.
pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
let mut estimated_size = 0;
let mut estimated_size = std::mem::size_of::<u32>();
for meta in block_meta {
// The size of offset
estimated_size += std::mem::size_of::<u32>();
Expand All @@ -43,10 +43,12 @@ impl BlockMeta {
// The size of actual key
estimated_size += meta.last_key.raw_len();
}
estimated_size += std::mem::size_of::<u32>();
// Reserve the space to improve performance, especially when the size of incoming data is
// large
buf.reserve(estimated_size);
let original_len = buf.len();
buf.put_u32(block_meta.len() as u32);
for meta in block_meta {
buf.put_u32(meta.offset as u32);
buf.put_u16(meta.first_key.key_len() as u16);
Expand All @@ -56,13 +58,16 @@ impl BlockMeta {
buf.put_slice(meta.last_key.key_ref());
buf.put_u64(meta.last_key.ts());
}
buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
assert_eq!(estimated_size, buf.len() - original_len);
}

/// Decode block meta from a buffer.
pub fn decode_block_meta(mut buf: impl Buf) -> Vec<BlockMeta> {
pub fn decode_block_meta(mut buf: &[u8]) -> Result<Vec<BlockMeta>> {
let mut block_meta = Vec::new();
while buf.has_remaining() {
let num = buf.get_u32() as usize;
let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);
for _ in 0..num {
let offset = buf.get_u32() as usize;
let first_key_len = buf.get_u16() as usize;
let first_key =
Expand All @@ -76,7 +81,11 @@ impl BlockMeta {
last_key,
});
}
block_meta
if buf.get_u32() != checksum {
bail!("meta checksum mismatched");
}

Ok(block_meta)
}
}

Expand Down Expand Up @@ -145,7 +154,7 @@ impl SsTable {
let raw_meta_offset = file.read(bloom_offset - 4, 4)?;
let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?;
let block_meta = BlockMeta::decode_block_meta(&raw_meta[..]);
let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?;
Ok(Self {
file,
first_key: block_meta.first().unwrap().first_key.clone(),
Expand Down Expand Up @@ -184,10 +193,16 @@ impl SsTable {
.block_meta
.get(block_idx + 1)
.map_or(self.block_meta_offset, |x| x.offset);
let block_data = self
let block_len = offset_end - offset - 4;
let block_data_with_chksum: Vec<u8> = self
.file
.read(offset as u64, (offset_end - offset) as u64)?;
Ok(Arc::new(Block::decode(&block_data[..])))
let block_data = &block_data_with_chksum[..block_len];
let checksum = (&block_data_with_chksum[block_len..]).get_u32();
if checksum != crc32fast::hash(block_data) {
bail!("block checksum mismatched");
}
Ok(Arc::new(Block::decode(block_data)))
}

/// Read a block from disk, with block cache.
Expand Down
19 changes: 18 additions & 1 deletion mini-lsm-mvcc/src/wal.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::fs::{File, OpenOptions};
use std::hash::Hasher;
use std::io::{Read, Write};
use std::path::Path;
use std::sync::Arc;

use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes};
use crossbeam_skiplist::SkipMap;
use parking_lot::Mutex;
Expand Down Expand Up @@ -37,12 +38,21 @@ impl Wal {
file.read_to_end(&mut buf)?;
let mut rbuf: &[u8] = buf.as_slice();
while rbuf.has_remaining() {
let mut hasher = crc32fast::Hasher::new();
let key_len = rbuf.get_u16() as usize;
hasher.write_u16(key_len as u16);
let key = Bytes::copy_from_slice(&rbuf[..key_len]);
hasher.write(&key);
rbuf.advance(key_len);
let value_len = rbuf.get_u16() as usize;
hasher.write_u16(value_len as u16);
let value = Bytes::copy_from_slice(&rbuf[..value_len]);
hasher.write(&value);
rbuf.advance(value_len);
let checksum = rbuf.get_u32();
if hasher.finalize() != checksum {
bail!("checksum mismatch");
}
skiplist.insert(key, value);
}
Ok(Self {
Expand All @@ -54,10 +64,17 @@ impl Wal {
let mut file = self.file.lock();
let mut buf: Vec<u8> =
Vec::with_capacity(key.len() + value.len() + std::mem::size_of::<u16>());
let mut hasher = crc32fast::Hasher::new();
hasher.write_u16(key.len() as u16);
buf.put_u16(key.len() as u16);
hasher.write(key);
buf.put_slice(key);
hasher.write_u16(value.len() as u16);
buf.put_u16(value.len() as u16);
buf.put_slice(value);
hasher.write(value);
// add checksum: week 2 day 7
buf.put_u32(hasher.finalize());
file.write_all(&buf)?;
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions mini-lsm-starter/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl CompactionController {
}
}

#[derive(Debug, Clone)]
pub enum CompactionOptions {
/// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
/// Compaction)
Expand Down
14 changes: 14 additions & 0 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ pub struct LsmStorageState {
pub sstables: HashMap<usize, Arc<SsTable>>,
}

/// One operation inside a batch handed to `write_batch`.
///
/// `T` is any byte-like type (`AsRef<[u8]>`); keys and values of a record share the same type.
pub enum WriteBatchRecord<T: AsRef<[u8]>> {
/// Store a value under a key: `Put(key, value)`.
Put(T, T),
/// Delete a key: `Del(key)`.
Del(T),
}

impl LsmStorageState {
fn create(options: &LsmStorageOptions) -> Self {
let levels = match &options.compaction_options {
Expand Down Expand Up @@ -156,6 +161,10 @@ impl MiniLsm {
}))
}

/// Applies a batch of put/delete records by forwarding it unchanged to the inner
/// storage's `write_batch`, and returns that call's result.
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
self.inner.write_batch(batch)
}

/// Looks up `key` by forwarding to the inner storage's `get`.
///
/// Returns that call's result unchanged: `Ok(Some(bytes))`, `Ok(None)`, or an error.
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
self.inner.get(key)
}
Expand Down Expand Up @@ -245,6 +254,11 @@ impl LsmStorageInner {
unimplemented!()
}

/// Write a batch of data into the storage. Implement in week 2 day 7.
///
/// Starter stub: the body is `unimplemented!()`, so calling this panics until it is
/// implemented. `_batch` is underscore-prefixed because it is not yet used.
pub fn write_batch<T: AsRef<[u8]>>(&self, _batch: &[WriteBatchRecord<T>]) -> Result<()> {
unimplemented!()
}

/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
unimplemented!()
Expand Down
1 change: 1 addition & 0 deletions mini-lsm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ crossbeam-channel = "0.5.11"
serde_json = { version = "1.0" }
serde = { version = "1.0", features = ["derive"] }
farmhash = "1"
crc32fast = "1.3.2"

[dev-dependencies]
tempfile = "3"
Expand Down
Loading

0 comments on commit 89acc23

Please sign in to comment.