Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dp/chore/test_muxer
Browse files Browse the repository at this point in the history
* upstream/master:
  Rename all the network behaviours to more basic names (libp2p#726)
  Avoid some warnings. (libp2p#733)
  Add error messages for interrupt (libp2p#704)
  Remove relay, peerstore and datastore (libp2p#723)
  Don't add an address to the topology if it is already in (libp2p#724)
  Add a few more methods to Swarm and PollParameters (libp2p#721)
  Some changes to the main libp2p doc (libp2p#710)
  Don't wrap yamux::Connection in a mutex (libp2p#719)
  relay: Use `SliceRandom::shuffle`. (libp2p#722)
  Remove some boxed futures. (libp2p#718)
  Fix several errors reported by clippy. (libp2p#715)
  • Loading branch information
dvdplm committed Dec 6, 2018
2 parents 50808ec + 9102266 commit a6a76ba
Show file tree
Hide file tree
Showing 55 changed files with 659 additions and 3,974 deletions.
5 changes: 0 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ libp2p-mplex = { path = "./muxers/mplex" }
libp2p-identify = { path = "./protocols/identify" }
libp2p-kad = { path = "./protocols/kad" }
libp2p-floodsub = { path = "./protocols/floodsub" }
libp2p-peerstore = { path = "./stores/peerstore" }
libp2p-ping = { path = "./protocols/ping" }
libp2p-plaintext = { path = "./protocols/plaintext" }
libp2p-ratelimit = { path = "./transports/ratelimit" }
libp2p-relay = { path = "./transports/relay" }
libp2p-core = { path = "./core" }
libp2p-core-derive = { path = "./misc/core-derive" }
libp2p-secio = { path = "./protocols/secio", default-features = false }
Expand Down Expand Up @@ -68,11 +66,8 @@ members = [
"protocols/ping",
"protocols/plaintext",
"protocols/secio",
"stores/datastore",
"stores/peerstore",
"transports/dns",
"transports/ratelimit",
"transports/relay",
"transports/tcp",
"transports/timeout",
"transports/uds",
Expand Down
40 changes: 33 additions & 7 deletions core/src/nodes/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
};
use fnv::FnvHashMap;
use futures::prelude::*;
use std::{collections::hash_map::Entry, fmt, io, mem};
use std::{collections::hash_map::Entry, error, fmt, io, mem};

// TODO: make generic over PeerId

Expand Down Expand Up @@ -297,12 +297,12 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
/// Interrupts a reach attempt.
///
/// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid.
pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<(), ()> {
pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<(), InterruptError> {
match self.tasks.entry(id.0) {
Entry::Vacant(_) => Err(()),
Entry::Vacant(_) => Err(InterruptError::ReachAttemptNotFound),
Entry::Occupied(entry) => {
match entry.get() {
TaskState::Connected(_) => return Err(()),
TaskState::Connected(_) => return Err(InterruptError::AlreadyReached),
TaskState::Pending => (),
};

Expand Down Expand Up @@ -439,6 +439,32 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
}
}

/// Reach attempt interrupt errors.
#[derive(Debug)]
pub enum InterruptError {
/// An invalid reach attempt has been used to try to interrupt. The task
/// entry is vacant; it needs to be added first via add_reach_attempt
/// (with the TaskState set to Pending) before we try to connect.
ReachAttemptNotFound,
/// The task has already connected to the node; interrupting a reach attempt
/// is thus redundant as it has already completed. Thus, the reach attempt
/// that has tried to be used is no longer valid, since already reached.
AlreadyReached,
}

impl fmt::Display for InterruptError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
InterruptError::ReachAttemptNotFound =>
write!(f, "The reach attempt could not be found."),
InterruptError::AlreadyReached =>
write!(f, "The reach attempt has already completed or reached the node."),
}
}
}

impl error::Error for InterruptError {}

/// Access to a peer in the collection.
pub struct PeerMut<'a, TInEvent: 'a> {
inner: HandledNodesTask<'a, TInEvent>,
Expand Down Expand Up @@ -788,9 +814,9 @@ mod tests {
let fut = future::empty::<_, Void>();
let reach_id = cs.add_reach_attempt(fut, Handler::default());
assert!(cs.interrupt(reach_id).is_ok());
assert!(cs.interrupt(reach_id).is_err());
assert_matches!(cs.interrupt(reach_id), Err(InterruptError::ReachAttemptNotFound))
}

#[test]
fn interrupting_an_established_connection_is_err() {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
Expand Down Expand Up @@ -824,6 +850,6 @@ mod tests {

assert!(cs.lock().has_connection(&peer_id), "Connection was not established");

assert!(cs.lock().interrupt(reach_id).is_err(), "Could interrupt a reach attempt that already completed");
assert_matches!(cs.lock().interrupt(reach_id), Err(InterruptError::AlreadyReached));
}
}
120 changes: 58 additions & 62 deletions core/src/nodes/raw_swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,75 +634,71 @@ where
}

// Poll the existing nodes.
loop {
let (action, out_event);
match self.active_nodes.poll() {
Async::NotReady => break,
Async::Ready(CollectionEvent::NodeReached(reach_event)) => {
let (a, e) = handle_node_reached(&mut self.reach_attempts, reach_event);
action = a;
out_event = e;
}
Async::Ready(CollectionEvent::ReachError { id, error, handler }) => {
let (a, e) = handle_reach_error(&mut self.reach_attempts, id, error, handler);
action = a;
out_event = e;
}
Async::Ready(CollectionEvent::NodeError {
let (action, out_event);
match self.active_nodes.poll() {
Async::NotReady => return Async::NotReady,
Async::Ready(CollectionEvent::NodeReached(reach_event)) => {
let (a, e) = handle_node_reached(&mut self.reach_attempts, reach_event);
action = a;
out_event = e;
}
Async::Ready(CollectionEvent::ReachError { id, error, handler }) => {
let (a, e) = handle_reach_error(&mut self.reach_attempts, id, error, handler);
action = a;
out_event = e;
}
Async::Ready(CollectionEvent::NodeError {
peer_id,
error,
}) => {
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
.expect("We insert into connected_points whenever a connection is \
opened and remove only when a connection is closed; the \
underlying API is guaranteed to always deliver a connection \
closed message after it has been opened, and no two closed \
messages; QED");
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = RawSwarmEvent::NodeError {
peer_id,
endpoint,
error,
}) => {
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
.expect("We insert into connected_points whenever a connection is \
opened and remove only when a connection is closed; the \
underlying API is guaranteed to always deliver a connection \
closed message after it has been opened, and no two closed \
messages; QED");
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = RawSwarmEvent::NodeError {
peer_id,
endpoint,
error,
};
}
Async::Ready(CollectionEvent::NodeClosed { peer_id }) => {
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
.expect("We insert into connected_points whenever a connection is \
opened and remove only when a connection is closed; the \
underlying API is guaranteed to always deliver a connection \
closed message after it has been opened, and no two closed \
messages; QED");
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
}
Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => {
action = Default::default();
out_event = RawSwarmEvent::NodeEvent { peer_id, event };
}
};

if let Some((peer_id, handler, first, rest)) = action.start_dial_out {
self.start_dial_out(peer_id, handler, first, rest);
};
}

if let Some(interrupt) = action.interrupt {
// TODO: improve proof or remove; this is too complicated right now
self.active_nodes
.interrupt(interrupt)
.expect("interrupt is guaranteed to be gathered from `out_reach_attempts`;
we insert in out_reach_attempts only when we call \
active_nodes.add_reach_attempt, and we remove only when we call \
interrupt or when a reach attempt succeeds or errors; therefore the \
out_reach_attempts should always be in sync with the actual \
attempts; QED");
Async::Ready(CollectionEvent::NodeClosed { peer_id }) => {
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
.expect("We insert into connected_points whenever a connection is \
opened and remove only when a connection is closed; the \
underlying API is guaranteed to always deliver a connection \
closed message after it has been opened, and no two closed \
messages; QED");
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
}
Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => {
action = Default::default();
out_event = RawSwarmEvent::NodeEvent { peer_id, event };
}
}

if let Some((peer_id, handler, first, rest)) = action.start_dial_out {
self.start_dial_out(peer_id, handler, first, rest);
}

return Async::Ready(out_event);
if let Some(interrupt) = action.interrupt {
// TODO: improve proof or remove; this is too complicated right now
self.active_nodes
.interrupt(interrupt)
.expect("interrupt is guaranteed to be gathered from `out_reach_attempts`;
we insert in out_reach_attempts only when we call \
active_nodes.add_reach_attempt, and we remove only when we call \
interrupt or when a reach attempt succeeds or errors; therefore the \
out_reach_attempts should always be in sync with the actual \
attempts; QED");
}

Async::NotReady
Async::Ready(out_event)
}
}

Expand Down
55 changes: 52 additions & 3 deletions core/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
Transport, Multiaddr, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo,
Transport, Multiaddr, PublicKey, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo,
muxing::StreamMuxer,
nodes::{
handled_node::NodeHandler,
Expand Down Expand Up @@ -55,9 +55,18 @@ where TTransport: Transport,
/// if we're not connected to them.
topology: TTopology,

/// Public key of the local node.
local_public_key: PublicKey,

/// Peer ID of the local node.
local_peer_id: PeerId,

/// List of protocols that the behaviour says it supports.
supported_protocols: SmallVec<[Vec<u8>; 16]>,

/// List of multiaddresses we're listening on.
listened_addrs: SmallVec<[Multiaddr; 8]>,

/// List of multiaddresses we're listening on after NAT traversal.
external_addresses: SmallVec<[Multiaddr; 8]>,
}
Expand Down Expand Up @@ -112,7 +121,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
{
/// Builds a new `Swarm`.
#[inline]
pub fn new(transport: TTransport, mut behaviour: TBehaviour, topology: TTopology) -> Self {
pub fn new(transport: TTransport, mut behaviour: TBehaviour, topology: TTopology, local_public_key: PublicKey) -> Self {
let supported_protocols = behaviour
.new_handler()
.listen_protocol()
Expand All @@ -121,11 +130,17 @@ where TBehaviour: NetworkBehaviour<TTopology>,
.collect();

let raw_swarm = RawSwarm::new(transport);

let local_peer_id = local_public_key.clone().into_peer_id();

Swarm {
raw_swarm,
behaviour,
topology,
local_public_key,
local_peer_id,
supported_protocols,
listened_addrs: SmallVec::new(),
external_addresses: SmallVec::new(),
}
}
Expand All @@ -142,7 +157,11 @@ where TBehaviour: NetworkBehaviour<TTopology>,
/// On success, returns an alternative version of the address.
#[inline]
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
me.raw_swarm.listen_on(addr)
let result = me.raw_swarm.listen_on(addr);
if let Ok(ref addr) = result {
me.listened_addrs.push(addr.clone());
}
result
}

/// Tries to dial the given address.
Expand Down Expand Up @@ -173,6 +192,12 @@ where TBehaviour: NetworkBehaviour<TTopology>,
RawSwarm::listeners(&me.raw_swarm)
}

/// Returns the peer ID of the swarm passed as parameter.
#[inline]
pub fn local_peer_id(me: &Self) -> &PeerId {
&me.local_peer_id
}

/// Returns the topology of the swarm.
#[inline]
pub fn topology(me: &Self) -> &TTopology {
Expand Down Expand Up @@ -250,7 +275,10 @@ where TBehaviour: NetworkBehaviour<TTopology>,
let mut parameters = PollParameters {
topology: &mut self.topology,
supported_protocols: &self.supported_protocols,
listened_addrs: &self.listened_addrs,
external_addresses: &self.external_addresses,
local_public_key: &self.local_public_key,
local_peer_id: &self.local_peer_id,
};
self.behaviour.poll(&mut parameters)
};
Expand Down Expand Up @@ -326,7 +354,10 @@ pub trait NetworkBehaviour<TTopology> {
pub struct PollParameters<'a, TTopology: 'a> {
topology: &'a mut TTopology,
supported_protocols: &'a [Vec<u8>],
listened_addrs: &'a [Multiaddr],
external_addresses: &'a [Multiaddr],
local_public_key: &'a PublicKey,
local_peer_id: &'a PeerId,
}

impl<'a, TTopology> PollParameters<'a, TTopology> {
Expand All @@ -347,13 +378,31 @@ impl<'a, TTopology> PollParameters<'a, TTopology> {
self.supported_protocols.iter().map(AsRef::as_ref)
}

/// Returns the list of the addresses we're listening on
#[inline]
pub fn listened_addresses(&self) -> impl ExactSizeIterator<Item = &Multiaddr> {
self.listened_addrs.iter()
}

/// Returns the list of the addresses we're listening on, after accounting for NAT traversal.
///
/// This corresponds to the elements produced with `ReportObservedAddr`.
#[inline]
pub fn external_addresses(&self) -> impl ExactSizeIterator<Item = &Multiaddr> {
self.external_addresses.iter()
}

/// Returns the public key of the local node.
#[inline]
pub fn local_public_key(&self) -> &PublicKey {
self.local_public_key
}

/// Returns the peer id of the local node.
#[inline]
pub fn local_peer_id(&self) -> &PeerId {
self.local_peer_id
}
}

/// Action to perform.
Expand Down
5 changes: 4 additions & 1 deletion core/src/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ impl MemoryTopology {
/// Adds an address to the topology.
#[inline]
pub fn add_address(&mut self, peer: PeerId, addr: Multiaddr) {
self.list.entry(peer).or_insert_with(|| Vec::new()).push(addr);
let addrs = self.list.entry(peer).or_insert_with(|| Vec::new());
if addrs.iter().all(|a| a != &addr) {
addrs.push(addr);
}
}

/// Returns a list of all the known peers in the topology.
Expand Down
Loading

0 comments on commit a6a76ba

Please sign in to comment.