Skip to content

Commit

Permalink
feat(voyager): consume messages concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
benluelo committed Oct 15, 2023
1 parent ce690af commit 7b64e37
Show file tree
Hide file tree
Showing 15 changed files with 660 additions and 544 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/chain-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ hubble.workspace = true
reqwest = "0.11.20"
serde_json = "1.0.107"
thiserror = "1.0.49"
crossbeam-queue = "0.3.8"
146 changes: 79 additions & 67 deletions lib/chain-utils/src/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use unionlabs::{
};

use crate::{
chain_client_id, private_key::PrivateKey, Chain, ChainEvent, ClientState, EventSource,
chain_client_id, private_key::PrivateKey, Chain, ChainEvent, ClientState, EventSource, Pool,
};

pub type CometblsMiddleware = SignerMiddleware<Provider<Ws>, Wallet<ecdsa::SigningKey>>;
Expand All @@ -55,8 +55,10 @@ pub type CometblsMiddleware = SignerMiddleware<Provider<Ws>, Wallet<ecdsa::Signi
pub struct Evm<C: ChainSpec> {
pub chain_id: U256,

pub wallet: LocalWallet,
pub ibc_handler: IBCHandler<CometblsMiddleware>,
pub readonly_ibc_handler: IBCHandler<Provider<Ws>>,

// pub wallet: LocalWallet,
pub ibc_handlers: Pool<IBCHandler<CometblsMiddleware>>,
pub provider: Provider<Ws>,
pub beacon_api_client: BeaconApiClient<C>,

Expand All @@ -69,7 +71,7 @@ pub struct Config {
pub ibc_handler_address: Address,

/// The signer that will be used to submit transactions by voyager.
pub signer: PrivateKey<ecdsa::SigningKey>,
pub signers: Vec<PrivateKey<ecdsa::SigningKey>>,

/// The RPC endpoint for the execution chain.
pub eth_rpc_api: String,
Expand Down Expand Up @@ -250,7 +252,7 @@ impl<C: ChainSpec> Chain for Evm<C> {
) -> impl Future<Output = Vec<u8>> + '_ {
async move {
let filter = self
.ibc_handler
.readonly_ibc_handler
.write_acknowledgement_filter()
.filter
.at_block_hash(block_hash);
Expand Down Expand Up @@ -293,19 +295,31 @@ impl<C: ChainSpec> Evm<C> {

let chain_id = provider.get_chainid().await?;

let signing_key: ecdsa::SigningKey = config.signer.value();
let address = secret_key_to_address(&signing_key);
let ibc_handlers = config.signers.into_iter().map(|signer| {
let signing_key: ecdsa::SigningKey = signer.value();
let address = secret_key_to_address(&signing_key);

let wallet = LocalWallet::new_with_signer(signing_key, address, chain_id.as_u64());

let wallet = LocalWallet::new_with_signer(signing_key, address, chain_id.as_u64());
let signer_middleware =
Arc::new(SignerMiddleware::new(provider.clone(), wallet.clone()));

let signer_middleware = Arc::new(SignerMiddleware::new(provider.clone(), wallet.clone()));
IBCHandler::new(
config.ibc_handler_address.clone(),
signer_middleware.clone(),
)
});

Ok(Self {
chain_id: U256(chain_id),
ibc_handler: IBCHandler::new(config.ibc_handler_address, signer_middleware.clone()),
ibc_handlers: Pool::new(ibc_handlers),
readonly_ibc_handler: IBCHandler::new(
config.ibc_handler_address,
provider.clone().into(),
),
provider,
beacon_api_client: BeaconApiClient::new(config.eth_beacon_rpc_api).await,
wallet,
// wallet,
hasura_client: config.hasura_config.map(|hasura_config| {
HasuraDataStore::new(
reqwest::Client::new(),
Expand Down Expand Up @@ -355,13 +369,13 @@ impl<C: ChainSpec> Evm<C> {
at_execution_height: u64,
) -> Result<
Option<<<Call as EthCallExt>::Return as TupleToOption>::Inner>,
ContractError<CometblsMiddleware>,
ContractError<Provider<Ws>>,
>
where
Call: EthCallExt + 'static,
Call::Return: TupleToOption,
{
self.ibc_handler
self.readonly_ibc_handler
.method_hash::<Call, Call::Return>(Call::selector(), call)
.expect("valid contract selector")
.block(at_execution_height)
Expand All @@ -381,7 +395,7 @@ chain_client_id! {

#[derive(Debug)]
pub enum EvmEventSourceError {
Contract(ContractError<CometblsMiddleware>),
Contract(ContractError<Provider<Ws>>),
ChannelNotFound {
port_id: String,
channel_id: String,
Expand All @@ -408,10 +422,7 @@ impl<C: ChainSpec> EventSource for Evm<C> {
// TODO: Make this the height to start from
type Seed = ();

fn events(
&self,
_seed: Self::Seed,
) -> impl Stream<Item = Result<Self::Event, Self::Error>> + '_ {
fn events(self, _seed: Self::Seed) -> impl Stream<Item = Result<Self::Event, Self::Error>> {
async move {
let genesis_time = self
.beacon_api_client
Expand All @@ -421,15 +432,17 @@ impl<C: ChainSpec> EventSource for Evm<C> {
.data
.genesis_time;

let latest_height = self.query_latest_height().await;

stream::unfold(
self.query_latest_height().await,
move |previous_beacon_height| async move {
(self, latest_height),
move |(this, previous_beacon_height)| async move {
tracing::info!("fetching events");

let current_beacon_height = loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let current_beacon_height = self.query_latest_height().await;
let current_beacon_height = this.query_latest_height().await;

tracing::debug!(%current_beacon_height, %previous_beacon_height);

Expand All @@ -442,15 +455,15 @@ impl<C: ChainSpec> EventSource for Evm<C> {
current_beacon_height = current_beacon_height.revision_height
);
let previous_execution_height =
self.execution_height(previous_beacon_height).await;
this.execution_height(previous_beacon_height).await;
let current_execution_height =
self.execution_height(current_beacon_height).await;
this.execution_height(current_beacon_height).await;

let packets = futures::stream::iter(
self.provider
this.provider
.get_logs(
&self
.ibc_handler
&this
.readonly_ibc_handler
.events()
.filter
.from_block(previous_execution_height)
Expand All @@ -459,12 +472,12 @@ impl<C: ChainSpec> EventSource for Evm<C> {
.await
.unwrap(),
)
.then(move |log| async move {
.then(|log| async {
// dbg!(&log);

let block_hash = log.block_hash.expect("log should have block_hash");

// let event_height = self.make_height(
// let event_height = this.make_height(
// log.block_number.expect("log should have block_number").0[0],
// );
let event_height =
Expand All @@ -475,12 +488,12 @@ impl<C: ChainSpec> EventSource for Evm<C> {
Ok(x) => x,
Err(e) => {
tracing::warn!("Failed to decode ibc handler event, we may need to regenerate them: {:?}", e);
return Ok(None)
return Ok::<_, EvmEventSourceError>(None::<ChainEvent<Evm<C>>>)
}
};

let read_channel = |port_id: String, channel_id: String| async move {
self.read_ibc_state(
let read_channel = |port_id: String, channel_id: String| async {
this.read_ibc_state(
GetChannelCall {
port_id: port_id.clone(),
channel_id: channel_id.clone(),
Expand All @@ -497,8 +510,8 @@ impl<C: ChainSpec> EventSource for Evm<C> {
.map_err(EvmEventSourceError::ChannelConversion)
};

let read_connection = |connection_id: String| async move {
self.read_ibc_state(
let read_connection = |connection_id: String| async {
this.read_ibc_state(
GetConnectionCall {
connection_id: connection_id.clone(),
},
Expand Down Expand Up @@ -664,7 +677,7 @@ impl<C: ChainSpec> EventSource for Evm<C> {
<Self as Chain>::ClientId,
String,
EmptyString,
> = self
> = this
.read_ibc_state(
GetConnectionCall {
connection_id: event.connection_id.clone(),
Expand Down Expand Up @@ -710,14 +723,14 @@ impl<C: ChainSpec> EventSource for Evm<C> {
}))
}
IBCHandlerEvents::GeneratedClientIdentifierFilter(event) => {
let client_type = self
.ibc_handler
let client_type = this
.readonly_ibc_handler
.client_types(event.0.clone())
.await
.map_err(EvmEventSourceError::Contract)?;

let (client_state, success) = self
.ibc_handler
let (client_state, success) = this
.readonly_ibc_handler
.get_client_state(event.0.clone())
.await
.unwrap();
Expand Down Expand Up @@ -806,48 +819,47 @@ impl<C: ChainSpec> EventSource for Evm<C> {
};

Ok(event.map(|event| {
ChainEvent {
ChainEvent::<Evm<C>> {
// TODO: Cache
chain_id: self.chain_id(),
chain_id: this.chain_id(),
block_hash: block_hash.into(),
height: current_beacon_height,
event,
}
}))
})
.filter_map(|x| async move { x.transpose() });
.filter_map(|x| async { x.transpose() })
.then(|event: Result<ChainEvent<Evm<C>>, EvmEventSourceError>| async {
if let Ok(ref event) = event {
let current_slot = event.height.revision_height;

let next_epoch_ts = next_epoch_timestamp::<C>(current_slot, genesis_time);

if let Some(hc) = &this.hasura_client {
hc
.do_post::<InsertDemoTx>(insert_demo_tx::Variables {
data: serde_json::json! {{
"latest_execution_block_hash": event.block_hash,
"timestamp": next_epoch_ts,
}},
})
.await
.unwrap();
}
}

// pass it back through
event
});

Some((packets, current_beacon_height))
let iter = futures::stream::iter(packets.collect::<Vec<_>>().await);

Some((iter, (this, current_beacon_height)))
},
)
.flatten()
.then(move |event| async move {
if let Ok(ref event) = event {
let current_slot = event.height.revision_height;

let next_epoch_ts = next_epoch_timestamp::<C>(current_slot, genesis_time);

if let Some(hasura_config) = &self.hasura_client {
hasura_config
.do_post::<InsertDemoTx>(insert_demo_tx::Variables {
data: serde_json::json! {{
"latest_execution_block_hash": event.block_hash,
"timestamp": next_epoch_ts,
}},
})
.await
.unwrap();
}
}

// pass it back through
event
})
}
.flatten_stream()
.inspect(|_x| {
// dbg!(x);
})
}
}

Expand Down
Loading

0 comments on commit 7b64e37

Please sign in to comment.