diff --git a/mini-lsm-book/src/week2-07-snacks.md b/mini-lsm-book/src/week2-07-snacks.md index 78e63b4b..94c9cc73 100644 --- a/mini-lsm-book/src/week2-07-snacks.md +++ b/mini-lsm-book/src/week2-07-snacks.md @@ -87,7 +87,7 @@ We will do a per-record checksum in the write-ahead log. To do this, you have tw * Generate a buffer of the key-value record, and use `crc32fast::hash` to compute the checksum at once. * Write one field at a time (e.g., key length, key slice), and use a `crc32fast::Hasher` to compute the checksum incrementally on each field. -This is up to your choice and you will need to *choose your own adventure*. The new WAL encoding should be like: +This is up to your choice and you will need to *choose your own adventure*. Both method should produce exactly the same result, as long as you handle little endian / big endian correctly. The new WAL encoding should be like: ``` | key_len | key | value_len | value | checksum | diff --git a/mini-lsm-book/src/week3-02-snapshot-read-part-1.md b/mini-lsm-book/src/week3-02-snapshot-read-part-1.md index 50d0db09..367cddcb 100644 --- a/mini-lsm-book/src/week3-02-snapshot-read-part-1.md +++ b/mini-lsm-book/src/week3-02-snapshot-read-part-1.md @@ -80,6 +80,12 @@ It should now store `(KeyBytes, Bytes)` and the return key type should be `KeySl Write-ahead log should now accept a key slice instead of a user key slice. When serializing and deserializing the WAL record, you should put timestamp into the WAL file and do checksum over the timestamp and all other fields you had before. +The WAL format is as follows: + +``` +| key_len (exclude ts len) (u16) | key | ts (u64) | value_len (u16) | value | checksum (u32) | +``` + **LsmStorageInner::get** Previously, we implement `get` as first probe the memtables and then scan the SSTs. Now that we change the memtable to use the new key-ts APIs, we will need to re-implement the `get` interface. The easiest way to do this is to create a merge iterator over everything we have -- memtables, immutable memtables, L0 SSTs, and other level SSTs, the same as what you have done in `scan`, except that we do a bloom filter filtering over the SSTs. diff --git a/mini-lsm-book/src/week3-05-txn-occ.md b/mini-lsm-book/src/week3-05-txn-occ.md index 0fbd6101..ff00ffa1 100644 --- a/mini-lsm-book/src/week3-05-txn-occ.md +++ b/mini-lsm-book/src/week3-05-txn-occ.md @@ -41,11 +41,40 @@ We assume that a transaction will only be used on a single thread. Once your tra Your commit implementation should simply collect all key-value pairs from the local storage and submit a write batch to the storage engine. +## Task 4: Atomic WAL + +In this task, you will need to modify: + +``` +src/wal.rs +src/mem_table.rs +``` + +Note that `commit` involves producing a write batch, and for now, the write batch does not guarantee atomicity. You will need to change the WAL implementation to produce a header and a footer for the write batch. + +The new WAL encoding is as follows: + +``` +| HEADER | BODY | FOOTER | +| u32 | u16 | var | u64 | u16 | var | ... | u32 | +| batch_size | key_len | key | ts | value_len | value | more key-value pairs ... | checksum | +``` + +`batch_size` is the size of the `BODY` section. `checksum` is the checksum for the `BODY` section. + +There are no test cases to verify your implementation. As long as you pass all existing test cases and implement the above WAL format, everything should be fine. + +You should implement `Wal::put_batch` and `MemTable::put_batch`. The original `put` function should treat the +single key-value pair as a batch. That is to say, at this point, your `put` function should call `put_batch`. + +A batch should be handled in the same mem table and the same WAL, even if it exceeds the mem table size limit. + ## Test Your Understanding * With all the things we have implemented up to this point, does the system satisfy snapshot isolation? If not, what else do we need to do to support snapshot isolation? (Note: snapshot isolation is different from serializable snapshot isolation we will talk about in the next chapter) * What if the user wants to batch import data (i.e., 1TB?) If they use the transaction API to do that, will you give them some advice? Is there any opportunity to optimize for this case? * What is optimistic concurrency control? What would the system be like if we implement pessimistic concurrency control instead in Mini-LSM? +* What happens if your system crashes and leave a corrupted WAL on the disk? How do you handle this situation? ## Bonus Tasks diff --git a/mini-lsm-mvcc/src/mem_table.rs b/mini-lsm-mvcc/src/mem_table.rs index b7e0d77e..d0fe5d94 100644 --- a/mini-lsm-mvcc/src/mem_table.rs +++ b/mini-lsm-mvcc/src/mem_table.rs @@ -122,16 +122,25 @@ impl MemTable { /// /// In week 1, day 1, simply put the key-value pair into the skipmap. /// In week 2, day 6, also flush the data to WAL. + /// In week 3, day 5, modify the function to use the batch API. pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> { - let estimated_size = key.raw_len() + value.len(); - self.map.insert( - key.to_key_vec().into_key_bytes(), - Bytes::copy_from_slice(value), - ); + self.put_batch(&[(key, value)]) + } + + /// Implement this in week 3, day 5. + pub fn put_batch(&self, data: &[(KeySlice, &[u8])]) -> Result<()> { + let mut estimated_size = 0; + for (key, value) in data { + estimated_size += key.raw_len() + value.len(); + self.map.insert( + key.to_key_vec().into_key_bytes(), + Bytes::copy_from_slice(value), + ); + } self.approximate_size .fetch_add(estimated_size, std::sync::atomic::Ordering::Relaxed); if let Some(ref wal) = self.wal { - wal.put(key, value)?; + wal.put_batch(data)?; } Ok(()) } diff --git a/mini-lsm-mvcc/src/wal.rs b/mini-lsm-mvcc/src/wal.rs index bb7fab31..f333ce73 100644 --- a/mini-lsm-mvcc/src/wal.rs +++ b/mini-lsm-mvcc/src/wal.rs @@ -40,51 +40,71 @@ impl Wal { file.read_to_end(&mut buf)?; let mut rbuf: &[u8] = buf.as_slice(); while rbuf.has_remaining() { + let batch_size = rbuf.get_u32() as usize; + if rbuf.remaining() < batch_size { + bail!("incomplete WAL"); + } + let mut batch_buf = &rbuf[..batch_size]; + let mut kv_pairs = Vec::new(); let mut hasher = crc32fast::Hasher::new(); - let key_len = rbuf.get_u16() as usize; - hasher.write_u16(key_len as u16); - let key = Bytes::copy_from_slice(&rbuf[..key_len]); - hasher.write(&key); - rbuf.advance(key_len); - let ts = rbuf.get_u64(); - hasher.write_u64(ts); - let value_len = rbuf.get_u16() as usize; - hasher.write_u16(value_len as u16); - let value = Bytes::copy_from_slice(&rbuf[..value_len]); - hasher.write(&value); - rbuf.advance(value_len); - let checksum = rbuf.get_u32(); - if hasher.finalize() != checksum { + // The checksum computed from the individual components should be the same as a direct checksum on the buffer. + // Students' implementation only needs to do a single checksum on the buffer. We compute both for verification purpose. + let single_checksum = crc32fast::hash(batch_buf); + while batch_buf.has_remaining() { + let key_len = batch_buf.get_u16() as usize; + hasher.write(&(key_len as u16).to_be_bytes()); + let key = Bytes::copy_from_slice(&batch_buf[..key_len]); + hasher.write(&key); + batch_buf.advance(key_len); + let ts = batch_buf.get_u64(); + hasher.write(&ts.to_be_bytes()); + let value_len = batch_buf.get_u16() as usize; + hasher.write(&(value_len as u16).to_be_bytes()); + let value = Bytes::copy_from_slice(&batch_buf[..value_len]); + hasher.write(&value); + kv_pairs.push((key, ts, value)); + batch_buf.advance(value_len); + } + rbuf.advance(batch_size); + let expected_checksum = rbuf.get_u32(); + let component_checksum = hasher.finalize(); + assert_eq!(component_checksum, single_checksum); + if single_checksum != expected_checksum { bail!("checksum mismatch"); } - skiplist.insert(KeyBytes::from_bytes_with_ts(key, ts), value); + for (key, ts, value) in kv_pairs { + skiplist.insert(KeyBytes::from_bytes_with_ts(key, ts), value); + } } Ok(Self { file: Arc::new(Mutex::new(BufWriter::new(file))), }) } - pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> { + /// Implement this in week 3, day 5. + pub fn put_batch(&self, data: &[(KeySlice, &[u8])]) -> Result<()> { let mut file = self.file.lock(); - let mut buf: Vec = - Vec::with_capacity(key.raw_len() + value.len() + std::mem::size_of::()); - let mut hasher = crc32fast::Hasher::new(); - hasher.write_u16(key.key_len() as u16); - buf.put_u16(key.key_len() as u16); - hasher.write(key.key_ref()); - buf.put_slice(key.key_ref()); - hasher.write_u64(key.ts()); - buf.put_u64(key.ts()); - hasher.write_u16(value.len() as u16); - buf.put_u16(value.len() as u16); - buf.put_slice(value); - hasher.write(value); - // add checksum: week 2 day 7 - buf.put_u32(hasher.finalize()); + let mut buf = Vec::::new(); + for (key, value) in data { + buf.put_u16(key.key_len() as u16); + buf.put_slice(key.key_ref()); + buf.put_u64(key.ts()); + buf.put_u16(value.len() as u16); + buf.put_slice(value); + } + // write batch_size header (u32) + file.write_all(&(buf.len() as u32).to_be_bytes())?; + // write key-value pairs body file.write_all(&buf)?; + // write checksum (u32) + file.write_all(&crc32fast::hash(&buf).to_be_bytes())?; Ok(()) } + pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> { + self.put_batch(&[(key, value)]) + } + pub fn sync(&self) -> Result<()> { let mut file = self.file.lock(); file.flush()?; diff --git a/mini-lsm-starter/src/mem_table.rs b/mini-lsm-starter/src/mem_table.rs index 57c96148..dc4fa5a1 100644 --- a/mini-lsm-starter/src/mem_table.rs +++ b/mini-lsm-starter/src/mem_table.rs @@ -76,10 +76,16 @@ impl MemTable { /// /// In week 1, day 1, simply put the key-value pair into the skipmap. /// In week 2, day 6, also flush the data to WAL. + /// In week 3, day 5, modify the function to use the batch API. pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> { unimplemented!() } + /// Implement this in week 3, day 5. + pub fn put_batch(&self, _data: &[(KeySlice, &[u8])]) -> Result<()> { + unimplemented!() + } + pub fn sync_wal(&self) -> Result<()> { if let Some(ref wal) = self.wal { wal.sync()?; diff --git a/mini-lsm-starter/src/wal.rs b/mini-lsm-starter/src/wal.rs index 2b31d436..ad7d850a 100644 --- a/mini-lsm-starter/src/wal.rs +++ b/mini-lsm-starter/src/wal.rs @@ -27,6 +27,11 @@ impl Wal { unimplemented!() } + /// Implement this in week 3, day 5. + pub fn put_batch(&self, _data: &[(&[u8], &[u8])]) -> Result<()> { + unimplemented!() + } + pub fn sync(&self) -> Result<()> { unimplemented!() } diff --git a/mini-lsm/src/mem_table.rs b/mini-lsm/src/mem_table.rs index 89da268f..de9a5d91 100644 --- a/mini-lsm/src/mem_table.rs +++ b/mini-lsm/src/mem_table.rs @@ -91,6 +91,7 @@ impl MemTable { /// /// In week 1, day 1, simply put the key-value pair into the skipmap. /// In week 2, day 6, also flush the data to WAL. + /// In week 3, day 5, modify the function to use the batch API. pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { let estimated_size = key.len() + value.len(); self.map @@ -103,6 +104,11 @@ impl MemTable { Ok(()) } + /// Implement this in week 3, day 5. + pub fn put_batch(&self, _data: &[(KeySlice, &[u8])]) -> Result<()> { + unimplemented!() + } + pub fn sync_wal(&self) -> Result<()> { if let Some(ref wal) = self.wal { wal.sync()?; diff --git a/mini-lsm/src/wal.rs b/mini-lsm/src/wal.rs index a8165675..a4771d30 100644 --- a/mini-lsm/src/wal.rs +++ b/mini-lsm/src/wal.rs @@ -79,6 +79,11 @@ impl Wal { Ok(()) } + /// Implement this in week 3, day 5. + pub fn put_batch(&self, _data: &[(&[u8], &[u8])]) -> Result<()> { + unimplemented!() + } + pub fn sync(&self) -> Result<()> { let mut file = self.file.lock(); file.flush()?;