Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle substrate node disconnection #522

Merged
merged 5 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions crates/event-watcher-traits/src/substrate/bridge_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use super::{event_watcher::SubstrateEventWatcher, *};
use sp_core::sr25519::Pair as Sr25519Pair;
use webb::substrate::subxt::OnlineClient;
use webb_relayer_context::RelayerContext;
// A Substrate Bridge Watcher is a trait for Signature Bridge Pallet that is not specific for watching events from that pallet,
/// instead it watches for commands sent from other event watchers or services, it helps decouple the event watchers
/// from the actual action that should be taken depending on the event.
Expand All @@ -30,7 +32,7 @@ where
&self,
chain_id: u32,
store: Arc<Self::Store>,
client: Arc<Self::Client>,
client: Arc<OnlineClient<RuntimeConfig>>,
pair: Sr25519Pair,
cmd: BridgeCommand,
) -> webb_relayer_utils::Result<()>;
Expand All @@ -47,7 +49,7 @@ where
async fn run(
&self,
chain_id: u32,
client: Arc<Self::Client>,
ctx: RelayerContext,
pair: Sr25519Pair,
store: Arc<Self::Store>,
) -> webb_relayer_utils::Result<()> {
Expand All @@ -58,6 +60,11 @@ where
webb_proposals::TypedChainId::Substrate(chain_id);
let bridge_key = BridgeKey::new(typed_chain_id);
let key = SledQueueKey::from_bridge_key(bridge_key);
let client = ctx
.substrate_provider::<RuntimeConfig>(&chain_id.to_string())
.await?;
let client = Arc::new(client);

loop {
let result = match store.dequeue_item(key)? {
Some(cmd) => {
Expand Down
28 changes: 22 additions & 6 deletions crates/event-watcher-traits/src/substrate/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
// limitations under the License.

use tokio::sync::Mutex;
use webb::substrate::subxt::config::Header;
use webb::substrate::subxt::{config::Header, OnlineClient};
use webb_relayer_config::event_watcher::EventsWatcherConfig;
use webb_relayer_context::RelayerContext;
use webb_relayer_utils::{metric, retry};

use super::*;
Expand All @@ -23,7 +24,7 @@ use super::*;
pub type EventHandlerFor<W, RuntimeConfig> = Box<
dyn EventHandler<
RuntimeConfig,
Client = <W as SubstrateEventWatcher<RuntimeConfig>>::Client,
Client = OnlineClient<RuntimeConfig>,
Store = <W as SubstrateEventWatcher<RuntimeConfig>>::Store,
> + Send
+ Sync,
Expand Down Expand Up @@ -125,8 +126,6 @@ where

/// The name of the pallet that this event watcher is watching.
const PALLET_NAME: &'static str;
/// The Runtime Client that can be used to perform API calls.
type Client: OnlineClientT<RuntimeConfig> + Send + Sync;

/// The Storage backend, used by the event watcher to store its state.
type Store: HistoryStore;
Expand All @@ -143,17 +142,34 @@ where
async fn run(
&self,
chain_id: u32,
client: Arc<Self::Client>,
ctx: RelayerContext,
store: Arc<Self::Store>,
event_watcher_config: EventsWatcherConfig,
handlers: Vec<EventHandlerFor<Self, RuntimeConfig>>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()> {
const MAX_RETRY_COUNT: usize = 5;

let backoff = backoff::backoff::Constant::new(Duration::from_secs(1));
let backoff = backoff::ExponentialBackoff {
max_elapsed_time: None,
..Default::default()
};
let metrics_clone = metrics.clone();
let task = || async {
let maybe_client = ctx
.substrate_provider::<RuntimeConfig>(&chain_id.to_string())
.await;
let client = match maybe_client {
Ok(client) => client,
Err(err) => {
tracing::error!(
"Failed to connect with substrate client for chain_id: {}, retrying...!",
chain_id
);
return Err(backoff::Error::transient(err));
}
};
let client = Arc::new(client);
let mut instant = std::time::Instant::now();
let step = 1u64;
let rpc = client.rpc();
Expand Down
6 changes: 1 addition & 5 deletions crates/event-watcher-traits/src/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ use futures::prelude::*;
use std::cmp;
use std::sync::Arc;
use std::time::Duration;
use webb::substrate::subxt::{
self,
client::{OfflineClientT, OnlineClientT},
config::Header,
};
use webb::substrate::subxt::{self, client::OnlineClientT, config::Header};
use webb_proposals::{
ResourceId, SubstrateTargetSystem, TargetSystem, TypedChainId,
};
Expand Down
5 changes: 1 addition & 4 deletions crates/event-watcher-traits/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl SubstrateEventWatcher<PolkadotConfig> for TestEventsWatcher {

const PALLET_NAME: &'static str = "System";

type Client = OnlineClient<PolkadotConfig>;

type Store = SledStore;
}

Expand Down Expand Up @@ -83,7 +81,6 @@ async fn substrate_event_watcher_should_work() -> webb_relayer_utils::Result<()>
{
let chain_id = 5u32;
let store = SledStore::temporary()?;
let client = OnlineClient::<PolkadotConfig>::new().await?;
let watcher = TestEventsWatcher::default();
let config = webb_relayer_config::WebbRelayerConfig::default();
let ctx = RelayerContext::new(config, store.clone())?;
Expand All @@ -92,7 +89,7 @@ async fn substrate_event_watcher_should_work() -> webb_relayer_utils::Result<()>
watcher
.run(
chain_id,
client.into(),
ctx,
Arc::new(store),
event_watcher_config,
vec![Box::<RemarkedEventHandler>::default()],
Expand Down
21 changes: 16 additions & 5 deletions crates/tx-queue/src/substrate/substrate_tx_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use rand::Rng;
use webb::substrate::subxt;
use webb::substrate::subxt::config::ExtrinsicParams;
use webb::substrate::subxt::tx::SubmittableExtrinsic;
use webb::substrate::subxt::PolkadotConfig;
use webb_relayer_context::RelayerContext;
use webb_relayer_store::sled::SledQueueKey;
use webb_relayer_store::QueueStore;
Expand Down Expand Up @@ -87,11 +88,6 @@ where
max_elapsed_time: None,
..Default::default()
};
// protocol-substrate client
let client = self
.ctx
.substrate_provider::<X>(&chain_id.to_string())
.await?;

tracing::event!(
target: webb_relayer_utils::probe::TARGET,
Expand All @@ -104,6 +100,21 @@ where

let metrics_clone = self.ctx.metrics.clone();
let task = || async {
// Tangle node connection
let maybe_client = self
.ctx
.substrate_provider::<PolkadotConfig>(&chain_id.to_string())
.await;
let client = match maybe_client {
Ok(client) => client,
Err(err) => {
tracing::error!(
"Failed to connect with substrate client for chain_id: {}, retrying...!",
chain_id
);
return Err(backoff::Error::transient(err));
}
};
loop {
tracing::trace!("Checking for any txs in the queue ...");
// dequeue signed transaction
Expand Down
6 changes: 1 addition & 5 deletions event-watchers/dkg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod public_key_changed_handler;
#[doc(hidden)]
pub use public_key_changed_handler::*;
use webb::substrate::subxt::events::StaticEvent;
use webb::substrate::subxt::{self, PolkadotConfig};
use webb::substrate::subxt::PolkadotConfig;
use webb::substrate::tangle_runtime::api::{
dkg::events::PublicKeySignatureChanged,
dkg_proposal_handler::events::ProposalSigned,
Expand All @@ -38,8 +38,6 @@ impl SubstrateEventWatcher<PolkadotConfig> for DKGMetadataWatcher {

const PALLET_NAME: &'static str = PublicKeySignatureChanged::PALLET;

type Client = subxt::OnlineClient<PolkadotConfig>;

type Store = webb_relayer_store::SledStore;
}

Expand All @@ -53,7 +51,5 @@ impl SubstrateEventWatcher<PolkadotConfig> for DKGProposalHandlerWatcher {

const PALLET_NAME: &'static str = ProposalSigned::PALLET;

type Client = subxt::OnlineClient<PolkadotConfig>;

type Store = webb_relayer_store::SledStore;
}
4 changes: 1 addition & 3 deletions event-watchers/substrate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub use vanchor_encrypted_output_handler::*;
pub use vanchor_leaves_handler::*;
use webb::substrate::subxt::events::StaticEvent;
use webb::substrate::{
subxt::{OnlineClient, PolkadotConfig},
subxt::PolkadotConfig,
tangle_runtime::api::v_anchor_bn254::events::Transaction,
};
use webb_event_watcher_traits::SubstrateEventWatcher;
Expand All @@ -42,7 +42,5 @@ impl SubstrateEventWatcher<PolkadotConfig> for SubstrateVAnchorEventWatcher {

const PALLET_NAME: &'static str = Transaction::PALLET;

type Client = OnlineClient<PolkadotConfig>;

type Store = SledStore;
}
7 changes: 3 additions & 4 deletions event-watchers/substrate/src/signature_bridge_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ pub struct SubstrateBridgeEventWatcher;
impl SubstrateEventWatcher<PolkadotConfig> for SubstrateBridgeEventWatcher {
const TAG: &'static str = "Substrate bridge pallet Watcher";
const PALLET_NAME: &'static str = MaintainerSet::PALLET;
type Client = OnlineClient<PolkadotConfig>;
type Store = SledStore;
}

Expand All @@ -109,7 +108,7 @@ impl SubstrateBridgeWatcher<PolkadotConfig> for SubstrateBridgeEventWatcher {
&self,
chain_id: u32,
store: Arc<Self::Store>,
client: Arc<Self::Client>,
client: Arc<OnlineClient<PolkadotConfig>>,
pair: Sr25519Pair,
cmd: BridgeCommand,
) -> webb_relayer_utils::Result<()> {
Expand Down Expand Up @@ -154,7 +153,7 @@ where
&self,
chain_id: u32,
store: Arc<<Self as SubstrateEventWatcher<PolkadotConfig>>::Store>,
api: Arc<<Self as SubstrateEventWatcher<PolkadotConfig>>::Client>,
api: Arc<OnlineClient<PolkadotConfig>>,
pair: Sr25519Pair,
(proposal_data, signature): (Vec<u8>, Vec<u8>),
) -> webb_relayer_utils::Result<()> {
Expand Down Expand Up @@ -252,7 +251,7 @@ where
&self,
chain_id: u32,
store: Arc<<Self as SubstrateEventWatcher<PolkadotConfig>>::Store>,
api: Arc<<Self as SubstrateEventWatcher<PolkadotConfig>>::Client>,
api: Arc<OnlineClient<PolkadotConfig>>,
pair: Sr25519Pair,
(public_key, nonce, signature): (Vec<u8>, u32, Vec<u8>),
) -> webb_relayer_utils::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion examples/in_depth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,6 @@ async fn main() -> anyhow::Result<()> {
service::build_web_services(ctx.clone()).await?;
// and also the background services:
// this does not block, will fire the services on background tasks.
service::ignite(&ctx, Arc::new(store)).await?;
service::ignite(ctx, Arc::new(store)).await?;
Ok(())
}
3 changes: 2 additions & 1 deletion services/webb-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ async fn main(args: Opts) -> anyhow::Result<()> {
let server_handle = tokio::spawn(build_web_services(ctx.clone()));
// start all background services.
// this does not block, will fire the services on background tasks.
webb_relayer::service::ignite(&ctx, Arc::new(store)).await?;
webb_relayer::service::ignite(ctx.clone(), Arc::new(store)).await?;

tracing::event!(
target: webb_relayer_utils::probe::TARGET,
tracing::Level::DEBUG,
Expand Down
6 changes: 3 additions & 3 deletions services/webb-relayer/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ pub async fn build_web_services(ctx: RelayerContext) -> crate::Result<()> {
/// * `ctx` - RelayContext reference that holds the configuration
/// * `store` -[Sled](https://sled.rs)-based database store
pub async fn ignite(
ctx: &RelayerContext,
ctx: RelayerContext,
store: Arc<Store>,
) -> crate::Result<()> {
tracing::trace!(
"Relayer configuration: {}",
serde_json::to_string_pretty(&ctx.config)?
);
evm::ignite(ctx, store.clone()).await?;
substrate::ignite(ctx, store.clone()).await?;
evm::ignite(&ctx, store.clone()).await?;
substrate::ignite(ctx.clone(), store.clone()).await?;
Ok(())
}

Expand Down
Loading