From 2a50dc33f11a9ef4c4e0e630cd5b5584415bccf0 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 5 Sep 2024 02:31:07 +0530 Subject: [PATCH] Write integration --- crates/commitlog/Cargo.toml | 1 + crates/commitlog/src/commitlog.rs | 4 +- crates/commitlog/src/index/indexfile.rs | 44 +++----- crates/commitlog/src/index/mod.rs | 24 +++-- crates/commitlog/src/lib.rs | 20 +++- crates/commitlog/src/repo/fs.rs | 25 ++++- crates/commitlog/src/repo/mem.rs | 4 +- crates/commitlog/src/repo/mod.rs | 42 +++++++- crates/commitlog/src/segment.rs | 130 +++++++++++++++++++++--- crates/commitlog/src/tests/partial.rs | 6 +- 10 files changed, 236 insertions(+), 64 deletions(-) diff --git a/crates/commitlog/Cargo.toml b/crates/commitlog/Cargo.toml index 1b5e22f25a9..205c93e0152 100644 --- a/crates/commitlog/Cargo.toml +++ b/crates/commitlog/Cargo.toml @@ -19,6 +19,7 @@ memmap2 = "0.9.4" serde = { workspace = true, optional = true } spacetimedb-primitives.workspace = true spacetimedb-sats.workspace = true +tempfile.workspace = true thiserror.workspace = true [dev-dependencies] diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 1c14d9b94cd..d6d03aa5dbd 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -230,8 +230,8 @@ impl Generic { } else { let byte_offset = segment::Header::LEN as u64 + bytes_read; debug!("truncating segment {segment} to {offset} at {byte_offset}"); - let file = self.repo.open_segment(segment)?; - file.ftruncate(byte_offset)?; + let mut file = self.repo.open_segment(segment)?; + file.ftruncate(offset, byte_offset)?; // Some filesystems require fsync after ftruncate. file.fsync()?; break; diff --git a/crates/commitlog/src/index/indexfile.rs b/crates/commitlog/src/index/indexfile.rs index dd1874e6f0e..08673beeb14 100644 --- a/crates/commitlog/src/index/indexfile.rs +++ b/crates/commitlog/src/index/indexfile.rs @@ -3,7 +3,7 @@ use std::{ io, marker::PhantomData, mem, - path::PathBuf, + path::{Path, PathBuf}, }; use log::debug; @@ -15,7 +15,7 @@ const KEY_SIZE: usize = mem::size_of::(); const ENTRY_SIZE: usize = KEY_SIZE + mem::size_of::(); /// Returns the offset index file name based on the root path and offset -pub fn offset_index_file_name(root: &PathBuf, offset: u64) -> PathBuf { +pub fn offset_index_file_name(root: &Path, offset: u64) -> PathBuf { root.join(format!("{offset:0>20}{OFFSET_INDEX_FILE_EXT}")) } @@ -35,6 +35,7 @@ pub struct IndexFileMut + From> { } impl + From> IndexFileMut { + // Searches for first 0-key, to count number of entries fn num_entries(&self) -> Result { for index in 0.. { match self.index_lookup(index) { @@ -50,7 +51,7 @@ impl + From> IndexFileMut { Ok(0) } - /// Finds the 0 based index of the first key encountered that is smaller than or equal to the given key. + /// Finds the 0 based index of the first key encountered that is just smaller than or equal to the given key. /// /// # Error /// @@ -114,7 +115,6 @@ impl + From> IndexFileMut { } impl + From> IndexRead for IndexFileMut { - /// Find the index of key smaller or equal to given Key, and then look up its value fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError> { let (_, idx) = self.find_index(key)?; self.index_lookup(idx as usize) @@ -123,12 +123,6 @@ impl + From> IndexRead for IndexFileMut { /// Implementation of the `IndexWrite` trait for `IndexFileMut`. impl + From> IndexWrite for IndexFileMut { - /// Appends a key-value pair to the index file. - /// Successive calls to `append` must supply key in ascending order - /// - /// Errors - /// - `IndexError::InvalidInput`: Either Key or Value is 0 - /// - `IndexError::OutOfMemory`: Append after index file is already full. fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError> { let key = key.into(); if self.last_key()? >= key { @@ -152,17 +146,15 @@ impl + From> IndexWrite for IndexFileMut { Ok(()) } - /// Asynchronously flushes the index file. fn async_flush(&self) -> Result<(), IndexError> { self.inner.flush_async().map_err(Into::into) } - /// Truncates the index file starting from the entry with a key greater than or equal to the given key. fn truncate(&mut self, key: Key) -> Result<(), IndexError> { let key = key.into(); let (found_key, index) = self.find_index(Key::from(key))?; - // Start index to truncate + // If returned key is smalled than asked key, truncate from next entry self.num_entries = if found_key.into() == key { index as usize } else { @@ -179,7 +171,7 @@ impl + From> IndexWrite for IndexFileMut { } } -pub fn create_index + From>( +pub fn create_index_file + From>( path: &PathBuf, offset: u64, cap: u64, @@ -202,7 +194,7 @@ pub fn create_index + From>( .or_else(|e| { if e.kind() == io::ErrorKind::AlreadyExists { debug!("Index file {} already exists", path.display()); - return open_index(path, offset); + open_index_file(path, offset) } else { debug!("Index file creation failed with error: {}", e); Err(e.into()) @@ -210,7 +202,7 @@ pub fn create_index + From>( }) } -pub fn open_index + From>(path: &PathBuf, offset: u64) -> Result, IndexError> { +pub fn open_index_file + From>(path: &Path, offset: u64) -> Result, IndexError> { let file = File::options() .read(true) .write(true) @@ -227,26 +219,23 @@ pub fn open_index + From>(path: &PathBuf, offset: u64) -> Re Ok(me) } -pub fn delete_index(path: &PathBuf, offset: u64) -> Result<(), IndexError> { +pub fn delete_index_file(path: &Path, offset: u64) -> Result<(), IndexError> { fs::remove_file(offset_index_file_name(path, offset)).map_err(Into::into) } #[cfg(test)] mod tests { - use std::ffi::FromBytesUntilNulError; - use super::*; - use rand::seq::index; use tempfile::TempDir; - /// Create and fill index file with key as first `fill_till - 1` even numbers + /// Create and fill index file with key as first `fill_till - 1` even numbers fn create_and_fill_index(cap: u64, fill_till: u64) -> Result, IndexError> { // Create a temporary directory for testing let temp_dir = TempDir::new()?; let path = temp_dir.path().to_path_buf(); // Create an index file - let mut index_file: IndexFileMut = create_index(&path, 0, cap)?; + let mut index_file: IndexFileMut = create_index_file(&path, 0, cap)?; // Enter even number keys from 2 for i in 1..fill_till { @@ -280,12 +269,12 @@ mod tests { // append smaller than already appended key assert!(index.append(17, 300).is_err()); - + // append duplicate key assert!(index.append(18, 500).is_err()); // append to fill the capacty - assert_eq!(index.append(22, 500)?, ()); + assert!(index.append(22, 500).is_ok()); // Append after capacity should give error assert!(index.append(224, 600).is_err()); @@ -299,7 +288,6 @@ mod tests { assert_eq!(index.num_entries, 8); - // Truncate last present entry index.truncate(16)?; assert_eq!(index.num_entries, 7); @@ -322,13 +310,13 @@ mod tests { } #[test] - fn test_open_index() -> Result<(), IndexError> { + fn test_close_open_index() -> Result<(), IndexError> { // Create a temporary directory for testing let temp_dir = TempDir::new()?; let path = temp_dir.path().to_path_buf(); // Create an index file - let mut index_file: IndexFileMut = create_index(&path, 0, 100)?; + let mut index_file: IndexFileMut = create_index_file(&path, 0, 100)?; for i in 1..10 { index_file.append(i * 2, i * 2 * 100)?; @@ -337,7 +325,7 @@ mod tests { assert_eq!(index_file.num_entries, 9); drop(index_file); - let open_index_file: IndexFileMut = open_index(&path, 0)?; + let open_index_file: IndexFileMut = open_index_file(&path, 0)?; assert_eq!(open_index_file.num_entries, 9); assert_eq!(open_index_file.key_lookup(6)?, (6, 600)); diff --git a/crates/commitlog/src/index/mod.rs b/crates/commitlog/src/index/mod.rs index 9f8d516c367..ccea55a5949 100644 --- a/crates/commitlog/src/index/mod.rs +++ b/crates/commitlog/src/index/mod.rs @@ -1,32 +1,38 @@ - use std::io; use thiserror::Error; mod indexfile; -pub use indexfile::create_index; -pub use indexfile::delete_index; +pub use indexfile::create_index_file; +pub use indexfile::delete_index_file; pub use indexfile::IndexFileMut; pub trait IndexRead + From> { - /// Returns the key and value that is lesser than or equal to the given key + // Return (key, value) pair of key just smaller or equal to given key + /// + /// # Error + /// - `IndexError::KeyNotFound`: If the key is smaller than the first entry key + #[allow(dead_code)] fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError>; } /// Trait for writing operations on an index file pub trait IndexWrite + From> { - /// Appends a new key-value pair to the index file + /// Appends a key-value pair to the index file. + /// Successive calls to `append` must supply key in ascending order + /// + /// Errors + /// - `IndexError::InvalidInput`: Either Key or Value is 0 + /// - `IndexError::OutOfMemory`: Append after index file is already full. fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError>; /// Asynchronously flushes any pending changes to the index file fn async_flush(&self) -> Result<(), IndexError>; - /// Truncates the index file up to the specified key, removing all entries after it + /// Truncates the index file starting from the entry with a key greater than or equal to the given key. fn truncate(&mut self, key: Key) -> Result<(), IndexError>; } - - #[derive(Error, Debug)] pub enum IndexError { #[error("I/O error: {0}")] @@ -38,7 +44,7 @@ pub enum IndexError { #[error("Asked key is smaller than the first entry in the index")] KeyNotFound, - #[error("Invalid input: Key should be monotnously increasing")] + #[error("Key should be monotnously increasing")] InvalidInput, #[error("index file is not readable")] diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 00e84b37655..d176253020a 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -1,9 +1,15 @@ -use std::{io, num::NonZeroU16, path::PathBuf, sync::RwLock}; +use std::{ + io, + num::{NonZeroU16, NonZeroU64}, + path::PathBuf, + sync::RwLock, +}; use log::trace; mod commit; pub mod commitlog; +mod index; pub mod repo; pub mod segment; mod varchar; @@ -45,6 +51,16 @@ pub struct Options { /// /// Default: 65,535 pub max_records_in_commit: NonZeroU16, + /// Whenever at least this many bytes have been written to the currently + /// active segment, an entry is added to its offset index. + /// + /// Default: 4096 + pub offset_index_interval_bytes: NonZeroU64, + /// If `true`, require that the segment must be synced to disk before an + /// index entry is added. + /// + /// Default: false + pub offset_index_require_segment_fsync: bool, } impl Default for Options { @@ -53,6 +69,8 @@ impl Default for Options { log_format_version: DEFAULT_LOG_FORMAT_VERSION, max_segment_size: 1024 * 1024 * 1024, max_records_in_commit: NonZeroU16::MAX, + offset_index_interval_bytes: NonZeroU64::new(4096).unwrap(), + offset_index_require_segment_fsync: false, } } } diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 49361d4ebb7..e15d29aa70f 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -4,9 +4,11 @@ use std::{ path::PathBuf, }; -use log::debug; +use log::{debug, info}; -use super::Repo; +use crate::index::{create_index_file, delete_index_file}; + +use super::{Repo, TxOffset, TxOffsetIndex}; const SEGMENT_FILE_EXT: &str = ".stdb.log"; @@ -91,6 +93,7 @@ impl Repo for Fs { } fn remove_segment(&self, offset: u64) -> io::Result<()> { + self.remove_offset_index(offset.into()); fs::remove_file(self.segment_path(offset)) } @@ -117,4 +120,22 @@ impl Repo for Fs { Ok(segments) } + + fn get_offset_index(&self, offset: TxOffset, cap: u64) -> Option { + let offset = offset.into(); + create_index_file(&self.root, offset, cap) + .map_err(|err| { + info!("Opening offset index failed for offset: {}, error: {}", offset, err); + err + }) + .ok() + } + + fn remove_offset_index(&self, offset: TxOffset) { + let offset = offset.into(); + let _ = delete_index_file(&self.root, offset).map_err(|err| { + info!("Failed to delete index offset:{}, error: {}", offset, err); + err + }); + } } diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index 3219d395dc9..06efbf3868b 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -40,11 +40,11 @@ impl From for Segment { } impl FileLike for Segment { - fn fsync(&self) -> io::Result<()> { + fn fsync(&mut self) -> io::Result<()> { Ok(()) } - fn ftruncate(&self, size: u64) -> io::Result<()> { + fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { let mut inner = self.buf.write().unwrap(); inner.resize(size as usize, 0); Ok(()) diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index a69d91ef407..6b10b062ee5 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -1,11 +1,12 @@ -use std::{io, u64}; +use std::io; use log::{debug, warn}; use crate::{ commit::Commit, error, - segment::{FileLike, Header, Metadata, Reader, Writer}, + index::IndexFileMut, + segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, Options, }; @@ -17,6 +18,23 @@ pub use fs::Fs; #[cfg(test)] pub use mem::Memory; +#[derive(Debug, Default, Clone, Copy)] +pub struct TxOffset(u64); + +impl From for u64 { + fn from(value: TxOffset) -> Self { + value.0 + } +} + +impl From for TxOffset { + fn from(value: u64) -> Self { + Self(value) + } +} + +pub type TxOffsetIndex = IndexFileMut; + /// A repository of log segments. /// /// This is mainly an internal trait to allow testing against an in-memory @@ -53,6 +71,19 @@ pub trait Repo: Clone { /// Traverse all segments in this repository and return list of their /// offsets, sorted in ascending order. fn existing_offsets(&self) -> io::Result>; + + /// Create or get an existing `TxOffsetIndex` for the given `offset`. + /// The `cap` parameter is the maximum number of entries in the index. + fn get_offset_index(&self, _offset: TxOffset, _cap: u64) -> Option { + None + } + + /// Remove `TxOffsetIndex` named with `offset`. + fn remove_offset_index(&self, _offset: TxOffset) {} +} + +fn offset_index_len(opts: Options) -> u64 { + opts.max_segment_size / opts.offset_index_interval_bytes } /// Create a new segment [`Writer`] with `offset`. @@ -70,6 +101,7 @@ pub fn create_segment_writer(repo: &R, opts: Options, offset: u64) -> i .write(&mut storage)?; storage.fsync()?; + let offset_index_head = repo.get_offset_index(offset.into(), offset_index_len(opts)); Ok(Writer { commit: Commit { min_tx_offset: offset, @@ -82,6 +114,8 @@ pub fn create_segment_writer(repo: &R, opts: Options, offset: u64) -> i bytes_written: Header::LEN as u64, max_records_in_commit: opts.max_records_in_commit, + + offset_index_head: offset_index_head.map(|index| OffsetIndexWriter::new(index, opts)), }) } @@ -123,6 +157,8 @@ pub fn resume_segment_writer( .ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM) .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?; + let offset_index_head = repo.get_offset_index(offset.into(), offset_index_len(opts)); + Ok(Ok(Writer { commit: Commit { min_tx_offset: tx_range.end, @@ -135,6 +171,8 @@ pub fn resume_segment_writer( bytes_written: size_in_bytes, max_records_in_commit: opts.max_records_in_commit, + + offset_index_head: offset_index_head.map(|index| OffsetIndexWriter::new(index, opts)), })) } diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 8d76c0aaa6d..e3d7eb5cd2a 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -1,16 +1,19 @@ use std::{ fs::File, io::{self, BufWriter, Write as _}, - num::NonZeroU16, + num::{NonZeroU16, NonZeroU64}, ops::Range, }; -use log::debug; +use log::{debug, info}; use crate::{ commit::{self, Commit, StoredCommit}, error, + index::{IndexError, IndexWrite as _}, payload::Encode, + repo::{TxOffset, TxOffsetIndex}, + Options, }; pub const MAGIC: [u8; 6] = [b'(', b'd', b's', b')', b'^', b'2']; @@ -83,6 +86,8 @@ pub struct Writer { pub(crate) bytes_written: u64, pub(crate) max_records_in_commit: NonZeroU16, + + pub(crate) offset_index_head: Option, } impl Writer { @@ -116,7 +121,16 @@ impl Writer { self.commit.write(&mut self.inner)?; self.inner.flush()?; - self.bytes_written += self.commit.encoded_len() as u64; + let commit_len = self.commit.encoded_len() as u64; + self.offset_index_head.as_mut().map(|index| { + index + .append_after_commit(self.commit.min_tx_offset.into(), self.bytes_written, commit_len) + .map_err(|e| { + info!("failed to append to offset index: {:?}", e); + }) + }); + + self.bytes_written += commit_len; self.commit.min_tx_offset += self.commit.n as u64; self.commit.n = 0; self.commit.records.clear(); @@ -149,40 +163,123 @@ impl Writer { } pub trait FileLike { - fn fsync(&self) -> io::Result<()>; - fn ftruncate(&self, size: u64) -> io::Result<()>; + fn fsync(&mut self) -> io::Result<()>; + fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>; } impl FileLike for File { - fn fsync(&self) -> io::Result<()> { + fn fsync(&mut self) -> io::Result<()> { self.sync_all() } - fn ftruncate(&self, size: u64) -> io::Result<()> { + fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { self.set_len(size) } } impl FileLike for BufWriter { - fn fsync(&self) -> io::Result<()> { - self.get_ref().fsync() + fn fsync(&mut self) -> io::Result<()> { + self.get_mut().fsync() } - fn ftruncate(&self, size: u64) -> io::Result<()> { - self.get_ref().ftruncate(size) + fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> { + self.get_mut().ftruncate(tx_offset, size) } } impl FileLike for Writer { - fn fsync(&self) -> io::Result<()> { - self.inner.fsync() + fn fsync(&mut self) -> io::Result<()> { + self.inner.fsync()?; + self.offset_index_head.as_mut().map(|index| index.fsync()); + Ok(()) + } + + fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> { + self.inner.ftruncate(tx_offset, size)?; + self.offset_index_head + .as_mut() + .map(|index| index.ftruncate(tx_offset, size)); + Ok(()) + } +} + +#[derive(Debug)] +pub struct OffsetIndexWriter { + pub(crate) head: TxOffsetIndex, + + require_segment_fsync: bool, + min_write_interval: NonZeroU64, + + pub(crate) candidate_min_tx_offset: TxOffset, + pub(crate) candidate_byte_offset: u64, + pub(crate) bytes_since_last_index: u64, +} + +impl OffsetIndexWriter { + pub fn new(head: TxOffsetIndex, opts: Options) -> Self { + OffsetIndexWriter { + head, + require_segment_fsync: opts.offset_index_require_segment_fsync, + min_write_interval: opts.offset_index_interval_bytes, + candidate_min_tx_offset: TxOffset::default(), + candidate_byte_offset: 0, + bytes_since_last_index: 0, + } + } + + fn reset(&mut self) { + self.candidate_byte_offset = 0; + self.candidate_min_tx_offset = TxOffset::default(); + self.bytes_since_last_index = 0; } - fn ftruncate(&self, size: u64) -> io::Result<()> { - self.inner.ftruncate(size) + /// Either append to index or save offsets to append at future fsync + fn append_after_commit( + &mut self, + min_tx_offset: TxOffset, + byte_offset: u64, + commit_len: u64, + ) -> Result<(), IndexError> { + self.bytes_since_last_index += commit_len; + + if !self.require_segment_fsync && self.bytes_since_last_index >= self.min_write_interval.get() { + self.head.append(min_tx_offset, byte_offset)?; + self.head.async_flush()?; + self.reset(); + } else { + self.candidate_byte_offset = byte_offset; + self.candidate_min_tx_offset = min_tx_offset; + } + Ok(()) + } + + pub fn append_after_fsync(&mut self) -> Result<(), IndexError> { + if self.bytes_since_last_index >= self.min_write_interval.get() { + self.head + .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?; + + self.head.async_flush()?; + self.reset(); + } + Ok(()) } } +impl FileLike for OffsetIndexWriter { + /// Must be called via SegmentWriter::fsync + fn fsync(&mut self) -> io::Result<()> { + let _ = self.append_after_fsync().map_err(|e| { + info!("failed to append to offset index: {:?}", e); + }); + Ok(()) + } + + fn ftruncate(&mut self, _tx_offset: u64, tx_offset: u64) -> io::Result<()> { + self.reset(); + let _ = self.head.truncate(tx_offset.into()); + Ok(()) + } +} #[derive(Debug)] pub struct Reader { pub header: Header, @@ -511,6 +608,8 @@ mod tests { bytes_written: 0, max_records_in_commit, + + offset_index_head: None, }; for i in 0..max_records_in_commit.get() { @@ -539,6 +638,7 @@ mod tests { bytes_written: 0, max_records_in_commit: NonZeroU16::MAX, + offset_index_head: None, }; assert_eq!(0, writer.next_tx_offset()); diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index 1aff582a6a8..cb883510a49 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -161,12 +161,12 @@ struct ShortSegment { } impl FileLike for ShortSegment { - fn fsync(&self) -> std::io::Result<()> { + fn fsync(&mut self) -> std::io::Result<()> { self.inner.fsync() } - fn ftruncate(&self, size: u64) -> std::io::Result<()> { - self.inner.ftruncate(size) + fn ftruncate(&mut self, tx_offset: u64, size: u64) -> std::io::Result<()> { + self.inner.ftruncate(tx_offset, size) } }