diff --git a/async-raft/src/core/admin.rs b/async-raft/src/core/admin.rs index 04cc5c208..53a1de895 100644 --- a/async-raft/src/core/admin.rs +++ b/async-raft/src/core/admin.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::collections::HashSet; use futures::future::FutureExt; @@ -29,7 +30,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage #[tracing::instrument(level = "trace", skip(self))] pub(super) async fn handle_init_with_config( &mut self, - mut members: HashSet<NodeId>, + mut members: BTreeSet<NodeId>, ) -> Result<(), InitializeError> { if self.core.last_log_id.index != 0 || self.core.current_term != 0 { tracing::error!({self.core.last_log_id.index, self.core.current_term}, "rejecting init_with_config request as last_log_index or current_term is 0"); @@ -97,7 +98,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } #[tracing::instrument(level = "trace", skip(self, tx))] - pub(super) async fn change_membership(&mut self, members: HashSet<NodeId>, tx: ChangeMembershipTx) { + pub(super) async fn change_membership(&mut self, members: BTreeSet<NodeId>, tx: ChangeMembershipTx) { // Ensure cluster will have at least one node.
if members.is_empty() { let _ = tx.send(Err(ChangeConfigError::InoperableConfig)); @@ -219,7 +220,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage .all_nodes() .into_iter() .filter(|elem| elem != &self.core.id) - .collect::<HashSet<_>>(); + .collect::<BTreeSet<_>>(); let old_node_ids = self.core.membership.members.clone(); let node_ids_to_add = new_node_ids.difference(&old_node_ids); diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index 86656badf..e86345fc7 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -8,6 +8,7 @@ pub(crate) mod replication; mod vote; use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::collections::HashSet; use std::sync::Arc; @@ -778,7 +779,7 @@ pub enum ConsensusState { /// The set of non-voters nodes which are still being synced. awaiting: HashSet<NodeId>, /// The full membership change which has been proposed. - members: HashSet<NodeId>, + members: BTreeSet<NodeId>, /// The response channel to use once the consensus state is back into uniform state. tx: ChangeMembershipTx, }, diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index 9711fd114..7626d44a4 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::BTreeSet; use tokio::sync::oneshot; @@ -221,7 +221,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage std::cmp::min(c0_index, c1_index) } - fn calc_members_commit_index(&self, mem: &HashSet<NodeId>, msg: &str) -> u64 { + fn calc_members_commit_index(&self, mem: &BTreeSet<NodeId>, msg: &str) -> u64 { let log_ids = self.get_match_log_ids(mem); tracing::debug!("{} matched log_ids: {:?}", msg, log_ids); @@ -232,7 +232,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } /// Extract the matching index/term of the replication state of specified nodes.
- fn get_match_log_ids(&self, node_ids: &HashSet<NodeId>) -> Vec<LogId> { + fn get_match_log_ids(&self, node_ids: &BTreeSet<NodeId>) -> Vec<LogId> { tracing::debug!("to get match log ids of nodes: {:?}", node_ids); let mut rst = Vec::with_capacity(node_ids.len()); diff --git a/async-raft/src/metrics.rs b/async-raft/src/metrics.rs index db08ef2ee..06c529b8d 100644 --- a/async-raft/src/metrics.rs +++ b/async-raft/src/metrics.rs @@ -7,8 +7,8 @@ //! Metrics are observed on a running Raft node via the `Raft::metrics()` method, which will //! return a stream of metrics. +use std::collections::BTreeSet; use std::collections::HashMap; -use std::collections::HashSet; use serde::Deserialize; use serde::Serialize; @@ -171,7 +171,7 @@ impl Wait { /// Wait for `membership_config.members` to become expected node set or timeout. #[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn members(&self, want_members: HashSet<NodeId>, msg: impl ToString) -> Result<RaftMetrics, WaitError> { + pub async fn members(&self, want_members: BTreeSet<NodeId>, msg: impl ToString) -> Result<RaftMetrics, WaitError> { self.metrics( |x| x.membership_config.members == want_members, &format!("{} .membership_config.members -> {:?}", msg.to_string(), want_members), @@ -183,7 +183,7 @@ impl Wait { #[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn next_members( &self, - want_members: Option<HashSet<NodeId>>, + want_members: Option<BTreeSet<NodeId>>, msg: impl ToString, ) -> Result<RaftMetrics, WaitError> { self.metrics( diff --git a/async-raft/src/metrics_wait_test.rs b/async-raft/src/metrics_wait_test.rs index d1dc546d2..8e885b40f 100644 --- a/async-raft/src/metrics_wait_test.rs +++ b/async-raft/src/metrics_wait_test.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use maplit::hashset; +use maplit::btreeset; use tokio::sync::watch; use tokio::time::sleep; @@ -73,14 +73,14 @@ async fn test_wait() -> anyhow::Result<()> { let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; let mut update = init.clone(); - update.membership_config.members
= hashset![1, 2]; + update.membership_config.members = btreeset![1, 2]; let rst = tx.send(update); assert!(rst.is_ok()); }); - let got = w.members(hashset![1, 2], "members").await?; + let got = w.members(btreeset![1, 2], "members").await?; h.await?; - assert_eq!(hashset![1, 2], got.membership_config.members); + assert_eq!(btreeset![1, 2], got.membership_config.members); } { @@ -90,14 +90,14 @@ async fn test_wait() -> anyhow::Result<()> { let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; let mut update = init.clone(); - update.membership_config.members_after_consensus = Some(hashset![1, 2]); + update.membership_config.members_after_consensus = Some(btreeset![1, 2]); let rst = tx.send(update); assert!(rst.is_ok()); }); - let got = w.next_members(Some(hashset![1, 2]), "next_members").await?; + let got = w.next_members(Some(btreeset![1, 2]), "next_members").await?; h.await?; - assert_eq!(Some(hashset![1, 2]), got.membership_config.members_after_consensus); + assert_eq!(Some(btreeset![1, 2]), got.membership_config.members_after_consensus); } tracing::info!("--- wait for snapshot, Ok"); diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs index 5cfe75a45..1ad97d24b 100644 --- a/async-raft/src/raft.rs +++ b/async-raft/src/raft.rs @@ -1,6 +1,6 @@ //! Public Raft interface and data types. -use std::collections::HashSet; +use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; @@ -214,7 +214,7 @@ impl, S: RaftStorage> Ra /// free, and Raft guarantees that the first node to become the cluster leader will propagate /// only its own config. 
#[tracing::instrument(level = "debug", skip(self))] - pub async fn initialize(&self, members: HashSet) -> Result<(), InitializeError> { + pub async fn initialize(&self, members: BTreeSet) -> Result<(), InitializeError> { let (tx, rx) = oneshot::channel(); self.inner.tx_api.send(RaftMsg::Initialize { members, tx }).map_err(|_| RaftError::ShuttingDown)?; rx.await.map_err(|_| InitializeError::RaftError(RaftError::ShuttingDown)).and_then(|res| res) @@ -251,7 +251,7 @@ impl, S: RaftStorage> Ra /// If this Raft node is not the cluster leader, then the proposed configuration change will be /// rejected. #[tracing::instrument(level = "debug", skip(self))] - pub async fn change_membership(&self, members: HashSet) -> Result<(), ChangeConfigError> { + pub async fn change_membership(&self, members: BTreeSet) -> Result<(), ChangeConfigError> { let (tx, rx) = oneshot::channel(); self.inner .tx_api @@ -339,7 +339,7 @@ pub(crate) enum RaftMsg { tx: ClientReadResponseTx, }, Initialize { - members: HashSet, + members: BTreeSet, tx: oneshot::Sender>, }, AddNonVoter { @@ -347,7 +347,7 @@ pub(crate) enum RaftMsg { tx: ChangeMembershipTx, }, ChangeMembership { - members: HashSet, + members: BTreeSet, tx: ChangeMembershipTx, }, } @@ -484,16 +484,16 @@ pub struct EntrySnapshotPointer { #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct MembershipConfig { /// All members of the Raft cluster. - pub members: HashSet, + pub members: BTreeSet, /// All members of the Raft cluster after joint consensus is finalized. /// /// The presence of a value here indicates that the config is in joint consensus. - pub members_after_consensus: Option>, + pub members_after_consensus: Option>, } impl MembershipConfig { /// Get an iterator over all nodes in the current config. 
- pub fn all_nodes(&self) -> HashSet { + pub fn all_nodes(&self) -> BTreeSet { let mut all = self.members.clone(); if let Some(members) = &self.members_after_consensus { all.extend(members); @@ -520,7 +520,7 @@ impl MembershipConfig { /// Create a new initial config containing only the given node ID. pub fn new_initial(id: NodeId) -> Self { - let mut members = HashSet::new(); + let mut members = BTreeSet::new(); members.insert(id); Self { members, diff --git a/async-raft/tests/add_remove_voter.rs b/async-raft/tests/add_remove_voter.rs index da5128525..c5deb8883 100644 --- a/async-raft/tests/add_remove_voter.rs +++ b/async-raft/tests/add_remove_voter.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; @@ -7,7 +7,7 @@ use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; use futures::stream::StreamExt; -use maplit::hashset; +use maplit::btreeset; mod fixtures; @@ -19,7 +19,7 @@ mod fixtures; /// - add 4 non-voter as follower. /// - asserts that the leader was able to successfully commit logs and that the followers has successfully replicated /// the payload. -/// - remove one folower: node-4 +/// - remove one follower: node-4 /// - asserts node-4 becomes non-voter and the leader stops sending logs to it. /// /// RUST_LOG=async_raft,memstore,add_remove_voter=trace cargo test -p async-raft --test add_remove_voter @@ -28,8 +28,8 @@ async fn add_remove_voter() -> Result<()> { fixtures::init_tracing(); let timeout = Duration::from_millis(500); - let all_members = hashset![0, 1, 2, 3, 4]; - let left_members = hashset![0, 1, 2, 3]; + let all_members = btreeset![0, 1, 2, 3, 4]; + let left_members = btreeset![0, 1, 2, 3]; // Setup test dependencies. 
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); @@ -62,7 +62,7 @@ async fn add_remove_voter() -> Result<()> { router.initialize_from_single_node(0).await?; want = 1; - wait_log(router.clone(), &hashset![0], want).await?; + wait_log(router.clone(), &btreeset![0], want).await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Sync some new nodes. @@ -123,7 +123,7 @@ async fn add_remove_voter() -> Result<()> { Ok(()) } -async fn wait_log(router: std::sync::Arc, node_ids: &HashSet, want_log: u64) -> Result<()> { +async fn wait_log(router: std::sync::Arc, node_ids: &BTreeSet, want_log: u64) -> Result<()> { let timeout = Duration::from_millis(500); for i in node_ids.iter() { router diff --git a/async-raft/tests/api_install_snapshot.rs b/async-raft/tests/api_install_snapshot.rs index 94e0b91fa..cf5e86d90 100644 --- a/async-raft/tests/api_install_snapshot.rs +++ b/async-raft/tests/api_install_snapshot.rs @@ -9,7 +9,7 @@ use async_raft::LogId; use async_raft::SnapshotMeta; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// API test: install_snapshot with various condition. 
/// @@ -33,13 +33,13 @@ async fn snapshot_ge_half_threshold() -> Result<()> { { router.new_raft_node(0).await; - router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init leader").await?; + router.wait_for_log(&btreeset![0], want, None, "init leader").await?; router.assert_stable_cluster(Some(1), Some(want)).await; } diff --git a/async-raft/tests/client_reads.rs b/async-raft/tests/client_reads.rs index 3f79a3d42..9f2d42645 100644 --- a/async-raft/tests/client_reads.rs +++ b/async-raft/tests/client_reads.rs @@ -6,7 +6,7 @@ use anyhow::Result; use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Client read tests. /// @@ -31,8 +31,8 @@ async fn client_reads() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router.wait_for_log(&hashset![0, 1, 2], want, None, "empty node").await?; - router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty node").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty node").await?; + router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty node").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. 
@@ -40,7 +40,7 @@ async fn client_reads() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0, 1, 2], want, None, "init leader").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "init leader").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Get the ID of the leader, and assert that client_read succeeds. diff --git a/async-raft/tests/client_writes.rs b/async-raft/tests/client_writes.rs index cece2d841..5d401d80c 100644 --- a/async-raft/tests/client_writes.rs +++ b/async-raft/tests/client_writes.rs @@ -7,7 +7,7 @@ use async_raft::LogId; use async_raft::State; use fixtures::RaftRouter; use futures::prelude::*; -use maplit::hashset; +use maplit::btreeset; mod fixtures; @@ -34,8 +34,8 @@ async fn client_writes() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?; - router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty").await?; + router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. 
@@ -43,8 +43,8 @@ async fn client_writes() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0, 1, 2], want, None, "leader init log").await?; - router.wait_for_state(&hashset![0], State::Leader, None, "init").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "leader init log").await?; + router.wait_for_state(&btreeset![0], State::Leader, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; @@ -60,7 +60,7 @@ async fn client_writes() -> Result<()> { while clients.next().await.is_some() {} want = 6001; - router.wait_for_log(&hashset![0, 1, 2], want, None, "sync logs").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "sync logs").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // The extra 1 is from the leader's initial commit entry. router @@ -70,7 +70,7 @@ async fn client_writes() -> Result<()> { Some(0), LogId { term: 1, index: want }, Some(((5000..5100).into(), 1, MembershipConfig { - members: hashset![0, 1, 2], + members: btreeset![0, 1, 2], members_after_consensus: None, })), ) diff --git a/async-raft/tests/compaction.rs b/async-raft/tests/compaction.rs index 65972d4de..cf1ffa7d1 100644 --- a/async-raft/tests/compaction.rs +++ b/async-raft/tests/compaction.rs @@ -9,7 +9,7 @@ use async_raft::LogId; use async_raft::SnapshotPolicy; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Compaction test. /// @@ -39,8 +39,8 @@ async fn compaction() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. 
- router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; @@ -49,7 +49,7 @@ async fn compaction() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init leader").await?; + router.wait_for_log(&btreeset![0], want, None, "init leader").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Send enough requests to the cluster that compaction on the node should be triggered. @@ -57,9 +57,9 @@ async fn compaction() -> Result<()> { router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await; want = snapshot_threshold; - router.wait_for_log(&hashset![0], want, None, "write").await?; + router.wait_for_log(&btreeset![0], want, None, "write").await?; router.assert_stable_cluster(Some(1), Some(want)).await; - router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?; + router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?; router .assert_storage_state( 1, @@ -67,7 +67,7 @@ async fn compaction() -> Result<()> { Some(0), LogId { term: 1, index: want }, Some((want.into(), 1, MembershipConfig { - members: hashset![0], + members: btreeset![0], members_after_consensus: None, })), ) @@ -82,9 +82,9 @@ async fn compaction() -> Result<()> { router.client_request_many(0, "0", 1).await; want += 1; - router.wait_for_log(&hashset![0, 1], want, None, "add follower").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "add follower").await?; let expected_snap = Some((snapshot_threshold.into(), 1, MembershipConfig { - members: hashset![0u64], + members: btreeset![0u64], members_after_consensus: None, })); router diff --git 
a/async-raft/tests/concurrent_write_and_add_non_voter.rs b/async-raft/tests/concurrent_write_and_add_non_voter.rs index 9aff36fb2..86852fe8c 100644 --- a/async-raft/tests/concurrent_write_and_add_non_voter.rs +++ b/async-raft/tests/concurrent_write_and_add_non_voter.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; @@ -6,7 +6,7 @@ use anyhow::Result; use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; mod fixtures; @@ -41,7 +41,7 @@ async fn concurrent_write_and_add_non_voter() -> Result<()> { fixtures::init_tracing(); let timeout = Duration::from_millis(500); - let candidates = hashset![0, 1, 2]; + let candidates = btreeset![0, 1, 2]; // Setup test dependencies. let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); @@ -56,7 +56,7 @@ async fn concurrent_write_and_add_non_voter() -> Result<()> { router.initialize_from_single_node(0).await?; want = 1; - wait_log(router.clone(), &hashset![0], want).await?; + wait_log(router.clone(), &btreeset![0], want).await?; } tracing::info!("--- adding two candidate nodes"); @@ -130,7 +130,7 @@ async fn concurrent_write_and_add_non_voter() -> Result<()> { async fn wait_log( router: std::sync::Arc, - node_ids: &HashSet, + node_ids: &BTreeSet, want_log: u64, ) -> anyhow::Result<()> { let timeout = Duration::from_millis(500); diff --git a/async-raft/tests/current_leader.rs b/async-raft/tests/current_leader.rs index ddff785d5..d3afb6599 100644 --- a/async-raft/tests/current_leader.rs +++ b/async-raft/tests/current_leader.rs @@ -6,7 +6,7 @@ use anyhow::Result; use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Current leader tests. /// @@ -30,8 +30,8 @@ async fn current_leader() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. 
- router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?; - router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty").await?; + router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -39,7 +39,7 @@ async fn current_leader() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0, 1, 2], want, None, "init").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Get the ID of the leader, and assert that current_leader succeeds. diff --git a/async-raft/tests/dynamic_membership.rs b/async-raft/tests/dynamic_membership.rs index 83041488c..7d6eaf765 100644 --- a/async-raft/tests/dynamic_membership.rs +++ b/async-raft/tests/dynamic_membership.rs @@ -8,7 +8,7 @@ use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; use futures::stream::StreamExt; -use maplit::hashset; +use maplit::btreeset; use tokio::time::sleep; /// Dynamic membership test. @@ -34,8 +34,8 @@ async fn dynamic_membership() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. 
@@ -43,7 +43,7 @@ async fn dynamic_membership() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init").await?; + router.wait_for_log(&btreeset![0], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Sync some new nodes. @@ -63,10 +63,10 @@ async fn dynamic_membership() -> Result<()> { } tracing::info!("--- changing cluster config"); - router.change_membership(0, hashset![0, 1, 2, 3, 4]).await?; + router.change_membership(0, btreeset![0, 1, 2, 3, 4]).await?; want += 2; - router.wait_for_log(&hashset![0, 1, 2, 3, 4], want, None, "cluster of 5 candidates").await?; + router.wait_for_log(&btreeset![0, 1, 2, 3, 4], want, None, "cluster of 5 candidates").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Still in term 1, so leader is still node 0. // Isolate old leader and assert that a new leader takes over. diff --git a/async-raft/tests/fixtures/mod.rs b/async-raft/tests/fixtures/mod.rs index 7014e851c..e8ba2fd98 100644 --- a/async-raft/tests/fixtures/mod.rs +++ b/async-raft/tests/fixtures/mod.rs @@ -3,6 +3,7 @@ #![allow(dead_code)] use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -134,7 +135,7 @@ impl RaftRouter { pub async fn initialize_from_single_node(&self, node: NodeId) -> Result<()> { tracing::info!({ node }, "initializing cluster from single node"); let rt = self.routing_table.read().await; - let members: HashSet = rt.keys().cloned().collect(); + let members: BTreeSet = rt.keys().cloned().collect(); rt.get(&node) .ok_or_else(|| anyhow!("node {} not found in routing table", node))? .0 @@ -144,7 +145,7 @@ impl RaftRouter { } /// Initialize cluster with specified node ids. 
- pub async fn initialize_with(&self, node: NodeId, members: HashSet) -> Result<()> { + pub async fn initialize_with(&self, node: NodeId, members: BTreeSet) -> Result<()> { tracing::info!({ node }, "initializing cluster from single node"); let rt = self.routing_table.read().await; rt.get(&node) @@ -207,7 +208,7 @@ impl RaftRouter { #[tracing::instrument(level = "info", skip(self))] pub async fn wait_for_log( &self, - node_ids: &HashSet, + node_ids: &BTreeSet, want_log: u64, timeout: Option, msg: &str, @@ -222,7 +223,7 @@ impl RaftRouter { #[tracing::instrument(level = "info", skip(self))] pub async fn wait_for_state( &self, - node_ids: &HashSet, + node_ids: &BTreeSet, want_state: State, timeout: Option, msg: &str, @@ -237,7 +238,7 @@ impl RaftRouter { #[tracing::instrument(level = "info", skip(self))] pub async fn wait_for_snapshot( &self, - node_ids: &HashSet, + node_ids: &BTreeSet, want: LogId, timeout: Option, msg: &str, @@ -277,7 +278,7 @@ impl RaftRouter { node.0.add_non_voter(target).await } - pub async fn change_membership(&self, leader: NodeId, members: HashSet) -> Result<(), ChangeConfigError> { + pub async fn change_membership(&self, leader: NodeId, members: BTreeSet) -> Result<(), ChangeConfigError> { let rt = self.routing_table.read().await; let node = rt.get(&leader).unwrap_or_else(|| panic!("node with ID {} does not exist", leader)); node.0.change_membership(members).await diff --git a/async-raft/tests/initialization.rs b/async-raft/tests/initialization.rs index f0e573d48..bf37b07a4 100644 --- a/async-raft/tests/initialization.rs +++ b/async-raft/tests/initialization.rs @@ -8,7 +8,7 @@ use async_raft::raft::MembershipConfig; use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Cluster initialization test. /// @@ -36,8 +36,8 @@ async fn initialization() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. 
- router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?; - router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty").await?; + router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -45,7 +45,7 @@ async fn initialization() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0, 1, 2], want, None, "init").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; for i in 0..3 { @@ -59,12 +59,12 @@ async fn initialization() -> Result<()> { panic!("expect ConfigChange payload") } }; - assert_eq!(hashset![0, 1, 2], mem.members); + assert_eq!(btreeset![0, 1, 2], mem.members); let sm_mem = sto.get_state_machine().await.last_membership.clone(); assert_eq!( Some(MembershipConfig { - members: hashset![0, 1, 2], + members: btreeset![0, 1, 2], members_after_consensus: None, }), sm_mem diff --git a/async-raft/tests/lagging_network_write.rs b/async-raft/tests/lagging_network_write.rs index 19ac16591..9569b1087 100644 --- a/async-raft/tests/lagging_network_write.rs +++ b/async-raft/tests/lagging_network_write.rs @@ -6,7 +6,7 @@ use anyhow::Result; use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Lagging network test. /// @@ -39,8 +39,8 @@ async fn lagging_network_write() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. 
- router.wait_for_log(&hashset![0], want, timeout, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, timeout, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -48,8 +48,8 @@ async fn lagging_network_write() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, timeout, "init").await?; - router.wait_for_state(&hashset![0], State::Leader, None, "init").await?; + router.wait_for_log(&btreeset![0], want, timeout, "init").await?; + router.wait_for_state(&btreeset![0], State::Leader, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Sync some new nodes. @@ -59,21 +59,21 @@ async fn lagging_network_write() -> Result<()> { router.new_raft_node(2).await; router.add_non_voter(0, 2).await?; - router.wait_for_log(&hashset![1, 2], want, timeout, "non-voter init").await?; + router.wait_for_log(&btreeset![1, 2], want, timeout, "non-voter init").await?; router.client_request_many(0, "client", 1).await; want += 1; - router.wait_for_log(&hashset![0, 1, 2], want, timeout, "write one log").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, timeout, "write one log").await?; - router.change_membership(0, hashset![0, 1, 2]).await?; + router.change_membership(0, btreeset![0, 1, 2]).await?; want += 2; - router.wait_for_state(&hashset![0], State::Leader, None, "changed").await?; - router.wait_for_state(&hashset![1, 2], State::Follower, None, "changed").await?; - router.wait_for_log(&hashset![0, 1, 2], want, timeout, "3 candidates").await?; + router.wait_for_state(&btreeset![0], State::Leader, None, "changed").await?; + router.wait_for_state(&btreeset![1, 2], State::Follower, None, "changed").await?; + router.wait_for_log(&btreeset![0, 1, 
2], want, timeout, "3 candidates").await?; router.client_request_many(0, "client", 1).await; want += 1; - router.wait_for_log(&hashset![0, 1, 2], want, timeout, "write 2nd log").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, timeout, "write 2nd log").await?; Ok(()) } diff --git a/async-raft/tests/leader_metrics.rs b/async-raft/tests/leader_metrics.rs index 321c75d4e..0e72bfce7 100644 --- a/async-raft/tests/leader_metrics.rs +++ b/async-raft/tests/leader_metrics.rs @@ -11,7 +11,7 @@ use async_raft::State; use fixtures::RaftRouter; use futures::stream::StreamExt; use maplit::hashmap; -use maplit::hashset; +use maplit::btreeset; mod fixtures; @@ -32,8 +32,8 @@ async fn leader_metrics() -> Result<()> { fixtures::init_tracing(); let timeout = Some(Duration::from_millis(1000)); - let all_members = hashset![0, 1, 2, 3, 4]; - let left_members = hashset![0, 1, 2, 3]; + let all_members = btreeset![0, 1, 2, 3, 4]; + let left_members = btreeset![0, 1, 2, 3]; // Setup test dependencies. let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); @@ -42,8 +42,8 @@ async fn leader_metrics() -> Result<()> { // Assert all nodes are in non-voter state & have no entries. 
let mut want = 0; - router.wait_for_log(&hashset![0], want, timeout, "init").await?; - router.wait_for_state(&hashset![0], State::NonVoter, timeout, "init").await?; + router.wait_for_log(&btreeset![0], want, timeout, "init").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, timeout, "init").await?; router.assert_pristine_cluster().await; @@ -52,7 +52,7 @@ async fn leader_metrics() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, timeout, "init cluster").await?; + router.wait_for_log(&btreeset![0], want, timeout, "init cluster").await?; router.assert_stable_cluster(Some(1), Some(want)).await; router @@ -167,7 +167,7 @@ async fn leader_metrics() -> Result<()> { }) .await?; - router.wait_for_state(&hashset![0], State::Candidate, timeout, "node 0 to candidate").await?; + router.wait_for_state(&btreeset![0], State::Candidate, timeout, "node 0 to candidate").await?; router .wait_for_metrics( diff --git a/async-raft/tests/members_012_to_234.rs b/async-raft/tests/members_012_to_234.rs index 4f5a8eade..0ad001dcb 100644 --- a/async-raft/tests/members_012_to_234.rs +++ b/async-raft/tests/members_012_to_234.rs @@ -7,7 +7,7 @@ use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; use futures::stream::StreamExt; -use maplit::hashset; +use maplit::btreeset; /// Replace membership with another one with only one common node. /// To reproduce the bug that new config does not actually examine the term/index of non-voter, but instead only @@ -30,8 +30,8 @@ async fn members_012_to_234() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. 
- router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -39,7 +39,7 @@ async fn members_012_to_234() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init").await?; + router.wait_for_log(&btreeset![0], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; tracing::info!("--- adding 4 new nodes to cluster"); @@ -64,23 +64,23 @@ async fn members_012_to_234() -> Result<()> { router.isolate_node(4).await; tracing::info!("--- changing config to 0,1,2"); - router.change_membership(0, hashset![0, 1, 2]).await?; + router.change_membership(0, btreeset![0, 1, 2]).await?; want += 2; - router.wait_for_log(&hashset![0, 1, 2], want, None, "cluster of 0,1,2").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "cluster of 0,1,2").await?; tracing::info!("--- changing config to 2,3,4"); { let router = router.clone(); // this is expected to be blocked since 3 and 4 are isolated. 
tokio::spawn(async move { - router.change_membership(0, hashset![2, 3, 4]).await?; + router.change_membership(0, btreeset![2, 3, 4]).await?; Ok::<(), anyhow::Error>(()) }); } want += 1; - let wait_rst = router.wait_for_log(&hashset![0], want, None, "cluster of joint").await; + let wait_rst = router.wait_for_log(&btreeset![0], want, None, "cluster of joint").await; // the first step of joint should not pass because the new config can not constitute a quorum assert!(wait_rst.is_err()); diff --git a/async-raft/tests/metrics_state_machine_consistency.rs b/async-raft/tests/metrics_state_machine_consistency.rs index 5d84e60b1..13e15f5bb 100644 --- a/async-raft/tests/metrics_state_machine_consistency.rs +++ b/async-raft/tests/metrics_state_machine_consistency.rs @@ -4,7 +4,7 @@ use anyhow::Result; use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; mod fixtures; @@ -32,8 +32,8 @@ async fn metrics_state_machine_consistency() -> Result<()> { tracing::info!("--- initializing single node cluster"); // Wait for node 0 to become leader. 
- router.initialize_with(0, hashset![0]).await?; - router.wait_for_state(&hashset![0], State::Leader, None, "init").await?; + router.initialize_with(0, btreeset![0]).await?; + router.wait_for_state(&btreeset![0], State::Leader, None, "init").await?; tracing::info!("--- add one non-voter"); router.add_non_voter(0, 1).await?; @@ -46,7 +46,7 @@ async fn metrics_state_machine_consistency() -> Result<()> { tracing::info!("--- wait for log to sync"); let want = 2u64; for node_id in 0..2 { - router.wait_for_log(&hashset![node_id], want, None, "write one log").await?; + router.wait_for_log(&btreeset![node_id], want, None, "write one log").await?; let sto = router.get_storage_handle(&node_id).await?; assert!(sto.get_state_machine().await.client_status.get("foo").is_some()); } diff --git a/async-raft/tests/metrics_wait.rs b/async-raft/tests/metrics_wait.rs index a4cf8bf02..d43e69b62 100644 --- a/async-raft/tests/metrics_wait.rs +++ b/async-raft/tests/metrics_wait.rs @@ -6,7 +6,7 @@ use async_raft::metrics::WaitError; use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; mod fixtures; @@ -27,7 +27,7 @@ async fn metrics_wait() -> Result<()> { let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); - let cluster = hashset![0]; + let cluster = btreeset![0]; router.new_raft_node(0).await; router.initialize_with(0, cluster.clone()).await?; router.wait_for_state(&cluster, State::Leader, None, "init").await?; diff --git a/async-raft/tests/non_voter_restart.rs b/async-raft/tests/non_voter_restart.rs index 538799712..6b31b1968 100644 --- a/async-raft/tests/non_voter_restart.rs +++ b/async-raft/tests/non_voter_restart.rs @@ -7,7 +7,7 @@ use async_raft::NodeId; use async_raft::Raft; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; use tokio::time::sleep; use 
crate::fixtures::MemRaft; @@ -40,20 +40,20 @@ async fn non_voter_restart() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router.wait_for_log(&hashset![0, 1], want, None, "empty").await?; - router.wait_for_state(&hashset![0, 1], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "empty").await?; + router.wait_for_state(&btreeset![0, 1], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; tracing::info!("--- initializing single node cluster"); - router.initialize_with(0, hashset![0]).await?; + router.initialize_with(0, btreeset![0]).await?; want += 1; router.add_non_voter(0, 1).await?; router.client_request(0, "foo", 1).await; want += 1; - router.wait_for_log(&hashset![0, 1], want, None, "write one log").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "write one log").await?; let (node0, _sto0) = router.remove_node(0).await.unwrap(); assert_node_state(0, &node0, 1, 2, State::Leader); diff --git a/async-raft/tests/shutdown.rs b/async-raft/tests/shutdown.rs index 2e61ba7d9..9fe35014c 100644 --- a/async-raft/tests/shutdown.rs +++ b/async-raft/tests/shutdown.rs @@ -7,7 +7,7 @@ use anyhow::Result; use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Cluster shutdown test. /// @@ -32,8 +32,8 @@ async fn initialization() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?; - router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty").await?; + router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. 
@@ -41,7 +41,7 @@ async fn initialization() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0, 1, 2], want, None, "init").await?; + router.wait_for_log(&btreeset![0, 1, 2], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(1)).await; tracing::info!("--- performing node shutdowns"); diff --git a/async-raft/tests/singlenode.rs b/async-raft/tests/singlenode.rs index 3902b9a95..43fc99844 100644 --- a/async-raft/tests/singlenode.rs +++ b/async-raft/tests/singlenode.rs @@ -7,7 +7,7 @@ use async_raft::Config; use async_raft::LogId; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Single-node cluster initialization test. /// @@ -32,8 +32,8 @@ async fn singlenode() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -41,7 +41,7 @@ async fn singlenode() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init").await?; + router.wait_for_log(&btreeset![0], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Write some data to the single node cluster. 
diff --git a/async-raft/tests/snapshot_chunk_size.rs b/async-raft/tests/snapshot_chunk_size.rs index 282f51e52..c63b2a467 100644 --- a/async-raft/tests/snapshot_chunk_size.rs +++ b/async-raft/tests/snapshot_chunk_size.rs @@ -9,7 +9,7 @@ use async_raft::LogId; use async_raft::SnapshotPolicy; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Test transfer snapshot in small chnuks /// @@ -42,13 +42,13 @@ async fn snapshot_chunk_size() -> Result<()> { { router.new_raft_node(0).await; - router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init leader").await?; + router.wait_for_log(&btreeset![0], want, None, "init leader").await?; } tracing::info!("--- send just enough logs to trigger snapshot"); @@ -57,12 +57,12 @@ async fn snapshot_chunk_size() -> Result<()> { want = snapshot_threshold; let want_snap = Some((want.into(), 1, MembershipConfig { - members: hashset![0u64], + members: btreeset![0u64], members_after_consensus: None, })); - router.wait_for_log(&hashset![0], want, None, "send log to trigger snapshot").await?; - router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?; + router.wait_for_log(&btreeset![0], want, None, "send log to trigger snapshot").await?; + router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?; router.assert_storage_state(1, want, Some(0), LogId { term: 1, index: want }, want_snap).await; } @@ -72,12 +72,12 @@ async fn snapshot_chunk_size() -> Result<()> { router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); let want_snap = Some((want.into(), 1, 
MembershipConfig { - members: hashset![0u64], + members: btreeset![0u64], members_after_consensus: None, })); - router.wait_for_log(&hashset![0, 1], want, None, "add non-voter").await?; - router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "add non-voter").await?; + router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, None, "").await?; router .assert_storage_state( 1, diff --git a/async-raft/tests/snapshot_ge_half_threshold.rs b/async-raft/tests/snapshot_ge_half_threshold.rs index a289f1f4b..8fb0d8156 100644 --- a/async-raft/tests/snapshot_ge_half_threshold.rs +++ b/async-raft/tests/snapshot_ge_half_threshold.rs @@ -9,7 +9,7 @@ use async_raft::LogId; use async_raft::SnapshotPolicy; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// A leader should create and send snapshot when snapshot is old and is not that old to trigger a snapshot, i.e.: /// `threshold/2 < leader.last_log_index - snapshot.applied_index < threshold` @@ -45,12 +45,12 @@ async fn snapshot_ge_half_threshold() -> Result<()> { { router.new_raft_node(0).await; - router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init leader").await?; + router.wait_for_log(&btreeset![0], want, None, "init leader").await?; router.assert_stable_cluster(Some(1), Some(want)).await; } @@ -59,10 +59,10 @@ async fn snapshot_ge_half_threshold() -> Result<()> { router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await; want = snapshot_threshold; - router.wait_for_log(&hashset![0], want, None, "send log to trigger 
snapshot").await?; + router.wait_for_log(&btreeset![0], want, None, "send log to trigger snapshot").await?; router.assert_stable_cluster(Some(1), Some(want)).await; - router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?; + router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?; router .assert_storage_state( 1, @@ -70,7 +70,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> { Some(0), LogId { term: 1, index: want }, Some((want.into(), 1, MembershipConfig { - members: hashset![0], + members: btreeset![0], members_after_consensus: None, })), ) @@ -88,12 +88,12 @@ async fn snapshot_ge_half_threshold() -> Result<()> { router.new_raft_node(1).await; router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); - router.wait_for_log(&hashset![0, 1], want, None, "add non-voter").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "add non-voter").await?; let expected_snap = Some((want.into(), 1, MembershipConfig { - members: hashset![0u64], + members: btreeset![0u64], members_after_consensus: None, })); - router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?; + router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, None, "").await?; router .assert_storage_state( 1, diff --git a/async-raft/tests/snapshot_overrides_membership.rs b/async-raft/tests/snapshot_overrides_membership.rs index 19b409aee..244450693 100644 --- a/async-raft/tests/snapshot_overrides_membership.rs +++ b/async-raft/tests/snapshot_overrides_membership.rs @@ -15,7 +15,7 @@ use async_raft::RaftStorage; use async_raft::SnapshotPolicy; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Test membership info is sync correctly along with snapshot. 
/// @@ -47,12 +47,12 @@ async fn snapshot_overrides_membership() -> Result<()> { { router.new_raft_node(0).await; - router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init leader").await?; + router.wait_for_log(&btreeset![0], want, None, "init leader").await?; router.assert_stable_cluster(Some(1), Some(want)).await; } @@ -61,10 +61,10 @@ async fn snapshot_overrides_membership() -> Result<()> { router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await; want = snapshot_threshold; - router.wait_for_log(&hashset![0], want, None, "send log to trigger snapshot").await?; + router.wait_for_log(&btreeset![0], want, None, "send log to trigger snapshot").await?; router.assert_stable_cluster(Some(1), Some(want)).await; - router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?; + router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?; router .assert_storage_state( 1, @@ -72,7 +72,7 @@ async fn snapshot_overrides_membership() -> Result<()> { Some(0), LogId { term: 1, index: want }, Some((want.into(), 1, MembershipConfig { - members: hashset![0], + members: btreeset![0], members_after_consensus: None, })), ) @@ -95,7 +95,7 @@ async fn snapshot_overrides_membership() -> Result<()> { log_id: LogId { term: 1, index: 1 }, payload: EntryPayload::ConfigChange(EntryConfigChange { membership: MembershipConfig { - members: hashset![2, 3], + members: btreeset![2, 3], members_after_consensus: None, }, }), @@ -109,7 +109,7 @@ async fn snapshot_overrides_membership() -> Result<()> { let m = sto.get_membership_config().await?; assert_eq!( MembershipConfig { 
- members: hashset![2, 3], + members: btreeset![2, 3], members_after_consensus: None }, m @@ -121,12 +121,12 @@ async fn snapshot_overrides_membership() -> Result<()> { { router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); - router.wait_for_log(&hashset![0, 1], want, None, "add non-voter").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "add non-voter").await?; let expected_snap = Some((want.into(), 1, MembershipConfig { - members: hashset![0u64], + members: btreeset![0u64], members_after_consensus: None, })); - router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?; + router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, None, "").await?; router .assert_storage_state( 1, @@ -140,7 +140,7 @@ async fn snapshot_overrides_membership() -> Result<()> { let m = sto.get_membership_config().await?; assert_eq!( MembershipConfig { - members: hashset![0], + members: btreeset![0], members_after_consensus: None }, m, diff --git a/async-raft/tests/snapshot_uses_prev_snap_membership.rs b/async-raft/tests/snapshot_uses_prev_snap_membership.rs index 8b7167929..710b76404 100644 --- a/async-raft/tests/snapshot_uses_prev_snap_membership.rs +++ b/async-raft/tests/snapshot_uses_prev_snap_membership.rs @@ -10,7 +10,7 @@ use async_raft::RaftStorage; use async_raft::SnapshotPolicy; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; /// Test a second compaction should not lose membership. 
/// @@ -43,21 +43,21 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { { router.new_raft_node(0).await; - router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init").await?; - router.wait_for_state(&hashset![0], State::Leader, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "init").await?; + router.wait_for_state(&btreeset![0], State::Leader, None, "empty").await?; router.new_raft_node(1).await; router.add_non_voter(0, 1).await?; - router.change_membership(0, hashset![0, 1]).await?; + router.change_membership(0, btreeset![0, 1]).await?; want += 2; - router.wait_for_log(&hashset![0, 1], want, None, "cluster of 2").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "cluster of 2").await?; } let sto = router.get_storage_handle(&0).await?; @@ -67,13 +67,13 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await; want = snapshot_threshold; - router.wait_for_log(&hashset![0, 1], want, None, "send log to trigger snapshot").await?; - router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "send log to trigger snapshot").await?; + router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?; let m = sto.get_membership_config().await?; assert_eq!( MembershipConfig { - members: hashset![0, 1], + members: btreeset![0, 1], members_after_consensus: None }, m, @@ -91,7 +91,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { // Some(0), // want, // Some((want.into(), 1, 
MembershipConfig { - // members: hashset![0, 1], + // members: btreeset![0, 1], // members_after_consensus: None, // })), // ) @@ -103,8 +103,8 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { router.client_request_many(0, "0", (snapshot_threshold * 2 - want) as usize).await; want = snapshot_threshold * 2; - router.wait_for_log(&hashset![0, 1], want, None, "send log to trigger snapshot").await?; - router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "send log to trigger snapshot").await?; + router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?; } tracing::info!("--- check membership"); @@ -112,7 +112,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { let m = sto.get_membership_config().await?; assert_eq!( MembershipConfig { - members: hashset![0, 1], + members: btreeset![0, 1], members_after_consensus: None }, m, diff --git a/async-raft/tests/state_machien_apply_membership.rs b/async-raft/tests/state_machien_apply_membership.rs index b016f3775..47c0fd26f 100644 --- a/async-raft/tests/state_machien_apply_membership.rs +++ b/async-raft/tests/state_machien_apply_membership.rs @@ -8,7 +8,7 @@ use async_raft::Config; use async_raft::State; use fixtures::RaftRouter; use futures::stream::StreamExt; -use maplit::hashset; +use maplit::btreeset; /// All log should be applied to state machine. /// @@ -31,8 +31,8 @@ async fn state_machine_apply_membership() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. 
- router.wait_for_log(&hashset![0], want, None, "empty").await?; - router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0], want, None, "empty").await?; + router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -40,7 +40,7 @@ async fn state_machine_apply_membership() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0], want, None, "init").await?; + router.wait_for_log(&btreeset![0], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; for i in 0..=0 { @@ -48,7 +48,7 @@ async fn state_machine_apply_membership() -> Result<()> { let sm = sto.get_state_machine().await; assert_eq!( Some(MembershipConfig { - members: hashset![0], + members: btreeset![0], members_after_consensus: None }), sm.last_membership @@ -72,10 +72,10 @@ async fn state_machine_apply_membership() -> Result<()> { } tracing::info!("--- changing cluster config"); - router.change_membership(0, hashset![0, 1, 2]).await?; + router.change_membership(0, btreeset![0, 1, 2]).await?; want += 2; - router.wait_for_log(&hashset![0, 1, 2, 3, 4], want, None, "cluster of 5 candidates").await?; + router.wait_for_log(&btreeset![0, 1, 2, 3, 4], want, None, "cluster of 5 candidates").await?; tracing::info!("--- check applied membership config"); for i in 0..5 { @@ -83,7 +83,7 @@ async fn state_machine_apply_membership() -> Result<()> { let sm = sto.get_state_machine().await; assert_eq!( Some(MembershipConfig { - members: hashset![0, 1, 2], + members: btreeset![0, 1, 2], members_after_consensus: None }), sm.last_membership diff --git a/async-raft/tests/stepdown.rs b/async-raft/tests/stepdown.rs index 556b4553e..28550aef3 100644 --- a/async-raft/tests/stepdown.rs +++ b/async-raft/tests/stepdown.rs @@ -8,7 +8,7 @@ use async_raft::Config; use 
async_raft::LogId; use async_raft::State; use fixtures::RaftRouter; -use maplit::hashset; +use maplit::btreeset; use tokio::time::sleep; /// Client write tests. @@ -34,8 +34,8 @@ async fn stepdown() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router.wait_for_log(&hashset![0, 1], want, None, "empty").await?; - router.wait_for_state(&hashset![0, 1], State::NonVoter, None, "empty").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "empty").await?; + router.wait_for_state(&btreeset![0, 1], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -43,7 +43,7 @@ async fn stepdown() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router.wait_for_log(&hashset![0, 1], want, None, "init").await?; + router.wait_for_log(&btreeset![0, 1], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Submit a config change which adds two new nodes and removes the current leader. 
@@ -51,17 +51,17 @@ async fn stepdown() -> Result<()> { assert_eq!(0, orig_leader, "expected original leader to be node 0"); router.new_raft_node(2).await; router.new_raft_node(3).await; - router.change_membership(orig_leader, hashset![1, 2, 3]).await?; + router.change_membership(orig_leader, btreeset![1, 2, 3]).await?; want += 2; for id in 0..4 { if id == orig_leader { - router.wait_for_log(&hashset![id], want, None, "update membership: 1, 2, 3; old leader").await?; + router.wait_for_log(&btreeset![id], want, None, "update membership: 1, 2, 3; old leader").await?; } else { // a new leader elected and propose a log router .wait_for_log( - &hashset![id], + &btreeset![id], want + 1, None, "update membership: 1, 2, 3; new candidate", @@ -103,7 +103,7 @@ async fn stepdown() -> Result<()> { ); assert_eq!( cfg.members, - hashset![1, 2, 3], + btreeset![1, 2, 3], "expected old leader to have membership of [1, 2, 3], got {:?}", cfg.members ); diff --git a/memstore/src/test.rs b/memstore/src/test.rs index e8981ae89..d4d07fa53 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::BTreeSet; use async_raft::raft::EntryConfigChange; use async_raft::raft::EntryNormal; @@ -24,7 +24,7 @@ async fn test_get_membership_config_default() -> Result<()> { #[tokio::test] async fn test_get_membership_config_with_previous_state() -> Result<()> { let mut log = BTreeMap::new(); - let mut members: HashSet = Default::default(); + let mut members: BTreeSet = Default::default(); members.insert(1); members.insert(2); members.insert(3);