Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RoutingTable V2: Distance Vector Routing #9187

Merged
merged 17 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions chain/network/src/network_protocol/borsh_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ impl From<&mem::PeerMessage> for net::PeerMessage {
net::PeerMessage::SyncRoutingTable(rtu.into())
}
mem::PeerMessage::RequestUpdateNonce(e) => net::PeerMessage::RequestUpdateNonce(e),
mem::PeerMessage::DistanceVector(_) => {
panic!("DistanceVector is not supported in Borsh encoding")
}
saketh-are marked this conversation as resolved.
Show resolved Hide resolved

// This message is not supported, we translate it to an empty RoutingTableUpdate.
mem::PeerMessage::SyncAccountsData(_) => {
Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/network_protocol/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Edge {

pub fn make_fake_edge(peer0: PeerId, peer1: PeerId, nonce: u64) -> Self {
Self(Arc::new(EdgeInner {
key: (peer0, peer1),
key: if peer0 < peer1 { (peer0, peer1) } else { (peer1, peer0) },
saketh-are marked this conversation as resolved.
Show resolved Hide resolved
nonce,
signature0: Signature::empty(KeyType::ED25519),
signature1: Signature::empty(KeyType::ED25519),
Expand Down
27 changes: 27 additions & 0 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,32 @@ impl RoutingTableUpdate {
Self { edges, accounts }
}
}

/// Denotes a network path to `destination` of length `distance`.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct AdvertisedPeerDistance {
pub destination: PeerId,
pub distance: u32,
}

/// Struct shared by a peer listing the distances it has to other peers
/// in the NEAR network.
///
/// It includes a collection of signed edges forming a spanning tree
/// which verifiably achieves the advertised routing distances.
///
/// The distances in the tree may be the same or better than the advertised
/// distances; see routing::graph_v2::tests::inconsistent_peers.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct DistanceVector {
saketh-are marked this conversation as resolved.
Show resolved Hide resolved
/// PeerId of the node sending the message.
pub root: PeerId,
/// List of distances the root has to other peers in the network.
pub distances: Vec<AdvertisedPeerDistance>,
/// Spanning tree of signed edges achieving the claimed distances (or better).
pub edges: Vec<Edge>,
}

/// Structure representing handshake between peers.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Handshake {
Expand Down Expand Up @@ -361,6 +387,7 @@ pub enum PeerMessage {
LastEdge(Edge),
/// Contains accounts and edge information.
SyncRoutingTable(RoutingTableUpdate),
DistanceVector(DistanceVector),
RequestUpdateNonce(PartialEdgeInfo),

SyncAccountsData(SyncAccountsData),
Expand Down
24 changes: 24 additions & 0 deletions chain/network/src/network_protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,29 @@ message RoutingTableUpdate {
repeated AnnounceAccount accounts = 2;
}

// Denotes an available route to `destination` of length `distance`
message AdvertisedPeerDistance {
PublicKey destination = 1;
uint32 distance = 2;
}

/// Message shared by a peer listing the distances it has to other peers
/// in the NEAR network.
///
/// It includes a collection of signed edges forming a spanning tree
/// which verifiably achieves the advertised routing distances.
///
/// The distances in the tree may be the same or better than the advertised
/// distances; see routing::graph_v2::tests::inconsistent_peers.
message DistanceVector {
// PeerId of the node sending the message.
PublicKey root = 1;
// List of distances the root has to other peers in the network.
repeated AdvertisedPeerDistance distances = 2;
// Spanning tree of signed edges achieving the claimed distances (or better).
repeated Edge edges = 3;
}

// TODO: document it.
message UpdateNonceRequest {
PartialEdgeInfo partial_edge_info = 1;
Expand Down Expand Up @@ -417,6 +440,7 @@ message PeerMessage {
HandshakeFailure handshake_failure = 5;
LastEdge last_edge = 6;
RoutingTableUpdate sync_routing_table = 7;
DistanceVector distance_vector = 28;

UpdateNonceRequest update_nonce_request = 8;
UpdateNonceResponse update_nonce_response = 9;
Expand Down
73 changes: 72 additions & 1 deletion chain/network/src/network_protocol/proto_conv/peer_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use super::*;
use crate::network_protocol::proto;
use crate::network_protocol::proto::peer_message::Message_type as ProtoMT;
use crate::network_protocol::{
Disconnect, PeerMessage, PeersRequest, PeersResponse, RoutingTableUpdate, SyncAccountsData,
AdvertisedPeerDistance, Disconnect, DistanceVector, PeerMessage, PeersRequest, PeersResponse,
RoutingTableUpdate, SyncAccountsData,
};
use crate::network_protocol::{RoutedMessage, RoutedMessageV2};
use borsh::{BorshDeserialize as _, BorshSerialize as _};
Expand Down Expand Up @@ -45,6 +46,70 @@ impl TryFrom<&proto::RoutingTableUpdate> for RoutingTableUpdate {

//////////////////////////////////////////

#[derive(thiserror::Error, Debug)]
pub enum ParseAdvertisedPeerDistanceError {
#[error("destination {0}")]
Destination(ParseRequiredError<ParsePublicKeyError>),
}

impl From<&AdvertisedPeerDistance> for proto::AdvertisedPeerDistance {
fn from(x: &AdvertisedPeerDistance) -> Self {
Self {
destination: MF::some((&x.destination).into()),
distance: x.distance,
..Default::default()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure you want the default here? If anyone in the future adds another field the compiler will tell them about it if there is no default.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't fully understand this comment, but removing this line results in:

error[E0063]: missing field `special_fields` in initializer of `_proto::network::AdvertisedPeerDistance`
  --> chain/network/src/network_protocol/proto_conv/peer_message.rs:57:9
   |
57 |         Self { destination: MF::some((&x.destination).into()), distance: x.distance }
   |         ^^^^ missing `special_fields`

Are you suggesting something else?

}
}
}

impl TryFrom<&proto::AdvertisedPeerDistance> for AdvertisedPeerDistance {
type Error = ParseAdvertisedPeerDistanceError;
fn try_from(x: &proto::AdvertisedPeerDistance) -> Result<Self, Self::Error> {
Ok(Self {
destination: try_from_required(&x.destination).map_err(Self::Error::Destination)?,
distance: x.distance,
})
}
}

//////////////////////////////////////////

//////////////////////////////////////////

#[derive(thiserror::Error, Debug)]
pub enum ParseDistanceVectorError {
#[error("root {0}")]
Root(ParseRequiredError<ParsePublicKeyError>),
#[error("distances {0}")]
Distances(ParseVecError<ParseAdvertisedPeerDistanceError>),
#[error("edges {0}")]
Edges(ParseVecError<ParseEdgeError>),
}

impl From<&DistanceVector> for proto::DistanceVector {
fn from(x: &DistanceVector) -> Self {
Self {
root: MF::some((&x.root).into()),
distances: x.distances.iter().map(Into::into).collect(),
edges: x.edges.iter().map(Into::into).collect(),
..Default::default()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto
and again shame those are not autogenerated

}
}
}

impl TryFrom<&proto::DistanceVector> for DistanceVector {
type Error = ParseDistanceVectorError;
fn try_from(x: &proto::DistanceVector) -> Result<Self, Self::Error> {
Ok(Self {
root: try_from_required(&x.root).map_err(Self::Error::Root)?,
distances: try_from_slice(&x.distances).map_err(Self::Error::Distances)?,
edges: try_from_slice(&x.edges).map_err(Self::Error::Edges)?,
})
}
}

//////////////////////////////////////////

impl From<&BlockHeader> for proto::BlockHeader {
fn from(x: &BlockHeader) -> Self {
Self { borsh: x.try_to_vec().unwrap(), ..Default::default() }
Expand Down Expand Up @@ -93,6 +158,7 @@ impl From<&PeerMessage> for proto::PeerMessage {
..Default::default()
}),
PeerMessage::SyncRoutingTable(rtu) => ProtoMT::SyncRoutingTable(rtu.into()),
PeerMessage::DistanceVector(spt) => ProtoMT::DistanceVector(spt.into()),
PeerMessage::RequestUpdateNonce(pei) => {
ProtoMT::UpdateNonceRequest(proto::UpdateNonceRequest {
partial_edge_info: MF::some(pei.into()),
Expand Down Expand Up @@ -182,6 +248,8 @@ pub enum ParsePeerMessageError {
LastEdge(ParseRequiredError<ParseEdgeError>),
#[error("sync_routing_table: {0}")]
SyncRoutingTable(ParseRoutingTableUpdateError),
#[error("shortest_path_tree: {0}")]
DistanceVector(ParseDistanceVectorError),
#[error("update_nonce_requrest: {0}")]
UpdateNonceRequest(ParseRequiredError<ParsePartialEdgeInfoError>),
#[error("update_nonce_response: {0}")]
Expand Down Expand Up @@ -230,6 +298,9 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage {
ProtoMT::SyncRoutingTable(rtu) => PeerMessage::SyncRoutingTable(
rtu.try_into().map_err(Self::Error::SyncRoutingTable)?,
),
ProtoMT::DistanceVector(spt) => {
PeerMessage::DistanceVector(spt.try_into().map_err(Self::Error::DistanceVector)?)
}
ProtoMT::UpdateNonceRequest(unr) => PeerMessage::RequestUpdateNonce(
try_from_required(&unr.partial_edge_info)
.map_err(Self::Error::UpdateNonceRequest)?,
Expand Down
42 changes: 38 additions & 4 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use crate::concurrency::atomic_cell::AtomicCell;
use crate::concurrency::demux;
use crate::config::PEERS_RESPONSE_MAX_PEERS;
use crate::network_protocol::{
Edge, EdgeState, Encoding, OwnedAccount, ParsePeerMessageError, PartialEdgeInfo,
PeerChainInfoV2, PeerIdOrHash, PeerInfo, PeersRequest, PeersResponse, RawRoutedMessage,
RoutedMessageBody, RoutingTableUpdate, StateResponseInfo, SyncAccountsData,
DistanceVector, Edge, EdgeState, Encoding, OwnedAccount, ParsePeerMessageError,
PartialEdgeInfo, PeerChainInfoV2, PeerIdOrHash, PeerInfo, PeersRequest, PeersResponse,
RawRoutedMessage, RoutedMessageBody, RoutingTableUpdate, StateResponseInfo, SyncAccountsData,
};
use crate::peer::stream;
use crate::peer::tracker::Tracker;
Expand All @@ -15,6 +15,7 @@ use crate::peer_manager::peer_manager_actor::Event;
use crate::peer_manager::peer_manager_actor::MAX_TIER2_PEERS;
use crate::private_actix::{RegisterPeerError, SendMessage};
use crate::routing::edge::verify_nonce;
use crate::routing::NetworkTopologyChange;
use crate::shards_manager::ShardsManagerRequestFromNetwork;
use crate::stats::metrics;
use crate::tcp;
Expand Down Expand Up @@ -1202,6 +1203,18 @@ impl PeerActor {
.push(Event::MessageProcessed(conn.tier, peer_msg));
}));
}
PeerMessage::DistanceVector(dv) => {
let clock = self.clock.clone();
let conn = conn.clone();
let network_state = self.network_state.clone();
ctx.spawn(wrap_future(async move {
Self::handle_distance_vector(&clock, &network_state, conn.clone(), dv).await;
network_state
.config
.event_sink
.push(Event::MessageProcessed(conn.tier, peer_msg));
}));
}
PeerMessage::SyncAccountsData(msg) => {
metrics::SYNC_ACCOUNTS_DATA
.with_label_values(&[
Expand Down Expand Up @@ -1303,7 +1316,7 @@ impl PeerActor {
return;
}

self.network_state.tier2_add_route_back(&self.clock, &conn, msg.as_ref());
self.network_state.add_route_back(&self.clock, &conn, msg.as_ref());
saketh-are marked this conversation as resolved.
Show resolved Hide resolved
if for_me {
// Handle Ping and Pong message if they are for us without sending to client.
// i.e. Return false in case of Ping and Pong
Expand Down Expand Up @@ -1378,6 +1391,27 @@ impl PeerActor {
Ok(accounts) => network_state.add_accounts(accounts).await,
}
}

async fn handle_distance_vector(
clock: &time::Clock,
network_state: &Arc<NetworkState>,
conn: Arc<connection::Connection>,
distance_vector: DistanceVector,
) {
let _span = tracing::trace_span!(target: "network", "handle_distance_vector").entered();

if conn.peer_info.id != distance_vector.root {
conn.stop(Some(ReasonForBan::InvalidDistanceVector));
return;
}

if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::PeerAdvertisedDistances(distance_vector))
.await
{
conn.stop(Some(ban_reason));
}
}
}

impl actix::Actor for PeerActor {
Expand Down
Loading