Skip to content

Commit

Permalink
Optimization: Use notify to synchronize threads
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Jun 8, 2024
1 parent 0db8206 commit 1707cf8
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 23 deletions.
1 change: 0 additions & 1 deletion lolraft/src/generated/lolraft.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// This file is @generated by prost-build.
/// Update request to the `RaftApp`.
/// This type of request is serialized in the log and processed sequentially.
/// `request_id` is unique identifier of the request to avoid executing duplicating requests.
Expand Down
Binary file modified lolraft/src/generated/lolraft_descriptor.bin
Binary file not shown.
20 changes: 18 additions & 2 deletions lolraft/src/process/peer_svc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,26 @@ pub struct Inner {

command_log: Ref<CommandLog>,
driver: RaftDriver,

queue_rx: thread::EventConsumer<thread::QueueEvent>,
replication_tx: thread::EventProducer<thread::ReplicationEvent>,
}

#[derive(shrinkwraprs::Shrinkwrap, Clone)]
pub struct PeerSvc(pub Arc<Inner>);
impl PeerSvc {
pub fn new(command_log: Ref<CommandLog>, driver: RaftDriver) -> Self {
pub fn new(
command_log: Ref<CommandLog>,
queue_rx: thread::EventConsumer<thread::QueueEvent>,
replication_tx: thread::EventProducer<thread::ReplicationEvent>,
driver: RaftDriver,
) -> Self {
let inner = Inner {
membership: HashSet::new().into(),
peer_contexts: HashMap::new().into(),
peer_threads: HashMap::new().into(),
queue_rx,
replication_tx,
command_log,
driver,
};
Expand Down Expand Up @@ -103,7 +113,13 @@ impl PeerSvc {
);

let thread_handles = ThreadHandles {
replicator_handle: thread::replication::new(id.clone(), self.clone(), voter.clone()),
replicator_handle: thread::replication::new(
id.clone(),
self.clone(),
voter.clone(),
self.queue_rx.clone(),
self.replication_tx.clone(),
),
heartbeater_handle: thread::heartbeat::new(id.clone(), voter),
};
self.peer_threads.lock().insert(id, thread_handles);
Expand Down
33 changes: 30 additions & 3 deletions lolraft/src/process/raft_process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub struct RaftProcess {
query_queue: QueryQueue,
driver: RaftDriver,
_thread_handles: ThreadHandles,

queue_tx: thread::EventProducer<thread::QueueEvent>,
replication_tx: thread::EventProducer<thread::ReplicationEvent>,
}

impl RaftProcess {
Expand All @@ -42,7 +45,17 @@ impl RaftProcess {
let command_log = CommandLog::new(log_store, app.clone());
command_log.restore_state().await?;

let peers = PeerSvc::new(Ref(command_log.clone()), driver.clone());
let (queue_tx, queue_rx) = thread::notify();
let (replication_tx, replication_rx) = thread::notify();
let (commit_tx, commit_rx) = thread::notify();
let (kern_tx, kern_rx) = thread::notify();

let peers = PeerSvc::new(
Ref(command_log.clone()),
queue_rx.clone(),
replication_tx.clone(),
driver.clone(),
);

let voter = Voter::new(
ballot_store,
Expand All @@ -54,13 +67,24 @@ impl RaftProcess {
peers.restore_state(Ref(voter.clone())).await?;

let _thread_handles = ThreadHandles {
advance_kern_handle: thread::advance_kern::new(command_log.clone(), voter.clone()),
advance_user_handle: thread::advance_user::new(command_log.clone(), app.clone()),
advance_kern_handle: thread::advance_kern::new(
command_log.clone(),
voter.clone(),
commit_rx.clone(),
kern_tx.clone(),
),
advance_user_handle: thread::advance_user::new(
command_log.clone(),
app.clone(),
kern_rx.clone(),
),
advance_snapshot_handle: thread::advance_snapshot::new(command_log.clone()),
advance_commit_handle: thread::advance_commit::new(
command_log.clone(),
Ref(peers.clone()),
Ref(voter.clone()),
replication_rx.clone(),
commit_tx.clone(),
),
election_handle: thread::election::new(voter.clone()),
log_compaction_handle: thread::log_compaction::new(command_log.clone()),
Expand All @@ -79,6 +103,9 @@ impl RaftProcess {
query_queue,
driver,
_thread_handles,

queue_tx,
replication_tx,
})
}
}
3 changes: 3 additions & 0 deletions lolraft/src/process/raft_process/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ impl RaftProcess {
self.process_configuration_command(&command, append_index)
.await?;

self.queue_tx.push_event(thread::QueueEvent);
self.replication_tx.push_event(thread::ReplicationEvent);

Ok(append_index)
}

Expand Down
16 changes: 13 additions & 3 deletions lolraft/src/process/thread/advance_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub struct Thread {
command_log: CommandLog,
peers: Ref<PeerSvc>,
voter: Ref<Voter>,
consumer: EventConsumer<ReplicationEvent>,
producer: EventProducer<CommitEvent>,
}
impl Thread {
async fn run_once(&self) -> Result<()> {
Expand All @@ -18,16 +20,16 @@ impl Thread {
self.command_log
.commit_pointer
.store(new_commit_index, Ordering::SeqCst);
self.producer.push_event(CommitEvent);
}

Ok(())
}

fn do_loop(self) -> ThreadHandle {
let fut = async move {
let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
interval.tick().await;
self.consumer.consume_events(Duration::from_secs(1)).await;
self.run_once().await.ok();
}
};
Expand All @@ -37,11 +39,19 @@ impl Thread {
}
}

pub fn new(command_log: CommandLog, peers: Ref<PeerSvc>, voter: Ref<Voter>) -> ThreadHandle {
pub fn new(
command_log: CommandLog,
peers: Ref<PeerSvc>,
voter: Ref<Voter>,
consume: EventConsumer<ReplicationEvent>,
produce: EventProducer<CommitEvent>,
) -> ThreadHandle {
Thread {
command_log,
peers,
voter,
consumer: consume,
producer: produce,
}
.do_loop()
}
21 changes: 17 additions & 4 deletions lolraft/src/process/thread/advance_kern.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use super::*;
pub struct Thread {
command_log: CommandLog,
voter: Voter,
consumer: EventConsumer<CommitEvent>,
producer: EventProducer<KernEvent>,
}
impl Thread {
async fn advance_once(&self) -> Result<()> {
Expand All @@ -14,10 +16,10 @@ impl Thread {

fn do_loop(self) -> ThreadHandle {
let fut = async move {
let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
interval.tick().await;
self.consumer.consume_events(Duration::from_secs(1)).await;
while self.advance_once().await.is_ok() {
self.producer.push_event(KernEvent);
tokio::task::yield_now().await;
}
}
Expand All @@ -28,6 +30,17 @@ impl Thread {
}
}

pub fn new(command_log: CommandLog, voter: Voter) -> ThreadHandle {
Thread { command_log, voter }.do_loop()
pub fn new(
command_log: CommandLog,
voter: Voter,
consumer: EventConsumer<CommitEvent>,
producer: EventProducer<KernEvent>,
) -> ThreadHandle {
Thread {
command_log,
voter,
consumer,
producer,
}
.do_loop()
}
2 changes: 1 addition & 1 deletion lolraft/src/process/thread/advance_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ impl Thread {

fn do_loop(self) -> ThreadHandle {
let fut = async move {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
self.run_once().await.ok();
Expand Down
13 changes: 9 additions & 4 deletions lolraft/src/process/thread/advance_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::*;
pub struct Thread {
command_log: CommandLog,
app: App,
consumer: EventConsumer<KernEvent>,
}

impl Thread {
Expand All @@ -15,9 +16,8 @@ impl Thread {

fn do_loop(self) -> ThreadHandle {
let fut = async move {
let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
interval.tick().await;
self.consumer.consume_events(Duration::from_secs(1)).await;
while self.advance_once().await.is_ok() {
tokio::task::yield_now().await;
}
Expand All @@ -28,6 +28,11 @@ impl Thread {
}
}

pub fn new(command_log: CommandLog, app: App) -> ThreadHandle {
Thread { command_log, app }.do_loop()
pub fn new(command_log: CommandLog, app: App, consumer: EventConsumer<KernEvent>) -> ThreadHandle {
Thread {
command_log,
app,
consumer,
}
.do_loop()
}
2 changes: 1 addition & 1 deletion lolraft/src/process/thread/log_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl Thread {

fn do_loop(self) -> ThreadHandle {
let fut = async move {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
self.run_once().await.ok();
Expand Down
55 changes: 55 additions & 0 deletions lolraft/src/process/thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,58 @@ impl Drop for ThreadHandle {
self.0.abort();
}
}

use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::Notify;

#[derive(Clone)]
pub struct EventProducer<T> {
inner: Arc<Notify>,
phantom: PhantomData<T>,
}
impl<T> EventProducer<T> {
pub fn push_event(&self, _: T) {
self.inner.notify_one();
}
}

#[derive(Clone)]
pub struct EventConsumer<T> {
inner: Arc<Notify>,
phantom: PhantomData<T>,
}
impl<T> EventConsumer<T> {
/// Return if events are produced or timeout.
pub async fn consume_events(&self, timeout: Duration) {
tokio::time::timeout(timeout, self.inner.notified())
.await
.ok();
}
}

pub fn notify<T>() -> (EventProducer<T>, EventConsumer<T>) {
let inner = Arc::new(Notify::new());
(
EventProducer {
inner: inner.clone(),
phantom: PhantomData,
},
EventConsumer {
inner,
phantom: PhantomData,
},
)
}

#[derive(Clone)]
pub struct QueueEvent;

#[derive(Clone)]
pub struct ReplicationEvent;

#[derive(Clone)]
pub struct CommitEvent;

#[derive(Clone)]
pub struct KernEvent;
16 changes: 13 additions & 3 deletions lolraft/src/process/thread/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub struct Thread {
follower_id: NodeId,
peers: PeerSvc,
voter: Ref<Voter>,
consumer: EventConsumer<QueueEvent>,
producer: EventProducer<ReplicationEvent>,
}
impl Thread {
async fn advance_once(&self) -> Result<bool> {
Expand All @@ -22,10 +24,10 @@ impl Thread {

fn do_loop(self) -> ThreadHandle {
let fut = async move {
let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
interval.tick().await;
self.consumer.consume_events(Duration::from_secs(1)).await;
while let Ok(true) = self.advance_once().await {
self.producer.push_event(ReplicationEvent);
tokio::task::yield_now().await;
}
}
Expand All @@ -36,11 +38,19 @@ impl Thread {
}
}

pub fn new(follower_id: NodeId, peers: PeerSvc, voter: Ref<Voter>) -> ThreadHandle {
pub fn new(
follower_id: NodeId,
peers: PeerSvc,
voter: Ref<Voter>,
consumer: EventConsumer<QueueEvent>,
producer: EventProducer<ReplicationEvent>,
) -> ThreadHandle {
Thread {
follower_id,
peers,
voter,
consumer,
producer,
}
.do_loop()
}
2 changes: 1 addition & 1 deletion lolraft/src/process/thread/snapshot_deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl Thread {

fn do_loop(self) -> ThreadHandle {
let fut = async move {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
self.run_once().await.ok();
Expand Down
15 changes: 15 additions & 0 deletions tests/lol-tests/tests/2_n5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use anyhow::Result;
use lol_tests::*;
use serial_test::serial;

#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn n5_cluster() -> Result<()> {
let mut cluster = Cluster::new(5, 1).await?;
cluster.add_server(0, 0, 0).await?;
cluster.add_server(0, 0, 1).await?;
cluster.add_server(0, 1, 2).await?;
cluster.add_server(0, 2, 3).await?;
cluster.add_server(0, 3, 4).await?;
Ok(())
}
File renamed without changes.
File renamed without changes.

0 comments on commit 1707cf8

Please sign in to comment.