From 12e053324c5e5c10092e610841936d375024951d Mon Sep 17 00:00:00 2001 From: Akira Hayakawa Date: Fri, 24 May 2024 07:39:30 +0000 Subject: [PATCH] refactor --- lolraft/src/process/command_log/mod.rs | 2 +- lolraft/src/process/peer_svc/replication.rs | 11 ++++------- lolraft/src/process/thread/replication.rs | 5 ++--- tests/env/tests/tests.rs | 2 +- tests/lol-tests/tests/multi_raft.rs | 3 +-- tests/testapp/src/lib.rs | 4 +--- 6 files changed, 10 insertions(+), 17 deletions(-) diff --git a/lolraft/src/process/command_log/mod.rs b/lolraft/src/process/command_log/mod.rs index 658a1c86..ccc0bb59 100644 --- a/lolraft/src/process/command_log/mod.rs +++ b/lolraft/src/process/command_log/mod.rs @@ -106,7 +106,7 @@ impl Inner { // If the same snapshot already exists, we can skip the insertion. if new_snapshot_index == cur_snapshot_index { - return Ok(()) + return Ok(()); } self.storage.insert_entry(new_snapshot_index, e).await?; diff --git a/lolraft/src/process/peer_svc/replication.rs b/lolraft/src/process/peer_svc/replication.rs index b1668b29..38a24bfe 100644 --- a/lolraft/src/process/peer_svc/replication.rs +++ b/lolraft/src/process/peer_svc/replication.rs @@ -31,7 +31,7 @@ impl PeerSvc { }) } - pub async fn advance_replication(&self, follower_id: NodeId) -> Result { + pub async fn advance_replication(&self, follower_id: NodeId) -> Result<()> { let peer_context = self .peer_contexts .read() @@ -43,10 +43,7 @@ impl PeerSvc { let cur_last_log_index = self.command_log.get_log_last_index().await?; // More entries to send? - let should_send = old_progress.next_index <= cur_last_log_index; - if !should_send { - return Ok(false); - } + ensure!(old_progress.next_index <= cur_last_log_index); // The entries to be sent may be deleted due to a previous compaction. // In this case, replication will reset from the current snapshot index. @@ -62,7 +59,7 @@ impl PeerSvc { .get_mut(&follower_id) .context(Error::PeerNotFound(follower_id.clone()))? .progress = new_progress; - return Ok(true); + return Ok(()); } let n_max_possible = cur_last_log_index - old_progress.next_index + 1; @@ -107,6 +104,6 @@ impl PeerSvc { .context(Error::PeerNotFound(follower_id.clone()))? .progress = new_progress; - Ok(true) + Ok(()) } } diff --git a/lolraft/src/process/thread/replication.rs b/lolraft/src/process/thread/replication.rs index 1d1cfdab..b9f1be00 100644 --- a/lolraft/src/process/thread/replication.rs +++ b/lolraft/src/process/thread/replication.rs @@ -13,12 +13,11 @@ impl Thread { return Ok(false); } - let cont = self - .peers + self.peers .advance_replication(self.follower_id.clone()) .await?; - Ok(cont) + Ok(true) } fn do_loop(self) -> ThreadHandle { diff --git a/tests/env/tests/tests.rs b/tests/env/tests/tests.rs index af012e55..0f93c1b7 100644 --- a/tests/env/tests/tests.rs +++ b/tests/env/tests/tests.rs @@ -61,4 +61,4 @@ async fn drop_env() -> Result<()> { } Ok(()) -} \ No newline at end of file +} diff --git a/tests/lol-tests/tests/multi_raft.rs b/tests/lol-tests/tests/multi_raft.rs index 49cb5c36..c60fbaeb 100644 --- a/tests/lol-tests/tests/multi_raft.rs +++ b/tests/lol-tests/tests/multi_raft.rs @@ -30,11 +30,10 @@ async fn N3_L100_K3_multi_raft_cluster() -> Result<()> { futs.push(fut); } futures::future::try_join_all(futs).await?; - + Ok(()) } - #[serial] #[tokio::test(flavor = "multi_thread")] async fn N1_L100_K3_multi_raft_io() -> Result<()> { diff --git a/tests/testapp/src/lib.rs b/tests/testapp/src/lib.rs index ff23a59c..07f6c8be 100644 --- a/tests/testapp/src/lib.rs +++ b/tests/testapp/src/lib.rs @@ -76,9 +76,7 @@ impl Client { use tokio_retry::Retry; // 200ms, 400, 800, 1600, 3200 - let strategy = ExponentialBackoff::from_millis(2) - .factor(100) - .take(8); + let strategy = ExponentialBackoff::from_millis(2).factor(100).take(8); let fut = Retry::spawn(strategy, || { let mut cli = self.cli.clone();