From bed82c8fde4d6889c50cbe77a97c9dd551e1dd5b Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 11 Jul 2022 13:06:19 +0200 Subject: [PATCH 01/17] Extract json-rpc-client and introduce worker --- client/relay-chain-rpc-interface/src/lib.rs | 275 +---------- .../src/rpc_client.rs | 428 ++++++++++++++++++ 2 files changed, 435 insertions(+), 268 deletions(-) create mode 100644 client/relay-chain-rpc-interface/src/rpc_client.rs diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index 9f84e437699..a4041191834 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -46,239 +46,12 @@ use sp_storage::StorageKey; use std::{pin::Pin, sync::Arc}; pub use url::Url; +mod rpc_client; +pub use rpc_client::RelayChainRPCClient; const LOG_TARGET: &str = "relay-chain-rpc-interface"; const TIMEOUT_IN_SECONDS: u64 = 6; -/// Client that maps RPC methods and deserializes results -#[derive(Clone)] -struct RelayChainRPCClient { - /// Websocket client to make calls - ws_client: Arc, - - /// Retry strategy that should be used for requests and subscriptions - retry_strategy: ExponentialBackoff, -} - -impl RelayChainRPCClient { - pub async fn new(url: Url) -> RelayChainResult { - tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); - let ws_client = WsClientBuilder::default().build(url.as_str()).await?; - - Ok(RelayChainRPCClient { - ws_client: Arc::new(ws_client), - retry_strategy: ExponentialBackoff::default(), - }) - } - - /// Call a call to `state_call` rpc method. - async fn call_remote_runtime_function( - &self, - method_name: &str, - hash: PHash, - payload: Option, - ) -> RelayChainResult { - let payload_bytes = - payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode())); - let params = rpc_params! { - method_name, - payload_bytes, - hash - }; - let res = self - .request_tracing::("state_call", params, |err| { - tracing::trace!( - target: LOG_TARGET, - %method_name, - %hash, - error = %err, - "Error during call to 'state_call'.", - ); - }) - .await?; - Decode::decode(&mut &*res.0).map_err(Into::into) - } - - /// Subscribe to a notification stream via RPC - async fn subscribe<'a, R>( - &self, - sub_name: &'a str, - unsub_name: &'a str, - params: Option>, - ) -> RelayChainResult> - where - R: DeserializeOwned, - { - self.ws_client - .subscribe::(sub_name, params, unsub_name) - .await - .map_err(|err| RelayChainError::RPCCallError(sub_name.to_string(), err)) - } - - /// Perform RPC request - async fn request<'a, R>( - &self, - method: &'a str, - params: Option>, - ) -> Result - where - R: DeserializeOwned + std::fmt::Debug, - { - self.request_tracing( - method, - params, - |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"), - ) - .await - } - - /// Perform RPC request - async fn request_tracing<'a, R, OR>( - &self, - method: &'a str, - params: Option>, - trace_error: OR, - ) -> Result - where - R: DeserializeOwned + std::fmt::Debug, - OR: Fn(&jsonrpsee::core::Error), - { - retry_notify( - self.retry_strategy.clone(), - || async { - self.ws_client.request(method, params.clone()).await.map_err(|err| match err { - JsonRpseeError::Transport(_) => - backoff::Error::Transient { err, retry_after: None }, - _ => backoff::Error::Permanent(err), - }) - }, - |error, dur| tracing::trace!(target: LOG_TARGET, %error, ?dur, "Encountered transport error, retrying."), - ) - .await - .map_err(|err| { - trace_error(&err); - RelayChainError::RPCCallError(method.to_string(), err)}) - } - - async fn system_health(&self) -> Result { - self.request("system_health", None).await - } - - async fn state_get_read_proof( - &self, - storage_keys: Vec, - at: Option, - ) -> Result, RelayChainError> { - let params = rpc_params!(storage_keys, at); - self.request("state_getReadProof", params).await - } - - async fn state_get_storage( - &self, - storage_key: StorageKey, - at: Option, - ) -> Result, RelayChainError> { - let params = rpc_params!(storage_key, at); - self.request("state_getStorage", params).await - } - - async fn chain_get_head(&self) -> Result { - self.request("chain_getHead", None).await - } - - async fn chain_get_header( - &self, - hash: Option, - ) -> Result, RelayChainError> { - let params = rpc_params!(hash); - self.request("chain_getHeader", params).await - } - - async fn parachain_host_candidate_pending_availability( - &self, - at: PHash, - para_id: ParaId, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function( - "ParachainHost_candidate_pending_availability", - at, - Some(para_id), - ) - .await - } - - async fn parachain_host_session_index_for_child( - &self, - at: PHash, - ) -> Result { - self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>) - .await - } - - async fn parachain_host_validators( - &self, - at: PHash, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>) - .await - } - - async fn parachain_host_persisted_validation_data( - &self, - at: PHash, - para_id: ParaId, - occupied_core_assumption: OccupiedCoreAssumption, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function( - "ParachainHost_persisted_validation_data", - at, - Some((para_id, occupied_core_assumption)), - ) - .await - } - - async fn parachain_host_inbound_hrmp_channels_contents( - &self, - para_id: ParaId, - at: PHash, - ) -> Result>, RelayChainError> { - self.call_remote_runtime_function( - "ParachainHost_inbound_hrmp_channels_contents", - at, - Some(para_id), - ) - .await - } - - async fn parachain_host_dmq_contents( - &self, - para_id: ParaId, - at: PHash, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id)) - .await - } - - async fn subscribe_all_heads(&self) -> Result, RelayChainError> { - self.subscribe::("chain_subscribeAllHeads", "chain_unsubscribeAllHeads", None) - .await - } - - async fn subscribe_new_best_heads(&self) -> Result, RelayChainError> { - self.subscribe::("chain_subscribeNewHeads", "chain_unsubscribeNewHeads", None) - .await - } - - async fn subscribe_finalized_heads(&self) -> Result, RelayChainError> { - self.subscribe::( - "chain_subscribeFinalizedHeads", - "chain_unsubscribeFinalizedHeads", - None, - ) - .await - } -} - /// RelayChainRPCInterface is used to interact with a full node that is running locally /// in the same process. #[derive(Clone)] @@ -344,17 +117,7 @@ impl RelayChainInterface for RelayChainRPCInterface { async fn import_notification_stream( &self, ) -> RelayChainResult + Send>>> { - let imported_headers_stream = - self.rpc_client.subscribe_all_heads().await?.filter_map(|item| async move { - item.map_err(|err| { - tracing::error!( - target: LOG_TARGET, - "Encountered error in import notification stream: {}", - err - ) - }) - .ok() - }); + let imported_headers_stream = self.rpc_client.get_imported_heads_stream().await?; Ok(imported_headers_stream.boxed()) } @@ -362,20 +125,7 @@ impl RelayChainInterface for RelayChainRPCInterface { async fn finality_notification_stream( &self, ) -> RelayChainResult + Send>>> { - let imported_headers_stream = self - .rpc_client - .subscribe_finalized_heads() - .await? - .filter_map(|item| async move { - item.map_err(|err| { - tracing::error!( - target: LOG_TARGET, - "Encountered error in finality notification stream: {}", - err - ) - }) - .ok() - }); + let imported_headers_stream = self.rpc_client.get_finalized_heads_stream().await?; Ok(imported_headers_stream.boxed()) } @@ -430,7 +180,7 @@ impl RelayChainInterface for RelayChainRPCInterface { /// 3. Wait for the block to be imported via subscription. /// 4. If timeout is reached, we return an error. async fn wait_for_block(&self, wait_for_hash: PHash) -> RelayChainResult<()> { - let mut head_stream = self.rpc_client.subscribe_all_heads().await?; + let mut head_stream = self.rpc_client.get_imported_heads_stream().await?; if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() { return Ok(()) @@ -442,7 +192,7 @@ impl RelayChainInterface for RelayChainRPCInterface { futures::select! { _ = timeout => return Err(RelayChainError::WaitTimeout(wait_for_hash)), evt = head_stream.next().fuse() => match evt { - Some(Ok(evt)) if evt.hash() == wait_for_hash => return Ok(()), + Some(evt) if evt.hash() == wait_for_hash => return Ok(()), // Not the event we waited on. Some(_) => continue, None => return Err(RelayChainError::ImportListenerClosed(wait_for_hash)), @@ -454,18 +204,7 @@ impl RelayChainInterface for RelayChainRPCInterface { async fn new_best_notification_stream( &self, ) -> RelayChainResult + Send>>> { - let imported_headers_stream = - self.rpc_client.subscribe_new_best_heads().await?.filter_map(|item| async move { - item.map_err(|err| { - tracing::error!( - target: LOG_TARGET, - "Error in best block notification stream: {}", - err - ) - }) - .ok() - }); - + let imported_headers_stream = self.rpc_client.get_best_heads_stream().await?; Ok(imported_headers_stream.boxed()) } } diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs new file mode 100644 index 00000000000..5dc107af0d4 --- /dev/null +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -0,0 +1,428 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use async_trait::async_trait; +use backoff::{future::retry_notify, ExponentialBackoff}; +use core::time::Duration; +use cumulus_primitives_core::{ + relay_chain::{ + v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, + Hash as PHash, Header as PHeader, InboundHrmpMessage, + }, + InboundDownwardMessage, ParaId, PersistedValidationData, +}; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; +use futures::{ + channel::mpsc::{UnboundedReceiver, UnboundedSender}, + FutureExt, Stream, StreamExt, +}; +use jsonrpsee::{ + core::{ + client::{Client as JsonRPCClient, ClientT, Subscription, SubscriptionClientT}, + Error as JsonRpseeError, + }, + rpc_params, + types::ParamsSer, + ws_client::WsClientBuilder, +}; +use parity_scale_codec::{Decode, Encode}; +use polkadot_service::Handle; +use sc_client_api::{StorageData, StorageProof}; +use sc_rpc_api::{state::ReadProof, system::Health}; +use sp_core::sp_std::collections::btree_map::BTreeMap; +use sp_runtime::DeserializeOwned; +use sp_state_machine::StorageValue; +use sp_storage::StorageKey; +use std::{pin::Pin, sync::Arc}; + +pub use url::Url; + +const LOG_TARGET: &str = "relay-chain-rpc-interface"; +const TIMEOUT_IN_SECONDS: u64 = 6; + +/// Client that maps RPC methods and deserializes results +#[derive(Clone)] +pub struct RelayChainRPCClient { + /// Websocket client to make calls + ws_client: Arc, + + /// Retry strategy that should be used for requests and subscriptions + retry_strategy: ExponentialBackoff, + + to_worker_channel: Option>, +} + +#[derive(Clone, Debug)] +pub enum RPCMessages { + NewHeadListener(UnboundedSender), + NewImportListener(UnboundedSender), + NewFinalizedListener(UnboundedSender), +} + +pub struct RPCWorker { + client_receiver: UnboundedReceiver, + import_listener: Vec>, + finalized_listener: Vec>, + head_listener: Vec>, + import_sub: Subscription, + finalized_sub: Subscription, + best_sub: Subscription, +} + +pub async fn create_worker_client(url: Url) -> RelayChainResult<(RPCWorker, RelayChainRPCClient)> { + let (mut client, import, head, finalized) = RelayChainRPCClient::new_with_streams(url).await?; + let (worker, sender) = RPCWorker::new(import, head, finalized); + client.set_worker_channel(sender); + Ok((worker, client)) +} + +impl RPCWorker { + pub fn new( + import_sub: Subscription, + best_sub: Subscription, + finalized_sub: Subscription, + ) -> (RPCWorker, UnboundedSender) { + let (tx, rx) = futures::channel::mpsc::unbounded(); + let worker = RPCWorker { + client_receiver: rx, + import_listener: Vec::new(), + finalized_listener: Vec::new(), + head_listener: Vec::new(), + import_sub, + best_sub, + finalized_sub, + }; + (worker, tx) + } + + pub async fn run(&mut self) -> ! { + loop { + futures::select! { + evt = self.client_receiver.next().fuse() => match evt { + Some(RPCMessages::NewHeadListener(tx)) => { self.head_listener.push(tx) }, + Some(RPCMessages::NewImportListener(tx)) => { self.import_listener.push(tx) }, + Some(RPCMessages::NewFinalizedListener(tx)) => {self.finalized_listener.push(tx) }, + None => {} + }, + import_evt = self.import_sub.next().fuse() => { + match import_evt { + Some(Ok(header)) => self.import_listener.iter().for_each(|e| { + tracing::debug!(target:LOG_TARGET, ?header, "Sending header to import listener."); + e.unbounded_send(header.clone()); + }), + None => todo!(), + _ => todo!(), + }; + }, + header_evt = self.best_sub.next().fuse() => { + match header_evt { + Some(Ok(header)) => self.head_listener.iter().for_each(|e| { + tracing::debug!(target:LOG_TARGET, ?header, "Sending header to best head listener."); + e.unbounded_send(header.clone()); + }), + None => todo!(), + _ => todo!(), + }; + }, + finalized_evt = self.finalized_sub.next().fuse() => { + match finalized_evt { + Some(Ok(header)) => self.finalized_listener.iter().for_each(|e| { + tracing::debug!(target:LOG_TARGET, ?header, "Sending header to finalized head listener."); + e.unbounded_send(header.clone()); + }), + None => todo!(), + _ => todo!(), + }; + }, + } + } + } +} + +impl RelayChainRPCClient { + fn set_worker_channel(&mut self, sender: UnboundedSender) { + self.to_worker_channel = Some(sender); + } + + pub async fn new_with_streams( + url: Url, + ) -> RelayChainResult<(Self, Subscription, Subscription, Subscription)> + { + tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); + let ws_client = WsClientBuilder::default().build(url.as_str()).await?; + + let client = RelayChainRPCClient { + to_worker_channel: None, + ws_client: Arc::new(ws_client), + retry_strategy: ExponentialBackoff::default(), + }; + let head_stream = client.subscribe_new_best_heads().await?; + let finalized_stream = client.subscribe_finalized_heads().await?; + let imported_stream = client.subscribe_imported_heads().await?; + Ok((client, imported_stream, head_stream, finalized_stream)) + } + + pub async fn new(url: Url) -> RelayChainResult { + tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); + let ws_client = WsClientBuilder::default().build(url.as_str()).await?; + + Ok(RelayChainRPCClient { + to_worker_channel: None, + ws_client: Arc::new(ws_client), + retry_strategy: ExponentialBackoff::default(), + }) + } + + /// Call a call to `state_call` rpc method. + pub async fn call_remote_runtime_function( + &self, + method_name: &str, + hash: PHash, + payload: Option, + ) -> RelayChainResult { + let payload_bytes = + payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode())); + let params = rpc_params! { + method_name, + payload_bytes, + hash + }; + let res = self + .request_tracing::("state_call", params, |err| { + tracing::trace!( + target: LOG_TARGET, + %method_name, + %hash, + error = %err, + "Error during call to 'state_call'.", + ); + }) + .await?; + Decode::decode(&mut &*res.0).map_err(Into::into) + } + + /// Subscribe to a notification stream via RPC + pub async fn subscribe<'a, R>( + &self, + sub_name: &'a str, + unsub_name: &'a str, + params: Option>, + ) -> RelayChainResult> + where + R: DeserializeOwned, + { + self.ws_client + .subscribe::(sub_name, params, unsub_name) + .await + .map_err(|err| RelayChainError::RPCCallError(sub_name.to_string(), err)) + } + + /// Perform RPC request + async fn request<'a, R>( + &self, + method: &'a str, + params: Option>, + ) -> Result + where + R: DeserializeOwned + std::fmt::Debug, + { + self.request_tracing( + method, + params, + |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"), + ) + .await + } + + /// Perform RPC request + async fn request_tracing<'a, R, OR>( + &self, + method: &'a str, + params: Option>, + trace_error: OR, + ) -> Result + where + R: DeserializeOwned + std::fmt::Debug, + OR: Fn(&jsonrpsee::core::Error), + { + retry_notify( + self.retry_strategy.clone(), + || async { + self.ws_client.request(method, params.clone()).await.map_err(|err| match err { + JsonRpseeError::Transport(_) => + backoff::Error::Transient { err, retry_after: None }, + _ => backoff::Error::Permanent(err), + }) + }, + |error, dur| tracing::trace!(target: LOG_TARGET, %error, ?dur, "Encountered transport error, retrying."), + ) + .await + .map_err(|err| { + trace_error(&err); + RelayChainError::RPCCallError(method.to_string(), err)}) + } + + pub async fn system_health(&self) -> Result { + self.request("system_health", None).await + } + + pub async fn state_get_read_proof( + &self, + storage_keys: Vec, + at: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(storage_keys, at); + self.request("state_getReadProof", params).await + } + + pub async fn state_get_storage( + &self, + storage_key: StorageKey, + at: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(storage_key, at); + self.request("state_getStorage", params).await + } + + pub async fn chain_get_head(&self) -> Result { + self.request("chain_getHead", None).await + } + + pub async fn chain_get_header( + &self, + hash: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(hash); + self.request("chain_getHeader", params).await + } + + pub async fn parachain_host_candidate_pending_availability( + &self, + at: PHash, + para_id: ParaId, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_candidate_pending_availability", + at, + Some(para_id), + ) + .await + } + + pub async fn parachain_host_session_index_for_child( + &self, + at: PHash, + ) -> Result { + self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>) + .await + } + + pub async fn parachain_host_validators( + &self, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>) + .await + } + + pub async fn parachain_host_persisted_validation_data( + &self, + at: PHash, + para_id: ParaId, + occupied_core_assumption: OccupiedCoreAssumption, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_persisted_validation_data", + at, + Some((para_id, occupied_core_assumption)), + ) + .await + } + + pub async fn parachain_host_inbound_hrmp_channels_contents( + &self, + para_id: ParaId, + at: PHash, + ) -> Result>, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_inbound_hrmp_channels_contents", + at, + Some(para_id), + ) + .await + } + + pub async fn parachain_host_dmq_contents( + &self, + para_id: ParaId, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id)) + .await + } + + pub async fn get_imported_heads_stream( + &self, + ) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::unbounded::(); + if let Some(channel) = &self.to_worker_channel { + tracing::debug!(target: LOG_TARGET, "Registering 'NewImportListener'"); + channel.unbounded_send(RPCMessages::NewImportListener(tx)); + } + Ok(rx) + } + + pub async fn get_best_heads_stream( + &self, + ) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::unbounded::(); + if let Some(channel) = &self.to_worker_channel { + tracing::debug!(target: LOG_TARGET, "Registering 'NewHeadListener'"); + channel.unbounded_send(RPCMessages::NewHeadListener(tx)); + } + Ok(rx) + } + + pub async fn get_finalized_heads_stream( + &self, + ) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::unbounded::(); + if let Some(channel) = &self.to_worker_channel { + tracing::debug!(target: LOG_TARGET, "Registering 'NewFinalizedListener'"); + channel.unbounded_send(RPCMessages::NewFinalizedListener(tx)); + } + Ok(rx) + } + + async fn subscribe_imported_heads(&self) -> Result, RelayChainError> { + self.subscribe::("chain_subscribeAllHeads", "chain_unsubscribeAllHeads", None) + .await + } + + async fn subscribe_finalized_heads(&self) -> Result, RelayChainError> { + self.subscribe::( + "chain_subscribeFinalizedHeads", + "chain_unsubscribeFinalizedHeads", + None, + ) + .await + } + + async fn subscribe_new_best_heads(&self) -> Result, RelayChainError> { + self.subscribe::("chain_subscribeNewHeads", "chain_unsubscribeNewHeads", None) + .await + } +} From 83e1c26852bd67b6e21768bc723ec46918464dcf Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 19 Jul 2022 14:32:52 +0200 Subject: [PATCH 02/17] Initial rpc worker --- Cargo.lock | 1 + client/relay-chain-rpc-interface/Cargo.toml | 1 + client/relay-chain-rpc-interface/src/lib.rs | 15 +++++++-------- .../relay-chain-rpc-interface/src/rpc_client.rs | 8 ++++---- parachain-template/node/src/service.rs | 8 ++++++-- polkadot-parachain/src/service.rs | 8 ++++++-- test/service/src/lib.rs | 4 +++- 7 files changed, 28 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 950cca9602f..9e0509acd84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2091,6 +2091,7 @@ dependencies = [ "polkadot-service", "sc-client-api", "sc-rpc-api", + "sc-service", "sp-api", "sp-core", "sp-runtime", diff --git a/client/relay-chain-rpc-interface/Cargo.toml b/client/relay-chain-rpc-interface/Cargo.toml index d7b6009dc97..da2e03fc197 100644 --- a/client/relay-chain-rpc-interface/Cargo.toml +++ b/client/relay-chain-rpc-interface/Cargo.toml @@ -16,6 +16,7 @@ sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index a4041191834..ff5f3ddbb08 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -15,7 +15,6 @@ // along with Cumulus. If not, see . use async_trait::async_trait; -use backoff::{future::retry_notify, ExponentialBackoff}; use core::time::Duration; use cumulus_primitives_core::{ relay_chain::{ @@ -35,15 +34,13 @@ use jsonrpsee::{ types::ParamsSer, ws_client::WsClientBuilder, }; -use parity_scale_codec::{Decode, Encode}; use polkadot_service::Handle; -use sc_client_api::{StorageData, StorageProof}; -use sc_rpc_api::{state::ReadProof, system::Health}; +use sc_client_api::StorageProof; +use sc_service::SpawnTaskHandle; use sp_core::sp_std::collections::btree_map::BTreeMap; -use sp_runtime::DeserializeOwned; use sp_state_machine::StorageValue; use sp_storage::StorageKey; -use std::{pin::Pin, sync::Arc}; +use std::pin::Pin; pub use url::Url; mod rpc_client; @@ -60,8 +57,10 @@ pub struct RelayChainRPCInterface { } impl RelayChainRPCInterface { - pub async fn new(url: Url) -> RelayChainResult { - Ok(Self { rpc_client: RelayChainRPCClient::new(url).await? }) + pub async fn new(url: Url, spawn_handle: SpawnTaskHandle) -> RelayChainResult { + let (worker, client) = rpc_client::create_worker_client(url).await?; + spawn_handle.spawn("relay-chain-rpc-worker", None, worker.run()); + Ok(Self { rpc_client: client }) } } diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 5dc107af0d4..27462351868 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -108,7 +108,7 @@ impl RPCWorker { (worker, tx) } - pub async fn run(&mut self) -> ! { + pub async fn run(mut self) { loop { futures::select! { evt = self.client_receiver.next().fuse() => match evt { @@ -120,7 +120,7 @@ impl RPCWorker { import_evt = self.import_sub.next().fuse() => { match import_evt { Some(Ok(header)) => self.import_listener.iter().for_each(|e| { - tracing::debug!(target:LOG_TARGET, ?header, "Sending header to import listener."); + tracing::info!(target:LOG_TARGET, ?header, "Sending header to import listener."); e.unbounded_send(header.clone()); }), None => todo!(), @@ -130,7 +130,7 @@ impl RPCWorker { header_evt = self.best_sub.next().fuse() => { match header_evt { Some(Ok(header)) => self.head_listener.iter().for_each(|e| { - tracing::debug!(target:LOG_TARGET, ?header, "Sending header to best head listener."); + tracing::info!(target:LOG_TARGET, ?header, "Sending header to best head listener."); e.unbounded_send(header.clone()); }), None => todo!(), @@ -140,7 +140,7 @@ impl RPCWorker { finalized_evt = self.finalized_sub.next().fuse() => { match finalized_evt { Some(Ok(header)) => self.finalized_listener.iter().for_each(|e| { - tracing::debug!(target:LOG_TARGET, ?header, "Sending header to finalized head listener."); + tracing::info!(target:LOG_TARGET, ?header, "Sending header to finalized head listener2."); e.unbounded_send(header.clone()); }), None => todo!(), diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index aec1ce8e8b9..3c9fe9c8edc 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -176,8 +176,12 @@ async fn build_relay_chain_interface( hwbench: Option, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { - Some(relay_chain_url) => - Ok((Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>, None)), + Some(relay_chain_url) => Ok(( + Arc::new( + RelayChainRPCInterface::new(relay_chain_url, task_manager.spawn_handle()).await?, + ) as Arc<_>, + None, + )), None => build_inprocess_relay_chain( polkadot_config, parachain_config, diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index a694a9b1137..d5438b3912c 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -296,8 +296,12 @@ async fn build_relay_chain_interface( hwbench: Option, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { - Some(relay_chain_url) => - Ok((Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>, None)), + Some(relay_chain_url) => Ok(( + Arc::new( + RelayChainRPCInterface::new(relay_chain_url, task_manager.spawn_handle()).await?, + ) as Arc<_>, + None, + )), None => build_inprocess_relay_chain( polkadot_config, parachain_config, diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index cf87fd826af..767f23a6aa0 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -182,7 +182,9 @@ async fn build_relay_chain_interface( task_manager: &mut TaskManager, ) -> RelayChainResult> { if let Some(relay_chain_url) = collator_options.relay_chain_rpc_url { - return Ok(Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>) + return Ok(Arc::new( + RelayChainRPCInterface::new(relay_chain_url, task_manager.spawn_handle()).await?, + ) as Arc<_>) } let relay_chain_full_node = polkadot_test_service::new_full( From 09c461d2b72af787cebb7d56b01aa45fb89ad316 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 20 Jul 2022 10:51:28 +0200 Subject: [PATCH 03/17] Add error handling --- client/relay-chain-interface/src/lib.rs | 2 + client/relay-chain-rpc-interface/src/lib.rs | 19 +---- .../src/rpc_client.rs | 72 +++++++++++-------- parachain-template/node/src/service.rs | 17 +++-- polkadot-parachain/src/service.rs | 17 +++-- test/service/src/lib.rs | 10 +-- 6 files changed, 75 insertions(+), 62 deletions(-) diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs index 119721adec1..0040ccf92d6 100644 --- a/client/relay-chain-interface/src/lib.rs +++ b/client/relay-chain-interface/src/lib.rs @@ -54,6 +54,8 @@ pub enum RelayChainError { RPCCallError(String, JsonRPSeeError), #[error("RPC Error: '{0}'")] JsonRPCError(#[from] JsonRPSeeError), + #[error("Unable to reach worker: {0}")] + WorkerCommunicationError(String), #[error("Scale codec deserialization error: {0}")] DeserializationError(CodecError), #[error("Scale codec deserialization error: {0}")] diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index ff5f3ddbb08..7ea29b24497 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -25,18 +25,8 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; use futures::{FutureExt, Stream, StreamExt}; -use jsonrpsee::{ - core::{ - client::{Client as JsonRPCClient, ClientT, Subscription, SubscriptionClientT}, - Error as JsonRpseeError, - }, - rpc_params, - types::ParamsSer, - ws_client::WsClientBuilder, -}; use polkadot_service::Handle; use sc_client_api::StorageProof; -use sc_service::SpawnTaskHandle; use sp_core::sp_std::collections::btree_map::BTreeMap; use sp_state_machine::StorageValue; use sp_storage::StorageKey; @@ -44,9 +34,8 @@ use std::pin::Pin; pub use url::Url; mod rpc_client; -pub use rpc_client::RelayChainRPCClient; +pub use rpc_client::{create_worker_client, RelayChainRPCClient}; -const LOG_TARGET: &str = "relay-chain-rpc-interface"; const TIMEOUT_IN_SECONDS: u64 = 6; /// RelayChainRPCInterface is used to interact with a full node that is running locally @@ -57,10 +46,8 @@ pub struct RelayChainRPCInterface { } impl RelayChainRPCInterface { - pub async fn new(url: Url, spawn_handle: SpawnTaskHandle) -> RelayChainResult { - let (worker, client) = rpc_client::create_worker_client(url).await?; - spawn_handle.spawn("relay-chain-rpc-worker", None, worker.run()); - Ok(Self { rpc_client: client }) + pub fn new(rpc_client: RelayChainRPCClient) -> Self { + Self { rpc_client } } } diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 27462351868..c2728d29e15 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -14,9 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use async_trait::async_trait; use backoff::{future::retry_notify, ExponentialBackoff}; -use core::time::Duration; use cumulus_primitives_core::{ relay_chain::{ v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, @@ -24,10 +22,10 @@ use cumulus_primitives_core::{ }, InboundDownwardMessage, ParaId, PersistedValidationData, }; -use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; use futures::{ channel::mpsc::{UnboundedReceiver, UnboundedSender}, - FutureExt, Stream, StreamExt, + FutureExt, StreamExt, }; use jsonrpsee::{ core::{ @@ -39,19 +37,16 @@ use jsonrpsee::{ ws_client::WsClientBuilder, }; use parity_scale_codec::{Decode, Encode}; -use polkadot_service::Handle; -use sc_client_api::{StorageData, StorageProof}; +use sc_client_api::StorageData; use sc_rpc_api::{state::ReadProof, system::Health}; use sp_core::sp_std::collections::btree_map::BTreeMap; use sp_runtime::DeserializeOwned; -use sp_state_machine::StorageValue; use sp_storage::StorageKey; -use std::{pin::Pin, sync::Arc}; +use std::sync::Arc; pub use url::Url; -const LOG_TARGET: &str = "relay-chain-rpc-interface"; -const TIMEOUT_IN_SECONDS: u64 = 6; +const LOG_TARGET: &str = "relay-chain-rpc-client"; /// Client that maps RPC methods and deserializes results #[derive(Clone)] @@ -120,31 +115,46 @@ impl RPCWorker { import_evt = self.import_sub.next().fuse() => { match import_evt { Some(Ok(header)) => self.import_listener.iter().for_each(|e| { - tracing::info!(target:LOG_TARGET, ?header, "Sending header to import listener."); - e.unbounded_send(header.clone()); + tracing::info!(target:LOG_TARGET, header = header.number, "Sending header to import listener."); + if let Err(err) = e.unbounded_send(header.clone()) { + tracing::error!(target:LOG_TARGET, ?err, "Unable to send to import stream."); + return; + }; }), - None => todo!(), - _ => todo!(), + _ => { + tracing::error!(target:LOG_TARGET, "Received some non-ok value from import stream."); + return; + } }; }, header_evt = self.best_sub.next().fuse() => { match header_evt { Some(Ok(header)) => self.head_listener.iter().for_each(|e| { - tracing::info!(target:LOG_TARGET, ?header, "Sending header to best head listener."); - e.unbounded_send(header.clone()); + tracing::info!(target:LOG_TARGET, header = header.number, "Sending header to best head listener."); + if let Err(err) = e.unbounded_send(header.clone()) { + tracing::error!(target:LOG_TARGET, ?err, "Unable to send to best-block stream."); + return; + }; }), - None => todo!(), - _ => todo!(), + _ => { + tracing::error!(target:LOG_TARGET, "Received some non-ok value from best-block stream."); + return; + } }; }, finalized_evt = self.finalized_sub.next().fuse() => { match finalized_evt { Some(Ok(header)) => self.finalized_listener.iter().for_each(|e| { - tracing::info!(target:LOG_TARGET, ?header, "Sending header to finalized head listener2."); - e.unbounded_send(header.clone()); + tracing::info!(target:LOG_TARGET, header = header.number, "Sending header to finalized head listener."); + if let Err(err) = e.unbounded_send(header.clone()) { + tracing::error!(target:LOG_TARGET, ?err, "Unable to send to best-block stream."); + return; + }; }), - None => todo!(), - _ => todo!(), + _ => { + tracing::error!(target:LOG_TARGET, "Received some non-ok value from finalized stream."); + return; + } }; }, } @@ -379,8 +389,10 @@ impl RelayChainRPCClient { ) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::unbounded::(); if let Some(channel) = &self.to_worker_channel { - tracing::debug!(target: LOG_TARGET, "Registering 'NewImportListener'"); - channel.unbounded_send(RPCMessages::NewImportListener(tx)); + tracing::info!(target: LOG_TARGET, "Registering 'NewImportListener'"); + channel + .unbounded_send(RPCMessages::NewImportListener(tx)) + .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; } Ok(rx) } @@ -390,8 +402,10 @@ impl RelayChainRPCClient { ) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::unbounded::(); if let Some(channel) = &self.to_worker_channel { - tracing::debug!(target: LOG_TARGET, "Registering 'NewHeadListener'"); - channel.unbounded_send(RPCMessages::NewHeadListener(tx)); + tracing::info!(target: LOG_TARGET, "Registering 'NewHeadListener'"); + channel + .unbounded_send(RPCMessages::NewHeadListener(tx)) + .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; } Ok(rx) } @@ -401,8 +415,10 @@ impl RelayChainRPCClient { ) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::unbounded::(); if let Some(channel) = &self.to_worker_channel { - tracing::debug!(target: LOG_TARGET, "Registering 'NewFinalizedListener'"); - channel.unbounded_send(RPCMessages::NewFinalizedListener(tx)); + tracing::info!(target: LOG_TARGET, "Registering 'NewFinalizedListener'"); + channel + .unbounded_send(RPCMessages::NewFinalizedListener(tx)) + .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; } Ok(rx) } diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index 3c9fe9c8edc..ba453b81187 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -22,7 +22,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; +use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRPCInterface}; // Substrate Imports use sc_client_api::ExecutorProvider; @@ -176,12 +176,15 @@ async fn build_relay_chain_interface( hwbench: Option, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { - Some(relay_chain_url) => Ok(( - Arc::new( - RelayChainRPCInterface::new(relay_chain_url, task_manager.spawn_handle()).await?, - ) as Arc<_>, - None, - )), + Some(relay_chain_url) => { + let (worker, client) = create_worker_client(relay_chain_url).await?; + task_manager.spawn_essential_handle().spawn( + "relay-chain-rpc-worker", + None, + worker.run(), + ); + Ok((Arc::new(RelayChainRPCInterface::new(client)) as Arc<_>, None)) + }, None => build_inprocess_relay_chain( polkadot_config, parachain_config, diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index fd33d581128..25638430824 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -30,7 +30,7 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; +use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRPCInterface}; use polkadot_service::CollatorPair; use sp_core::Pair; @@ -296,12 +296,15 @@ async fn build_relay_chain_interface( hwbench: Option, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { - Some(relay_chain_url) => Ok(( - Arc::new( - RelayChainRPCInterface::new(relay_chain_url, task_manager.spawn_handle()).await?, - ) as Arc<_>, - None, - )), + Some(relay_chain_url) => { + let (worker, client) = create_worker_client(relay_chain_url).await?; + task_manager.spawn_essential_handle().spawn( + "relay-chain-rpc-worker", + None, + worker.run(), + ); + Ok((Arc::new(RelayChainRPCInterface::new(client)) as Arc<_>, None)) + }, None => build_inprocess_relay_chain( polkadot_config, parachain_config, diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index eb7222c74c4..ad9f86d126b 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -37,7 +37,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; +use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRPCInterface}; use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; use parking_lot::Mutex; @@ -182,9 +182,11 @@ async fn build_relay_chain_interface( task_manager: &mut TaskManager, ) -> RelayChainResult> { if let Some(relay_chain_url) = collator_options.relay_chain_rpc_url { - return Ok(Arc::new( - RelayChainRPCInterface::new(relay_chain_url, task_manager.spawn_handle()).await?, - ) as Arc<_>) + let (worker, client) = create_worker_client(relay_chain_url).await?; + task_manager + .spawn_essential_handle() + .spawn("relay-chain-rpc-worker", None, worker.run()); + return Ok(Arc::new(RelayChainRPCInterface::new(client)) as Arc<_>) } let relay_chain_full_node = polkadot_test_service::new_full( From b0e96bbfa10be6df737cd0d6133e105288fcc4f1 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 20 Jul 2022 13:27:54 +0200 Subject: [PATCH 04/17] Use bounded channels for listeners --- .../src/rpc_client.rs | 112 +++++++++--------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index c2728d29e15..a167d7301e4 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -24,8 +24,8 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; use futures::{ - channel::mpsc::{UnboundedReceiver, UnboundedSender}, - FutureExt, StreamExt, + channel::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, + StreamExt, }; use jsonrpsee::{ core::{ @@ -62,16 +62,16 @@ pub struct RelayChainRPCClient { #[derive(Clone, Debug)] pub enum RPCMessages { - NewHeadListener(UnboundedSender), - NewImportListener(UnboundedSender), - NewFinalizedListener(UnboundedSender), + NewHeadListener(Sender), + NewImportListener(Sender), + NewFinalizedListener(Sender), } pub struct RPCWorker { client_receiver: UnboundedReceiver, - import_listener: Vec>, - finalized_listener: Vec>, - head_listener: Vec>, + import_listener: Vec>, + finalized_listener: Vec>, + head_listener: Vec>, import_sub: Subscription, finalized_sub: Subscription, best_sub: Subscription, @@ -104,55 +104,68 @@ impl RPCWorker { } pub async fn run(mut self) { + let mut import_sub = self.import_sub.fuse(); + let mut best_head_sub = self.best_sub.fuse(); + let mut finalized_sub = self.finalized_sub.fuse(); loop { futures::select! { - evt = self.client_receiver.next().fuse() => match evt { - Some(RPCMessages::NewHeadListener(tx)) => { self.head_listener.push(tx) }, - Some(RPCMessages::NewImportListener(tx)) => { self.import_listener.push(tx) }, - Some(RPCMessages::NewFinalizedListener(tx)) => {self.finalized_listener.push(tx) }, + evt = self.client_receiver.next() => match evt { + Some(RPCMessages::NewHeadListener(tx)) => { + tracing::debug!("Registering new head listener, now at {}", self.head_listener.len() + 1); + self.head_listener.push(tx); + }, + Some(RPCMessages::NewImportListener(tx)) => { + tracing::debug!("Registering new import listener, now at {}", self.import_listener.len() + 1); + self.import_listener.push(tx) + }, + Some(RPCMessages::NewFinalizedListener(tx)) => { + tracing::debug!("Registering new finalized listener, now at {}", self.finalized_listener.len() + 1); + self.finalized_listener.push(tx) + }, None => {} }, - import_evt = self.import_sub.next().fuse() => { + import_evt = import_sub.next() => { match import_evt { - Some(Ok(header)) => self.import_listener.iter().for_each(|e| { - tracing::info!(target:LOG_TARGET, header = header.number, "Sending header to import listener."); - if let Err(err) = e.unbounded_send(header.clone()) { - tracing::error!(target:LOG_TARGET, ?err, "Unable to send to import stream."); - return; - }; + Some(Ok(header)) => self.import_listener.retain_mut(|e| { + if let Err(err) = e.try_send(header.clone()) { + false + } else { + true + } }), - _ => { - tracing::error!(target:LOG_TARGET, "Received some non-ok value from import stream."); + err @ _ => { + tracing::error!(target:LOG_TARGET, ?err, "Received some non-ok value from import stream."); return; } }; }, - header_evt = self.best_sub.next().fuse() => { + header_evt = best_head_sub.next() => { match header_evt { - Some(Ok(header)) => self.head_listener.iter().for_each(|e| { - tracing::info!(target:LOG_TARGET, header = header.number, "Sending header to best head listener."); - if let Err(err) = e.unbounded_send(header.clone()) { - tracing::error!(target:LOG_TARGET, ?err, "Unable to send to best-block stream."); - return; - }; + Some(Ok(header)) => self.head_listener.retain_mut(|e| { + if let Err(err) = e.try_send(header.clone()) { + false + } else { + true + } }), - _ => { - tracing::error!(target:LOG_TARGET, "Received some non-ok value from best-block stream."); + err @ _ => { + tracing::error!(target:LOG_TARGET, ?err, "Received some non-ok value from best block stream."); return; } }; }, - finalized_evt = self.finalized_sub.next().fuse() => { + finalized_evt = finalized_sub.next() => { match finalized_evt { - Some(Ok(header)) => self.finalized_listener.iter().for_each(|e| { - tracing::info!(target:LOG_TARGET, header = header.number, "Sending header to finalized head listener."); - if let Err(err) = e.unbounded_send(header.clone()) { - tracing::error!(target:LOG_TARGET, ?err, "Unable to send to best-block stream."); - return; - }; + Some(Ok(header)) => self.finalized_listener.retain_mut(|e| { + if let Err(err) = e.try_send(header.clone()) { + tracing::debug!(target:LOG_TARGET, ?err, "Unable to send to finalized stream, removing listener."); + false + } else { + true + } }), - _ => { - tracing::error!(target:LOG_TARGET, "Received some non-ok value from finalized stream."); + err @ _ => { + tracing::error!(target:LOG_TARGET, ?err, "Received some non-ok value from finalized block stream."); return; } }; @@ -384,12 +397,9 @@ impl RelayChainRPCClient { .await } - pub async fn get_imported_heads_stream( - &self, - ) -> Result, RelayChainError> { - let (tx, rx) = futures::channel::mpsc::unbounded::(); + pub async fn get_imported_heads_stream(&self) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::channel::(128); if let Some(channel) = &self.to_worker_channel { - tracing::info!(target: LOG_TARGET, "Registering 'NewImportListener'"); channel .unbounded_send(RPCMessages::NewImportListener(tx)) .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; @@ -397,12 +407,9 @@ impl RelayChainRPCClient { Ok(rx) } - pub async fn get_best_heads_stream( - &self, - ) -> Result, RelayChainError> { - let (tx, rx) = futures::channel::mpsc::unbounded::(); + pub async fn get_best_heads_stream(&self) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::channel::(128); if let Some(channel) = &self.to_worker_channel { - tracing::info!(target: LOG_TARGET, "Registering 'NewHeadListener'"); channel .unbounded_send(RPCMessages::NewHeadListener(tx)) .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; @@ -410,12 +417,9 @@ impl RelayChainRPCClient { Ok(rx) } - pub async fn get_finalized_heads_stream( - &self, - ) -> Result, RelayChainError> { - let (tx, rx) = futures::channel::mpsc::unbounded::(); + pub async fn get_finalized_heads_stream(&self) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::channel::(128); if let Some(channel) = &self.to_worker_channel { - tracing::info!(target: LOG_TARGET, "Registering 'NewFinalizedListener'"); channel .unbounded_send(RPCMessages::NewFinalizedListener(tx)) .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; From d39832042a638c31e4e812c46d3c724934bea2f2 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 20 Jul 2022 17:34:57 +0200 Subject: [PATCH 05/17] Improve naming and clean up --- .../src/rpc_client.rs | 130 ++++++++---------- 1 file changed, 58 insertions(+), 72 deletions(-) diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index a167d7301e4..c659c6b1d88 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -57,21 +57,21 @@ pub struct RelayChainRPCClient { /// Retry strategy that should be used for requests and subscriptions retry_strategy: ExponentialBackoff, - to_worker_channel: Option>, + to_worker_channel: Option>, } #[derive(Clone, Debug)] -pub enum RPCMessages { - NewHeadListener(Sender), - NewImportListener(Sender), - NewFinalizedListener(Sender), +pub enum NotificationRegisterMessage { + RegisterBestHeadListener(Sender), + RegisterImportListener(Sender), + RegisterFinalizationListener(Sender), } pub struct RPCWorker { - client_receiver: UnboundedReceiver, - import_listener: Vec>, - finalized_listener: Vec>, - head_listener: Vec>, + client_receiver: UnboundedReceiver, + imported_heads_listeners: Vec>, + finalized_heads_listeners: Vec>, + best_heads_listeners: Vec>, import_sub: Subscription, finalized_sub: Subscription, best_sub: Subscription, @@ -84,18 +84,43 @@ pub async fn create_worker_client(url: Url) -> RelayChainResult<(RPCWorker, Rela Ok((worker, client)) } +fn handle_event_distribution( + event: Option>, + senders: &mut Vec>, +) { + match event { + Some(Ok(header)) => senders.retain_mut(|e| { + if let Err(err) = e.try_send(header.clone()) { + tracing::debug!( + target: LOG_TARGET, + ?err, + header = header.number, + "Unable to send, removing Sender from listeners." + ); + false + } else { + true + } + }), + err @ _ => { + tracing::error!(target: LOG_TARGET, ?err, "Received error value from stream."); + return + }, + }; +} + impl RPCWorker { pub fn new( import_sub: Subscription, best_sub: Subscription, finalized_sub: Subscription, - ) -> (RPCWorker, UnboundedSender) { + ) -> (RPCWorker, UnboundedSender) { let (tx, rx) = futures::channel::mpsc::unbounded(); let worker = RPCWorker { client_receiver: rx, - import_listener: Vec::new(), - finalized_listener: Vec::new(), - head_listener: Vec::new(), + imported_heads_listeners: Vec::new(), + finalized_heads_listeners: Vec::new(), + best_heads_listeners: Vec::new(), import_sub, best_sub, finalized_sub, @@ -108,75 +133,36 @@ impl RPCWorker { let mut best_head_sub = self.best_sub.fuse(); let mut finalized_sub = self.finalized_sub.fuse(); loop { + // In this loop we do two things: + // 1. Listen for `NotificationRegisterMessage` and register new listeners for the notification streams + // 2. Distribute incoming import, best head and finalization notifications to registered listeners. + // If an error occurs during sending, the receiver has been closed and we drop the sender too. futures::select! { evt = self.client_receiver.next() => match evt { - Some(RPCMessages::NewHeadListener(tx)) => { - tracing::debug!("Registering new head listener, now at {}", self.head_listener.len() + 1); - self.head_listener.push(tx); + Some(NotificationRegisterMessage::RegisterBestHeadListener(tx)) => { + self.best_heads_listeners.push(tx); }, - Some(RPCMessages::NewImportListener(tx)) => { - tracing::debug!("Registering new import listener, now at {}", self.import_listener.len() + 1); - self.import_listener.push(tx) + Some(NotificationRegisterMessage::RegisterImportListener(tx)) => { + self.imported_heads_listeners.push(tx) }, - Some(RPCMessages::NewFinalizedListener(tx)) => { - tracing::debug!("Registering new finalized listener, now at {}", self.finalized_listener.len() + 1); - self.finalized_listener.push(tx) + Some(NotificationRegisterMessage::RegisterFinalizationListener(tx)) => { + self.finalized_heads_listeners.push(tx) }, None => {} }, - import_evt = import_sub.next() => { - match import_evt { - Some(Ok(header)) => self.import_listener.retain_mut(|e| { - if let Err(err) = e.try_send(header.clone()) { - false - } else { - true - } - }), - err @ _ => { - tracing::error!(target:LOG_TARGET, ?err, "Received some non-ok value from import stream."); - return; - } - }; - }, - header_evt = best_head_sub.next() => { - match header_evt { - Some(Ok(header)) => self.head_listener.retain_mut(|e| { - if let Err(err) = e.try_send(header.clone()) { - false - } else { - true - } - }), - err @ _ => { - tracing::error!(target:LOG_TARGET, ?err, "Received some non-ok value from best block stream."); - return; - } - }; - }, - finalized_evt = finalized_sub.next() => { - match finalized_evt { - Some(Ok(header)) => self.finalized_listener.retain_mut(|e| { - if let Err(err) = e.try_send(header.clone()) { - tracing::debug!(target:LOG_TARGET, ?err, "Unable to send to finalized stream, removing listener."); - false - } else { - true - } - }), - err @ _ => { - tracing::error!(target:LOG_TARGET, ?err, "Received some non-ok value from finalized block stream."); - return; - } - }; - }, + import_evt = import_sub.next() => + handle_event_distribution(import_evt, &mut self.imported_heads_listeners), + header_evt = best_head_sub.next() => + handle_event_distribution(header_evt, &mut self.best_heads_listeners), + finalized_evt = finalized_sub.next() => + handle_event_distribution(finalized_evt, &mut self.finalized_heads_listeners), } } } } impl RelayChainRPCClient { - fn set_worker_channel(&mut self, sender: UnboundedSender) { + fn set_worker_channel(&mut self, sender: UnboundedSender) { self.to_worker_channel = Some(sender); } @@ -401,7 +387,7 @@ impl RelayChainRPCClient { let (tx, rx) = futures::channel::mpsc::channel::(128); if let Some(channel) = &self.to_worker_channel { channel - .unbounded_send(RPCMessages::NewImportListener(tx)) + .unbounded_send(NotificationRegisterMessage::RegisterImportListener(tx)) .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; } Ok(rx) @@ -411,7 +397,7 @@ impl RelayChainRPCClient { let (tx, rx) = futures::channel::mpsc::channel::(128); if let Some(channel) = &self.to_worker_channel { channel - .unbounded_send(RPCMessages::NewHeadListener(tx)) + .unbounded_send(NotificationRegisterMessage::RegisterBestHeadListener(tx)) .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; } Ok(rx) @@ -421,7 +407,7 @@ impl RelayChainRPCClient { let (tx, rx) = futures::channel::mpsc::channel::(128); if let Some(channel) = &self.to_worker_channel { channel - .unbounded_send(RPCMessages::NewFinalizedListener(tx)) + .unbounded_send(NotificationRegisterMessage::RegisterFinalizationListener(tx)) .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; } Ok(rx) From de5d7d15b02b3cd15ee6f54fb58da9753f6e15fe Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 21 Jul 2022 11:57:40 +0200 Subject: [PATCH 06/17] Use tracing channels --- Cargo.lock | 1 + client/relay-chain-rpc-interface/Cargo.toml | 1 + .../src/rpc_client.rs | 68 +++++++++---------- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 95f4f220a69..b193f8d7263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1951,6 +1951,7 @@ dependencies = [ "sc-client-api", "sc-rpc-api", "sc-service", + "sc-utils", "sp-api", "sp-core", "sp-runtime", diff --git a/client/relay-chain-rpc-interface/Cargo.toml b/client/relay-chain-rpc-interface/Cargo.toml index da2e03fc197..b2892684e1f 100644 --- a/client/relay-chain-rpc-interface/Cargo.toml +++ b/client/relay-chain-rpc-interface/Cargo.toml @@ -17,6 +17,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index c659c6b1d88..57c8aec3cb6 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -24,7 +24,7 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; use futures::{ - channel::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, + channel::mpsc::{Receiver, Sender}, StreamExt, }; use jsonrpsee::{ @@ -39,6 +39,7 @@ use jsonrpsee::{ use parity_scale_codec::{Decode, Encode}; use sc_client_api::StorageData; use sc_rpc_api::{state::ReadProof, system::Health}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_core::sp_std::collections::btree_map::BTreeMap; use sp_runtime::DeserializeOwned; use sp_storage::StorageKey; @@ -57,7 +58,7 @@ pub struct RelayChainRPCClient { /// Retry strategy that should be used for requests and subscriptions retry_strategy: ExponentialBackoff, - to_worker_channel: Option>, + to_worker_channel: Option>, } #[derive(Clone, Debug)] @@ -67,8 +68,9 @@ pub enum NotificationRegisterMessage { RegisterFinalizationListener(Sender), } -pub struct RPCWorker { - client_receiver: UnboundedReceiver, +/// Worker that should be used in combination with [`RelayChainRPCClient`]. Must be polled to distribute header notifications to listeners. +pub struct RPCStreamWorker { + client_receiver: TracingUnboundedReceiver, imported_heads_listeners: Vec>, finalized_heads_listeners: Vec>, best_heads_listeners: Vec>, @@ -77,9 +79,17 @@ pub struct RPCWorker { best_sub: Subscription, } -pub async fn create_worker_client(url: Url) -> RelayChainResult<(RPCWorker, RelayChainRPCClient)> { - let (mut client, import, head, finalized) = RelayChainRPCClient::new_with_streams(url).await?; - let (worker, sender) = RPCWorker::new(import, head, finalized); +/// Entry point to create [`RelayChainRPCClient`] and [`RPCStreamWorker`]; +pub async fn create_worker_client( + url: Url, +) -> RelayChainResult<(RPCStreamWorker, RelayChainRPCClient)> { + let mut client = RelayChainRPCClient::new(url).await?; + let best_head_stream = client.subscribe_new_best_heads().await?; + let finalized_head_stream = client.subscribe_finalized_heads().await?; + let imported_head_stream = client.subscribe_imported_heads().await?; + + let (worker, sender) = + RPCStreamWorker::new(imported_head_stream, best_head_stream, finalized_head_stream); client.set_worker_channel(sender); Ok((worker, client)) } @@ -109,14 +119,15 @@ fn handle_event_distribution( }; } -impl RPCWorker { - pub fn new( +impl RPCStreamWorker { + /// Create new worker + fn new( import_sub: Subscription, best_sub: Subscription, finalized_sub: Subscription, - ) -> (RPCWorker, UnboundedSender) { - let (tx, rx) = futures::channel::mpsc::unbounded(); - let worker = RPCWorker { + ) -> (RPCStreamWorker, TracingUnboundedSender) { + let (tx, rx) = tracing_unbounded("mpsc-cumulus-rpc-worker"); + let worker = RPCStreamWorker { client_receiver: rx, imported_heads_listeners: Vec::new(), finalized_heads_listeners: Vec::new(), @@ -128,15 +139,16 @@ impl RPCWorker { (worker, tx) } + /// Run this worker to drive notification streams. + /// The worker does two things: + /// 1. Listen for `NotificationRegisterMessage` and register new listeners for the notification streams + /// 2. Distribute incoming import, best head and finalization notifications to registered listeners. + /// If an error occurs during sending, the receiver has been closed and we remove the sender from the list. pub async fn run(mut self) { let mut import_sub = self.import_sub.fuse(); let mut best_head_sub = self.best_sub.fuse(); let mut finalized_sub = self.finalized_sub.fuse(); loop { - // In this loop we do two things: - // 1. Listen for `NotificationRegisterMessage` and register new listeners for the notification streams - // 2. Distribute incoming import, best head and finalization notifications to registered listeners. - // If an error occurs during sending, the receiver has been closed and we drop the sender too. futures::select! { evt = self.client_receiver.next() => match evt { Some(NotificationRegisterMessage::RegisterBestHeadListener(tx)) => { @@ -162,14 +174,13 @@ impl RPCWorker { } impl RelayChainRPCClient { - fn set_worker_channel(&mut self, sender: UnboundedSender) { + /// Set channel to exchange messages with the rpc-worker + fn set_worker_channel(&mut self, sender: TracingUnboundedSender) { self.to_worker_channel = Some(sender); } - pub async fn new_with_streams( - url: Url, - ) -> RelayChainResult<(Self, Subscription, Subscription, Subscription)> - { + /// Initialize new RPC Client and connect vie WebSocket to a given endpoint. + async fn new(url: Url) -> RelayChainResult { tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); let ws_client = WsClientBuilder::default().build(url.as_str()).await?; @@ -178,21 +189,8 @@ impl RelayChainRPCClient { ws_client: Arc::new(ws_client), retry_strategy: ExponentialBackoff::default(), }; - let head_stream = client.subscribe_new_best_heads().await?; - let finalized_stream = client.subscribe_finalized_heads().await?; - let imported_stream = client.subscribe_imported_heads().await?; - Ok((client, imported_stream, head_stream, finalized_stream)) - } - - pub async fn new(url: Url) -> RelayChainResult { - tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); - let ws_client = WsClientBuilder::default().build(url.as_str()).await?; - Ok(RelayChainRPCClient { - to_worker_channel: None, - ws_client: Arc::new(ws_client), - retry_strategy: ExponentialBackoff::default(), - }) + Ok(client) } /// Call a call to `state_call` rpc method. From 4bb26be3a8194bbedf2c955c0e5e116e9c977035 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 21 Jul 2022 14:04:50 +0200 Subject: [PATCH 07/17] Improve code readability --- client/relay-chain-interface/src/lib.rs | 2 +- client/relay-chain-rpc-interface/src/lib.rs | 1 + .../src/rpc_client.rs | 135 ++++++++++-------- 3 files changed, 79 insertions(+), 59 deletions(-) diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs index 0040ccf92d6..81fc52702e5 100644 --- a/client/relay-chain-interface/src/lib.rs +++ b/client/relay-chain-interface/src/lib.rs @@ -54,7 +54,7 @@ pub enum RelayChainError { RPCCallError(String, JsonRPSeeError), #[error("RPC Error: '{0}'")] JsonRPCError(#[from] JsonRPSeeError), - #[error("Unable to reach worker: {0}")] + #[error("Unable to reach RPCStreamWorker: {0}")] WorkerCommunicationError(String), #[error("Scale codec deserialization error: {0}")] DeserializationError(CodecError), diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index 7ea29b24497..88a90097806 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -33,6 +33,7 @@ use sp_storage::StorageKey; use std::pin::Pin; pub use url::Url; + mod rpc_client; pub use rpc_client::{create_worker_client, RelayChainRPCClient}; diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 57c8aec3cb6..51d50c2b048 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -70,13 +70,18 @@ pub enum NotificationRegisterMessage { /// Worker that should be used in combination with [`RelayChainRPCClient`]. Must be polled to distribute header notifications to listeners. pub struct RPCStreamWorker { + // Communication channel with the RPC client client_receiver: TracingUnboundedReceiver, - imported_heads_listeners: Vec>, - finalized_heads_listeners: Vec>, - best_heads_listeners: Vec>, - import_sub: Subscription, - finalized_sub: Subscription, - best_sub: Subscription, + + // Senders to distribute incoming header notifications to + imported_header_listeners: Vec>, + finalized_header_listeners: Vec>, + best_header_listeners: Vec>, + + // Incoming notification subscriptions + rpc_imported_header_subscription: Subscription, + rpc_finalized_header_subscription: Subscription, + rpc_best_header_subscription: Subscription, } /// Entry point to create [`RelayChainRPCClient`] and [`RPCStreamWorker`]; @@ -97,30 +102,20 @@ pub async fn create_worker_client( fn handle_event_distribution( event: Option>, senders: &mut Vec>, -) { +) -> Result<(), String> { match event { - Some(Ok(header)) => senders.retain_mut(|e| { - if let Err(err) = e.try_send(header.clone()) { - tracing::debug!( - target: LOG_TARGET, - ?err, - header = header.number, - "Unable to send, removing Sender from listeners." - ); - false - } else { - true - } - }), - err @ _ => { - tracing::error!(target: LOG_TARGET, ?err, "Received error value from stream."); - return + // If sending fails, we remove the sender from the list because the receiver was dropped + Some(Ok(header)) => { + senders.retain_mut(|e| e.try_send(header.clone()).is_ok()); + Ok(()) }, - }; + None => Err("RPC Subscription closed.".to_string()), + Some(Err(err)) => Err(format!("Error in RPC subscription: {}", err)), + } } impl RPCStreamWorker { - /// Create new worker + /// Create new worker. Returns the worker and a channel to register new listeners. fn new( import_sub: Subscription, best_sub: Subscription, @@ -129,12 +124,12 @@ impl RPCStreamWorker { let (tx, rx) = tracing_unbounded("mpsc-cumulus-rpc-worker"); let worker = RPCStreamWorker { client_receiver: rx, - imported_heads_listeners: Vec::new(), - finalized_heads_listeners: Vec::new(), - best_heads_listeners: Vec::new(), - import_sub, - best_sub, - finalized_sub, + imported_header_listeners: Vec::new(), + finalized_header_listeners: Vec::new(), + best_header_listeners: Vec::new(), + rpc_imported_header_subscription: import_sub, + rpc_best_header_subscription: best_sub, + rpc_finalized_header_subscription: finalized_sub, }; (worker, tx) } @@ -145,29 +140,41 @@ impl RPCStreamWorker { /// 2. Distribute incoming import, best head and finalization notifications to registered listeners. /// If an error occurs during sending, the receiver has been closed and we remove the sender from the list. pub async fn run(mut self) { - let mut import_sub = self.import_sub.fuse(); - let mut best_head_sub = self.best_sub.fuse(); - let mut finalized_sub = self.finalized_sub.fuse(); + let mut import_sub = self.rpc_imported_header_subscription.fuse(); + let mut best_head_sub = self.rpc_best_header_subscription.fuse(); + let mut finalized_sub = self.rpc_finalized_header_subscription.fuse(); loop { futures::select! { evt = self.client_receiver.next() => match evt { Some(NotificationRegisterMessage::RegisterBestHeadListener(tx)) => { - self.best_heads_listeners.push(tx); + self.best_header_listeners.push(tx); }, Some(NotificationRegisterMessage::RegisterImportListener(tx)) => { - self.imported_heads_listeners.push(tx) + self.imported_header_listeners.push(tx) }, Some(NotificationRegisterMessage::RegisterFinalizationListener(tx)) => { - self.finalized_heads_listeners.push(tx) + self.finalized_header_listeners.push(tx) }, None => {} }, - import_evt = import_sub.next() => - handle_event_distribution(import_evt, &mut self.imported_heads_listeners), - header_evt = best_head_sub.next() => - handle_event_distribution(header_evt, &mut self.best_heads_listeners), - finalized_evt = finalized_sub.next() => - handle_event_distribution(finalized_evt, &mut self.finalized_heads_listeners), + import_event = import_sub.next() => { + if let Err(err) = handle_event_distribution(import_event, &mut self.imported_header_listeners) { + tracing::error!(target: LOG_TARGET, err, "Encountered error while processing imported header notification. Stopping RPC Worker."); + return; + } + }, + best_header_event = best_head_sub.next() => { + if let Err(err) = handle_event_distribution(best_header_event, &mut self.best_header_listeners) { + tracing::error!(target: LOG_TARGET, err, "Encountered error while processing best header notification. Stopping RPC Worker."); + return; + } + } + finalized_event = finalized_sub.next() => { + if let Err(err) = handle_event_distribution(finalized_event, &mut self.finalized_header_listeners) { + tracing::error!(target: LOG_TARGET, err, "Encountered error while processing finalized header notification. Stopping RPC Worker."); + return; + } + } } } } @@ -381,33 +388,45 @@ impl RelayChainRPCClient { .await } + fn send_register_message_to_worker( + &self, + message: NotificationRegisterMessage, + ) -> Result<(), RelayChainError> { + match &self.to_worker_channel { + Some(channel) => { + channel + .unbounded_send(message) + .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; + Ok(()) + }, + None => Err(RelayChainError::WorkerCommunicationError( + "Worker channel needs to be set before requesting notification streams." + .to_string(), + )), + } + } + pub async fn get_imported_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(128); - if let Some(channel) = &self.to_worker_channel { - channel - .unbounded_send(NotificationRegisterMessage::RegisterImportListener(tx)) - .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; - } + self.send_register_message_to_worker(NotificationRegisterMessage::RegisterImportListener( + tx, + ))?; Ok(rx) } pub async fn get_best_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(128); - if let Some(channel) = &self.to_worker_channel { - channel - .unbounded_send(NotificationRegisterMessage::RegisterBestHeadListener(tx)) - .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; - } + self.send_register_message_to_worker( + NotificationRegisterMessage::RegisterBestHeadListener(tx), + )?; Ok(rx) } pub async fn get_finalized_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(128); - if let Some(channel) = &self.to_worker_channel { - channel - .unbounded_send(NotificationRegisterMessage::RegisterFinalizationListener(tx)) - .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; - } + self.send_register_message_to_worker( + NotificationRegisterMessage::RegisterFinalizationListener(tx), + )?; Ok(rx) } From fc24b873e303b92852de0f297590a34978dc2a22 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 21 Jul 2022 14:26:48 +0200 Subject: [PATCH 08/17] Decrease channel size limit --- client/relay-chain-rpc-interface/src/rpc_client.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 51d50c2b048..6171347e282 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -49,6 +49,8 @@ pub use url::Url; const LOG_TARGET: &str = "relay-chain-rpc-client"; +const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20; + /// Client that maps RPC methods and deserializes results #[derive(Clone)] pub struct RelayChainRPCClient { @@ -407,7 +409,7 @@ impl RelayChainRPCClient { } pub async fn get_imported_heads_stream(&self) -> Result, RelayChainError> { - let (tx, rx) = futures::channel::mpsc::channel::(128); + let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker(NotificationRegisterMessage::RegisterImportListener( tx, ))?; @@ -415,7 +417,7 @@ impl RelayChainRPCClient { } pub async fn get_best_heads_stream(&self) -> Result, RelayChainError> { - let (tx, rx) = futures::channel::mpsc::channel::(128); + let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker( NotificationRegisterMessage::RegisterBestHeadListener(tx), )?; @@ -423,7 +425,7 @@ impl RelayChainRPCClient { } pub async fn get_finalized_heads_stream(&self) -> Result, RelayChainError> { - let (tx, rx) = futures::channel::mpsc::channel::(128); + let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker( NotificationRegisterMessage::RegisterFinalizationListener(tx), )?; From b65b56c434893cfacea6cb720df5d40ab598fc4c Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 21 Jul 2022 14:45:51 +0200 Subject: [PATCH 09/17] Remove unused dependency --- Cargo.lock | 1 - client/relay-chain-rpc-interface/Cargo.toml | 1 - client/relay-chain-rpc-interface/src/rpc_client.rs | 1 + 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b193f8d7263..6ac6a4b8fb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1950,7 +1950,6 @@ dependencies = [ "polkadot-service", "sc-client-api", "sc-rpc-api", - "sc-service", "sc-utils", "sp-api", "sp-core", diff --git a/client/relay-chain-rpc-interface/Cargo.toml b/client/relay-chain-rpc-interface/Cargo.toml index b2892684e1f..8a922e66a7b 100644 --- a/client/relay-chain-rpc-interface/Cargo.toml +++ b/client/relay-chain-rpc-interface/Cargo.toml @@ -16,7 +16,6 @@ sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } -sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 6171347e282..13b0822b5db 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -63,6 +63,7 @@ pub struct RelayChainRPCClient { to_worker_channel: Option>, } +/// Worker messages to register new notification listeners #[derive(Clone, Debug)] pub enum NotificationRegisterMessage { RegisterBestHeadListener(Sender), From 1398b7da0a1a7af4d2837985acaaf2b57382056b Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 21 Jul 2022 15:23:59 +0200 Subject: [PATCH 10/17] Fix docs --- client/relay-chain-rpc-interface/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index 88a90097806..5f8c32872b5 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -35,7 +35,7 @@ use std::pin::Pin; pub use url::Url; mod rpc_client; -pub use rpc_client::{create_worker_client, RelayChainRPCClient}; +pub use rpc_client::{create_worker_client, RPCStreamWorker, RelayChainRPCClient}; const TIMEOUT_IN_SECONDS: u64 = 6; From c7601f0ca6a84c4ef00ab34f55763a21070df73f Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 27 Jul 2022 12:53:59 +0200 Subject: [PATCH 11/17] RPC -> Rpc --- client/relay-chain-interface/src/lib.rs | 8 ++--- client/relay-chain-rpc-interface/src/lib.rs | 14 ++++----- .../src/rpc_client.rs | 30 +++++++++---------- parachain-template/node/src/service.rs | 4 +-- polkadot-parachain/src/service.rs | 4 +-- test/service/src/lib.rs | 4 +-- 6 files changed, 32 insertions(+), 32 deletions(-) diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs index 81fc52702e5..91cfb531b73 100644 --- a/client/relay-chain-interface/src/lib.rs +++ b/client/relay-chain-interface/src/lib.rs @@ -29,7 +29,7 @@ use sc_client_api::StorageProof; use futures::Stream; use async_trait::async_trait; -use jsonrpsee_core::Error as JsonRPSeeError; +use jsonrpsee_core::Error as JsonRpcError; use parity_scale_codec::Error as CodecError; use sp_api::ApiError; use sp_state_machine::StorageValue; @@ -51,10 +51,10 @@ pub enum RelayChainError { #[error("State machine error occured: {0}")] StateMachineError(Box), #[error("Unable to call RPC method '{0}' due to error: {1}")] - RPCCallError(String, JsonRPSeeError), + RpcCallError(String, JsonRpcError), #[error("RPC Error: '{0}'")] - JsonRPCError(#[from] JsonRPSeeError), - #[error("Unable to reach RPCStreamWorker: {0}")] + JsonRpcError(#[from] JsonRpcError), + #[error("Unable to reach RpcStreamWorker: {0}")] WorkerCommunicationError(String), #[error("Scale codec deserialization error: {0}")] DeserializationError(CodecError), diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index 5f8c32872b5..cbffad8d787 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -35,25 +35,25 @@ use std::pin::Pin; pub use url::Url; mod rpc_client; -pub use rpc_client::{create_worker_client, RPCStreamWorker, RelayChainRPCClient}; +pub use rpc_client::{create_worker_client, RelayChainRpcClient, RpcStreamWorker}; const TIMEOUT_IN_SECONDS: u64 = 6; -/// RelayChainRPCInterface is used to interact with a full node that is running locally +/// RelayChainRpcInterface is used to interact with a full node that is running locally /// in the same process. #[derive(Clone)] -pub struct RelayChainRPCInterface { - rpc_client: RelayChainRPCClient, +pub struct RelayChainRpcInterface { + rpc_client: RelayChainRpcClient, } -impl RelayChainRPCInterface { - pub fn new(rpc_client: RelayChainRPCClient) -> Self { +impl RelayChainRpcInterface { + pub fn new(rpc_client: RelayChainRpcClient) -> Self { Self { rpc_client } } } #[async_trait] -impl RelayChainInterface for RelayChainRPCInterface { +impl RelayChainInterface for RelayChainRpcInterface { async fn retrieve_dmq_contents( &self, para_id: ParaId, diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 13b0822b5db..8d9acfddbee 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -29,7 +29,7 @@ use futures::{ }; use jsonrpsee::{ core::{ - client::{Client as JsonRPCClient, ClientT, Subscription, SubscriptionClientT}, + client::{Client as JsonRpcClient, ClientT, Subscription, SubscriptionClientT}, Error as JsonRpseeError, }, rpc_params, @@ -53,9 +53,9 @@ const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20; /// Client that maps RPC methods and deserializes results #[derive(Clone)] -pub struct RelayChainRPCClient { +pub struct RelayChainRpcClient { /// Websocket client to make calls - ws_client: Arc, + ws_client: Arc, /// Retry strategy that should be used for requests and subscriptions retry_strategy: ExponentialBackoff, @@ -72,7 +72,7 @@ pub enum NotificationRegisterMessage { } /// Worker that should be used in combination with [`RelayChainRPCClient`]. Must be polled to distribute header notifications to listeners. -pub struct RPCStreamWorker { +pub struct RpcStreamWorker { // Communication channel with the RPC client client_receiver: TracingUnboundedReceiver, @@ -87,17 +87,17 @@ pub struct RPCStreamWorker { rpc_best_header_subscription: Subscription, } -/// Entry point to create [`RelayChainRPCClient`] and [`RPCStreamWorker`]; +/// Entry point to create [`RelayChainRpcClient`] and [`RpcStreamWorker`]; pub async fn create_worker_client( url: Url, -) -> RelayChainResult<(RPCStreamWorker, RelayChainRPCClient)> { - let mut client = RelayChainRPCClient::new(url).await?; +) -> RelayChainResult<(RpcStreamWorker, RelayChainRpcClient)> { + let mut client = RelayChainRpcClient::new(url).await?; let best_head_stream = client.subscribe_new_best_heads().await?; let finalized_head_stream = client.subscribe_finalized_heads().await?; let imported_head_stream = client.subscribe_imported_heads().await?; let (worker, sender) = - RPCStreamWorker::new(imported_head_stream, best_head_stream, finalized_head_stream); + RpcStreamWorker::new(imported_head_stream, best_head_stream, finalized_head_stream); client.set_worker_channel(sender); Ok((worker, client)) } @@ -117,15 +117,15 @@ fn handle_event_distribution( } } -impl RPCStreamWorker { +impl RpcStreamWorker { /// Create new worker. Returns the worker and a channel to register new listeners. fn new( import_sub: Subscription, best_sub: Subscription, finalized_sub: Subscription, - ) -> (RPCStreamWorker, TracingUnboundedSender) { + ) -> (RpcStreamWorker, TracingUnboundedSender) { let (tx, rx) = tracing_unbounded("mpsc-cumulus-rpc-worker"); - let worker = RPCStreamWorker { + let worker = RpcStreamWorker { client_receiver: rx, imported_header_listeners: Vec::new(), finalized_header_listeners: Vec::new(), @@ -183,7 +183,7 @@ impl RPCStreamWorker { } } -impl RelayChainRPCClient { +impl RelayChainRpcClient { /// Set channel to exchange messages with the rpc-worker fn set_worker_channel(&mut self, sender: TracingUnboundedSender) { self.to_worker_channel = Some(sender); @@ -194,7 +194,7 @@ impl RelayChainRPCClient { tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); let ws_client = WsClientBuilder::default().build(url.as_str()).await?; - let client = RelayChainRPCClient { + let client = RelayChainRpcClient { to_worker_channel: None, ws_client: Arc::new(ws_client), retry_strategy: ExponentialBackoff::default(), @@ -244,7 +244,7 @@ impl RelayChainRPCClient { self.ws_client .subscribe::(sub_name, params, unsub_name) .await - .map_err(|err| RelayChainError::RPCCallError(sub_name.to_string(), err)) + .map_err(|err| RelayChainError::RpcCallError(sub_name.to_string(), err)) } /// Perform RPC request @@ -289,7 +289,7 @@ impl RelayChainRPCClient { .await .map_err(|err| { trace_error(&err); - RelayChainError::RPCCallError(method.to_string(), err)}) + RelayChainError::RpcCallError(method.to_string(), err)}) } pub async fn system_health(&self) -> Result { diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index ba453b81187..22fbeb605ef 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -22,7 +22,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRPCInterface}; +use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRpcInterface}; // Substrate Imports use sc_client_api::ExecutorProvider; @@ -183,7 +183,7 @@ async fn build_relay_chain_interface( None, worker.run(), ); - Ok((Arc::new(RelayChainRPCInterface::new(client)) as Arc<_>, None)) + Ok((Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>, None)) }, None => build_inprocess_relay_chain( polkadot_config, diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index 25638430824..f43b1185f1d 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -30,7 +30,7 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRPCInterface}; +use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRpcInterface}; use polkadot_service::CollatorPair; use sp_core::Pair; @@ -303,7 +303,7 @@ async fn build_relay_chain_interface( None, worker.run(), ); - Ok((Arc::new(RelayChainRPCInterface::new(client)) as Arc<_>, None)) + Ok((Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>, None)) }, None => build_inprocess_relay_chain( polkadot_config, diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index ad9f86d126b..ad6e8b83c46 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -37,7 +37,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRPCInterface}; +use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRpcInterface}; use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; use parking_lot::Mutex; @@ -186,7 +186,7 @@ async fn build_relay_chain_interface( task_manager .spawn_essential_handle() .spawn("relay-chain-rpc-worker", None, worker.run()); - return Ok(Arc::new(RelayChainRPCInterface::new(client)) as Arc<_>) + return Ok(Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>) } let relay_chain_full_node = polkadot_test_service::new_full( From 23f6e0db5e2df529089f6c459328fb648d2d2d62 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 27 Jul 2022 13:23:58 +0200 Subject: [PATCH 12/17] Start worker in initialization method --- client/relay-chain-rpc-interface/src/lib.rs | 2 +- .../relay-chain-rpc-interface/src/rpc_client.rs | 15 +++++++++++---- parachain-template/node/src/service.rs | 9 ++------- polkadot-parachain/src/service.rs | 9 ++------- test/service/src/lib.rs | 7 ++----- 5 files changed, 18 insertions(+), 24 deletions(-) diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index cbffad8d787..b1e3a13f71e 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -35,7 +35,7 @@ use std::pin::Pin; pub use url::Url; mod rpc_client; -pub use rpc_client::{create_worker_client, RelayChainRpcClient, RpcStreamWorker}; +pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient, RpcStreamWorker}; const TIMEOUT_IN_SECONDS: u64 = 6; diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 8d9acfddbee..422d7769771 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -37,6 +37,7 @@ use jsonrpsee::{ ws_client::WsClientBuilder, }; use parity_scale_codec::{Decode, Encode}; +use polkadot_service::TaskManager; use sc_client_api::StorageData; use sc_rpc_api::{state::ReadProof, system::Health}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; @@ -87,10 +88,11 @@ pub struct RpcStreamWorker { rpc_best_header_subscription: Subscription, } -/// Entry point to create [`RelayChainRpcClient`] and [`RpcStreamWorker`]; -pub async fn create_worker_client( +/// Entry point to create [`RelayChainRpcClient`] and start a worker that distributes notifications. +pub async fn create_client_and_start_worker( url: Url, -) -> RelayChainResult<(RpcStreamWorker, RelayChainRpcClient)> { + task_manager: &mut TaskManager, +) -> RelayChainResult { let mut client = RelayChainRpcClient::new(url).await?; let best_head_stream = client.subscribe_new_best_heads().await?; let finalized_head_stream = client.subscribe_finalized_heads().await?; @@ -99,7 +101,12 @@ pub async fn create_worker_client( let (worker, sender) = RpcStreamWorker::new(imported_head_stream, best_head_stream, finalized_head_stream); client.set_worker_channel(sender); - Ok((worker, client)) + + task_manager + .spawn_essential_handle() + .spawn("relay-chain-rpc-worker", None, worker.run()); + + Ok(client) } fn handle_event_distribution( diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index 22fbeb605ef..d364b172e79 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -22,7 +22,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRpcInterface}; +use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; // Substrate Imports use sc_client_api::ExecutorProvider; @@ -177,12 +177,7 @@ async fn build_relay_chain_interface( ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { Some(relay_chain_url) => { - let (worker, client) = create_worker_client(relay_chain_url).await?; - task_manager.spawn_essential_handle().spawn( - "relay-chain-rpc-worker", - None, - worker.run(), - ); + let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; Ok((Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>, None)) }, None => build_inprocess_relay_chain( diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index f43b1185f1d..8a47675f7f4 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -30,7 +30,7 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRpcInterface}; +use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; use polkadot_service::CollatorPair; use sp_core::Pair; @@ -297,12 +297,7 @@ async fn build_relay_chain_interface( ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { Some(relay_chain_url) => { - let (worker, client) = create_worker_client(relay_chain_url).await?; - task_manager.spawn_essential_handle().spawn( - "relay-chain-rpc-worker", - None, - worker.run(), - ); + let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; Ok((Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>, None)) }, None => build_inprocess_relay_chain( diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index ad6e8b83c46..69593b3c1e5 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -37,7 +37,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_worker_client, RelayChainRpcInterface}; +use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; use parking_lot::Mutex; @@ -182,10 +182,7 @@ async fn build_relay_chain_interface( task_manager: &mut TaskManager, ) -> RelayChainResult> { if let Some(relay_chain_url) = collator_options.relay_chain_rpc_url { - let (worker, client) = create_worker_client(relay_chain_url).await?; - task_manager - .spawn_essential_handle() - .spawn("relay-chain-rpc-worker", None, worker.run()); + let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; return Ok(Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>) } From 73c69fd5dc554f246f6c30d6ff20f333feae66f1 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 27 Jul 2022 14:12:12 +0200 Subject: [PATCH 13/17] Print error in case a distribution channel is full --- client/relay-chain-rpc-interface/src/rpc_client.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 422d7769771..e54a3cd4dce 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -114,9 +114,19 @@ fn handle_event_distribution( senders: &mut Vec>, ) -> Result<(), String> { match event { - // If sending fails, we remove the sender from the list because the receiver was dropped Some(Ok(header)) => { - senders.retain_mut(|e| e.try_send(header.clone()).is_ok()); + senders.retain_mut(|e| { + match e.try_send(header.clone()) { + // Receiver has been dropped, remove Sender from list. + Err(error) if error.is_disconnected() => false, + // Channel is full. This should not happen. + Err(error) => { + tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications."); + true + }, + _ => true, + } + }); Ok(()) }, None => Err("RPC Subscription closed.".to_string()), From 9953820a4454becbbd922ad02d9b38c28d94eea7 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 27 Jul 2022 14:27:24 +0200 Subject: [PATCH 14/17] Fix docs --- client/relay-chain-rpc-interface/src/rpc_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index e54a3cd4dce..f409f9b5712 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -72,7 +72,7 @@ pub enum NotificationRegisterMessage { RegisterFinalizationListener(Sender), } -/// Worker that should be used in combination with [`RelayChainRPCClient`]. Must be polled to distribute header notifications to listeners. +/// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners. pub struct RpcStreamWorker { // Communication channel with the RPC client client_receiver: TracingUnboundedReceiver, From 3cc5397897f71a06edc9d0e5be99e3cc519950e7 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 28 Jul 2022 10:10:21 +0200 Subject: [PATCH 15/17] Make `RpcStreamWorker` private Co-authored-by: Davide Galassi --- client/relay-chain-rpc-interface/src/lib.rs | 2 +- client/relay-chain-rpc-interface/src/rpc_client.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index b1e3a13f71e..f295c693ecd 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -35,7 +35,7 @@ use std::pin::Pin; pub use url::Url; mod rpc_client; -pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient, RpcStreamWorker}; +pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient}; const TIMEOUT_IN_SECONDS: u64 = 6; diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index f409f9b5712..a185cc47691 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -73,7 +73,7 @@ pub enum NotificationRegisterMessage { } /// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners. -pub struct RpcStreamWorker { +struct RpcStreamWorker { // Communication channel with the RPC client client_receiver: TracingUnboundedReceiver, From 7366381e18ef5f6507f9596514046ae10e8679bb Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 29 Jul 2022 17:21:20 +0200 Subject: [PATCH 16/17] Use tokio channels and add TODO item --- Cargo.lock | 2 +- client/relay-chain-rpc-interface/Cargo.toml | 2 +- .../src/rpc_client.rs | 27 ++++++++++++------- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ac6a4b8fb8..d60dc207399 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1950,12 +1950,12 @@ dependencies = [ "polkadot-service", "sc-client-api", "sc-rpc-api", - "sc-utils", "sp-api", "sp-core", "sp-runtime", "sp-state-machine", "sp-storage", + "tokio", "tracing", "url", ] diff --git a/client/relay-chain-rpc-interface/Cargo.toml b/client/relay-chain-rpc-interface/Cargo.toml index 8a922e66a7b..3bae31b7fed 100644 --- a/client/relay-chain-rpc-interface/Cargo.toml +++ b/client/relay-chain-rpc-interface/Cargo.toml @@ -16,9 +16,9 @@ sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } -sc-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +tokio = { version = "1.19.2", features = ["sync"] } futures = "0.3.21" futures-timer = "3.0.2" diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index a185cc47691..399c88298e2 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -40,11 +40,13 @@ use parity_scale_codec::{Decode, Encode}; use polkadot_service::TaskManager; use sc_client_api::StorageData; use sc_rpc_api::{state::ReadProof, system::Health}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_core::sp_std::collections::btree_map::BTreeMap; use sp_runtime::DeserializeOwned; use sp_storage::StorageKey; use std::sync::Arc; +use tokio::sync::mpsc::{ + channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender, +}; pub use url::Url; @@ -61,7 +63,7 @@ pub struct RelayChainRpcClient { /// Retry strategy that should be used for requests and subscriptions retry_strategy: ExponentialBackoff, - to_worker_channel: Option>, + to_worker_channel: Option>, } /// Worker messages to register new notification listeners @@ -75,7 +77,7 @@ pub enum NotificationRegisterMessage { /// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners. struct RpcStreamWorker { // Communication channel with the RPC client - client_receiver: TracingUnboundedReceiver, + client_receiver: TokioReceiver, // Senders to distribute incoming header notifications to imported_header_listeners: Vec>, @@ -120,6 +122,8 @@ fn handle_event_distribution( // Receiver has been dropped, remove Sender from list. Err(error) if error.is_disconnected() => false, // Channel is full. This should not happen. + // TODO: Improve error handling here + // https://github.com/paritytech/cumulus/issues/1482 Err(error) => { tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications."); true @@ -140,8 +144,8 @@ impl RpcStreamWorker { import_sub: Subscription, best_sub: Subscription, finalized_sub: Subscription, - ) -> (RpcStreamWorker, TracingUnboundedSender) { - let (tx, rx) = tracing_unbounded("mpsc-cumulus-rpc-worker"); + ) -> (RpcStreamWorker, TokioSender) { + let (tx, rx) = tokio_channel(100); let worker = RpcStreamWorker { client_receiver: rx, imported_header_listeners: Vec::new(), @@ -164,8 +168,8 @@ impl RpcStreamWorker { let mut best_head_sub = self.rpc_best_header_subscription.fuse(); let mut finalized_sub = self.rpc_finalized_header_subscription.fuse(); loop { - futures::select! { - evt = self.client_receiver.next() => match evt { + tokio::select! { + evt = self.client_receiver.recv() => match evt { Some(NotificationRegisterMessage::RegisterBestHeadListener(tx)) => { self.best_header_listeners.push(tx); }, @@ -175,7 +179,10 @@ impl RpcStreamWorker { Some(NotificationRegisterMessage::RegisterFinalizationListener(tx)) => { self.finalized_header_listeners.push(tx) }, - None => {} + None => { + tracing::error!(target: LOG_TARGET, "RPC client receiver closed. Stopping RPC Worker."); + return; + } }, import_event = import_sub.next() => { if let Err(err) = handle_event_distribution(import_event, &mut self.imported_header_listeners) { @@ -202,7 +209,7 @@ impl RpcStreamWorker { impl RelayChainRpcClient { /// Set channel to exchange messages with the rpc-worker - fn set_worker_channel(&mut self, sender: TracingUnboundedSender) { + fn set_worker_channel(&mut self, sender: TokioSender) { self.to_worker_channel = Some(sender); } @@ -415,7 +422,7 @@ impl RelayChainRpcClient { match &self.to_worker_channel { Some(channel) => { channel - .unbounded_send(message) + .try_send(message) .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; Ok(()) }, From 512f1b4c9384d3ba69fb30fc2c120d75b3c8030e Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 1 Aug 2022 10:48:11 +0200 Subject: [PATCH 17/17] Remove `Option` from `to_worker_channel` --- .../src/rpc_client.rs | 100 ++++++++---------- 1 file changed, 44 insertions(+), 56 deletions(-) diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 399c88298e2..71014b18e0e 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -63,7 +63,8 @@ pub struct RelayChainRpcClient { /// Retry strategy that should be used for requests and subscriptions retry_strategy: ExponentialBackoff, - to_worker_channel: Option>, + /// Channel to communicate with the RPC worker + to_worker_channel: TokioSender, } /// Worker messages to register new notification listeners @@ -95,14 +96,16 @@ pub async fn create_client_and_start_worker( url: Url, task_manager: &mut TaskManager, ) -> RelayChainResult { - let mut client = RelayChainRpcClient::new(url).await?; - let best_head_stream = client.subscribe_new_best_heads().await?; - let finalized_head_stream = client.subscribe_finalized_heads().await?; - let imported_head_stream = client.subscribe_imported_heads().await?; + tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); + let ws_client = WsClientBuilder::default().build(url.as_str()).await?; + + let best_head_stream = RelayChainRpcClient::subscribe_new_best_heads(&ws_client).await?; + let finalized_head_stream = RelayChainRpcClient::subscribe_finalized_heads(&ws_client).await?; + let imported_head_stream = RelayChainRpcClient::subscribe_imported_heads(&ws_client).await?; let (worker, sender) = RpcStreamWorker::new(imported_head_stream, best_head_stream, finalized_head_stream); - client.set_worker_channel(sender); + let client = RelayChainRpcClient::new(ws_client, sender).await?; task_manager .spawn_essential_handle() @@ -208,18 +211,13 @@ impl RpcStreamWorker { } impl RelayChainRpcClient { - /// Set channel to exchange messages with the rpc-worker - fn set_worker_channel(&mut self, sender: TokioSender) { - self.to_worker_channel = Some(sender); - } - - /// Initialize new RPC Client and connect vie WebSocket to a given endpoint. - async fn new(url: Url) -> RelayChainResult { - tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); - let ws_client = WsClientBuilder::default().build(url.as_str()).await?; - + /// Initialize new RPC Client. + async fn new( + ws_client: JsonRpcClient, + sender: TokioSender, + ) -> RelayChainResult { let client = RelayChainRpcClient { - to_worker_channel: None, + to_worker_channel: sender, ws_client: Arc::new(ws_client), retry_strategy: ExponentialBackoff::default(), }; @@ -256,20 +254,6 @@ impl RelayChainRpcClient { } /// Subscribe to a notification stream via RPC - pub async fn subscribe<'a, R>( - &self, - sub_name: &'a str, - unsub_name: &'a str, - params: Option>, - ) -> RelayChainResult> - where - R: DeserializeOwned, - { - self.ws_client - .subscribe::(sub_name, params, unsub_name) - .await - .map_err(|err| RelayChainError::RpcCallError(sub_name.to_string(), err)) - } /// Perform RPC request async fn request<'a, R>( @@ -419,18 +403,9 @@ impl RelayChainRpcClient { &self, message: NotificationRegisterMessage, ) -> Result<(), RelayChainError> { - match &self.to_worker_channel { - Some(channel) => { - channel - .try_send(message) - .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))?; - Ok(()) - }, - None => Err(RelayChainError::WorkerCommunicationError( - "Worker channel needs to be set before requesting notification streams." - .to_string(), - )), - } + self.to_worker_channel + .try_send(message) + .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string())) } pub async fn get_imported_heads_stream(&self) -> Result, RelayChainError> { @@ -457,22 +432,35 @@ impl RelayChainRpcClient { Ok(rx) } - async fn subscribe_imported_heads(&self) -> Result, RelayChainError> { - self.subscribe::("chain_subscribeAllHeads", "chain_unsubscribeAllHeads", None) - .await + async fn subscribe_imported_heads( + ws_client: &JsonRpcClient, + ) -> Result, RelayChainError> { + Ok(ws_client + .subscribe::("chain_subscribeAllHeads", None, "chain_unsubscribeAllHeads") + .await?) } - async fn subscribe_finalized_heads(&self) -> Result, RelayChainError> { - self.subscribe::( - "chain_subscribeFinalizedHeads", - "chain_unsubscribeFinalizedHeads", - None, - ) - .await + async fn subscribe_finalized_heads( + ws_client: &JsonRpcClient, + ) -> Result, RelayChainError> { + Ok(ws_client + .subscribe::( + "chain_subscribeFinalizedHeads", + None, + "chain_unsubscribeFinalizedHeads", + ) + .await?) } - async fn subscribe_new_best_heads(&self) -> Result, RelayChainError> { - self.subscribe::("chain_subscribeNewHeads", "chain_unsubscribeNewHeads", None) - .await + async fn subscribe_new_best_heads( + ws_client: &JsonRpcClient, + ) -> Result, RelayChainError> { + Ok(ws_client + .subscribe::( + "chain_subscribeNewHeads", + None, + "chain_unsubscribeFinalizedHeads", + ) + .await?) } }