Skip to content

Commit

Permalink
[fix] hyperledger#3352: Split up control flow and data messages
Browse files Browse the repository at this point in the history
Signed-off-by: Sam H. Smith <sam.henning.smith@protonmail.com>
  • Loading branch information
SamHSmith committed Jul 6, 2023
1 parent 54db8dd commit d6c004a
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 91 deletions.
2 changes: 1 addition & 1 deletion core/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ pub mod message {
for block in blocks.clone() {
block_sync.sumeragi.incoming_message(MessagePacket::new(
ProofChain::default(),
Message::BlockSyncUpdate(block.into()),
Some(Message::BlockSyncUpdate(block.into())),
));
}
}
Expand Down
206 changes: 122 additions & 84 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct Sumeragi {
pub kura: Arc<Kura>,
/// [`iroha_p2p::Network`] actor address
pub network: IrohaNetwork,
/// Receiver channel, for control flow messages.
pub control_message_receiver: mpsc::Receiver<ControlFlowMessage>,
/// Receiver channel.
pub message_receiver: mpsc::Receiver<MessagePacket>,
/// Only used in testing. Causes the genesis peer to withhold blocks when it
Expand Down Expand Up @@ -130,28 +132,51 @@ impl Sumeragi {
}

#[allow(clippy::panic)]
fn receive_network_packet(&self, view_change_proof_chain: &mut ProofChain) -> Option<Message> {
let current_topology = &self.current_topology;
match self.message_receiver.try_recv() {
Ok(packet) => {
if let Err(error) = view_change_proof_chain.merge(
packet.view_change_proofs,
&current_topology.sorted_peers,
current_topology.max_faults(),
self.wsv.latest_block_hash(),
) {
trace!(%error, "Failed to add proofs into view change proof chain")
}
Some(packet.message)
fn receive_network_packet(
&self,
view_change_proof_chain: &mut ProofChain,
control_message_in_a_row_counter: &mut usize,
) -> Option<Message> {
const MAX_CONTROL_MSG_IN_A_ROW: usize = 25;

if *control_message_in_a_row_counter < MAX_CONTROL_MSG_IN_A_ROW {
*control_message_in_a_row_counter += 1;
self.control_message_receiver
.try_recv()
.map_err(|recv_error| {
assert!(
recv_error != mpsc::TryRecvError::Disconnected,
"Sumeragi control message pump disconnected. This is not a recoverable error."
)
})
.ok()
.map(std::convert::Into::into)
} else {
None
}.or_else(|| {
*control_message_in_a_row_counter = 0;
self
.message_receiver
.try_recv()
.map_err(|recv_error| {
assert!(
recv_error != mpsc::TryRecvError::Disconnected,
"Sumeragi message pump disconnected. This is not a recoverable error."
)
})
.ok()
})
.and_then(|packet : MessagePacket| {
if let Err(error) = view_change_proof_chain.merge(
packet.view_change_proofs,
&self.current_topology.sorted_peers,
self.current_topology.max_faults(),
self.wsv.latest_block_hash(),
) {
trace!(%error, "Failed to add proofs into view change proof chain")
}
Err(recv_error) => match recv_error {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("Sumeragi message pump disconnected. This is not a recoverable error.")
// TODO: Use early return.
}
},
}
packet.message
})
}

#[allow(clippy::panic, clippy::panic_in_result_fn)]
Expand All @@ -171,62 +196,64 @@ impl Sumeragi {
// the genesis block.
match self.message_receiver.try_recv() {
Ok(packet) => {
let (block, new_wsv) = match packet.message {
Message::BlockCreated(BlockCreated { block }) => {
let mut new_wsv = self.wsv.clone();
// If we receive a committed genesis block that is
// valid, use it without question. During the
// genesis round we blindly take on the network
// topology described in the provided genesis
// block.
let block = {
let span = span!(
Level::TRACE,
"Genesis Round Peer is revalidating the block."
);
let _enter = span.enter();
if let Some(message) = packet.message {
let (block, new_wsv) = match message {
Message::BlockCreated(BlockCreated { block }) => {
let mut new_wsv = self.wsv.clone();
// If we receive a committed genesis block that is
// valid, use it without question. During the
// genesis round we blindly take on the network
// topology described in the provided genesis
// block.
let block = {
let span = span!(
Level::TRACE,
"Genesis Round Peer is revalidating the block."
);
let _enter = span.enter();
match block.revalidate(&mut new_wsv) {
Ok(()) => block,
Err(error) => {
error!(?error);
continue;
}
}
};
// Omit signature verification during genesis round
(block.commit_unchecked().into(), new_wsv)
}
Message::BlockSyncUpdate(BlockSyncUpdate { block }) => {
let mut new_wsv = self.wsv.clone();
// Omit signature verification during genesis round
match block.revalidate(&mut new_wsv) {
Ok(()) => block,
Ok(()) => (block, new_wsv),
Err(error) => {
error!(?error);
continue;
}
}
};
// Omit signature verification during genesis round
(block.commit_unchecked().into(), new_wsv)
}
Message::BlockSyncUpdate(BlockSyncUpdate { block }) => {
let mut new_wsv = self.wsv.clone();
// Omit signature verification during genesis round
match block.revalidate(&mut new_wsv) {
Ok(()) => (block, new_wsv),
Err(error) => {
error!(?error);
continue;
}
msg => {
trace!(?msg, "Not handling the message, waiting for genesis...");
continue;
}
};

if block.is_genesis() {
match &block {
VersionedCommittedBlock::V1(block) => {
assert!(
!block.transactions.iter().any(|tx| tx.error.is_some()),
"Genesis transaction set contains invalid transactions"
);
}
}
}
msg => {
trace!(?msg, "Not handling the message, waiting for genesis...");
continue;
}
};

if block.is_genesis() {
match &block {
VersionedCommittedBlock::V1(block) => {
assert!(
!block.transactions.iter().any(|tx| tx.error.is_some()),
"Genesis transaction set contains invalid transactions"
);
}
self.commit_block(block, new_wsv);
return Err(EarlyReturn::GenesisBlockReceivedAndCommitted);
}

self.commit_block(block, new_wsv);
return Err(EarlyReturn::GenesisBlockReceivedAndCommitted);
debug!("Received a block that was not genesis.");
}
debug!("Received a block that was not genesis.");
}
Err(mpsc::TryRecvError::Disconnected) => return Err(EarlyReturn::Disconnected),
_ => (),
Expand Down Expand Up @@ -279,7 +306,10 @@ impl Sumeragi {
);

self.send_events(&block);
let msg = MessagePacket::new(ProofChain::default(), BlockCreated::from(block.clone()));
let msg = MessagePacket::new(
ProofChain::default(),
Some(BlockCreated::from(block.clone()).into()),
);
self.broadcast_packet(msg);
// Omit signature verification during genesis round
self.commit_block(block.commit_unchecked(), new_wsv);
Expand Down Expand Up @@ -408,10 +438,7 @@ fn suggest_view_change(
)
.unwrap_or_else(|err| error!("{err}"));

let msg = MessagePacket::new(
view_change_proof_chain.clone(),
Message::ViewChangeSuggested,
);
let msg = MessagePacket::new(view_change_proof_chain.clone(), None);
sumeragi.broadcast_packet(msg);
}

Expand Down Expand Up @@ -442,9 +469,6 @@ fn handle_message(

#[allow(clippy::suspicious_operation_groupings)]
match (message, role) {
(Message::ViewChangeSuggested, _) => {
trace!("Received view change suggestion.");
}
(Message::BlockSyncUpdate(BlockSyncUpdate { block }), _) => {
let block_hash = block.hash();
info!(%addr, %role, hash=%block_hash, "Block sync update received");
Expand Down Expand Up @@ -531,7 +555,7 @@ fn handle_message(

let msg = MessagePacket::new(
view_change_proof_chain.clone(),
BlockSigned::from(&v_block.block),
Some(BlockSigned::from(&v_block.block).into()),
);

sumeragi.broadcast_packet_to(msg, [current_topology.proxy_tail()]);
Expand All @@ -547,7 +571,7 @@ fn handle_message(

let msg = MessagePacket::new(
view_change_proof_chain.clone(),
BlockSigned::from(&v_block.block),
Some(BlockSigned::from(&v_block.block).into()),
);

sumeragi.broadcast_packet_to(msg, [current_topology.proxy_tail()]);
Expand Down Expand Up @@ -642,7 +666,7 @@ fn process_message_independent(

let msg = MessagePacket::new(
view_change_proof_chain.clone(),
BlockCreated::from(new_block),
Some(BlockCreated::from(new_block).into()),
);
if current_view_change_index >= 1 {
sumeragi.broadcast_packet(msg);
Expand All @@ -660,9 +684,14 @@ fn process_message_independent(
Ok(committed_block) => {
let msg = MessagePacket::new(
view_change_proof_chain.clone(),
BlockCommitted::from(Into::<VersionedCommittedBlock>::into(
committed_block.clone(),
)),
Some(
BlockCommitted::from(
Into::<VersionedCommittedBlock>::into(
committed_block.clone(),
),
)
.into(),
),
);

sumeragi.broadcast_packet(msg);
Expand All @@ -685,9 +714,12 @@ fn process_message_independent(

let msg = MessagePacket::new(
view_change_proof_chain.clone(),
BlockCommitted::from(Into::<VersionedCommittedBlock>::into(
committed_block.clone(),
)),
Some(
BlockCommitted::from(Into::<VersionedCommittedBlock>::into(
committed_block.clone(),
))
.into(),
),
);

#[cfg(debug_assertions)]
Expand Down Expand Up @@ -838,6 +870,9 @@ pub(crate) fn run(
// Instant when the previous view change or round happened.
let mut last_view_change_time = Instant::now();

// Internal variable used to pick receiver channel. Initialize to zero.
let mut control_message_in_a_row_counter = 0;

while !should_terminate(&mut shutdown_receiver) {
if should_sleep {
let span = span!(Level::TRACE, "main_thread_sleep");
Expand Down Expand Up @@ -918,7 +953,10 @@ pub(crate) fn run(
}

sumeragi
.receive_network_packet(&mut view_change_proof_chain)
.receive_network_packet(
&mut view_change_proof_chain,
&mut control_message_in_a_row_counter,
)
.map_or_else(
|| {
should_sleep = true;
Expand Down
25 changes: 20 additions & 5 deletions core/src/sumeragi/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,20 @@ pub struct MessagePacket {
/// peers which agree with view change should sign it.
pub view_change_proofs: view_change::ProofChain,
/// Actual Sumeragi message in this packet.
pub message: Message,
pub message: Option<Message>,
}

impl MessagePacket {
/// Construct [`Self`]
pub fn new(view_change_proofs: view_change::ProofChain, message: impl Into<Message>) -> Self {
pub fn new(view_change_proofs: view_change::ProofChain, message: Option<Message>) -> Self {
Self {
view_change_proofs,
message: message.into(),
message,
}
}
}

#[allow(clippy::enum_variant_names)]
/// Message's variants that are used by peers to communicate in the process of consensus.
#[derive(Debug, Clone, Decode, Encode, FromVariant)]
pub enum Message {
Expand All @@ -72,8 +73,22 @@ pub enum Message {
BlockCommitted(BlockCommitted),
/// This message is sent by `BlockSync` when new block is received
BlockSyncUpdate(BlockSyncUpdate),
/// View change is suggested due to some faulty peer or general fault in consensus.
ViewChangeSuggested,
}

/// Specialization of `MessagePacket`
pub struct ControlFlowMessage {
/// Proof of view change. As part of this message handling, all
/// peers which agree with view change should sign it.
pub view_change_proofs: view_change::ProofChain,
}

impl From<ControlFlowMessage> for MessagePacket {
fn from(m: ControlFlowMessage) -> MessagePacket {
MessagePacket {
view_change_proofs: m.view_change_proofs,
message: None,
}
}
}

/// `BlockCreated` message structure.
Expand Down
Loading

0 comments on commit d6c004a

Please sign in to comment.