Skip to content

Commit

Permalink
fix: a non-voter not in joint config should not block replication
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 31, 2021
1 parent 23f5f7e commit 4d58a51
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 6 deletions.
33 changes: 27 additions & 6 deletions async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::sync::Arc;

use anyhow::anyhow;
Expand Down Expand Up @@ -28,6 +29,7 @@ use crate::replication::RaftEvent;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::MessageSummary;
use crate::RaftNetwork;
use crate::RaftStorage;

Expand All @@ -52,6 +54,12 @@ impl<D: AppData, R: AppDataResponse> ClientRequestEntry<D, R> {
}
}

/// Short, log-friendly rendering of a client request entry.
impl<D: AppData, R: AppDataResponse> MessageSummary for ClientRequestEntry<D, R> {
    /// Returns the summary of the wrapped entry, prefixed with `entry:`.
    fn summary(&self) -> String {
        // Delegate to the inner entry's own summary; only the prefix is added here.
        let entry_summary = self.entry.summary();
        format!("entry:{}", entry_summary)
    }
}

/// An enum type wrapping either a client response channel or an internal Raft response channel.
#[derive(derive_more::From)]
pub enum ClientOrInternalResponseTx<D: AppData, R: AppDataResponse> {
Expand Down Expand Up @@ -232,7 +240,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

/// Handle client write requests.
#[tracing::instrument(level = "trace", skip(self, rpc, tx))]
#[tracing::instrument(level = "trace", skip(self, tx), fields(rpc=%rpc.summary()))]
pub(super) async fn handle_client_write_request(
&mut self,
rpc: ClientWriteRequest<D>,
Expand Down Expand Up @@ -275,22 +283,35 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// NOTE WELL: this routine does not wait for the request to actually finish replication, it
/// merely begins the process. Once the request is committed to the cluster, its response will
/// be generated asynchronously.
#[tracing::instrument(level = "trace", skip(self, req))]
#[tracing::instrument(level = "trace", skip(self, req), fields(req=%req.summary()))]
pub(super) async fn replicate_client_request(&mut self, req: ClientRequestEntry<D, R>) {
// Replicate the request if there are other cluster members. The client response will be
// returned elsewhere after the entry has been committed to the cluster.
let entry_arc = req.entry.clone();

if self.nodes.is_empty() && self.non_voters.is_empty() {
// TODO(xp): calculate nodes set that need to replicate to, when updating membership
// TODO(xp): Or add to-non-voter replication into self.nodes.

let all_members = self.core.membership.all_nodes();
let non_voter_ids = self.non_voters.keys().copied().collect::<BTreeSet<_>>();

let joint_non_voter_ids = all_members.intersection(&non_voter_ids).collect::<Vec<_>>();

let nodes = self.nodes.keys().collect::<Vec<_>>();
tracing::debug!(?nodes, ?joint_non_voter_ids, "replicate_client_request");

let await_quorum = !self.nodes.is_empty() || !joint_non_voter_ids.is_empty();

if await_quorum {
self.awaiting_committed.push(req);
} else {
// Else, there are no voting nodes for replication, so the payload is now committed.
self.core.commit_index = entry_arc.log_id.index;
tracing::debug!(self.core.commit_index, "update commit index, no need to replicate");
self.leader_report_metrics();
self.client_request_post_commit(req).await;
return;
}

self.awaiting_committed.push(req);

if !self.nodes.is_empty() {
for node in self.nodes.values() {
let _ = node.replstream.repl_tx.send((
Expand Down
56 changes: 56 additions & 0 deletions async-raft/tests/replication_1_voter_to_isolated_non_voter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::Config;
use fixtures::RaftRouter;
use maplit::btreeset;

#[macro_use]
mod fixtures;

/// Test replication to non-voter that is not in membership should not block.
///
/// What does this test do?
///
/// - bring up a cluster of 1 voter (node 0) and 1 non-voter (node 1).
/// - isolate node 1.
/// - client writes to node 0 should not be blocked.
/// - restore node 1; further client writes to node 0 should still go through.
///
/// export RUST_LOG=async_raft,memstore,replication_1_voter_to_isolated_non_voter=trace
/// cargo test -p async-raft --test replication_1_voter_to_isolated_non_voter
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn replication_1_voter_to_isolated_non_voter() -> Result<()> {
    let (_log_guard, ut_span) = init_ut!();
    let _ent = ut_span.enter();

    let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
    let router = Arc::new(RaftRouter::new(config.clone()));

    // Node 0 is the only voter, node 1 is a non-voter. `n_logs` tracks the log index we expect
    // node 0 to have reached.
    let mut n_logs = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?;

    tracing::info!("--- stop replication to node 1");
    {
        router.isolate_node(1).await;

        // Top the log up to 10 entries while node 1 is isolated.
        router.client_request_many(0, "0", (10 - n_logs) as usize).await;
        n_logs = 10;

        router.wait_for_log(&btreeset![0], n_logs, timeout(), "write to node 0 while node 1 is isolated").await?;
    }

    tracing::info!("--- restore replication to node 1");
    {
        router.restore_node(1).await;

        // Send a fresh batch of writes. Reusing `10 - n_logs` here would evaluate to 0 because
        // `n_logs` is already 10, and this phase would then exercise nothing.
        router.client_request_many(0, "0", 10).await;
        n_logs += 10;

        router.wait_for_log(&btreeset![0], n_logs, timeout(), "write to node 0 after node 1 is restored").await?;
    }

    Ok(())
}

/// Timeout passed to the router's `wait_for_log` calls in this test: five seconds.
fn timeout() -> Option<Duration> {
    let five_seconds = Duration::from_secs(5);
    Some(five_seconds)
}

0 comments on commit 4d58a51

Please sign in to comment.