Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Revert "Use Subscription Manager from jsonrpc-pubsub (#6208)"
Browse files Browse the repository at this point in the history
This reverts commit f028a50.
  • Loading branch information
andresilva authored Jun 4, 2020
1 parent fc11ba9 commit fc876cf
Show file tree
Hide file tree
Showing 31 changed files with 285 additions and 196 deletions.
126 changes: 62 additions & 64 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bin/node/browser-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "Apache-2.0"
[dependencies]
futures-timer = "3.0.2"
libp2p = { version = "0.19.1", default-features = false }
jsonrpc-core = "14.2"
jsonrpc-core = "14.0.5"
serde = "1.0.106"
serde_json = "1.0.48"
wasm-bindgen = { version = "=0.2.62", features = ["serde-serialize"] }
Expand Down
2 changes: 1 addition & 1 deletion bin/node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ codec = { package = "parity-scale-codec", version = "1.3.0" }
serde = { version = "1.0.102", features = ["derive"] }
futures = { version = "0.3.1", features = ["compat"] }
hex-literal = "0.2.1"
jsonrpc-core = "14.2"
jsonrpc-core = "14.0.3"
log = "0.4.8"
rand = "0.7.2"
structopt = { version = "0.3.8", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion bin/node/rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"]
env_logger = "0.7.0"
futures = "0.1.29"
hyper = "0.12.35"
jsonrpc-core-client = { version = "14.2", default-features = false, features = ["http"] }
jsonrpc-core-client = { version = "14.0.5", default-features = false, features = ["http"] }
log = "0.4.8"
node-primitives = { version = "2.0.0-rc2", path = "../primitives" }
sc-rpc = { version = "2.0.0-rc2", path = "../../../client/rpc" }
2 changes: 1 addition & 1 deletion bin/node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
sc-client-api = { version = "2.0.0-rc2", path = "../../../client/api" }
jsonrpc-core = "14.2"
jsonrpc-core = "14.0.3"
node-primitives = { version = "2.0.0-rc2", path = "../primitives" }
node-runtime = { version = "2.0.0-rc2", path = "../runtime" }
sp-runtime = { version = "2.0.0-rc2", path = "../../../primitives/runtime" }
Expand Down
2 changes: 1 addition & 1 deletion bin/utils/subkey/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rpassword = "4.0.1"
itertools = "0.8.2"
derive_more = { version = "0.99.2" }
sc-rpc = { version = "2.0.0-rc2", path = "../../../client/rpc" }
jsonrpc-core-client = { version = "14.2", features = ["http"] }
jsonrpc-core-client = { version = "14.0.3", features = ["http"] }
hyper = "0.12.35"
libp2p = { version = "0.19.1", default-features = false }
serde_json = "1.0"
Expand Down
6 changes: 3 additions & 3 deletions client/consensus/babe/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
sc-consensus-babe = { version = "0.8.0-rc2", path = "../" }
sc-rpc-api = { version = "0.8.0-rc2", path = "../../../rpc-api" }
jsonrpc-core = "14.2"
jsonrpc-core-client = "14.2"
jsonrpc-derive = "14.2.1"
jsonrpc-core = "14.0.3"
jsonrpc-core-client = "14.0.5"
jsonrpc-derive = "14.0.3"
sp-consensus-babe = { version = "0.8.0-rc2", path = "../../../../primitives/consensus/babe" }
serde = { version = "1.0.104", features=["derive"] }
sp-blockchain = { version = "2.0.0-rc2", path = "../../../../primitives/blockchain" }
Expand Down
6 changes: 3 additions & 3 deletions client/consensus/manual-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
derive_more = "0.99.2"
futures = "0.3.4"
jsonrpc-core = "14.2"
jsonrpc-core-client = "14.2"
jsonrpc-derive = "14.2.1"
jsonrpc-core = "14.0.5"
jsonrpc-core-client = "14.0.5"
jsonrpc-derive = "14.0.5"
log = "0.4.8"
parking_lot = "0.10.0"
serde = { version = "1.0", features=["derive"] }
Expand Down
6 changes: 3 additions & 3 deletions client/finality-grandpa/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
sc-finality-grandpa = { version = "0.8.0-rc2", path = "../" }
finality-grandpa = { version = "0.12.3", features = ["derive-codec"] }
jsonrpc-core = "14.2"
jsonrpc-core-client = "14.2"
jsonrpc-derive = "14.2.1"
jsonrpc-core = "14.0.3"
jsonrpc-core-client = "14.0.3"
jsonrpc-derive = "14.0.3"
futures = { version = "0.3.4", features = ["compat"] }
serde = { version = "1.0.105", features = ["derive"] }
serde_json = "1.0.50"
Expand Down
8 changes: 4 additions & 4 deletions client/rpc-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ targets = ["x86_64-unknown-linux-gnu"]
codec = { package = "parity-scale-codec", version = "1.3.0" }
derive_more = "0.99.2"
futures = { version = "0.3.1", features = ["compat"] }
jsonrpc-core = "14.2"
jsonrpc-core-client = "14.2"
jsonrpc-derive = "14.2.1"
jsonrpc-pubsub = "14.2"
jsonrpc-core = "14.0.3"
jsonrpc-core-client = "14.0.5"
jsonrpc-derive = "14.0.3"
jsonrpc-pubsub = "14.0.3"
log = "0.4.8"
parking_lot = "0.10.0"
sp-core = { version = "2.0.0-rc2", path = "../../primitives/core" }
Expand Down
2 changes: 2 additions & 0 deletions client/rpc-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
mod errors;
mod helpers;
mod policy;
mod subscriptions;

pub use jsonrpc_core::IoHandlerExtension as RpcExtension;
pub use subscriptions::{Subscriptions, TaskExecutor};
pub use helpers::Receiver;
pub use policy::DenyUnsafe;

Expand Down
121 changes: 121 additions & 0 deletions client/rpc-api/src/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// This file is part of Substrate.

// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::{Arc, atomic::{self, AtomicUsize}};

use log::{error, warn};
use jsonrpc_pubsub::{SubscriptionId, typed::{Sink, Subscriber}};
use parking_lot::Mutex;
use jsonrpc_core::futures::sync::oneshot;
use jsonrpc_core::futures::{Future, future};

type Id = u64;

/// Alias for a an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;

/// Generate unique ids for subscriptions.
#[derive(Clone, Debug)]
pub struct IdProvider {
next_id: Arc<AtomicUsize>,
}
impl Default for IdProvider {
fn default() -> Self {
IdProvider {
next_id: Arc::new(AtomicUsize::new(1)),
}
}
}

impl IdProvider {
/// Returns next id for the subscription.
pub fn next_id(&self) -> Id {
self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64
}
}

/// Subscriptions manager.
///
/// Takes care of assigning unique subscription ids and
/// driving the sinks into completion.
#[derive(Clone)]
pub struct Subscriptions {
next_id: IdProvider,
active_subscriptions: Arc<Mutex<HashMap<Id, oneshot::Sender<()>>>>,
executor: TaskExecutor,
}

impl Subscriptions {
/// Creates new `Subscriptions` object.
pub fn new(executor: TaskExecutor) -> Self {
Subscriptions {
next_id: Default::default(),
active_subscriptions: Default::default(),
executor,
}
}

/// Borrows the internal task executor.
///
/// This can be used to spawn additional tasks on the underlying event loop.
pub fn executor(&self) -> &TaskExecutor {
&self.executor
}

/// Creates new subscription for given subscriber.
///
/// Second parameter is a function that converts Subscriber sink into a future.
/// This future will be driven to completion by the underlying event loop
/// or will be cancelled in case #cancel is invoked.
pub fn add<T, E, G, R, F>(&self, subscriber: Subscriber<T, E>, into_future: G) -> SubscriptionId where
G: FnOnce(Sink<T, E>) -> R,
R: future::IntoFuture<Future=F, Item=(), Error=()>,
F: future::Future<Item=(), Error=()> + Send + 'static,
{
let id = self.next_id.next_id();
let subscription_id: SubscriptionId = id.into();
if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
let (tx, rx) = oneshot::channel();
let future = into_future(sink)
.into_future()
.select(rx.map_err(|e| warn!("Error timeing out: {:?}", e)))
.then(|_| Ok(()));

self.active_subscriptions.lock().insert(id, tx);
if self.executor.execute(Box::new(future)).is_err() {
error!("Failed to spawn RPC subscription task");
}
}

subscription_id
}

/// Cancel subscription.
///
/// Returns true if subscription existed or false otherwise.
pub fn cancel(&self, id: SubscriptionId) -> bool {
if let SubscriptionId::Number(id) = id {
if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
let _ = tx.send(());
return true;
}
}
false
}
}
8 changes: 4 additions & 4 deletions client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ description = "Substrate RPC servers."
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpc-core = "14.2"
pubsub = { package = "jsonrpc-pubsub", version = "14.2" }
jsonrpc-core = "14.0.3"
pubsub = { package = "jsonrpc-pubsub", version = "14.0.3" }
log = "0.4.8"
serde = "1.0.101"
serde_json = "1.0.41"
sp-runtime = { version = "2.0.0-rc2", path = "../../primitives/runtime" }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
http = { package = "jsonrpc-http-server", version = "14.2" }
ws = { package = "jsonrpc-ws-server", version = "14.2" }
http = { package = "jsonrpc-http-server", version = "14.0.3" }
ws = { package = "jsonrpc-ws-server", version = "14.0.3" }
4 changes: 2 additions & 2 deletions client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ sc-client-api = { version = "2.0.0-rc2", path = "../api" }
sp-api = { version = "2.0.0-rc2", path = "../../primitives/api" }
codec = { package = "parity-scale-codec", version = "1.3.0" }
futures = { version = "0.3.1", features = ["compat"] }
jsonrpc-pubsub = "14.2"
jsonrpc-pubsub = "14.0.3"
log = "0.4.8"
sp-core = { version = "2.0.0-rc2", path = "../../primitives/core" }
rpc = { package = "jsonrpc-core", version = "14.2" }
rpc = { package = "jsonrpc-core", version = "14.0.3" }
sp-version = { version = "2.0.0-rc2", path = "../../primitives/version" }
serde_json = "1.0.41"
sp-session = { version = "2.0.0-rc2", path = "../../primitives/session" }
Expand Down
9 changes: 5 additions & 4 deletions client/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use rpc::futures::{
};
use futures::{StreamExt as _, compat::Compat};
use futures::future::{ready, FutureExt, TryFutureExt};
use sc_rpc_api::DenyUnsafe;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use sc_rpc_api::{DenyUnsafe, Subscriptions};
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use codec::{Encode, Decode};
use sp_core::{Bytes, traits::BareCryptoStorePtr};
use sp_api::ProvideRuntimeApi;
Expand All @@ -55,7 +55,7 @@ pub struct Author<P, Client> {
/// Transactions pool
pool: Arc<P>,
/// Subscriptions manager
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
/// The key store.
keystore: BareCryptoStorePtr,
/// Whether to deny unsafe calls
Expand All @@ -67,7 +67,7 @@ impl<P, Client> Author<P, Client> {
pub fn new(
client: Arc<Client>,
pool: Arc<P>,
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
keystore: BareCryptoStorePtr,
deny_unsafe: DenyUnsafe,
) -> Self {
Expand All @@ -81,6 +81,7 @@ impl<P, Client> Author<P, Client> {
}
}


/// Currently we treat all RPC transactions as externals.
///
/// Possibly in the future we could allow opt-in for special treatment
Expand Down
35 changes: 11 additions & 24 deletions client/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl TestSetup {
Author {
client: self.client.clone(),
pool: self.pool.clone(),
subscriptions: SubscriptionManager::new(Arc::new(crate::testing::TaskExecutor)),
subscriptions: Subscriptions::new(Arc::new(crate::testing::TaskExecutor)),
keystore: self.keystore.clone(),
deny_unsafe: DenyUnsafe::No,
}
Expand Down Expand Up @@ -133,14 +133,8 @@ fn should_watch_extrinsic() {
uxt(AccountKeyring::Alice, 0).encode().into(),
);

let id = executor::block_on(id_rx.compat()).unwrap().unwrap();
assert_matches!(id, SubscriptionId::String(_));

let id = match id {
SubscriptionId::String(id) => id,
_ => unreachable!(),
};

// then
assert_eq!(executor::block_on(id_rx.compat()), Ok(Ok(1.into())));
// check notifications
let replacement = {
let tx = Transfer {
Expand All @@ -153,22 +147,15 @@ fn should_watch_extrinsic() {
};
AuthorApi::submit_extrinsic(&p, replacement.encode().into()).wait().unwrap();
let (res, data) = executor::block_on(data.into_future().compat()).unwrap();

let expected = Some(format!(
r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":"ready","subscription":"{}"}}}}"#,
id,
));
assert_eq!(res, expected);

assert_eq!(
res,
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":"ready","subscription":1}}"#.into())
);
let h = blake2_256(&replacement.encode());
let expected = Some(format!(
r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":"{}"}}}}"#,
HexDisplay::from(&h),
id,
));

let res = executor::block_on(data.into_future().compat()).unwrap().0;
assert_eq!(res, expected);
assert_eq!(
executor::block_on(data.into_future().compat()).unwrap().0,
Some(format!(r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":1}}}}"#, HexDisplay::from(&h)))
);
}

#[test]
Expand Down
8 changes: 4 additions & 4 deletions client/rpc/src/chain/chain_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use std::sync::Arc;
use rpc::futures::future::result;
use jsonrpc_pubsub::manager::SubscriptionManager;

use sc_rpc_api::Subscriptions;
use sc_client_api::{BlockchainEvents, BlockBackend};
use sp_runtime::{generic::{BlockId, SignedBlock}, traits::{Block as BlockT}};

Expand All @@ -32,14 +32,14 @@ pub struct FullChain<Block: BlockT, Client> {
/// Substrate client.
client: Arc<Client>,
/// Current subscriptions.
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
/// phantom member to pin the block type
_phantom: PhantomData<Block>,
}

impl<Block: BlockT, Client> FullChain<Block, Client> {
/// Create new Chain API RPC handler.
pub fn new(client: Arc<Client>, subscriptions: SubscriptionManager) -> Self {
pub fn new(client: Arc<Client>, subscriptions: Subscriptions) -> Self {
Self {
client,
subscriptions,
Expand All @@ -56,7 +56,7 @@ impl<Block, Client> ChainBackend<Client, Block> for FullChain<Block, Client> whe
&self.client
}

fn subscriptions(&self) -> &SubscriptionManager {
fn subscriptions(&self) -> &Subscriptions {
&self.subscriptions
}

Expand Down
Loading

0 comments on commit fc876cf

Please sign in to comment.