Skip to content

Commit

Permalink
refactor levels & add level & tiered compactor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Nov 28, 2023
1 parent 16bb004 commit 87f9927
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 141 deletions.
2 changes: 1 addition & 1 deletion src/compaction/major.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{Choice, CompactionStrategy, Options};
use crate::level::Levels;
use crate::levels::Levels;
use std::sync::Arc;

/// Major compaction
Expand Down
2 changes: 1 addition & 1 deletion src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub(crate) mod major;
pub(crate) mod tiered;
pub(crate) mod worker;

use crate::level::Levels;
use crate::levels::Levels;

/// Configuration for compaction
#[derive(Debug, Eq, PartialEq)]
Expand Down
108 changes: 67 additions & 41 deletions src/compaction/tiered.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{Choice, CompactionStrategy, Options};
use crate::level::Levels;
use crate::levels::Levels;
use std::sync::Arc;

/// Size-tiered compaction strategy (STCS)
Expand Down Expand Up @@ -77,22 +77,24 @@ impl CompactionStrategy for Strategy {
}
}

/* #[cfg(test)]
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, sync::Arc};
use super::{CompactionChoice, Strategy};
use super::Strategy;
use crate::{
block_cache::BlockCache,
compaction::{CompactionStrategy, SingleSegmentPayload},
compaction::{Choice, CompactionStrategy, Options},
levels::Levels,
segment::{index::MetaIndex, meta::Metadata, Segment},
};
use std::sync::Arc;

fn fixture_segment(id: String) -> Arc<Segment> {
let block_cache = Arc::new(BlockCache::new(0));

fn fixture_segment(id: String) -> Segment {
Segment {
path: ".".into(),
block_index: Arc::new(MetaIndex::new(BTreeMap::new())),
Arc::new(Segment {
block_index: Arc::new(MetaIndex::new(id.clone(), block_cache.clone())),
metadata: Metadata {
path: ".".into(),
block_count: 0,
block_size: 0,
created_at: 0,
Expand All @@ -103,50 +105,59 @@ mod tests {
key_range: (vec![], vec![]),
tombstone_count: 0,
uncompressed_size: 0,
seqnos: (0, 0),
},
block_cache: Arc::new(BlockCache::new(0)),
}
block_cache,
})
}

#[test]
fn empty_levels() {
fn empty_levels() -> crate::Result<()> {
let tempdir = tempfile::tempdir()?;
let compactor = Strategy::default();

let levels = Levels::create_new(4);
let levels = Levels::create_new(4, tempdir.path().join("levels.json"))?;

assert_eq!(compactor.choose(&levels), Choice::DoNothing);

assert_eq!(compactor.choose(&levels), CompactionChoice::DoNothing);
Ok(())
}

#[test]
fn default_l0() {
fn default_l0() -> crate::Result<()> {
let tempdir = tempfile::tempdir()?;
let compactor = Strategy::default();

let mut levels = Levels::create_new(4);
let mut levels = Levels::create_new(4, tempdir.path().join("levels.json"))?;

levels.add(fixture_segment("1".into()));
assert_eq!(compactor.choose(&levels), CompactionChoice::DoNothing);
assert_eq!(compactor.choose(&levels), Choice::DoNothing);

levels.add(fixture_segment("2".into()));
assert_eq!(compactor.choose(&levels), CompactionChoice::DoNothing);
assert_eq!(compactor.choose(&levels), Choice::DoNothing);

levels.add(fixture_segment("3".into()));
assert_eq!(compactor.choose(&levels), CompactionChoice::DoNothing);
assert_eq!(compactor.choose(&levels), Choice::DoNothing);

levels.add(fixture_segment("4".into()));
assert_eq!(
compactor.choose(&levels),
CompactionChoice::SingleSegment(SingleSegmentPayload {
Choice::DoCompact(Options {
dest_level: 1,
segment_ids: vec!["1".into(), "2".into(), "3".into(), "4".into()]
segment_ids: vec!["1".into(), "2".into(), "3".into(), "4".into()],
target_size: u64::MAX,
})
);

Ok(())
}

#[test]
fn more_than_min() {
fn more_than_min() -> crate::Result<()> {
let tempdir = tempfile::tempdir()?;
let compactor = Strategy::new(2, 8);

let mut levels = Levels::create_new(4);
let mut levels = Levels::create_new(4, tempdir.path().join("levels.json"))?;
levels.add(fixture_segment("1".into()));
levels.add(fixture_segment("2".into()));
levels.add(fixture_segment("3".into()));
Expand All @@ -159,73 +170,88 @@ mod tests {

assert_eq!(
compactor.choose(&levels),
CompactionChoice::SingleSegment(SingleSegmentPayload {
Choice::DoCompact(Options {
dest_level: 1,
segment_ids: vec!["1".into(), "2".into(), "3".into(), "4".into()]
segment_ids: vec!["1".into(), "2".into(), "3".into(), "4".into()],
target_size: u64::MAX,
})
);

Ok(())
}

#[test]
fn many_segments() {
fn many_segments() -> crate::Result<()> {
let tempdir = tempfile::tempdir()?;
let compactor = Strategy::new(2, 2);

let mut levels = Levels::create_new(4);
let mut levels = Levels::create_new(4, tempdir.path().join("levels.json"))?;
levels.add(fixture_segment("1".into()));
levels.add(fixture_segment("2".into()));
levels.add(fixture_segment("3".into()));
levels.add(fixture_segment("4".into()));

assert_eq!(
compactor.choose(&levels),
CompactionChoice::SingleSegment(SingleSegmentPayload {
Choice::DoCompact(Options {
dest_level: 1,
segment_ids: vec!["1".into(), "2".into()]
segment_ids: vec!["1".into(), "2".into()],
target_size: u64::MAX,
})
);

Ok(())
}

#[test]
fn deeper_level() {
fn deeper_level() -> crate::Result<()> {
let tempdir = tempfile::tempdir()?;
let compactor = Strategy::new(2, 4);

let mut levels = Levels::create_new(4);
let mut levels = Levels::create_new(4, tempdir.path().join("levels.json"))?;
levels.add(fixture_segment("1".into()));

levels.insert_into_level(1, fixture_segment("2".into()));
levels.insert_into_level(1, fixture_segment("3".into()));

assert_eq!(
compactor.choose(&levels),
CompactionChoice::SingleSegment(SingleSegmentPayload {
Choice::DoCompact(Options {
dest_level: 2,
segment_ids: vec!["2".into(), "3".into()]
segment_ids: vec!["2".into(), "3".into()],
target_size: u64::MAX,
})
);

let mut levels = Levels::create_new(4);
let tempdir = tempfile::tempdir()?;
let mut levels = Levels::create_new(4, tempdir.path().join("levels.json"))?;

levels.insert_into_level(2, fixture_segment("2".into()));
levels.insert_into_level(2, fixture_segment("3".into()));

assert_eq!(
compactor.choose(&levels),
CompactionChoice::SingleSegment(SingleSegmentPayload {
Choice::DoCompact(Options {
dest_level: 3,
segment_ids: vec!["2".into(), "3".into()]
segment_ids: vec!["2".into(), "3".into()],
target_size: u64::MAX,
})
);

Ok(())
}

#[test]
fn last_level() {
fn last_level() -> crate::Result<()> {
let tempdir = tempfile::tempdir()?;
let compactor = Strategy::new(2, 4);

let mut levels = Levels::create_new(4);
let mut levels = Levels::create_new(4, tempdir.path().join("levels.json"))?;
levels.insert_into_level(3, fixture_segment("2".into()));
levels.insert_into_level(3, fixture_segment("3".into()));

assert_eq!(compactor.choose(&levels), CompactionChoice::DoNothing);
assert_eq!(compactor.choose(&levels), Choice::DoNothing);

Ok(())
}
}
*/
2 changes: 1 addition & 1 deletion src/compaction/worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
compaction::Choice,
level::Levels,
levels::Levels,
merge::MergeIterator,
segment::{index::MetaIndex, writer::MultiWriter, Segment},
Tree,
Expand Down
78 changes: 78 additions & 0 deletions src/levels/level.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use super::HiddenSet;
use crate::segment::Segment;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, ops::DerefMut, sync::Arc};

#[derive(Serialize, Deserialize)]
pub struct Level(Vec<String>);

impl std::ops::Deref for Level {
type Target = Vec<String>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for Level {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl Default for Level {
fn default() -> Self {
Self(Vec::with_capacity(20))
}
}

pub struct ResolvedLevel(pub(crate) Vec<Arc<Segment>>);

impl std::ops::Deref for ResolvedLevel {
type Target = Vec<Arc<Segment>>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for ResolvedLevel {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl ResolvedLevel {
pub fn new(
level: &Level,
hidden_set: &HiddenSet,
segments: &HashMap<String, Arc<Segment>>,
) -> Self {
let mut new_level = Vec::new();

for segment_id in level.iter() {
if !hidden_set.contains(segment_id) {
new_level.push(
segments
.get(segment_id)
.cloned()
.expect("where's the segment at?"),
);
}
}

Self(new_level)
}

pub fn get_overlapping_segments(&self, start: Vec<u8>, end: Vec<u8>) -> Vec<&String> {
use std::ops::Bound::Included;

let bounds = (Included(start), Included(end));

self.0
.iter()
.filter(|x| Segment::check_key_range_overlap(x, &bounds))
.map(|x| &x.metadata.id)
.collect()
}
}
Loading

0 comments on commit 87f9927

Please sign in to comment.