Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
benluelo committed May 3, 2024
1 parent eda9244 commit 66de117
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 43 deletions.
74 changes: 40 additions & 34 deletions lib/block-message/src/chain_impls/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ where
AnyChainIdentified<AnyFetch>: From<Identified<Hc, Fetch<Hc>>>,
AnyChainIdentified<AnyData>: From<Identified<Hc, Data<Hc>>>,
{
tracing::debug!(%from_slot, %to_slot, "fetching beacon block range");
tracing::debug!(%from_slot, %to_slot, "fetching logs in beacon block range");

let event_height = Height {
revision_number,
Expand All @@ -137,39 +137,45 @@ where
let from_block = c.execution_height_of_beacon_slot(from_slot).await;
let to_block = c.execution_height_of_beacon_slot(to_slot).await;

// REVIEW: Surely transactions and events can be fetched in parallel?
conc(
futures::stream::iter(
c.provider()
.get_logs(
&Filter::new()
.address(ethers::types::H160::from(c.ibc_handler_address()))
.from_block(from_block)
// NOTE: This -1 is very important, else events will be double fetched
.to_block(to_block - 1),
)
.await
.unwrap(),
)
.filter_map(|log| async {
let tx_hash = log
.transaction_hash
.expect("log should have transaction_hash")
.into();

tracing::debug!(?log, "raw log");

match IBCHandlerEvents::decode_log(&log.into()) {
Ok(event) => Some(mk_aggregate_event(c, event, event_height, tx_hash).await),
Err(e) => {
tracing::warn!("could not decode evm event {}", e);
None
if from_block == to_block {
tracing::debug!(%from_block, %to_block, %from_slot, %to_slot, "beacon block range is empty");
QueueMsg::Noop
} else {
tracing::debug!(%from_block, %to_block, "fetching block range");
// REVIEW: Surely transactions and events can be fetched in parallel?
conc(
futures::stream::iter(
c.provider()
.get_logs(
&Filter::new()
.address(ethers::types::H160::from(c.ibc_handler_address()))
.from_block(from_block)
// NOTE: This -1 is very important, else events will be double fetched
.to_block(to_block - 1),
)
.await
.unwrap(),
)
.filter_map(|log| async {
let tx_hash = log
.transaction_hash
.expect("log should have transaction_hash")
.into();

tracing::debug!(?log, "raw log");

match IBCHandlerEvents::decode_log(&log.into()) {
Ok(event) => Some(mk_aggregate_event(c, event, event_height, tx_hash).await),
Err(e) => {
tracing::warn!("could not decode evm event {}", e);
None
}
}
}
})
.collect::<Vec<_>>()
.await,
)
})
.collect::<Vec<_>>()
.await,
)
}
}

pub(crate) async fn fetch_beacon_block_range<C, Hc>(
Expand All @@ -194,7 +200,7 @@ where
))
} else {
// attempt to shrink from..to
// note that this is *exclusive* on the `to`
// note that this is *exclusive* on `to`
for slot in (from_slot + 1)..to_slot {
tracing::info!("querying slot {slot}");
match beacon_api_client
Expand Down
4 changes: 2 additions & 2 deletions voyager-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
}
],
"l1_client_id": "08-wasm-0",
"l2_eth_rpc_api": "wss://arbitrum-sepolia.drpc.org",
"l2_eth_rpc_api": "wss://arb-sepolia.g.alchemy.com/v2/6PCr1n8dJeYbE2Z9LrXScs05hLTYiVFl",
"l1_contract_address": "0xd80810638dbDF9081b72C1B33c65375e807281C8",
"l1_latest_confirmed_slot": "117",
"l1": {
Expand Down Expand Up @@ -184,7 +184,7 @@
}
},
"voyager": {
"num_workers": 20,
"num_workers": 1,
"queue": {
"type": "pg-queue",
"database_url": "postgres://postgres:postgrespassword@127.0.0.1:5432/default",
Expand Down
23 changes: 16 additions & 7 deletions voyager/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use axum::{
};
use chain_utils::{AnyChain, AnyChainTryFromConfigError, Chains};
use frame_support_procedural::{CloneNoBound, DebugNoBound};
use futures::{channel::mpsc::UnboundedSender, Future, SinkExt, StreamExt};
use futures::{channel::mpsc::UnboundedSender, Future, SinkExt, StreamExt, TryStreamExt};
use queue_msg::{Engine, InMemoryQueue, Queue, QueueMessageTypes, QueueMsg};
use relay_message::RelayMessageTypes;
use reqwest::StatusCode;
Expand Down Expand Up @@ -274,21 +274,30 @@ impl Voyager {
join_set.spawn(async move {
reactor
.run(&mut q)
.for_each(|x| async {
let msg = x.unwrap();
.try_for_each(|data| async move {
tracing::info!(data = %serde_json::to_string(&data).unwrap(), "received data outside of an aggregation");

tracing::info!(data = %serde_json::to_string(&msg).unwrap(), "received data outside of an aggregation");
Ok(())
})
.await;
Ok(())
.await
});
}

let errs = vec![];

// TODO: figure out
while let Some(res) = join_set.join_next().await {
res.unwrap().unwrap();
match res {
Ok(Ok(())) => {}
Ok(Err(err)) => {
tracing::error!(%err, "error processing message");
panic!();
}
Err(err) => {
tracing::error!(%err, "error processing message");
panic!();
}
}
}

// while let Some(res) = join_set.join_next().await {
Expand Down

0 comments on commit 66de117

Please sign in to comment.