Skip to content

Commit

Permalink
fix: move parsing back to network thread
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Apr 18, 2023
1 parent d80b1af commit bad1ee6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 13 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/chainhook-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ tar = "0.4.38"
flume = "0.10.14"
ansi_term = "0.12.1"
atty = "0.2.14"
crossbeam-channel = "0.5.6"
crossbeam-channel = "0.5.8"
uuid = { version = "1.3.0", features = ["v4", "fast-rng"] }
threadpool = "1.8.1"

Expand Down
7 changes: 3 additions & 4 deletions components/chainhook-event-observer/src/hord/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use threadpool::ThreadPool;

use crate::{
indexer::bitcoin::{
download_block_with_retry, parse_downloaded_block, retrieve_block_hash_with_retry,
download_block_with_retry, retrieve_block_hash_with_retry,
standardize_bitcoin_block, BitcoinBlockFullBreakdown,
},
observer::BitcoinConfig,
Expand Down Expand Up @@ -786,7 +786,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let future =
download_block_with_retry(&block_hash, &moved_bitcoin_config, &moved_ctx);
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
let _ = block_data_tx.send(Some(block_data));
let _ = block_data_tx.send(Some((block_height, block_hash, block_data)));
});
}
let res = retrieve_block_data_pool.join();
Expand All @@ -796,10 +796,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(

let _ = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
while let Ok(Some(downloaded_block)) = block_data_rx.recv() {
while let Ok(Some((_, _, block_data))) = block_data_rx.recv() {
let block_compressed_tx_moved = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let block_data = parse_downloaded_block(downloaded_block).unwrap();
let compressed_block = CompactedBlock::from_full_block(&block_data);
let block_index = block_data.height as u32;
let _ = block_compressed_tx_moved.send(Some((
Expand Down
14 changes: 12 additions & 2 deletions components/chainhook-event-observer/src/indexer/bitcoin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,21 @@ pub async fn download_block_with_retry(
block_hash: &str,
bitcoin_config: &BitcoinConfig,
ctx: &Context,
) -> Result<Vec<u8>, String> {
) -> Result<BitcoinBlockFullBreakdown, String> {
let mut errors_count = 0;
let block = loop {
match download_block(block_hash, bitcoin_config, ctx).await {
Ok(result) => break result,
Ok(result) => match parse_downloaded_block(result) {
Ok(result) => break result,
Err(e) => {
errors_count += 1;
error!(
"unable to retrieve block #{block_hash} (attempt #{errors_count}): {}",
e.to_string()
);
std::thread::sleep(std::time::Duration::from_millis(500));
}
},
Err(e) => {
errors_count += 1;
error!(
Expand Down

0 comments on commit bad1ee6

Please sign in to comment.