Skip to content

Commit

Permalink
patch memtable and add ts for wal
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <iskyzh@gmail.com>
  • Loading branch information
skyzh committed Jan 25, 2024
1 parent 89acc23 commit 218c73f
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 96 deletions.
4 changes: 4 additions & 0 deletions mini-lsm-mvcc/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ impl Key<Vec<u8>> {
}

impl Key<Bytes> {
pub fn new() -> Self {
Self(Bytes::new(), TS_DEFAULT)
}

pub fn as_key_slice(&self) -> KeySlice {
Key(&self.0, self.1)
}
Expand Down
42 changes: 36 additions & 6 deletions mini-lsm-mvcc/src/lsm_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,24 @@ pub struct LsmIterator {
inner: LsmIteratorInner,
end_bound: Bound<Bytes>,
is_valid: bool,
read_ts: u64,
prev_key: Vec<u8>,
}

impl LsmIterator {
pub(crate) fn new(iter: LsmIteratorInner, end_bound: Bound<Bytes>) -> Result<Self> {
pub(crate) fn new(
iter: LsmIteratorInner,
end_bound: Bound<Bytes>,
read_ts: u64,
) -> Result<Self> {
let mut iter = Self {
is_valid: iter.is_valid(),
inner: iter,
end_bound,
read_ts,
prev_key: Vec::new(),
};
iter.move_to_non_delete()?;
iter.move_to_key()?;
Ok(iter)
}

Expand All @@ -47,9 +55,31 @@ impl LsmIterator {
Ok(())
}

fn move_to_non_delete(&mut self) -> Result<()> {
while self.is_valid() && self.inner.value().is_empty() {
self.next_inner()?;
fn move_to_key(&mut self) -> Result<()> {
loop {
while self.inner.is_valid() && self.inner.key().key_ref() == self.prev_key {
self.next_inner()?;
}
if !self.inner.is_valid() {
break;
}
self.prev_key.clear();
self.prev_key.extend(self.inner.key().key_ref());
while self.inner.is_valid()
&& self.inner.key().key_ref() == self.prev_key
&& self.inner.key().ts() > self.read_ts
{
self.next_inner()?;
}
if !self.inner.is_valid() {
break;
}
if self.inner.key().key_ref() != self.prev_key {
continue;
}
if !self.inner.value().is_empty() {
break;
}
}
Ok(())
}
Expand All @@ -72,7 +102,7 @@ impl StorageIterator for LsmIterator {

fn next(&mut self) -> Result<()> {
self.next_inner()?;
self.move_to_non_delete()?;
self.move_to_key()?;
Ok(())
}

Expand Down
75 changes: 44 additions & 31 deletions mini-lsm-mvcc/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use anyhow::{Context, Result};
Expand All @@ -21,7 +21,7 @@ use crate::iterators::StorageIterator;
use crate::key::{self, KeySlice};
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::{Manifest, ManifestRecord};
use crate::mem_table::{map_bound, MemTable};
use crate::mem_table::{map_bound, map_key_bound_plus_ts, MemTable};
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};

pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
Expand Down Expand Up @@ -152,6 +152,8 @@ pub(crate) struct LsmStorageInner {
pub(crate) options: Arc<LsmStorageOptions>,
pub(crate) compaction_controller: CompactionController,
pub(crate) manifest: Option<Manifest>,
pub(crate) ts: Arc<AtomicU64>,
pub(crate) write_lock: Mutex<()>,
}

/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
Expand Down Expand Up @@ -405,6 +407,8 @@ impl LsmStorageInner {
compaction_controller,
manifest: Some(manifest),
options: options.into(),
ts: Arc::new(AtomicU64::new(0)),
write_lock: Mutex::new(()),
};
storage.sync_dir()?;

Expand All @@ -422,25 +426,18 @@ impl LsmStorageInner {
Arc::clone(&guard)
}; // drop global lock here

// Search on the current memtable.
if let Some(value) = snapshot.memtable.get(key) {
if value.is_empty() {
// found tomestone, return key not exists
return Ok(None);
}
return Ok(Some(value));
}

// Search on immutable memtables.
let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
memtable_iters.push(Box::new(snapshot.memtable.scan(
Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_BEGIN)),
Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_END)),
)));
for memtable in snapshot.imm_memtables.iter() {
if let Some(value) = memtable.get(key) {
if value.is_empty() {
// found tomestone, return key not exists
return Ok(None);
}
return Ok(Some(value));
}
memtable_iters.push(Box::new(memtable.scan(
Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_BEGIN)),
Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_END)),
)));
}
let memtable_iter = MergeIterator::create(memtable_iters);

let mut l0_iters = Vec::with_capacity(snapshot.l0_sstables.len());

Expand All @@ -466,7 +463,7 @@ impl LsmStorageInner {
if keep_table(key, &table) {
l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?));
}
}
Expand All @@ -482,20 +479,29 @@ impl LsmStorageInner {
}
let level_iter = SstConcatIterator::create_and_seek_to_key(
level_ssts,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?;
level_iters.push(Box::new(level_iter));
}

let iter = TwoMergeIterator::create(l0_iter, MergeIterator::create(level_iters))?;
let iter = LsmIterator::new(
TwoMergeIterator::create(
TwoMergeIterator::create(memtable_iter, l0_iter)?,
MergeIterator::create(level_iters),
)?,
Bound::Unbounded,
self.ts.load(Ordering::SeqCst),
)?;

if iter.is_valid() && iter.key().key_ref() == key && !iter.value().is_empty() {
if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
return Ok(Some(Bytes::copy_from_slice(iter.value())));
}
Ok(None)
}

pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
let _lck = self.write_lock.lock();
let ts = self.ts.fetch_add(1, Ordering::Relaxed);
for record in batch {
match record {
WriteBatchRecord::Del(key) => {
Expand All @@ -504,7 +510,7 @@ impl LsmStorageInner {
let size;
{
let guard = self.state.read();
guard.memtable.put(key, b"")?;
guard.memtable.put(KeySlice::from_slice(key, ts), b"")?;
size = guard.memtable.approximate_size();
}
self.try_freeze(size)?;
Expand All @@ -517,7 +523,7 @@ impl LsmStorageInner {
let size;
{
let guard = self.state.read();
guard.memtable.put(key, value)?;
guard.memtable.put(KeySlice::from_slice(key, ts), value)?;
size = guard.memtable.approximate_size();
}
self.try_freeze(size)?;
Expand Down Expand Up @@ -681,9 +687,15 @@ impl LsmStorageInner {
}; // drop global lock here

let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
memtable_iters.push(Box::new(snapshot.memtable.scan(lower, upper)));
memtable_iters.push(Box::new(snapshot.memtable.scan(
map_key_bound_plus_ts(lower, key::TS_RANGE_BEGIN),
map_key_bound_plus_ts(upper, key::TS_RANGE_END),
)));
for memtable in snapshot.imm_memtables.iter() {
memtable_iters.push(Box::new(memtable.scan(lower, upper)));
memtable_iters.push(Box::new(memtable.scan(
map_key_bound_plus_ts(lower, key::TS_RANGE_BEGIN),
map_key_bound_plus_ts(upper, key::TS_RANGE_END),
)));
}
let memtable_iter = MergeIterator::create(memtable_iters);

Expand All @@ -699,12 +711,12 @@ impl LsmStorageInner {
let iter = match lower {
Bound::Included(key) => SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?,
Bound::Excluded(key) => {
let mut iter = SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?;
if iter.is_valid() && iter.key().key_ref() == key {
iter.next()?;
Expand Down Expand Up @@ -737,12 +749,12 @@ impl LsmStorageInner {
let level_iter = match lower {
Bound::Included(key) => SstConcatIterator::create_and_seek_to_key(
level_ssts,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?,
Bound::Excluded(key) => {
let mut iter = SstConcatIterator::create_and_seek_to_key(
level_ssts,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?;
if iter.is_valid() && iter.key().key_ref() == key {
iter.next()?;
Expand All @@ -760,6 +772,7 @@ impl LsmStorageInner {
Ok(FusedIterator::new(LsmIterator::new(
iter,
map_bound(upper),
self.ts.load(Ordering::SeqCst),
)?))
}
}
Loading

0 comments on commit 218c73f

Please sign in to comment.