Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
sticnarf committed Jun 23, 2022
1 parent 405eb43 commit 10bfa06
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
19 changes: 10 additions & 9 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ use crate::consistency::ConsistencyChecker;
use crate::env::{DefaultFileSystem, FileSystem};
use crate::event_listener::EventListener;
use crate::file_pipe_log::debug::LogItemReader;
use crate::file_pipe_log::{
DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder, RecoveryConfig,
};
use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder};
use crate::log_batch::{Command, LogBatch, MessageExt};
use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables};
use crate::metrics::*;
Expand Down Expand Up @@ -141,18 +139,16 @@ where
let start = Instant::now();
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
let block_handle = {
let mut writer = Writer::new(log_batch, sync, start);
let mut writer = Writer::new(log_batch, sync);
// Snapshot and clear the current perf context temporarily, so the write group
// leader will collect the perf context diff later.
let mut perf_context = take_perf_context();
let before_enter = Instant::now();
if let Some(mut group) = self.write_barrier.enter(&mut writer) {
let now = Instant::now();
let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
for writer in group.iter_mut() {
ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM.observe(
now.saturating_duration_since(writer.start_time)
.as_secs_f64(),
);
writer.entered_time = Some(now);
sync |= writer.sync;
let log_batch = writer.get_payload();
let res = if !log_batch.is_empty() {
Expand Down Expand Up @@ -183,6 +179,11 @@ where
writer.perf_context_diff = diff.clone();
}
}
let entered_time = writer.entered_time.unwrap();
ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM
.observe(entered_time.saturating_duration_since(start).as_secs_f64());
perf_context.write_leader_wait_duration =
entered_time.saturating_duration_since(before_enter);
perf_context += &writer.perf_context_diff;
set_perf_context(perf_context);
writer.finish()?
Expand Down Expand Up @@ -394,7 +395,7 @@ where
script: String,
file_system: Arc<F>,
) -> Result<()> {
use crate::file_pipe_log::ReplayMachine;
use crate::file_pipe_log::{RecoveryConfig, ReplayMachine};

if !path.exists() {
return Err(Error::InvalidArgument(format!(
Expand Down
16 changes: 7 additions & 9 deletions src/write_barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Instant;
use fail::fail_point;
use parking_lot::{Condvar, Mutex};

use crate::{metrics::TimeMetric, perf_context, PerfContext};
use crate::PerfContext;

type Ptr<T> = Option<NonNull<T>>;

Expand All @@ -24,7 +24,7 @@ pub struct Writer<P, O> {
output: Option<O>,

pub(crate) sync: bool,
pub(crate) start_time: Instant,
pub(crate) entered_time: Option<Instant>,
pub(crate) perf_context_diff: PerfContext,
}

Expand All @@ -35,13 +35,13 @@ impl<P, O> Writer<P, O> {
///
/// Data pointed by `payload` is mutably referenced by this writer. Do not
/// access the payload by its original name during this writer's lifetime.
pub fn new(payload: &mut P, sync: bool, start_time: Instant) -> Self {
pub fn new(payload: &mut P, sync: bool) -> Self {
Writer {
next: Cell::new(None),
payload: payload as *mut _,
output: None,
sync,
start_time,
entered_time: None,
perf_context_diff: PerfContext::default(),
}
}
Expand Down Expand Up @@ -169,7 +169,6 @@ impl<P, O> WriteBarrier<P, O> {
/// the leader of a set of writers, returns a [`WriteGroup`] that contains
/// them, `writer` included.
pub fn enter<'a>(&self, writer: &'a mut Writer<P, O>) -> Option<WriteGroup<'_, 'a, P, O>> {
let start = Instant::now();
let node = unsafe { Some(NonNull::new_unchecked(writer)) };
let mut inner = self.inner.lock();
if let Some(tail) = inner.tail.get() {
Expand All @@ -191,7 +190,6 @@ impl<P, O> WriteBarrier<P, O> {
//
self.leader_cv.wait(&mut inner);
inner.pending_leader.set(None);
perf_context!(write_leader_wait_duration).observe_since(start);
}
} else {
// leader of a empty write group. proceed directly.
Expand Down Expand Up @@ -244,7 +242,7 @@ mod tests {
let mut processed_writers = 0;

for _ in 0..4 {
let mut writer = Writer::new(&mut payload, false, Instant::now());
let mut writer = Writer::new(&mut payload, false);
{
let mut wg = barrier.enter(&mut writer).unwrap();
leaders += 1;
Expand Down Expand Up @@ -296,7 +294,7 @@ mod tests {
self.ths.push(
ThreadBuilder::new()
.spawn(move || {
let mut writer = Writer::new(&mut seq, false, Instant::now());
let mut writer = Writer::new(&mut seq, false);
{
let mut wg = barrier.enter(&mut writer).unwrap();
leader_enter_tx.send(()).unwrap();
Expand Down Expand Up @@ -330,7 +328,7 @@ mod tests {
self.ths.push(
ThreadBuilder::new()
.spawn(move || {
let mut writer = Writer::new(&mut seq, false, Instant::now());
let mut writer = Writer::new(&mut seq, false);
start_thread.wait();
if let Some(mut wg) = barrier.enter(&mut writer) {
leader_enter_tx_clone.send(()).unwrap();
Expand Down

0 comments on commit 10bfa06

Please sign in to comment.