diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b7b26694ac5..ae2e4c79564 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,6 +41,9 @@ jobs: wasm32-unknown-emscripten, wasm32-wasi ] + include: + - toolchain: wasm32-unknown-unknown + args: "--features wasm-bindgen" container: image: rust env: @@ -76,7 +79,7 @@ jobs: - name: Build on ${{ matrix.toolchain }} # TODO: also run `cargo test` # TODO: ideally we would build `--workspace`, but not all crates compile for WASM - run: cargo build --target=${{ matrix.toolchain }} + run: cargo build --target=${{ matrix.toolchain }} ${{ matrix.args }} check-rustdoc-links: name: Check rustdoc intra-doc links diff --git a/Cargo.toml b/Cargo.toml index 4ab8a15dc8f..37ffa96d6a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,7 @@ rendezvous = ["libp2p-rendezvous"] tcp-async-io = ["libp2p-tcp", "libp2p-tcp/async-io"] tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] uds = ["libp2p-uds"] -wasm-bindgen = ["parking_lot/wasm-bindgen"] +wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "parking_lot/wasm-bindgen", "getrandom/js", "rand/wasm-bindgen"] wasm-ext = ["libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext/websocket"] websocket = ["libp2p-websocket"] @@ -67,6 +67,9 @@ all-features = true atomic = "0.5.0" bytes = "1" futures = "0.3.1" +futures-timer = "3.0.2" # Explicit dependency to be used in `wasm-bindgen` feature +getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature +instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature lazy_static = "1.2" libp2p-core = { version = "0.30.0-rc.1", path = "core", default-features = false } libp2p-floodsub = { version = "0.31.0-rc.1", path = "protocols/floodsub", optional = true } @@ -88,10 +91,10 @@ libp2p-uds = { version = "0.30.0-rc.1", path = "transports/uds", optional = true libp2p-wasm-ext = { version = "0.30.0-rc.1", path = "transports/wasm-ext", default-features = false, optional = true } libp2p-yamux = { version = "0.34.0-rc.1", path = "muxers/yamux", optional = true } multiaddr = { version = "0.13.0-rc.1" } -parking_lot = "0.11.0" +parking_lot = "0.11.2" # Explicit dependency to be used in `wasm-bindgen` feature pin-project = "1.0.0" +rand = "0.7.3" # Explicit dependency to be used in `wasm-bindgen` feature smallvec = "1.6.1" -wasm-timer = "0.2.4" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] libp2p-deflate = { version = "0.30.0-rc.1", path = "transports/deflate", optional = true } diff --git a/core/Cargo.toml b/core/Cargo.toml index 18a0a90b17e..e5579f5e35f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -17,6 +17,7 @@ either = "1.5" fnv = "1.0" futures = { version = "0.3.1", features = ["executor", "thread-pool"] } futures-timer = "3" +instant = "0.1.11" lazy_static = "1.2" libsecp256k1 = { version = "0.7.0", optional = true } log = "0.4" @@ -48,7 +49,6 @@ libp2p-tcp = { path = "../transports/tcp" } multihash = { version = "0.14", default-features = false, features = ["arb"] } quickcheck = "0.9.0" rand07 = { package = "rand", version = "0.7" } -wasm-timer = "0.2" [build-dependencies] prost-build = "0.9" diff --git a/core/src/peer_record.rs b/core/src/peer_record.rs index 6b7759213c9..18b62d2335d 100644 --- a/core/src/peer_record.rs +++ b/core/src/peer_record.rs @@ -2,9 +2,9 @@ use crate::identity::error::SigningError; use crate::identity::Keypair; use crate::signed_envelope::SignedEnvelope; use crate::{peer_record_proto, signed_envelope, Multiaddr, PeerId}; +use instant::SystemTime; use std::convert::TryInto; use std::fmt; -use std::time::SystemTime; const PAYLOAD_TYPE: &str = "/libp2p/routing-state-record"; const DOMAIN_SEP: &str = "libp2p-routing-state"; diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 825c174b9f3..7d5ee7b648a 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -18,7 +18,6 @@ fnv = "1.0.7" futures = "0.3.5" rand = "0.7.3" asynchronous-codec = "0.6" -wasm-timer = "0.2.4" unsigned-varint = { version = "0.7.0", features = ["asynchronous_codec"] } log = "0.4.11" sha2 = "0.9.1" @@ -27,6 +26,9 @@ smallvec = "1.6.1" prost = "0.9" hex_fmt = "0.3.0" regex = "1.4.0" +futures-timer = "3.0.2" +pin-project = "1.0.8" +instant = "0.1.11" [dev-dependencies] async-std = "1.6.3" diff --git a/protocols/gossipsub/src/backoff.rs b/protocols/gossipsub/src/backoff.rs index c10814d289e..2fe0eb7fd56 100644 --- a/protocols/gossipsub/src/backoff.rs +++ b/protocols/gossipsub/src/backoff.rs @@ -20,13 +20,13 @@ //! Data structure for efficiently storing known back-off's when pruning peers. use crate::topic::TopicHash; +use instant::Instant; use libp2p_core::PeerId; use std::collections::{ hash_map::{Entry, HashMap}, HashSet, }; use std::time::Duration; -use wasm_timer::Instant; #[derive(Copy, Clone)] struct HeartbeatIndex(usize); diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 9c2feb77765..d9682791f20 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -34,8 +34,8 @@ use futures::StreamExt; use log::{debug, error, trace, warn}; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; -use wasm_timer::{Instant, Interval}; +use instant::Instant; use libp2p_core::{ connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId, @@ -45,7 +45,6 @@ use libp2p_swarm::{ NotifyHandler, PollParameters, }; -use crate::backoff::BackoffStorage; use crate::config::{GossipsubConfig, ValidationMode}; use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; @@ -62,6 +61,7 @@ use crate::types::{ GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage, }; use crate::types::{GossipsubRpc, PeerConnections, PeerKind}; +use crate::{backoff::BackoffStorage, interval::Interval}; use crate::{rpc_proto, TopicScoreParams}; use std::{cmp::Ordering::Equal, fmt::Debug}; @@ -406,8 +406,8 @@ where config.backoff_slack(), ), mcache: MessageCache::new(config.history_gossip(), config.history_length()), - heartbeat: Interval::new_at( - Instant::now() + config.heartbeat_initial_delay(), + heartbeat: Interval::new_initial( + config.heartbeat_initial_delay(), config.heartbeat_interval(), ), heartbeat_ticks: 0, diff --git a/protocols/gossipsub/src/gossip_promises.rs b/protocols/gossipsub/src/gossip_promises.rs index 49b1dde6d66..2904b152ee7 100644 --- a/protocols/gossipsub/src/gossip_promises.rs +++ b/protocols/gossipsub/src/gossip_promises.rs @@ -21,12 +21,12 @@ use crate::error::ValidationError; use crate::peer_score::RejectReason; use crate::MessageId; +use instant::Instant; use libp2p_core::PeerId; use log::debug; use rand::seq::SliceRandom; use rand::thread_rng; use std::collections::HashMap; -use wasm_timer::Instant; /// Tracks recently sent `IWANT` messages and checks if peers respond to them /// for each `IWANT` message we track one random requested message id. diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 4f0810d2934..592c5b409c4 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -25,6 +25,7 @@ use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage}; use asynchronous_codec::Framed; use futures::prelude::*; use futures::StreamExt; +use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, NegotiationError, OutboundUpgrade, UpgradeError}; use libp2p_swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, @@ -39,7 +40,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use wasm_timer::Instant; /// The initial time (in seconds) we set the keep alive for protocol negotiations to occur. const INITIAL_KEEP_ALIVE: u64 = 30; diff --git a/protocols/gossipsub/src/interval.rs b/protocols/gossipsub/src/interval.rs new file mode 100644 index 00000000000..3080f541a55 --- /dev/null +++ b/protocols/gossipsub/src/interval.rs @@ -0,0 +1,209 @@ +// Copyright 2021 Oliver Wangler +// Copyright 2019 Pierre Krieger +// Copyright (c) 2019 Tokio Contributors +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS +// OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// +// Initial version copied from +// https://github.com/tomaka/wasm-timer/blob/8964804eff980dd3eb115b711c57e481ba541708/src/timer/interval.rs +// and adapted. +use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use futures::prelude::*; +use futures_timer::Delay; +use instant::Instant; +use pin_project::pin_project; + +/// A stream representing notifications at fixed interval +/// +/// Intervals are created through the `Interval::new` or +/// `Interval::new_intial` methods indicating when a first notification +/// should be triggered and when it will be repeated. +/// +/// Note that intervals are not intended for high resolution timers, but rather +/// they will likely fire some granularity after the exact instant that they're +/// otherwise indicated to fire at. +#[pin_project] +#[derive(Debug)] +pub struct Interval { + #[pin] + delay: Delay, + interval: Duration, + fires_at: Instant, +} + +impl Interval { + /// Creates a new interval which will fire at `dur` time into the future, + /// and will repeat every `dur` interval after + /// + /// The returned object will be bound to the default timer for this thread. + /// The default timer will be spun up in a helper thread on first use. + pub fn new(dur: Duration) -> Interval { + Interval::new_initial(dur, dur) + } + + /// Creates a new interval which will fire the first time after the specified `initial_delay`, + /// and then will repeat every `dur` interval after. + /// + /// The returned object will be bound to the default timer for this thread. + /// The default timer will be spun up in a helper thread on first use. + pub fn new_initial(initial_delay: Duration, dur: Duration) -> Interval { + let fires_at = Instant::now() + initial_delay; + Interval { + delay: Delay::new(initial_delay), + interval: dur, + fires_at, + } + } +} + +impl Stream for Interval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.as_mut().project().delay.poll(cx).is_pending() { + return Poll::Pending; + } + let next = next_interval(self.fires_at, Instant::now(), self.interval); + self.delay.reset(next); + self.fires_at += next; + Poll::Ready(Some(())) + } +} + +/// Converts Duration object to raw nanoseconds if possible +/// +/// This is useful to divide intervals. +/// +/// While technically for large duration it's impossible to represent any +/// duration as nanoseconds, the largest duration we can represent is about +/// 427_000 years. Large enough for any interval we would use or calculate in +/// tokio. +fn duration_to_nanos(dur: Duration) -> Option { + let v = dur.as_secs().checked_mul(1_000_000_000)?; + v.checked_add(dur.subsec_nanos() as u64) +} + +fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Duration { + let new = prev + interval; + if new > now { + interval + } else { + let spent_ns = + duration_to_nanos(now.duration_since(prev)).expect("interval should be expired"); + let interval_ns = + duration_to_nanos(interval).expect("interval is less that 427 thousand years"); + let mult = spent_ns / interval_ns + 1; + assert!( + mult < (1 << 32), + "can't skip more than 4 billion intervals of {:?} \ + (trying to skip {})", + interval, + mult + ); + interval * mult as u32 + } +} + +#[cfg(test)] +mod test { + use super::next_interval; + use std::time::{Duration, Instant}; + + struct Timeline(Instant); + + impl Timeline { + fn new() -> Timeline { + Timeline(Instant::now()) + } + fn at(&self, millis: u64) -> Instant { + self.0 + Duration::from_millis(millis) + } + fn at_ns(&self, sec: u64, nanos: u32) -> Instant { + self.0 + Duration::new(sec, nanos) + } + } + + fn dur(millis: u64) -> Duration { + Duration::from_millis(millis) + } + + // The math around Instant/Duration isn't 100% precise due to rounding + // errors + fn almost_eq(a: Instant, b: Instant) -> bool { + let diff = match a.cmp(&b) { + std::cmp::Ordering::Less => b - a, + std::cmp::Ordering::Equal => return true, + std::cmp::Ordering::Greater => a - b, + }; + + diff < Duration::from_millis(1) + } + + #[test] + fn norm_next() { + let tm = Timeline::new(); + assert!(almost_eq( + tm.at(1) + next_interval(tm.at(1), tm.at(2), dur(10)), + tm.at(11) + )); + assert!(almost_eq( + tm.at(7777) + next_interval(tm.at(7777), tm.at(7788), dur(100)), + tm.at(7877) + )); + assert!(almost_eq( + tm.at(1) + next_interval(tm.at(1), tm.at(1000), dur(2100)), + tm.at(2101) + )); + } + + #[test] + fn fast_forward() { + let tm = Timeline::new(); + + assert!(almost_eq( + tm.at(1) + next_interval(tm.at(1), tm.at(1000), dur(10)), + tm.at(1001) + )); + assert!(almost_eq( + tm.at(7777) + next_interval(tm.at(7777), tm.at(8888), dur(100)), + tm.at(8977) + )); + assert!(almost_eq( + tm.at(1) + next_interval(tm.at(1), tm.at(10000), dur(2100)), + tm.at(10501) + )); + } + + /// TODO: this test actually should be successful, but since we can't + /// multiply Duration on anything larger than u32 easily we decided + /// to allow it to fail for now + #[test] + #[should_panic(expected = "can't skip more than 4 billion intervals")] + fn large_skip() { + let tm = Timeline::new(); + assert_eq!( + tm.0 + next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)), + tm.at_ns(25, 1) + ); + } +} diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index ddba0f69a1e..5a426d51df8 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -130,6 +130,7 @@ mod behaviour; mod config; mod gossip_promises; mod handler; +mod interval; mod mcache; mod peer_score; pub mod subscription_filter; diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 9e78ff69d4b..09ba48baf62 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -23,11 +23,12 @@ use crate::time_cache::TimeCache; use crate::{MessageId, TopicHash}; +use instant::Instant; use libp2p_core::PeerId; use log::{debug, trace, warn}; use std::collections::{hash_map, HashMap, HashSet}; use std::net::IpAddr; -use std::time::{Duration, Instant}; +use std::time::Duration; mod params; use crate::error::ValidationError; diff --git a/protocols/gossipsub/src/time_cache.rs b/protocols/gossipsub/src/time_cache.rs index 90300e77ea3..768f2fc7cc3 100644 --- a/protocols/gossipsub/src/time_cache.rs +++ b/protocols/gossipsub/src/time_cache.rs @@ -21,12 +21,13 @@ //! This implements a time-based LRU cache for checking gossipsub message duplicates. use fnv::FnvHashMap; +use instant::Instant; use std::collections::hash_map::{ self, Entry::{Occupied, Vacant}, }; use std::collections::VecDeque; -use std::time::{Duration, Instant}; +use std::time::Duration; struct ExpiringElement { /// The element that expires diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 7afcb590078..a64e0024bb6 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -11,13 +11,13 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" +futures-timer = "3.0.2" libp2p-core = { version = "0.30.0-rc.1", path = "../../core", default-features = false } libp2p-swarm = { version = "0.31.0-rc.1", path = "../../swarm" } log = "0.4.1" lru = "0.6" prost = "0.9" smallvec = "1.6.1" -wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index f0d05f79dc9..4ceb8d25e3d 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -22,6 +22,7 @@ use crate::protocol::{ IdentifyInfo, IdentifyProtocol, IdentifyPushProtocol, InboundPush, OutboundPush, ReplySubstream, }; use futures::prelude::*; +use futures_timer::Delay; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::upgrade::{ EitherUpgrade, InboundUpgrade, OutboundUpgrade, SelectUpgrade, UpgradeError, @@ -32,7 +33,6 @@ use libp2p_swarm::{ }; use smallvec::SmallVec; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; -use wasm_timer::Delay; /// Protocol handler for sending and receiving identification requests. /// @@ -189,14 +189,13 @@ impl ProtocolsHandler for IdentifyHandler { // Poll the future that fires when we need to identify the node again. match Future::poll(Pin::new(&mut self.next_id), cx) { Poll::Pending => Poll::Pending, - Poll::Ready(Ok(())) => { + Poll::Ready(()) => { self.next_id.reset(self.interval); let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ()), }; Poll::Ready(ev) } - Poll::Ready(Err(err)) => Poll::Ready(ProtocolsHandlerEvent::Close(err)), } } } diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 353943a2a24..41a21216511 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -23,10 +23,11 @@ prost = "0.9" rand = "0.7.2" sha2 = "0.9.1" smallvec = "1.6.1" -wasm-timer = "0.2" uint = "0.9" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } void = "1.0" +futures-timer = "3.0.2" +instant = "0.1.11" [dev-dependencies] env_logger = "0.9.0" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 65a60b12c76..480fdf3f10a 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -38,6 +38,7 @@ use crate::record::{ }; use crate::K_VALUE; use fnv::{FnvHashMap, FnvHashSet}; +use instant::Instant; use libp2p_core::{ connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, @@ -54,7 +55,6 @@ use std::num::NonZeroUsize; use std::task::{Context, Poll}; use std::vec; use std::{borrow::Cow, time::Duration}; -use wasm_timer::Instant; pub use crate::query::QueryStats; diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index b92c89bfee5..2b3e92ff0ee 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -24,6 +24,7 @@ use crate::protocol::{ }; use crate::record::{self, Record}; use futures::prelude::*; +use instant::Instant; use libp2p_core::{ either::EitherOutput, upgrade::{self, InboundUpgrade, OutboundUpgrade}, @@ -37,7 +38,6 @@ use log::trace; use std::{ error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration, }; -use wasm_timer::Instant; /// A prototype from which [`KademliaHandler`]s can be constructed. pub struct KademliaHandlerProto { diff --git a/protocols/kad/src/jobs.rs b/protocols/kad/src/jobs.rs index 402a797a52d..7ce201a4150 100644 --- a/protocols/kad/src/jobs.rs +++ b/protocols/kad/src/jobs.rs @@ -63,13 +63,14 @@ use crate::record::{self, store::RecordStore, ProviderRecord, Record}; use futures::prelude::*; +use futures_timer::Delay; +use instant::Instant; use libp2p_core::PeerId; use std::collections::HashSet; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use std::vec; -use wasm_timer::{Delay, Instant}; /// The maximum number of queries towards which background jobs /// are allowed to start new queries on an invocation of @@ -101,7 +102,7 @@ impl PeriodicJob { if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state { let new_deadline = Instant::now() - Duration::from_secs(1); *deadline = new_deadline; - delay.reset_at(new_deadline); + delay.reset(Duration::from_secs(1)); } } @@ -148,7 +149,7 @@ impl PutRecordJob { ) -> Self { let now = Instant::now(); let deadline = now + replicate_interval; - let delay = Delay::new_at(deadline); + let delay = Delay::new(replicate_interval); let next_publish = publish_interval.map(|i| now + i); Self { local_id, @@ -236,7 +237,7 @@ impl PutRecordJob { // Wait for the next run. let deadline = now + self.inner.interval; - let delay = Delay::new_at(deadline); + let delay = Delay::new(self.inner.interval); self.inner.state = PeriodicJobState::Waiting(delay, deadline); assert!(!self.inner.is_ready(cx, now)); } @@ -262,7 +263,7 @@ impl AddProviderJob { interval, state: { let deadline = now + interval; - PeriodicJobState::Waiting(Delay::new_at(deadline), deadline) + PeriodicJobState::Waiting(Delay::new(interval), deadline) }, }, } @@ -314,7 +315,7 @@ impl AddProviderJob { } let deadline = now + self.inner.interval; - let delay = Delay::new_at(deadline); + let delay = Delay::new(self.inner.interval); self.inner.state = PeriodicJobState::Waiting(delay, deadline); assert!(!self.inner.is_ready(cx, now)); } diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 0f883649b05..1058acb00e9 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -32,13 +32,13 @@ use asynchronous_codec::Framed; use bytes::BytesMut; use codec::UviBytes; use futures::prelude::*; +use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::{Multiaddr, PeerId}; use prost::Message; use std::{borrow::Cow, convert::TryFrom, time::Duration}; use std::{io, iter}; use unsigned_varint::codec; -use wasm_timer::Instant; /// The protocol name used for negotiating with multistream-select. pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0"; diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 6fcf90df79f..708c758464f 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -30,9 +30,9 @@ use crate::kbucket::{Key, KeyBytes}; use crate::{ALPHA_VALUE, K_VALUE}; use either::Either; use fnv::FnvHashMap; +use instant::Instant; use libp2p_core::PeerId; use std::{num::NonZeroUsize, time::Duration}; -use wasm_timer::Instant; /// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion. /// diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index 684c109b934..2b3cb124274 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -22,10 +22,10 @@ use super::*; use crate::kbucket::{Distance, Key, KeyBytes}; use crate::{ALPHA_VALUE, K_VALUE}; +use instant::Instant; use libp2p_core::PeerId; use std::collections::btree_map::{BTreeMap, Entry}; use std::{iter::FromIterator, num::NonZeroUsize, time::Duration}; -use wasm_timer::Instant; pub mod disjoint; diff --git a/protocols/kad/src/query/peers/closest/disjoint.rs b/protocols/kad/src/query/peers/closest/disjoint.rs index 01506ff6f7b..af91b8c1f0b 100644 --- a/protocols/kad/src/query/peers/closest/disjoint.rs +++ b/protocols/kad/src/query/peers/closest/disjoint.rs @@ -20,13 +20,13 @@ use super::*; use crate::kbucket::{Key, KeyBytes}; +use instant::Instant; use libp2p_core::PeerId; use std::{ collections::HashMap, iter::{Cycle, Map, Peekable}, ops::{Index, IndexMut, Range}, }; -use wasm_timer::Instant; /// Wraps around a set of [`ClosestPeersIter`], enforcing a disjoint discovery /// path per configured parallelism according to the S/Kademlia paper. diff --git a/protocols/kad/src/record.rs b/protocols/kad/src/record.rs index 8f1c585d1b8..e8e05670c79 100644 --- a/protocols/kad/src/record.rs +++ b/protocols/kad/src/record.rs @@ -23,10 +23,10 @@ pub mod store; use bytes::Bytes; +use instant::Instant; use libp2p_core::{multihash::Multihash, Multiaddr, PeerId}; use std::borrow::Borrow; use std::hash::{Hash, Hasher}; -use wasm_timer::Instant; /// The (opaque) key of a record. #[derive(Clone, Debug, PartialEq, Eq, Hash)] diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 93b0bdf9c39..ded0e12e0b0 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -11,12 +11,13 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" +futures-timer = "3.0.2" +instant = "0.1.11" libp2p-core = { version = "0.30.0-rc.1", path = "../../core", default-features = false } libp2p-swarm = { version = "0.31.0-rc.1", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" void = "1.0" -wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 435a5048485..d10538562de 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -21,6 +21,7 @@ use crate::protocol; use futures::future::BoxFuture; use futures::prelude::*; +use futures_timer::Delay; use libp2p_core::{upgrade::NegotiationError, UpgradeError}; use libp2p_swarm::{ KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, @@ -35,7 +36,6 @@ use std::{ time::Duration, }; use void::Void; -use wasm_timer::Delay; /// The configuration for outbound pings. #[derive(Clone, Debug)] @@ -349,15 +349,10 @@ impl ProtocolsHandler for Handler { self.outbound = Some(PingState::Idle(stream)); break; } - Poll::Ready(Ok(())) => { + Poll::Ready(()) => { self.timer.reset(self.config.timeout); self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed())); } - Poll::Ready(Err(e)) => { - return Poll::Ready(ProtocolsHandlerEvent::Close(Failure::Other { - error: Box::new(e), - })) - } }, Some(PingState::OpenStream) => { self.outbound = Some(PingState::OpenStream); diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index a3138568777..703a9275d87 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -19,12 +19,12 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; +use instant::Instant; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::NegotiatedSubstream; use rand::{distributions, prelude::*}; use std::{io, iter, time::Duration}; use void::Void; -use wasm_timer::Instant; /// The `Ping` protocol upgrade. /// diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index d46661d773a..95518a54045 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -14,6 +14,7 @@ asynchronous-codec = "0.6" bytes = "1" futures = "0.3.1" futures-timer = "3" +instant = "0.1.11" libp2p-core = { version = "0.30.0-rc.1", path = "../../core", default-features = false } libp2p-swarm = { version = "0.31.0-rc.1", path = "../../swarm" } log = "0.4" @@ -23,7 +24,6 @@ rand = "0.7" smallvec = "1.6.1" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } void = "1" -wasm-timer = "0.2" [build-dependencies] prost-build = "0.9" diff --git a/protocols/relay/src/handler.rs b/protocols/relay/src/handler.rs index 79aba915c2f..792ffd50534 100644 --- a/protocols/relay/src/handler.rs +++ b/protocols/relay/src/handler.rs @@ -25,6 +25,7 @@ use futures::channel::oneshot::{self, Canceled}; use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::FuturesUnordered; +use instant::Instant; use libp2p_core::connection::ConnectionId; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; @@ -36,7 +37,6 @@ use log::warn; use std::fmt; use std::task::{Context, Poll}; use std::time::Duration; -use wasm_timer::Instant; pub struct RelayHandlerConfig { pub connection_idle_timeout: Duration, diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index 8396b2f65f8..8804a4487af 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -22,7 +22,8 @@ unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } bimap = "0.6.1" sha2 = "0.9" rand = "0.8" -wasm-timer = "0.2" +futures-timer = "3.0.2" +instant = "0.1.11" [dev-dependencies] async-trait = "0.1" diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 73eb90edb38..05dba57b185 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -310,8 +310,8 @@ fn handle_outbound_event( expiring_registrations.extend(registrations.iter().cloned().map(|registration| { async move { // if the timer errors we consider it expired - let _ = - wasm_timer::Delay::new(Duration::from_secs(registration.ttl as u64)).await; + let _ = futures_timer::Delay::new(Duration::from_secs(registration.ttl as u64)) + .await; (registration.record.peer_id(), registration.namespace) } diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 93682dda422..ecfd8ce50c5 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -381,14 +381,8 @@ impl Registrations { self.registrations .insert(registration_id, registration.clone()); - let next_expiry = wasm_timer::Delay::new(Duration::from_secs(ttl as u64)) - .map(move |result| { - if result.is_err() { - log::warn!("Timer for registration {} has unexpectedly errored, treating it as expired", registration_id.0); - } - - registration_id - }) + let next_expiry = futures_timer::Delay::new(Duration::from_secs(ttl as u64)) + .map(move |_| registration_id) .boxed(); self.next_expiry.push(next_expiry); @@ -496,8 +490,8 @@ pub struct CookieNamespaceMismatch; #[cfg(test)] mod tests { + use instant::SystemTime; use std::option::Option::None; - use std::time::SystemTime; use libp2p_core::{identity, PeerRecord}; diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index efd7956b13c..55b2a3638c9 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -27,6 +27,7 @@ use futures::future::{self, BoxFuture, Fuse, FusedFuture}; use futures::FutureExt; +use instant::Instant; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend}; use libp2p_swarm::{ @@ -38,7 +39,7 @@ use std::fmt; use std::future::Future; use std::hash::Hash; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Duration; use void::Void; /// Handles a substream throughout its lifetime. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 87d581490f0..703ad01355e 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"] async-trait = "0.1" bytes = "1" futures = "0.3.1" +instant = "0.1.11" libp2p-core = { version = "0.30.0-rc.1", path = "../../core", default-features = false } libp2p-swarm = { version = "0.31.0-rc.1", path = "../../swarm" } log = "0.4.11" @@ -20,7 +21,6 @@ lru = "0.7" rand = "0.7" smallvec = "1.6.1" unsigned-varint = { version = "0.7", features = ["std", "futures"] } -wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index ee2550df183..e0b482ccd3e 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -26,6 +26,7 @@ use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; +use instant::Instant; use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::{ protocols_handler::{ @@ -44,7 +45,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use wasm_timer::Instant; /// A connection handler of a `RequestResponse` protocol. #[doc(hidden)] diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 1e4dc7f396f..94b91c090ee 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -16,8 +16,9 @@ libp2p-core = { version = "0.30.0-rc.1", path = "../core", default-features = fa log = "0.4" rand = "0.7" smallvec = "1.6.1" -wasm-timer = "0.2" void = "1" +futures-timer = "3.0.2" +instant = "0.1.11" [dev-dependencies] libp2p = { path = "../", default-features = false, features = ["yamux", "plaintext"] } diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 2000dbc3fb0..3cdb1bb0095 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -48,9 +48,9 @@ mod select; pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend}; +use instant::Instant; use libp2p_core::{upgrade::UpgradeError, ConnectedPoint, Multiaddr, PeerId}; use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration}; -use wasm_timer::Instant; pub use dummy::DummyProtocolsHandler; pub use map_in::MapInEvent; diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 8254968c6e8..fa339fd9416 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -26,6 +26,8 @@ use crate::upgrade::SendWrapper; use futures::prelude::*; use futures::stream::FuturesUnordered; +use futures_timer::Delay; +use instant::Instant; use libp2p_core::{ connection::{ ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, Substream, @@ -36,7 +38,6 @@ use libp2p_core::{ Connected, Multiaddr, }; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; -use wasm_timer::{Delay, Instant}; /// Prototype for a `NodeHandlerWrapper`. pub struct NodeHandlerWrapperBuilder { @@ -159,7 +160,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.timeout.poll_unpin(cx) { - Poll::Ready(Ok(_)) => { + Poll::Ready(()) => { return Poll::Ready(( self.user_data .take() @@ -167,14 +168,7 @@ where Err(ProtocolsHandlerUpgrErr::Timeout), )) } - Poll::Ready(Err(_)) => { - return Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Err(ProtocolsHandlerUpgrErr::Timer), - )) - } + Poll::Pending => {} } @@ -362,10 +356,16 @@ where (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { if *deadline != t { *deadline = t; - timer.reset_at(t) + if let Some(dur) = deadline.checked_duration_since(Instant::now()) { + timer.reset(dur) + } + } + } + (_, KeepAlive::Until(t)) => { + if let Some(dur) = t.checked_duration_since(Instant::now()) { + self.shutdown = Shutdown::Later(Delay::new(dur), t) } } - (_, KeepAlive::Until(t)) => self.shutdown = Shutdown::Later(Delay::new_at(t), t), (_, KeepAlive::No) => self.shutdown = Shutdown::Asap, (_, KeepAlive::Yes) => self.shutdown = Shutdown::None, }; diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 01a2951efc5..0851ff82e43 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -22,10 +22,9 @@ use crate::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; - +use instant::Instant; use smallvec::SmallVec; use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration}; -use wasm_timer::Instant; /// A `ProtocolsHandler` that opens a new substream for each request. // TODO: Debug diff --git a/transports/wasm-ext/src/websockets.js b/transports/wasm-ext/src/websockets.js index 290af968e70..1ef2faf6ded 100644 --- a/transports/wasm-ext/src/websockets.js +++ b/transports/wasm-ext/src/websockets.js @@ -80,7 +80,18 @@ const dial = (addr) => { read: (function*() { while(ws.readyState == 1) { yield reader.next(); } })(), write: (data) => { if (ws.readyState == 1) { - ws.send(data); + // The passed in `data` is an `ArrayBufferView` [0]. If the + // underlying typed array is a `SharedArrayBuffer` (when + // using WASM threads, so multiple web workers sharing + // memory) the WebSocket's `send` method errors [1][2][3]. + // This limitation will probably be lifted in the future, + // but for now we have to make a copy here .. + // + // [0]: https://developer.mozilla.org/en-US/docs/Web/API/ArrayBufferView + // [1]: https://chromium.googlesource.com/chromium/src/+/1438f63f369fed3766fa5031e7a252c986c69be6%5E%21/ + // [2]: https://bugreports.qt.io/browse/QTBUG-78078 + // [3]: https://chromium.googlesource.com/chromium/src/+/HEAD/third_party/blink/renderer/bindings/IDLExtendedAttributes.md#AllowShared_p + ws.send(data.slice(0)); return promise_when_send_finished(ws); } else { return Promise.reject("WebSocket is closed");