From 4de751124da4b4086b4eadd27a74bcd698b2da63 Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Fri, 18 Oct 2019 16:41:01 +0900 Subject: [PATCH 1/8] authority-discovery: futures 03 Future --- Cargo.lock | 1 + core/authority-discovery/Cargo.toml | 1 + core/authority-discovery/src/lib.rs | 151 +++++++++++++++------------- node/cli/src/service.rs | 2 +- 4 files changed, 86 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1aec02c00919..f07da6a0e0660 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4665,6 +4665,7 @@ dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml index 7283e07f89ca7..7635843481e18 100644 --- a/core/authority-discovery/Cargo.toml +++ b/core/authority-discovery/Cargo.toml @@ -15,6 +15,7 @@ client = { package = "substrate-client", path = "../../core/client" } codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" } derive_more = "0.15.0" futures = "0.1.29" +futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] } libp2p = { version = "0.12.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } log = "0.4.8" network = { package = "substrate-network", path = "../../core/network" } diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 987169ead90b1..7cf18af37ba92 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -42,23 +42,33 @@ //! 3. Validates the signatures of the retrieved key value pairs. //! //! 4. Adds the retrieved external addresses as priority nodes to the peerset. +use std::collections::{HashMap, HashSet}; +use std::convert::TryInto; +use std::iter::FromIterator; +use std::marker::PhantomData; +use std::pin::Pin; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use futures::future::Future as Future01; +use futures::sync::mpsc::Receiver as Receiver01; +use futures03::compat::Stream01CompatExt; +use futures03::future::{FutureExt, TryFutureExt}; +use futures03::prelude::{Future, Stream}; +use futures03::stream::StreamExt; +use futures03::task::{Context, Poll}; + +use tokio_timer::Interval as Interval01; use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature}; use client::blockchain::HeaderBackend; use error::{Error, Result}; -use futures::{prelude::*, sync::mpsc::Receiver}; use log::{debug, error, log_enabled, warn}; use network::specialization::NetworkSpecialization; use network::{DhtEvent, ExHashT}; use prost::Message; use sr_primitives::generic::BlockId; use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi}; -use std::collections::{HashMap, HashSet}; -use std::convert::TryInto; -use std::iter::FromIterator; -use std::marker::PhantomData; -use std::sync::Arc; -use std::time::{Duration, Instant}; mod error; /// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs. @@ -66,6 +76,12 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); } +// TODO change tokio 02's Interval when tokio 02's runtime is adopted +type Interval01As03 = Pin + Send>>; + +const DURATION_12H: Duration = Duration::from_secs(12 * 60 * 60); +const DURATION_10M: Duration = Duration::from_secs(10 * 60); + /// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities. pub struct AuthorityDiscovery where @@ -78,12 +94,12 @@ where network: Arc, /// Channel we receive Dht events on. - dht_event_rx: Receiver, + dht_event_rx: Pin + Send>>, /// Interval to be proactive, publishing own addresses. - publish_interval: tokio_timer::Interval, + publish_interval: Interval01As03, /// Interval on which to query for addresses of other authorities. - query_interval: tokio_timer::Interval, + query_interval: Interval01As03, /// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the /// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the @@ -96,27 +112,32 @@ where impl AuthorityDiscovery where - Block: BlockT + 'static, + Block: BlockT + Unpin + 'static, Network: NetworkProvider, Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, ::Api: AuthorityDiscoveryApi, + Self: Future, { /// Return a new authority discovery. pub fn new( client: Arc, network: Arc, - dht_event_rx: futures::sync::mpsc::Receiver, - ) -> AuthorityDiscovery { + dht_event_rx: Pin + Send>>, + ) -> Self { // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node // could restart at any point in time, one can not depend on the republishing process, thus publishing own // external addresses should happen on an interval < 36h. - let publish_interval = - tokio_timer::Interval::new(Instant::now(), Duration::from_secs(12 * 60 * 60)); + let publish_interval = Interval01::new_interval(DURATION_12H) + .compat() + .map(|x| x.unwrap()) + .boxed(); // External addresses of other authorities can change at any given point in time. The interval on which to query // for external addresses of other authorities is a trade off between efficiency and performance. - let query_interval = - tokio_timer::Interval::new(Instant::now(), Duration::from_secs(10 * 60)); + let query_interval = Interval01::new_interval(DURATION_10M) + .compat() + .map(|x| x.unwrap()) + .boxed(); let address_cache = HashMap::new(); @@ -131,6 +152,20 @@ where } } + /// New a futures 01 authority discovery + pub fn new_compat( + client: Arc, + network: Arc, + dht_event_rx: Receiver01, + ) -> impl Future01 { + // TODO remove this function when we switch to futures 03 + let dht_event_rx = dht_event_rx.compat().map(|x| x.unwrap()).boxed(); + + Self::new(client, network, dht_event_rx) + .map(|x| Ok(x)) + .compat() + } + fn publish_own_ext_addresses(&mut self) -> Result<()> { let id = BlockId::hash(self.client.info().best_hash); @@ -191,8 +226,8 @@ where Ok(()) } - fn handle_dht_events(&mut self) -> Result<()> { - while let Ok(Async::Ready(Some(event))) = self.dht_event_rx.poll() { + fn handle_dht_events(&mut self, cx: &mut Context) -> Result<()> { + while let Poll::Ready(Some(event)) = self.dht_event_rx.poll_next_unpin(cx) { match event { DhtEvent::ValueFound(v) => { if log_enabled!(log::Level::Debug) { @@ -202,15 +237,9 @@ where self.handle_dht_value_found_event(v)?; } - DhtEvent::ValueNotFound(hash) => { - warn!(target: "sub-authority-discovery", "Value for hash '{:?}' not found on Dht.", hash) - } - DhtEvent::ValuePut(hash) => { - debug!(target: "sub-authority-discovery", "Successfully put hash '{:?}' on Dht.", hash) - } - DhtEvent::ValuePutFailed(hash) => { - warn!(target: "sub-authority-discovery", "Failed to put hash '{:?}' on Dht.", hash) - } + DhtEvent::ValueNotFound(hash) => warn!(target: "sub-authority-discovery", "Value for hash '{:?}' not found on Dht.", hash), + DhtEvent::ValuePut(hash) => debug!(target: "sub-authority-discovery", "Successfully put hash '{:?}' on Dht.", hash), + DhtEvent::ValuePutFailed(hash) => warn!(target: "sub-authority-discovery", "Failed to put hash '{:?}' on Dht.", hash), } } @@ -291,53 +320,36 @@ where } } -impl futures::Future for AuthorityDiscovery +impl Future for AuthorityDiscovery where - Block: BlockT + 'static, + Block: BlockT + Unpin + 'static, Network: NetworkProvider, Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, ::Api: AuthorityDiscoveryApi, { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> futures::Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let mut inner = || -> Result<()> { // Process incoming events before triggering new ones. - self.handle_dht_events()?; + self.handle_dht_events(cx)?; - if let Async::Ready(_) = self - .publish_interval - .poll() - .map_err(Error::PollingTokioTimer)? - { + if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) { // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval // tick. - while let Async::Ready(_) = self - .publish_interval - .poll() - .map_err(Error::PollingTokioTimer)? - {} + while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {} self.publish_own_ext_addresses()?; } - if let Async::Ready(_) = self - .query_interval - .poll() - .map_err(Error::PollingTokioTimer)? - { + if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) { // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval // tick. - while let Async::Ready(_) = self - .query_interval - .poll() - .map_err(Error::PollingTokioTimer)? - {} + while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {} self.request_addresses_of_others()?; } @@ -351,7 +363,7 @@ where }; // Make sure to always return NotReady as this is a long running task with the same lifetime as the node itself. - Ok(futures::Async::NotReady) + Poll::Pending } } @@ -415,13 +427,14 @@ fn hash_authority_id(id: &[u8]) -> Result { mod tests { use super::*; use client::runtime_api::{ApiExt, Core, RuntimeVersion}; - use futures::future::poll_fn; + use futures03::channel::mpsc::channel; + use futures03::future::poll_fn; use primitives::{ExecutionContext, NativeOrEncoded}; use sr_primitives::traits::Zero; use sr_primitives::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi}; use std::sync::{Arc, Mutex}; use test_client::runtime::Block; - use tokio::runtime::current_thread; + use tokio::runtime::current_thread::Runtime as Runtime01; #[derive(Clone)] struct TestApi {} @@ -611,12 +624,12 @@ mod tests { #[test] fn publish_own_ext_addresses_puts_record_on_dht() { - let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let (_dht_event_tx, dht_event_rx) = channel(1000); let test_api = Arc::new(TestApi {}); let network: Arc = Arc::new(Default::default()); let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx.boxed()); authority_discovery.publish_own_ext_addresses().unwrap(); @@ -626,12 +639,12 @@ mod tests { #[test] fn request_addresses_of_others_triggers_dht_get_query() { - let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let (_dht_event_tx, dht_event_rx) = channel(1000); let test_api = Arc::new(TestApi {}); let network: Arc = Arc::new(Default::default()); let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx.boxed()); authority_discovery.request_addresses_of_others().unwrap(); @@ -643,12 +656,12 @@ mod tests { fn handle_dht_events_with_value_found_should_call_set_priority_group() { // Create authority discovery. - let (mut dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let (mut dht_event_tx, dht_event_rx) = channel(1000); let test_api = Arc::new(TestApi {}); let network: Arc = Arc::new(Default::default()); let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx.boxed()); // Create sample dht event. @@ -675,8 +688,8 @@ mod tests { // Make authority discovery handle the event. - let f = || { - authority_discovery.handle_dht_events().unwrap(); + let f = |cx: &mut Context<'_>| -> Poll<()> { + authority_discovery.handle_dht_events(cx).unwrap(); // Expect authority discovery to set the priority set. assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1); @@ -689,10 +702,12 @@ mod tests { ) ); - Ok(Async::Ready(())) + Poll::Ready(()) }; - let mut runtime = current_thread::Runtime::new().unwrap(); - runtime.block_on(poll_fn::<(), (), _>(f)).unwrap(); + let mut runtime = Runtime01::new().unwrap(); + runtime + .block_on(poll_fn(f).map(|x| Ok::<(), ()>(x)).compat()) + .unwrap(); } } diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 485cd325b0ec1..9a3ee7d70b592 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -170,7 +170,7 @@ macro_rules! new_full { let babe = babe::start_babe(babe_config)?; service.spawn_essential_task(babe); - let authority_discovery = authority_discovery::AuthorityDiscovery::new( + let authority_discovery = authority_discovery::AuthorityDiscovery::new_compat( service.client(), service.network(), dht_event_rx, From 88f2ed669776a8452c2f680708a688580bb85f60 Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Fri, 18 Oct 2019 17:04:51 +0900 Subject: [PATCH 2/8] make ci happy --- core/authority-discovery/src/lib.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 7cf18af37ba92..6f80c3d58f6a6 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -237,9 +237,17 @@ where self.handle_dht_value_found_event(v)?; } - DhtEvent::ValueNotFound(hash) => warn!(target: "sub-authority-discovery", "Value for hash '{:?}' not found on Dht.", hash), - DhtEvent::ValuePut(hash) => debug!(target: "sub-authority-discovery", "Successfully put hash '{:?}' on Dht.", hash), - DhtEvent::ValuePutFailed(hash) => warn!(target: "sub-authority-discovery", "Failed to put hash '{:?}' on Dht.", hash), + DhtEvent::ValueNotFound(hash) => warn!( + target: "sub-authority-discovery", + "Value for hash '{:?}' not found on Dht.", hash + ), + DhtEvent::ValuePut(hash) => debug!( + target: "sub-authority-discovery", + "Successfully put hash '{:?}' on Dht.", hash), + DhtEvent::ValuePutFailed(hash) => warn!( + target: "sub-authority-discovery", + "Failed to put hash '{:?}' on Dht.", hash + ), } } From 4471f7d842c760bab10668eabbcf5c02550eee57 Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Wed, 23 Oct 2019 10:56:01 +0900 Subject: [PATCH 3/8] use futures timer instead of tokio timer --- Cargo.lock | 2 +- core/authority-discovery/Cargo.toml | 2 +- core/authority-discovery/src/error.rs | 2 -- core/authority-discovery/src/lib.rs | 27 +++++++-------------------- 4 files changed, 9 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f07da6a0e0660..3be21ccb765a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4666,6 +4666,7 @@ dependencies = [ "derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4681,7 +4682,6 @@ dependencies = [ "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml index 7635843481e18..984b5cc0636f1 100644 --- a/core/authority-discovery/Cargo.toml +++ b/core/authority-discovery/Cargo.toml @@ -23,7 +23,7 @@ primitives = { package = "substrate-primitives", path = "../primitives" } prost = "0.5.0" serde_json = "1.0.41" sr-primitives = { path = "../../core/sr-primitives" } -tokio-timer = "0.2.11" +futures-timer = "0.4" [dev-dependencies] parking_lot = "0.9.0" diff --git a/core/authority-discovery/src/error.rs b/core/authority-discovery/src/error.rs index e8c1ad9705f0c..dca50cc0beb9e 100644 --- a/core/authority-discovery/src/error.rs +++ b/core/authority-discovery/src/error.rs @@ -42,6 +42,4 @@ pub enum Error { Decoding(prost::DecodeError), /// Failed to parse a libp2p multi address. ParsingMultiaddress(libp2p::core::multiaddr::Error), - /// Tokio timer error. - PollingTokioTimer(tokio_timer::Error) } diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 6f80c3d58f6a6..7245842ea39aa 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -48,7 +48,7 @@ use std::iter::FromIterator; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use futures::future::Future as Future01; use futures::sync::mpsc::Receiver as Receiver01; @@ -57,8 +57,7 @@ use futures03::future::{FutureExt, TryFutureExt}; use futures03::prelude::{Future, Stream}; use futures03::stream::StreamExt; use futures03::task::{Context, Poll}; - -use tokio_timer::Interval as Interval01; +use futures_timer::Interval; use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature}; use client::blockchain::HeaderBackend; @@ -76,12 +75,6 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); } -// TODO change tokio 02's Interval when tokio 02's runtime is adopted -type Interval01As03 = Pin + Send>>; - -const DURATION_12H: Duration = Duration::from_secs(12 * 60 * 60); -const DURATION_10M: Duration = Duration::from_secs(10 * 60); - /// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities. pub struct AuthorityDiscovery where @@ -97,9 +90,9 @@ where dht_event_rx: Pin + Send>>, /// Interval to be proactive, publishing own addresses. - publish_interval: Interval01As03, + publish_interval: Interval, /// Interval on which to query for addresses of other authorities. - query_interval: Interval01As03, + query_interval: Interval, /// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the /// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the @@ -127,17 +120,11 @@ where // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node // could restart at any point in time, one can not depend on the republishing process, thus publishing own // external addresses should happen on an interval < 36h. - let publish_interval = Interval01::new_interval(DURATION_12H) - .compat() - .map(|x| x.unwrap()) - .boxed(); + let publish_interval = Interval::new(Duration::from_secs(12 * 60 * 60)); // External addresses of other authorities can change at any given point in time. The interval on which to query // for external addresses of other authorities is a trade off between efficiency and performance. - let query_interval = Interval01::new_interval(DURATION_10M) - .compat() - .map(|x| x.unwrap()) - .boxed(); + let query_interval = Interval::new(Duration::from_secs(10 * 60)); let address_cache = HashMap::new(); @@ -152,7 +139,7 @@ where } } - /// New a futures 01 authority discovery + /// Return futures 01 authority discovery pub fn new_compat( client: Arc, network: Arc, From 2ba2377b1dc865623fd6939de5720f9855e8b97a Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Wed, 23 Oct 2019 13:18:47 +0900 Subject: [PATCH 4/8] Update core/authority-discovery/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Bastian Köcher --- core/authority-discovery/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 7245842ea39aa..1fdb478e09ac4 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -54,7 +54,7 @@ use futures::future::Future as Future01; use futures::sync::mpsc::Receiver as Receiver01; use futures03::compat::Stream01CompatExt; use futures03::future::{FutureExt, TryFutureExt}; -use futures03::prelude::{Future, Stream}; +use futures03::{Future, Stream}; use futures03::stream::StreamExt; use futures03::task::{Context, Poll}; use futures_timer::Interval; From e8abc90a639b76ec4cc02c268bafe0a3bd08f1a0 Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Wed, 23 Oct 2019 14:25:48 +0900 Subject: [PATCH 5/8] remove tokio 01 runtime --- Cargo.lock | 1 - core/authority-discovery/Cargo.toml | 1 - core/authority-discovery/src/lib.rs | 9 +++------ 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 82669da10f0a9..4f46c85b0f3b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4726,7 +4726,6 @@ dependencies = [ "substrate-peerset 2.0.0", "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml index 984b5cc0636f1..fb6a5878cc43b 100644 --- a/core/authority-discovery/Cargo.toml +++ b/core/authority-discovery/Cargo.toml @@ -29,4 +29,3 @@ futures-timer = "0.4" parking_lot = "0.9.0" peerset = { package = "substrate-peerset", path = "../../core/peerset" } test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } -tokio = "0.1.22" diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 1fdb478e09ac4..472ba85298556 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -54,9 +54,9 @@ use futures::future::Future as Future01; use futures::sync::mpsc::Receiver as Receiver01; use futures03::compat::Stream01CompatExt; use futures03::future::{FutureExt, TryFutureExt}; -use futures03::{Future, Stream}; use futures03::stream::StreamExt; use futures03::task::{Context, Poll}; +use futures03::{Future, Stream}; use futures_timer::Interval; use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature}; @@ -423,13 +423,13 @@ mod tests { use super::*; use client::runtime_api::{ApiExt, Core, RuntimeVersion}; use futures03::channel::mpsc::channel; + use futures03::executor::block_on; use futures03::future::poll_fn; use primitives::{ExecutionContext, NativeOrEncoded}; use sr_primitives::traits::Zero; use sr_primitives::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi}; use std::sync::{Arc, Mutex}; use test_client::runtime::Block; - use tokio::runtime::current_thread::Runtime as Runtime01; #[derive(Clone)] struct TestApi {} @@ -700,9 +700,6 @@ mod tests { Poll::Ready(()) }; - let mut runtime = Runtime01::new().unwrap(); - runtime - .block_on(poll_fn(f).map(|x| Ok::<(), ()>(x)).compat()) - .unwrap(); + let _ = block_on(poll_fn(f)); } } From 15fe6aaed9d9dc37ded41620652a4b36e4a881be Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Tue, 29 Oct 2019 10:49:30 +0900 Subject: [PATCH 6/8] trigger build --- core/authority-discovery/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index 472ba85298556..af948820dd2a8 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -682,7 +682,6 @@ mod tests { dht_event_tx.try_send(dht_event).unwrap(); // Make authority discovery handle the event. - let f = |cx: &mut Context<'_>| -> Poll<()> { authority_discovery.handle_dht_events(cx).unwrap(); From 2afed11da40c9ac0500e7c5086e309ccb75ec7e1 Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Tue, 29 Oct 2019 18:21:08 +0900 Subject: [PATCH 7/8] kill futures01 --- Cargo.lock | 1 - core/authority-discovery/Cargo.toml | 1 - core/authority-discovery/src/lib.rs | 31 +++++++---------------------- 3 files changed, 7 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15c670341a22b..2a7e1b2e85450 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4804,7 +4804,6 @@ version = "2.0.0" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml index fb6a5878cc43b..6589479d4a4a0 100644 --- a/core/authority-discovery/Cargo.toml +++ b/core/authority-discovery/Cargo.toml @@ -14,7 +14,6 @@ bytes = "0.4.12" client = { package = "substrate-client", path = "../../core/client" } codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" } derive_more = "0.15.0" -futures = "0.1.29" futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] } libp2p = { version = "0.12.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } log = "0.4.8" diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index af948820dd2a8..dac34c98263f4 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -50,13 +50,10 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use futures::future::Future as Future01; -use futures::sync::mpsc::Receiver as Receiver01; -use futures03::compat::Stream01CompatExt; -use futures03::future::{FutureExt, TryFutureExt}; +use futures03::channel::mpsc::Receiver; use futures03::stream::StreamExt; use futures03::task::{Context, Poll}; -use futures03::{Future, Stream}; +use futures03::Future; use futures_timer::Interval; use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature}; @@ -87,7 +84,7 @@ where network: Arc, /// Channel we receive Dht events on. - dht_event_rx: Pin + Send>>, + dht_event_rx: Receiver, /// Interval to be proactive, publishing own addresses. publish_interval: Interval, @@ -115,7 +112,7 @@ where pub fn new( client: Arc, network: Arc, - dht_event_rx: Pin + Send>>, + dht_event_rx: Receiver, ) -> Self { // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node // could restart at any point in time, one can not depend on the republishing process, thus publishing own @@ -139,20 +136,6 @@ where } } - /// Return futures 01 authority discovery - pub fn new_compat( - client: Arc, - network: Arc, - dht_event_rx: Receiver01, - ) -> impl Future01 { - // TODO remove this function when we switch to futures 03 - let dht_event_rx = dht_event_rx.compat().map(|x| x.unwrap()).boxed(); - - Self::new(client, network, dht_event_rx) - .map(|x| Ok(x)) - .compat() - } - fn publish_own_ext_addresses(&mut self) -> Result<()> { let id = BlockId::hash(self.client.info().best_hash); @@ -624,7 +607,7 @@ mod tests { let network: Arc = Arc::new(Default::default()); let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx.boxed()); + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); authority_discovery.publish_own_ext_addresses().unwrap(); @@ -639,7 +622,7 @@ mod tests { let network: Arc = Arc::new(Default::default()); let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx.boxed()); + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); authority_discovery.request_addresses_of_others().unwrap(); @@ -656,7 +639,7 @@ mod tests { let network: Arc = Arc::new(Default::default()); let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx.boxed()); + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); // Create sample dht event. From a7eb4ddf37e6c7f084bc245a92ac7f0f4d8a71f1 Mon Sep 17 00:00:00 2001 From: Weiliang Li Date: Tue, 29 Oct 2019 20:58:25 +0900 Subject: [PATCH 8/8] rename futures --- core/authority-discovery/Cargo.toml | 2 +- core/authority-discovery/src/lib.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml index 6589479d4a4a0..c4cab438f4007 100644 --- a/core/authority-discovery/Cargo.toml +++ b/core/authority-discovery/Cargo.toml @@ -14,7 +14,7 @@ bytes = "0.4.12" client = { package = "substrate-client", path = "../../core/client" } codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" } derive_more = "0.15.0" -futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] } +futures-preview = "0.3.0-alpha.19" libp2p = { version = "0.12.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } log = "0.4.8" network = { package = "substrate-network", path = "../../core/network" } diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs index dac34c98263f4..25d17cb1e903f 100644 --- a/core/authority-discovery/src/lib.rs +++ b/core/authority-discovery/src/lib.rs @@ -50,10 +50,10 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use futures03::channel::mpsc::Receiver; -use futures03::stream::StreamExt; -use futures03::task::{Context, Poll}; -use futures03::Future; +use futures::channel::mpsc::Receiver; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; +use futures::Future; use futures_timer::Interval; use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature}; @@ -405,9 +405,9 @@ fn hash_authority_id(id: &[u8]) -> Result { mod tests { use super::*; use client::runtime_api::{ApiExt, Core, RuntimeVersion}; - use futures03::channel::mpsc::channel; - use futures03::executor::block_on; - use futures03::future::poll_fn; + use futures::channel::mpsc::channel; + use futures::executor::block_on; + use futures::future::poll_fn; use primitives::{ExecutionContext, NativeOrEncoded}; use sr_primitives::traits::Zero; use sr_primitives::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi};