Skip to content

Commit

Permalink
Change push to websocket and pull to rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed May 9, 2023
1 parent d683700 commit 3ec205a
Show file tree
Hide file tree
Showing 10 changed files with 24 additions and 18 deletions.
6 changes: 6 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ grpc_addr = 'http://127.0.0.1:9090'
# listens on. Required
websocket_addr = 'ws://127.0.0.1:26657/websocket'

# The type of event source to use for getting events from the chain.
# Either `websocket` for WebSocket or `rpc` for RPC via the `/block_results` endpoint.
# Default: 'websocket'
event_source = 'websocket'

# Specify the maximum amount of time (duration) that the RPC requests should
# take before timing out. Default: 10s (10 seconds)
# Note: Hermes uses this parameter _only_ in `start` mode; for all other CLIs,
Expand Down Expand Up @@ -310,6 +315,7 @@ id = 'ibc-1'
rpc_addr = 'http://127.0.0.1:26557'
grpc_addr = 'http://127.0.0.1:9091'
websocket_addr = 'ws://127.0.0.1:26557/websocket'
event_source = 'websocket'
rpc_timeout = '10s'
account_prefix = 'cosmos'
key_name = 'testkey'
Expand Down
2 changes: 1 addition & 1 deletion crates/relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::runtime::Runtime as TokioRuntime;
use tracing::{error, info, instrument};

use ibc_relayer::{
chain::handle::Subscription, config::ChainConfig, event::source::push::EventSource,
chain::handle::Subscription, config::ChainConfig, event::source::websocket::EventSource,
};
use ibc_relayer_types::{core::ics24_host::identifier::ChainId, events::IbcEvent};

Expand Down
4 changes: 2 additions & 2 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,13 @@ impl CosmosSdkChain {
use crate::config::EventSource as Mode;

let (event_source, monitor_tx) = match self.config.event_source {
Mode::Push => EventSource::push(
Mode::WebSocket => EventSource::websocket(
self.config.id.clone(),
self.config.websocket_addr.clone(),
self.compat_mode,
self.rt.clone(),
),
Mode::Pull => EventSource::pull(
Mode::Rpc => EventSource::rpc(
self.config.id.clone(),
self.rpc_client.clone(),
self.rt.clone(),
Expand Down
4 changes: 2 additions & 2 deletions crates/relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,11 +442,11 @@ pub struct GenesisRestart {
#[serde(rename_all = "lowercase")]
pub enum EventSource {
/// Push-based event source, via WebSocket
Push,
WebSocket,

/// Pull-based event source, via RPC /block_results
#[default]
Pull,
Rpc,
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
Expand Down
24 changes: 12 additions & 12 deletions crates/relayer/src/event/source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod pull;
pub mod push;
pub mod rpc;
pub mod websocket;

use std::sync::Arc;

Expand All @@ -23,35 +23,35 @@ use crate::chain::{handle::Subscription, tracking::TrackingId};
pub type Result<T> = core::result::Result<T, Error>;

pub enum EventSource {
Push(push::EventSource),
Pull(pull::EventSource),
WebSocket(websocket::EventSource),
Rpc(rpc::EventSource),
}

impl EventSource {
pub fn push(
pub fn websocket(
chain_id: ChainId,
ws_url: WebSocketClientUrl,
rpc_compat: CompatMode,
rt: Arc<TokioRuntime>,
) -> Result<(Self, TxEventSourceCmd)> {
let (mut source, tx) = push::EventSource::new(chain_id, ws_url, rpc_compat, rt)?;
let (mut source, tx) = websocket::EventSource::new(chain_id, ws_url, rpc_compat, rt)?;
source.init_subscriptions()?;
Ok((Self::Push(source), tx))
Ok((Self::WebSocket(source), tx))
}

pub fn pull(
pub fn rpc(
chain_id: ChainId,
rpc_client: HttpClient,
rt: Arc<TokioRuntime>,
) -> Result<(Self, TxEventSourceCmd)> {
let (source, tx) = pull::EventSource::new(chain_id, rpc_client, rt)?;
Ok((Self::Pull(source), tx))
let (source, tx) = rpc::EventSource::new(chain_id, rpc_client, rt)?;
Ok((Self::Rpc(source), tx))
}

pub fn run(self) {
match self {
Self::Push(source) => source.run(),
Self::Pull(source) => source.run(),
Self::WebSocket(source) => source.run(),
Self::Rpc(source) => source.run(),
}
}
}
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion tools/test-framework/src/types/single/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl FullNode {
rpc_addr: Url::from_str(&self.chain_driver.rpc_address())?,
websocket_addr: WebSocketClientUrl::from_str(&self.chain_driver.websocket_address())?,
grpc_addr: Url::from_str(&self.chain_driver.grpc_address())?,
event_source: config::EventSource::Push,
event_source: config::EventSource::WebSocket,
rpc_timeout: Duration::from_secs(10),
genesis_restart: None,
account_prefix: self.chain_driver.account_prefix.clone(),
Expand Down

0 comments on commit 3ec205a

Please sign in to comment.