diff --git a/crates/event-watcher-traits/src/substrate/bridge_watcher.rs b/crates/event-watcher-traits/src/substrate/bridge_watcher.rs index 119e80e05..83f017639 100644 --- a/crates/event-watcher-traits/src/substrate/bridge_watcher.rs +++ b/crates/event-watcher-traits/src/substrate/bridge_watcher.rs @@ -14,6 +14,8 @@ use super::{event_watcher::SubstrateEventWatcher, *}; use sp_core::sr25519::Pair as Sr25519Pair; +use webb::substrate::subxt::OnlineClient; +use webb_relayer_context::RelayerContext; // A Substrate Bridge Watcher is a trait for Signature Bridge Pallet that is not specific for watching events from that pallet, /// instead it watches for commands sent from other event watchers or services, it helps decouple the event watchers /// from the actual action that should be taken depending on the event. @@ -30,7 +32,7 @@ where &self, chain_id: u32, store: Arc, - client: Arc, + client: Arc>, pair: Sr25519Pair, cmd: BridgeCommand, ) -> webb_relayer_utils::Result<()>; @@ -47,7 +49,7 @@ where async fn run( &self, chain_id: u32, - client: Arc, + ctx: RelayerContext, pair: Sr25519Pair, store: Arc, ) -> webb_relayer_utils::Result<()> { @@ -58,6 +60,11 @@ where webb_proposals::TypedChainId::Substrate(chain_id); let bridge_key = BridgeKey::new(typed_chain_id); let key = SledQueueKey::from_bridge_key(bridge_key); + let client = ctx + .substrate_provider::(&chain_id.to_string()) + .await?; + let client = Arc::new(client); + loop { let result = match store.dequeue_item(key)? { Some(cmd) => { diff --git a/crates/event-watcher-traits/src/substrate/event_watcher.rs b/crates/event-watcher-traits/src/substrate/event_watcher.rs index bc6a94f26..388b8716b 100644 --- a/crates/event-watcher-traits/src/substrate/event_watcher.rs +++ b/crates/event-watcher-traits/src/substrate/event_watcher.rs @@ -13,8 +13,9 @@ // limitations under the License. use tokio::sync::Mutex; -use webb::substrate::subxt::config::Header; +use webb::substrate::subxt::{config::Header, OnlineClient}; use webb_relayer_config::event_watcher::EventsWatcherConfig; +use webb_relayer_context::RelayerContext; use webb_relayer_utils::{metric, retry}; use super::*; @@ -23,7 +24,7 @@ use super::*; pub type EventHandlerFor = Box< dyn EventHandler< RuntimeConfig, - Client = >::Client, + Client = OnlineClient, Store = >::Store, > + Send + Sync, @@ -125,8 +126,6 @@ where /// The name of the pallet that this event watcher is watching. const PALLET_NAME: &'static str; - /// The Runtime Client that can be used to perform API calls. - type Client: OnlineClientT + Send + Sync; /// The Storage backend, used by the event watcher to store its state. type Store: HistoryStore; @@ -143,7 +142,7 @@ where async fn run( &self, chain_id: u32, - client: Arc, + ctx: RelayerContext, store: Arc, event_watcher_config: EventsWatcherConfig, handlers: Vec>, @@ -151,9 +150,26 @@ where ) -> webb_relayer_utils::Result<()> { const MAX_RETRY_COUNT: usize = 5; - let backoff = backoff::backoff::Constant::new(Duration::from_secs(1)); + let backoff = backoff::ExponentialBackoff { + max_elapsed_time: None, + ..Default::default() + }; let metrics_clone = metrics.clone(); let task = || async { + let maybe_client = ctx + .substrate_provider::(&chain_id.to_string()) + .await; + let client = match maybe_client { + Ok(client) => client, + Err(err) => { + tracing::error!( + "Failed to connect with substrate client for chain_id: {}, retrying...!", + chain_id + ); + return Err(backoff::Error::transient(err)); + } + }; + let client = Arc::new(client); let mut instant = std::time::Instant::now(); let step = 1u64; let rpc = client.rpc(); diff --git a/crates/event-watcher-traits/src/substrate/mod.rs b/crates/event-watcher-traits/src/substrate/mod.rs index e907e13c9..1468f334a 100644 --- a/crates/event-watcher-traits/src/substrate/mod.rs +++ b/crates/event-watcher-traits/src/substrate/mod.rs @@ -18,11 +18,7 @@ use futures::prelude::*; use std::cmp; use std::sync::Arc; use std::time::Duration; -use webb::substrate::subxt::{ - self, - client::{OfflineClientT, OnlineClientT}, - config::Header, -}; +use webb::substrate::subxt::{self, client::OnlineClientT, config::Header}; use webb_proposals::{ ResourceId, SubstrateTargetSystem, TargetSystem, TypedChainId, }; diff --git a/crates/event-watcher-traits/src/tests.rs b/crates/event-watcher-traits/src/tests.rs index 9c5191e6a..2003b4b7d 100644 --- a/crates/event-watcher-traits/src/tests.rs +++ b/crates/event-watcher-traits/src/tests.rs @@ -33,8 +33,6 @@ impl SubstrateEventWatcher for TestEventsWatcher { const PALLET_NAME: &'static str = "System"; - type Client = OnlineClient; - type Store = SledStore; } @@ -83,7 +81,6 @@ async fn substrate_event_watcher_should_work() -> webb_relayer_utils::Result<()> { let chain_id = 5u32; let store = SledStore::temporary()?; - let client = OnlineClient::::new().await?; let watcher = TestEventsWatcher::default(); let config = webb_relayer_config::WebbRelayerConfig::default(); let ctx = RelayerContext::new(config, store.clone())?; @@ -92,7 +89,7 @@ async fn substrate_event_watcher_should_work() -> webb_relayer_utils::Result<()> watcher .run( chain_id, - client.into(), + ctx, Arc::new(store), event_watcher_config, vec![Box::::default()], diff --git a/crates/tx-queue/src/substrate/substrate_tx_queue.rs b/crates/tx-queue/src/substrate/substrate_tx_queue.rs index 80ba2b4bf..2bd991e5e 100644 --- a/crates/tx-queue/src/substrate/substrate_tx_queue.rs +++ b/crates/tx-queue/src/substrate/substrate_tx_queue.rs @@ -18,6 +18,7 @@ use rand::Rng; use webb::substrate::subxt; use webb::substrate::subxt::config::ExtrinsicParams; use webb::substrate::subxt::tx::SubmittableExtrinsic; +use webb::substrate::subxt::PolkadotConfig; use webb_relayer_context::RelayerContext; use webb_relayer_store::sled::SledQueueKey; use webb_relayer_store::QueueStore; @@ -87,11 +88,6 @@ where max_elapsed_time: None, ..Default::default() }; - // protocol-substrate client - let client = self - .ctx - .substrate_provider::(&chain_id.to_string()) - .await?; tracing::event!( target: webb_relayer_utils::probe::TARGET, @@ -104,6 +100,21 @@ where let metrics_clone = self.ctx.metrics.clone(); let task = || async { + // Tangle node connection + let maybe_client = self + .ctx + .substrate_provider::(&chain_id.to_string()) + .await; + let client = match maybe_client { + Ok(client) => client, + Err(err) => { + tracing::error!( + "Failed to connect with substrate client for chain_id: {}, retrying...!", + chain_id + ); + return Err(backoff::Error::transient(err)); + } + }; loop { tracing::trace!("Checking for any txs in the queue ..."); // dequeue signed transaction diff --git a/event-watchers/dkg/src/lib.rs b/event-watchers/dkg/src/lib.rs index e516768b6..08635e3a5 100644 --- a/event-watchers/dkg/src/lib.rs +++ b/event-watchers/dkg/src/lib.rs @@ -21,7 +21,7 @@ mod public_key_changed_handler; #[doc(hidden)] pub use public_key_changed_handler::*; use webb::substrate::subxt::events::StaticEvent; -use webb::substrate::subxt::{self, PolkadotConfig}; +use webb::substrate::subxt::PolkadotConfig; use webb::substrate::tangle_runtime::api::{ dkg::events::PublicKeySignatureChanged, dkg_proposal_handler::events::ProposalSigned, @@ -38,8 +38,6 @@ impl SubstrateEventWatcher for DKGMetadataWatcher { const PALLET_NAME: &'static str = PublicKeySignatureChanged::PALLET; - type Client = subxt::OnlineClient; - type Store = webb_relayer_store::SledStore; } @@ -53,7 +51,5 @@ impl SubstrateEventWatcher for DKGProposalHandlerWatcher { const PALLET_NAME: &'static str = ProposalSigned::PALLET; - type Client = subxt::OnlineClient; - type Store = webb_relayer_store::SledStore; } diff --git a/event-watchers/substrate/src/lib.rs b/event-watchers/substrate/src/lib.rs index 038f1146a..9aa95f951 100644 --- a/event-watchers/substrate/src/lib.rs +++ b/event-watchers/substrate/src/lib.rs @@ -27,7 +27,7 @@ pub use vanchor_encrypted_output_handler::*; pub use vanchor_leaves_handler::*; use webb::substrate::subxt::events::StaticEvent; use webb::substrate::{ - subxt::{OnlineClient, PolkadotConfig}, + subxt::PolkadotConfig, tangle_runtime::api::v_anchor_bn254::events::Transaction, }; use webb_event_watcher_traits::SubstrateEventWatcher; @@ -42,7 +42,5 @@ impl SubstrateEventWatcher for SubstrateVAnchorEventWatcher { const PALLET_NAME: &'static str = Transaction::PALLET; - type Client = OnlineClient; - type Store = SledStore; } diff --git a/event-watchers/substrate/src/signature_bridge_watcher.rs b/event-watchers/substrate/src/signature_bridge_watcher.rs index 433f73be6..d707564f2 100644 --- a/event-watchers/substrate/src/signature_bridge_watcher.rs +++ b/event-watchers/substrate/src/signature_bridge_watcher.rs @@ -98,7 +98,6 @@ pub struct SubstrateBridgeEventWatcher; impl SubstrateEventWatcher for SubstrateBridgeEventWatcher { const TAG: &'static str = "Substrate bridge pallet Watcher"; const PALLET_NAME: &'static str = MaintainerSet::PALLET; - type Client = OnlineClient; type Store = SledStore; } @@ -109,7 +108,7 @@ impl SubstrateBridgeWatcher for SubstrateBridgeEventWatcher { &self, chain_id: u32, store: Arc, - client: Arc, + client: Arc>, pair: Sr25519Pair, cmd: BridgeCommand, ) -> webb_relayer_utils::Result<()> { @@ -154,7 +153,7 @@ where &self, chain_id: u32, store: Arc<>::Store>, - api: Arc<>::Client>, + api: Arc>, pair: Sr25519Pair, (proposal_data, signature): (Vec, Vec), ) -> webb_relayer_utils::Result<()> { @@ -252,7 +251,7 @@ where &self, chain_id: u32, store: Arc<>::Store>, - api: Arc<>::Client>, + api: Arc>, pair: Sr25519Pair, (public_key, nonce, signature): (Vec, u32, Vec), ) -> webb_relayer_utils::Result<()> { diff --git a/examples/in_depth.rs b/examples/in_depth.rs index 239eadf97..24460d876 100644 --- a/examples/in_depth.rs +++ b/examples/in_depth.rs @@ -136,6 +136,6 @@ async fn main() -> anyhow::Result<()> { service::build_web_services(ctx.clone()).await?; // and also the background services: // this does not block, will fire the services on background tasks. - service::ignite(&ctx, Arc::new(store)).await?; + service::ignite(ctx, Arc::new(store)).await?; Ok(()) } diff --git a/services/webb-relayer/src/main.rs b/services/webb-relayer/src/main.rs index 457bbafe1..9f27a9b9a 100644 --- a/services/webb-relayer/src/main.rs +++ b/services/webb-relayer/src/main.rs @@ -76,7 +76,8 @@ async fn main(args: Opts) -> anyhow::Result<()> { let server_handle = tokio::spawn(build_web_services(ctx.clone())); // start all background services. // this does not block, will fire the services on background tasks. - webb_relayer::service::ignite(&ctx, Arc::new(store)).await?; + webb_relayer::service::ignite(ctx.clone(), Arc::new(store)).await?; + tracing::event!( target: webb_relayer_utils::probe::TARGET, tracing::Level::DEBUG, diff --git a/services/webb-relayer/src/service/mod.rs b/services/webb-relayer/src/service/mod.rs index e273683c7..383cd8db8 100644 --- a/services/webb-relayer/src/service/mod.rs +++ b/services/webb-relayer/src/service/mod.rs @@ -86,15 +86,15 @@ pub async fn build_web_services(ctx: RelayerContext) -> crate::Result<()> { /// * `ctx` - RelayContext reference that holds the configuration /// * `store` -[Sled](https://sled.rs)-based database store pub async fn ignite( - ctx: &RelayerContext, + ctx: RelayerContext, store: Arc, ) -> crate::Result<()> { tracing::trace!( "Relayer configuration: {}", serde_json::to_string_pretty(&ctx.config)? ); - evm::ignite(ctx, store.clone()).await?; - substrate::ignite(ctx, store.clone()).await?; + evm::ignite(&ctx, store.clone()).await?; + substrate::ignite(ctx.clone(), store.clone()).await?; Ok(()) } diff --git a/services/webb-relayer/src/service/substrate.rs b/services/webb-relayer/src/service/substrate.rs index b47c56507..f695381a0 100644 --- a/services/webb-relayer/src/service/substrate.rs +++ b/services/webb-relayer/src/service/substrate.rs @@ -59,43 +59,38 @@ pub fn build_web_services() -> Router> { /// * `ctx` - RelayContext reference that holds the configuration /// * `store` -[Sled](https://sled.rs)-based database store pub async fn ignite( - ctx: &RelayerContext, + ctx: RelayerContext, store: Arc, ) -> crate::Result<()> { - for (node_name, node_config) in &ctx.config.substrate { + for (_, node_config) in ctx.clone().config.substrate { if !node_config.enabled { continue; } - ignite_tangle_runtime(ctx, store.clone(), node_name, node_config) - .await?; + ignite_tangle_runtime(ctx.clone(), store.clone(), &node_config).await?; } Ok(()) } async fn ignite_tangle_runtime( - ctx: &RelayerContext, + ctx: RelayerContext, store: Arc, - node_name: &str, node_config: &SubstrateConfig, ) -> crate::Result<()> { - let client = ctx.substrate_provider::(node_name).await?; let chain_id = node_config.chain_id; for pallet in &node_config.pallets { match pallet { Pallet::DKGProposalHandler(config) => { start_dkg_proposal_handler( - ctx, + ctx.clone(), config, - client.clone(), chain_id, store.clone(), )?; } Pallet::Dkg(config) => { start_dkg_pallet_watcher( - ctx, + ctx.clone(), config, - client.clone(), chain_id, store.clone(), )?; @@ -107,7 +102,6 @@ async fn ignite_tangle_runtime( start_substrate_signature_bridge_events_watcher( ctx.clone(), config, - client.clone(), chain_id, store.clone(), ) @@ -115,9 +109,8 @@ async fn ignite_tangle_runtime( } Pallet::VAnchorBn254(config) => { start_substrate_vanchor_event_watcher( - ctx, + ctx.clone(), config, - client.clone(), chain_id, store.clone(), )?; @@ -141,9 +134,8 @@ async fn ignite_tangle_runtime( /// * `chain_id` - An u32 representing the chain id of the chain /// * `store` -[Sled](https://sled.rs)-based database store pub fn start_dkg_proposal_handler( - ctx: &RelayerContext, + ctx: RelayerContext, config: &DKGProposalHandlerPalletConfig, - client: TangleClient, chain_id: u32, store: Arc, ) -> crate::Result<()> { @@ -167,7 +159,7 @@ pub fn start_dkg_proposal_handler( let proposal_signed_handler = ProposalSignedHandler::default(); let proposal_handler_watcher_task = proposal_handler_watcher.run( chain_id, - client.into(), + ctx.clone(), store, my_config.events_watcher, vec![Box::new(proposal_signed_handler)], @@ -205,9 +197,8 @@ pub fn start_dkg_proposal_handler( /// * `chain_id` - An u32 representing the chain id of the chain /// * `store` -[Sled](https://sled.rs)-based database store pub fn start_dkg_pallet_watcher( - ctx: &RelayerContext, + ctx: RelayerContext, config: &DKGPalletConfig, - client: TangleClient, chain_id: u32, store: Arc, ) -> crate::Result<()> { @@ -231,7 +222,7 @@ pub fn start_dkg_pallet_watcher( let dkg_event_watcher_task = dkg_event_watcher.run( chain_id, - client.into(), + ctx.clone(), store, my_config.events_watcher, vec![Box::new(public_key_changed_handler)], @@ -269,9 +260,8 @@ pub fn start_dkg_pallet_watcher( /// * `chain_id` - An u32 representing the chain id of the chain /// * `store` -[Sled](https://sled.rs)-based database store pub fn start_substrate_vanchor_event_watcher( - ctx: &RelayerContext, + ctx: RelayerContext, config: &VAnchorBn254PalletConfig, - client: TangleClient, chain_id: u32, store: Arc, ) -> crate::Result<()> { @@ -316,7 +306,7 @@ pub fn start_substrate_vanchor_event_watcher( let watcher = SubstrateVAnchorEventWatcher::default(); let substrate_vanchor_watcher_task = watcher.run( chain_id, - client.clone().into(), + ctx.clone(), store.clone(), my_config.events_watcher, vec![ @@ -359,7 +349,7 @@ pub fn start_substrate_vanchor_event_watcher( let watcher = SubstrateVAnchorEventWatcher::default(); let substrate_vanchor_watcher_task = watcher.run( chain_id, - client.clone().into(), + ctx.clone(), store.clone(), my_config.events_watcher, vec![ @@ -392,7 +382,7 @@ pub fn start_substrate_vanchor_event_watcher( let watcher = SubstrateVAnchorEventWatcher::default(); let substrate_vanchor_watcher_task = watcher.run( chain_id, - client.clone().into(), + ctx.clone(), store.clone(), my_config.events_watcher, vec![ @@ -417,6 +407,8 @@ pub fn start_substrate_vanchor_event_watcher( } } }; + + tracing::debug!("Task resturned"); crate::Result::Ok(()) }; // kick off the watcher. @@ -428,7 +420,6 @@ pub fn start_substrate_vanchor_event_watcher( pub async fn start_substrate_signature_bridge_events_watcher( ctx: RelayerContext, config: &SignatureBridgePalletConfig, - client: TangleClient, chain_id: u32, store: Arc, ) -> crate::Result<()> { @@ -452,7 +443,7 @@ pub async fn start_substrate_signature_bridge_events_watcher( let events_watcher_task = SubstrateEventWatcher::run( &substrate_bridge_watcher, chain_id, - client.clone().into(), + ctx.clone(), store.clone(), my_config.events_watcher, vec![Box::new(bridge_event_handler)], @@ -461,7 +452,7 @@ pub async fn start_substrate_signature_bridge_events_watcher( let cmd_handler_task = SubstrateBridgeWatcher::run( &substrate_bridge_watcher, chain_id, - client.into(), + ctx.clone(), pair.clone(), store.clone(), );