feat: replace condvar with watch channel
ShiKaiWi committed Apr 13, 2023 · 1 parent 1f03ec9 · commit dc70cab
Showing 4 changed files with 87 additions and 68 deletions.
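This commit swaps the `std::sync::Condvar` that previously guarded the flush scheduler for a `tokio::sync::watch` channel, so a writer waiting for an in-flight flush awaits an async notification instead of blocking an executor thread. The sketch below illustrates that pattern in isolation; the `FlushState` and `ScheduleSync` names mirror the diff, but the surrounding scaffolding is a simplified assumption, not the engine's exact code.

```rust
use std::sync::{Arc, Mutex};
use tokio::sync::watch;

#[derive(Default)]
enum FlushState {
    #[default]
    Ready,
    Flushing,
}

// Shared state plus a watch sender used purely as an async notification primitive.
struct ScheduleSync {
    state: Mutex<FlushState>,
    notifier: watch::Sender<()>,
}

async fn wait_until_ready(sync: Arc<ScheduleSync>, watcher: &mut watch::Receiver<()>) {
    loop {
        {
            // Check the state under the lock; the guard is dropped before awaiting.
            let mut state = sync.state.lock().unwrap();
            if matches!(&*state, FlushState::Ready) {
                // Mark the slot as taken so concurrent callers keep waiting.
                *state = FlushState::Flushing;
                return;
            }
        }
        // Await the next notification instead of blocking on a Condvar.
        if watcher.changed().await.is_err() {
            // The sender side was dropped; no further notifications will come.
            return;
        }
    }
}

fn mark_ready(sync: &ScheduleSync) {
    *sync.state.lock().unwrap() = FlushState::Ready;
    // Wake every watcher; an error here only means no receivers are left.
    let _ = sync.notifier.send(());
}
```

The key property is that the mutex guard is never held across an `.await`, which is exactly what a `Condvar` cannot offer in async code without parking a runtime thread.
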
15 changes: 6 additions & 9 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -26,7 +26,7 @@ use futures::{
};
use log::{debug, error, info};
use snafu::{Backtrace, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::Result as TableResult};
use table_engine::predicate::Predicate;
use tokio::sync::oneshot;
use wal::manager::WalLocation;

@@ -137,7 +137,7 @@ pub struct TableFlushOptions {
/// Flush result sender.
///
/// Default is None.
pub res_sender: Option<oneshot::Sender<TableResult<()>>>,
pub res_sender: Option<oneshot::Sender<Result<()>>>,
/// Schedule a compaction request after flush if it is not [None].
///
/// If it is [None], no compaction will be scheduled.
@@ -274,7 +274,6 @@ impl Flusher {
block_on: bool,
) -> Result<()> {
let table_data = flush_req.table_data.clone();
let table = table_data.name.clone();

let flush_task = FlushTask {
table_data: table_data.clone(),
@@ -299,25 +298,23 @@

flush_scheduler
.flush_sequentially(
table,
&table_data.metrics,
flush_job,
on_flush_success,
block_on,
&self.runtime,
opts.res_sender,
&self.runtime,
&table_data.metrics,
)
.await
} else {
flush_scheduler
.flush_sequentially(
table,
&table_data.metrics,
flush_job,
async {},
block_on,
&self.runtime,
opts.res_sender,
&self.runtime,
&table_data.metrics,
)
.await
}
136 changes: 80 additions & 56 deletions analytic_engine/src/instance/serializer.rs
@@ -1,38 +1,62 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{
sync::{Arc, Condvar, Mutex},
sync::{Arc, Mutex},
time::Instant,
};

use common_util::{runtime::Runtime, time::InstantExt};
use futures::Future;
use log::error;
use table_engine::table::{Error as TableError, Result as TableResult, TableId};
use tokio::sync::oneshot;
use table_engine::table::TableId;
use tokio::sync::{
oneshot,
watch::{self, Receiver, Sender},
};

use crate::{
instance::flush_compaction::{self, BackgroundFlushFailed},
instance::flush_compaction::{BackgroundFlushFailed, Other, Result},
table::metrics::Metrics,
};

#[derive(Default)]
enum FlushState {
#[default]
Ok,
Ready,
Flushing,
Failed {
err_msg: String,
},
}

type ScheduleLock = Arc<(Mutex<FlushState>, Condvar)>;
type ScheduleSyncRef = Arc<ScheduleSync>;

struct ScheduleSync {
state: Mutex<FlushState>,
notifier: Sender<()>,
}

#[derive(Default)]
pub struct TableFlushScheduler {
schedule_lock: ScheduleLock,
schedule_sync: ScheduleSyncRef,
state_watcher: Receiver<()>,
}

impl Default for TableFlushScheduler {
fn default() -> Self {
let (tx, rx) = watch::channel(());
let schedule_sync = ScheduleSync {
state: Mutex::new(FlushState::Ready),
notifier: tx,
};
Self {
schedule_sync: Arc::new(schedule_sync),
state_watcher: rx,
}
}
}

/// All operations on tables must hold the mutable reference of this serializer.
///
/// To ensure the consistency of a table's data, these rules are required:
/// - The write procedure (write wal + write memtable) should be serialized as a
/// whole, that is to say, it is not allowed to write wal and memtable
@@ -70,31 +94,33 @@ impl TableFlushScheduler {
/// sequential.
///
/// REQUIRE: should only be called by the write thread.
#[allow(clippy::too_many_arguments)]
pub async fn flush_sequentially<F, T>(
&mut self,
table: String,
metrics: &Metrics,
flush_job: F,
on_flush_success: T,
block_on_write_thread: bool,
res_sender: Option<oneshot::Sender<Result<()>>>,
runtime: &Runtime,
res_sender: Option<oneshot::Sender<TableResult<()>>>,
) -> flush_compaction::Result<()>
metrics: &Metrics,
) -> Result<()>
where
F: Future<Output = flush_compaction::Result<()>> + Send + 'static,
F: Future<Output = Result<()>> + Send + 'static,
T: Future<Output = ()> + Send + 'static,
{
// If flush operation is running, then we need to wait for it to complete first.
// Actually, the loop waiting ensures the multiple flush procedures to be
// sequential, that is to say, at most one flush is being executed at
// the same time.
{
let mut stall_begin: Option<Instant> = None;
let mut flush_state = self.schedule_lock.0.lock().unwrap();
loop {
let mut stall_begin: Option<Instant> = None;
loop {
{
// Check if the flush procedure is running and the lock will be dropped when
// leaving the block.
let mut flush_state = self.schedule_sync.state.lock().unwrap();
match &*flush_state {
FlushState::Ok => {
FlushState::Ready => {
// Mark the worker is flushing.
*flush_state = FlushState::Flushing;
break;
}
FlushState::Flushing => (),
@@ -106,40 +132,33 @@ impl TableFlushScheduler {
if stall_begin.is_none() {
stall_begin = Some(Instant::now());
}
flush_state = self.schedule_lock.1.wait(flush_state).unwrap();
}
if let Some(stall_begin) = stall_begin {
metrics.on_write_stall(stall_begin.saturating_elapsed());
}

// TODO(yingwen): Store pending flush requests and retry flush on recoverable
// error, or try to recover from background error.
if self.state_watcher.changed().await.is_err() {
return Other {
msg: "State notifier is dropped unexpectedly",
}
.fail();
}
}

// Mark the worker is flushing.
*flush_state = FlushState::Flushing;
// Record the write stall cost.
if let Some(stall_begin) = stall_begin {
metrics.on_write_stall(stall_begin.saturating_elapsed());
}

let schedule_lock = self.schedule_lock.clone();
// TODO(yingwen): Store pending flush requests and retry flush on
// recoverable error, or try to recover from background
// error.

let schedule_sync = self.schedule_sync.clone();
let task = async move {
let flush_res = flush_job.await;
on_flush_finished(schedule_lock, &flush_res);

match flush_res {
Ok(()) => {
on_flush_success.await;
send_flush_result(res_sender, Ok(()));
}
Err(e) => {
let e = Arc::new(e);
send_flush_result(
res_sender,
Err(TableError::Flush {
source: Box::new(e),
table,
}),
);
}
on_flush_finished(schedule_sync, &flush_res);
if flush_res.is_ok() {
on_flush_success.await;
}
send_flush_result(res_sender, flush_res);
};

if block_on_write_thread {
@@ -152,21 +171,26 @@
}
}

fn on_flush_finished(schedule_lock: ScheduleLock, res: &flush_compaction::Result<()>) {
let mut flush_state = schedule_lock.0.lock().unwrap();
match res {
Ok(()) => {
*flush_state = FlushState::Ok;
}
Err(e) => {
let err_msg = e.to_string();
*flush_state = FlushState::Failed { err_msg };
fn on_flush_finished(schedule_sync: ScheduleSyncRef, res: &Result<()>) {
{
let mut flush_state = schedule_sync.state.lock().unwrap();
match res {
Ok(()) => {
*flush_state = FlushState::Ready;
}
Err(e) => {
let err_msg = e.to_string();
*flush_state = FlushState::Failed { err_msg };
}
}
}
schedule_lock.1.notify_all();

if schedule_sync.notifier.send(()).is_err() {
error!("Fail to notify flush state change, flush_res:{res:?}");
}
}

fn send_flush_result(res_sender: Option<oneshot::Sender<TableResult<()>>>, res: TableResult<()>) {
fn send_flush_result(res_sender: Option<oneshot::Sender<Result<()>>>, res: Result<()>) {
if let Some(tx) = res_sender {
if let Err(send_res) = tx.send(res) {
error!("Fail to send flush result, send_res:{:?}", send_res);
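The flush outcome is still delivered through an optional `oneshot` sender, but it now carries the engine's flush `Result<()>` directly rather than a `TableResult` wrapped in `TableError::Flush`. Below is a hedged caller-side sketch of that handshake; `FlushError` and `trigger_flush_and_wait` are illustrative stand-ins, and only the `res_sender` idea comes from the diff.

```rust
use tokio::sync::oneshot;

// Stand-in for the engine's flush error type; purely illustrative.
#[derive(Debug)]
struct FlushError(String);
type FlushResult = Result<(), FlushError>;

// Caller-side sketch: create a oneshot channel, hand the sender to the flush
// request (the role played by `TableFlushOptions::res_sender`), then await the
// outcome produced by the background flush task.
async fn trigger_flush_and_wait(
    submit: impl FnOnce(oneshot::Sender<FlushResult>),
) -> FlushResult {
    let (tx, rx) = oneshot::channel();
    submit(tx);
    match rx.await {
        Ok(res) => res,
        // The sender was dropped without sending, e.g. the flush task aborted.
        Err(_) => Err(FlushError("flush result sender dropped".to_string())),
    }
}
```
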
2 changes: 0 additions & 2 deletions analytic_engine/src/instance/write.rs
@@ -350,8 +350,6 @@ impl<'a> MemTableWriter<'a> {
}

impl<'a> Writer<'a> {
/// Do the actual write, must called by write worker in write thread
/// sequentially.
pub(crate) async fn write(&mut self, request: WriteRequest) -> Result<usize> {
let _timer = self.table_data.metrics.start_table_write_timer();
self.table_data.metrics.on_write_request_begin();
2 changes: 1 addition & 1 deletion server/src/grpc/mod.rs
@@ -260,7 +260,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let builder = meta_event_service::Builder {
cluster: v,
instance: instance.clone(),
runtime: runtimes.meta_runtime.clone(),
runtime: runtimes.default_runtime.clone(),
opened_wals,
};
MetaEventServiceServer::new(builder.build())
