diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 4a1c6be2bd..698f355819 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -169,7 +169,6 @@ impl Generic { pub fn commits_from(&self, offset: u64) -> Commits { let offsets = self.segment_offsets_from(offset); - let next_offset = offsets.first().cloned().unwrap_or(offset); let segments = Segments { offs: offsets.into_iter(), repo: self.repo.clone(), @@ -178,7 +177,7 @@ impl Generic { Commits { inner: None, segments, - last_commit: CommitInfo::Initial { next_offset }, + last_commit: CommitInfo::Initial { next_offset: offset }, last_error: None, } } @@ -468,6 +467,26 @@ impl CommitInfo { Self::LastSeen { tx_range, .. } => &tx_range.end, } } + + // If initial offset falls within a commit, adjust it to the commit boundary. + // + // Returns `true` if the initial offset is past `commit`. + // Returns `false` if `self` isn't `Self::Initial`, + // or the initial offset has been adjusted to the starting offset of `commit`. + // + // For iteration, `true` means to skip the commit, `false` to yield it. + fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool { + if let Self::Initial { next_offset } = self { + let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1; + if *next_offset > last_tx_offset { + return true; + } else { + *next_offset = commit.min_tx_offset; + } + } + + false + } } pub struct Commits { @@ -496,8 +515,11 @@ impl Commits { // interesting. let prev_error = self.last_error.take(); + // Skip entries before the initial commit. + if self.last_commit.adjust_initial_offset(&commit) { + self.next() // Same offset: ignore if duplicate (same crc), else report a "fork". 
- if self.last_commit.same_offset_as(&commit) { + } else if self.last_commit.same_offset_as(&commit) { if !self.last_commit.same_checksum_as(&commit) { warn!( "forked: commit={:?} last-error={:?} last-crc={:?}", @@ -579,7 +601,23 @@ impl Iterator for Commits { None => self.last_error.take().map(Err), Some(segment) => segment.map_or_else( |e| Some(Err(e.into())), - |segment| { + |mut segment| { + // Try to use the offset index to advance the segment to the initial commit + if let CommitInfo::Initial { next_offset } = self.last_commit { + let _ = self + .segments + .repo + .get_offset_index(segment.min_tx_offset) + .map_err(Into::into) + .and_then(|index_file| segment.seek_to_offset(&index_file, next_offset)) + .inspect_err(|e| { + warn!( + "commitlog offset index is not used: {}, at: segment {}", + e, segment.min_tx_offset + ); + }); + } + self.inner = Some(segment.commits()); self.next() }, @@ -671,9 +709,7 @@ mod tests { assert!(commit.min_tx_offset >= offset); } } - // Nb.: the head commit is always returned, - // because we don't know its offset upper bound - assert_eq!(1, log.commits_from(10).count()); + assert_eq!(0, log.commits_from(10).count()); } #[test] diff --git a/crates/commitlog/src/index/indexfile.rs b/crates/commitlog/src/index/indexfile.rs index 2eccfa1d56..2584173530 100644 --- a/crates/commitlog/src/index/indexfile.rs +++ b/crates/commitlog/src/index/indexfile.rs @@ -35,6 +35,54 @@ pub struct IndexFileMut + From> { } impl + From> IndexFileMut { + pub fn create_index_file(path: &Path, offset: u64, cap: u64) -> io::Result { + File::options() + .write(true) + .read(true) + .create_new(true) + .open(offset_index_file_path(path, offset)) + .and_then(|file| { + file.set_len(cap * ENTRY_SIZE as u64)?; + let mmap = unsafe { MmapMut::map_mut(&file) }?; + + Ok(IndexFileMut { + inner: mmap, + num_entries: 0, + _marker: PhantomData, + }) + }) + .or_else(|e| { + if e.kind() == io::ErrorKind::AlreadyExists { + debug!("Index file {} already exists", path.display()); + 
Self::open_index_file(path, offset, cap) + } else { + Err(e) + } + }) + } + + pub fn open_index_file(path: &Path, offset: u64, cap: u64) -> io::Result { + let file = File::options() + .read(true) + .write(true) + .open(offset_index_file_path(path, offset))?; + file.set_len(cap * ENTRY_SIZE as u64)?; + let mmap = unsafe { MmapMut::map_mut(&file)? }; + + let mut me = IndexFileMut { + inner: mmap, + num_entries: 0, + _marker: PhantomData, + }; + me.num_entries = me.num_entries().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + Ok(me) + } + + pub fn delete_index_file(path: &Path, offset: u64) -> io::Result<()> { + fs::remove_file(offset_index_file_path(path, offset)).map_err(Into::into) + } + // Searches for first 0-key, to count number of entries fn num_entries(&self) -> Result { for index in 0.. { @@ -189,60 +237,38 @@ impl + From> IndexFileMut { } } -pub fn create_index_file + From>( - path: &Path, - offset: u64, - cap: u64, -) -> io::Result> { - File::options() - .write(true) - .read(true) - .create_new(true) - .open(offset_index_file_path(path, offset)) - .and_then(|file| { - file.set_len(cap * ENTRY_SIZE as u64)?; - let mmap = unsafe { MmapMut::map_mut(&file) }?; - - Ok(IndexFileMut { - inner: mmap, - num_entries: 0, - _marker: PhantomData, - }) - }) - .or_else(|e| { - if e.kind() == io::ErrorKind::AlreadyExists { - debug!("Index file {} already exists", path.display()); - open_index_file(path, offset, cap) - } else { - Err(e) - } - }) +/// A wrapper over [`IndexFileMut`] to provide read-only access to the index file. +pub struct IndexFile + From> { + inner: IndexFileMut, + _marker: PhantomData, } -pub fn open_index_file + From>( - path: &Path, - offset: u64, - cap: u64, -) -> io::Result> { - let file = File::options() - .read(true) - .write(true) - .open(offset_index_file_path(path, offset))?; - file.set_len(cap * ENTRY_SIZE as u64)?; - let mmap = unsafe { MmapMut::map_mut(&file)? 
}; - - let mut me = IndexFileMut { - inner: mmap, - num_entries: 0, - _marker: PhantomData, - }; - - me.num_entries = me.num_entries().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - Ok(me) -} +impl + From> IndexFile { + pub fn open_index_file(path: &Path, offset: u64) -> io::Result { + let file = File::options() + .read(true) + .append(true) + .open(offset_index_file_path(path, offset))?; + let mmap = unsafe { MmapMut::map_mut(&file)? }; + + let mut inner = IndexFileMut { + inner: mmap, + num_entries: 0, + _marker: PhantomData, + }; + inner.num_entries = inner + .num_entries() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + Ok(Self { + inner, + _marker: PhantomData, + }) + } -pub fn delete_index_file(path: &Path, offset: u64) -> io::Result<()> { - fs::remove_file(offset_index_file_path(path, offset)).map_err(Into::into) + pub fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError> { + self.inner.key_lookup(key) + } } #[cfg(test)] @@ -257,7 +283,7 @@ mod tests { let path = temp_dir.path().to_path_buf(); // Create an index file - let mut index_file: IndexFileMut = create_index_file(&path, 0, cap)?; + let mut index_file: IndexFileMut = IndexFileMut::create_index_file(&path, 0, cap)?; // Enter even number keys from 2 for i in 1..fill_till { @@ -341,7 +367,7 @@ mod tests { let path = temp_dir.path().to_path_buf(); // Create an index file - let mut index_file: IndexFileMut = create_index_file(&path, 0, 100)?; + let mut index_file: IndexFileMut = IndexFileMut::create_index_file(&path, 0, 100)?; for i in 1..10 { index_file.append(i * 2, i * 2 * 100)?; @@ -350,7 +376,7 @@ mod tests { assert_eq!(index_file.num_entries, 9); drop(index_file); - let open_index_file: IndexFileMut = open_index_file(&path, 0, 100)?; + let open_index_file: IndexFileMut = IndexFileMut::open_index_file(&path, 0, 100)?; assert_eq!(open_index_file.num_entries, 9); assert_eq!(open_index_file.key_lookup(6)?, (6, 600)); diff --git a/crates/commitlog/src/index/mod.rs 
b/crates/commitlog/src/index/mod.rs index e9163c3872..5469b8111d 100644 --- a/crates/commitlog/src/index/mod.rs +++ b/crates/commitlog/src/index/mod.rs @@ -3,10 +3,8 @@ use std::io; use thiserror::Error; mod indexfile; -pub use indexfile::create_index_file; -pub use indexfile::delete_index_file; pub use indexfile::offset_index_file_path; -pub use indexfile::IndexFileMut; +pub use indexfile::{IndexFile, IndexFileMut}; #[derive(Error, Debug)] pub enum IndexError { diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 9bd9ca1874..9430e4c624 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -6,9 +6,9 @@ use std::{ use log::{debug, warn}; -use crate::index::{create_index_file, delete_index_file, offset_index_file_path}; +use crate::index::offset_index_file_path; -use super::{Repo, TxOffset, TxOffsetIndex}; +use super::{Repo, TxOffset, TxOffsetIndex, TxOffsetIndexMut}; const SEGMENT_FILE_EXT: &str = ".stdb.log"; @@ -128,11 +128,15 @@ impl Repo for Fs { Ok(segments) } - fn get_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result { - create_index_file(&self.root, offset, cap) + fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result { + TxOffsetIndexMut::create_index_file(&self.root, offset, cap) } fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> { - delete_index_file(&self.root, offset) + TxOffsetIndexMut::delete_index_file(&self.root, offset) + } + + fn get_offset_index(&self, offset: TxOffset) -> io::Result { + TxOffsetIndex::open_index_file(&self.root, offset) } } diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index b8d0ae5990..e91eb3cb8f 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -5,7 +5,7 @@ use log::{debug, warn}; use crate::{ commit::Commit, error, - index::IndexFileMut, + index::{IndexFile, IndexFileMut}, segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, 
Options, }; @@ -19,7 +19,8 @@ pub use fs::Fs; pub use mem::Memory; pub type TxOffset = u64; -pub type TxOffsetIndex = IndexFileMut; +pub type TxOffsetIndexMut = IndexFileMut; +pub type TxOffsetIndex = IndexFile; /// A repository of log segments. /// @@ -27,7 +28,7 @@ pub type TxOffsetIndex = IndexFileMut; /// representation. pub trait Repo: Clone { /// The type of log segments managed by this repo, which must behave like a file. - type Segment: io::Read + io::Write + FileLike; + type Segment: io::Read + io::Write + FileLike + io::Seek; /// Create a new segment with the minimum transaction offset `offset`. /// @@ -58,24 +59,29 @@ pub trait Repo: Clone { /// offsets, sorted in ascending order. fn existing_offsets(&self) -> io::Result>; - /// Create or get an existing `TxOffsetIndex` for the given `offset`. + /// Create a [`TxOffsetIndexMut`] for the given `offset`, or open it if it already exists. /// The `cap` parameter is the maximum number of entries in the index. - fn get_offset_index(&self, _offset: TxOffset, _cap: u64) -> io::Result { + fn create_offset_index(&self, _offset: TxOffset, _cap: u64) -> io::Result { Err(io::Error::new(io::ErrorKind::Other, "not implemented")) } - /// Remove `TxOffsetIndex` named with `offset`. + /// Remove [`TxOffsetIndexMut`] named with `offset`. fn remove_offset_index(&self, _offset: TxOffset) -> io::Result<()> { Err(io::Error::new(io::ErrorKind::Other, "not implemented")) } + + /// Get [`TxOffsetIndex`] for the given `offset`. 
+ fn get_offset_index(&self, _offset: TxOffset) -> io::Result { + Err(io::Error::new(io::ErrorKind::Other, "not implemented")) + } } fn offset_index_len(opts: Options) -> u64 { opts.max_segment_size / opts.offset_index_interval_bytes } -fn get_offset_index_writer(repo: &R, offset: u64, opts: Options) -> Option { - repo.get_offset_index(offset, offset_index_len(opts)) +fn create_offset_index_writer(repo: &R, offset: u64, opts: Options) -> Option { + repo.create_offset_index(offset, offset_index_len(opts)) .map(|index| OffsetIndexWriter::new(index, opts)) .map_err(|e| { warn!("failed to get offset index for segment {offset}: {e}"); @@ -111,7 +117,7 @@ pub fn create_segment_writer(repo: &R, opts: Options, offset: u64) -> i max_records_in_commit: opts.max_records_in_commit, - offset_index_head: get_offset_index_writer(repo, offset, opts), + offset_index_head: create_offset_index_writer(repo, offset, opts), }) } @@ -166,7 +172,7 @@ pub fn resume_segment_writer( max_records_in_commit: opts.max_records_in_commit, - offset_index_head: get_offset_index_writer(repo, offset, opts), + offset_index_head: create_offset_index_writer(repo, offset, opts), })) } diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index c143983d86..5c6202d98c 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -1,6 +1,6 @@ use std::{ fs::File, - io::{self, BufWriter, Write as _}, + io::{self, BufWriter, ErrorKind, SeekFrom, Write as _}, num::{NonZeroU16, NonZeroU64}, ops::Range, }; @@ -12,7 +12,7 @@ use crate::{ error, index::IndexError, payload::Encode, - repo::{TxOffset, TxOffsetIndex}, + repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut}, Options, }; @@ -223,7 +223,7 @@ impl FileLike for Writer { #[derive(Debug)] pub struct OffsetIndexWriter { - pub(crate) head: TxOffsetIndex, + pub(crate) head: TxOffsetIndexMut, require_segment_fsync: bool, min_write_interval: NonZeroU64, @@ -234,7 +234,7 @@ pub struct OffsetIndexWriter { } impl 
OffsetIndexWriter { - pub fn new(head: TxOffsetIndex, opts: Options) -> Self { + pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self { OffsetIndexWriter { head, require_segment_fsync: opts.offset_index_require_segment_fsync, @@ -306,6 +306,7 @@ impl FileLike for OffsetIndexWriter { Ok(()) } } + #[derive(Debug)] pub struct Reader { pub header: Header, @@ -313,7 +314,7 @@ pub struct Reader { inner: R, } -impl Reader { +impl Reader { pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result { let header = Header::decode(&mut inner)?; header @@ -328,7 +329,7 @@ impl Reader { } } -impl Reader { +impl Reader { pub fn commits(self) -> Commits { Commits { header: self.header, @@ -336,6 +337,10 @@ impl Reader { } } + pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<(), IndexError> { + seek_to_offset(&mut self.inner, index_file, start_tx_offset) + } + #[cfg(test)] pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator, D::Error>> + 'a where @@ -359,6 +364,55 @@ impl Reader { } } +/// Advances the `segment` reader to the position corresponding to the `start_tx_offset` +/// using the `index_file` for efficient seeking. +/// +/// Input: +/// - `segment` - segment reader +/// - `index_file` - offset index of the segment, used to look up the byte offset to seek to +/// - `start_tx_offset` - transaction offset to advance to +pub fn seek_to_offset( + mut segment: &mut R, + index_file: &TxOffsetIndex, + start_tx_offset: u64, +) -> Result<(), IndexError> { + let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?; + debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}"); + // returned `index_key` should never be greater than `start_tx_offset` + debug_assert!(index_key <= start_tx_offset); + + // Check if the offset index is pointing to the right commit. 
+ validate_commit_header(&mut segment, byte_offset).map(|hdr| { + if hdr.min_tx_offset == index_key { + // Advance the segment Seek if expected commit is found. + segment + .seek(SeekFrom::Start(byte_offset)) + .map(|_| ()) + .map_err(Into::into) + } else { + Err(io::Error::new(io::ErrorKind::InvalidData, "mismatch key in index offset file").into()) + } + })? +} + +/// Try to read the commit header at the requested position without advancing the seek position. +/// `IndexFileMut` is fsynced asynchronously, which makes it important for the reader to verify its entries. +pub fn validate_commit_header( + mut reader: &mut Reader, + byte_offset: u64, +) -> io::Result { + let pos = reader.stream_position()?; + reader.seek(SeekFrom::Start(byte_offset))?; + + let hdr = commit::Header::decode(&mut reader) + .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF"))); + + // Restore the original position + reader.seek(SeekFrom::Start(pos))?; + + hdr +} + /// Pair of transaction offset and payload. /// /// Created by iterators which "flatten" commits into individual transaction