From 8f2d83a44d3437aa367d65b4f178c7ebbea5040c Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 20 Nov 2020 16:42:26 +0100 Subject: [PATCH 1/3] send_message should not return an error --- node/collation-generation/src/lib.rs | 5 +- node/core/av-store/src/lib.rs | 4 +- node/core/candidate-validation/src/lib.rs | 2 +- node/core/proposer/src/lib.rs | 4 +- .../availability-distribution/src/lib.rs | 120 +++++++----------- node/network/bitfield-distribution/src/lib.rs | 84 ++++++------ node/network/bridge/src/lib.rs | 64 ++-------- .../collator-protocol/src/collator_side.rs | 48 +++---- node/network/collator-protocol/src/lib.rs | 6 +- .../collator-protocol/src/validator_side.rs | 74 ++++------- node/network/pov-distribution/src/lib.rs | 88 ++++++------- node/network/pov-distribution/src/tests.rs | 28 ++-- .../network/statement-distribution/src/lib.rs | 89 ++++++------- node/overseer/examples/minimal-example.rs | 2 +- node/overseer/src/lib.rs | 119 ++++++++++------- node/subsystem-test-helpers/src/lib.rs | 9 +- node/subsystem-util/src/lib.rs | 20 +-- .../subsystem-util/src/validator_discovery.rs | 4 +- node/subsystem/src/lib.rs | 4 +- node/test/service/src/lib.rs | 6 +- .../adder/collator/src/main.rs | 6 +- 21 files changed, 327 insertions(+), 459 deletions(-) diff --git a/node/collation-generation/src/lib.rs b/node/collation-generation/src/lib.rs index ad28b66f0783..b948f3d3c81d 100644 --- a/node/collation-generation/src/lib.rs +++ b/node/collation-generation/src/lib.rs @@ -95,10 +95,7 @@ impl CollationGenerationSubsystem { }, msg = receiver.next().fuse() => { if let Some(msg) = msg { - if let Err(err) = ctx.send_message(msg).await { - tracing::warn!(target: LOG_TARGET, err = ?err, "failed to forward message to overseer"); - break; - } + ctx.send_message(msg).await; } }, } diff --git a/node/core/av-store/src/lib.rs b/node/core/av-store/src/lib.rs index 8c20e351f977..72c1d9cb4cca 100644 --- a/node/core/av-store/src/lib.rs +++ b/node/core/av-store/src/lib.rs @@ -690,7 +690,7 @@ where RuntimeApiRequest::CandidateEvents(tx), )); - ctx.send_message(msg.into()).await?; + ctx.send_message(msg.into()).await; Ok(rx.await??) } @@ -858,7 +858,7 @@ where { let (tx, rx) = oneshot::channel(); - ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await?; + ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await; Ok(rx.await??.map(|number| number).unwrap_or_default()) } diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 011b156ffddb..103e27ecd883 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -171,7 +171,7 @@ async fn runtime_api_request( relay_parent, request, )) - ).await?; + ).await; receiver.await.map_err(Into::into) } diff --git a/node/core/proposer/src/lib.rs b/node/core/proposer/src/lib.rs index 82507bdbf8c1..813d937a7058 100644 --- a/node/core/proposer/src/lib.rs +++ b/node/core/proposer/src/lib.rs @@ -143,13 +143,13 @@ where let (sender, receiver) = futures::channel::oneshot::channel(); - overseer.wait_for_activation(parent_header_hash, sender).await?; + overseer.wait_for_activation(parent_header_hash, sender).await; receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??; let (sender, receiver) = futures::channel::oneshot::channel(); overseer.send_msg(AllMessages::Provisioner( ProvisionerMessage::RequestInherentData(parent_header_hash, sender), - )).await?; + )).await; let mut timeout = futures_timer::Delay::new(PROPOSE_TIMEOUT).fuse(); diff --git a/node/network/availability-distribution/src/lib.rs b/node/network/availability-distribution/src/lib.rs index 920075e0425e..53b98826d036 100644 --- a/node/network/availability-distribution/src/lib.rs +++ b/node/network/availability-distribution/src/lib.rs @@ -56,62 +56,40 @@ const LOG_TARGET: &'static str = "availability_distribution"; #[derive(Debug, Error)] enum Error { - #[error("Sending PendingAvailability query failed")] - QueryPendingAvailabilitySendQuery(#[source] SubsystemError), #[error("Response channel to obtain PendingAvailability failed")] QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled), #[error("RuntimeAPI to obtain PendingAvailability failed")] QueryPendingAvailability(#[source] RuntimeApiError), - #[error("Sending StoreChunk query failed")] - StoreChunkSendQuery(#[source] SubsystemError), #[error("Response channel to obtain StoreChunk failed")] StoreChunkResponseChannel(#[source] oneshot::Canceled), - #[error("Sending QueryChunk query failed")] - QueryChunkSendQuery(#[source] SubsystemError), #[error("Response channel to obtain QueryChunk failed")] QueryChunkResponseChannel(#[source] oneshot::Canceled), - #[error("Sending QueryAncestors query failed")] - QueryAncestorsSendQuery(#[source] SubsystemError), #[error("Response channel to obtain QueryAncestors failed")] QueryAncestorsResponseChannel(#[source] oneshot::Canceled), #[error("RuntimeAPI to obtain QueryAncestors failed")] QueryAncestors(#[source] ChainApiError), - #[error("Sending QuerySession query failed")] - QuerySessionSendQuery(#[source] SubsystemError), #[error("Response channel to obtain QuerySession failed")] QuerySessionResponseChannel(#[source] oneshot::Canceled), #[error("RuntimeAPI to obtain QuerySession failed")] QuerySession(#[source] RuntimeApiError), - #[error("Sending QueryValidators query failed")] - QueryValidatorsSendQuery(#[source] SubsystemError), #[error("Response channel to obtain QueryValidators failed")] QueryValidatorsResponseChannel(#[source] oneshot::Canceled), #[error("RuntimeAPI to obtain QueryValidators failed")] QueryValidators(#[source] RuntimeApiError), - #[error("Sending AvailabilityCores query failed")] - AvailabilityCoresSendQuery(#[source] SubsystemError), #[error("Response channel to obtain AvailabilityCores failed")] AvailabilityCoresResponseChannel(#[source] oneshot::Canceled), #[error("RuntimeAPI to obtain AvailabilityCores failed")] AvailabilityCores(#[source] RuntimeApiError), - #[error("Sending AvailabilityCores query failed")] - QueryAvailabilitySendQuery(#[source] SubsystemError), #[error("Response channel to obtain AvailabilityCores failed")] QueryAvailabilityResponseChannel(#[source] oneshot::Canceled), - #[error("Sending out a peer report message")] - ReportPeerMessageSend(#[source] SubsystemError), - - #[error("Sending a gossip message")] - TrackedGossipMessage(#[source] SubsystemError), - #[error("Receive channel closed")] IncomingMessageChannel(#[source] SubsystemError), } @@ -290,7 +268,7 @@ impl ProtocolState { } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn remove_relay_parent(&mut self, relay_parent: &Hash) -> Result<()> { + fn remove_relay_parent(&mut self, relay_parent: &Hash) { // we might be ancestor of some other relay_parent if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) { // if we were the last user, and it is @@ -324,7 +302,6 @@ impl ProtocolState { } } } - Ok(()) } } @@ -351,7 +328,7 @@ where state.peer_views.remove(&peerid); } NetworkBridgeEvent::PeerViewChange(peerid, view) => { - handle_peer_view_change(ctx, state, peerid, view, metrics).await?; + handle_peer_view_change(ctx, state, peerid, view, metrics).await; } NetworkBridgeEvent::OurViewChange(view) => { handle_our_view_change(ctx, keystore, state, view, metrics).await?; @@ -472,14 +449,14 @@ where }; send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message) - .await?; + .await; } } // cleanup the removed relay parents and their states let removed = old_view.difference(&view).collect::>(); for removed in removed { - state.remove_relay_parent(&removed)?; + state.remove_relay_parent(&removed); } Ok(()) } @@ -491,7 +468,7 @@ async fn send_tracked_gossip_message_to_peers( metrics: &Metrics, peers: Vec, message: AvailabilityGossipMessage, -) -> Result<()> +) where Context: SubsystemContext, { @@ -506,7 +483,7 @@ async fn send_tracked_gossip_messages_to_peer( metrics: &Metrics, peer: PeerId, message_iter: impl IntoIterator, -) -> Result<()> +) where Context: SubsystemContext, { @@ -521,12 +498,12 @@ async fn send_tracked_gossip_messages_to_peers( metrics: &Metrics, peers: Vec, message_iter: impl IntoIterator, -) -> Result<()> +) where Context: SubsystemContext, { if peers.is_empty() { - return Ok(()); + return; } for message in message_iter { for peer in peers.iter() { @@ -553,13 +530,10 @@ where protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message), ), )) - .await - .map_err(|e| Error::TrackedGossipMessage(e))?; + .await; metrics.on_chunk_distributed(); } - - Ok(()) } // Send the difference between two views which were not sent @@ -571,7 +545,7 @@ async fn handle_peer_view_change( origin: PeerId, view: View, metrics: &Metrics, -) -> Result<()> +) where Context: SubsystemContext, { @@ -616,9 +590,8 @@ where .collect::>(); send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages) - .await?; + .await; } - Ok(()) } /// Obtain the first key which has a signing key. @@ -662,7 +635,8 @@ where let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) { live_candidate } else { - return modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await; + modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await; + return Ok(()); }; // check the merkle proof @@ -674,12 +648,14 @@ where ) { hash } else { - return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + return Ok(()); }; let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk); if anticipated_hash != erasure_chunk_hash { - return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + return Ok(()); } // an internal unique identifier of this message @@ -695,7 +671,8 @@ where .entry(origin.clone()) .or_default(); if received_set.contains(&message_id) { - return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; + modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; + return Ok(()); } else { received_set.insert(message_id.clone()); } @@ -707,9 +684,9 @@ where .insert(message_id.1, message.clone()) .is_some() { - modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?; + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await; } else { - modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await?; + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await; // save the chunk for our index if let Some(validator_index) = per_candidate.validator_index { @@ -762,7 +739,8 @@ where .collect::>(); // gossip that message to interested peers - send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await + send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await; + Ok(()) } /// The bitfield distribution subsystem. @@ -947,8 +925,7 @@ where relay_parent, RuntimeApiRequest::AvailabilityCores(tx), ))) - .await - .map_err(|e| Error::AvailabilityCoresSendQuery(e))?; + .await; let all_para_ids: Vec<_> = rx .await @@ -970,7 +947,7 @@ where /// Modify the reputation of a peer based on its behavior. #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn modify_reputation(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()> +async fn modify_reputation(ctx: &mut Context, peer: PeerId, rep: Rep) where Context: SubsystemContext, { @@ -982,9 +959,7 @@ where ); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep), - )) - .await - .map_err(|e| Error::ReportPeerMessageSend(e)) + )).await; } /// Query the proof of validity for a particular candidate hash. @@ -996,9 +971,8 @@ where let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::AvailabilityStore( AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx), - )) - .await - .map_err(|e| Error::QueryAvailabilitySendQuery(e))?; + )).await; + rx.await .map_err(|e| Error::QueryAvailabilityResponseChannel(e)) } @@ -1015,9 +989,8 @@ where let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::AvailabilityStore( AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx), - )) - .await - .map_err(|e| Error::QueryChunkSendQuery(e))?; + )).await; + rx.await.map_err(|e| Error::QueryChunkResponseChannel(e)) } @@ -1033,17 +1006,15 @@ where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); - ctx.send_message( - AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreChunk { - candidate_hash, - relay_parent, - validator_index, - chunk: erasure_chunk, - tx, - } - )).await - .map_err(|e| Error::StoreChunkSendQuery(e))?; + ctx.send_message(AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreChunk { + candidate_hash, + relay_parent, + validator_index, + chunk: erasure_chunk, + tx, + } + )).await; rx.await.map_err(|e| Error::StoreChunkResponseChannel(e)) } @@ -1062,9 +1033,7 @@ where ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::CandidatePendingAvailability(para, tx), - ))) - .await - .map_err(|e| Error::QueryPendingAvailabilitySendQuery(e))?; + ))).await; rx.await .map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))? @@ -1087,8 +1056,7 @@ where )); ctx.send_message(query_validators) - .await - .map_err(|e| Error::QueryValidatorsSendQuery(e))?; + .await; rx.await .map_err(|e| Error::QueryValidatorsResponseChannel(e))? .map_err(|e| Error::QueryValidators(e)) @@ -1112,8 +1080,7 @@ where }); ctx.send_message(query_ancestors) - .await - .map_err(|e| Error::QueryAncestorsSendQuery(e))?; + .await; rx.await .map_err(|e| Error::QueryAncestorsResponseChannel(e))? .map_err(|e| Error::QueryAncestors(e)) @@ -1135,8 +1102,7 @@ where )); ctx.send_message(query_session_idx_for_child) - .await - .map_err(|e| Error::QuerySessionSendQuery(e))?; + .await; rx.await .map_err(|e| Error::QuerySessionResponseChannel(e))? .map_err(|e| Error::QuerySession(e)) diff --git a/node/network/bitfield-distribution/src/lib.rs b/node/network/bitfield-distribution/src/lib.rs index 680c450bd53e..2925953546e1 100644 --- a/node/network/bitfield-distribution/src/lib.rs +++ b/node/network/bitfield-distribution/src/lib.rs @@ -163,24 +163,20 @@ impl BitfieldDistribution { msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability), } => { tracing::trace!(target: LOG_TARGET, "Processing DistributeBitfield"); - if let Err(err) = handle_bitfield_distribution( + handle_bitfield_distribution( &mut ctx, &mut state, &self.metrics, hash, signed_availability, - ).await { - tracing::warn!(target: LOG_TARGET, err = ?err, "Failed to reply to `DistributeBitfield` message"); - } + ).await; } FromOverseer::Communication { msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event), } => { tracing::trace!(target: LOG_TARGET, "Processing NetworkMessage"); // a network message was received - if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await { - tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to handle incoming network messages"); - } + handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await; } FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => { let _timer = self.metrics.time_active_leaves_update(); @@ -234,7 +230,7 @@ async fn modify_reputation( ctx: &mut Context, peer: PeerId, rep: ReputationChange, -) -> SubsystemResult<()> +) where Context: SubsystemContext, { @@ -255,7 +251,7 @@ async fn handle_bitfield_distribution( metrics: &Metrics, relay_parent: Hash, signed_availability: SignedAvailabilityBitfield, -) -> SubsystemResult<()> +) where Context: SubsystemContext, { @@ -272,12 +268,12 @@ where "Not supposed to work on relay parent related data", ); - return Ok(()); + return; }; let validator_set = &job_data.validator_set; if validator_set.is_empty() { tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "validator set is empty"); - return Ok(()); + return; } let validator_index = signed_availability.validator_index() as usize; @@ -285,7 +281,7 @@ where validator.clone() } else { tracing::trace!(target: LOG_TARGET, "Could not find a validator for index {}", validator_index); - return Ok(()); + return; }; let peer_views = &mut state.peer_views; @@ -294,11 +290,9 @@ where signed_availability, }; - relay_message(ctx, job_data, peer_views, validator, msg).await?; + relay_message(ctx, job_data, peer_views, validator, msg).await; metrics.on_own_bitfield_gossipped(); - - Ok(()) } /// Distribute a given valid and signature checked bitfield message. @@ -311,7 +305,7 @@ async fn relay_message( peer_views: &mut HashMap, validator: ValidatorId, message: BitfieldGossipMessage, -) -> SubsystemResult<()> +) where Context: SubsystemContext, { @@ -325,7 +319,7 @@ where ), ), )) - .await?; + .await; let message_sent_to_peer = &mut (job_data.message_sent_to_peer); @@ -361,9 +355,8 @@ where message.into_validation_protocol(), ), )) - .await?; + .await; } - Ok(()) } /// Handle an incoming message from a peer. @@ -374,13 +367,14 @@ async fn process_incoming_peer_message( metrics: &Metrics, origin: PeerId, message: BitfieldGossipMessage, -) -> SubsystemResult<()> +) where Context: SubsystemContext, { // we don't care about this, not part of our view. if !state.view.contains(&message.relay_parent) { - return modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await; + modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await; + return; } // Ignore anything the overseer did not tell this subsystem to work on. @@ -388,7 +382,8 @@ where let job_data: &mut _ = if let Some(ref mut job_data) = job_data { job_data } else { - return modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await; + modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await; + return; }; let validator_set = &job_data.validator_set; @@ -398,7 +393,8 @@ where relay_parent = %message.relay_parent, "Validator set is empty", ); - return modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await; + modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await; + return; } // Use the (untrusted) validator index provided by the signed payload @@ -408,7 +404,8 @@ where let validator = if let Some(validator) = validator_set.get(validator_index) { validator.clone() } else { - return modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await; + modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await; + return; }; // Check if the peer already sent us a message for the validator denoted in the message earlier. @@ -422,7 +419,8 @@ where if !received_set.contains(&validator) { received_set.insert(validator.clone()); } else { - return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; + modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; + return; }; if message @@ -440,12 +438,12 @@ where validator_index, "already received a message for validator", ); - modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?; - return Ok(()); + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await; + return; } one_per_validator.insert(validator.clone(), message.clone()); - relay_message(ctx, job_data, &mut state.peer_views, validator, message).await?; + relay_message(ctx, job_data, &mut state.peer_views, validator, message).await; modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await } else { @@ -461,7 +459,7 @@ async fn handle_network_msg( state: &mut ProtocolState, metrics: &Metrics, bridge_message: NetworkBridgeEvent, -) -> SubsystemResult<()> +) where Context: SubsystemContext, { @@ -477,10 +475,10 @@ where state.peer_views.remove(&peerid); } NetworkBridgeEvent::PeerViewChange(peerid, view) => { - handle_peer_view_change(ctx, state, peerid, view).await?; + handle_peer_view_change(ctx, state, peerid, view).await; } NetworkBridgeEvent::OurViewChange(view) => { - handle_our_view_change(state, view)?; + handle_our_view_change(state, view); } NetworkBridgeEvent::PeerMessage(remote, message) => { match message { @@ -490,17 +488,16 @@ where relay_parent, signed_availability: bitfield, }; - process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await?; + process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await; } } } } - Ok(()) } /// Handle the changes necassary when our view changes. #[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))] -fn handle_our_view_change(state: &mut ProtocolState, view: View) -> SubsystemResult<()> { +fn handle_our_view_change(state: &mut ProtocolState, view: View) { let old_view = std::mem::replace(&mut (state.view), view); for added in state.view.difference(&old_view) { @@ -517,7 +514,6 @@ fn handle_our_view_change(state: &mut ProtocolState, view: View) -> SubsystemRes // cleanup relay parents we are not interested in any more let _ = state.per_relay_parent.remove(&removed); } - Ok(()) } @@ -529,7 +525,7 @@ async fn handle_peer_view_change( state: &mut ProtocolState, origin: PeerId, view: View, -) -> SubsystemResult<()> +) where Context: SubsystemContext, { @@ -567,10 +563,8 @@ where .collect(); for (validator, message) in delta_set.into_iter() { - send_tracked_gossip_message(ctx, state, origin.clone(), validator, message).await?; + send_tracked_gossip_message(ctx, state, origin.clone(), validator, message).await; } - - Ok(()) } /// Send a gossip message and track it in the per relay parent data. @@ -581,14 +575,14 @@ async fn send_tracked_gossip_message( dest: PeerId, validator: ValidatorId, message: BitfieldGossipMessage, -) -> SubsystemResult<()> +) where Context: SubsystemContext, { let job_data = if let Some(job_data) = state.per_relay_parent.get_mut(&message.relay_parent) { job_data } else { - return Ok(()); + return; }; let message_sent_to_peer = &mut (job_data.message_sent_to_peer); @@ -602,10 +596,7 @@ where vec![dest], message.into_validation_protocol(), ), - )) - .await?; - - Ok(()) + )).await; } impl Subsystem for BitfieldDistribution @@ -647,7 +638,7 @@ where )); ctx.send_messages(std::iter::once(query_validators).chain(std::iter::once(query_signing))) - .await?; + .await; match (validators_rx.await?, session_rx.await?) { (Ok(v), Ok(s)) => Ok(Some(( @@ -788,7 +779,6 @@ mod test { .timeout(Duration::from_millis(10)) .await .expect("10ms is more than enough for sending messages.") - .expect("Error values should really never occur.") }; } diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index ed58cd90976e..7e05595d0b55 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -374,21 +374,15 @@ async fn update_view( WireMessage::ViewUpdate(new_view.clone()), ).await?; - if let Err(e) = dispatch_validation_event_to_all( + dispatch_validation_event_to_all( NetworkBridgeEvent::OurViewChange(new_view.clone()), ctx, - ).await { - tracing::warn!(target: LOG_TARGET, err = ?e, "Aborting - Failure to dispatch messages to overseer"); - return Err(e) - } + ).await; - if let Err(e) = dispatch_collation_event_to_all( + dispatch_collation_event_to_all( NetworkBridgeEvent::OurViewChange(new_view.clone()), ctx, - ).await { - tracing::warn!(target: LOG_TARGET, err = ?e, "Aborting - Failure to dispatch messages to overseer"); - return Err(e) - } + ).await; Ok(()) } @@ -507,14 +501,14 @@ async fn send_message( async fn dispatch_validation_event_to_all( event: NetworkBridgeEvent, ctx: &mut impl SubsystemContext, -) -> SubsystemResult<()> { +) { dispatch_validation_events_to_all(std::iter::once(event), ctx).await } async fn dispatch_collation_event_to_all( event: NetworkBridgeEvent, ctx: &mut impl SubsystemContext, -) -> SubsystemResult<()> { +) { dispatch_collation_events_to_all(std::iter::once(event), ctx).await } @@ -522,7 +516,7 @@ async fn dispatch_collation_event_to_all( async fn dispatch_validation_events_to_all( events: I, ctx: &mut impl SubsystemContext, -) -> SubsystemResult<()> +) where I: IntoIterator>, I::IntoIter: Send, @@ -554,7 +548,7 @@ async fn dispatch_validation_events_to_all( async fn dispatch_collation_events_to_all( events: I, ctx: &mut impl SubsystemContext, -) -> SubsystemResult<()> +) where I: IntoIterator>, I::IntoIter: Send, @@ -665,7 +659,7 @@ where view: View(Vec::new()), }); - let res = match peer_set { + match peer_set { PeerSet::Validation => dispatch_validation_events_to_all( vec![ NetworkBridgeEvent::PeerConnected(peer.clone(), role), @@ -686,11 +680,6 @@ where ], &mut ctx, ).await, - }; - - if let Err(e) = res { - tracing::warn!(err = ?e, "Aborting - Failure to dispatch messages to overseer"); - return Err(e); } } } @@ -704,7 +693,7 @@ where validator_discovery.on_peer_disconnected(&peer); if peer_map.remove(&peer).is_some() { - let res = match peer_set { + match peer_set { PeerSet::Validation => dispatch_validation_event_to_all( NetworkBridgeEvent::PeerDisconnected(peer), &mut ctx, @@ -713,15 +702,6 @@ where NetworkBridgeEvent::PeerDisconnected(peer), &mut ctx, ).await, - }; - - if let Err(e) = res { - tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Aborting - Failure to dispatch messages to overseer", - ); - return Err(e) } } }, @@ -734,17 +714,7 @@ where &mut network_service, ).await?; - if let Err(e) = dispatch_validation_events_to_all( - events, - &mut ctx, - ).await { - tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Aborting - Failure to dispatch messages to overseer", - ); - return Err(e) - } + dispatch_validation_events_to_all(events, &mut ctx).await; } if !c_messages.is_empty() { @@ -755,17 +725,7 @@ where &mut network_service, ).await?; - if let Err(e) = dispatch_collation_events_to_all( - events, - &mut ctx, - ).await { - tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Aborting - Failure to dispatch messages to overseer", - ); - return Err(e) - } + dispatch_collation_events_to_all(events, &mut ctx).await; } }, } diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index 5a2d48003a51..baf6bf6c0369 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -234,7 +234,7 @@ where if let Some(view) = state.peer_views.get(peer) { if view.contains(&relay_parent) { let peer = peer.clone(); - advertise_collation(ctx, state, relay_parent, vec![peer]).await?; + advertise_collation(ctx, state, relay_parent, vec![peer]).await; } } } @@ -320,7 +320,7 @@ async fn declare( ctx: &mut Context, state: &mut State, to: Vec, -) -> Result<()> +) where Context: SubsystemContext { @@ -331,9 +331,7 @@ where to, protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) - )).await?; - - Ok(()) + )).await; } /// Issue a connection request to a set of validators and @@ -370,15 +368,13 @@ async fn advertise_collation( state: &mut State, relay_parent: Hash, to: Vec, -) -> Result<()> +) where Context: SubsystemContext { let collating_on = match state.collating_on { Some(collating_on) => collating_on, - None => { - return Ok(()); - } + None => return, }; let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent, collating_on); @@ -388,11 +384,9 @@ where to, protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) - )).await?; + )).await; state.metrics.on_advertisment_made(); - - Ok(()) } /// The main incoming message dispatching switch. @@ -482,7 +476,7 @@ async fn send_collation( origin: PeerId, receipt: CandidateReceipt, pov: PoV, -) -> Result<()> +) where Context: SubsystemContext { @@ -497,11 +491,9 @@ where vec![origin], protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) - )).await?; + )).await; state.metrics.on_collation_sent(); - - Ok(()) } /// A networking messages switch. @@ -535,7 +527,7 @@ where Some(our_para_id) => { if our_para_id == para_id { if let Some(collation) = state.collations.get(&relay_parent).cloned() { - send_collation(ctx, state, request_id, origin, collation.0, collation.1).await?; + send_collation(ctx, state, request_id, origin, collation.0, collation.1).await; } } else { tracing::warn!( @@ -573,7 +565,7 @@ async fn handle_peer_view_change( state: &mut State, peer_id: PeerId, view: View, -) -> Result<()> +) where Context: SubsystemContext { @@ -585,11 +577,9 @@ where for added in added.into_iter() { if state.collations.contains_key(&added) { - advertise_collation(ctx, state, added.clone(), vec![peer_id.clone()]).await?; + advertise_collation(ctx, state, added.clone(), vec![peer_id.clone()]).await; } } - - Ok(()) } /// A validator is connected. @@ -601,7 +591,7 @@ async fn handle_validator_connected( state: &mut State, peer_id: PeerId, validator_id: ValidatorId, -) -> Result<()> +) where Context: SubsystemContext { @@ -610,11 +600,9 @@ where if unknown { // Only declare the new peers. - declare(ctx, state, vec![peer_id.clone()]).await?; + declare(ctx, state, vec![peer_id.clone()]).await; state.peer_views.insert(peer_id, Default::default()); } - - Ok(()) } /// Bridge messages switch. @@ -635,7 +623,7 @@ where // it should be handled here. } PeerViewChange(peer_id, view) => { - handle_peer_view_change(ctx, state, peer_id, view).await?; + handle_peer_view_change(ctx, state, peer_id, view).await; } PeerDisconnected(peer_id) => { state.known_validators.retain(|_, v| *v != peer_id); @@ -697,13 +685,7 @@ where let _timer = state.metrics.time_handle_connection_request(); while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) { - if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await { - tracing::warn!( - target: LOG_TARGET, - err = ?err, - "Failed to declare our collator id", - ); - } + handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await; } // put it back state.last_connection_request = Some(request); diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index 5909c0b8e61d..fd27fbcfc9e9 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -136,7 +136,7 @@ where /// Modify the reputation of a peer based on its behavior. #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn modify_reputation(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()> +async fn modify_reputation(ctx: &mut Context, peer: PeerId, rep: Rep) where Context: SubsystemContext, { @@ -149,7 +149,5 @@ where ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep), - )).await?; - - Ok(()) + )).await; } diff --git a/node/network/collator-protocol/src/validator_side.rs b/node/network/collator-protocol/src/validator_side.rs index faeda6d307ec..3af5aba5e415 100644 --- a/node/network/collator-protocol/src/validator_side.rs +++ b/node/network/collator-protocol/src/validator_side.rs @@ -225,7 +225,7 @@ async fn fetch_collation( collator_id: CollatorId, para_id: ParaId, tx: oneshot::Sender<(CandidateReceipt, PoV)> -) -> Result<()> +) where Context: SubsystemContext { @@ -242,7 +242,7 @@ where "Failed to send collation", ); } - return Ok(()); + return; } } } @@ -262,10 +262,8 @@ where // Request the collation. // Assume it is `request_collation`'s job to check and ignore duplicate requests. if let Some(relevant_advertiser) = relevant_advertiser { - request_collation(ctx, state, relay_parent, para_id, relevant_advertiser, tx).await?; + request_collation(ctx, state, relay_parent, para_id, relevant_advertiser, tx).await; } - - Ok(()) } /// Report a collator for some malicious actions. @@ -274,7 +272,7 @@ async fn report_collator( ctx: &mut Context, state: &mut State, id: CollatorId, -) -> Result<()> +) where Context: SubsystemContext { @@ -283,11 +281,9 @@ where // is a tolerable thing to do. for (k, v) in state.known_collators.iter() { if *v == id { - modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await?; + modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await; } } - - Ok(()) } /// Some other subsystem has reported a collator as a good one, bump reputation. @@ -296,17 +292,15 @@ async fn note_good_collation( ctx: &mut Context, state: &mut State, id: CollatorId, -) -> Result<()> +) where Context: SubsystemContext { for (peer_id, collator_id) in state.known_collators.iter() { if id == *collator_id { - modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await?; + modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await; } } - - Ok(()) } /// A peer's view has changed. A number of things should be done: @@ -362,7 +356,7 @@ async fn received_collation( request_id: RequestId, receipt: CandidateReceipt, pov: PoV, -) -> Result<()> +) where Context: SubsystemContext { @@ -390,11 +384,9 @@ where // If this collation is not just a delayed one that we were expecting, // but our view has moved on, in that case modify peer's reputation. if !state.recently_removed_heads.contains(&relay_parent) { - modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?; + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } } - - Ok(()) } /// Request a collation from the network. @@ -411,7 +403,7 @@ async fn request_collation( para_id: ParaId, peer_id: PeerId, result: oneshot::Sender<(CandidateReceipt, PoV)>, -) -> Result<()> +) where Context: SubsystemContext { @@ -423,7 +415,7 @@ where relay_parent = %relay_parent, "collation is no longer in view", ); - return Ok(()); + return; } if state.requested_collations.contains_key(&(relay_parent, para_id.clone(), peer_id.clone())) { @@ -434,7 +426,7 @@ where relay_parent = %relay_parent, "collation has already been requested", ); - return Ok(()); + return; } let request_id = state.next_request_id; @@ -470,9 +462,7 @@ where vec![peer_id], protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) - )).await?; - - Ok(()) + )).await; } /// Notify `CandidateSelectionSubsystem` that a collation has been advertised. @@ -482,7 +472,7 @@ async fn notify_candidate_selection( collator: CollatorId, relay_parent: Hash, para_id: ParaId, -) -> Result<()> +) where Context: SubsystemContext { @@ -492,9 +482,7 @@ where para_id, collator, ) - )).await?; - - Ok(()) + )).await; } /// Networking message has been received. @@ -504,7 +492,7 @@ async fn process_incoming_peer_message( state: &mut State, origin: PeerId, msg: protocol_v1::CollatorProtocolMessage, -)-> Result<()> +) where Context: SubsystemContext { @@ -519,19 +507,17 @@ where state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent)); if let Some(collator) = state.known_collators.get(&origin) { - notify_candidate_selection(ctx, collator.clone(), relay_parent, para_id).await?; + notify_candidate_selection(ctx, collator.clone(), relay_parent, para_id).await; } } RequestCollation(_, _, _) => { // This is a validator side of the protocol, collation requests are not expected here. - return modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } Collation(request_id, receipt, pov) => { - received_collation(ctx, state, origin, request_id, receipt, pov).await?; + received_collation(ctx, state, origin, request_id, receipt, pov).await; } } - - Ok(()) } /// A leaf has become inactive so we want to @@ -592,7 +578,7 @@ async fn request_timed_out( ctx: &mut Context, state: &mut State, id: RequestId, -) -> Result<()> +) where Context: SubsystemContext { @@ -604,12 +590,10 @@ where if let Some(_) = state.requests_info.remove(&id) { let peer_id = key.2; - modify_reputation(ctx, peer_id, COST_REQUEST_TIMED_OUT).await?; + modify_reputation(ctx, peer_id, COST_REQUEST_TIMED_OUT).await; } } } - - Ok(()) } /// Bridge event switch. @@ -639,7 +623,7 @@ where handle_our_view_change(state, view).await?; }, PeerMessage(remote, msg) => { - process_incoming_peer_message(ctx, state, remote, msg).await?; + process_incoming_peer_message(ctx, state, remote, msg).await; } } @@ -652,7 +636,7 @@ async fn process_msg( ctx: &mut Context, msg: CollatorProtocolMessage, state: &mut State, -) -> Result<()> +) where Context: SubsystemContext { @@ -675,13 +659,13 @@ where ); } FetchCollation(relay_parent, collator_id, para_id, tx) => { - fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await?; + fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await; } ReportCollator(id) => { - report_collator(ctx, state, id).await?; + report_collator(ctx, state, id).await; } NoteGoodCollation(id) => { - note_good_collation(ctx, state, id).await?; + note_good_collation(ctx, state, id).await; } NetworkBridgeUpdateV1(event) => { if let Err(e) = handle_network_msg( @@ -697,8 +681,6 @@ where } } } - - Ok(()) } /// The main run loop. @@ -726,7 +708,7 @@ where tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message"); match msg { - Communication { msg } => process_msg(&mut ctx, msg, &mut state).await?, + Communication { msg } => process_msg(&mut ctx, msg, &mut state).await, Signal(BlockFinalized(_)) => {} Signal(ActiveLeaves(_)) => {} Signal(Conclude) => { break } @@ -742,7 +724,7 @@ where match request { CollationRequestResult::Timeout(id) => { tracing::trace!(target: LOG_TARGET, id, "request timed out"); - request_timed_out(&mut ctx, &mut state, id).await?; + request_timed_out(&mut ctx, &mut state, id).await; } CollationRequestResult::Received(id) => { state.requests_info.remove(&id); diff --git a/node/network/pov-distribution/src/lib.rs b/node/network/pov-distribution/src/lib.rs index e0050aeb2732..01b86b44a0fb 100644 --- a/node/network/pov-distribution/src/lib.rs +++ b/node/network/pov-distribution/src/lib.rs @@ -132,7 +132,7 @@ async fn handle_signal( ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::Validators(vals_tx), - ))).await?; + ))).await; let n_validators = match vals_rx.await? { Ok(v) => v.len(), @@ -178,7 +178,7 @@ async fn notify_all_we_are_awaiting( ctx: &mut impl SubsystemContext, relay_parent: Hash, pov_hash: Hash, -) -> SubsystemResult<()> { +) { // We use `awaited` as a proxy for which heads are in the peer's view. let peers_to_send: Vec<_> = peers.iter() .filter_map(|(peer, state)| if state.awaited.contains_key(&relay_parent) { @@ -188,7 +188,9 @@ async fn notify_all_we_are_awaiting( }) .collect(); - if peers_to_send.is_empty() { return Ok(()) } + if peers_to_send.is_empty() { + return; + } let payload = awaiting_message(relay_parent, vec![pov_hash]); @@ -205,7 +207,7 @@ async fn notify_one_we_are_awaiting_many( ctx: &mut impl SubsystemContext, relay_parent_state: &HashMap, relay_parent: Hash, -) -> SubsystemResult<()> { +) { let awaiting_hashes = relay_parent_state.get(&relay_parent).into_iter().flat_map(|s| { // Send the peer everything we are fetching at this relay-parent s.fetching.iter() @@ -213,7 +215,9 @@ async fn notify_one_we_are_awaiting_many( .map(|(pov_hash, _)| *pov_hash) }).collect::>(); - if awaiting_hashes.is_empty() { return Ok(()) } + if awaiting_hashes.is_empty() { + return; + } let payload = awaiting_message(relay_parent, awaiting_hashes); @@ -232,7 +236,7 @@ async fn distribute_to_awaiting( relay_parent: Hash, pov_hash: Hash, pov: &PoV, -) -> SubsystemResult<()> { +) { // Send to all peers who are awaiting the PoV and have that relay-parent in their view. // // Also removes it from their awaiting set. @@ -246,18 +250,16 @@ async fn distribute_to_awaiting( })) .collect(); - if peers_to_send.is_empty() { return Ok(()) } + if peers_to_send.is_empty() { return; } let payload = send_pov_message(relay_parent, pov_hash, pov.clone()); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, payload, - ))).await?; + ))).await; metrics.on_pov_distributed(); - - Ok(()) } /// Handles a `FetchPoV` message. @@ -268,17 +270,17 @@ async fn handle_fetch( relay_parent: Hash, descriptor: CandidateDescriptor, response_sender: oneshot::Sender>, -) -> SubsystemResult<()> { +) { let _timer = state.metrics.time_handle_fetch(); let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { Some(s) => s, - None => return Ok(()), + None => return, }; if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) { let _ = response_sender.send(pov.clone()); - return Ok(()); + return; } { @@ -286,7 +288,7 @@ async fn handle_fetch( Entry::Occupied(mut e) => { // we are already awaiting this PoV if there is an entry. e.get_mut().push(response_sender); - return Ok(()); + return; } Entry::Vacant(e) => { e.insert(vec![response_sender]); @@ -299,7 +301,7 @@ async fn handle_fetch( relay_parent_state.fetching.len = relay_parent_state.fetching.len(), "other subsystems have requested PoV distribution to fetch more PoVs than reasonably expected", ); - return Ok(()); + return; } // Issue an `Awaiting` message to all peers with this in their view. @@ -319,12 +321,12 @@ async fn handle_distribute( relay_parent: Hash, descriptor: CandidateDescriptor, pov: Arc, -) -> SubsystemResult<()> { +) { let _timer = state.metrics.time_handle_distribute(); let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { - None => return Ok(()), Some(s) => s, + None => return, }; if let Some(our_awaited) = relay_parent_state.fetching.get_mut(&descriptor.pov_hash) { @@ -355,7 +357,7 @@ async fn report_peer( ctx: &mut impl SubsystemContext, peer: PeerId, rep: Rep, -) -> SubsystemResult<()> { +) { ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await } @@ -367,16 +369,16 @@ async fn handle_awaiting( peer: PeerId, relay_parent: Hash, pov_hashes: Vec, -) -> SubsystemResult<()> { +) { if !state.our_view.0.contains(&relay_parent) { - report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?; - return Ok(()); + report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await; + return; } let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { None => { tracing::warn!("PoV Distribution relay parent state out-of-sync with our view"); - return Ok(()); + return; } Some(s) => s, }; @@ -385,8 +387,8 @@ async fn handle_awaiting( state.peer_state.get_mut(&peer).and_then(|s| s.awaited.get_mut(&relay_parent)) { None => { - report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?; - return Ok(()); + report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await; + return; } Some(a) => a, }; @@ -400,16 +402,14 @@ async fn handle_awaiting( let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone()); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) - )).await?; + )).await; } else { peer_awaiting.insert(pov_hash); } } } else { - report_peer(ctx, peer, COST_APPARENT_FLOOD).await?; + report_peer(ctx, peer, COST_APPARENT_FLOOD).await; } - - Ok(()) } /// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not. @@ -423,11 +423,11 @@ async fn handle_incoming_pov( relay_parent: Hash, pov_hash: Hash, pov: PoV, -) -> SubsystemResult<()> { +) { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { None => { - report_peer(ctx, peer, COST_UNEXPECTED_POV).await?; - return Ok(()); + report_peer(ctx, peer, COST_UNEXPECTED_POV).await; + return; }, Some(r) => r, }; @@ -436,16 +436,16 @@ async fn handle_incoming_pov( // Do validity checks and complete all senders awaiting this PoV. let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) { None => { - report_peer(ctx, peer, COST_UNEXPECTED_POV).await?; - return Ok(()); + report_peer(ctx, peer, COST_UNEXPECTED_POV).await; + return; } Some(f) => f, }; let hash = pov.hash(); if hash != pov_hash { - report_peer(ctx, peer, COST_UNEXPECTED_POV).await?; - return Ok(()); + report_peer(ctx, peer, COST_UNEXPECTED_POV).await; + return; } let pov = Arc::new(pov); @@ -453,10 +453,10 @@ async fn handle_incoming_pov( if fetching.is_empty() { // fetching is empty whenever we were awaiting something and // it was completed afterwards. - report_peer(ctx, peer.clone(), BENEFIT_LATE_POV).await?; + report_peer(ctx, peer.clone(), BENEFIT_LATE_POV).await; } else { // fetching is non-empty when the peer just provided us with data we needed. - report_peer(ctx, peer.clone(), BENEFIT_FRESH_POV).await?; + report_peer(ctx, peer.clone(), BENEFIT_FRESH_POV).await; } for response_sender in fetching.drain(..) { @@ -488,17 +488,15 @@ async fn handle_network_update( state: &mut State, ctx: &mut impl SubsystemContext, update: NetworkBridgeEvent, -) -> SubsystemResult<()> { +) { let _timer = state.metrics.time_handle_network_update(); match update { NetworkBridgeEvent::PeerConnected(peer, _observed_role) => { state.peer_state.insert(peer, PeerState { awaited: HashMap::new() }); - Ok(()) } NetworkBridgeEvent::PeerDisconnected(peer) => { state.peer_state.remove(&peer); - Ok(()) } NetworkBridgeEvent::PeerViewChange(peer_id, view) => { if let Some(peer_state) = state.peer_state.get_mut(&peer_id) { @@ -516,12 +514,11 @@ async fn handle_network_update( ctx, &state.relay_parent_state, *relay_parent, - ).await?; + ).await; } } } - Ok(()) } NetworkBridgeEvent::PeerMessage(peer, message) => { match message { @@ -546,7 +543,6 @@ async fn handle_network_update( } NetworkBridgeEvent::OurViewChange(view) => { state.our_view = view; - Ok(()) } } } @@ -582,7 +578,7 @@ impl PoVDistribution { relay_parent, descriptor, response_sender, - ).await?, + ).await, PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) => handle_distribute( &mut state, @@ -590,13 +586,13 @@ impl PoVDistribution { relay_parent, descriptor, pov, - ).await?, + ).await, PoVDistributionMessage::NetworkBridgeUpdateV1(event) => handle_network_update( &mut state, &mut ctx, event, - ).await?, + ).await, }, } } diff --git a/node/network/pov-distribution/src/tests.rs b/node/network/pov-distribution/src/tests.rs index 65f32ffe8810..03a39507a91d 100644 --- a/node/network/pov-distribution/src/tests.rs +++ b/node/network/pov-distribution/src/tests.rs @@ -80,7 +80,7 @@ fn distributes_to_those_awaiting_and_completes_local() { hash_a, descriptor, Arc::new(pov.clone()), - ).await.unwrap(); + ).await; assert!(!state.peer_state[&peer_a].awaited[&hash_a].contains(&pov_hash)); assert!(state.peer_state[&peer_c].awaited[&hash_b].contains(&pov_hash)); @@ -160,7 +160,7 @@ fn we_inform_peers_with_same_view_we_are_awaiting() { hash_a, descriptor, pov_send, - ).await.unwrap(); + ).await; assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1); @@ -234,7 +234,7 @@ fn peer_view_change_leads_to_us_informing() { &mut state, &mut ctx, NetworkBridgeEvent::PeerViewChange(peer_a.clone(), View(vec![hash_a, hash_b])), - ).await.unwrap(); + ).await; assert_matches!( handle.recv().await, @@ -310,7 +310,7 @@ fn peer_complete_fetch_and_is_rewarded() { peer_a.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), - ).await.unwrap(); + ).await; handle_network_update( &mut state, @@ -319,7 +319,7 @@ fn peer_complete_fetch_and_is_rewarded() { peer_b.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), - ).await.unwrap(); + ).await; assert_eq!(&*pov_recv.await.unwrap(), &pov); @@ -399,7 +399,7 @@ fn peer_punished_for_sending_bad_pov() { peer_a.clone(), send_pov_message(hash_a, pov_hash, bad_pov.clone()), ).focus().unwrap(), - ).await.unwrap(); + ).await; // didn't complete our sender. assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1); @@ -463,7 +463,7 @@ fn peer_punished_for_sending_unexpected_pov() { peer_a.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), - ).await.unwrap(); + ).await; assert_matches!( handle.recv().await, @@ -525,7 +525,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() { peer_a.clone(), send_pov_message(hash_b, pov_hash, pov.clone()), ).focus().unwrap(), - ).await.unwrap(); + ).await; assert_matches!( handle.recv().await, @@ -588,7 +588,7 @@ fn peer_reported_for_awaiting_too_much() { peer_a.clone(), awaiting_message(hash_a, vec![pov_hash]), ).focus().unwrap(), - ).await.unwrap(); + ).await; } assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited); @@ -602,7 +602,7 @@ fn peer_reported_for_awaiting_too_much() { peer_a.clone(), awaiting_message(hash_a, vec![last_pov_hash]), ).focus().unwrap(), - ).await.unwrap(); + ).await; // No more bookkeeping for you! assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited); @@ -672,7 +672,7 @@ fn peer_reported_for_awaiting_outside_their_view() { peer_a.clone(), awaiting_message(hash_b, vec![pov_hash]), ).focus().unwrap(), - ).await.unwrap(); + ).await; assert!(state.peer_state[&peer_a].awaited.get(&hash_b).is_none()); @@ -735,7 +735,7 @@ fn peer_reported_for_awaiting_outside_our_view() { peer_a.clone(), awaiting_message(hash_b, vec![pov_hash]), ).focus().unwrap(), - ).await.unwrap(); + ).await; // Illegal `awaited` is ignored. assert!(state.peer_state[&peer_a].awaited[&hash_b].is_empty()); @@ -810,7 +810,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { peer_a.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), - ).await.unwrap(); + ).await; assert_eq!(&*pov_recv.await.unwrap(), &pov); @@ -893,7 +893,7 @@ fn peer_completing_request_no_longer_awaiting() { peer_a.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), - ).await.unwrap(); + ).await; assert_eq!(&*pov_recv.await.unwrap(), &pov); diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index b01b60014e61..d8afe8341bb0 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -523,7 +523,7 @@ async fn circulate_statement_and_dependents( relay_parent: Hash, statement: SignedFullStatement, metrics: &Metrics, -) -> SubsystemResult<()> { +) { if let Some(active_head)= active_heads.get_mut(&relay_parent) { // First circulate the statement directly to all peers needing it. @@ -532,7 +532,7 @@ async fn circulate_statement_and_dependents( match active_head.note_statement(statement) { NotedStatement::Fresh(stored) => Some(( *stored.compact().candidate_hash(), - circulate_statement(peers, ctx, relay_parent, stored).await?, + circulate_statement(peers, ctx, relay_parent, stored).await, )), _ => None, } @@ -552,13 +552,11 @@ async fn circulate_statement_and_dependents( candidate_hash, &*active_head, metrics, - ).await?; + ).await; } } } } - - Ok(()) } fn statement_message(relay_parent: Hash, statement: SignedFullStatement) @@ -577,7 +575,7 @@ async fn circulate_statement( ctx: &mut impl SubsystemContext, relay_parent: Hash, stored: &StoredStatement, -) -> SubsystemResult> { +) -> Vec { let fingerprint = stored.fingerprint(); let mut peers_to_send = HashMap::new(); @@ -594,14 +592,14 @@ async fn circulate_statement( ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send.keys().cloned().collect(), payload, - ))).await?; + ))).await; } - Ok(peers_to_send.into_iter().filter_map(|(peer, needs_dependent)| if needs_dependent { + peers_to_send.into_iter().filter_map(|(peer, needs_dependent)| if needs_dependent { Some(peer) } else { None - }).collect()) + }).collect() } /// Send all statements about a given candidate hash to a peer. @@ -614,7 +612,7 @@ async fn send_statements_about( candidate_hash: CandidateHash, active_head: &ActiveHeadData, metrics: &Metrics, -) -> SubsystemResult<()> { +) { for statement in active_head.statements_about(candidate_hash) { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { let payload = statement_message( @@ -624,13 +622,11 @@ async fn send_statements_about( ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) - )).await?; + )).await; metrics.on_statement_distributed(); } } - - Ok(()) } /// Send all statements at a given relay-parent to a peer. @@ -642,7 +638,7 @@ async fn send_statements( relay_parent: Hash, active_head: &ActiveHeadData, metrics: &Metrics, -) -> SubsystemResult<()> { +) { for statement in active_head.statements() { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { let payload = statement_message( @@ -652,20 +648,18 @@ async fn send_statements( ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) - )).await?; + )).await; metrics.on_statement_distributed(); } } - - Ok(()) } async fn report_peer( ctx: &mut impl SubsystemContext, peer: PeerId, rep: Rep, -) -> SubsystemResult<()> { +) { ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) )).await @@ -685,13 +679,14 @@ async fn handle_incoming_message<'a>( ctx: &mut impl SubsystemContext, message: protocol_v1::StatementDistributionMessage, metrics: &Metrics, -) -> SubsystemResult> { +) -> Option<(Hash, &'a StoredStatement)> { let (relay_parent, statement) = match message { protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s), }; if !our_view.contains(&relay_parent) { - return report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await.map(|_| None); + report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await; + return None; } let active_head = match active_heads.get_mut(&relay_parent) { @@ -703,13 +698,14 @@ async fn handle_incoming_message<'a>( requested_relay_parent = %relay_parent, "our view out-of-sync with active heads; head not found", ); - return Ok(None); + return None; } }; // check the signature on the statement. if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) { - return report_peer(ctx, peer, COST_INVALID_SIGNATURE).await.map(|_| None); + report_peer(ctx, peer, COST_INVALID_SIGNATURE).await; + return None; } // Ensure the statement is stored in the peer data. @@ -720,8 +716,8 @@ async fn handle_incoming_message<'a>( let max_message_count = active_head.validators.len() * 2; match peer_data.receive(&relay_parent, &fingerprint, max_message_count) { Err(rep) => { - report_peer(ctx, peer, rep).await?; - return Ok(None) + report_peer(ctx, peer, rep).await; + return None; } Ok(true) => { // Send the peer all statements concerning the candidate that we have, @@ -734,7 +730,7 @@ async fn handle_incoming_message<'a>( fingerprint.0.candidate_hash().clone(), &*active_head, metrics, - ).await? + ).await; } Ok(false) => {} } @@ -742,14 +738,14 @@ async fn handle_incoming_message<'a>( // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation // or unpinned to a seconded candidate. So it is safe to place it into the storage. match active_head.note_statement(statement) { - NotedStatement::NotUseful => Ok(None), + NotedStatement::NotUseful => None, NotedStatement::UsefulButKnown => { - report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await?; - Ok(None) + report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await; + None } NotedStatement::Fresh(statement) => { - report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await?; - Ok(Some((relay_parent, statement))) + report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await; + Some((relay_parent, statement)) } } } @@ -763,7 +759,7 @@ async fn update_peer_view_and_send_unlocked( active_heads: &HashMap, new_view: View, metrics: &Metrics, -) -> SubsystemResult<()> { +) { let old_view = std::mem::replace(&mut peer_data.view, new_view); // Remove entries for all relay-parents in the old view but not the new. @@ -785,11 +781,9 @@ async fn update_peer_view_and_send_unlocked( new, active_head, metrics, - ).await?; + ).await; } } - - Ok(()) } #[tracing::instrument(level = "trace", skip(peers, active_heads, ctx, metrics), fields(subsystem = LOG_TARGET))] @@ -800,19 +794,16 @@ async fn handle_network_update( our_view: &mut View, update: NetworkBridgeEvent, metrics: &Metrics, -) -> SubsystemResult<()> { +) { match update { NetworkBridgeEvent::PeerConnected(peer, _role) => { peers.insert(peer, PeerData { view: Default::default(), view_knowledge: Default::default(), }); - - Ok(()) } NetworkBridgeEvent::PeerDisconnected(peer) => { peers.remove(&peer); - Ok(()) } NetworkBridgeEvent::PeerMessage(peer, message) => { match peers.get_mut(&peer) { @@ -825,7 +816,7 @@ async fn handle_network_update( ctx, message, metrics, - ).await?; + ).await; if let Some((relay_parent, new)) = new_stored { // When we receive a new message from a peer, we forward it to the @@ -833,12 +824,10 @@ async fn handle_network_update( let message = AllMessages::CandidateBacking( CandidateBackingMessage::Statement(relay_parent, new.statement.clone()) ); - ctx.send_message(message).await?; + ctx.send_message(message).await; } - - Ok(()) } - None => Ok(()), + None => (), } } @@ -854,7 +843,7 @@ async fn handle_network_update( metrics, ).await } - None => Ok(()), + None => (), } } NetworkBridgeEvent::OurViewChange(view) => { @@ -872,8 +861,6 @@ async fn handle_network_update( ); } } - - Ok(()) } } @@ -917,7 +904,7 @@ impl StatementDistribution { ctx.send_messages( std::iter::once(val_message).chain(std::iter::once(session_message)) - ).await?; + ).await; match (val_rx.await?, session_rx.await?) { (Ok(v), Ok(s)) => (v, s), @@ -959,7 +946,7 @@ impl StatementDistribution { relay_parent, statement, &metrics, - ).await?; + ).await; } StatementDistributionMessage::NetworkBridgeUpdateV1(event) => { let _timer = metrics.time_network_bridge_update_v1(); @@ -971,7 +958,7 @@ impl StatementDistribution { &mut our_view, event, &metrics, - ).await? + ).await; } StatementDistributionMessage::RegisterStatementListener(tx) => { statement_listeners.push(tx); @@ -1428,7 +1415,7 @@ mod tests { &active_heads, new_view.clone(), &Default::default(), - ).await.unwrap(); + ).await; assert_eq!(peer_data.view, new_view); assert!(!peer_data.view_knowledge.contains_key(&hash_a)); @@ -1544,7 +1531,7 @@ mod tests { &mut ctx, hash_b, &statement, - ).await.unwrap(); + ).await; { assert_eq!(needs_dependents.len(), 2); diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index e07280c1ca59..e481d38adcc6 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -64,7 +64,7 @@ impl Subsystem1 { }.into(), tx, ) - )).await.unwrap(); + )).await; } } } diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 113d3d0be0b1..04ff8c3c24ec 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -193,20 +193,20 @@ pub struct OverseerHandler { impl OverseerHandler { /// Inform the `Overseer` that that some block was imported. #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { - self.events_tx.send(Event::BlockImported(block)).await.map_err(Into::into) + pub async fn block_imported(&mut self, block: BlockInfo) { + self.send_logging_error(Event::BlockImported(block)).await } /// Send some message to one of the `Subsystem`s. #[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))] - pub async fn send_msg(&mut self, msg: impl Into) -> SubsystemResult<()> { - self.events_tx.send(Event::MsgToSubsystem(msg.into())).await.map_err(Into::into) + pub async fn send_msg(&mut self, msg: impl Into) { + self.send_logging_error(Event::MsgToSubsystem(msg.into())).await } /// Inform the `Overseer` that that some block was finalized. #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { - self.events_tx.send(Event::BlockFinalized(block)).await.map_err(Into::into) + pub async fn block_finalized(&mut self, block: BlockInfo) { + self.send_logging_error(Event::BlockFinalized(block)).await } /// Wait for a block with the given hash to be in the active-leaves set. @@ -217,17 +217,23 @@ impl OverseerHandler { /// the response channel may never return if the hash was deactivated before this call. /// In this case, it's the caller's responsibility to ensure a timeout is set. #[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))] - pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender>) -> SubsystemResult<()> { - self.events_tx.send(Event::ExternalRequest(ExternalRequest::WaitForActivation { + pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender>) { + self.send_logging_error(Event::ExternalRequest(ExternalRequest::WaitForActivation { hash, response_channel - })).await.map_err(Into::into) + })).await } /// Tell `Overseer` to shutdown. #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn stop(&mut self) -> SubsystemResult<()> { - self.events_tx.send(Event::Stop).await.map_err(Into::into) + pub async fn stop(&mut self) { + self.send_logging_error(Event::Stop).await + } + + async fn send_logging_error(&mut self, event: Event) { + if self.events_tx.send(event).await.is_err() { + tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); + } } } @@ -239,7 +245,7 @@ impl OverseerHandler { pub async fn forward_events>( client: Arc

, mut handler: OverseerHandler, -) -> SubsystemResult<()> { +) { let mut finality = client.finality_notification_stream(); let mut imports = client.import_notification_stream(); @@ -248,7 +254,7 @@ pub async fn forward_events>( f = finality.next() => { match f { Some(block) => { - handler.block_finalized(block.into()).await?; + handler.block_finalized(block.into()).await; } None => break, } @@ -256,7 +262,7 @@ pub async fn forward_events>( i = imports.next() => { match i { Some(block) => { - handler.block_imported(block.into()).await?; + handler.block_imported(block.into()).await; } None => break, } @@ -264,8 +270,6 @@ pub async fn forward_events>( complete => break, } } - - Ok(()) } impl Debug for ToOverseer { @@ -338,15 +342,34 @@ impl SubsystemContext for OverseerSubsystemContext { }).await.map_err(Into::into) } - async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { - self.tx.send(ToOverseer::SubsystemMessage(msg)).await.map_err(Into::into) + async fn send_message(&mut self, msg: AllMessages) { + self.send_logging_error(ToOverseer::SubsystemMessage(msg)).await } - async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> + async fn send_messages(&mut self, msgs: T) where T: IntoIterator + Send, T::IntoIter: Send { let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); - self.tx.send_all(&mut msgs).await.map_err(Into::into) + if self.tx.send_all(&mut msgs).await.is_err() { + tracing::info!( + target: LOG_TARGET, + context = std::any::type_name::(), + "Failed to send messages to Overseer", + ); + + } + } +} + +impl OverseerSubsystemContext { + async fn send_logging_error(&mut self, msg: ToOverseer) { + if self.tx.send(msg).await.is_err() { + tracing::info!( + target: LOG_TARGET, + context = std::any::type_name::(), + "Failed to send a message to Overseer", + ); + } } } @@ -1712,7 +1735,7 @@ mod tests { tx, ) ) - ).await.unwrap(); + ).await; c += 1; continue; } @@ -1786,7 +1809,7 @@ mod tests { Some(msg) => { s1_results.push(msg); if s1_results.len() == 10 { - handler.stop().await.unwrap(); + handler.stop().await; } } None => break, @@ -1844,10 +1867,10 @@ mod tests { pin_mut!(overseer_fut); - handler.block_imported(second_block).await.unwrap(); - handler.block_imported(third_block).await.unwrap(); - handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap(); - handler.stop().await.unwrap(); + handler.block_imported(second_block).await; + handler.block_imported(third_block).await; + handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; + handler.stop().await; select! { res = overseer_fut => { @@ -2012,8 +2035,8 @@ mod tests { let mut ss5_results = Vec::new(); let mut ss6_results = Vec::new(); - handler.block_imported(second_block).await.unwrap(); - handler.block_imported(third_block).await.unwrap(); + handler.block_imported(second_block).await; + handler.block_imported(third_block).await; let expected_heartbeats = vec![ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)), @@ -2048,7 +2071,7 @@ mod tests { if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() { - handler.stop().await.unwrap(); + handler.stop().await; } } @@ -2106,7 +2129,7 @@ mod tests { let mut ss6_results = Vec::new(); // this should stop work on both forks we started with earlier. - handler.block_finalized(third_block).await.unwrap(); + handler.block_finalized(third_block).await; let expected_heartbeats = vec![ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -2141,7 +2164,7 @@ mod tests { if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() { - handler.stop().await.unwrap(); + handler.stop().await; } } @@ -2343,28 +2366,28 @@ mod tests { hash: Default::default(), parent_hash: Default::default(), number: Default::default(), - }).await.unwrap(); + }).await; // send a msg to each subsystem // except for BitfieldSigning as the message is not instantiable - handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap(); - handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await.unwrap(); - handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await.unwrap(); - handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await.unwrap(); - handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await.unwrap(); - handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await.unwrap(); - handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await.unwrap(); - // handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await.unwrap(); - handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await.unwrap(); - handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await.unwrap(); - handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await.unwrap(); - handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await.unwrap(); - handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await.unwrap(); - handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await.unwrap(); - handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await.unwrap(); + handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; + handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await; + handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await; + handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await; + handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await; + handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await; + handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await; + // handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await; + handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await; + handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await; + handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await; + handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await; + handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await; + handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await; + handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await; // send a stop signal to each subsystems - handler.stop().await.unwrap(); + handler.stop().await; select! { res = overseer_fut => { diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs index 6219421902ee..512d761afed1 100644 --- a/node/subsystem-test-helpers/src/lib.rs +++ b/node/subsystem-test-helpers/src/lib.rs @@ -191,15 +191,14 @@ impl SubsystemContext Ok(()) } - async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { + async fn send_message(&mut self, msg: AllMessages) { self.tx .send(msg) .await .expect("test overseer no longer live"); - Ok(()) } - async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> + async fn send_messages(&mut self, msgs: T) where T: IntoIterator + Send, T::IntoIter: Send, @@ -209,8 +208,6 @@ impl SubsystemContext .send_all(&mut iter) .await .expect("test overseer no longer live"); - - Ok(()) } } @@ -341,7 +338,7 @@ mod tests { spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed()); - block_on(handler.send_msg(CandidateSelectionMessage::Invalid(Default::default(), Default::default()))).unwrap(); + block_on(handler.send_msg(CandidateSelectionMessage::Invalid(Default::default(), Default::default()))); assert!(matches!(block_on(rx.into_future()).0.unwrap(), CandidateSelectionMessage::Invalid(_, _))); } } diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs index 5766f25f16e6..750939a57a1d 100644 --- a/node/subsystem-util/src/lib.rs +++ b/node/subsystem-util/src/lib.rs @@ -207,13 +207,11 @@ where { let (tx, rx) = oneshot::channel(); - ctx - .send_message( - AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))) - .try_into() - .map_err(|err| Error::SenderConversion(format!("{:?}", err)))?, - ) - .await?; + ctx.send_message( + AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))) + .try_into() + .map_err(|err| Error::SenderConversion(format!("{:?}", err)))?, + ).await; Ok(rx) } @@ -752,7 +750,7 @@ where break }, outgoing = jobs.next().fuse() => - Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await, + Self::handle_outgoing(outgoing, &mut ctx).await, complete => break, } } @@ -866,13 +864,9 @@ where async fn handle_outgoing( outgoing: Option, ctx: &mut Context, - err_tx: &mut Option, JobsError)>>, ) { let msg = outgoing.expect("the Jobs stream never ends; qed"); - if let Err(e) = ctx.send_message(msg.into()).await { - let e = JobsError::Utility(e.into()); - Self::fwd_err(None, e, err_tx).await; - } + ctx.send_message(msg.into()).await; } } diff --git a/node/subsystem-util/src/validator_discovery.rs b/node/subsystem-util/src/validator_discovery.rs index 0952ad048c4b..83c405a3f4b5 100644 --- a/node/subsystem-util/src/validator_discovery.rs +++ b/node/subsystem-util/src/validator_discovery.rs @@ -63,7 +63,7 @@ pub async fn connect_to_validators( relay_parent, RuntimeApiRequest::ValidatorDiscovery(validators.clone(), tx), ) - )).await?; + )).await; let maybe_authorities = rx.await??; let authorities: Vec<_> = maybe_authorities.iter() @@ -97,7 +97,7 @@ async fn connect_to_authorities( validator_ids, connected, } - )).await?; + )).await; Ok(connected_rx) } diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index dfb4a925f1ca..57f62649f14c 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -204,10 +204,10 @@ pub trait SubsystemContext: Send + 'static { ) -> SubsystemResult<()>; /// Send a direct message to some other `Subsystem`, routed based on message type. - async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>; + async fn send_message(&mut self, msg: AllMessages); /// Send multiple direct messages to other `Subsystem`s, routed based on message type. - async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> + async fn send_messages(&mut self, msgs: T) where T: IntoIterator + Send, T::IntoIter: Send; } diff --git a/node/test/service/src/lib.rs b/node/test/service/src/lib.rs index efd68cd3a89a..aa46bd6d7ed6 100644 --- a/node/test/service/src/lib.rs +++ b/node/test/service/src/lib.rs @@ -336,13 +336,11 @@ impl PolkadotTestNode { self.overseer_handler .send_msg(CollationGenerationMessage::Initialize(config)) - .await - .expect("Registers the collator"); + .await; self.overseer_handler .send_msg(CollatorProtocolMessage::CollateOn(para_id)) - .await - .expect("Sends CollateOn"); + .await; } } diff --git a/parachain/test-parachains/adder/collator/src/main.rs b/parachain/test-parachains/adder/collator/src/main.rs index 6b7029be07a1..b41201fd4606 100644 --- a/parachain/test-parachains/adder/collator/src/main.rs +++ b/parachain/test-parachains/adder/collator/src/main.rs @@ -86,13 +86,11 @@ fn main() -> Result<()> { }; overseer_handler .send_msg(CollationGenerationMessage::Initialize(config)) - .await - .expect("Registers collator"); + .await; overseer_handler .send_msg(CollatorProtocolMessage::CollateOn(PARA_ID)) - .await - .expect("Collates on"); + .await; Ok(full_node.task_manager) } From 86102a989b0fe5facd1c7cc37f3d5ebd5243e66b Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Mon, 23 Nov 2020 11:11:50 +0100 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Peter Goodspeed-Niklaus --- node/overseer/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 04ff8c3c24ec..25246d860779 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -351,9 +351,9 @@ impl SubsystemContext for OverseerSubsystemContext { { let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); if self.tx.send_all(&mut msgs).await.is_err() { - tracing::info!( + tracing::debug!( target: LOG_TARGET, - context = std::any::type_name::(), + msg_type = std::any::type_name::(), "Failed to send messages to Overseer", ); @@ -364,9 +364,9 @@ impl SubsystemContext for OverseerSubsystemContext { impl OverseerSubsystemContext { async fn send_logging_error(&mut self, msg: ToOverseer) { if self.tx.send(msg).await.is_err() { - tracing::info!( + tracing::debug!( target: LOG_TARGET, - context = std::any::type_name::(), + msg_type = std::any::type_name::(), "Failed to send a message to Overseer", ); } From dfa59f2940b024076d6cf53f8af77f6c3551ee11 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Mon, 23 Nov 2020 12:05:50 +0100 Subject: [PATCH 3/3] s/send_logging_error/send_and_log_error --- node/overseer/src/lib.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 25246d860779..76d967dc9ccd 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -194,19 +194,19 @@ impl OverseerHandler { /// Inform the `Overseer` that that some block was imported. #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] pub async fn block_imported(&mut self, block: BlockInfo) { - self.send_logging_error(Event::BlockImported(block)).await + self.send_and_log_error(Event::BlockImported(block)).await } /// Send some message to one of the `Subsystem`s. #[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))] pub async fn send_msg(&mut self, msg: impl Into) { - self.send_logging_error(Event::MsgToSubsystem(msg.into())).await + self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await } /// Inform the `Overseer` that that some block was finalized. #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] pub async fn block_finalized(&mut self, block: BlockInfo) { - self.send_logging_error(Event::BlockFinalized(block)).await + self.send_and_log_error(Event::BlockFinalized(block)).await } /// Wait for a block with the given hash to be in the active-leaves set. @@ -218,7 +218,7 @@ impl OverseerHandler { /// In this case, it's the caller's responsibility to ensure a timeout is set. #[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))] pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender>) { - self.send_logging_error(Event::ExternalRequest(ExternalRequest::WaitForActivation { + self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation { hash, response_channel })).await @@ -227,10 +227,10 @@ impl OverseerHandler { /// Tell `Overseer` to shutdown. #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] pub async fn stop(&mut self) { - self.send_logging_error(Event::Stop).await + self.send_and_log_error(Event::Stop).await } - async fn send_logging_error(&mut self, event: Event) { + async fn send_and_log_error(&mut self, event: Event) { if self.events_tx.send(event).await.is_err() { tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); } @@ -343,7 +343,7 @@ impl SubsystemContext for OverseerSubsystemContext { } async fn send_message(&mut self, msg: AllMessages) { - self.send_logging_error(ToOverseer::SubsystemMessage(msg)).await + self.send_and_log_error(ToOverseer::SubsystemMessage(msg)).await } async fn send_messages(&mut self, msgs: T) @@ -362,7 +362,7 @@ impl SubsystemContext for OverseerSubsystemContext { } impl OverseerSubsystemContext { - async fn send_logging_error(&mut self, msg: ToOverseer) { + async fn send_and_log_error(&mut self, msg: ToOverseer) { if self.tx.send(msg).await.is_err() { tracing::debug!( target: LOG_TARGET,