Skip to content

Commit

Permalink
[fix] #4140: Fix registration of new peer
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed Dec 12, 2023
1 parent 49e837d commit 2500149
Show file tree
Hide file tree
Showing 17 changed files with 195 additions and 99 deletions.
3 changes: 2 additions & 1 deletion cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ async fn main() -> Result<(), color_eyre::Report> {
"Hyperledgerいろは2にようこそ!(translation) Welcome to Hyperledger Iroha!"
);

assert!(args.submit_genesis || config.sumeragi.trusted_peers.peers.len() > 1,
let trusted_peers = &config.sumeragi.trusted_peers.peers;
assert!(args.submit_genesis || trusted_peers.len() > 1 || trusted_peers.len() == 1 && !trusted_peers.contains(&config.sumeragi.peer_id),
"Only peer in network, yet required to receive genesis topology. This is a configuration error."
);

Expand Down
126 changes: 88 additions & 38 deletions client/tests/integration/connected_peers.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::thread;

use eyre::{Context, Result};
use iroha_client::{
client::Client,
data_model::{
parameter::{default::MAX_TRANSACTIONS_IN_BLOCK, ParametersBuilder},
peer::Peer as DataModelPeer,
prelude::*,
},
use iroha_client::{client::Client, data_model::peer::Peer as DataModelPeer};
use iroha_data_model::{
isi::{RegisterExpr, UnregisterExpr},
IdBox,
};
use iroha_primitives::unique_vec;
use rand::{seq::SliceRandom, thread_rng, Rng};
use test_network::*;
use tokio::runtime::Runtime;

use super::Configuration;

Expand All @@ -24,11 +24,50 @@ fn connected_peers_with_f_1_0_1() -> Result<()> {
connected_peers_with_f(1, Some(11_000))
}

#[test]
fn register_new_peer() -> Result<()> {
let (_rt, network, _) = <Network>::start_test_with_runtime(4, Some(11_040));
wait_for_genesis_committed(&network.clients(), 0);
let pipeline_time = Configuration::pipeline_time();

let mut peer_clients: Vec<_> = Network::peers(&network)
.zip(Network::clients(&network))
.collect();

check_status(&peer_clients, 1);

// Start new peer
let mut configuration = Configuration::test();
configuration.sumeragi.trusted_peers.peers =
unique_vec![peer_clients.choose(&mut thread_rng()).unwrap().0.id.clone()];
let rt = Runtime::test();
let new_peer = rt.block_on(
PeerBuilder::new()
.with_configuration(configuration.clone())
.with_into_genesis(WithGenesis::None)
.with_port(10_000)
.start(),
);

let register_peer = RegisterExpr::new(DataModelPeer::new(new_peer.id.clone()));
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(register_peer)?;
peer_clients.push((&new_peer, Client::test(&new_peer.api_address)));
thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect

check_status(&peer_clients, 2);

Ok(())
}

/// Test the number of connected peers, changing the number of faults tolerated down and up
fn connected_peers_with_f(faults: u64, start_port: Option<u16>) -> Result<()> {
let n_peers = 3 * faults + 1;

let (_rt, network, client) = <Network>::start_test_with_runtime(
let (_rt, network, _) = <Network>::start_test_with_runtime(
(n_peers)
.try_into()
.wrap_err("`faults` argument `u64` value too high, cannot convert to `u32`")?,
Expand All @@ -37,40 +76,51 @@ fn connected_peers_with_f(faults: u64, start_port: Option<u16>) -> Result<()> {
wait_for_genesis_committed(&network.clients(), 0);
let pipeline_time = Configuration::pipeline_time();

client.submit_blocking(
ParametersBuilder::new()
.add_parameter(MAX_TRANSACTIONS_IN_BLOCK, 1u32)?
.into_set_parameters(),
)?;
let mut peer_clients: Vec<_> = Network::peers(&network)
.zip(Network::clients(&network))
.collect();

// Confirm all peers connected
let mut status = client.get_status()?;
assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, 2);
check_status(&peer_clients, 1);

// Unregister a peer: committed with f = `faults`
// then `status.peers` decrements
let peer = network.peers.values().last().unwrap();
let peer_client = Client::test(&peer.api_address);
let unregister_peer = UnregisterExpr::new(IdBox::PeerId(peer.id.clone()));
client.submit_blocking(unregister_peer)?;
// Unregister a peer: committed with f = `faults` then `status.peers` decrements
let removed_peer_idx = rand::thread_rng().gen_range(0..peer_clients.len());
let (removed_peer, _) = &peer_clients[removed_peer_idx];
let unregister_peer = UnregisterExpr::new(IdBox::PeerId(removed_peer.id.clone()));
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(unregister_peer)?;
thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect
status = client.get_status()?;
assert_eq!(status.peers, n_peers - 2);
assert_eq!(status.blocks, 3);
status = peer_client.get_status()?;
let (removed_peer, removed_peer_client) = peer_clients.remove(removed_peer_idx);

check_status(&peer_clients, 2);
let status = removed_peer_client.get_status()?;
assert_eq!(status.blocks, 2);
assert_eq!(status.peers, 0);

// Re-register the peer: committed with f = `faults` - 1 then
// `status.peers` increments
let register_peer = RegisterExpr::new(DataModelPeer::new(peer.id.clone()));
client.submit_blocking(register_peer)?;
thread::sleep(pipeline_time * 4); // Wait for some time to allow peers to connect
status = client.get_status()?;
assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, 4);
status = peer_client.get_status()?;
assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, 4);
// Re-register the peer: committed with f = `faults` - 1 then `status.peers` increments
let register_peer = RegisterExpr::new(DataModelPeer::new(removed_peer.id.clone()));
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(register_peer)?;
peer_clients.insert(removed_peer_idx, (removed_peer, removed_peer_client));
thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect

check_status(&peer_clients, 3);

Ok(())
}

fn check_status(peer_clients: &[(&Peer, Client)], expected_blocks: u64) {
let n_peers = peer_clients.len() as u64;

for (_, peer_client) in peer_clients {
let status = peer_client.get_status().unwrap();

assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, expected_blocks);
}
}
5 changes: 1 addition & 4 deletions client/tests/integration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
pub use iroha_config::{
base::proxy::Builder,
iroha::{Configuration, ConfigurationProxy},
};
pub use iroha_config::iroha::Configuration;

mod add_account;
mod add_domain;
Expand Down
58 changes: 56 additions & 2 deletions client/tests/integration/offline_peers.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use eyre::Result;
use eyre::{Context, Result};
use iroha_client::{
client::{self, QueryResult},
client::{self, Client, QueryResult},
data_model::{
parameter::{default::MAX_TRANSACTIONS_IN_BLOCK, ParametersBuilder},
peer::Peer as DataModelPeer,
prelude::*,
},
};
use iroha_crypto::KeyPair;
use iroha_data_model::peer::PeerId;
use test_network::*;
use tokio::runtime::Runtime;

use super::Configuration;

#[test]
fn genesis_block_is_committed_with_some_offline_peers() -> Result<()> {
// Given
Expand Down Expand Up @@ -43,3 +48,52 @@ fn genesis_block_is_committed_with_some_offline_peers() -> Result<()> {
assert_eq!(AssetValue::Quantity(alice_has_roses), *asset.value());
Ok(())
}

#[test]
fn register_offline_peer() -> Result<()> {
let n_peers = 3 * 1 + 1;

let (_rt, network, client) = <Network>::start_test_with_runtime(
(n_peers)
.try_into()
.wrap_err("`faults` argument `u64` value too high, cannot convert to `u32`")?,
Some(11_080),
);
wait_for_genesis_committed(&network.clients(), 0);
let pipeline_time = Configuration::pipeline_time();

let peer_clients: Vec<_> = network
.peers
.values()
.chain(core::iter::once(&network.genesis))
.map(|peer| Client::test(&peer.api_address))
.collect();

check_status(&peer_clients, 1);

let address = "128.0.0.2:8085".parse()?;
let key_pair = KeyPair::generate().unwrap();
let public_key = key_pair.public_key().clone();
let peer_id = PeerId::new(&address, &public_key);
let register_peer = RegisterExpr::new(DataModelPeer::new(peer_id));

// Wait for some time to allow peers to connect
client.submit_blocking(register_peer)?;
std::thread::sleep(pipeline_time * 2);

// Make sure status hasn't change
check_status(&peer_clients, 2);

Ok(())
}

fn check_status(peer_clients: &[Client], expected_blocks: u64) {
let n_peers = peer_clients.len() as u64;

for peer_client in peer_clients {
let status = peer_client.get_status().unwrap();

assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, expected_blocks);
}
}
2 changes: 1 addition & 1 deletion client/tests/integration/restart_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> {
let temp_dir = Arc::new(TempDir::new()?);

let mut configuration = Configuration::test();
let mut peer = <PeerBuilder>::new().with_port(10_000).build()?;
let mut peer = PeerBuilder::new().with_port(10_000).build()?;
configuration.sumeragi.trusted_peers.peers = unique_vec![peer.id.clone()];

let account_id = AccountId::from_str("alice@wonderland").unwrap();
Expand Down
37 changes: 19 additions & 18 deletions core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,27 +267,28 @@ mod valid {
topology: &Topology,
wsv: &mut WorldStateView,
) -> Result<ValidBlock, (SignedBlock, BlockValidationError)> {
let actual_commit_topology = &block.payload().commit_topology;
let expected_commit_topology = &topology.ordered_peers;

if actual_commit_topology != expected_commit_topology {
let actual_commit_topology = actual_commit_topology.clone();

return Err((
block,
BlockValidationError::TopologyMismatch {
expected: expected_commit_topology.clone(),
actual: actual_commit_topology,
},
));
}
if !block.payload().header.is_genesis() {
let actual_commit_topology = &block.payload().commit_topology;
let expected_commit_topology = &topology.ordered_peers;

if actual_commit_topology != expected_commit_topology {
let actual_commit_topology = actual_commit_topology.clone();

return Err((
block,
BlockValidationError::TopologyMismatch {
expected: expected_commit_topology.clone(),
actual: actual_commit_topology,
},
));
}

if !block.payload().header.is_genesis()
&& topology
if topology
.filter_signatures_by_roles(&[Role::Leader], block.signatures())
.is_empty()
{
return Err((block, SignatureVerificationError::LeaderMissing.into()));
{
return Err((block, SignatureVerificationError::LeaderMissing.into()));
}
}

let expected_block_height = wsv.height() + 1;
Expand Down
1 change: 0 additions & 1 deletion core/src/gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ impl TransactionGossiper {
.n_random_transactions(self.gossip_batch_size, &self.wsv);

if txs.is_empty() {
iroha_logger::debug!("Nothing to gossip");
return;
}

Expand Down
23 changes: 11 additions & 12 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Sumeragi {
&mut self,
shutdown_receiver: &mut tokio::sync::oneshot::Receiver<()>,
) -> Result<(), EarlyReturn> {
trace!("Listen for genesis");
info!(addr = %self.peer_id.address, "Listen for genesis");

loop {
std::thread::sleep(Duration::from_millis(50));
Expand Down Expand Up @@ -223,6 +223,8 @@ impl Sumeragi {
}
};

new_wsv.world_mut().trusted_peers_ids =
block.payload().commit_topology.clone();
self.commit_block(block, new_wsv);
return Err(EarlyReturn::GenesisBlockReceivedAndCommitted);
}
Expand Down Expand Up @@ -295,7 +297,7 @@ impl Sumeragi {
info!(
addr=%self.peer_id.address,
role=%self.current_topology.role(&self.peer_id),
block_height=%self.wsv.height(),
block_height=%block.payload().header.height,
block_hash=%block.hash(),
"{}", Strategy::LOG_MESSAGE,
);
Expand All @@ -313,11 +315,8 @@ impl Sumeragi {
// Parameters are updated before updating public copy of sumeragi
self.update_params();

let new_topology = Topology::recreate_topology(
block.as_ref(),
0,
self.wsv.peers_ids().iter().cloned().collect(),
);
let new_topology =
Topology::recreate_topology(block.as_ref(), 0, self.wsv.peers().cloned().collect());
let events = block.produce_events();

// https://github.com/hyperledger/iroha/issues/3396
Expand Down Expand Up @@ -801,10 +800,10 @@ pub(crate) fn run(
};
span.exit();

trace!(
me=%sumeragi.peer_id.public_key,
info!(
addr=%sumeragi.peer_id.address,
role_in_next_round=%sumeragi.current_topology.role(&sumeragi.peer_id),
"Finished sumeragi init.",
"Sumeragi initialized",
);

let mut voting_block = None;
Expand Down Expand Up @@ -1125,7 +1124,7 @@ fn handle_block_sync(
let last_committed_block = new_wsv
.latest_block_ref()
.expect("Not in genesis round so must have at least genesis block");
let new_peers = new_wsv.peers_ids().clone();
let new_peers = new_wsv.peers().cloned().collect();
let view_change_index = block.payload().header().view_change_index;
Topology::recreate_topology(&last_committed_block, view_change_index, new_peers)
};
Expand All @@ -1145,7 +1144,7 @@ fn handle_block_sync(
let last_committed_block = new_wsv
.latest_block_ref()
.expect("Not in genesis round so must have at least genesis block");
let new_peers = new_wsv.peers_ids().clone();
let new_peers = new_wsv.peers().cloned().collect();
let view_change_index = block.payload().header().view_change_index;
Topology::recreate_topology(&last_committed_block, view_change_index, new_peers)
};
Expand Down
Loading

0 comments on commit 2500149

Please sign in to comment.