Skip to content

Commit

Permalink
feat(voyager): refactor scroll block fetching to use the same logic a…
Browse files Browse the repository at this point in the history
…s ethereum (#1824)

also implemented a few remaining `todo!()`s
  • Loading branch information
benluelo authored Apr 26, 2024
2 parents d41eff0 + e4bb5a0 commit 9e7ff3e
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 293 deletions.
248 changes: 136 additions & 112 deletions lib/block-message/src/chain_impls/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::{collections::VecDeque, marker::PhantomData};

use beacon_api::client::BeaconApiClient;
use chain_utils::ethereum::{
Ethereum, EthereumChain, EthereumChainExt as _, IBCHandlerEvents, IbcHandlerExt,
ETHEREUM_REVISION_NUMBER,
Ethereum, EthereumChain, IBCHandlerEvents, IbcHandlerExt, ETHEREUM_REVISION_NUMBER,
};
use contracts::{
ibc_channel_handshake::{
Expand All @@ -18,6 +17,7 @@ use contracts::{
ibc_handler::{GetChannelCall, GetConnectionCall},
ibc_packet::{AcknowledgePacketFilter, IBCPacketEvents, RecvPacketFilter, SendPacketFilter},
};
use enumorph::Enumorph;
use ethers::{contract::EthLogDecode, providers::Middleware, types::Filter};
use frunk::{hlist_pat, HList};
use futures::StreamExt;
Expand All @@ -38,7 +38,8 @@ use unionlabs::{
hash::H256,
ibc::{
core::{
channel::channel::Channel, client::height::Height,
channel::channel::Channel,
client::height::{Height, IsHeight},
connection::connection_end::ConnectionEnd,
},
lightclients::cometbls,
Expand All @@ -54,7 +55,7 @@ use crate::{
id, AnyChainIdentified, BlockMessageTypes, ChainExt, DoAggregate, Identified, IsAggregateData,
};

pub trait EthereumChainExt = ChainExt + EthereumChain;
pub trait EthereumChainExt = ChainExt + chain_utils::ethereum::EthereumChainExt;

impl<C: ChainSpec> ChainExt for Ethereum<C> {
type Data = EthereumData<C>;
Expand Down Expand Up @@ -98,117 +99,79 @@ where
to_slot: to_height.revision_height,
}),
)),
EthereumFetch::FetchGetLogs(FetchGetLogs { from_slot, to_slot }) => {
let event_height = Height {
revision_number: ETHEREUM_REVISION_NUMBER,
revision_height: to_slot,
};

let from_block = c
.beacon_api_client
.execution_height(beacon_api::client::BlockId::Slot(from_slot))
.await
.unwrap();
let to_block = c
.beacon_api_client
.execution_height(beacon_api::client::BlockId::Slot(to_slot))
.await
.unwrap();

// REVIEW: Surely transactions and events can be fetched in parallel?
conc(
futures::stream::iter(
c.provider
.get_logs(
&Filter::new()
.address(ethers::types::H160::from(c.ibc_handler_address))
.from_block(from_block)
// NOTE: This -1 is very important, else events will be double fetched
.to_block(to_block - 1),
)
.await
.unwrap(),
)
.then(|log| async {
let tx_hash = log
.transaction_hash
.expect("log should have transaction_hash")
.into();

tracing::debug!(?log, "raw log");

match IBCHandlerEvents::decode_log(&log.into()) {
Ok(event) => {
Some(mk_aggregate_event(c, event, event_height, tx_hash).await)
}
Err(e) => {
tracing::warn!("could not decode evm event {}", e);
None
}
}
})
.collect::<Vec<_>>()
.await
.into_iter()
.flatten()
.collect::<Vec<_>>(),
)
EthereumFetch::FetchGetLogs(get_logs) => {
fetch_get_logs(c, get_logs, ETHEREUM_REVISION_NUMBER).await
}
EthereumFetch::FetchBeaconBlockRange(beacon_block_range) => {
fetch_beacon_block_range(c, beacon_block_range, &c.beacon_api_client).await
}
EthereumFetch::FetchChannel(FetchChannel { height, path }) => data(id(
c.chain_id(),
Data::<Ethereum<C>>::specific(ChannelData {
channel: c
.ibc_handler()
.eth_call(
GetChannelCall {
port_id: path.port_id.to_string(),
channel_id: path.channel_id.to_string(),
},
c.beacon_api_client
.execution_height(beacon_api::client::BlockId::Slot(
height.revision_height,
))
.await
.unwrap(),
)
.await
.unwrap()
.unwrap()
.try_into()
.unwrap(),
__marker: PhantomData,
}),
)),
EthereumFetch::FetchConnection(FetchConnection { height, path }) => data(id(
c.chain_id(),
Data::<Ethereum<C>>::specific(ConnectionData(
c.ibc_handler()
.eth_call(
GetConnectionCall {
connection_id: path.connection_id.to_string(),
},
c.beacon_api_client
.execution_height(beacon_api::client::BlockId::Slot(
height.revision_height,
))
.await
.unwrap(),
)
.await
.unwrap()
.unwrap()
.try_into()
.unwrap(),
)),
)),
EthereumFetch::FetchChannel(channel) => fetch_channel(c, channel).await,
EthereumFetch::FetchConnection(connection) => fetch_connection(c, connection).await,
}
}
}

pub async fn fetch_beacon_block_range<C, Hc>(
pub(crate) async fn fetch_get_logs<Hc>(
c: &Hc,
FetchGetLogs { from_slot, to_slot }: FetchGetLogs,
revision_number: u64,
) -> QueueMsg<BlockMessageTypes>
where
Hc: EthereumChainExt<Height = Height>,
// TODO: Replace with associated type bounds once stable
Hc::Aggregate: From<AggregateWithChannel<Hc>>,
Hc::Aggregate: From<AggregateWithConnection<Hc>>,
Hc::Fetch: From<FetchChannel<Hc>>,
Hc::Fetch: From<FetchConnection<Hc>>,

AnyChainIdentified<AnyAggregate>: From<Identified<Hc, Aggregate<Hc>>>,
AnyChainIdentified<AnyFetch>: From<Identified<Hc, Fetch<Hc>>>,
AnyChainIdentified<AnyData>: From<Identified<Hc, Data<Hc>>>,
{
let event_height = Height {
revision_number,
revision_height: to_slot,
};

let from_block = c.execution_height_of_beacon_slot(from_slot).await;
let to_block = c.execution_height_of_beacon_slot(to_slot).await;

// REVIEW: Surely transactions and events can be fetched in parallel?
conc(
futures::stream::iter(
c.provider()
.get_logs(
&Filter::new()
.address(ethers::types::H160::from(c.ibc_handler_address()))
.from_block(from_block)
// NOTE: This -1 is very important, else events will be double fetched
.to_block(to_block - 1),
)
.await
.unwrap(),
)
.filter_map(|log| async {
let tx_hash = log
.transaction_hash
.expect("log should have transaction_hash")
.into();

tracing::debug!(?log, "raw log");

match IBCHandlerEvents::decode_log(&log.into()) {
Ok(event) => Some(mk_aggregate_event(c, event, event_height, tx_hash).await),
Err(e) => {
tracing::warn!("could not decode evm event {}", e);
None
}
}
})
.collect::<Vec<_>>()
.await,
)
}

pub(crate) async fn fetch_beacon_block_range<C, Hc>(
c: &Hc,
FetchBeaconBlockRange { from_slot, to_slot }: FetchBeaconBlockRange,
beacon_api_client: &BeaconApiClient<C>,
Expand Down Expand Up @@ -278,6 +241,67 @@ where
}
}

pub(crate) async fn fetch_channel<Hc>(
c: &Hc,
FetchChannel { height, path }: FetchChannel<Hc>,
) -> QueueMsg<BlockMessageTypes>
where
Hc: EthereumChainExt,
Hc::Data: From<ChannelData<Hc>>,
AnyChainIdentified<AnyData>: From<Identified<Hc, Data<Hc>>>,
{
data(id(
c.chain_id(),
Data::<Hc>::specific(ChannelData {
channel: c
.ibc_handler()
.eth_call(
GetChannelCall {
port_id: path.port_id.to_string(),
channel_id: path.channel_id.to_string(),
},
c.execution_height_of_beacon_slot(height.revision_height())
.await,
)
.await
.unwrap()
.unwrap()
.try_into()
.unwrap(),
__marker: PhantomData,
}),
))
}

pub(crate) async fn fetch_connection<Hc>(
c: &Hc,
FetchConnection { height, path }: FetchConnection<Hc>,
) -> QueueMsg<BlockMessageTypes>
where
Hc: EthereumChainExt,
Hc::Data: From<ConnectionData<Hc>>,
AnyChainIdentified<AnyData>: From<Identified<Hc, Data<Hc>>>,
{
data(id(
c.chain_id(),
Data::<Hc>::specific(ConnectionData(
c.ibc_handler()
.eth_call(
GetConnectionCall {
connection_id: path.connection_id.to_string(),
},
c.execution_height_of_beacon_slot(height.revision_height())
.await,
)
.await
.unwrap()
.unwrap()
.try_into()
.unwrap(),
)),
))
}

pub async fn mk_aggregate_event<Hc>(
c: &Hc,
event: IBCHandlerEvents,
Expand Down Expand Up @@ -557,7 +581,7 @@ where
}

#[queue_msg]
#[derive(enumorph::Enumorph)]
#[derive(Enumorph)]
pub enum EthereumFetch<C: ChainSpec> {
FetchEvents(FetchEvents<C>),
FetchGetLogs(FetchGetLogs),
Expand Down Expand Up @@ -600,7 +624,7 @@ pub struct FetchConnection<Hc: EthereumChainExt> {
}

#[queue_msg]
#[derive(enumorph::Enumorph)]
#[derive(Enumorph)]
pub enum EthereumAggregate<C: ChainSpec> {
AggregateWithChannel(AggregateWithChannel<Ethereum<C>>),
AggregateWithConnection(AggregateWithConnection<Ethereum<C>>),
Expand Down Expand Up @@ -629,7 +653,7 @@ where
}

#[queue_msg]
#[derive(enumorph::Enumorph)]
#[derive(Enumorph)]
pub enum AggregateWithChannel<Hc: ChainExt + EthereumChain> {
PacketAcknowledgement(EventInfo<Hc, AcknowledgePacketFilter>),
SendPacket(EventInfo<Hc, SendPacketFilter>),
Expand All @@ -641,7 +665,7 @@ pub enum AggregateWithChannel<Hc: ChainExt + EthereumChain> {
}

#[queue_msg]
#[derive(enumorph::Enumorph)]
#[derive(Enumorph)]
pub enum AggregateWithConnection<Hc: ChainExt + EthereumChain> {
ConnectionOpenInit(EventInfo<Hc, ConnectionOpenInitFilter>),
ConnectionOpenTry(EventInfo<Hc, ConnectionOpenTryFilter>),
Expand Down Expand Up @@ -941,7 +965,7 @@ where
}

#[queue_msg]
#[derive(enumorph::Enumorph)]
#[derive(Enumorph)]
pub enum EthereumData<C: ChainSpec> {
Channel(ChannelData<Ethereum<C>>),
Connection(ConnectionData<Ethereum<C>>),
Expand Down
Loading

0 comments on commit 9e7ff3e

Please sign in to comment.