Skip to content

Commit

Permalink
[fix] hyperledger-iroha#3352: Split up control flow and data messages
Browse files Browse the repository at this point in the history
Signed-off-by: Sam H. Smith <sam.henning.smith@protonmail.com>
  • Loading branch information
SamHSmith committed Jun 26, 2023
1 parent eb4a876 commit 8685901
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 21 deletions.
77 changes: 57 additions & 20 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct Sumeragi {
pub kura: Arc<Kura>,
/// [`iroha_p2p::Network`] actor address
pub network: IrohaNetwork,
/// Receiver channel, for control flow messages.
pub control_message_receiver: mpsc::Receiver<MessagePacket>,
/// Receiver channel.
pub message_receiver: mpsc::Receiver<MessagePacket>,
/// Only used in testing. Causes the genesis peer to withhold blocks when it
Expand Down Expand Up @@ -130,28 +132,57 @@ impl Sumeragi {
}

#[allow(clippy::panic)]
fn receive_network_packet(&self, view_change_proof_chain: &mut ProofChain) -> Option<Message> {
let current_topology = &self.current_topology;
match self.message_receiver.try_recv() {
Ok(packet) => {
if let Err(error) = view_change_proof_chain.merge(
packet.view_change_proofs,
&current_topology.sorted_peers,
current_topology.max_faults(),
self.wsv.latest_block_hash(),
) {
trace!(%error, "Failed to add proofs into view change proof chain")
}
Some(packet.message)
fn receive_network_packet(
&self,
view_change_proof_chain: &mut ProofChain,
control_message_in_a_row_counter: &mut usize,
) -> Option<Message> {
const MAX_CONTROL_MSG_IN_A_ROW: usize = 25;

let mut packet = if *control_message_in_a_row_counter < MAX_CONTROL_MSG_IN_A_ROW {
*control_message_in_a_row_counter += 1;
match self.control_message_receiver.try_recv() {
Ok(packet) => Some(packet),
Err(recv_error) => match recv_error {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("Sumeragi control message pump disconnected. This is not a recoverable error.")
// TODO: Use early return.
}
},
}
Err(recv_error) => match recv_error {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("Sumeragi message pump disconnected. This is not a recoverable error.")
// TODO: Use early return.
} else {
None
};

if packet.is_none() {
*control_message_in_a_row_counter = 0;
packet = match self.message_receiver.try_recv() {
Ok(packet) => Some(packet),
Err(recv_error) => {
match recv_error {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("Sumeragi message pump disconnected. This is not a recoverable error.")
// TODO: Use early return.
}
}
}
},
}
}

packet.map(|packet| {
if let Err(error) = view_change_proof_chain.merge(
packet.view_change_proofs,
&self.current_topology.sorted_peers,
self.current_topology.max_faults(),
self.wsv.latest_block_hash(),
) {
trace!(%error, "Failed to add proofs into view change proof chain")
}

packet.message
})
}

#[allow(clippy::panic, clippy::panic_in_result_fn)]
Expand Down Expand Up @@ -832,6 +863,9 @@ pub(crate) fn run(
// Instant when the previous view change or round happened.
let mut last_view_change_time = Instant::now();

// Internal variable used to pick receiver channel. Initialize to zero.
let mut control_message_in_a_row_counter = 0;

while !should_terminate(&mut shutdown_receiver) {
if should_sleep {
let span = span!(Level::TRACE, "main_thread_sleep");
Expand Down Expand Up @@ -912,7 +946,10 @@ pub(crate) fn run(
}

sumeragi
.receive_network_packet(&mut view_change_proof_chain)
.receive_network_packet(
&mut view_change_proof_chain,
&mut control_message_in_a_row_counter,
)
.map_or_else(
|| {
should_sleep = true;
Expand Down
11 changes: 10 additions & 1 deletion core/src/sumeragi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct SumeragiHandle {
queue: Arc<Queue>,
_thread_handle: Arc<ThreadHandler>,
// Should be dropped after `_thread_handle` to prevent sumeragi thread from panicking
control_message_sender: mpsc::SyncSender<MessagePacket>,
message_sender: mpsc::SyncSender<MessagePacket>,
}

Expand Down Expand Up @@ -186,7 +187,12 @@ impl SumeragiHandle {

/// Deposit a sumeragi network message.
pub fn incoming_message(&self, msg: MessagePacket) {
if let Err(error) = self.message_sender.try_send(msg) {
if matches!(&msg.message, Message::ViewChangeSuggested) {
if let Err(error) = self.control_message_sender.try_send(msg) {
self.metrics.dropped_messages.inc();
error!(?error, "This peer is faulty. Incoming control messages have to be dropped due to low processing speed.");
}
} else if let Err(error) = self.message_sender.try_send(msg) {
self.metrics.dropped_messages.inc();
error!(?error, "This peer is faulty. Incoming messages have to be dropped due to low processing speed.");
}
Expand All @@ -208,6 +214,7 @@ impl SumeragiHandle {
block_hashes,
}: SumeragiStartArgs,
) -> SumeragiHandle {
let (control_message_sender, control_message_receiver) = mpsc::sync_channel(100);
let (message_sender, message_receiver) = mpsc::sync_channel(100);

for (block_hash, i) in block_hashes
Expand Down Expand Up @@ -270,6 +277,7 @@ impl SumeragiHandle {
max_txs_in_block: configuration.max_transactions_in_block as usize,
kura: Arc::clone(&kura),
network: network.clone(),
control_message_receiver,
message_receiver,
debug_force_soft_fork,
current_topology,
Expand Down Expand Up @@ -299,6 +307,7 @@ impl SumeragiHandle {
network,
queue,
kura,
control_message_sender,
message_sender,
public_wsv_receiver,
metrics: Metrics::default(),
Expand Down

0 comments on commit 8685901

Please sign in to comment.