Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record perf contexts to the write procedure #227

Merged
merged 6 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

* Unconditionally tolerate `fallocate` failures as a fix to its portability issue. Errors other than `EOPNOTSUPP` will still emit a warning.

### New Features

* Add `PerfContext`, which records a detailed time breakdown of the write process to thread-local storage.

### Public API Changes

* Add `is_empty` to `Engine` API.
Expand Down
77 changes: 61 additions & 16 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ use crate::consistency::ConsistencyChecker;
use crate::env::{DefaultFileSystem, FileSystem};
use crate::event_listener::EventListener;
use crate::file_pipe_log::debug::LogItemReader;
use crate::file_pipe_log::{
DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder, RecoveryConfig,
};
use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder};
use crate::log_batch::{Command, LogBatch, MessageExt};
use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables};
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::purge::{PurgeHook, PurgeManager};
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{Error, GlobalStats, Result};
use crate::{perf_context, Error, GlobalStats, Result};

const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -141,15 +139,16 @@ where
let start = Instant::now();
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
let block_handle = {
let mut writer = Writer::new(log_batch, sync, start);
let mut writer = Writer::new(log_batch, sync);
// Snapshot and clear the current perf context temporarily, so the write group
// leader will collect the perf context diff later.
let mut perf_context = take_perf_context();
let before_enter = Instant::now();
if let Some(mut group) = self.write_barrier.enter(&mut writer) {
let now = Instant::now();
let _t = StopWatch::new_with(&ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
for writer in group.iter_mut() {
ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM.observe(
now.saturating_duration_since(writer.start_time)
.as_secs_f64(),
);
writer.entered_time = Some(now);
sync |= writer.sync;
let log_batch = writer.get_payload();
let res = if !log_batch.is_empty() {
Expand All @@ -166,14 +165,28 @@ where
};
writer.set_output(res);
}
perf_context!(log_write_duration).observe_since(now);
if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) {
panic!(
"Cannot sync {:?} queue due to IO error: {}",
LogQueue::Append,
e
);
}
// Pass the perf context diff to all the writers.
let diff = get_perf_context();
for writer in group.iter_mut() {
writer.perf_context_diff = diff.clone();
}
}
let entered_time = writer.entered_time.unwrap();
ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM
.observe(entered_time.saturating_duration_since(start).as_secs_f64());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a debug assertion that `perf_context_diff.write_wait_duration == 0`.

perf_context.write_wait_duration +=
entered_time.saturating_duration_since(before_enter);
debug_assert_eq!(writer.perf_context_diff.write_wait_duration, Duration::ZERO);
perf_context += &writer.perf_context_diff;
set_perf_context(perf_context);
writer.finish()?
};

Expand All @@ -185,8 +198,9 @@ where
listener.post_apply_memtables(block_handle.id);
}
let end = Instant::now();
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM
.observe(end.saturating_duration_since(now).as_secs_f64());
let apply_duration = end.saturating_duration_since(now);
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
perf_context!(apply_duration).observe(apply_duration);
now = end;
}
ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
Expand All @@ -201,7 +215,7 @@ where
}

pub fn get_message<S: Message>(&self, region_id: u64, key: &[u8]) -> Result<Option<S>> {
let _t = StopWatch::new(&ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
if let Some(value) = memtable.read().get(key) {
return Ok(Some(parse_from_bytes(&value)?));
Expand All @@ -215,7 +229,7 @@ where
region_id: u64,
log_idx: u64,
) -> Result<Option<M::Entry>> {
let _t = StopWatch::new(&ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
if let Some(idx) = memtable.read().get_entry(log_idx) {
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(1.0);
Expand Down Expand Up @@ -243,7 +257,7 @@ where
max_size: Option<usize>,
vec: &mut Vec<M::Entry>,
) -> Result<usize> {
let _t = StopWatch::new(&ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity((end - begin) as usize);
memtable
Expand Down Expand Up @@ -382,7 +396,7 @@ where
script: String,
file_system: Arc<F>,
) -> Result<()> {
use crate::file_pipe_log::ReplayMachine;
use crate::file_pipe_log::{RecoveryConfig, ReplayMachine};

if !path.exists() {
return Err(Error::InvalidArgument(format!(
Expand Down Expand Up @@ -1771,4 +1785,35 @@ mod tests {
&start
);
}

/// Verifies that one append moves the populate, write and apply timers of the
/// perf context returned by `get_perf_context`.
#[test]
fn test_simple_write_perf_context() {
    // Open an engine on a scratch directory with otherwise default settings.
    let scratch = tempfile::Builder::new()
        .prefix("test_simple_write_perf_context")
        .tempdir()
        .unwrap();
    let region_id = 1;
    let payload_len = 5120;
    let engine = RaftLogEngine::open(Config {
        dir: scratch.path().to_str().unwrap().to_owned(),
        ..Default::default()
    })
    .unwrap();
    let payload = vec![b'x'; payload_len];

    // Snapshot the perf context on both sides of a single append.
    let before = get_perf_context();
    engine.append(region_id, 1, 5, Some(&payload));
    let after = get_perf_context();

    // Every tracked phase of the write must have changed its recorded duration.
    assert_ne!(
        before.log_populating_duration,
        after.log_populating_duration
    );
    assert_ne!(before.log_write_duration, after.log_write_duration);
    assert_ne!(before.apply_duration, after.apply_duration);
}
}
4 changes: 2 additions & 2 deletions src/file_pipe_log/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<F: FileSystem> LogFileWriter<F> {
pub fn write(&mut self, buf: &[u8], target_size_hint: usize) -> Result<()> {
let new_written = self.written + buf.len();
if self.capacity < new_written {
let _t = StopWatch::new(&LOG_ALLOCATE_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*LOG_ALLOCATE_DURATION_HISTOGRAM);
let alloc = std::cmp::max(
new_written - self.capacity,
std::cmp::min(
Expand All @@ -111,7 +111,7 @@ impl<F: FileSystem> LogFileWriter<F> {

pub fn sync(&mut self) -> Result<()> {
if self.last_sync < self.written {
let _t = StopWatch::new(&LOG_SYNC_DURATION_HISTOGRAM);
let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
self.writer.sync()?;
self.last_sync = self.written;
}
Expand Down
8 changes: 6 additions & 2 deletions src/file_pipe_log/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::env::FileSystem;
use crate::event_listener::EventListener;
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue, PipeLog};
use crate::{Error, Result};
use crate::{perf_context, Error, Result};

use super::format::{FileNameExt, Version};
use super::log_file::{build_file_reader, build_file_writer, FileHandler, LogFileWriter};
Expand Down Expand Up @@ -154,7 +154,10 @@ impl<F: FileSystem> SinglePipe<F> {
///
/// This operation is atomic in face of errors.
fn rotate_imp(&self, active_file: &mut MutexGuard<ActiveFile<F>>) -> Result<()> {
let _t = StopWatch::new(&LOG_ROTATE_DURATION_HISTOGRAM);
let _t = StopWatch::new((
&*LOG_ROTATE_DURATION_HISTOGRAM,
perf_context!(log_rotate_duration),
));
let seq = active_file.seq + 1;
debug_assert!(seq > 1);

Expand Down Expand Up @@ -258,6 +261,7 @@ impl<F: FileSystem> SinglePipe<F> {
panic!("error when rotate [{:?}:{}]: {}", self.queue, seq, e);
}
} else if writer.since_last_sync() >= self.bytes_per_sync || force {
let _t = StopWatch::new(perf_context!(log_sync_duration));
if let Err(e) = writer.sync() {
panic!("error when sync [{:?}:{}]: {}", self.queue, seq, e,);
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub use config::{Config, RecoveryMode};
pub use engine::Engine;
pub use errors::{Error, Result};
pub use log_batch::{Command, LogBatch, MessageExt};
pub use metrics::{get_perf_context, set_perf_context, take_perf_context, PerfContext};
pub use util::ReadableSize;

#[cfg(feature = "internals")]
Expand Down
4 changes: 3 additions & 1 deletion src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use protobuf::Message;
use crate::codec::{self, NumberEncoder};
use crate::file_pipe_log::Version;
use crate::memtable::EntryIndex;
use crate::metrics::StopWatch;
use crate::pipe_log::{FileBlockHandle, FileId};
use crate::util::{crc32, lz4};
use crate::{Error, Result};
use crate::{perf_context, Error, Result};

pub(crate) const LOG_BATCH_HEADER_LEN: usize = 16;
pub(crate) const LOG_BATCH_CHECKSUM_LEN: usize = 4;
Expand Down Expand Up @@ -697,6 +698,7 @@ impl LogBatch {
/// Internally, encodes and optionally compresses log entries. Sets the
/// compression type to each entry index.
pub(crate) fn finish_populate(&mut self, compression_threshold: usize) -> Result<usize> {
let _t = StopWatch::new(perf_context!(log_populating_duration));
debug_assert!(self.buf_state == BufState::Open);
if self.is_empty() {
self.buf_state = BufState::Sealed(self.buf.len(), 0);
Expand Down
Loading