diff --git a/hubble/src/eth.rs b/hubble/src/eth.rs
index 6a42c0557c..114822dd95 100644
--- a/hubble/src/eth.rs
+++ b/hubble/src/eth.rs
@@ -7,7 +7,7 @@ use ethers::{
 };
 use futures::{
     stream::{self, FuturesOrdered},
-    TryStreamExt,
+    StreamExt, TryFutureExt, TryStreamExt,
 };
 use serde::{Deserialize, Serialize};
 use sqlx::{PgPool, Postgres};
@@ -88,6 +88,7 @@ impl Config {
         })
     }
 }
+
 impl Indexer {
     /// Spawns two long running tasks, one which continuously indexes from current to head, and one which indexes from current-20 to current.
     /// The second routine handles fixing up block reorgs.
@@ -120,71 +121,132 @@ impl Indexer {
     }
 }
 
+#[derive(Debug, thiserror::Error)]
+enum IndexBlockError {
+    #[error("something went wrong, but can be resolved by retrying: {err:?}")]
+    Retryable { height: u64, err: FromProviderError },
+    #[error("unknown error: {0}")]
+    Other(#[from] Report),
+}
+
+impl From<sqlx::Error> for IndexBlockError {
+    fn from(err: sqlx::Error) -> Self {
+        IndexBlockError::Other(Report::from(err))
+    }
+}
+
 async fn index_blocks(
     pool: PgPool,
     range: Range<u64>,
     chain_id: ChainId,
     provider: Provider<Http>,
 ) -> Result<(), Report> {
-    futures::stream::iter(range.into_iter().map(Ok::<u64, Report>))
-        .try_chunks(200)
-        .map_err(|err| err.1)
-        .try_fold(pool, |pool, chunk| async {
+    let err =
+        match index_blocks_by_chunk(pool.clone(), range.clone(), chain_id, provider.clone(), 200)
+            .await
+        {
+            Ok(()) => return Ok(()),
+            Err(err) => err,
+        };
+
+    match err {
+        IndexBlockError::Retryable { height, err } => {
+            // This most likely indicates we caught up indexing with the node. We now switch to
+            // single block mode.
+            if matches!(err, FromProviderError::BlockNotFound) {
+                index_blocks_by_chunk(pool, height..range.end, chain_id, provider, 1).await?;
+            }
+        }
+        err => return Err(err.into()),
+    }
+    Ok(())
+}
+
+async fn index_blocks_by_chunk(
+    pool: PgPool,
+    range: Range<u64>,
+    chain_id: ChainId,
+    provider: Provider<Http>,
+    chunk_size: usize,
+) -> Result<(), IndexBlockError> {
+    let mut chunks = futures::stream::iter(range.into_iter()).chunks(chunk_size);
+
+    while let Some(chunk) = chunks.next().await {
+        if chunk.len() > 1 {
             info!(
                 chain_id.canonical,
                 "indexing blocks for chunk: {}..{}",
-                &chunk.first().unwrap(),
-                &chunk.last().unwrap()
+                chunk.first().unwrap(),
+                chunk.last().unwrap()
             );
-            let tx = pool.begin().await.map_err(Report::from)?;
-
-            let inserts = FuturesOrdered::from_iter(
-                chunk
-                    .into_iter()
-                    .enumerate()
-                    // At some point we reach the state where i = 0 block hasn't been created yet, meaning it
-                    // takes ±12s to become available. i = 1 -> ±24s etc.
-                    .map(|(i, height)| {
-                        // The retry wait is 1s in `from_provider_retried`.
-                        let max_retries = i * 20;
-                        BlockInsert::from_provider_retried(chain_id, height, &provider, max_retries)
-                    }),
+        } else {
+            info!(
+                chain_id.canonical,
+                "indexing block {}",
+                chunk.first().unwrap(),
             );
+        }
 
-            let tx = inserts
-                .try_fold(tx, |mut tx, (_, block)| async move {
-                    match block.execute(&mut tx).await {
-                        Err(err) => {
-                            tx.rollback().await?;
-                            return Err(err);
-                        }
-                        Ok(info) => {
-                            debug!(
-                                chain_id.canonical,
-                                height = info.height,
-                                hash = info.hash,
-                                num_transactions = info.num_tx,
-                                num_events = info.num_events,
-                                "indexed block"
-                            );
-                            metrics::BLOCK_COLLECTOR
-                                .with_label_values(&[chain_id.canonical])
-                                .inc();
-                            metrics::TRANSACTION_COLLECTOR
-                                .with_label_values(&[chain_id.canonical])
-                                .inc_by(info.num_tx as u64);
-                            metrics::EVENT_COLLECTOR
-                                .with_label_values(&[chain_id.canonical])
-                                .inc_by(info.num_events as u64);
-                        }
-                    }
-                    Ok(tx)
-                })
-                .await?;
-            tx.commit().await?;
-            Ok(pool)
-        })
-        .await?;
+        let mut tx = pool.begin().await.map_err(Report::from)?;
+
+        let mut inserts = FuturesOrdered::from_iter(chunk.into_iter().map(|height| {
+            // Hack workaround because partial move for async blocks isn't possible.
+            async fn from_provider_retried(
+                chain_id: ChainId,
+                height: u64,
+                provider: &Provider<Http>,
+                max_retries: usize,
+            ) -> (u64, Result<(usize, BlockInsert), FromProviderError>) {
+                (
+                    height,
+                    BlockInsert::from_provider_retried(chain_id, height, provider, max_retries)
+                        .await,
+                )
+            }
+            from_provider_retried(chain_id, height, &provider, 60)
+        }));
+
+        while let Some((height, block)) = inserts.next().await {
+            let (_tries, block) = match block {
+                Err(FromProviderError::Other(err)) => {
+                    tx.commit().await?;
+                    return Err(IndexBlockError::Other(err));
+                }
+                Err(err) => {
+                    tx.commit().await?;
+                    return Err(IndexBlockError::Retryable { height, err });
+                }
+                Ok(block) => block,
+            };
+
+            match block.execute(&mut tx).await {
+                Err(err) => {
+                    tx.rollback().await?;
+                    return Err(err.into());
+                }
+                Ok(info) => {
+                    debug!(
+                        chain_id.canonical,
+                        height = info.height,
+                        hash = info.hash,
+                        num_transactions = info.num_tx,
+                        num_events = info.num_events,
+                        "indexed block"
+                    );
+                    metrics::BLOCK_COLLECTOR
+                        .with_label_values(&[chain_id.canonical])
+                        .inc();
+                    metrics::TRANSACTION_COLLECTOR
+                        .with_label_values(&[chain_id.canonical])
+                        .inc_by(info.num_tx as u64);
+                    metrics::EVENT_COLLECTOR
+                        .with_label_values(&[chain_id.canonical])
+                        .inc_by(info.num_events as u64);
+                }
+            }
+        }
+        tx.commit().await?;
+    }
     Ok(())
 }
 
@@ -210,6 +272,7 @@ async fn reindex_blocks(
     let chunk = (current - 20)..current;
     let inserts = FuturesOrdered::from_iter(chunk.into_iter().map(|height| {
         BlockInsert::from_provider_retried(chain_id, height as u64, &provider, 1000)
+            .map_err(Report::from)
     }));
     inserts
         .try_fold(tx, |mut tx, (_, block)| async move {
@@ -302,14 +365,14 @@ impl BlockInsert {
         height: u64,
         provider: &Provider<Http>,
         max_retries: usize,
-    ) -> Result<(usize, Self), Report> {
+    ) -> Result<(usize, Self), FromProviderError> {
         let mut count = 0;
         loop {
             match Self::from_provider(chain_id, height, provider).await {
                 Ok(block) => return Ok((count, block)),
                 Err(err) => {
                     if !err.retryable() || count > max_retries {
-                        return Err(err.into());
+                        return Err(err);
                     }
                     count += 1;
                     tokio::time::sleep(Duration::from_secs(1)).await;
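
Editor's note: the sketch below is not part of the patch. It reduces the control flow the patch introduces in `index_blocks`/`index_blocks_by_chunk` (bulk chunked indexing that falls back to single-block mode on the first `BlockNotFound`) to a dependency-free, synchronous model. `fetch_block`, `FetchError`, and `index_by_chunk` are illustrative stand-ins for `BlockInsert::from_provider_retried`, `FromProviderError`, and `index_blocks_by_chunk`; all names here are assumptions for demonstration, not the hubble API.

use std::ops::Range;

#[derive(Debug)]
enum FetchError {
    /// Retryable: the node has not produced this block yet.
    BlockNotFound,
    /// Fatal: everything else.
    Other(String),
}

/// Pretends to fetch a block; blocks above `head` do not exist yet.
fn fetch_block(height: u64, head: u64) -> Result<u64, FetchError> {
    if height <= head {
        Ok(height)
    } else {
        Err(FetchError::BlockNotFound)
    }
}

/// Walk `range` in chunks of `chunk_size`; on the first failed block,
/// report its height so the caller can resume from there.
fn index_by_chunk(
    range: Range<u64>,
    head: u64,
    chunk_size: usize,
) -> Result<(), (u64, FetchError)> {
    let heights: Vec<u64> = range.collect();
    for chunk in heights.chunks(chunk_size) {
        for &height in chunk {
            fetch_block(height, head).map_err(|err| (height, err))?;
            println!("indexed block {height}");
        }
    }
    Ok(())
}

fn main() {
    let (range, head) = (100..110, 105);
    // Bulk pass first; once we catch up with the node, fall back to
    // single-block mode from the height that failed, mirroring the patch's
    // `index_blocks_by_chunk(pool, height..range.end, chain_id, provider, 1)`.
    match index_by_chunk(range.clone(), head, 4) {
        Ok(()) => {}
        Err((height, FetchError::BlockNotFound)) => {
            println!("caught up at block {height}; switching to single-block mode");
            let _ = index_by_chunk(height..range.end, head, 1);
        }
        Err((_, err)) => panic!("fatal: {err:?}"),
    }
}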