Skip to content

Commit

Permalink
fix(hubble): infinite loop
Browse files Browse the repository at this point in the history
  • Loading branch information
cor committed May 31, 2024
1 parent 17ddd5e commit 4d75db9
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions hubble/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl Config {
block.height.max(self.start_height.unwrap_or_default())
}
})
.and(self.start_height)
.unwrap_or_default() as u64;

let range = current..u64::MAX;
Expand Down Expand Up @@ -220,7 +221,7 @@ async fn index_blocks_by_chunk(
chain_id: ChainId,
height: u64,
provider: &Provider<Http>,
) -> (u64, Result<(usize, BlockInsert), FromProviderError>) {
) -> (u64, Result<BlockInsert, FromProviderError>) {
(
height,
BlockInsert::from_provider_retried(chain_id, height, provider).await,
Expand All @@ -230,7 +231,7 @@ async fn index_blocks_by_chunk(
}));

while let Some((height, block)) = inserts.next().await {
let (_tries, block) = match block {
let block = match block {
Err(FromProviderError::Other(err)) => {
tx.commit().await?;
return Err(IndexBlockError::Other(err));
Expand Down Expand Up @@ -298,7 +299,7 @@ async fn reindex_blocks(
.map_err(Report::from)
}));
inserts
.try_fold(tx, |mut tx, (_, block)| async move {
.try_fold(tx, |mut tx, block| async move {
let log = PgLog {
chain_id: block.chain_id,
block_hash: block.hash.clone(),
Expand Down Expand Up @@ -339,6 +340,7 @@ pub struct InsertInfo {
num_events: i32,
}

#[must_use]
pub struct BlockInsert {
chain_id: ChainId,
hash: String,
Expand Down Expand Up @@ -381,16 +383,24 @@ impl BlockInsert {
chain_id: ChainId,
height: u64,
provider: &Provider<Http>,
) -> Result<(usize, Self), FromProviderError> {
loop {
debug!(chain_id=chain_id.canonical, height, "fetching block from provider");
(|| {
Self::from_provider(chain_id, height, provider)
.inspect_err(|e| debug!(?e, chain_id=chain_id.canonical, height, "error fetching block from provider"))
) -> Result<Self, FromProviderError> {
debug!(
chain_id = chain_id.canonical,
height, "fetching block from provider"
);
(|| {
Self::from_provider(chain_id, height, provider).inspect_err(|e| {
debug!(
?e,
chain_id = chain_id.canonical,
height,
"error fetching block from provider"
)
})
.retry(&crate::expo_backoff())
.await?;
}
})
.retry(&crate::expo_backoff())
.await
.map_err(Into::into)
}

async fn from_provider(
Expand Down

0 comments on commit 4d75db9

Please sign in to comment.