Skip to content

Commit

Permalink
[dag][rpc] return errors as enum types (#10562)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun authored Oct 24, 2023
1 parent e44a2b7 commit b870fb9
Show file tree
Hide file tree
Showing 15 changed files with 310 additions and 131 deletions.
8 changes: 4 additions & 4 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::{
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
types::DAGMessage,
ProofNotifier,
DAGRpcResult, ProofNotifier,
};
use crate::{
dag::{
Expand Down Expand Up @@ -55,7 +55,7 @@ pub struct DagBootstrapper {
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage, DAGRpcResult>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
proof_notifier: Arc<dyn ProofNotifier>,
time_service: aptos_time_service::TimeService,
Expand All @@ -72,7 +72,7 @@ impl DagBootstrapper {
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage, DAGRpcResult>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
proof_notifier: Arc<dyn ProofNotifier>,
time_service: aptos_time_service::TimeService,
Expand Down Expand Up @@ -351,7 +351,7 @@ pub(super) fn bootstrap_dag_for_test(
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage, DAGRpcResult>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
proof_notifier: Arc<dyn ProofNotifier>,
time_service: aptos_time_service::TimeService,
Expand Down
14 changes: 4 additions & 10 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
adapter::TLedgerInfoProvider,
dag_fetcher::{FetchRequester, TFetchRequester},
dag_store::Dag,
errors::DagDriverError,
observability::{
counters,
logging::{LogEvent, LogSchema},
Expand All @@ -18,7 +19,7 @@ use crate::{
CertificateAckState, CertifiedAck, CertifiedNode, CertifiedNodeMessage, DAGMessage,
Extensions, Node, SignatureBuilder,
},
RpcHandler,
DAGRpcResult, RpcHandler,
},
payload_manager::PayloadManager,
state_replication::PayloadClient,
Expand All @@ -38,22 +39,15 @@ use futures::{
FutureExt,
};
use std::{sync::Arc, time::Duration};
use thiserror::Error as ThisError;
use tokio_retry::strategy::ExponentialBackoff;

/// Error type for failures reported by the DAG driver.
///
/// `Display` and `std::error::Error` are derived through `thiserror`, so the
/// `#[error(...)]` string on each variant is the text shown to callers.
#[derive(Debug, ThisError)]
pub enum DagDriverError {
/// Rendered as "missing parents".
// NOTE(review): presumably raised when a node's parents are not yet present
// in the local DAG; the raising site is not visible here, so confirm.
#[error("missing parents")]
MissingParents,
}

pub(crate) struct DagDriver {
author: Author,
epoch_state: Arc<EpochState>,
dag: Arc<RwLock<Dag>>,
payload_manager: Arc<PayloadManager>,
payload_client: Arc<dyn PayloadClient>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff>>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
time_service: TimeService,
rb_abort_handle: Option<(AbortHandle, u64)>,
storage: Arc<dyn DAGStorage>,
Expand All @@ -72,7 +66,7 @@ impl DagDriver {
dag: Arc<RwLock<Dag>>,
payload_manager: Arc<PayloadManager>,
payload_client: Arc<dyn PayloadClient>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff>>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
time_service: TimeService,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
Expand Down
85 changes: 54 additions & 31 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{dag_network::RpcWithFallback, types::NodeMetadata, RpcHandler};
use super::DAGRpcResult;
use crate::dag::{
dag_network::TDAGNetworkSender,
dag_network::{RpcWithFallback, TDAGNetworkSender},
dag_store::Dag,
errors::{DAGError, FetchRequestHandleError},
observability::logging::{LogEvent, LogSchema},
types::{CertifiedNode, FetchResponse, Node, RemoteFetchRequest},
types::{CertifiedNode, FetchResponse, Node, NodeMetadata, RemoteFetchRequest},
RpcHandler,
};
use anyhow::{anyhow, ensure};
use aptos_config::config::DagFetcherConfig;
use aptos_consensus_types::common::{Author, Round};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_logger::{debug, error};
use aptos_logger::{debug, error, info};
use aptos_time_service::TimeService;
use aptos_types::epoch_state::EpochState;
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use std::{
collections::HashMap,
ops::Deref,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use thiserror::Error as ThisError;
use tokio::sync::{
mpsc::{Receiver, Sender},
oneshot,
Expand Down Expand Up @@ -278,41 +280,62 @@ impl TDagFetcher for DagFetcher {
);

while let Some(response) = rpc.next().await {
match response
.and_then(FetchResponse::try_from)
.and_then(|response| response.verify(&remote_request, &self.epoch_state.verifier))
{
Ok(response) => {
let certified_nodes = response.certified_nodes();
// TODO: support chunk response or fallback to state sync
{
let mut dag_writer = dag.write();
for node in certified_nodes.into_iter().rev() {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
match response {
Ok(DAGRpcResult(Ok(response))) => {
match FetchResponse::try_from(response).and_then(|response| {
response.verify(&remote_request, &self.epoch_state.verifier)
}) {
Ok(fetch_response) => {
let certified_nodes = fetch_response.certified_nodes();
// TODO: support chunk response or fallback to state sync
{
let mut dag_writer = dag.write();
for node in certified_nodes.into_iter().rev() {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
}
}
}
}
}

if dag.read().all_exists(remote_request.targets()) {
return Ok(());
if dag.read().all_exists(remote_request.targets()) {
return Ok(());
}
},
Err(err) => {
info!(error = ?err, "unable to parse fetch response");
},
};
},
Ok(DAGRpcResult(Err(dag_rpc_error))) => {
info!(error = ?dag_rpc_error, "responder returned error");
if let DAGError::FetchRequestHandleError(err) = dag_rpc_error.deref() {
match err {
FetchRequestHandleError::TargetsMissing => {
// TODO: add who the responder is
info!("responder is missing target nodes to serve")
},
FetchRequestHandleError::GarbageCollected(
requested_round,
lowest_round,
) => {
info!(
"fetch error. requested: {}, lowest_round: {}",
requested_round, lowest_round
)
},
}
}
},
Err(err) => error!("Fetch rpc failed {}", err),
Err(err) => {
// TODO: add who responder is
info!(error = ?err, "error issuing RPC");
},
}
}
Err(anyhow!("Fetch with fallback failed"))
}
}

/// Reasons a responder gives for being unable to serve a fetch request.
///
/// `Display` and `std::error::Error` are derived through `thiserror`, so the
/// `#[error(...)]` string on each variant is the text shown to callers.
#[derive(Debug, ThisError)]
pub enum FetchRequestHandleError {
/// The responder does not have the requested target nodes.
#[error("target nodes are missing")]
TargetsMissing,
/// The request refers to a round that has been garbage collected.
/// Field `0` is the requested round and field `1` is the lowest round.
// NOTE(review): "lowest round" presumably means the lowest round the
// responder still retains; confirm against the handler implementation.
#[error("garbage collected, request round {0}, lowest round {1}")]
GarbageCollected(Round, Round),
}

pub struct FetchRequestHandler {
dag: Arc<RwLock<Dag>>,
author_to_index: HashMap<Author, usize>,
Expand Down
83 changes: 55 additions & 28 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
// Copyright © Aptos Foundation

use super::{
dag_driver::DagDriver,
dag_fetcher::{FetchRequestHandler, FetchWaiter},
dag_state_sync::{StateSyncStatus, StateSyncTrigger},
CertifiedNode, Node,
};
use crate::{
dag::{dag_network::RpcHandler, rb_handler::NodeBroadcastHandler, types::DAGMessage},
dag::{
dag_driver::DagDriver,
dag_fetcher::{FetchRequestHandler, FetchWaiter},
dag_network::RpcHandler,
dag_state_sync::{StateSyncStatus, StateSyncTrigger},
errors::{DAGError, DAGRpcError, DagDriverError, NodeBroadcastHandleError},
rb_handler::NodeBroadcastHandler,
types::{DAGMessage, DAGRpcResult},
CertifiedNode, Node,
},
network::{IncomingDAGRequest, TConsensusMsg},
};
use aptos_channels::aptos_channel;
use aptos_consensus_types::common::{Author, Round};
use aptos_logger::{debug, warn};
use aptos_logger::{debug, error, warn};
use aptos_network::protocols::network::RpcError;
use aptos_types::epoch_state::EpochState;
use bytes::Bytes;
Expand Down Expand Up @@ -104,37 +107,61 @@ impl NetworkHandler {
rpc_request: IncomingDAGRequest,
) -> anyhow::Result<StateSyncStatus> {
let dag_message: DAGMessage = rpc_request.req.try_into()?;
let epoch = dag_message.epoch();

debug!(
"processing rpc message {} from {}",
dag_message.name(),
rpc_request.sender
);

let response: anyhow::Result<DAGMessage> = {
let response: Result<DAGMessage, DAGError> = {
match dag_message.verify(rpc_request.sender, &self.epoch_state.verifier) {
Ok(_) => match dag_message {
DAGMessage::NodeMsg(node) => {
self.node_receiver.process(node).await.map(|r| r.into())
},
DAGMessage::NodeMsg(node) => self
.node_receiver
.process(node)
.await
.map(|r| r.into())
.map_err(|err| {
err.downcast::<NodeBroadcastHandleError>()
.map_or(DAGError::Unknown, |err| {
DAGError::NodeBroadcastHandleError(err)
})
}),
DAGMessage::CertifiedNodeMsg(certified_node_msg) => {
match self.state_sync_trigger.check(certified_node_msg).await? {
StateSyncStatus::Synced(Some(certified_node_msg)) => self
.dag_driver
.process(certified_node_msg.certified_node())
.await
.map(|r| r.into()),
.map(|r| r.into())
.map_err(|err| {
err.downcast::<DagDriverError>()
.map_or(DAGError::Unknown, |err| {
DAGError::DagDriverError(err)
})
}),
status @ (StateSyncStatus::NeedsSync(_)
| StateSyncStatus::EpochEnds) => return Ok(status),
_ => unreachable!(),
}
},
DAGMessage::FetchRequest(request) => {
self.fetch_receiver.process(request).await.map(|r| r.into())
},
DAGMessage::FetchRequest(request) => self
.fetch_receiver
.process(request)
.await
.map(|r| r.into())
.map_err(|err| {
err.downcast::<DagDriverError>()
.map_or(DAGError::Unknown, DAGError::DagDriverError)
}),
_ => unreachable!("verification must catch this error"),
},
Err(err) => Err(err),
Err(err) => {
error!(error = ?err, "error verifying message");
Err(DAGError::MessageVerificationError)
},
}
};

Expand All @@ -143,19 +170,19 @@ impl NetworkHandler {
response.as_ref().map(|r| r.name())
);

let response = response
.and_then(|response_msg| {
rpc_request
.protocol
.to_bytes(&response_msg.into_network_message())
.map(Bytes::from)
})
.map_err(RpcError::ApplicationError)?;
let response: DAGRpcResult = response.map_err(|e| DAGRpcError::new(epoch, e)).into();

let rpc_response = rpc_request
.protocol
.to_bytes(&response.into_network_message())
.map(Bytes::from)
.map_err(RpcError::Error);

rpc_request
.response_sender
.send(Ok(response))
.map_err(|_| anyhow::anyhow!("unable to respond to rpc"))
.map(|_| StateSyncStatus::Synced(None))
.send(rpc_response)
.map_err(|_| anyhow::anyhow!("unable to respond to rpc"))?;

Ok(StateSyncStatus::Synced(None))
}
}
12 changes: 6 additions & 6 deletions consensus/src/dag/dag_network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright © Aptos Foundation

use super::types::DAGMessage;
use super::{types::DAGMessage, DAGRpcResult};
use aptos_consensus_types::common::Author;
use aptos_reliable_broadcast::RBNetworkSender;
use aptos_time_service::{Interval, TimeService, TimeServiceTrait};
Expand All @@ -26,13 +26,13 @@ pub trait RpcHandler {
}

#[async_trait]
pub trait TDAGNetworkSender: Send + Sync + RBNetworkSender<DAGMessage> {
pub trait TDAGNetworkSender: Send + Sync + RBNetworkSender<DAGMessage, DAGRpcResult> {
async fn send_rpc(
&self,
receiver: Author,
message: DAGMessage,
timeout: Duration,
) -> anyhow::Result<DAGMessage>;
) -> anyhow::Result<DAGRpcResult>;

/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
Expand Down Expand Up @@ -81,7 +81,7 @@ pub struct RpcWithFallback {

terminated: bool,
futures: Pin<
Box<FuturesUnordered<Pin<Box<dyn Future<Output = anyhow::Result<DAGMessage>> + Send>>>>,
Box<FuturesUnordered<Pin<Box<dyn Future<Output = anyhow::Result<DAGRpcResult>> + Send>>>>,
>,
sender: Arc<dyn TDAGNetworkSender>,
interval: Pin<Box<Interval>>,
Expand Down Expand Up @@ -120,12 +120,12 @@ async fn send_rpc(
peer: Author,
message: DAGMessage,
timeout: Duration,
) -> anyhow::Result<DAGMessage> {
) -> anyhow::Result<DAGRpcResult> {
sender.send_rpc(peer, message, timeout).await
}

impl Stream for RpcWithFallback {
type Item = anyhow::Result<DAGMessage>;
type Item = anyhow::Result<DAGRpcResult>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.futures.is_empty() {
Expand Down
Loading

0 comments on commit b870fb9

Please sign in to comment.