Skip to content

Commit

Permalink
feat(task): update task crate to alloy
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Sep 20, 2024
1 parent 5b47eb2 commit 5caf9f4
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/task/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ publish = false
rundler-provider = { path = "../provider" }
rundler-utils = { path = "../utils" }

alloy-primitives.workspace = true

anyhow.workspace = true
async-trait.workspace = true
futures.workspace = true
ethers.workspace = true
pin-project.workspace = true
metrics.workspace = true
tokio.workspace = true
Expand Down
22 changes: 9 additions & 13 deletions crates/task/src/block_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

use std::time::Duration;

use ethers::types::{Block, BlockNumber, H256};
use rundler_provider::Provider;
use alloy_primitives::B256;
use rundler_provider::{Block, BlockId, EvmProvider};
use rundler_utils::retry::{self, UnlimitedRetryOpts};
use tokio::time;
use tracing::error;
Expand All @@ -26,27 +26,23 @@ use tracing::error;
/// This function polls the provider for the latest block until a new block is discovered, with
/// unlimited retries.
pub async fn wait_for_new_block(
provider: &impl Provider,
last_block_hash: H256,
provider: &impl EvmProvider,
last_block_hash: B256,
poll_interval: Duration,
) -> (H256, Block<H256>) {
) -> (B256, Block) {
loop {
let block = retry::with_unlimited_retries(
"watch latest block",
|| provider.get_block(BlockNumber::Latest),
|| provider.get_block(BlockId::latest()),
UnlimitedRetryOpts::default(),
)
.await;
let Some(block) = block else {
error!("Latest block should be present when waiting for new block.");
continue;
};
let Some(hash) = block.hash else {
error!("Latest block should have hash.");
continue;
};
if last_block_hash != hash {
return (hash, block);
if last_block_hash != block.header.hash {
return (block.header.hash, block);
}
time::sleep(poll_interval).await;
}
Expand All @@ -57,7 +53,7 @@ pub async fn wait_for_new_block(
/// This function polls the provider for the latest block number until a new block number is discovered,
/// with unlimited retries.
pub async fn wait_for_new_block_number(
provider: &impl Provider,
provider: &impl EvmProvider,
last_block_number: u64,
poll_interval: Duration,
) -> u64 {
Expand Down
37 changes: 24 additions & 13 deletions crates/task/src/grpc/protos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

//! Protobuf utilities

use ethers::types::{Address, Bytes, H256, U128, U256};
use alloy_primitives::{Address, Bytes, B256, U128, U256};

/// Error type for conversions from protobuf types to Ethers/local types.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -79,26 +79,35 @@ impl FromFixedLengthProtoBytes for U128 {
const LEN: usize = 16;

fn from_fixed_length_bytes(bytes: &[u8]) -> Self {
Self::from_little_endian(bytes)
Self::from_le_slice(bytes)
}
}

impl FromFixedLengthProtoBytes for U256 {
const LEN: usize = 32;

fn from_fixed_length_bytes(bytes: &[u8]) -> Self {
Self::from_little_endian(bytes)
Self::from_le_slice(bytes)
}
}

impl FromFixedLengthProtoBytes for H256 {
impl FromFixedLengthProtoBytes for B256 {
const LEN: usize = 32;

fn from_fixed_length_bytes(bytes: &[u8]) -> Self {
Self::from_slice(bytes)
}
}

impl FromFixedLengthProtoBytes for u128 {
const LEN: usize = 16;

fn from_fixed_length_bytes(bytes: &[u8]) -> Self {
let (int_bytes, _) = bytes.split_at(std::mem::size_of::<u128>());
u128::from_le_bytes(int_bytes.try_into().unwrap())
}
}

/// Trait for a type that can be converted to protobuf bytes.
pub trait ToProtoBytes {
/// Convert to protobuf bytes.
Expand All @@ -107,29 +116,25 @@ pub trait ToProtoBytes {

impl ToProtoBytes for Address {
fn to_proto_bytes(&self) -> Vec<u8> {
self.as_bytes().to_vec()
self.to_vec()
}
}

impl ToProtoBytes for U128 {
fn to_proto_bytes(&self) -> Vec<u8> {
let mut vec = vec![0_u8; 16];
self.to_little_endian(&mut vec);
vec
self.to_le_bytes::<16>().into()
}
}

impl ToProtoBytes for U256 {
fn to_proto_bytes(&self) -> Vec<u8> {
let mut vec = vec![0_u8; 32];
self.to_little_endian(&mut vec);
vec
self.to_le_bytes::<32>().into()
}
}

impl ToProtoBytes for H256 {
impl ToProtoBytes for B256 {
fn to_proto_bytes(&self) -> Vec<u8> {
self.as_bytes().to_vec()
self.to_vec()
}
}

Expand All @@ -138,3 +143,9 @@ impl ToProtoBytes for Bytes {
self.to_vec()
}
}

impl ToProtoBytes for u128 {
fn to_proto_bytes(&self) -> Vec<u8> {
self.to_le_bytes().into()
}
}
3 changes: 3 additions & 0 deletions crates/task/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use tracing::{error, info};
/// Core task trait implemented by top level Rundler tasks.
#[async_trait]
pub trait Task: Sync + Send + 'static {
/// Convert into a boxed task.
fn boxed(self) -> Box<dyn Task>;

/// Run the task.
async fn run(self: Box<Self>, shutdown_token: CancellationToken) -> anyhow::Result<()>;
}
Expand Down

0 comments on commit 5caf9f4

Please sign in to comment.