Skip to content

Commit

Permalink
optimization: Implement heartbeat multiplexing
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed May 22, 2024
1 parent a6950b5 commit 12ac6a3
Show file tree
Hide file tree
Showing 19 changed files with 181 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ jobs:
name: Test
runs-on: ubuntu-latest
steps:
- name: CPU
run: lscpu

- name: Checkout
uses: actions/checkout@v4

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM 'rust:1.76-bookworm'
FROM 'rust:1.78-bookworm'

WORKDIR '/work'

Expand Down
10 changes: 6 additions & 4 deletions lolraft/proto/lolraft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ message VoteResponse {
bool vote_granted = 1;
}

message LeaderCommitState {
uint64 leader_term = 1;
uint64 leader_commit_index = 2;
}
message Heartbeat {
uint32 lane_id = 1;
uint64 leader_term = 2;
string leader_id = 3;
uint64 leader_commit_index = 4;
string leader_id = 1;
map<uint32, LeaderCommitState> leader_commit_states = 2;
}

// Request to add a Raft process with `server_id` to a lane.
Expand Down
54 changes: 54 additions & 0 deletions lolraft/src/communicator/heartbeat_multiplex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use super::*;

use spin::Mutex;
use std::collections::HashMap;

pub struct HeartbeatBuffer {
buf: HashMap<LaneId, request::Heartbeat>,
}
impl HeartbeatBuffer {
pub fn new() -> Self {
Self {
buf: HashMap::new(),
}
}

pub fn push(&mut self, lane_id: LaneId, req: request::Heartbeat) {
self.buf.insert(lane_id, req);
}

fn drain(&mut self) -> HashMap<LaneId, request::Heartbeat> {
self.buf.drain().collect()
}
}

pub async fn run(
buf: Arc<Mutex<HeartbeatBuffer>>,
mut cli: raft::RaftClient,
self_node_id: NodeId,
) {
loop {
tokio::time::sleep(Duration::from_millis(300)).await;

let states = {
let mut buf = buf.lock();
let heartbeats = buf.drain();

let mut out = HashMap::new();
for (lane_id, heartbeat) in heartbeats {
let state = raft::LeaderCommitState {
leader_term: heartbeat.leader_term,
leader_commit_index: heartbeat.leader_commit_index,
};
out.insert(lane_id, state);
}
out
};

let req = raft::Heartbeat {
leader_id: self_node_id.to_string(),
leader_commit_states: states,
};
cli.send_heartbeat(req).await.ok();
}
}
89 changes: 69 additions & 20 deletions lolraft/src/communicator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,59 @@
use super::*;

mod heartbeat_multiplex;
mod stream;

use heartbeat_multiplex::*;
use process::*;
use spin::Mutex;
use std::sync::Arc;
use tokio::task::AbortHandle;

mod stream;
pub struct HandleDrop(AbortHandle);
impl Drop for HandleDrop {
fn drop(&mut self) {
self.0.abort();
}
}

#[derive(Clone)]
pub struct RaftConnection {
client: raft::RaftClient,
heartbeat_buffer: Arc<Mutex<HeartbeatBuffer>>,
_abort_hdl: Arc<HandleDrop>,
}
impl RaftConnection {
pub fn new(self_node_id: NodeId, dest_node_id: NodeId) -> Self {
let client = {
let endpoint = tonic::transport::Endpoint::from(dest_node_id.0.clone());
let chan = endpoint.connect_lazy();
raft::RaftClient::new(chan)
};

let heartbeat_buffer = Arc::new(Mutex::new(HeartbeatBuffer::new()));

let abort_hdl = tokio::spawn(heartbeat_multiplex::run(
heartbeat_buffer.clone(),
client.clone(),
self_node_id,
))
.abort_handle();

Self {
client,
heartbeat_buffer,
_abort_hdl: Arc::new(HandleDrop(abort_hdl)),
}
}
}

pub struct Communicator {
cli: raft::RaftClient,
conn: RaftConnection,
lane_id: LaneId,
}
impl Communicator {
pub fn new(cli: raft::RaftClient, lane_id: LaneId) -> Self {
Self { cli, lane_id }
pub fn new(conn: RaftConnection, lane_id: LaneId) -> Self {
Self { conn, lane_id }
}
}

Expand All @@ -20,20 +63,19 @@ impl Communicator {
lane_id: self.lane_id,
index,
};
let st = self.cli.clone().get_snapshot(req).await?.into_inner();
let st = self
.conn
.client
.clone()
.get_snapshot(req)
.await?
.into_inner();
let st = Box::pin(stream::into_internal_snapshot_stream(st));
Ok(st)
}

pub async fn send_heartbeat(&self, req: request::Heartbeat) -> Result<()> {
let req = raft::Heartbeat {
lane_id: self.lane_id,
leader_id: req.leader_id.to_string(),
leader_term: req.leader_term,
leader_commit_index: req.leader_commit_index,
};
self.cli.clone().send_heartbeat(req).await?;
Ok(())
pub fn queue_heartbeat(&self, req: request::Heartbeat) {
self.conn.heartbeat_buffer.lock().push(self.lane_id, req);
}

pub async fn process_user_write_request(
Expand All @@ -45,7 +87,7 @@ impl Communicator {
message: req.message,
request_id: req.request_id,
};
let resp = self.cli.clone().write(req).await?.into_inner();
let resp = self.conn.client.clone().write(req).await?.into_inner();
Ok(resp.message)
}

Expand All @@ -54,7 +96,7 @@ impl Communicator {
lane_id: self.lane_id,
message: req.message,
};
let resp = self.cli.clone().read(req).await?.into_inner();
let resp = self.conn.client.clone().read(req).await?.into_inner();
Ok(resp.message)
}

Expand All @@ -63,15 +105,15 @@ impl Communicator {
lane_id: self.lane_id,
message: req.message,
};
self.cli.clone().process_kern_request(req).await?;
self.conn.client.clone().process_kern_request(req).await?;
Ok(())
}

pub async fn send_timeout_now(&self) -> Result<()> {
let req = raft::TimeoutNow {
lane_id: self.lane_id,
};
self.cli.clone().send_timeout_now(req).await?;
self.conn.client.clone().send_timeout_now(req).await?;
Ok(())
}

Expand All @@ -81,7 +123,8 @@ impl Communicator {
) -> Result<response::ReplicationStream> {
let st = stream::into_external_replication_stream(self.lane_id, st);
let resp = self
.cli
.conn
.client
.clone()
.send_replication_stream(st)
.await?
Expand All @@ -107,7 +150,13 @@ impl Communicator {
force_vote: req.force_vote,
pre_vote: req.pre_vote,
};
let resp = self.cli.clone().request_vote(req).await?.into_inner();
let resp = self
.conn
.client
.clone()
.request_vote(req)
.await?
.into_inner();
Ok(resp.vote_granted)
}
}
18 changes: 11 additions & 7 deletions lolraft/src/generated/lolraft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,20 @@ pub struct VoteResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Heartbeat {
#[prost(uint32, tag = "1")]
pub lane_id: u32,
#[prost(uint64, tag = "2")]
pub struct LeaderCommitState {
#[prost(uint64, tag = "1")]
pub leader_term: u64,
#[prost(string, tag = "3")]
pub leader_id: ::prost::alloc::string::String,
#[prost(uint64, tag = "4")]
#[prost(uint64, tag = "2")]
pub leader_commit_index: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Heartbeat {
#[prost(string, tag = "1")]
pub leader_id: ::prost::alloc::string::String,
#[prost(map = "uint32, message", tag = "2")]
pub leader_commit_states: ::std::collections::HashMap<u32, LeaderCommitState>,
}
/// Request to add a Raft process with `server_id` to a lane.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
Binary file modified lolraft/src/generated/lolraft_descriptor.bin
Binary file not shown.
21 changes: 10 additions & 11 deletions lolraft/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::*;

use communicator::{Communicator, RaftConnection};
use std::collections::HashMap;

pub struct Inner {
self_node_id: NodeId,
cache: moka::sync::Cache<NodeId, raft::RaftClient>,
cache: moka::sync::Cache<NodeId, RaftConnection>,
process: spin::RwLock<HashMap<LaneId, Arc<RaftProcess>>>,
}

Expand All @@ -15,8 +16,8 @@ impl RaftNode {
/// Create a new Raft node with a given node ID.
pub fn new(id: NodeId) -> Self {
let builder = moka::sync::Cache::builder()
.initial_capacity(1000)
.time_to_live(Duration::from_secs(60));
.initial_capacity(3)
.time_to_idle(Duration::from_secs(60));
let inner = Inner {
self_node_id: id,
cache: builder.build(),
Expand All @@ -30,7 +31,7 @@ impl RaftNode {
RaftDriver {
lane_id,
self_node_id: self.self_node_id.clone(),
cache: self.cache.clone(),
connection_cache: self.cache.clone(),
}
}

Expand All @@ -54,19 +55,17 @@ impl RaftNode {
pub struct RaftDriver {
lane_id: LaneId,
self_node_id: NodeId,
cache: moka::sync::Cache<NodeId, raft::RaftClient>,
connection_cache: moka::sync::Cache<NodeId, RaftConnection>,
}
impl RaftDriver {
pub(crate) fn self_node_id(&self) -> NodeId {
self.self_node_id.clone()
}

pub(crate) fn connect(&self, id: NodeId) -> communicator::Communicator {
let conn = self.cache.get_with(id.clone(), || {
let endpoint = tonic::transport::Endpoint::from(id.0);
let chan = endpoint.connect_lazy();
raft::RaftClient::new(chan)
pub(crate) fn connect(&self, dest_node_id: NodeId) -> Communicator {
let conn: RaftConnection = self.connection_cache.get_with(dest_node_id.clone(), || {
RaftConnection::new(self.self_node_id.clone(), dest_node_id.clone())
});
communicator::Communicator::new(conn, self.lane_id)
Communicator::new(conn, self.lane_id)
}
}
1 change: 0 additions & 1 deletion lolraft/src/process/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub mod request {
}

pub struct Heartbeat {
pub leader_id: NodeId,
pub leader_term: Term,
pub leader_commit_index: Index,
}
Expand Down
2 changes: 1 addition & 1 deletion lolraft/src/process/command_log/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl CommandLog {
let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst);
let proposed_snapshot_index = self.app.get_latest_snapshot().await?;
if proposed_snapshot_index > cur_snapshot_index {
info!("found a newer proposed snapshot@{proposed_snapshot_index}. will move the snapshot index.");
info!("found a newer proposed snapshot@{proposed_snapshot_index} > {cur_snapshot_index}. will move the snapshot index.");

// Calculate membership at the new snapshot index
let new_config = {
Expand Down
2 changes: 1 addition & 1 deletion lolraft/src/process/command_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Inner {
self.snapshot_pointer
.store(new_snapshot_index, Ordering::SeqCst);

info!("inserted a new snapshot@{new_snapshot_index}");
info!("inserted a new snapshot@{new_snapshot_index} (prev={cur_snapshot_index})");
Ok(())
}

Expand Down
7 changes: 5 additions & 2 deletions lolraft/src/process/raft_process/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ impl RaftProcess {
Ok(resp)
}

pub(crate) async fn send_heartbeat(&self, req: request::Heartbeat) -> Result<()> {
let leader_id = req.leader_id;
pub(crate) async fn receive_heartbeat(
&self,
leader_id: NodeId,
req: request::Heartbeat,
) -> Result<()> {
let term = req.leader_term;
let leader_commit = req.leader_commit_index;
self.voter
Expand Down
2 changes: 1 addition & 1 deletion lolraft/src/process/thread/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Thread {

fn do_loop(self) -> ThreadHandle {
let hdl = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(100));
let mut interval = tokio::time::interval(Duration::from_millis(300));
loop {
interval.tick().await;
self.run_once().await.ok();
Expand Down
10 changes: 3 additions & 7 deletions lolraft/src/process/thread/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@ impl Thread {

fn do_loop(self) -> ThreadHandle {
let hdl = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(300));
loop {
// Every iteration involves
// T = 100ms sleep + RPC round trip time.
// So, heartbeat is observed at follower site every T time.
// We can't use tokio::time::interval instead because it results in
// follower receives heartbeat every 100ms regardless of RPC round trip time.
// In this case, the failure detector at follower site will not work correctly.
tokio::time::sleep(Duration::from_millis(100)).await;
interval.tick().await;
// Periodically sending a new commit state to the buffer.
self.run_once().await.ok();
}
})
Expand Down
Loading

0 comments on commit 12ac6a3

Please sign in to comment.