diff --git a/Cargo.lock b/Cargo.lock index 526397197f9..d60dc207399 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1955,6 +1955,7 @@ dependencies = [ "sp-runtime", "sp-state-machine", "sp-storage", + "tokio", "tracing", "url", ] diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs index 119721adec1..91cfb531b73 100644 --- a/client/relay-chain-interface/src/lib.rs +++ b/client/relay-chain-interface/src/lib.rs @@ -29,7 +29,7 @@ use sc_client_api::StorageProof; use futures::Stream; use async_trait::async_trait; -use jsonrpsee_core::Error as JsonRPSeeError; +use jsonrpsee_core::Error as JsonRpcError; use parity_scale_codec::Error as CodecError; use sp_api::ApiError; use sp_state_machine::StorageValue; @@ -51,9 +51,11 @@ pub enum RelayChainError { #[error("State machine error occured: {0}")] StateMachineError(Box), #[error("Unable to call RPC method '{0}' due to error: {1}")] - RPCCallError(String, JsonRPSeeError), + RpcCallError(String, JsonRpcError), #[error("RPC Error: '{0}'")] - JsonRPCError(#[from] JsonRPSeeError), + JsonRpcError(#[from] JsonRpcError), + #[error("Unable to reach RpcStreamWorker: {0}")] + WorkerCommunicationError(String), #[error("Scale codec deserialization error: {0}")] DeserializationError(CodecError), #[error("Scale codec deserialization error: {0}")] diff --git a/client/relay-chain-rpc-interface/Cargo.toml b/client/relay-chain-rpc-interface/Cargo.toml index d7b6009dc97..3bae31b7fed 100644 --- a/client/relay-chain-rpc-interface/Cargo.toml +++ b/client/relay-chain-rpc-interface/Cargo.toml @@ -18,6 +18,7 @@ sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = " sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +tokio = { version = "1.19.2", features = ["sync"] } futures = "0.3.21" futures-timer = "3.0.2" diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index 9f84e437699..f295c693ecd 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -15,7 +15,6 @@ // along with Cumulus. If not, see . use async_trait::async_trait; -use backoff::{future::retry_notify, ExponentialBackoff}; use core::time::Duration; use cumulus_primitives_core::{ relay_chain::{ @@ -26,274 +25,35 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; use futures::{FutureExt, Stream, StreamExt}; -use jsonrpsee::{ - core::{ - client::{Client as JsonRPCClient, ClientT, Subscription, SubscriptionClientT}, - Error as JsonRpseeError, - }, - rpc_params, - types::ParamsSer, - ws_client::WsClientBuilder, -}; -use parity_scale_codec::{Decode, Encode}; use polkadot_service::Handle; -use sc_client_api::{StorageData, StorageProof}; -use sc_rpc_api::{state::ReadProof, system::Health}; +use sc_client_api::StorageProof; use sp_core::sp_std::collections::btree_map::BTreeMap; -use sp_runtime::DeserializeOwned; use sp_state_machine::StorageValue; use sp_storage::StorageKey; -use std::{pin::Pin, sync::Arc}; +use std::pin::Pin; pub use url::Url; -const LOG_TARGET: &str = "relay-chain-rpc-interface"; -const TIMEOUT_IN_SECONDS: u64 = 6; - -/// Client that maps RPC methods and deserializes results -#[derive(Clone)] -struct RelayChainRPCClient { - /// Websocket client to make calls - ws_client: Arc, - - /// Retry strategy that should be used for requests and subscriptions - retry_strategy: ExponentialBackoff, -} - -impl RelayChainRPCClient { - pub async fn new(url: Url) -> RelayChainResult { - tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); - let ws_client = WsClientBuilder::default().build(url.as_str()).await?; - - Ok(RelayChainRPCClient { - ws_client: Arc::new(ws_client), - retry_strategy: ExponentialBackoff::default(), - }) - } - - /// Call a call to `state_call` rpc method. - async fn call_remote_runtime_function( - &self, - method_name: &str, - hash: PHash, - payload: Option, - ) -> RelayChainResult { - let payload_bytes = - payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode())); - let params = rpc_params! { - method_name, - payload_bytes, - hash - }; - let res = self - .request_tracing::("state_call", params, |err| { - tracing::trace!( - target: LOG_TARGET, - %method_name, - %hash, - error = %err, - "Error during call to 'state_call'.", - ); - }) - .await?; - Decode::decode(&mut &*res.0).map_err(Into::into) - } - - /// Subscribe to a notification stream via RPC - async fn subscribe<'a, R>( - &self, - sub_name: &'a str, - unsub_name: &'a str, - params: Option>, - ) -> RelayChainResult> - where - R: DeserializeOwned, - { - self.ws_client - .subscribe::(sub_name, params, unsub_name) - .await - .map_err(|err| RelayChainError::RPCCallError(sub_name.to_string(), err)) - } - - /// Perform RPC request - async fn request<'a, R>( - &self, - method: &'a str, - params: Option>, - ) -> Result - where - R: DeserializeOwned + std::fmt::Debug, - { - self.request_tracing( - method, - params, - |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"), - ) - .await - } - - /// Perform RPC request - async fn request_tracing<'a, R, OR>( - &self, - method: &'a str, - params: Option>, - trace_error: OR, - ) -> Result - where - R: DeserializeOwned + std::fmt::Debug, - OR: Fn(&jsonrpsee::core::Error), - { - retry_notify( - self.retry_strategy.clone(), - || async { - self.ws_client.request(method, params.clone()).await.map_err(|err| match err { - JsonRpseeError::Transport(_) => - backoff::Error::Transient { err, retry_after: None }, - _ => backoff::Error::Permanent(err), - }) - }, - |error, dur| tracing::trace!(target: LOG_TARGET, %error, ?dur, "Encountered transport error, retrying."), - ) - .await - .map_err(|err| { - trace_error(&err); - RelayChainError::RPCCallError(method.to_string(), err)}) - } - - async fn system_health(&self) -> Result { - self.request("system_health", None).await - } - - async fn state_get_read_proof( - &self, - storage_keys: Vec, - at: Option, - ) -> Result, RelayChainError> { - let params = rpc_params!(storage_keys, at); - self.request("state_getReadProof", params).await - } - - async fn state_get_storage( - &self, - storage_key: StorageKey, - at: Option, - ) -> Result, RelayChainError> { - let params = rpc_params!(storage_key, at); - self.request("state_getStorage", params).await - } - - async fn chain_get_head(&self) -> Result { - self.request("chain_getHead", None).await - } - - async fn chain_get_header( - &self, - hash: Option, - ) -> Result, RelayChainError> { - let params = rpc_params!(hash); - self.request("chain_getHeader", params).await - } - - async fn parachain_host_candidate_pending_availability( - &self, - at: PHash, - para_id: ParaId, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function( - "ParachainHost_candidate_pending_availability", - at, - Some(para_id), - ) - .await - } - - async fn parachain_host_session_index_for_child( - &self, - at: PHash, - ) -> Result { - self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>) - .await - } +mod rpc_client; +pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient}; - async fn parachain_host_validators( - &self, - at: PHash, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>) - .await - } - - async fn parachain_host_persisted_validation_data( - &self, - at: PHash, - para_id: ParaId, - occupied_core_assumption: OccupiedCoreAssumption, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function( - "ParachainHost_persisted_validation_data", - at, - Some((para_id, occupied_core_assumption)), - ) - .await - } - - async fn parachain_host_inbound_hrmp_channels_contents( - &self, - para_id: ParaId, - at: PHash, - ) -> Result>, RelayChainError> { - self.call_remote_runtime_function( - "ParachainHost_inbound_hrmp_channels_contents", - at, - Some(para_id), - ) - .await - } - - async fn parachain_host_dmq_contents( - &self, - para_id: ParaId, - at: PHash, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id)) - .await - } - - async fn subscribe_all_heads(&self) -> Result, RelayChainError> { - self.subscribe::("chain_subscribeAllHeads", "chain_unsubscribeAllHeads", None) - .await - } - - async fn subscribe_new_best_heads(&self) -> Result, RelayChainError> { - self.subscribe::("chain_subscribeNewHeads", "chain_unsubscribeNewHeads", None) - .await - } - - async fn subscribe_finalized_heads(&self) -> Result, RelayChainError> { - self.subscribe::( - "chain_subscribeFinalizedHeads", - "chain_unsubscribeFinalizedHeads", - None, - ) - .await - } -} +const TIMEOUT_IN_SECONDS: u64 = 6; -/// RelayChainRPCInterface is used to interact with a full node that is running locally +/// RelayChainRpcInterface is used to interact with a full node that is running locally /// in the same process. #[derive(Clone)] -pub struct RelayChainRPCInterface { - rpc_client: RelayChainRPCClient, +pub struct RelayChainRpcInterface { + rpc_client: RelayChainRpcClient, } -impl RelayChainRPCInterface { - pub async fn new(url: Url) -> RelayChainResult { - Ok(Self { rpc_client: RelayChainRPCClient::new(url).await? }) +impl RelayChainRpcInterface { + pub fn new(rpc_client: RelayChainRpcClient) -> Self { + Self { rpc_client } } } #[async_trait] -impl RelayChainInterface for RelayChainRPCInterface { +impl RelayChainInterface for RelayChainRpcInterface { async fn retrieve_dmq_contents( &self, para_id: ParaId, @@ -344,17 +104,7 @@ impl RelayChainInterface for RelayChainRPCInterface { async fn import_notification_stream( &self, ) -> RelayChainResult + Send>>> { - let imported_headers_stream = - self.rpc_client.subscribe_all_heads().await?.filter_map(|item| async move { - item.map_err(|err| { - tracing::error!( - target: LOG_TARGET, - "Encountered error in import notification stream: {}", - err - ) - }) - .ok() - }); + let imported_headers_stream = self.rpc_client.get_imported_heads_stream().await?; Ok(imported_headers_stream.boxed()) } @@ -362,20 +112,7 @@ impl RelayChainInterface for RelayChainRPCInterface { async fn finality_notification_stream( &self, ) -> RelayChainResult + Send>>> { - let imported_headers_stream = self - .rpc_client - .subscribe_finalized_heads() - .await? - .filter_map(|item| async move { - item.map_err(|err| { - tracing::error!( - target: LOG_TARGET, - "Encountered error in finality notification stream: {}", - err - ) - }) - .ok() - }); + let imported_headers_stream = self.rpc_client.get_finalized_heads_stream().await?; Ok(imported_headers_stream.boxed()) } @@ -430,7 +167,7 @@ impl RelayChainInterface for RelayChainRPCInterface { /// 3. Wait for the block to be imported via subscription. /// 4. If timeout is reached, we return an error. async fn wait_for_block(&self, wait_for_hash: PHash) -> RelayChainResult<()> { - let mut head_stream = self.rpc_client.subscribe_all_heads().await?; + let mut head_stream = self.rpc_client.get_imported_heads_stream().await?; if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() { return Ok(()) @@ -442,7 +179,7 @@ impl RelayChainInterface for RelayChainRPCInterface { futures::select! { _ = timeout => return Err(RelayChainError::WaitTimeout(wait_for_hash)), evt = head_stream.next().fuse() => match evt { - Some(Ok(evt)) if evt.hash() == wait_for_hash => return Ok(()), + Some(evt) if evt.hash() == wait_for_hash => return Ok(()), // Not the event we waited on. Some(_) => continue, None => return Err(RelayChainError::ImportListenerClosed(wait_for_hash)), @@ -454,18 +191,7 @@ impl RelayChainInterface for RelayChainRPCInterface { async fn new_best_notification_stream( &self, ) -> RelayChainResult + Send>>> { - let imported_headers_stream = - self.rpc_client.subscribe_new_best_heads().await?.filter_map(|item| async move { - item.map_err(|err| { - tracing::error!( - target: LOG_TARGET, - "Error in best block notification stream: {}", - err - ) - }) - .ok() - }); - + let imported_headers_stream = self.rpc_client.get_best_heads_stream().await?; Ok(imported_headers_stream.boxed()) } } diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs new file mode 100644 index 00000000000..71014b18e0e --- /dev/null +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -0,0 +1,466 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use backoff::{future::retry_notify, ExponentialBackoff}; +use cumulus_primitives_core::{ + relay_chain::{ + v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, + Hash as PHash, Header as PHeader, InboundHrmpMessage, + }, + InboundDownwardMessage, ParaId, PersistedValidationData, +}; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; +use futures::{ + channel::mpsc::{Receiver, Sender}, + StreamExt, +}; +use jsonrpsee::{ + core::{ + client::{Client as JsonRpcClient, ClientT, Subscription, SubscriptionClientT}, + Error as JsonRpseeError, + }, + rpc_params, + types::ParamsSer, + ws_client::WsClientBuilder, +}; +use parity_scale_codec::{Decode, Encode}; +use polkadot_service::TaskManager; +use sc_client_api::StorageData; +use sc_rpc_api::{state::ReadProof, system::Health}; +use sp_core::sp_std::collections::btree_map::BTreeMap; +use sp_runtime::DeserializeOwned; +use sp_storage::StorageKey; +use std::sync::Arc; +use tokio::sync::mpsc::{ + channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender, +}; + +pub use url::Url; + +const LOG_TARGET: &str = "relay-chain-rpc-client"; + +const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20; + +/// Client that maps RPC methods and deserializes results +#[derive(Clone)] +pub struct RelayChainRpcClient { + /// Websocket client to make calls + ws_client: Arc, + + /// Retry strategy that should be used for requests and subscriptions + retry_strategy: ExponentialBackoff, + + /// Channel to communicate with the RPC worker + to_worker_channel: TokioSender, +} + +/// Worker messages to register new notification listeners +#[derive(Clone, Debug)] +pub enum NotificationRegisterMessage { + RegisterBestHeadListener(Sender), + RegisterImportListener(Sender), + RegisterFinalizationListener(Sender), +} + +/// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners. +struct RpcStreamWorker { + // Communication channel with the RPC client + client_receiver: TokioReceiver, + + // Senders to distribute incoming header notifications to + imported_header_listeners: Vec>, + finalized_header_listeners: Vec>, + best_header_listeners: Vec>, + + // Incoming notification subscriptions + rpc_imported_header_subscription: Subscription, + rpc_finalized_header_subscription: Subscription, + rpc_best_header_subscription: Subscription, +} + +/// Entry point to create [`RelayChainRpcClient`] and start a worker that distributes notifications. +pub async fn create_client_and_start_worker( + url: Url, + task_manager: &mut TaskManager, +) -> RelayChainResult { + tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); + let ws_client = WsClientBuilder::default().build(url.as_str()).await?; + + let best_head_stream = RelayChainRpcClient::subscribe_new_best_heads(&ws_client).await?; + let finalized_head_stream = RelayChainRpcClient::subscribe_finalized_heads(&ws_client).await?; + let imported_head_stream = RelayChainRpcClient::subscribe_imported_heads(&ws_client).await?; + + let (worker, sender) = + RpcStreamWorker::new(imported_head_stream, best_head_stream, finalized_head_stream); + let client = RelayChainRpcClient::new(ws_client, sender).await?; + + task_manager + .spawn_essential_handle() + .spawn("relay-chain-rpc-worker", None, worker.run()); + + Ok(client) +} + +fn handle_event_distribution( + event: Option>, + senders: &mut Vec>, +) -> Result<(), String> { + match event { + Some(Ok(header)) => { + senders.retain_mut(|e| { + match e.try_send(header.clone()) { + // Receiver has been dropped, remove Sender from list. + Err(error) if error.is_disconnected() => false, + // Channel is full. This should not happen. + // TODO: Improve error handling here + // https://github.com/paritytech/cumulus/issues/1482 + Err(error) => { + tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications."); + true + }, + _ => true, + } + }); + Ok(()) + }, + None => Err("RPC Subscription closed.".to_string()), + Some(Err(err)) => Err(format!("Error in RPC subscription: {}", err)), + } +} + +impl RpcStreamWorker { + /// Create new worker. Returns the worker and a channel to register new listeners. + fn new( + import_sub: Subscription, + best_sub: Subscription, + finalized_sub: Subscription, + ) -> (RpcStreamWorker, TokioSender) { + let (tx, rx) = tokio_channel(100); + let worker = RpcStreamWorker { + client_receiver: rx, + imported_header_listeners: Vec::new(), + finalized_header_listeners: Vec::new(), + best_header_listeners: Vec::new(), + rpc_imported_header_subscription: import_sub, + rpc_best_header_subscription: best_sub, + rpc_finalized_header_subscription: finalized_sub, + }; + (worker, tx) + } + + /// Run this worker to drive notification streams. + /// The worker does two things: + /// 1. Listen for `NotificationRegisterMessage` and register new listeners for the notification streams + /// 2. Distribute incoming import, best head and finalization notifications to registered listeners. + /// If an error occurs during sending, the receiver has been closed and we remove the sender from the list. + pub async fn run(mut self) { + let mut import_sub = self.rpc_imported_header_subscription.fuse(); + let mut best_head_sub = self.rpc_best_header_subscription.fuse(); + let mut finalized_sub = self.rpc_finalized_header_subscription.fuse(); + loop { + tokio::select! { + evt = self.client_receiver.recv() => match evt { + Some(NotificationRegisterMessage::RegisterBestHeadListener(tx)) => { + self.best_header_listeners.push(tx); + }, + Some(NotificationRegisterMessage::RegisterImportListener(tx)) => { + self.imported_header_listeners.push(tx) + }, + Some(NotificationRegisterMessage::RegisterFinalizationListener(tx)) => { + self.finalized_header_listeners.push(tx) + }, + None => { + tracing::error!(target: LOG_TARGET, "RPC client receiver closed. Stopping RPC Worker."); + return; + } + }, + import_event = import_sub.next() => { + if let Err(err) = handle_event_distribution(import_event, &mut self.imported_header_listeners) { + tracing::error!(target: LOG_TARGET, err, "Encountered error while processing imported header notification. Stopping RPC Worker."); + return; + } + }, + best_header_event = best_head_sub.next() => { + if let Err(err) = handle_event_distribution(best_header_event, &mut self.best_header_listeners) { + tracing::error!(target: LOG_TARGET, err, "Encountered error while processing best header notification. Stopping RPC Worker."); + return; + } + } + finalized_event = finalized_sub.next() => { + if let Err(err) = handle_event_distribution(finalized_event, &mut self.finalized_header_listeners) { + tracing::error!(target: LOG_TARGET, err, "Encountered error while processing finalized header notification. Stopping RPC Worker."); + return; + } + } + } + } + } +} + +impl RelayChainRpcClient { + /// Initialize new RPC Client. + async fn new( + ws_client: JsonRpcClient, + sender: TokioSender, + ) -> RelayChainResult { + let client = RelayChainRpcClient { + to_worker_channel: sender, + ws_client: Arc::new(ws_client), + retry_strategy: ExponentialBackoff::default(), + }; + + Ok(client) + } + + /// Call a call to `state_call` rpc method. + pub async fn call_remote_runtime_function( + &self, + method_name: &str, + hash: PHash, + payload: Option, + ) -> RelayChainResult { + let payload_bytes = + payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode())); + let params = rpc_params! { + method_name, + payload_bytes, + hash + }; + let res = self + .request_tracing::("state_call", params, |err| { + tracing::trace!( + target: LOG_TARGET, + %method_name, + %hash, + error = %err, + "Error during call to 'state_call'.", + ); + }) + .await?; + Decode::decode(&mut &*res.0).map_err(Into::into) + } + + /// Subscribe to a notification stream via RPC + + /// Perform RPC request + async fn request<'a, R>( + &self, + method: &'a str, + params: Option>, + ) -> Result + where + R: DeserializeOwned + std::fmt::Debug, + { + self.request_tracing( + method, + params, + |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"), + ) + .await + } + + /// Perform RPC request + async fn request_tracing<'a, R, OR>( + &self, + method: &'a str, + params: Option>, + trace_error: OR, + ) -> Result + where + R: DeserializeOwned + std::fmt::Debug, + OR: Fn(&jsonrpsee::core::Error), + { + retry_notify( + self.retry_strategy.clone(), + || async { + self.ws_client.request(method, params.clone()).await.map_err(|err| match err { + JsonRpseeError::Transport(_) => + backoff::Error::Transient { err, retry_after: None }, + _ => backoff::Error::Permanent(err), + }) + }, + |error, dur| tracing::trace!(target: LOG_TARGET, %error, ?dur, "Encountered transport error, retrying."), + ) + .await + .map_err(|err| { + trace_error(&err); + RelayChainError::RpcCallError(method.to_string(), err)}) + } + + pub async fn system_health(&self) -> Result { + self.request("system_health", None).await + } + + pub async fn state_get_read_proof( + &self, + storage_keys: Vec, + at: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(storage_keys, at); + self.request("state_getReadProof", params).await + } + + pub async fn state_get_storage( + &self, + storage_key: StorageKey, + at: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(storage_key, at); + self.request("state_getStorage", params).await + } + + pub async fn chain_get_head(&self) -> Result { + self.request("chain_getHead", None).await + } + + pub async fn chain_get_header( + &self, + hash: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(hash); + self.request("chain_getHeader", params).await + } + + pub async fn parachain_host_candidate_pending_availability( + &self, + at: PHash, + para_id: ParaId, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_candidate_pending_availability", + at, + Some(para_id), + ) + .await + } + + pub async fn parachain_host_session_index_for_child( + &self, + at: PHash, + ) -> Result { + self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>) + .await + } + + pub async fn parachain_host_validators( + &self, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>) + .await + } + + pub async fn parachain_host_persisted_validation_data( + &self, + at: PHash, + para_id: ParaId, + occupied_core_assumption: OccupiedCoreAssumption, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_persisted_validation_data", + at, + Some((para_id, occupied_core_assumption)), + ) + .await + } + + pub async fn parachain_host_inbound_hrmp_channels_contents( + &self, + para_id: ParaId, + at: PHash, + ) -> Result>, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_inbound_hrmp_channels_contents", + at, + Some(para_id), + ) + .await + } + + pub async fn parachain_host_dmq_contents( + &self, + para_id: ParaId, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id)) + .await + } + + fn send_register_message_to_worker( + &self, + message: NotificationRegisterMessage, + ) -> Result<(), RelayChainError> { + self.to_worker_channel + .try_send(message) + .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string())) + } + + pub async fn get_imported_heads_stream(&self) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); + self.send_register_message_to_worker(NotificationRegisterMessage::RegisterImportListener( + tx, + ))?; + Ok(rx) + } + + pub async fn get_best_heads_stream(&self) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); + self.send_register_message_to_worker( + NotificationRegisterMessage::RegisterBestHeadListener(tx), + )?; + Ok(rx) + } + + pub async fn get_finalized_heads_stream(&self) -> Result, RelayChainError> { + let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); + self.send_register_message_to_worker( + NotificationRegisterMessage::RegisterFinalizationListener(tx), + )?; + Ok(rx) + } + + async fn subscribe_imported_heads( + ws_client: &JsonRpcClient, + ) -> Result, RelayChainError> { + Ok(ws_client + .subscribe::("chain_subscribeAllHeads", None, "chain_unsubscribeAllHeads") + .await?) + } + + async fn subscribe_finalized_heads( + ws_client: &JsonRpcClient, + ) -> Result, RelayChainError> { + Ok(ws_client + .subscribe::( + "chain_subscribeFinalizedHeads", + None, + "chain_unsubscribeFinalizedHeads", + ) + .await?) + } + + async fn subscribe_new_best_heads( + ws_client: &JsonRpcClient, + ) -> Result, RelayChainError> { + Ok(ws_client + .subscribe::( + "chain_subscribeNewHeads", + None, + "chain_unsubscribeFinalizedHeads", + ) + .await?) + } +} diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index aec1ce8e8b9..d364b172e79 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -22,7 +22,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; +use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; // Substrate Imports use sc_client_api::ExecutorProvider; @@ -176,8 +176,10 @@ async fn build_relay_chain_interface( hwbench: Option, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { - Some(relay_chain_url) => - Ok((Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>, None)), + Some(relay_chain_url) => { + let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; + Ok((Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>, None)) + }, None => build_inprocess_relay_chain( polkadot_config, parachain_config, diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index 8994b2e2c48..8a47675f7f4 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -30,7 +30,7 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; +use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; use polkadot_service::CollatorPair; use sp_core::Pair; @@ -296,8 +296,10 @@ async fn build_relay_chain_interface( hwbench: Option, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { - Some(relay_chain_url) => - Ok((Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>, None)), + Some(relay_chain_url) => { + let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; + Ok((Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>, None)) + }, None => build_inprocess_relay_chain( polkadot_config, parachain_config, diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 990c540b9e1..69593b3c1e5 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -37,7 +37,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; +use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; use parking_lot::Mutex; @@ -182,7 +182,8 @@ async fn build_relay_chain_interface( task_manager: &mut TaskManager, ) -> RelayChainResult> { if let Some(relay_chain_url) = collator_options.relay_chain_rpc_url { - return Ok(Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>) + let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; + return Ok(Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>) } let relay_chain_full_node = polkadot_test_service::new_full(