Skip to content

Commit

Permalink
Use separate threads for chains
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarkushin committed Jun 19, 2023
1 parent a358385 commit 80d132a
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 29 deletions.
47 changes: 35 additions & 12 deletions hyperspace/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use futures::{future::ready, StreamExt};
use primitives::Chain;
use std::convert::Infallible;

pub mod chain;
pub mod command;
Expand All @@ -30,6 +31,7 @@ use events::{has_packet_events, parse_events};
use futures::TryFutureExt;
use ibc::events::IbcEvent;
use metrics::handler::MetricsHandler;
use tokio::task::JoinHandle;

#[derive(Copy, Debug, Clone)]
pub enum Mode {
Expand All @@ -53,19 +55,40 @@ where
let (mut chain_a_finality, mut chain_b_finality) =
(chain_a.finality_notifications().await?, chain_b.finality_notifications().await?);

// loop forever
loop {
tokio::select! {
// new finality event from chain A
result = chain_a_finality.next() => {
process_finality_event!(chain_a, chain_b, chain_a_metrics, mode, result, chain_a_finality, chain_b_finality)
}
// new finality event from chain B
result = chain_b_finality.next() => {
process_finality_event!(chain_b, chain_a, chain_b_metrics, mode, result, chain_b_finality, chain_a_finality)
}
let mut chain_a_cl = chain_a.clone();
let mut chain_b_cl = chain_b.clone();

let jh1: JoinHandle<anyhow::Result<Infallible>> = tokio::spawn(async move {
loop {
let result = chain_a_finality.next().await;
process_finality_event!(
chain_a_cl,
chain_b_cl,
chain_a_metrics,
mode,
result,
chain_a_finality
)
}
}
});

let jh2: JoinHandle<anyhow::Result<Infallible>> = tokio::spawn(async move {
loop {
let result = chain_b_finality.next().await;
process_finality_event!(
chain_b,
chain_a,
chain_b_metrics,
mode,
result,
chain_b_finality
)
}
});

jh1.await??;
jh2.await??;
Ok(())
}

pub async fn fish<A, B>(chain_a: A, chain_b: B) -> Result<(), anyhow::Error>
Expand Down
13 changes: 2 additions & 11 deletions hyperspace/core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

#[macro_export]
macro_rules! process_finality_event {
($source:ident, $sink:ident, $metrics:expr, $mode:ident, $result:ident, $stream_source:ident, $stream_sink:ident) => {
($source:ident, $sink:ident, $metrics:expr, $mode:ident, $result:ident, $stream:ident) => {
match $result {
// stream closed
None => {
log::warn!("Stream closed for {}", $source.name());
$stream_source = loop {
$stream = loop {
match $source.finality_notifications().await {
Ok(stream) => break stream,
Err(e) => {
Expand All @@ -28,15 +28,6 @@ macro_rules! process_finality_event {
},
};
};
$stream_sink = loop {
match $sink.finality_notifications().await {
Ok(stream) => break stream,
Err(e) => {
log::error!("Failed to get finality notifications for {} {:?}. Trying again in 30 seconds...", $sink.name(), e);
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
},
};
};
},
Some(finality_event) => {
log::info!("=======================================================");
Expand Down
19 changes: 14 additions & 5 deletions hyperspace/core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use ibc_proto::google::protobuf::Any;
use metrics::handler::MetricsHandler;
use primitives::Chain;
use tokio::task::JoinHandle;

/// This sends messages to the sink chain in a gas-aware manner.
pub async fn flush_message_batch(
Expand All @@ -32,7 +33,11 @@ pub async fn flush_message_batch(
log::debug!(target: "hyperspace", "Outgoing messages weight: {} block max weight: {}", batch_weight, block_max_weight);
let ratio = (batch_weight / block_max_weight) as usize;
if ratio == 0 {
sink.submit(msgs).await?;
let sink = sink.clone();
let _join_handler: JoinHandle<Result<_, anyhow::Error>> = tokio::spawn(async move {
sink.submit(msgs).await?;
Ok(())
});
return Ok(())
}

Expand All @@ -51,10 +56,14 @@ pub async fn flush_message_batch(
);
let chunk_size = (msgs.len() / chunk).max(1);
// TODO: return number of failed messages and record it to metrics
for batch in msgs.chunks(chunk_size) {
// send out batches.
sink.submit(batch.to_vec()).await?;
}
let sink = sink.clone();
let _join_handler: JoinHandle<Result<_, anyhow::Error>> = tokio::spawn(async move {
for batch in msgs.chunks(chunk_size) {
// send out batches.
sink.submit(batch.to_vec()).await?;
}
Ok(())
});

Ok(())
}
2 changes: 1 addition & 1 deletion hyperspace/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub fn apply_prefix(mut commitment_prefix: Vec<u8>, path: impl Into<Vec<u8>>) ->
#[async_trait::async_trait]
pub trait IbcProvider {
/// Finality event type, passed on to [`Chain::query_latest_ibc_events`]
type FinalityEvent: Debug;
type FinalityEvent: Debug + Send;
/// A representation of the transaction id for the chain
type TransactionId: Debug;
/// Asset Id
Expand Down

0 comments on commit 80d132a

Please sign in to comment.