Skip to content

Commit

Permalink
feat: in-house thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Aug 1, 2023
1 parent 2f172e0 commit bc5ffdd
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 34 deletions.
17 changes: 4 additions & 13 deletions components/hord-cli/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2326,7 +2326,8 @@ pub async fn rebuild_rocks_db(
let moved_bitcoin_network = moved_bitcoin_network.clone();
compress_block_data_pool.execute(move || {
while let Ok(Some(block_bytes)) = rx.recv() {
let raw_block_data = parse_downloaded_block(block_bytes).expect("unable to parse block");
let raw_block_data =
parse_downloaded_block(block_bytes).expect("unable to parse block");
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
.expect("unable to compress block");
let block_data = hord::parse_ordinals_and_standardize_block(
Expand Down Expand Up @@ -2422,12 +2423,7 @@ pub async fn rebuild_rocks_db(
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
}

ctx.try_log(|logger| {
info!(
logger,
"Gargbage collecting will start"
)
});
ctx.try_log(|logger| info!(logger, "Gargbage collecting will start"));

for tx in tx_thread_pool.iter() {
let _ = tx.send(None);
Expand All @@ -2436,12 +2432,7 @@ pub async fn rebuild_rocks_db(
let _ = storage_thread.join();
let _ = set.shutdown();

ctx.try_log(|logger| {
info!(
logger,
"Gargbage collecting did finish"
)
});
ctx.try_log(|logger| info!(logger, "Gargbage collecting did finish"));

// match guard.report().build() {
// Ok(report) => {
Expand Down
51 changes: 30 additions & 21 deletions components/hord-cli/src/hord/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,8 +1192,8 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel();

let traversal_data_pool = ThreadPool::new(thread_max);
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];

for thread_index in 0..thread_max {
let (tx, rx) = channel();
Expand All @@ -1204,23 +1204,30 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = cache_l2.clone();

traversal_data_pool.execute(move || {
while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) =
rx.recv()
{
let traversal: Result<TraversalResult, String> =
retrieve_satoshi_point_using_lazy_storage_v3(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
});
let handle = hiro_system_kit::thread_named("Worker")
.spawn(move || {
while let Ok(Some((
transaction_id,
block_identifier,
input_index,
prioritary,
))) = rx.recv()
{
let traversal: Result<TraversalResult, String> =
retrieve_satoshi_point_using_lazy_storage_v3(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
}

// Empty cache
Expand Down Expand Up @@ -1335,12 +1342,14 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
}
}
}
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(None);
for tx in tx_thread_pool.iter() {
let _ = tx.send(None);
}

let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
let _ = traversal_data_pool.join();
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
});
} else {
ctx.try_log(|logger| {
Expand Down

0 comments on commit bc5ffdd

Please sign in to comment.