Skip to content

Commit

Permalink
feat: batch ingestion, improve cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Aug 1, 2023
1 parent 41ecace commit 168162e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 22 deletions.
62 changes: 41 additions & 21 deletions components/hord-cli/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2318,7 +2318,7 @@ pub async fn rebuild_rocks_db(
rx_thread_pool.push(rx);
}

let _ = hiro_system_kit::thread_named("Block data compression")
let compression_thread = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
for rx in rx_thread_pool.into_iter() {
let block_compressed_tx_moved = block_compressed_tx.clone();
Expand All @@ -2328,7 +2328,7 @@ pub async fn rebuild_rocks_db(
while let Ok(Some(block_bytes)) = rx.recv() {
let raw_block_data = parse_downloaded_block(block_bytes).unwrap();
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
.expect("unable to serialize block");
.expect("unable to compress block");
let block_data = hord::parse_ordinals_and_standardize_block(
raw_block_data,
&moved_bitcoin_network,
Expand All @@ -2347,7 +2347,7 @@ pub async fn rebuild_rocks_db(

let cloned_ctx = ctx.clone();

let _storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
let storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
.spawn(move || {
let mut inbox = HashMap::new();
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
Expand All @@ -2367,7 +2367,7 @@ pub async fn rebuild_rocks_db(
let mut chunk = Vec::new();
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
cloned_ctx.try_log(|logger| {
slog::info!(
info!(
logger,
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
inbox.len()
Expand All @@ -2379,7 +2379,7 @@ pub async fn rebuild_rocks_db(
if chunk.is_empty() {
// Early return / wait for next block
cloned_ctx.try_log(|logger| {
slog::info!(logger, "Inboxing compacted block #{block_index}")
info!(logger, "Inboxing compacted block #{block_index}")
});
continue;
} else {
Expand All @@ -2391,27 +2391,12 @@ pub async fn rebuild_rocks_db(

if blocks_processed == number_of_blocks_to_process {
cloned_ctx.try_log(|logger| {
slog::info!(
info!(
logger,
"Local block storage successfully seeded with #{blocks_processed} blocks"
)
});
break;
// match guard.report().build() {
// Ok(report) => {
// ctx.try_log(|logger| {
// slog::info!(logger, "Generating report");
// });

// let file = std::fs::File::create("hord-perf.svg").unwrap();
// report.flamegraph(file).unwrap();
// }
// Err(e) => {
// ctx.try_log(|logger| {
// slog::error!(logger, "Reporting failed: {}", e.to_string());
// });
// }
// }
}
}
()
Expand All @@ -2437,6 +2422,41 @@ pub async fn rebuild_rocks_db(
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
}

ctx.try_log(|logger| {
info!(
logger,
"Gargbage collecting will start"
)
});

for tx in tx_thread_pool.iter() {
let _ = tx.send(None);
}
let _ = compression_thread.join();
let _ = storage_thread.join();
let _ = set.shutdown();

ctx.try_log(|logger| {
info!(
logger,
"Gargbage collecting did finish"
)
});

// match guard.report().build() {
// Ok(report) => {
// ctx.try_log(|logger| {
// slog::info!(logger, "Generating report");
// });
// let file = std::fs::File::create("hord-perf.svg").unwrap();
// report.flamegraph(file).unwrap();
// }
// Err(e) => {
// ctx.try_log(|logger| {
// slog::error!(logger, "Reporting failed: {}", e.to_string());
// });
// }
// }

Ok(())
}
2 changes: 1 addition & 1 deletion components/hord-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Service {
rebuild_rocks_db(
&self.config,
start_block,
end_block,
end_block.min(start_block + 256),
hord_config.first_inscription_height,
Some(tx.clone()),
&self.ctx,
Expand Down

0 comments on commit 168162e

Please sign in to comment.