respect format version during data read write (#221)
* Make VERSION significant

Ref #220

This change lays the groundwork for binding LogBatch `decoding` and `encoding` to `VERSION`.
In brief, it introduces the trait `Header` for `LogFileHeader` and adds the corresponding implementations in `LogFileWriter` and `LogFileReader`.
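
As an illustration of what binding a version to the on-disk header can look like, here is a minimal sketch. The magic bytes, the big-endian `u64` layout, the `FileHeader` name and the hand-written `from_u64` are all assumptions made for this example; the commit does not show the real magic constant or byte order.

```rust
/// Hypothetical magic bytes; the real `LOG_FILE_MAGIC_HEADER` is not shown here.
const MAGIC: &[u8] = b"LOGFILE!";

/// Format version, mirroring the `#[repr(u64)]` enum in the diff.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u64)]
pub enum Version {
    V1 = 1,
}

impl Version {
    /// Hand-written stand-in for the derived `FromPrimitive::from_u64`.
    pub fn from_u64(raw: u64) -> Option<Self> {
        match raw {
            1 => Some(Version::V1),
            _ => None,
        }
    }
}

/// Illustrative header type: magic bytes followed by the version.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FileHeader {
    pub version: Version,
}

impl FileHeader {
    /// Encoded length: magic bytes plus an 8-byte version.
    pub const fn len() -> usize {
        MAGIC.len() + std::mem::size_of::<u64>()
    }

    /// Appends the magic bytes and the version (big-endian, an assumption).
    pub fn encode(&self, buf: &mut Vec<u8>) {
        buf.extend_from_slice(MAGIC);
        buf.extend_from_slice(&(self.version as u64).to_be_bytes());
    }

    /// Rejects short buffers, wrong magic bytes and unknown versions.
    pub fn decode(buf: &[u8]) -> Result<Self, String> {
        if buf.len() < Self::len() {
            return Err("log file header too short".to_owned());
        }
        let (magic, rest) = buf.split_at(MAGIC.len());
        if magic != MAGIC {
            return Err("unrecognized magic header".to_owned());
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&rest[..8]);
        let version = Version::from_u64(u64::from_be_bytes(raw))
            .ok_or_else(|| "unrecognized format version".to_owned())?;
        Ok(FileHeader { version })
    }
}
```

Encoding a header and decoding the resulting bytes should give back the same `FileHeader`, while a buffer carrying an unknown version number is rejected instead of being silently read as `V1`.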

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Replace the return of `build_file_header` with `Result<>`

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Reconstruct the arrangement of `FileCollection` in `pipe.rs`

This commit restructures the handles in `FileCollection`, replacing them with `FileHandler`, which is composed of `F::Handle` and `Version`. This binds a `Version` to each file and avoids unnecessary `LogFileHeader` loading in several `READ` cases.
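
The saving described here can be sketched as follows. `MockHandle`, its read counter and the simplified `Reader` are hypothetical stand-ins for `F::Handle` and `LogFileReader`; they exist only to show that a cached `Version` lets the reader skip the header read.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Version {
    V1 = 1,
}

/// Hypothetical stand-in for `F::Handle`: remembers the version stored in
/// the file and counts how often the header is read.
pub struct MockHandle {
    stored_version: Version,
    header_reads: AtomicUsize,
}

impl MockHandle {
    pub fn new(stored_version: Version) -> Self {
        MockHandle {
            stored_version,
            header_reads: AtomicUsize::new(0),
        }
    }

    /// Simulates loading and parsing the on-disk header.
    fn read_header(&self) -> Version {
        self.header_reads.fetch_add(1, Ordering::Relaxed);
        self.stored_version
    }

    pub fn header_reads(&self) -> usize {
        self.header_reads.load(Ordering::Relaxed)
    }
}

/// A file handle paired with its format version, as in the diff.
pub struct FileHandler {
    pub handle: Arc<MockHandle>,
    pub version: Version,
}

/// Simplified reader: parses the header only when no version is supplied.
pub struct Reader {
    handle: Arc<MockHandle>,
    version: Version,
}

impl Reader {
    pub fn open(handle: Arc<MockHandle>, version: Option<Version>) -> Self {
        let version = match version {
            // Version already known from the `FileHandler`: no header read.
            Some(v) => v,
            // Unknown version: fall back to reading the header.
            None => handle.read_header(),
        };
        Reader { handle, version }
    }

    pub fn version(&self) -> Version {
        self.version
    }

    pub fn handle(&self) -> &Arc<MockHandle> {
        &self.handle
    }
}
```

Opening a reader with `Some(handler.version)` leaves the read counter untouched; passing `None` costs one header read.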

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Tidy the supplementary annotations.

Following the guidelines of the standard `fmt --check` tool, this commit reformats the supplementary annotations.

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Fix the unexpected `double-ref` error.

This fixes the `double-ref` error that occurred when accessing the last element of the `[FileHandler]` container.

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Refine the implementation on `LogFileReader` according to comments.

This commit makes the following modifications:
>* Refine the implementation of `LogFileReader`
>* Add extra unit tests for `LogFileReader`

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Minor modifications for compatibility.

This commit includes several minor modifications and code clean-ups for compatibility.

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Redesign the implementation on `recover_queue` for better performance and compatibility.

To stay compatible with `VERSION` while keeping `recovery` fast, this commit modifies the implementation of `recover_queue`.
Compared with the benchmark on `master`, the benchmark on the current PR shows no performance regression.
The local environment used for the benchmark was configured as follows:
> * OS: macOS - Monterey 12.1
> * Mem: 16GB
> * CPU: Apple M1 Pro
> * DISK: NVMe SSD - APPLE SSD AP0512R

Running `cargo bench -- --bench benches` on the `master` branch and on the current PR gave the following results:
>* `master`: 108.99ms on the `default` workload, 122.41ms on the `compressed` workload, 994ms on the `mini-batch(1kb)` workload, 2.2368s on the `10GB` workload.
>* current PR: 111.63ms on the `default` workload, 116.35ms on the `compressed` workload, 921.98ms on the `mini-batch(1kb)` workload, 2.0905s on the `10GB` workload.

All results were generated by `bench` on the same dataset; the minor fluctuations in performance were caused by other processes.
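
To put the numbers above side by side, the relative change per workload can be computed with a small helper. This is only a sketch: the function and constant names are made up for this example, and the two second-scale timings are converted to milliseconds.

```rust
/// Relative change of `pr` against `master`, in percent.
/// A negative value means the PR run was faster.
pub fn relative_change_percent(master: f64, pr: f64) -> f64 {
    (pr - master) / master * 100.0
}

/// Timings quoted in the commit message as (workload, master, PR),
/// normalised to milliseconds.
pub const RESULTS_MS: [(&str, f64, f64); 4] = [
    ("default", 108.99, 111.63),
    ("compressed", 122.41, 116.35),
    ("mini-batch(1kb)", 994.0, 921.98),
    ("10GB", 2236.8, 2090.5),
];

/// One formatted line per workload, with an explicit sign.
pub fn report() -> Vec<String> {
    RESULTS_MS
        .iter()
        .map(|(name, master, pr)| {
            format!("{}: {:+.2}%", name, relative_change_percent(*master, *pr))
        })
        .collect()
}
```

With the figures quoted above, the `default` workload is about 2.4% slower on the PR and the other three are roughly 5% to 7% faster, which fits the reading that the differences are run-to-run noise and not a regression.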

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Remove unnecessary code and make the implementation related to `VERSION` more coherent.

This commit does the following:
>* Redesign the implementation of `recover_queue` to make it more coherent.
>* Clean up struct functions.

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Minor modifications for better cohesion

This commit includes:
>* Redesign the building of `LogFileHeader`, which is now a `method` of `LogFileReader`;
>* Modify the code style in `pipe_builder.rs`;
>* Replace the public `recover_queue`, which relied on calling `get_mut_file_list`, with a coherent implementation.

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Code-style clean-up and supplements on testing.

This commit does the following:
>* Clean up code for a more consistent code style;
>* Add the test cases needed to meet the `codecov/path` standard;

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Code clean-up.

This commit does the following:
>* Remove stale annotations;
>* Unify the code style of the relevant code in `pipe_builder.rs` and `log_file.rs`;

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Code clean-up in log_file.rs.

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

* Code clean-up in log_file.rs.

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

Co-authored-by: Lucasliang <yangkexin.liang@pingcap.com>
Co-authored-by: Xinye Tao <xy.tao@outlook.com>
3 people authored Jun 1, 2022
1 parent e15b9ec commit 812a8c0
Showing 11 changed files with 397 additions and 119 deletions.
21 changes: 21 additions & 0 deletions src/config.rs
@@ -3,6 +3,7 @@
use log::warn;
use serde::{Deserialize, Serialize};

use crate::file_pipe_log::Version;
use crate::{util::ReadableSize, Result};

const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
@@ -53,6 +54,12 @@ pub struct Config {
///
/// Default: "4MB"
pub bytes_per_sync: ReadableSize,

/// Version of the log file.
///
/// Default: 1
pub format_version: u64,

/// Target file size for rotating log files.
///
/// Default: "128MB"
@@ -88,6 +95,7 @@ impl Default for Config {
recovery_threads: 4,
batch_compression_threshold: ReadableSize::kb(8),
bytes_per_sync: ReadableSize::mb(4),
format_version: 1, // 1 => Version::V1
target_file_size: ReadableSize::mb(128),
purge_threshold: ReadableSize::gb(10),
purge_rewrite_threshold: None,
@@ -132,6 +140,14 @@ impl Config {
);
self.recovery_threads = MIN_RECOVERY_THREADS;
}
if !Version::is_valid(self.format_version) {
warn!(
"format-version ({}) is invalid, setting it to {}",
self.format_version,
Version::default() as u64
);
self.format_version = Version::default() as u64;
}
#[cfg(not(feature = "swap"))]
if self.memory_limit.is_some() {
warn!("memory-limit will be ignored because swap feature is not enabled");
@@ -160,13 +176,16 @@ mod tests {
bytes-per-sync = "2KB"
target-file-size = "1MB"
purge-threshold = "3MB"
format-version = 11
"#;
let load: Config = toml::from_str(custom).unwrap();
assert_eq!(load.dir, "custom_dir");
assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
assert_eq!(load.bytes_per_sync, ReadableSize::kb(2));
assert_eq!(load.target_file_size, ReadableSize::mb(1));
assert_eq!(load.purge_threshold, ReadableSize::mb(3));
assert_eq!(load.format_version, 11_u64);
assert!(!Version::is_valid(load.format_version));
}

#[test]
@@ -183,6 +202,7 @@ mod tests {
recovery-threads = 0
bytes-per-sync = "0KB"
target-file-size = "5000MB"
format-version = 20
"#;
let soft_load: Config = toml::from_str(soft_error).unwrap();
let mut soft_sanitized = soft_load;
@@ -194,6 +214,7 @@ mod tests {
soft_sanitized.purge_rewrite_threshold.unwrap(),
soft_sanitized.target_file_size
);
assert_eq!(soft_sanitized.format_version, Version::default() as u64);
}

#[test]
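
The sanitize hunk above falls back to the default version when `format-version` is not recognised. A minimal sketch of that fallback follows. It assumes a hand-written `is_valid` in place of the derived `FromPrimitive` lookup, a `Config` reduced to one field, and a `sanitize` that returns the warning text instead of logging it. These are simplifications for illustration, not the crate's code.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u64)]
pub enum Version {
    V1 = 1,
}

impl Default for Version {
    fn default() -> Self {
        Version::V1
    }
}

impl Version {
    /// Stand-in for `Version::from_u64(raw).is_some()`.
    pub fn is_valid(raw: u64) -> bool {
        raw == Version::V1 as u64
    }
}

/// Reduced configuration holding only the field under discussion.
pub struct Config {
    pub format_version: u64,
}

impl Config {
    /// Resets an unknown `format_version` to the default and returns the
    /// warning message; returns `None` when nothing had to change.
    pub fn sanitize(&mut self) -> Option<String> {
        if Version::is_valid(self.format_version) {
            return None;
        }
        let fallback = Version::default() as u64;
        let message = format!(
            "format-version ({}) is invalid, setting it to {}",
            self.format_version, fallback
        );
        self.format_version = fallback;
        Some(message)
    }
}
```

This matches the behaviour the tests in the diff exercise: a configured value of 20 parses successfully but is replaced by the default during sanitization.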
30 changes: 26 additions & 4 deletions src/engine.rs
@@ -15,7 +15,9 @@ use crate::consistency::ConsistencyChecker;
use crate::env::{DefaultFileSystem, FileSystem};
use crate::event_listener::EventListener;
use crate::file_pipe_log::debug::LogItemReader;
use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder};
use crate::file_pipe_log::{
DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder, RecoveryConfig,
};
use crate::log_batch::{Command, LogBatch, MessageExt};
use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables};
use crate::metrics::*;
@@ -151,6 +153,7 @@ where
sync |= writer.sync;
let log_batch = writer.get_payload();
let res = if !log_batch.is_empty() {
// @TODO(lucasliang): bind `Version` to each `LogBatch`
self.pipe_log
.append(LogQueue::Append, log_batch.encoded_bytes())
} else {
@@ -387,16 +390,35 @@ where
recovery_mode: RecoveryMode::TolerateAnyCorruption,
..Default::default()
};

let recovery_mode = cfg.recovery_mode;
let read_block_size = cfg.recovery_read_block_size.0;
let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new());
builder.scan()?;
let factory = crate::filter::RhaiFilterMachineFactory::from_script(script);
let mut machine = None;
if queue.is_none() || queue.unwrap() == LogQueue::Append {
machine = Some(builder.recover_queue(LogQueue::Append, &factory, 1)?);
machine = Some(builder.recover_queue(
file_system.clone(),
RecoveryConfig {
queue: LogQueue::Append,
mode: recovery_mode,
concurrency: 1,
read_block_size,
},
&factory,
)?);
}
if queue.is_none() || queue.unwrap() == LogQueue::Rewrite {
let machine2 = builder.recover_queue(LogQueue::Rewrite, &factory, 1)?;
let machine2 = builder.recover_queue(
file_system.clone(),
RecoveryConfig {
queue: LogQueue::Rewrite,
mode: recovery_mode,
concurrency: 1,
read_block_size,
},
&factory,
)?;
if let Some(machine) = &mut machine {
machine.merge(machine2, LogQueue::Rewrite)?;
}
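
The `recover_queue` call sites above switch from positional arguments to a `RecoveryConfig` value. The sketch below shows that parameter-struct pattern in isolation. The field types, the two-variant `RecoveryMode` and the helper `per_queue_configs` are assumptions made for this example; the real definition lives in a file that is not shown here.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogQueue {
    Append,
    Rewrite,
}

/// Only the two modes that appear in the diff are modelled here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RecoveryMode {
    TolerateTailCorruption,
    TolerateAnyCorruption,
}

/// Named fields replace the former positional `(queue, factory, 1)` call.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecoveryConfig {
    pub queue: LogQueue,
    pub mode: RecoveryMode,
    pub concurrency: usize,
    pub read_block_size: u64,
}

/// Hypothetical helper: builds one config per queue, sharing everything
/// except the queue itself through struct update syntax.
pub fn per_queue_configs(mode: RecoveryMode, read_block_size: u64) -> [RecoveryConfig; 2] {
    let append = RecoveryConfig {
        queue: LogQueue::Append,
        mode,
        concurrency: 1,
        read_block_size,
    };
    let rewrite = RecoveryConfig {
        queue: LogQueue::Rewrite,
        ..append
    };
    [append, rewrite]
}
```

Naming the fields makes each call site self-describing (the bare `1` in the old call becomes `concurrency: 1`) and lets new recovery options be added without changing the function signature again.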
3 changes: 3 additions & 0 deletions src/env/obfuscated.rs
@@ -65,6 +65,9 @@ impl WriteExt for ObfuscatedWriter {
}
}

/// `[ObfuscatedFileSystem]` is a special implementation of `[FileSystem]`,
/// which is used for constructing and simulating an abnormal file system for
/// `[Read]` and `[Write]`.
pub struct ObfuscatedFileSystem(DefaultFileSystem);

impl Default for ObfuscatedFileSystem {
57 changes: 45 additions & 12 deletions src/file_pipe_log/format.rs
@@ -80,33 +80,46 @@ pub(super) fn lock_file_path<P: AsRef<Path>>(dir: P) -> PathBuf {
}

/// Version of log file format.
#[derive(Clone, Copy, FromPrimitive, ToPrimitive)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, FromPrimitive, ToPrimitive)]
#[repr(u64)]
enum Version {
pub enum Version {
V1 = 1,
}

/// In-memory representation of the log file header.
pub(super) struct LogFileHeader {
version: Version,
impl Version {
pub fn is_valid(version: u64) -> bool {
Version::from_u64(version).is_some()
}
}

impl Default for LogFileHeader {
impl Default for Version {
fn default() -> Self {
Self {
version: Version::V1,
}
Version::V1
}
}

impl LogFileHeader {
/// In-memory representation of `Format` in log files.
#[derive(Clone, Default)]
pub struct LogFileFormat {
version: Version,
}

impl LogFileFormat {
/// Length of header written on storage.
pub const fn len() -> usize {
LOG_FILE_MAGIC_HEADER.len() + std::mem::size_of::<Version>()
}

/// Decodes a slice of bytes into a `LogFileHeader`.
pub fn decode(buf: &mut &[u8]) -> Result<LogFileHeader> {
pub fn from_version(version: Version) -> Self {
Self { version }
}

pub fn version(&self) -> Version {
self.version
}

/// Decodes a slice of bytes into a `LogFileFormat`.
pub fn decode(buf: &mut &[u8]) -> Result<LogFileFormat> {
if buf.len() < Self::len() {
return Err(Error::Corruption("log file header too short".to_owned()));
}
@@ -169,4 +182,24 @@ mod tests {
assert!(FileId::parse_file_name(case).is_none());
}
}

#[test]
fn test_version() {
let version = Version::default();
assert_eq!(Version::V1.to_u64().unwrap(), version.to_u64().unwrap());
let version2 = Version::from_u64(1).unwrap();
assert_eq!(version, version2);
assert!(Version::is_valid(1));
assert!(!Version::is_valid(2));
}

#[test]
fn test_file_header() {
let header1 = LogFileFormat::default();
assert_eq!(header1.version().to_u64().unwrap(), 1);
let header2 = LogFileFormat::from_version(Version::default());
assert_eq!(header2.version().to_u64(), header1.version().to_u64());
let header3 = LogFileFormat::from_version(Version::default());
assert_eq!(header3.version(), header1.version());
}
}
95 changes: 81 additions & 14 deletions src/file_pipe_log/log_file.rs
@@ -9,38 +9,53 @@ use crate::metrics::*;
use crate::pipe_log::FileBlockHandle;
use crate::{Error, Result};

use super::format::LogFileHeader;
use super::format::{LogFileFormat, Version};
use crate::env::{FileSystem, Handle, WriteExt};

/// Maximum number of bytes to allocate ahead.
const FILE_ALLOCATE_SIZE: usize = 2 * 1024 * 1024;

/// Combination of `[Handle]` and `[Version]`, specifying a handler of a file.
#[derive(Debug)]
pub struct FileHandler<F: FileSystem> {
pub handle: Arc<F::Handle>,
pub version: Version,
}

/// Build a file writer.
///
/// * `[handle]`: standard handle of a log file.
/// * `[version]`: format version of the log file.
pub(super) fn build_file_writer<F: FileSystem>(
system: &F,
handle: Arc<F::Handle>,
version: Version,
) -> Result<LogFileWriter<F>> {
let writer = system.new_writer(handle.clone())?;
LogFileWriter::open(handle, writer)
LogFileWriter::open(handle, writer, version)
}

/// Append-only writer for log file.
pub struct LogFileWriter<F: FileSystem> {
/// header of file
pub header: LogFileFormat,
writer: F::Writer,
written: usize,
capacity: usize,
last_sync: usize,
}

impl<F: FileSystem> LogFileWriter<F> {
fn open(handle: Arc<F::Handle>, writer: F::Writer) -> Result<Self> {
fn open(handle: Arc<F::Handle>, writer: F::Writer, version: Version) -> Result<Self> {
let file_size = handle.file_size()?;
let mut f = Self {
header: LogFileFormat::from_version(version),
writer,
written: file_size,
capacity: file_size,
last_sync: file_size,
};
if file_size < LogFileHeader::len() {
if file_size < LogFileFormat::len() {
f.write_header()?;
} else {
f.writer.seek(SeekFrom::Start(file_size as u64))?;
@@ -52,8 +67,8 @@ impl<F: FileSystem> LogFileWriter<F> {
self.writer.seek(SeekFrom::Start(0))?;
self.last_sync = 0;
self.written = 0;
let mut buf = Vec::with_capacity(LogFileHeader::len());
LogFileHeader::default().encode(&mut buf)?;
let mut buf = Vec::with_capacity(LogFileFormat::len());
self.header.encode(&mut buf)?;
self.write(&buf, 0)
}

@@ -110,30 +125,54 @@ impl<F: FileSystem> LogFileWriter<F> {
}
}

/// Build a file reader.
///
/// Note that the reader does not need a `[LogFileFormat]` specified by
/// users.
///
/// * `[handle]`: standard handle of a log file.
/// * `[version]`: if `[None]`, reloads the log file header and parses
/// the relevant `Version` before building the `reader`.
pub(super) fn build_file_reader<F: FileSystem>(
system: &F,
handle: Arc<F::Handle>,
version: Option<Version>,
) -> Result<LogFileReader<F>> {
let reader = system.new_reader(handle.clone())?;
LogFileReader::open(handle, reader)
LogFileReader::open(handle, reader, version)
}

/// Random-access reader for log file.
pub struct LogFileReader<F: FileSystem> {
format: LogFileFormat,
handle: Arc<F::Handle>,
reader: F::Reader,

offset: u64,
}

impl<F: FileSystem> LogFileReader<F> {
fn open(handle: Arc<F::Handle>, reader: F::Reader) -> Result<Self> {
Ok(Self {
handle,
reader,
// Set to an invalid offset to force a reseek at first read.
offset: u64::MAX,
})
fn open(handle: Arc<F::Handle>, reader: F::Reader, version: Option<Version>) -> Result<Self> {
match version {
Some(ver) => Ok(Self {
format: LogFileFormat::from_version(ver),
handle,
reader,
// Set to an invalid offset to force a reseek at first read.
offset: u64::MAX,
}),
None => {
let mut reader = Self {
format: LogFileFormat::from_version(Version::default()),
handle,
reader,
// Set to an invalid offset to force a reseek at first read.
offset: u64::MAX,
};
reader.parse_format()?;
Ok(reader)
}
}
}

pub fn read(&mut self, handle: FileBlockHandle) -> Result<Vec<u8>> {
@@ -164,8 +203,36 @@ impl<F: FileSystem> LogFileReader<F> {
Ok((self.offset - offset) as usize)
}

/// Function for reading the header of the log file, and return a
/// `[LogFileFormat]`.
///
/// Attention please, this function would move the `reader.offset`
/// to `0`, that is, the beginning of the file, to parse the
/// related `[LogFileFormat]`.
pub fn parse_format(&mut self) -> Result<LogFileFormat> {
// Here, the caller expects that the given `handle` points to
// a log file with a valid format. Otherwise, this returns
// `Err`.
let file_size = self.handle.file_size()?;
// [1] If the length is less than the standard `LogFileFormat::len()`.
let header_len = LogFileFormat::len();
if file_size < header_len {
return Err(Error::Corruption("Invalid header of LogFile!".to_owned()));
}
// [2] Parse the header of the file.
let mut container = vec![0; header_len];
self.read_to(0, &mut container[..])?;
self.format = LogFileFormat::decode(&mut container.as_slice())?;
Ok(self.format.clone())
}

#[inline]
pub fn file_size(&self) -> Result<usize> {
Ok(self.handle.file_size()?)
}

#[inline]
pub fn file_format(&self) -> &LogFileFormat {
&self.format
}
}
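
The writer's `open` path in this file decides between writing a fresh header and appending to existing data. A minimal in-memory sketch of that decision follows, using `std::io::Cursor<Vec<u8>>` as a stand-in for `F::Writer`. The `HEADER` bytes and the `Writer` type are hypothetical, and the real writer also tracks `capacity` and `last_sync`, which are omitted here.

```rust
use std::io::{Cursor, Seek, SeekFrom, Write};

/// Hypothetical pre-encoded header: 8 magic bytes plus an 8-byte version.
pub const HEADER: &[u8] = b"HYPOMAGC\x00\x00\x00\x00\x00\x00\x00\x01";

/// Simplified append-only writer over an in-memory "file".
pub struct Writer {
    inner: Cursor<Vec<u8>>,
    written: usize,
}

impl Writer {
    /// Opens over existing bytes. A file too short to hold a header gets
    /// one written at offset 0; otherwise the cursor moves to the end.
    pub fn open(existing: Vec<u8>) -> std::io::Result<Self> {
        let file_size = existing.len();
        let mut w = Writer {
            inner: Cursor::new(existing),
            written: file_size,
        };
        if file_size < HEADER.len() {
            w.inner.seek(SeekFrom::Start(0))?;
            w.written = 0;
            w.append(HEADER)?;
        } else {
            w.inner.seek(SeekFrom::Start(file_size as u64))?;
        }
        Ok(w)
    }

    /// Writes `data` at the current position and advances `written`.
    pub fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(data)?;
        self.written += data.len();
        Ok(())
    }

    pub fn written(&self) -> usize {
        self.written
    }

    pub fn into_bytes(self) -> Vec<u8> {
        self.inner.into_inner()
    }
}
```

Opening an empty buffer yields exactly the header bytes, while opening a buffer that already starts with a full header leaves the existing content untouched and appends after it.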