Skip to content

Commit

Permalink
swarm/: Drive ExpandedSwarm via Stream trait only (#2100)
Browse files Browse the repository at this point in the history
Change `Stream` implementation of `ExpandedSwarm` to return all
`SwarmEvents` instead of only the `NetworkBehaviour`'s events.

Remove `ExpandedSwarm::next_event`. Users can use `<ExpandedSwarm as
StreamExt>::next` instead.

Remove `ExpandedSwarm::next`. Users can use `<ExpandedSwarm as
StreamExt>::filter_map` instead.
  • Loading branch information
elenaf9 authored Jun 14, 2021
1 parent d45606a commit e8fed53
Show file tree
Hide file tree
Showing 20 changed files with 341 additions and 332 deletions.
33 changes: 10 additions & 23 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@
//! --features="floodsub mplex noise tcp-tokio mdns"
//! ```

use futures::StreamExt;
use libp2p::{
Multiaddr,
NetworkBehaviour,
PeerId,
Swarm,
Transport,
core::upgrade,
identity,
floodsub::{self, Floodsub, FloodsubEvent},
mdns::{Mdns, MdnsEvent},
mplex,
noise,
swarm::{NetworkBehaviourEventProcess, SwarmBuilder},
swarm::{NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent},
// `TokioTcpConfig` is available through the `tcp-tokio` feature.
tcp::TokioTcpConfig,
};
Expand Down Expand Up @@ -149,29 +149,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off
let mut listening = false;
loop {
let to_publish = {
tokio::select! {
line = stdin.next_line() => {
let line = line?.expect("stdin closed");
Some((floodsub_topic.clone(), line))
}
event = swarm.next() => {
// All events are handled by the `NetworkBehaviourEventProcess`es.
// I.e. the `swarm.next()` future drives the `Swarm` without ever
// terminating.
panic!("Unexpected event: {:?}", event);
}
tokio::select! {
line = stdin.next_line() => {
let line = line?.expect("stdin closed");
swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes());
}
};
if let Some((topic, line)) = to_publish {
swarm.behaviour_mut().floodsub.publish(topic, line.as_bytes());
}
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Listening on {:?}", addr);
listening = true;
event = swarm.select_next_some() => {
if let SwarmEvent::NewListenAddr(addr) = event {
println!("Listening on {:?}", addr);
}
}
}
}
Expand Down
19 changes: 7 additions & 12 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use libp2p::{
identity,
floodsub::{self, Floodsub, FloodsubEvent},
mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::NetworkBehaviourEventProcess
swarm::{NetworkBehaviourEventProcess, SwarmEvent}
};
use std::{error::Error, task::{Context, Poll}};

Expand Down Expand Up @@ -124,7 +124,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut swarm = {
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(local_peer_id.clone()),
floodsub: Floodsub::new(local_peer_id),
mdns,
ignored_member: false,
};
Expand All @@ -147,7 +147,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Expand All @@ -160,17 +159,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event),
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => {
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Listening on {:?}", addr);
listening = true;
}
Poll::Ready(Some(event)) => {
if let SwarmEvent::NewListenAddr(addr) = event {
println!("Listening on {:?}", addr);
}
break
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
}
}
Poll::Pending
Expand Down
23 changes: 9 additions & 14 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use libp2p::{
development_transport,
identity,
mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::NetworkBehaviourEventProcess
swarm::{NetworkBehaviourEventProcess, SwarmEvent}
};
use std::{error::Error, task::{Context, Poll}};

Expand Down Expand Up @@ -150,8 +150,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a swarm to manage peers and events.
let mut swarm = {
// Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id.clone());
let kademlia = Kademlia::new(local_peer_id.clone(), store);
let store = MemoryStore::new(local_peer_id);
let kademlia = Kademlia::new(local_peer_id, store);
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
Expand All @@ -164,7 +164,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off.
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Expand All @@ -175,25 +174,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event),
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => {
if !listening {
if let Some(a) = Swarm::listeners(&swarm).next() {
println!("Listening on {:?}", a);
listening = true;
}
Poll::Ready(Some(event)) => {
if let SwarmEvent::NewListenAddr(addr) = event {
println!("Listening on {:?}", addr);
}
break
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
}
}
Poll::Pending
}))
}

fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
let mut args = line.split(" ");
let mut args = line.split(' ');

match args.next() {
Some("GET") => {
Expand Down
19 changes: 7 additions & 12 deletions examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use libp2p::gossipsub::MessageId;
use libp2p::gossipsub::{
GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode,
};
use libp2p::{gossipsub, identity, PeerId};
use libp2p::{gossipsub, identity, swarm::SwarmEvent, PeerId};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;
Expand Down Expand Up @@ -136,7 +136,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut stdin = io::BufReader::new(io::stdin()).lines();

// Kick it off
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Expand All @@ -152,30 +151,26 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(gossip_event)) => match gossip_event {
GossipsubEvent::Message {
Poll::Ready(Some(event)) => match event {
SwarmEvent::Behaviour(GossipsubEvent::Message {
propagation_source: peer_id,
message_id: id,
message,
} => println!(
}) => println!(
"Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(&message.data),
id,
peer_id
),
SwarmEvent::NewListenAddr(addr) => {
println!("Listening on {:?}", addr);
}
_ => {}
},
Poll::Ready(None) | Poll::Pending => break,
}
}

if !listening {
for addr in libp2p::Swarm::listeners(&swarm) {
println!("Listening on {:?}", addr);
listening = true;
}
}

Poll::Pending
}))
}
13 changes: 7 additions & 6 deletions examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
//! peer ID will be generated randomly.

use async_std::task;
use futures::StreamExt;
use libp2p::{
Multiaddr,
Swarm,
swarm::{Swarm, SwarmEvent},
PeerId,
identity,
development_transport
Expand Down Expand Up @@ -64,8 +65,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Kademlia behaviour.
let mut cfg = KademliaConfig::default();
cfg.set_query_timeout(Duration::from_secs(5 * 60));
let store = MemoryStore::new(local_peer_id.clone());
let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg);
let store = MemoryStore::new(local_peer_id);
let mut behaviour = Kademlia::with_config(local_peer_id, store, cfg);

// Add the bootnodes to the local routing table. `libp2p-dns` built
// into the `transport` resolves the `dnsaddr` when Kademlia tries
Expand All @@ -91,11 +92,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Kick it off!
task::block_on(async move {
loop {
let event = swarm.next().await;
if let KademliaEvent::QueryResult {
let event = swarm.select_next_some().await;
if let SwarmEvent::Behaviour(KademliaEvent::QueryResult {
result: QueryResult::GetClosestPeers(result),
..
} = event {
}) = event {
match result {
Ok(ok) =>
if !ok.peers.is_empty() {
Expand Down
19 changes: 7 additions & 12 deletions examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use libp2p::{
noise,
ping::{self, Ping, PingConfig, PingEvent},
pnet::{PnetConfig, PreSharedKey},
swarm::NetworkBehaviourEventProcess,
swarm::{NetworkBehaviourEventProcess, SwarmEvent},
tcp::TcpConfig,
yamux::YamuxConfig,
Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport,
Expand Down Expand Up @@ -254,7 +254,7 @@ fn main() -> Result<(), Box<dyn Error>> {

println!("Subscribing to {:?}", gossipsub_topic);
behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap();
Swarm::new(transport, behaviour, local_peer_id.clone())
Swarm::new(transport, behaviour, local_peer_id)
};

// Reach out to other nodes if specified
Expand All @@ -271,7 +271,6 @@ fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Expand All @@ -287,17 +286,13 @@ fn main() -> Result<(), Box<dyn Error>> {
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event),
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => {
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Address {}/ipfs/{}", addr, local_peer_id);
listening = true;
}
Poll::Ready(Some(event)) => {
if let SwarmEvent::NewListenAddr(addr) = event {
println!("Listening on {:?}", addr);
}
break;
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
}
}
Poll::Pending
Expand Down
15 changes: 11 additions & 4 deletions examples/mdns-passive-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p::{identity, mdns::{Mdns, MdnsConfig, MdnsEvent}, PeerId, Swarm};
use futures::StreamExt;
use libp2p::{
identity,
mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::{Swarm, SwarmEvent},
PeerId
};
use std::error::Error;

#[async_std::main]
Expand All @@ -43,17 +49,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

loop {
match swarm.next().await {
MdnsEvent::Discovered(peers) => {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(MdnsEvent::Discovered(peers)) => {
for (peer, addr) in peers {
println!("discovered {} {}", peer, addr);
}
}
MdnsEvent::Expired(expired) => {
SwarmEvent::Behaviour(MdnsEvent::Expired(expired)) => {
for (peer, addr) in expired {
println!("expired {} {}", peer, addr);
}
}
_ => {}
}
}
}
19 changes: 7 additions & 12 deletions examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
use futures::executor::block_on;
use futures::prelude::*;
use libp2p::ping::{Ping, PingConfig};
use libp2p::swarm::Swarm;
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::{identity, PeerId};
use std::error::Error;
use std::task::Poll;
Expand Down Expand Up @@ -76,20 +76,15 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Dialed {}", addr)
}

let mut listening = false;
block_on(future::poll_fn(move |cx| loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event),
Poll::Ready(Some(event)) => match event {
SwarmEvent::NewListenAddr(addr) => println!("Listening on {:?}", addr),
SwarmEvent::Behaviour(event) => println!("{:?}", event),
_ => {}
},
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => {
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Listening on {}", addr);
listening = true;
}
}
return Poll::Pending;
}
Poll::Pending => return Poll::Pending,
}
}));

Expand Down
Loading

0 comments on commit e8fed53

Please sign in to comment.