feat: revisit threading model
Ludo Galabru committed Aug 1, 2023
1 parent 31d9980 commit 05b6d5c
Showing 5 changed files with 136 additions and 120 deletions.
components/hord-cli/src/config/mod.rs (1 change: 1 addition & 0 deletions)
@@ -113,6 +113,7 @@ impl Config {
         HordConfig {
             network_thread_max: self.limits.max_number_of_networking_threads,
             ingestion_thread_max: self.limits.max_number_of_processing_threads,
+            ingestion_thread_queue_size: 4,
             cache_size: self.limits.max_caching_memory_size_mb,
             db_path: self.expected_cache_path(),
             first_inscription_height: match self.network.bitcoin_network {
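The only configuration change is a new `ingestion_thread_queue_size` knob, hardcoded to 4 for now. Below is a minimal sketch of what the field controls, assuming (as the db/mod.rs hunks further down show) that it sizes each ingestion worker's bounded input queue; the struct is trimmed to the threading-related fields and is not the crate's full `HordConfig`:

```rust
// Illustrative sketch only: a trimmed-down stand-in for HordConfig.
struct HordConfig {
    network_thread_max: usize,
    ingestion_thread_max: usize,
    ingestion_thread_queue_size: usize,
}

fn main() {
    let config = HordConfig {
        network_thread_max: 8,
        ingestion_thread_max: 4,
        ingestion_thread_queue_size: 4, // hardcoded to 4 in this commit
    };
    // Later in the diff, each ingestion worker's queue is built from it:
    let (_tx, _rx) =
        crossbeam_channel::bounded::<Option<Vec<u8>>>(config.ingestion_thread_queue_size);
    println!(
        "{} ingestion workers, each buffering at most {} raw blocks",
        config.ingestion_thread_max, config.ingestion_thread_queue_size
    );
}
```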
components/hord-cli/src/db/mod.rs (158 changes: 84 additions & 74 deletions)
@@ -2,7 +2,9 @@ use std::{
     collections::{BTreeMap, HashMap, VecDeque},
     hash::BuildHasherDefault,
     path::PathBuf,
-    sync::{mpsc::Sender, Arc}, thread::sleep, time::Duration,
+    sync::{mpsc::Sender, Arc},
+    thread::sleep,
+    time::Duration,
 };
 
 use chainhook_sdk::{
@@ -760,12 +762,12 @@ pub fn find_all_inscriptions_in_block(
     block_height: &u64,
     inscriptions_db_conn: &Connection,
     ctx: &Context,
-) -> Vec<(TransactionIdentifier, TraversalResult)> {
+) -> BTreeMap<(TransactionIdentifier, usize), TraversalResult> {
     let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
     let mut stmt = inscriptions_db_conn
         .prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
         .unwrap();
-    let mut results = vec![];
+    let mut results = BTreeMap::new();
     let mut rows = stmt.query(args).unwrap();
 
     let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_conn, ctx);
@@ -789,7 +791,10 @@ pub fn find_all_inscriptions_in_block(
             transaction_identifier_inscription: transaction_identifier_inscription.clone(),
             transfer_data: transfer_data,
         };
-        results.push((transaction_identifier_inscription, traversal));
+        results.insert(
+            (transaction_identifier_inscription, inscription_input_index),
+            traversal,
+        );
     }
     return results;
 }
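`find_all_inscriptions_in_block` now returns a `BTreeMap` keyed by `(transaction, input index)` instead of a `Vec` of pairs, so callers get keyed lookups plus a deterministic, key-ordered iteration. A self-contained sketch of the difference, using placeholder types (`TxId` and `Traversal` stand in for the crate's `TransactionIdentifier` and `TraversalResult`):

```rust
use std::collections::BTreeMap;

// Placeholder stand-ins for the crate's TransactionIdentifier / TraversalResult.
type TxId = String;
#[derive(Debug)]
struct Traversal {
    ordinal_number: u64,
}

fn main() {
    let mut results: BTreeMap<(TxId, usize), Traversal> = BTreeMap::new();
    results.insert(("txB".to_string(), 0), Traversal { ordinal_number: 7 });
    results.insert(("txA".to_string(), 1), Traversal { ordinal_number: 3 });

    // Direct lookup by (transaction, input index): a linear scan with the old
    // Vec<(TxId, Traversal)>, a O(log n) keyed access here.
    if let Some(t) = results.get(&("txA".to_string(), 1)) {
        println!("found {t:?}");
    }

    // Iteration is deterministic, sorted by key, regardless of insert order.
    for ((txid, input), t) in &results {
        println!("{txid}:{input} -> {t:?}");
    }
}
```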
@@ -1009,8 +1014,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
             let moved_ctx = moved_ctx.clone();
             let moved_http_client = http_client.clone();
             retrieve_block_data_pool.execute(move || {
-                moved_ctx
-                    .try_log(|logger| debug!(logger, "Fetching block #{block_height}"));
+                moved_ctx.try_log(|logger| debug!(logger, "Fetching block #{block_height}"));
                 let future = download_block_with_retry(
                     &moved_http_client,
                     &block_hash,
@@ -2280,8 +2284,7 @@ pub async fn rebuild_rocks_db(

     let number_of_blocks_to_process = end_block - start_block + 1;
 
-    let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
-    let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::unbounded();
+    let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(50);
     let http_client = build_http_client();
 
     let moved_config = bitcoin_config.clone();
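Replacing `crossbeam_channel::unbounded()` with `bounded(50)` (and dropping the dedicated compression `ThreadPool`) caps how many processed blocks can sit between the compression workers and the storage thread: when storage falls behind, senders park in `send()` rather than growing the heap. A minimal sketch of that fan-in backpressure; the worker count and payloads are made up for the demo:

```rust
use crossbeam_channel::bounded;
use std::thread;

fn main() {
    // At most 50 processed blocks may be in flight, mirroring bounded(50).
    let (tx, rx) = bounded::<(u64, Vec<u8>)>(50);

    let mut workers = vec![];
    for worker_id in 0..4u64 {
        let tx = tx.clone();
        workers.push(thread::spawn(move || {
            for n in 0..100u64 {
                // send() blocks this worker whenever the queue is full.
                tx.send((worker_id, vec![0u8; (n % 16) as usize])).unwrap();
            }
        }));
    }
    drop(tx); // drop the original sender so recv() can observe disconnection

    // Single slow consumer, standing in for the storage thread.
    let mut total_bytes = 0usize;
    while let Ok((_worker, payload)) = rx.recv() {
        total_bytes += payload.len();
    }
    for w in workers {
        w.join().unwrap();
    }
    println!("stored {total_bytes} bytes");
}
```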
@@ -2292,12 +2295,12 @@

     let mut block_heights = VecDeque::from((start_block..=end_block).collect::<Vec<u64>>());
 
-    for _ in 0..8 {
+    for _ in 0..hord_config.network_thread_max {
         if let Some(block_height) = block_heights.pop_front() {
             let config = moved_config.clone();
             let ctx = moved_ctx.clone();
             let http_client = moved_http_client.clone();
-            sleep(Duration::from_millis(500));
+            sleep(Duration::from_millis(200));
             set.spawn(try_download_block_bytes_with_retry(
                 http_client,
                 block_height,
@@ -2312,40 +2315,39 @@

     let mut tx_thread_pool = vec![];
     let mut rx_thread_pool = vec![];
+    let mut thread_pool_handles = vec![];
 
     for _ in 0..hord_config.ingestion_thread_max {
-        let (tx, rx) = bounded::<Option<Vec<u8>>>(8);
+        let (tx, rx) = bounded::<Option<Vec<u8>>>(hord_config.ingestion_thread_queue_size);
         tx_thread_pool.push(tx);
         rx_thread_pool.push(rx);
     }
 
-    let compression_thread = hiro_system_kit::thread_named("Block data compression")
-        .spawn(move || {
-            for rx in rx_thread_pool.into_iter() {
-                let block_compressed_tx_moved = block_compressed_tx.clone();
-                let moved_ctx: Context = moved_ctx.clone();
-                let moved_bitcoin_network = moved_bitcoin_network.clone();
-                compress_block_data_pool.execute(move || {
-                    while let Ok(Some(block_bytes)) = rx.recv() {
-                        let raw_block_data =
-                            parse_downloaded_block(block_bytes).expect("unable to parse block");
-                        let compressed_block = LazyBlock::from_full_block(&raw_block_data)
-                            .expect("unable to compress block");
-                        let block_data = hord::parse_ordinals_and_standardize_block(
-                            raw_block_data,
-                            &moved_bitcoin_network,
-                            &moved_ctx,
-                        )
-                        .expect("unable to deserialize block");
-
-                        let _ =
-                            block_compressed_tx_moved.send(Some((block_data, compressed_block)));
-                    }
-                });
-            }
-            let _ = compress_block_data_pool.join();
-        })
-        .expect("unable to spawn thread");
+    for rx in rx_thread_pool.into_iter() {
+        let block_compressed_tx_moved = block_compressed_tx.clone();
+        let moved_ctx: Context = moved_ctx.clone();
+        let moved_bitcoin_network = moved_bitcoin_network.clone();
+
+        let handle = hiro_system_kit::thread_named("Block data compression")
+            .spawn(move || {
+                while let Ok(Some(block_bytes)) = rx.recv() {
+                    let raw_block_data =
+                        parse_downloaded_block(block_bytes).expect("unable to parse block");
+                    let compressed_block = LazyBlock::from_full_block(&raw_block_data)
+                        .expect("unable to compress block");
+                    let block_data = hord::parse_ordinals_and_standardize_block(
+                        raw_block_data,
+                        &moved_bitcoin_network,
+                        &moved_ctx,
+                    )
+                    .expect("unable to deserialize block");
+
+                    let _ = block_compressed_tx_moved.send(Some((block_data, compressed_block)));
+                }
+            })
+            .expect("unable to spawn thread");
+        thread_pool_handles.push(handle);
+    }
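The coordinator-plus-`ThreadPool` arrangement becomes one dedicated, joinable thread per receiver: each worker owns its bounded queue, drains it until the `None` sentinel arrives, and is joined explicitly at shutdown. A runnable sketch of the pattern under simplified assumptions: plain `std::thread` instead of `hiro_system_kit::thread_named`, byte counting instead of block parsing, and a round-robin dispatch invented for the demo (the real dispatch lives elsewhere in the crate):

```rust
use crossbeam_channel::{bounded, Sender};
use std::thread::{self, JoinHandle};

fn main() {
    let ingestion_thread_max = 4;
    let ingestion_thread_queue_size = 4; // mirrors the new config knob

    let mut tx_thread_pool: Vec<Sender<Option<Vec<u8>>>> = vec![];
    let mut thread_pool_handles: Vec<JoinHandle<()>> = vec![];

    for id in 0..ingestion_thread_max {
        let (tx, rx) = bounded::<Option<Vec<u8>>>(ingestion_thread_queue_size);
        tx_thread_pool.push(tx);
        // One dedicated thread per receiver; it exits on the None sentinel.
        thread_pool_handles.push(thread::spawn(move || {
            while let Ok(Some(bytes)) = rx.recv() {
                println!("worker {id}: processing {} bytes", bytes.len());
            }
        }));
    }

    // Round-robin dispatch by height, invented for this demo.
    for height in 0u64..32 {
        let slot = (height as usize) % tx_thread_pool.len();
        tx_thread_pool[slot].send(Some(vec![0u8; 8])).unwrap();
    }

    // Shutdown: one sentinel per worker, then join every handle,
    // exactly as the end of the diff does.
    for tx in tx_thread_pool.iter() {
        let _ = tx.send(None);
    }
    for handle in thread_pool_handles {
        let _ = handle.join();
    }
}
```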

     let cloned_ctx = ctx.clone();

@@ -2354,52 +2356,56 @@
         let mut inbox = HashMap::new();
         let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
         let mut blocks_processed = 0;
-        while let Ok(Some((block, compacted_block))) = block_compressed_rx.recv() {
-            blocks_processed += 1;
-            let block_index = block.block_identifier.index;
-
-            // In the context of ordinals, we're constrained to process blocks sequentially
-            // Blocks are processed by a threadpool and could be coming out of order.
-            // Inbox block for later if the current block is not the one we should be
-            // processing.
-            if block_index >= start_sequencing_blocks_at_height {
-                inbox.insert(block_index, (block, compacted_block));
-                let mut chunk = Vec::new();
-                while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
-                    cloned_ctx.try_log(|logger| {
-                        info!(
-                            logger,
-                            "Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
-                            inbox.len()
-                        )
-                    });
-                    chunk.push((block, compacted_block));
-                    inbox_cursor += 1;
-                }
-                if chunk.is_empty() {
-                    // Early return / wait for next block
-                    cloned_ctx.try_log(|logger| {
-                        info!(logger, "Inboxing compacted block #{block_index}")
-                    });
-                    continue;
-                } else {
-                    if let Some(ref tx) = blocks_post_processor {
-                        let _ = tx.send(chunk);
-                    }
-                }
-            }
-
-            if blocks_processed == number_of_blocks_to_process {
-                cloned_ctx.try_log(|logger| {
-                    info!(
-                        logger,
-                        "Local block storage successfully seeded with #{blocks_processed} blocks"
-                    )
-                });
-                break;
-            }
-        }
+        loop {
+            // Dequeue all the blocks available
+            let mut new_blocks = vec![];
+            while let Ok(Some((block, compacted_block))) = block_compressed_rx.try_recv()
+            {
+                blocks_processed += 1;
+                new_blocks.push((block, compacted_block))
+            }
+            //
+            let mut ooo_processing = vec![];
+            for (block, compacted_block) in new_blocks.into_iter() {
+                let block_index = block.block_identifier.index;
+                if block_index >= start_sequencing_blocks_at_height {
+                    inbox.insert(block_index, (block, compacted_block));
+                } else {
+                    ooo_processing.push((block, compacted_block));
+                    // todo: do something
+                }
+            }
+
+            // In order processing: construct the longest sequence of known blocks
+            let mut chunk = Vec::new();
+            while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
+                cloned_ctx.try_log(|logger| {
+                    info!(
+                        logger,
+                        "Adding block #{inbox_cursor} to next sequence (# blocks inboxed: {})",
+                        inbox.len()
+                    )
+                });
+                chunk.push((block, compacted_block));
+                inbox_cursor += 1;
+            }
+            if !chunk.is_empty() {
+                if let Some(ref tx) = blocks_post_processor {
+                    let _ = tx.send(chunk);
+                }
+            } else {
+                if blocks_processed == number_of_blocks_to_process {
+                    cloned_ctx.try_log(|logger| {
+                        info!(
+                            logger,
+                            "Local block storage successfully seeded with #{blocks_processed} blocks"
+                        )
+                    });
+                    break;
+                }
+            }
+            sleep(Duration::from_secs(3));
+        }
         ()
     })
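The storage thread no longer blocks on `recv()` per block; it polls with `try_recv()`, parks out-of-order arrivals in a `HashMap` inbox, and flushes the longest contiguous run starting at `inbox_cursor`. A minimal sketch of that sequencing logic, with block payloads reduced to strings and the arrival order hardcoded to simulate out-of-order workers:

```rust
use std::collections::HashMap;

fn main() {
    // Completion order from parallel workers: out of order on purpose.
    let arrivals: Vec<u64> = vec![3, 1, 0, 4, 2, 5];
    let mut inbox: HashMap<u64, &str> = HashMap::new();
    let mut inbox_cursor: u64 = 0;

    for height in arrivals {
        inbox.insert(height, "block-payload");

        // Flush the longest run now contiguous with the cursor.
        let mut chunk = vec![];
        while let Some(payload) = inbox.remove(&inbox_cursor) {
            chunk.push((inbox_cursor, payload));
            inbox_cursor += 1;
        }
        if chunk.is_empty() {
            println!("inboxed #{height}, waiting ({} parked)", inbox.len());
        } else {
            println!("dispatching {chunk:?} ({} still parked)", inbox.len());
        }
    }
}
```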
@@ -2429,7 +2435,11 @@ pub async fn rebuild_rocks_db(
     for tx in tx_thread_pool.iter() {
         let _ = tx.send(None);
     }
-    let _ = compression_thread.join();
+
+    for handle in thread_pool_handles.into_iter() {
+        let _ = handle.join();
+    }
+
     let _ = storage_thread.join();
     let _ = set.shutdown();
