Skip to content

Commit

Permalink
chain/ethreum: Refactor fetch_receipts_with_retry
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Apr 17, 2024
1 parent 9dc2d4f commit 4fece3a
Showing 1 changed file with 36 additions and 26 deletions.
62 changes: 36 additions & 26 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ impl EthereumAdapter {
) -> Self {
let web3 = Arc::new(Web3::new(transport));


// Check if the chain supports `getBlockReceipts` method.
let supports_block_receipts = web3
.eth()
Expand Down Expand Up @@ -2146,40 +2145,51 @@ async fn fetch_transaction_receipts_in_batch(
Ok(collected)
}

// Fetches transaction receipts with retries. This function acts as a dispatcher
// based on whether block receipts are supported or individual transaction receipts
// need to be fetched.
async fn fetch_receipts_with_retry(
web3: Arc<Web3<Transport>>,
hashes: Vec<H256>,
block_hash: H256,
logger: Logger,
supports_block_receipts: bool,
) -> Result<Vec<Arc<TransactionReceipt>>, IngestorError> {
let receipts_future = if supports_block_receipts {
fetch_block_receipts_with_retry(web3, block_hash, logger).await
} else {
let receipts_future = if ENV_VARS.fetch_receipts_in_batches {
// Deprecated batching retrieval of transaction receipts.
fetch_transaction_receipts_in_batch_with_retry(web3, hashes, block_hash, logger).await
} else {
let hash_stream = graph::tokio_stream::iter(hashes);
let receipt_stream = graph::tokio_stream::StreamExt::map(hash_stream, move |tx_hash| {
fetch_transaction_receipt_with_retry(
web3.cheap_clone(),
tx_hash,
block_hash,
logger.cheap_clone(),
)
})
.buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls);
graph::tokio_stream::StreamExt::collect::<
Result<Vec<Arc<TransactionReceipt>>, IngestorError>,
>(receipt_stream)
.await
};
if supports_block_receipts {
return fetch_block_receipts_with_retry(web3, block_hash, logger).await;
}
fetch_individual_receipts_with_retry(web3, hashes, block_hash, logger).await
}

receipts_future
};
// Fetches receipts for each transaction in the block individually.
async fn fetch_individual_receipts_with_retry(
web3: Arc<Web3<Transport>>,
hashes: Vec<H256>,
block_hash: H256,
logger: Logger,
) -> Result<Vec<Arc<TransactionReceipt>>, IngestorError> {
if ENV_VARS.fetch_receipts_in_batches {
return fetch_transaction_receipts_in_batch_with_retry(web3, hashes, block_hash, logger)
.await;
}

receipts_future
// Use a stream to fetch receipts individually
let hash_stream = graph::tokio_stream::iter(hashes);
let receipt_stream = hash_stream
.map(move |tx_hash| {
fetch_transaction_receipt_with_retry(
web3.cheap_clone(),
tx_hash,
block_hash,
logger.cheap_clone(),
)
})
.buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls);

graph::tokio_stream::StreamExt::collect::<Result<Vec<Arc<TransactionReceipt>>, IngestorError>>(
receipt_stream,
)
.await
}

/// Fetches transaction receipts of all transactions in a block with `eth_getBlockReceipts` call.
Expand Down

0 comments on commit 4fece3a

Please sign in to comment.