fix(hubble): decrease eth chunk size when approaching head
KaiserKarel committed May 2, 2024
1 parent 0c7029f commit 9438e4e
Showing 1 changed file with 119 additions and 56 deletions.
175 changes: 119 additions & 56 deletions hubble/src/eth.rs
@@ -7,7 +7,7 @@ use ethers::{
};
use futures::{
stream::{self, FuturesOrdered},
TryStreamExt,
StreamExt, TryFutureExt, TryStreamExt,
};
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, Postgres};
@@ -88,6 +88,7 @@ impl Config {
})
}
}

impl Indexer {
/// Spawns two long-running tasks: one which continuously indexes from the current height to the head, and one which re-indexes from `current - 20` to `current`.
/// The second routine handles fixing up block reorgs.
@@ -120,71 +121,132 @@ impl Indexer {
}
}

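/// Failure modes for a single chunked indexing pass: `Retryable` provider errors, which the
/// caller can recover from by retrying or changing strategy, versus fatal `Other` errors.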
#[derive(Debug, thiserror::Error)]
enum IndexBlockError {
#[error("something went wrong, but can be resolved by retrying: {err:?}")]
Retryable { height: u64, err: FromProviderError },
#[error("unknown error: {0}")]
Other(#[from] Report),
}

impl From<sqlx::Error> for IndexBlockError {
fn from(err: sqlx::Error) -> Self {
IndexBlockError::Other(Report::from(err))
}
}

async fn index_blocks(
pool: PgPool,
range: Range<u64>,
chain_id: ChainId,
provider: Provider<Http>,
) -> Result<(), Report> {
futures::stream::iter(range.into_iter().map(Ok::<u64, Report>))
.try_chunks(200)
.map_err(|err| err.1)
.try_fold(pool, |pool, chunk| async {
let err =
match index_blocks_by_chunk(pool.clone(), range.clone(), chain_id, provider.clone(), 200)
.await
{
Ok(()) => return Ok(()),
Err(err) => err,
};

match err {
IndexBlockError::Retryable { height, err } => {
// This most likely indicates that indexing has caught up with the node's head. Switch to
// single-block mode.
if matches!(err, FromProviderError::BlockNotFound) {
index_blocks_by_chunk(pool, height..range.end, chain_id, provider, 1).await?;
}
}
err => return Err(err.into()),
}
Ok(())
}

async fn index_blocks_by_chunk(
pool: PgPool,
range: Range<u64>,
chain_id: ChainId,
provider: Provider<Http>,
chunk_size: usize,
) -> Result<(), IndexBlockError> {
let mut chunks = futures::stream::iter(range.into_iter()).chunks(chunk_size);
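// `chunks` yields `chunk_size` heights at a time; with `chunk_size == 1` this degrades into
// the block-by-block mode used once indexing nears the head.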

while let Some(chunk) = chunks.next().await {
if chunk.len() > 1 {
info!(
chain_id.canonical,
"indexing blocks for chunk: {}..{}",
&chunk.first().unwrap(),
&chunk.last().unwrap()
chunk.first().unwrap(),
chunk.last().unwrap()
);
let tx = pool.begin().await.map_err(Report::from)?;

let inserts = FuturesOrdered::from_iter(
chunk
.into_iter()
.enumerate()
// At some point we reach the state where the block at i = 0 hasn't been produced yet, meaning it
// takes ±12s to become available; i = 1 -> ±24s, etc.
.map(|(i, height)| {
// The retry wait is 1s in `from_provider_retried`.
let max_retries = i * 20;
BlockInsert::from_provider_retried(chain_id, height, &provider, max_retries)
}),
} else {
info!(
chain_id.canonical,
"indexing block {}",
chunk.first().unwrap(),
);
}

let tx = inserts
.try_fold(tx, |mut tx, (_, block)| async move {
match block.execute(&mut tx).await {
Err(err) => {
tx.rollback().await?;
return Err(err);
}
Ok(info) => {
debug!(
chain_id.canonical,
height = info.height,
hash = info.hash,
num_transactions = info.num_tx,
num_events = info.num_events,
"indexed block"
);
metrics::BLOCK_COLLECTOR
.with_label_values(&[chain_id.canonical])
.inc();
metrics::TRANSACTION_COLLECTOR
.with_label_values(&[chain_id.canonical])
.inc_by(info.num_tx as u64);
metrics::EVENT_COLLECTOR
.with_label_values(&[chain_id.canonical])
.inc_by(info.num_events as u64);
}
}
Ok(tx)
})
.await?;
tx.commit().await?;
Ok(pool)
})
.await?;
let mut tx = pool.begin().await.map_err(Report::from)?;

let mut inserts = FuturesOrdered::from_iter(chunk.into_iter().map(|height| {
// Hacky workaround: partial moves into async blocks aren't possible.
async fn from_provider_retried(
chain_id: ChainId,
height: u64,
provider: &Provider<Http>,
max_retries: usize,
) -> (u64, Result<(usize, BlockInsert), FromProviderError>) {
(
height,
BlockInsert::from_provider_retried(chain_id, height, provider, max_retries)
.await,
)
}
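// 60 retries at the 1s retry interval in `from_provider_retried` gives the node roughly a
// minute to produce a block before the error is surfaced.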
from_provider_retried(chain_id, height, &provider, 60)
}));

while let Some((height, block)) = inserts.next().await {
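// On a fetch error, first commit the blocks already executed in this transaction so that
// progress is not lost, then surface the error to the caller.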
let (_tries, block) = match block {
Err(FromProviderError::Other(err)) => {
tx.commit().await?;
return Err(IndexBlockError::Other(err));
}
Err(err) => {
tx.commit().await?;
return Err(IndexBlockError::Retryable { height, err });
}
Ok(block) => block,
};

match block.execute(&mut tx).await {
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
Ok(info) => {
debug!(
chain_id.canonical,
height = info.height,
hash = info.hash,
num_transactions = info.num_tx,
num_events = info.num_events,
"indexed block"
);
metrics::BLOCK_COLLECTOR
.with_label_values(&[chain_id.canonical])
.inc();
metrics::TRANSACTION_COLLECTOR
.with_label_values(&[chain_id.canonical])
.inc_by(info.num_tx as u64);
metrics::EVENT_COLLECTOR
.with_label_values(&[chain_id.canonical])
.inc_by(info.num_events as u64);
}
}
}
tx.commit().await?;
}
Ok(())
}

@@ -210,6 +272,7 @@ async fn reindex_blocks(
let chunk = (current - 20)..current;
let inserts = FuturesOrdered::from_iter(chunk.into_iter().map(|height| {
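// `from_provider_retried` now returns `FromProviderError`, so lift it back into a `Report`
// for this code path.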
BlockInsert::from_provider_retried(chain_id, height as u64, &provider, 1000)
.map_err(Report::from)
}));
inserts
.try_fold(tx, |mut tx, (_, block)| async move {
@@ -302,14 +365,14 @@ impl BlockInsert {
height: u64,
provider: &Provider<Http>,
max_retries: usize,
) -> Result<(usize, Self), Report> {
) -> Result<(usize, Self), FromProviderError> {
let mut count = 0;
loop {
match Self::from_provider(chain_id, height, provider).await {
Ok(block) => return Ok((count, block)),
Err(err) => {
if !err.retryable() || count > max_retries {
return Err(err.into());
return Err(err);
}
count += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
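The net effect of the change is easiest to see in isolation. Below is a minimal, self-contained sketch, not the Hubble code itself: index_by_chunk, FetchError, and the simulated head are hypothetical stand-ins. It shows the bulk-then-single-block fallback this commit introduces: index in chunks of 200 until a BlockNotFound signals that the indexer has reached the head, then resume from the missing height with a chunk size of 1 (the real code additionally retries each height until the node produces the block).

use std::ops::Range;

#[derive(Debug)]
enum FetchError {
    BlockNotFound(u64),
    Other(String),
}

// Hypothetical stand-in for index_blocks_by_chunk: walks `range` in `chunk_size`
// batches and fails on the first block the node has not produced yet.
fn index_by_chunk(range: Range<u64>, chunk_size: u64, head: u64) -> Result<(), FetchError> {
    let mut start = range.start;
    while start < range.end {
        let end = (start + chunk_size).min(range.end);
        if let Some(missing) = (start..end).find(|h| *h > head) {
            return Err(FetchError::BlockNotFound(missing));
        }
        println!("indexed blocks {start}..{end}");
        start = end;
    }
    Ok(())
}

fn main() {
    let range = 0u64..1_000;
    let head = 950; // pretend the node's head sits at height 950

    // Bulk mode: large chunks while the indexer is far behind the head.
    match index_by_chunk(range.clone(), 200, head) {
        Ok(()) => {}
        Err(FetchError::BlockNotFound(h)) => {
            // Caught up: resume from the missing height in single-block mode. The real
            // code retries each height until the node produces it; here we simply
            // pretend the chain advanced past range.end.
            index_by_chunk(h..range.end, 1, range.end).expect("single-block mode");
        }
        Err(err) => panic!("unrecoverable: {err:?}"),
    }
}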
