From fe5fd3cc7f85029cdbd4e8f1492287600db5a2e6 Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Tue, 25 Aug 2020 02:29:45 +0800 Subject: [PATCH] refactor: re-export crossbeam-channel from ckb-types --- Cargo.lock | 29 +++++++++++-------- chain/Cargo.toml | 2 +- chain/src/chain.rs | 10 +++---- ckb-bin/Cargo.toml | 2 +- ckb-bin/src/subcommand/miner.rs | 2 +- devtools/ci/check-cargotoml.sh | 4 +-- miner/Cargo.toml | 2 +- miner/src/client.rs | 2 +- miner/src/miner.rs | 2 +- miner/src/worker/dummy.rs | 2 +- miner/src/worker/eaglesong_simple.rs | 2 +- miner/src/worker/mod.rs | 2 +- network/Cargo.toml | 1 - network/src/network.rs | 8 ++--- network/src/protocols/test.rs | 2 +- notify/Cargo.toml | 2 +- notify/src/lib.rs | 2 +- rpc/Cargo.toml | 2 +- rpc/src/module/subscription.rs | 2 +- sync/Cargo.toml | 2 +- sync/src/synchronizer/mod.rs | 7 +++-- test/Cargo.toml | 2 +- test/src/main.rs | 2 +- test/src/net.rs | 4 +-- test/src/worker.rs | 2 +- tx-pool/Cargo.toml | 2 +- tx-pool/src/service.rs | 23 ++++++++------- util/channel/Cargo.toml | 11 +++++++ util/channel/src/lib.rs | 3 ++ util/logger-service/Cargo.toml | 2 +- util/logger-service/src/lib.rs | 6 ++-- util/stop-handler/Cargo.toml | 2 +- util/stop-handler/src/lib.rs | 10 +++++-- util/types/Cargo.toml | 2 +- util/types/src/core/service.rs | 8 ++--- verification/Cargo.toml | 1 - verification/src/contextual_block_verifier.rs | 8 +++-- 37 files changed, 101 insertions(+), 76 deletions(-) create mode 100644 util/channel/Cargo.toml create mode 100644 util/channel/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index f1d08b00e1d..0ba872bde21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,7 @@ dependencies = [ "ckb-chain", "ckb-chain-iter", "ckb-chain-spec", + "ckb-channel", "ckb-instrument", "ckb-jsonrpc-types", "ckb-logger", @@ -400,7 +401,6 @@ dependencies = [ "ckb-util", "ckb-verification", "clap", - "crossbeam-channel 0.3.9", "ctrlc", "rayon", "sentry", @@ -421,6 +421,7 @@ dependencies = [ "bitflags", "ckb-app-config", "ckb-chain-spec", + "ckb-channel", "ckb-dao", "ckb-dao-utils", "ckb-error", @@ -436,7 +437,6 @@ dependencies = [ "ckb-tx-pool", "ckb-types", "ckb-verification", - "crossbeam-channel 0.3.9", "faketime", "lazy_static", ] @@ -467,6 +467,13 @@ dependencies = [ "toml", ] +[[package]] +name = "ckb-channel" +version = "0.36.0-pre" +dependencies = [ + "crossbeam-channel 0.3.9", +] + [[package]] name = "ckb-crypto" version = "0.36.0-pre" @@ -658,9 +665,9 @@ dependencies = [ "ansi_term 0.12.1", "backtrace", "chrono", + "ckb-channel", "ckb-logger-config", "ckb-util", - "crossbeam-channel 0.3.9", "env_logger", "lazy_static", "log 0.4.11", @@ -720,6 +727,7 @@ name = "ckb-miner" version = "0.36.0-pre" dependencies = [ "ckb-app-config", + "ckb-channel", "ckb-hash", "ckb-jsonrpc-types", "ckb-logger", @@ -727,7 +735,6 @@ dependencies = [ "ckb-stop-handler", "ckb-types", "console", - "crossbeam-channel 0.3.9", "eaglesong", "failure", "futures 0.1.29", @@ -762,7 +769,6 @@ dependencies = [ "ckb-types", "ckb-util", "criterion", - "crossbeam-channel 0.3.9", "faketime", "faster-hex 0.4.1", "futures 0.3.4", @@ -807,10 +813,10 @@ name = "ckb-notify" version = "0.36.0-pre" dependencies = [ "ckb-app-config", + "ckb-channel", "ckb-logger", "ckb-stop-handler", "ckb-types", - "crossbeam-channel 0.3.9", ] [[package]] @@ -912,6 +918,7 @@ dependencies = [ "ckb-app-config", "ckb-chain", "ckb-chain-spec", + "ckb-channel", "ckb-dao", "ckb-dao-utils", "ckb-error", @@ -935,7 +942,6 @@ dependencies = [ "ckb-types", "ckb-util", "ckb-verification", - "crossbeam-channel 0.3.9", "failure", "faketime", "futures 0.1.29", @@ -1025,8 +1031,8 @@ dependencies = [ name = "ckb-stop-handler" version = "0.36.0-pre" dependencies = [ + "ckb-channel", "ckb-logger", - "crossbeam-channel 0.3.9", "futures 0.1.29", "parking_lot 0.7.1", "tokio 0.2.22", @@ -1053,6 +1059,7 @@ dependencies = [ "bitflags", "ckb-chain", "ckb-chain-spec", + "ckb-channel", "ckb-dao", "ckb-dao-utils", "ckb-db", @@ -1069,7 +1076,6 @@ dependencies = [ "ckb-types", "ckb-util", "ckb-verification", - "crossbeam-channel 0.3.9", "failure", "faketime", "futures 0.3.4", @@ -1123,6 +1129,7 @@ dependencies = [ "ckb-app-config", "ckb-async-runtime", "ckb-chain-spec", + "ckb-channel", "ckb-dao", "ckb-error", "ckb-fee-estimator", @@ -1135,7 +1142,6 @@ dependencies = [ "ckb-types", "ckb-util", "ckb-verification", - "crossbeam-channel 0.3.9", "failure", "faketime", "lru-cache", @@ -1148,12 +1154,12 @@ version = "0.36.0-pre" dependencies = [ "bit-vec", "bytes 0.5.6", + "ckb-channel", "ckb-error", "ckb-fixed-hash", "ckb-hash", "ckb-occupied-capacity", "ckb-rational", - "crossbeam-channel 0.3.9", "failure", "merkle-cbt", "molecule", @@ -1192,7 +1198,6 @@ dependencies = [ "ckb-test-chain-utils", "ckb-traits", "ckb-types", - "crossbeam-channel 0.3.9", "enum-display-derive", "failure", "faketime", diff --git a/chain/Cargo.toml b/chain/Cargo.toml index 06f4cdad760..c146618bcd3 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -14,7 +14,6 @@ ckb-chain-spec = { path = "../spec" } ckb-store = { path = "../store" } ckb-verification = { path = "../verification" } faketime = "0.2.0" -crossbeam-channel = "0.3" ckb-stop-handler = { path = "../util/stop-handler" } ckb-dao = { path = "../util/dao" } ckb-proposal-table = { path = "../util/proposal-table" } @@ -22,6 +21,7 @@ ckb-error = { path = "../error" } ckb-app-config = { path = "../util/app-config" } bitflags = "1.0" ckb-rust-unstable-port = { path = "../util/rust-unstable-port" } +ckb-channel = { path = "../util/channel" } [dev-dependencies] ckb-test-chain-utils = { path = "../util/test-chain-utils" } diff --git a/chain/src/chain.rs b/chain/src/chain.rs index c421731b630..c7d5bc2d315 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -1,5 +1,6 @@ use crate::cell::{attach_block_cell, detach_block_cell}; use crate::switch::Switch; +use ckb_channel::{self as channel, select, Sender}; use ckb_error::{Error, InternalErrorKind}; use ckb_logger::{self, debug, error, info, log_enabled, trace, warn}; use ckb_metrics::{metrics, Timer}; @@ -22,7 +23,6 @@ use ckb_verification::InvalidParentError; use ckb_verification::{ BlockVerifier, ContextualBlockVerifier, NonContextualBlockTxsVerifier, Verifier, VerifyContext, }; -use crossbeam_channel::{self, select, Sender}; use faketime::unix_time_as_millis; use std::collections::{HashSet, VecDeque}; use std::sync::Arc; @@ -152,11 +152,9 @@ impl ChainService { // remove `allow` tag when https://github.com/crossbeam-rs/crossbeam/issues/404 is solved #[allow(clippy::zero_ptr, clippy::drop_copy)] pub fn start(mut self, thread_name: Option) -> ChainController { - let (signal_sender, signal_receiver) = - crossbeam_channel::bounded::<()>(SIGNAL_CHANNEL_SIZE); - let (process_block_sender, process_block_receiver) = - crossbeam_channel::bounded(DEFAULT_CHANNEL_SIZE); - let (truncate_sender, truncate_receiver) = crossbeam_channel::bounded(1); + let (signal_sender, signal_receiver) = channel::bounded::<()>(SIGNAL_CHANNEL_SIZE); + let (process_block_sender, process_block_receiver) = channel::bounded(DEFAULT_CHANNEL_SIZE); + let (truncate_sender, truncate_receiver) = channel::bounded(1); // Mainly for test: give an empty thread_name let mut thread_builder = thread::Builder::new(); diff --git a/ckb-bin/Cargo.toml b/ckb-bin/Cargo.toml index 0c173e7e4f8..a87c2d57761 100644 --- a/ckb-bin/Cargo.toml +++ b/ckb-bin/Cargo.toml @@ -10,13 +10,13 @@ clap = { version = "2" } serde = { version = "1.0", features = ["derive"] } serde_plain = "0.3.0" toml = "0.5" -crossbeam-channel = "0.3" ckb-app-config = { path = "../util/app-config" } ckb-logger = { path = "../util/logger" } ckb-logger-service = { path = "../util/logger-service" } ckb-metrics-service = { path = "../util/metrics-service" } ckb-util = { path = "../util" } ckb-types = { path = "../util/types" } +ckb-channel = { path = "../util/channel" } ckb-jsonrpc-types = { path = "../util/jsonrpc-types" } ckb-chain = { path = "../chain" } ckb-shared = { path = "../shared" } diff --git a/ckb-bin/src/subcommand/miner.rs b/ckb-bin/src/subcommand/miner.rs index 83fa1d869ad..7b1193f87d2 100644 --- a/ckb-bin/src/subcommand/miner.rs +++ b/ckb-bin/src/subcommand/miner.rs @@ -1,6 +1,6 @@ use ckb_app_config::{ExitCode, MinerArgs, MinerConfig}; +use ckb_channel::unbounded; use ckb_miner::{Client, Miner}; -use crossbeam_channel::unbounded; use std::thread; pub fn miner(args: MinerArgs) -> Result<(), ExitCode> { diff --git a/devtools/ci/check-cargotoml.sh b/devtools/ci/check-cargotoml.sh index afd2976950c..8d599036aee 100755 --- a/devtools/ci/check-cargotoml.sh +++ b/devtools/ci/check-cargotoml.sh @@ -73,12 +73,12 @@ function search_crate() { local depcnt=0 local grepopts="-rh" tmpcnt=$({\ - ${GREP} ${grepopts} "\(^\| \)extern crate ${crate}\(::\|;\)" "${source}" \ + ${GREP} ${grepopts} "\(^\| \)extern crate ${crate}\(::\|;\| as \)" "${source}" \ || true; }\ | wc -l) depcnt=$((depcnt + tmpcnt)) tmpcnt=$({\ - ${GREP} ${grepopts} "\(^\| \)use ${crate}\(::\|;\)" "${source}" \ + ${GREP} ${grepopts} "\(^\| \)use ${crate}\(::\|;\| as \)" "${source}" \ || true; }\ | wc -l) depcnt=$((depcnt + tmpcnt)) diff --git a/miner/Cargo.toml b/miner/Cargo.toml index 8811503ead8..2a653aca36d 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -9,11 +9,11 @@ edition = "2018" ckb-logger = { path = "../util/logger" } ckb-app-config = { path = "../util/app-config" } ckb-types = { path = "../util/types" } +ckb-channel = { path = "../util/channel" } ckb-hash = { path = "../util/hash" } ckb-pow = { path = "../pow" } rand = "0.6" serde = { version = "1.0", features = ["derive"] } -crossbeam-channel = "0.3" serde_json = "1.0" ckb-jsonrpc-types = { path = "../util/jsonrpc-types" } hyper = "0.12" diff --git a/miner/src/client.rs b/miner/src/client.rs index 183c33a47ca..57da2cb26b0 100644 --- a/miner/src/client.rs +++ b/miner/src/client.rs @@ -1,5 +1,6 @@ use crate::Work; use ckb_app_config::MinerClientConfig; +use ckb_channel::Sender; use ckb_jsonrpc_types::{ error::Error as RpcFail, error::ErrorCode as RpcFailCode, id::Id, params::Params, request::MethodCall, response::Output, version::Version, Block as JsonBlock, BlockTemplate, @@ -7,7 +8,6 @@ use ckb_jsonrpc_types::{ use ckb_logger::{debug, error, warn}; use ckb_stop_handler::{SignalSender, StopHandler}; use ckb_types::{packed::Block, H256}; -use crossbeam_channel::Sender; use failure::Error; use futures::sync::{mpsc, oneshot}; use hyper::error::Error as HyperError; diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 5fff36d87b6..e23ef850de8 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -2,6 +2,7 @@ use crate::client::Client; use crate::worker::{start_worker, WorkerController, WorkerMessage}; use crate::Work; use ckb_app_config::MinerWorkerConfig; +use ckb_channel::{select, unbounded, Receiver}; use ckb_logger::{debug, error, info}; use ckb_pow::PowEngine; use ckb_types::{ @@ -9,7 +10,6 @@ use ckb_types::{ prelude::*, utilities::compact_to_target, }; -use crossbeam_channel::{select, unbounded, Receiver}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use lru_cache::LruCache; use std::sync::Arc; diff --git a/miner/src/worker/dummy.rs b/miner/src/worker/dummy.rs index 06e807224d9..bb40fdfa481 100644 --- a/miner/src/worker/dummy.rs +++ b/miner/src/worker/dummy.rs @@ -1,8 +1,8 @@ use super::{Worker, WorkerMessage}; use ckb_app_config::DummyConfig; +use ckb_channel::{Receiver, Sender}; use ckb_logger::error; use ckb_types::packed::Byte32; -use crossbeam_channel::{Receiver, Sender}; use indicatif::ProgressBar; use rand::{ distributions::{self as dist, Distribution as _}, diff --git a/miner/src/worker/eaglesong_simple.rs b/miner/src/worker/eaglesong_simple.rs index b2fac21474c..1ece255a0e9 100644 --- a/miner/src/worker/eaglesong_simple.rs +++ b/miner/src/worker/eaglesong_simple.rs @@ -1,10 +1,10 @@ use super::{Worker, WorkerMessage}; use ckb_app_config::ExtraHashFunction; +use ckb_channel::{Receiver, Sender}; use ckb_hash::blake2b_256; use ckb_logger::{debug, error}; use ckb_pow::pow_message; use ckb_types::{packed::Byte32, U256}; -use crossbeam_channel::{Receiver, Sender}; use eaglesong::eaglesong; use indicatif::ProgressBar; use std::thread; diff --git a/miner/src/worker/mod.rs b/miner/src/worker/mod.rs index 8cabc348411..7acc1deb8ce 100644 --- a/miner/src/worker/mod.rs +++ b/miner/src/worker/mod.rs @@ -2,10 +2,10 @@ mod dummy; mod eaglesong_simple; use ckb_app_config::MinerWorkerConfig; +use ckb_channel::{unbounded, Sender}; use ckb_logger::error; use ckb_pow::{DummyPowEngine, EaglesongBlake2bPowEngine, EaglesongPowEngine, PowEngine}; use ckb_types::{packed::Byte32, U256}; -use crossbeam_channel::{unbounded, Sender}; use dummy::Dummy; use eaglesong_simple::EaglesongSimple; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; diff --git a/network/Cargo.toml b/network/Cargo.toml index 2e253920d3c..ee9cba7148f 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -15,7 +15,6 @@ ckb-app-config = { path = "../util/app-config" } tokio = { version = "0.2.11", features = ["time", "io-util", "tcp", "dns", "rt-threaded", "blocking", "stream"] } tokio-util = { version = "0.3.0", features = ["codec"] } futures = "0.3" -crossbeam-channel = "0.3" p2p = { version="0.3.0", package="tentacle", features = ["molc"] } faketime = "0.2.0" lazy_static = "1.3.0" diff --git a/network/src/network.rs b/network/src/network.rs index 6041075afa4..0316f8ee400 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -49,7 +49,7 @@ use std::{ pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + mpsc as std_mpsc, Arc, }, thread, time::{Duration, Instant}, @@ -1036,8 +1036,8 @@ impl NetworkService { if let Some(name) = thread_name { thread_builder = thread_builder.name(name.to_string()); } - let (sender, receiver) = crossbeam_channel::bounded(1); - let (start_sender, start_receiver) = crossbeam_channel::bounded(1); + let (sender, receiver) = std_mpsc::channel(); + let (start_sender, start_receiver) = std_mpsc::channel(); let network_state_1 = Arc::clone(&network_state); // Main network thread let thread = thread_builder @@ -1128,7 +1128,7 @@ impl NetworkService { return Err(e); } - let stop = StopHandler::new(SignalSender::Crossbeam(sender), thread); + let stop = StopHandler::new(SignalSender::Std(sender), thread); Ok(NetworkController { version, network_state, diff --git a/network/src/protocols/test.rs b/network/src/protocols/test.rs index 4981bceccb2..f3fe3efcc3a 100644 --- a/network/src/protocols/test.rs +++ b/network/src/protocols/test.rs @@ -198,7 +198,7 @@ fn net_service_start(name: String) -> Node { let peer_id = network_state.local_peer_id().clone(); let control = p2p_service.control().clone(); - let (addr_sender, addr_receiver) = crossbeam_channel::bounded(1); + let (addr_sender, addr_receiver) = ::std::sync::mpsc::channel(); thread::spawn(move || { let num_threads = ::std::cmp::max(num_cpus::get(), 4); diff --git a/notify/Cargo.toml b/notify/Cargo.toml index b89fa6645dc..f3999a5272b 100644 --- a/notify/Cargo.toml +++ b/notify/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT" ckb-logger = { path = "../util/logger" } ckb-app-config = { path = "../util/app-config" } ckb-types = { path = "../util/types" } +ckb-channel = { path = "../util/channel" } ckb-stop-handler = { path = "../util/stop-handler" } -crossbeam-channel = "0.3" [dev-dependencies] diff --git a/notify/src/lib.rs b/notify/src/lib.rs index bf83a80f0a1..2c54df4ff78 100644 --- a/notify/src/lib.rs +++ b/notify/src/lib.rs @@ -1,11 +1,11 @@ use ckb_app_config::NotifyConfig; +use ckb_channel::{bounded, select, Receiver, RecvError, Sender}; use ckb_logger::{debug, error, trace}; use ckb_stop_handler::{SignalSender, StopHandler}; use ckb_types::{ core::{service::Request, BlockView}, packed::Alert, }; -use crossbeam_channel::{bounded, select, Receiver, RecvError, Sender}; use std::collections::HashMap; use std::process::Command; use std::thread; diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 3df44af0b99..aa820b42880 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -16,6 +16,7 @@ ckb-store = { path = "../store" } ckb-sync = { path = "../sync" } ckb-chain = { path = "../chain" } ckb-logger = { path = "../util/logger"} +ckb-channel = { path = "../util/channel" } ckb-logger-service = { path = "../util/logger-service"} ckb-network-alert = { path = "../util/network-alert" } ckb-fee-estimator = { path = "../util/fee-estimator" } @@ -27,7 +28,6 @@ jsonrpc-tcp-server = "~14.1" jsonrpc-ws-server = "~14.1" jsonrpc-server-utils = "~14.1" jsonrpc-pubsub = "~14.1" -crossbeam-channel = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" num_cpus = "1.10" diff --git a/rpc/src/module/subscription.rs b/rpc/src/module/subscription.rs index 84c124835ac..00e5282b9eb 100644 --- a/rpc/src/module/subscription.rs +++ b/rpc/src/module/subscription.rs @@ -1,6 +1,6 @@ +use ckb_channel::select; use ckb_logger::error; use ckb_notify::NotifyController; -use crossbeam_channel::select; use jsonrpc_core::{futures::Future, Metadata, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{ diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 750fe0500ec..54072ea1fd4 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -19,6 +19,7 @@ faketime = "0.2.0" bitflags = "1.0" ckb-verification = { path = "../verification" } ckb-chain-spec = { path = "../spec" } +ckb-channel = { path = "../util/channel" } ckb-traits = { path = "../traits" } failure = "0.1.5" lru-cache = { git = "https://github.com/nervosnetwork/lru-cache", rev = "a35fdb8" } @@ -27,7 +28,6 @@ futures = "0.3" ckb-error = {path = "../error"} ckb-tx-pool = { path = "../tx-pool" } ckb-fee-estimator = { path = "../util/fee-estimator" } -crossbeam-channel = "0.3" ratelimit_meter = "5.0" tempfile = "3.0" diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index df118d58df2..9f72c9f5f1e 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -18,6 +18,7 @@ use crate::{ MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT, }; use ckb_chain::chain::ChainController; +use ckb_channel as channel; use ckb_logger::{debug, error, info, trace, warn}; use ckb_metrics::metrics; use ckb_network::{ @@ -49,7 +50,7 @@ enum FetchCMD { struct BlockFetchCMD { sync: Synchronizer, p2p_control: ServiceControl, - recv: crossbeam_channel::Receiver, + recv: channel::Receiver, } impl BlockFetchCMD { @@ -93,7 +94,7 @@ impl BlockFetchCMD { pub struct Synchronizer { chain: ChainController, pub shared: Arc, - fetch_channel: Option>, + fetch_channel: Option>, } impl Synchronizer { @@ -476,7 +477,7 @@ impl Synchronizer { None => { let p2p_control = raw.clone(); let sync = self.clone(); - let (sender, recv) = crossbeam_channel::bounded(2); + let (sender, recv) = channel::bounded(2); let peers = self.get_peers_to_fetch(ibd, &disconnect_list); sender.send(FetchCMD::Fetch(peers)).unwrap(); self.fetch_channel = Some(sender); diff --git a/test/Cargo.toml b/test/Cargo.toml index c276aaf95a4..23f8d6d825c 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -11,6 +11,7 @@ toml = "0.5.0" ckb-jsonrpc-types = { path = "../util/jsonrpc-types" } ckb-app-config = { path = "../util/app-config" } ckb-network = { path = "../network" } +ckb-channel = { path = "../util/channel" } ckb-sync = { path = "../sync" } ckb-fee-estimator = { path = "../util/fee-estimator" } ckb-types = { path = "../util/types" } @@ -27,7 +28,6 @@ reqwest = "0.9" rand = "0.6" log = "0.4" env_logger = "0.6" -crossbeam-channel = "0.3" faketime = "0.2" failure = "0.1.5" serde_json = "1.0" diff --git a/test/src/main.rs b/test/src/main.rs index 1e629e7031f..93130ea481c 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -1,3 +1,4 @@ +use ckb_channel::unbounded; use ckb_test::specs::*; use ckb_test::{ utils::node_log, @@ -7,7 +8,6 @@ use ckb_test::{ use ckb_types::core::ScriptHashType; use ckb_util::Mutex; use clap::{value_t, App, Arg}; -use crossbeam_channel::unbounded; use log::{error, info}; use rand::{seq::SliceRandom, thread_rng}; use std::any::Any; diff --git a/test/src/net.rs b/test/src/net.rs index 001ae522f25..cd625e42b5b 100644 --- a/test/src/net.rs +++ b/test/src/net.rs @@ -2,12 +2,12 @@ use crate::specs::TestProtocol; use crate::utils::{temp_path, wait_until}; use crate::{Node, Setup}; use ckb_app_config::NetworkConfig; +use ckb_channel::{self as channel, Receiver, RecvTimeoutError, Sender}; use ckb_network::{ bytes::Bytes, CKBProtocol, CKBProtocolContext, CKBProtocolHandler, DefaultExitHandler, NetworkController, NetworkService, NetworkState, PeerIndex, ProtocolId, }; use ckb_types::core::{BlockNumber, BlockView}; -use crossbeam_channel::{self, Receiver, RecvTimeoutError, Sender}; use std::collections::HashSet; use std::path::PathBuf; use std::sync::{ @@ -94,7 +94,7 @@ impl Net { ); assert!(self.controller.is_none()); - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel::unbounded(); let config = NetworkConfig { listen_addresses: vec![format!("/ip4/127.0.0.1/tcp/{}", self.p2p_port()) .parse() diff --git a/test/src/worker.rs b/test/src/worker.rs index 033f4102eb2..2adda946532 100644 --- a/test/src/worker.rs +++ b/test/src/worker.rs @@ -1,6 +1,6 @@ use crate::{utils::nodes_panicked, Net, Spec}; +use ckb_channel::{unbounded, Receiver, Sender}; use ckb_util::Mutex; -use crossbeam_channel::{unbounded, Receiver, Sender}; use log::{error, info}; use std::any::Any; use std::panic; diff --git a/tx-pool/Cargo.toml b/tx-pool/Cargo.toml index bea97527e17..6e66bee459c 100644 --- a/tx-pool/Cargo.toml +++ b/tx-pool/Cargo.toml @@ -23,8 +23,8 @@ ckb-chain-spec = { path = "../spec" } ckb-snapshot = { path = "../util/snapshot" } ckb-error = { path = "../error" } tokio = { version = "0.2", features = ["sync", "blocking", "rt-threaded", "macros"] } -crossbeam-channel = "0.3" ckb-async-runtime = { path = "../util/runtime" } ckb-stop-handler = { path = "../util/stop-handler" } ckb-fee-estimator = { path = "../util/fee-estimator" } ckb-app-config = { path = "../util/app-config" } +ckb-channel = { path = "../util/channel" } diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 65bf7df2b3b..f5144ffc715 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -6,6 +6,7 @@ use crate::process::PlugTarget; use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig}; use ckb_async_runtime::{new_runtime, Handle}; use ckb_chain_spec::consensus::Consensus; +use ckb_channel as channel; use ckb_error::Error; use ckb_jsonrpc_types::BlockTemplate; use ckb_logger::error; @@ -26,12 +27,12 @@ use tokio::sync::{mpsc, oneshot, RwLock}; pub const DEFAULT_CHANNEL_SIZE: usize = 512; pub struct Request { - pub responder: crossbeam_channel::Sender, + pub responder: channel::Sender, pub arguments: A, } impl Request { - pub fn call(arguments: A, responder: crossbeam_channel::Sender) -> Request { + pub fn call(arguments: A, responder: channel::Sender) -> Request { Request { responder, arguments, @@ -126,7 +127,7 @@ impl TxPoolController { block_assembler_config: Option, ) -> Result { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call( ( bytes_limit, @@ -176,7 +177,7 @@ impl TxPoolController { pub fn submit_txs(&self, txs: Vec) -> Result { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call(txs, responder); sender.try_send(Message::SubmitTxs(request)).map_err(|e| { let (_m, e) = handle_try_send_error(e); @@ -191,7 +192,7 @@ impl TxPoolController { target: PlugTarget, ) -> Result<(), FailureError> { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call((entries, target), responder); sender.try_send(Message::PlugEntry(request)).map_err(|e| { let (_m, e) = handle_try_send_error(e); @@ -215,7 +216,7 @@ impl TxPoolController { pub fn get_tx_pool_info(&self) -> Result { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call((), responder); sender .try_send(Message::GetTxPoolInfo(request)) @@ -231,7 +232,7 @@ impl TxPoolController { proposals: Vec, ) -> Result, FailureError> { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call(proposals, responder); sender .try_send(Message::FreshProposalsFilter(request)) @@ -244,7 +245,7 @@ impl TxPoolController { pub fn fetch_tx_for_rpc(&self, id: ProposalShortId) -> Result { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call(id, responder); sender.try_send(Message::FetchTxRPC(request)).map_err(|e| { let (_m, e) = handle_try_send_error(e); @@ -258,7 +259,7 @@ impl TxPoolController { short_ids: Vec, ) -> Result, FailureError> { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call(short_ids, responder); sender.try_send(Message::FetchTxs(request)).map_err(|e| { let (_m, e) = handle_try_send_error(e); @@ -272,7 +273,7 @@ impl TxPoolController { short_ids: Vec, ) -> Result { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call(short_ids, responder); sender .try_send(Message::FetchTxsWithCycles(request)) @@ -285,7 +286,7 @@ impl TxPoolController { pub fn clear_pool(&self, new_snapshot: Arc) -> Result<(), FailureError> { let mut sender = self.sender.clone(); - let (responder, response) = crossbeam_channel::bounded(1); + let (responder, response) = channel::bounded(1); let request = Request::call(new_snapshot, responder); sender.try_send(Message::ClearPool(request)).map_err(|e| { let (_m, e) = handle_try_send_error(e); diff --git a/util/channel/Cargo.toml b/util/channel/Cargo.toml new file mode 100644 index 00000000000..c2ea90ee9c7 --- /dev/null +++ b/util/channel/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "ckb-channel" +version = "0.36.0-pre" +license = "MIT" +authors = ["Nervos Core Dev "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +crossbeam-channel = "~0.3" diff --git a/util/channel/src/lib.rs b/util/channel/src/lib.rs new file mode 100644 index 00000000000..c2d25777d64 --- /dev/null +++ b/util/channel/src/lib.rs @@ -0,0 +1,3 @@ +pub use crossbeam_channel::{ + bounded, select, unbounded, Receiver, RecvError, RecvTimeoutError, Sender, +}; diff --git a/util/logger-service/Cargo.toml b/util/logger-service/Cargo.toml index bc4cc32973f..c9db3e68a0b 100644 --- a/util/logger-service/Cargo.toml +++ b/util/logger-service/Cargo.toml @@ -8,12 +8,12 @@ edition = "2018" [dependencies] ckb-util = { path = ".." } ckb-logger-config = { path = "../logger-config" } +ckb-channel = { path = "../channel" } ansi_term = "0.12" log = "0.4" env_logger = "0.6" lazy_static = "1.3" regex = "1.1.6" chrono = "0.4" -crossbeam-channel = "0.3" backtrace = "0.3" sentry = "0.16.0" diff --git a/util/logger-service/src/lib.rs b/util/logger-service/src/lib.rs index 568999348c3..1d5f7249639 100644 --- a/util/logger-service/src/lib.rs +++ b/util/logger-service/src/lib.rs @@ -1,7 +1,7 @@ use ansi_term::Colour; use backtrace::Backtrace; use chrono::prelude::{DateTime, Local}; -use crossbeam_channel::unbounded; +use ckb_channel::{self, unbounded}; use env_logger::filter::{Builder, Filter}; use lazy_static::lazy_static; use log::{LevelFilter, Log, Metadata, Record, SetLoggerError}; @@ -15,7 +15,7 @@ use ckb_logger_config::Config; use ckb_util::{strings, Mutex, RwLock}; lazy_static! { - static ref CONTROL_HANDLE: sync::Arc>>> = + static ref CONTROL_HANDLE: sync::Arc>>> = sync::Arc::new(RwLock::new(None)); } @@ -38,7 +38,7 @@ enum Message { #[derive(Debug)] pub struct Logger { - sender: crossbeam_channel::Sender, + sender: ckb_channel::Sender, handle: Mutex>>, filter: sync::Arc>, emit_sentry_breadcrumbs: bool, diff --git a/util/stop-handler/Cargo.toml b/util/stop-handler/Cargo.toml index 9490a3381dc..6f01f4f05d2 100644 --- a/util/stop-handler/Cargo.toml +++ b/util/stop-handler/Cargo.toml @@ -7,7 +7,7 @@ license = "MIT" [dependencies] parking_lot = "=0.7.1" -crossbeam-channel = "0.3" ckb-logger = { path = "../logger" } futures = "0.1" tokio = { version = "0.2", features = ["sync", "blocking", "rt-threaded"] } +ckb-channel = { path = "../channel" } diff --git a/util/stop-handler/src/lib.rs b/util/stop-handler/src/lib.rs index c9c5ad0970f..7bfcae7433b 100644 --- a/util/stop-handler/src/lib.rs +++ b/util/stop-handler/src/lib.rs @@ -1,7 +1,7 @@ use ckb_logger::error; -use crossbeam_channel::Sender; use futures::sync::oneshot; use parking_lot::Mutex; +use std::sync::mpsc; use std::sync::Arc; use std::thread::JoinHandle; use tokio::sync::oneshot as tokio_oneshot; @@ -9,7 +9,8 @@ use tokio::sync::oneshot as tokio_oneshot; #[derive(Debug)] pub enum SignalSender { Future(oneshot::Sender<()>), - Crossbeam(Sender<()>), + Crossbeam(ckb_channel::Sender<()>), + Std(mpsc::Sender<()>), Tokio(tokio_oneshot::Sender<()>), } @@ -21,6 +22,11 @@ impl SignalSender { error!("handler signal send error {:?}", e); }; } + SignalSender::Std(tx) => { + if let Err(e) = tx.send(()) { + error!("handler signal send error {:?}", e); + }; + } SignalSender::Future(tx) => { if let Err(e) = tx.send(()) { error!("handler signal send error {:?}", e); diff --git a/util/types/Cargo.toml b/util/types/Cargo.toml index 3b25170063c..779c27900e6 100644 --- a/util/types/Cargo.toml +++ b/util/types/Cargo.toml @@ -13,7 +13,7 @@ bytes = { version="0.5.4", features = ["serde"] } merkle-cbt = "0.2" ckb-occupied-capacity = { path = "../occupied-capacity" } ckb-hash = { path = "../hash" } -crossbeam-channel = "0.3" +ckb-channel = { path = "../channel" } bit-vec = "0.5.1" failure = "0.1.5" ckb-error = { path = "../../error" } diff --git a/util/types/src/core/service.rs b/util/types/src/core/service.rs index aae01767c00..a66cbe440b3 100644 --- a/util/types/src/core/service.rs +++ b/util/types/src/core/service.rs @@ -1,17 +1,17 @@ -use crossbeam_channel::{self, Sender}; +use ckb_channel::Sender; +use std::sync::mpsc; -const ONESHOT_CHANNEL_SIZE: usize = 1; pub const SIGNAL_CHANNEL_SIZE: usize = 1; pub const DEFAULT_CHANNEL_SIZE: usize = 32; pub struct Request { - pub responder: Sender, + pub responder: mpsc::Sender, pub arguments: A, } impl Request { pub fn call(sender: &Sender>, arguments: A) -> Option { - let (responder, response) = crossbeam_channel::bounded(ONESHOT_CHANNEL_SIZE); + let (responder, response) = mpsc::channel(); let _ = sender.send(Request { responder, arguments, diff --git a/verification/Cargo.toml b/verification/Cargo.toml index d6c37cbdcbc..c4dfdbef51d 100644 --- a/verification/Cargo.toml +++ b/verification/Cargo.toml @@ -23,7 +23,6 @@ failure = "0.1.5" ckb-error = { path = "../error" } enum-display-derive = "0.1.0" tokio = { version = "0.2", features = ["sync", "blocking", "rt-threaded"] } -crossbeam-channel = "0.3" ckb-async-runtime = { path = "../util/runtime" } [dev-dependencies] diff --git a/verification/src/contextual_block_verifier.rs b/verification/src/contextual_block_verifier.rs index 024c741b925..bb92b4fbe1a 100644 --- a/verification/src/contextual_block_verifier.rs +++ b/verification/src/contextual_block_verifier.rs @@ -26,7 +26,7 @@ use ckb_types::{ use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{oneshot, RwLock}; pub struct VerifyContext<'a, CS> { pub(crate) store: &'a CS, @@ -357,7 +357,7 @@ impl<'a, CS: ChainStore<'a>> BlockTxsVerifier<'a, CS> { keys: K, handle: &Handle, ) -> HashMap { - let (sender, receiver) = crossbeam_channel::bounded(1); + let (sender, receiver) = oneshot::channel(); handle.spawn(async move { let guard = txs_verify_cache.read().await; let ret = keys @@ -369,7 +369,9 @@ impl<'a, CS: ChainStore<'a>> BlockTxsVerifier<'a, CS> { error_target!(crate::LOG_TARGET, "TxsVerifier fetched_cache error {:?}", e); }; }); - receiver.recv().expect("fetched cache no exception") + handle + .block_on(receiver) + .expect("fetched cache no exception") } pub fn verify(