Skip to content

Commit

Permalink
Write integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubham8287 committed Sep 5, 2024
1 parent ec9d524 commit 982a76e
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 65 deletions.
1 change: 1 addition & 0 deletions crates/commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ memmap2 = "0.9.4"
serde = { workspace = true, optional = true }
spacetimedb-primitives.workspace = true
spacetimedb-sats.workspace = true
tempfile.workspace = true
thiserror.workspace = true

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ impl<R: Repo, T> Generic<R, T> {
} else {
let byte_offset = segment::Header::LEN as u64 + bytes_read;
debug!("truncating segment {segment} to {offset} at {byte_offset}");
let file = self.repo.open_segment(segment)?;
file.ftruncate(byte_offset)?;
let mut file = self.repo.open_segment(segment)?;
file.ftruncate(offset, byte_offset)?;
// Some filesystems require fsync after ftruncate.
file.fsync()?;
break;
Expand Down
46 changes: 17 additions & 29 deletions crates/commitlog/src/index/indexfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
io,
marker::PhantomData,
mem,
path::PathBuf,
path::{Path, PathBuf},
};

use log::debug;
Expand All @@ -15,7 +15,7 @@ const KEY_SIZE: usize = mem::size_of::<u64>();
const ENTRY_SIZE: usize = KEY_SIZE + mem::size_of::<u64>();

/// Returns the offset index file name based on the root path and offset
pub fn offset_index_file_name(root: &PathBuf, offset: u64) -> PathBuf {
pub fn offset_index_file_name(root: &Path, offset: u64) -> PathBuf {
root.join(format!("{offset:0>20}{OFFSET_INDEX_FILE_EXT}"))
}

Expand All @@ -35,6 +35,7 @@ pub struct IndexFileMut<Key: Into<u64> + From<u64>> {
}

impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
// Searches for first 0-key, to count number of entries
fn num_entries(&self) -> Result<usize, IndexError> {
for index in 0.. {
match self.index_lookup(index) {
Expand All @@ -50,7 +51,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
Ok(0)
}

/// Finds the 0 based index of the first key encountered that is smaller than or equal to the given key.
/// Finds the 0 based index of the first key encountered that is just smaller than or equal to the given key.
///
/// # Error
///
Expand Down Expand Up @@ -114,7 +115,6 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
}

impl<Key: Into<u64> + From<u64>> IndexRead<Key> for IndexFileMut<Key> {
/// Find the index of key smaller or equal to given Key, and then look up its value
fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError> {
let (_, idx) = self.find_index(key)?;
self.index_lookup(idx as usize)
Expand All @@ -123,12 +123,6 @@ impl<Key: Into<u64> + From<u64>> IndexRead<Key> for IndexFileMut<Key> {

/// Implementation of the `IndexWrite` trait for `IndexFileMut`.
impl<Key: Into<u64> + From<u64>> IndexWrite<Key> for IndexFileMut<Key> {
/// Appends a key-value pair to the index file.
/// Successive calls to `append` must supply key in ascending order
///
/// Errors
/// - `IndexError::InvalidInput`: Either Key or Value is 0
/// - `IndexError::OutOfMemory`: Append after index file is already full.
fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError> {
let key = key.into();
if self.last_key()? >= key {
Expand All @@ -152,17 +146,15 @@ impl<Key: Into<u64> + From<u64>> IndexWrite<Key> for IndexFileMut<Key> {
Ok(())
}

/// Asynchronously flushes the index file.
fn async_flush(&self) -> Result<(), IndexError> {
self.inner.flush_async().map_err(Into::into)
}

/// Truncates the index file starting from the entry with a key greater than or equal to the given key.
fn truncate(&mut self, key: Key) -> Result<(), IndexError> {
let key = key.into();
let (found_key, index) = self.find_index(Key::from(key))?;

// Start index to truncate
// If returned key is smalled than asked key, truncate from next entry
self.num_entries = if found_key.into() == key {
index as usize
} else {
Expand All @@ -179,8 +171,8 @@ impl<Key: Into<u64> + From<u64>> IndexWrite<Key> for IndexFileMut<Key> {
}
}

pub fn create_index<Key: Into<u64> + From<u64>>(
path: &PathBuf,
pub fn create_index_file<Key: Into<u64> + From<u64>>(
path: &Path,
offset: u64,
cap: u64,
) -> Result<IndexFileMut<Key>, IndexError> {
Expand All @@ -202,15 +194,15 @@ pub fn create_index<Key: Into<u64> + From<u64>>(
.or_else(|e| {
if e.kind() == io::ErrorKind::AlreadyExists {
debug!("Index file {} already exists", path.display());
return open_index(path, offset);
open_index_file(path, offset)
} else {
debug!("Index file creation failed with error: {}", e);
Err(e.into())
}
})
}

pub fn open_index<Key: Into<u64> + From<u64>>(path: &PathBuf, offset: u64) -> Result<IndexFileMut<Key>, IndexError> {
pub fn open_index_file<Key: Into<u64> + From<u64>>(path: &Path, offset: u64) -> Result<IndexFileMut<Key>, IndexError> {
let file = File::options()
.read(true)
.write(true)
Expand All @@ -227,26 +219,23 @@ pub fn open_index<Key: Into<u64> + From<u64>>(path: &PathBuf, offset: u64) -> Re
Ok(me)
}

pub fn delete_index(path: &PathBuf, offset: u64) -> Result<(), IndexError> {
pub fn delete_index_file(path: &Path, offset: u64) -> Result<(), IndexError> {
fs::remove_file(offset_index_file_name(path, offset)).map_err(Into::into)
}

#[cfg(test)]
mod tests {
use std::ffi::FromBytesUntilNulError;

use super::*;
use rand::seq::index;
use tempfile::TempDir;

/// Create and fill index file with key as first `fill_till - 1` even numbers
/// Create and fill index file with key as first `fill_till - 1` even numbers
fn create_and_fill_index(cap: u64, fill_till: u64) -> Result<IndexFileMut<u64>, IndexError> {
// Create a temporary directory for testing
let temp_dir = TempDir::new()?;
let path = temp_dir.path().to_path_buf();

// Create an index file
let mut index_file: IndexFileMut<u64> = create_index(&path, 0, cap)?;
let mut index_file: IndexFileMut<u64> = create_index_file(&path, 0, cap)?;

// Enter even number keys from 2
for i in 1..fill_till {
Expand Down Expand Up @@ -280,12 +269,12 @@ mod tests {

// append smaller than already appended key
assert!(index.append(17, 300).is_err());

// append duplicate key
assert!(index.append(18, 500).is_err());

// append to fill the capacty
assert_eq!(index.append(22, 500)?, ());
assert!(index.append(22, 500).is_ok());

// Append after capacity should give error
assert!(index.append(224, 600).is_err());
Expand All @@ -299,7 +288,6 @@ mod tests {

assert_eq!(index.num_entries, 8);


// Truncate last present entry
index.truncate(16)?;
assert_eq!(index.num_entries, 7);
Expand All @@ -322,13 +310,13 @@ mod tests {
}

#[test]
fn test_open_index() -> Result<(), IndexError> {
fn test_close_open_index() -> Result<(), IndexError> {
// Create a temporary directory for testing
let temp_dir = TempDir::new()?;
let path = temp_dir.path().to_path_buf();

// Create an index file
let mut index_file: IndexFileMut<u64> = create_index(&path, 0, 100)?;
let mut index_file: IndexFileMut<u64> = create_index_file(&path, 0, 100)?;

for i in 1..10 {
index_file.append(i * 2, i * 2 * 100)?;
Expand All @@ -337,7 +325,7 @@ mod tests {
assert_eq!(index_file.num_entries, 9);
drop(index_file);

let open_index_file: IndexFileMut<u64> = open_index(&path, 0)?;
let open_index_file: IndexFileMut<u64> = open_index_file(&path, 0)?;
assert_eq!(open_index_file.num_entries, 9);
assert_eq!(open_index_file.key_lookup(6)?, (6, 600));

Expand Down
24 changes: 15 additions & 9 deletions crates/commitlog/src/index/mod.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,38 @@

use std::io;

use thiserror::Error;
mod indexfile;

pub use indexfile::create_index;
pub use indexfile::delete_index;
pub use indexfile::create_index_file;
pub use indexfile::delete_index_file;
pub use indexfile::IndexFileMut;

pub trait IndexRead<Key: Into<u64> + From<u64>> {
/// Returns the key and value that is lesser than or equal to the given key
// Return (key, value) pair of key just smaller or equal to given key
///
/// # Error
/// - `IndexError::KeyNotFound`: If the key is smaller than the first entry key
#[allow(dead_code)]
fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError>;
}

/// Trait for writing operations on an index file
pub trait IndexWrite<Key: Into<u64> + From<u64>> {
/// Appends a new key-value pair to the index file
/// Appends a key-value pair to the index file.
/// Successive calls to `append` must supply key in ascending order
///
/// Errors
/// - `IndexError::InvalidInput`: Either Key or Value is 0
/// - `IndexError::OutOfMemory`: Append after index file is already full.
fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError>;

/// Asynchronously flushes any pending changes to the index file
fn async_flush(&self) -> Result<(), IndexError>;

/// Truncates the index file up to the specified key, removing all entries after it
/// Truncates the index file starting from the entry with a key greater than or equal to the given key.
fn truncate(&mut self, key: Key) -> Result<(), IndexError>;
}



#[derive(Error, Debug)]
pub enum IndexError {
#[error("I/O error: {0}")]
Expand All @@ -38,7 +44,7 @@ pub enum IndexError {
#[error("Asked key is smaller than the first entry in the index")]
KeyNotFound,

#[error("Invalid input: Key should be monotnously increasing")]
#[error("Key should be monotnously increasing")]
InvalidInput,

#[error("index file is not readable")]
Expand Down
20 changes: 19 additions & 1 deletion crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use std::{io, num::NonZeroU16, path::PathBuf, sync::RwLock};
use std::{
io,
num::{NonZeroU16, NonZeroU64},
path::PathBuf,
sync::RwLock,
};

use log::trace;

mod commit;
pub mod commitlog;
mod index;
pub mod repo;
pub mod segment;
mod varchar;
Expand Down Expand Up @@ -45,6 +51,16 @@ pub struct Options {
///
/// Default: 65,535
pub max_records_in_commit: NonZeroU16,
/// Whenever at least this many bytes have been written to the currently
/// active segment, an entry is added to its offset index.
///
/// Default: 4096
pub offset_index_interval_bytes: NonZeroU64,
/// If `true`, require that the segment must be synced to disk before an
/// index entry is added.
///
/// Default: false
pub offset_index_require_segment_fsync: bool,
}

impl Default for Options {
Expand All @@ -53,6 +69,8 @@ impl Default for Options {
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
max_segment_size: 1024 * 1024 * 1024,
max_records_in_commit: NonZeroU16::MAX,
offset_index_interval_bytes: NonZeroU64::new(4096).unwrap(),
offset_index_require_segment_fsync: false,
}
}
}
Expand Down
25 changes: 23 additions & 2 deletions crates/commitlog/src/repo/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use std::{
path::PathBuf,
};

use log::debug;
use log::{debug, info};

use super::Repo;
use crate::index::{create_index_file, delete_index_file};

use super::{Repo, TxOffset, TxOffsetIndex};

const SEGMENT_FILE_EXT: &str = ".stdb.log";

Expand Down Expand Up @@ -91,6 +93,7 @@ impl Repo for Fs {
}

fn remove_segment(&self, offset: u64) -> io::Result<()> {
self.remove_offset_index(offset.into());
fs::remove_file(self.segment_path(offset))
}

Expand All @@ -117,4 +120,22 @@ impl Repo for Fs {

Ok(segments)
}

fn get_offset_index(&self, offset: TxOffset, cap: u64) -> Option<TxOffsetIndex> {
let offset = offset.into();
create_index_file(&self.root, offset, cap)
.map_err(|err| {
info!("Opening offset index failed for offset: {}, error: {}", offset, err);
err
})
.ok()
}

fn remove_offset_index(&self, offset: TxOffset) {
let offset = offset.into();
let _ = delete_index_file(&self.root, offset).map_err(|err| {
info!("Failed to delete index offset:{}, error: {}", offset, err);
err
});
}
}
4 changes: 2 additions & 2 deletions crates/commitlog/src/repo/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ impl From<SharedBytes> for Segment {
}

impl FileLike for Segment {
fn fsync(&self) -> io::Result<()> {
fn fsync(&mut self) -> io::Result<()> {
Ok(())
}

fn ftruncate(&self, size: u64) -> io::Result<()> {
fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
let mut inner = self.buf.write().unwrap();
inner.resize(size as usize, 0);
Ok(())
Expand Down
Loading

0 comments on commit 982a76e

Please sign in to comment.