Skip to content

Commit

Permalink
feat: use thread pools for scans
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Apr 13, 2023
1 parent daf5547 commit 45b9abd
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/chainhook-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ ansi_term = "0.12.1"
atty = "0.2.14"
crossbeam-channel = "0.5.6"
uuid = { version = "1.3.0", features = ["v4", "fast-rng"] }
threadpool = "1.8.1"

[dev-dependencies]
criterion = "0.3"
Expand Down
118 changes: 75 additions & 43 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use chainhook_event_observer::observer::{start_event_observer, ApiKey, ObserverE
use chainhook_event_observer::utils::Context;
use chainhook_types::{BitcoinBlockSignaling, StacksBlockData, StacksChainEvent};
use redis::{Commands, Connection};
use threadpool::ThreadPool;

use std::sync::mpsc::channel;

pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 12;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;

pub struct Service {
config: Config,
Expand Down Expand Up @@ -110,14 +113,6 @@ impl Service {
);
}

// let ordinal_index = match initialize_ordinal_index(&event_observer_config, None, &self.ctx)
// {
// Ok(index) => index,
// Err(e) => {
// panic!()
// }
// };

let context_cloned = self.ctx.clone();
let event_observer_config_moved = event_observer_config.clone();
let _ = std::thread::spawn(move || {
Expand All @@ -131,6 +126,76 @@ impl Service {
let _ = hiro_system_kit::nestable_block_on(future);
});

// Stacks scan operation threadpool
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
let stacks_scan_pool = ThreadPool::new(STACKS_SCAN_THREAD_POOL_SIZE);
let ctx = self.ctx.clone();
let config = self.config.clone();
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
.spawn(move || {
while let Ok(predicate_spec) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let mut moved_config = config.clone();
stacks_scan_pool.execute(move || {
let op = scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec,
&mut moved_config,
&moved_ctx,
);
let end_block = match hiro_system_kit::nestable_block_on(op) {
Ok(end_block) => end_block,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Stacks chainstate: {e}",
);
return;
}
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", end_block.index
);
});
}
let res = stacks_scan_pool.join();
res
})
.expect("unable to spawn thread");

// Bitcoin scan operation threadpool
let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
let bitcoin_scan_pool = ThreadPool::new(BITCOIN_SCAN_THREAD_POOL_SIZE);
let ctx = self.ctx.clone();
let config = self.config.clone();
let _ = hiro_system_kit::thread_named("Bitcoin scan runloop")
.spawn(move || {
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec,
&moved_config,
&moved_ctx,
);

match hiro_system_kit::nestable_block_on(op) {
Ok(_) => {}
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
}
};
});
}
let res = bitcoin_scan_pool.join();
res
})
.expect("unable to spawn thread");

loop {
let event = match observer_event_rx.recv() {
Ok(cmd) => cmd,
Expand Down Expand Up @@ -178,43 +243,10 @@ impl Service {
}
match chainhook {
ChainhookSpecification::Stacks(predicate_spec) => {
let end_block = match scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec,
&mut self.config,
&self.ctx,
)
.await
{
Ok(end_block) => end_block,
Err(e) => {
error!(
self.ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
continue;
}
};
info!(
self.ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", end_block.index
);
let _ = stacks_scan_op_tx.send(predicate_spec);
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
match scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec,
&self.config,
&self.ctx,
)
.await
{
Ok(_) => {}
Err(e) => {
error!(
self.ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
}
};
let _ = bitcoin_scan_op_tx.send(predicate_spec);
}
}
}
Expand Down

0 comments on commit 45b9abd

Please sign in to comment.