From 12376aa1a03f14f21fc702592ddbab7c636c8937 Mon Sep 17 00:00:00 2001 From: Hussein Ait Lahcen Date: Thu, 31 Aug 2023 19:52:58 +0200 Subject: [PATCH] fix(voyager): use beacon block for packet stream interface --- voyager/src/chain/evm.rs | 108 +++++++++++++++++++++++++-------------- 1 file changed, 69 insertions(+), 39 deletions(-) diff --git a/voyager/src/chain/evm.rs b/voyager/src/chain/evm.rs index 95e017222b..37233e354e 100644 --- a/voyager/src/chain/evm.rs +++ b/voyager/src/chain/evm.rs @@ -27,13 +27,13 @@ use contracts::{ use ethers::{ abi::{AbiEncode, RawLog, Tokenizable}, contract::EthCall, - prelude::{decode_logs, k256::ecdsa, parse_log, EthLogDecode, LogMeta, SignerMiddleware}, + prelude::{decode_logs, k256::ecdsa, parse_log, EthLogDecode, SignerMiddleware}, providers::{Middleware, Provider, Ws}, signers::{LocalWallet, Wallet}, types::U256, utils::{keccak256, secret_key_to_address}, }; -use futures::{Future, Stream, StreamExt}; +use futures::{stream::unfold, Future, Stream, StreamExt}; use prost::Message; use protos::union::ibc::lightclients::ethereum::v1 as ethereum_v1; use typenum::Unsigned; @@ -509,45 +509,75 @@ impl Chain for Evm { &self, ) -> impl Future + '_> + '_ { async move { - self.provider - .subscribe_logs(&self.ibc_handler.event::().filter) - .await - .unwrap() - .then(move |log| async move { - let meta = LogMeta::from(&log); - let event: SendPacketFilter = parse_log(log).unwrap(); - - // TODO: Would be nice if this info was passed through in the SendPacket event - let (channel_data, is_found): ( - contracts::ibc_handler::IbcCoreChannelV1ChannelData, - bool, - ) = self - .ibc_handler - .get_channel(event.source_port.clone(), event.source_channel.clone()) - .await - .unwrap(); - - assert!( - is_found, - "channel not found for port_id {port}, channel_id {channel}", - port = event.source_port, - channel = event.source_channel + unfold( + self.query_latest_height().await, + move |previous_beacon_height| async move { + let current_beacon_height = loop { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let current_beacon_height = self.query_latest_height().await; + if current_beacon_height > previous_beacon_height { + break current_beacon_height; + } + }; + tracing::debug!( + previous_beacon_height = previous_beacon_height.revision_height, + current_beacon_height = current_beacon_height.revision_height ); - - ( - self.make_height(meta.block_number.0[0]), - Packet { - sequence: event.sequence, - source_port: event.source_port, - source_channel: event.source_channel, - destination_port: channel_data.counterparty.port_id, - destination_channel: channel_data.counterparty.channel_id, - data: event.data.to_vec(), - timeout_height: event.timeout_height.into(), - timeout_timestamp: event.timeout_timestamp, - }, + let previous_execution_height = + self.execution_height(previous_beacon_height).await; + let current_execution_height = + self.execution_height(current_beacon_height).await; + let packets = futures::stream::iter( + self.provider + .get_logs( + &self + .ibc_handler + .event::() + .filter + .from_block(previous_execution_height.revision_height) + .to_block(current_execution_height.revision_height - 1), + ) + .await + .unwrap(), ) - }) + .then(move |log| async move { + let event: SendPacketFilter = parse_log(log).unwrap(); + + // TODO: Would be nice if this info was passed through in the SendPacket event + let (channel_data, is_found): ( + contracts::ibc_handler::IbcCoreChannelV1ChannelData, + bool, + ) = self + .ibc_handler + .get_channel(event.source_port.clone(), event.source_channel.clone()) + .await + .unwrap(); + + assert!( + is_found, + "channel not found for port_id {port}, channel_id {channel}", + port = event.source_port, + channel = event.source_channel + ); + + ( + self.make_height(current_beacon_height.revision_height), + Packet { + sequence: event.sequence, + source_port: event.source_port, + source_channel: event.source_channel, + destination_port: channel_data.counterparty.port_id, + destination_channel: channel_data.counterparty.channel_id, + data: event.data.to_vec(), + timeout_height: event.timeout_height.into(), + timeout_timestamp: event.timeout_timestamp, + }, + ) + }); + Some((packets, current_beacon_height)) + }, + ) + .flatten() } } }