Skip to content

Commit

Permalink
subsystem: Remove custom event loop support
Browse files Browse the repository at this point in the history
* Generalized the built-in event loop with
  * Custom initialization
  * Separation between subsystem state object and interface
  * Ability to specify background work in the `Subsystem` trait
* Ported `p2p` to the built-in event loop
* Ported tests to the built-in event loop
* Removed support for custom event loop from subsystem
* Substantial `subsystem` crate code reorganization
  • Loading branch information
iljakuklic committed Oct 3, 2023
1 parent fb69030 commit 788b1ba
Show file tree
Hide file tree
Showing 57 changed files with 1,364 additions and 1,294 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 35 additions & 50 deletions blockprod/src/detail/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ use crypto::{
use mempool::{
error::{Error, TxValidationError},
tx_accumulator::DefaultTxAccumulator,
MempoolInterface, MempoolSubsystemInterface,
};
use mocks::{MockChainstateInterface, MockMempoolInterface};
use rstest::rstest;
use subsystem::{error::ResponseError, subsystem::CallRequest};
use subsystem::error::ResponseError;
use test_utils::{
mock_time_getter::mocked_time_getter_seconds,
random::{make_seedable_rng, Seed},
Expand Down Expand Up @@ -86,39 +85,35 @@ mod collect_transactions {
)))
});

let mock_mempool_subsystem = manager.add_subsystem_with_custom_eventloop("mock-mempool", {
move |call, shutdn| async move {
mock_mempool.run(call, shutdn).await;
}
});
let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

let current_tip = Id::new(H256::zero());

manager.add_subsystem_with_custom_eventloop(
"test-call",
move |_: CallRequest<()>, _| async move {
let block_production = BlockProduction::new(
chain_config,
Arc::new(test_blockprod_config()),
chainstate,
mock_mempool_subsystem,
p2p,
Default::default(),
prepare_thread_pool(1),
)
.expect("Error initializing blockprod");
let shutdown = manager.make_shutdown_trigger();
let tester = tokio::spawn(async move {
let block_production = BlockProduction::new(
chain_config,
Arc::new(test_blockprod_config()),
chainstate,
mock_mempool_subsystem,
p2p,
Default::default(),
prepare_thread_pool(1),
)
.expect("Error initializing blockprod");

let accumulator =
block_production.collect_transactions(current_tip, DUMMY_TIMESTAMP).await;
let accumulator =
block_production.collect_transactions(current_tip, DUMMY_TIMESTAMP).await;

match accumulator {
Err(BlockProductionError::MempoolChannelClosed) => {}
_ => panic!("Expected collect_tx() to fail"),
};
},
);
match accumulator {
Err(BlockProductionError::MempoolChannelClosed) => {}
_ => panic!("Expected collect_tx() to fail"),
};

manager.main().await;
shutdown.initiate();
});

let _ = tokio::join!(manager.main(), tester);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand All @@ -127,19 +122,14 @@ mod collect_transactions {
setup_blockprod_test(None, None);

let mock_mempool = MockMempoolInterface::default();

let mock_mempool_subsystem = manager.add_subsystem_with_custom_eventloop("mock-mempool", {
move |call: CallRequest<dyn MempoolInterface>, shutdn| async move {
mock_mempool.run(call, shutdn).await;
}
});
let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

mock_mempool_subsystem
.call({
.as_submit_only()
.submit({
let shutdown = manager.make_shutdown_trigger();
move |_| shutdown.initiate()
})
.submit_only()
.unwrap();

// shutdown straight after startup, *then* call collect_transactions()
Expand Down Expand Up @@ -190,11 +180,7 @@ mod collect_transactions {
})
.times(1);

let mock_mempool_subsystem = manager.add_subsystem_with_custom_eventloop("mock-mempool", {
move |call, shutdn| async move {
mock_mempool.run(call, shutdn).await;
}
});
let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

let current_tip = Id::new(H256::zero());

Expand Down Expand Up @@ -236,16 +222,18 @@ mod produce_block {
use utils::atomics::SeqCstAtomicU64;

use super::*;
use chainstate::chainstate_interface::ChainstateInterface;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn initial_block_download() {
let (mut manager, chain_config, _, mempool, p2p) = setup_blockprod_test(None, None);

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_is_initial_block_download().returning(|| true);

mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());

manager.add_subsystem("mock-chainstate", mock_chainstate)
};

Expand Down Expand Up @@ -345,6 +333,7 @@ mod produce_block {
))
});

let mock_chainstate: Box<dyn ChainstateInterface> = Box::new(mock_chainstate);
manager.add_subsystem("mock-chainstate", mock_chainstate)
};

Expand Down Expand Up @@ -690,7 +679,7 @@ mod produce_block {
let (mut manager, chain_config, _, mempool, p2p) = setup_blockprod_test(None, None);

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

Expand Down Expand Up @@ -755,7 +744,7 @@ mod produce_block {
let (mut manager, chain_config, _, mempool, p2p) = setup_blockprod_test(None, None);

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

Expand Down Expand Up @@ -826,11 +815,7 @@ mod produce_block {
)))
});

let mempool_subsystem = manager.add_subsystem_with_custom_eventloop("mock-mempool", {
move |call, shutdn| async move {
mock_mempool.run(call, shutdn).await;
}
});
let mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

let join_handle = tokio::spawn({
let shutdown_trigger = manager.make_shutdown_trigger();
Expand Down
12 changes: 12 additions & 0 deletions blockprod/src/interface/blockprod_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,15 @@ impl BlockProductionInterface for BlockProduction {
Ok(block)
}
}

impl subsystem::Subsystem for Box<dyn BlockProductionInterface> {
type Interface = dyn BlockProductionInterface;

fn interface_ref(&self) -> &Self::Interface {
self.as_ref()
}

fn interface_mut(&mut self) -> &mut Self::Interface {
self.as_mut()
}
}
29 changes: 12 additions & 17 deletions blockprod/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ pub enum BlockProductionError {
JobManagerError(#[from] JobManagerError),
}

impl subsystem::Subsystem for Box<dyn BlockProductionInterface> {}

pub type BlockProductionHandle = subsystem::Handle<Box<dyn BlockProductionInterface>>;
pub type BlockProductionSubsystem = Box<dyn BlockProductionInterface>;
pub type BlockProductionHandle = subsystem::Handle<dyn BlockProductionInterface>;

fn prepare_thread_pool(thread_count: u16) -> Arc<slave_pool::ThreadPool> {
let mining_thread_pool = Arc::new(slave_pool::ThreadPool::new());
Expand All @@ -88,7 +87,7 @@ pub fn make_blockproduction(
mempool_handle: MempoolHandle,
p2p_handle: P2pHandle,
time_getter: TimeGetter,
) -> Result<Box<dyn BlockProductionInterface>, BlockProductionError> {
) -> Result<BlockProductionSubsystem, BlockProductionError> {
// TODO: make the number of threads configurable
let thread_count = 2;
let mining_thread_pool = prepare_thread_pool(thread_count);
Expand Down Expand Up @@ -138,7 +137,7 @@ mod tests {
random::Rng,
vrf::{VRFKeyKind, VRFPrivateKey},
};
use mempool::{MempoolHandle, MempoolSubsystemInterface};
use mempool::MempoolHandle;
use p2p::{
peer_manager::peerdb::storage_impl::PeerDbStorageImpl, testing_utils::test_p2p_config,
};
Expand Down Expand Up @@ -194,8 +193,9 @@ mod tests {
MempoolHandle,
P2pHandle,
) {
let mut manager = Manager::new("blockprod-unit-test");
manager.install_signal_handlers();
let manager_config =
subsystem::ManagerConfig::new("blockprod-unit-test").enable_signal_handlers();
let mut manager = Manager::new_with_config(manager_config);

let chain_config = Arc::new(chain_config.unwrap_or_else(create_unit_test_config));

Expand Down Expand Up @@ -227,26 +227,21 @@ mod tests {
subsystem::Handle::clone(&chainstate),
time_getter.clone(),
);
let mempool = manager.add_subsystem_with_custom_eventloop("mempool", {
move |call, shutdn| mempool.run(call, shutdn)
});
let mempool = manager.add_custom_subsystem("mempool", |hdl| mempool.init(hdl));

let mut p2p_config = test_p2p_config();
p2p_config.bind_addresses = vec!["127.0.0.1:0".to_owned()];

let p2p = p2p::make_p2p(
Arc::clone(&chain_config),
Arc::new(p2p_config),
chainstate.clone(),
subsystem::Handle::clone(&chainstate),
mempool.clone(),
time_getter,
PeerDbStorageImpl::new(InMemory::new()).unwrap(),
)
.expect("P2p initialization was successful");

let p2p = manager.add_subsystem_with_custom_eventloop("p2p", {
move |call, shutdown| p2p.run(call, shutdown)
});
.expect("P2p initialization was successful")
.add_to_manager("p2p", &mut manager);

(manager, chain_config, chainstate, mempool, p2p)
}
Expand Down Expand Up @@ -340,7 +335,7 @@ mod tests {
)
.expect("Error initializing blockprod");

let blockprod = manager.add_subsystem("blockprod", blockprod);
let blockprod = manager.add_direct_subsystem("blockprod", blockprod);
let shutdown = manager.make_shutdown_trigger();

tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions chainstate/launcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ logging = {path = '../../logging'}
storage = { path = "../../storage" }
storage-inmemory = { path = "../../storage/inmemory" }
storage-lmdb = { path = "../../storage/lmdb" }
subsystem = { path = "../../subsystem" }
utils = { path = '../../utils' }
6 changes: 3 additions & 3 deletions chainstate/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use storage_lmdb::resize_callback::MapResizeCallback;
// Some useful reexports
pub use chainstate::{
chainstate_interface::ChainstateInterface, ChainstateConfig, ChainstateError as Error,
DefaultTransactionVerificationStrategy,
ChainstateSubsystem, DefaultTransactionVerificationStrategy,
};
pub use common::chain::ChainConfig;
pub use config::{ChainstateLauncherConfig, StorageBackendConfig};
Expand All @@ -38,7 +38,7 @@ fn make_chainstate_and_storage_impl<B: 'static + storage::Backend>(
storage_backend: B,
chain_config: Arc<ChainConfig>,
chainstate_config: ChainstateConfig,
) -> Result<Box<dyn ChainstateInterface>, Error> {
) -> Result<ChainstateSubsystem, Error> {
let storage = chainstate_storage::Store::new(storage_backend, &chain_config)
.map_err(|e| Error::FailedToInitializeChainstate(e.into()))?;

Expand All @@ -61,7 +61,7 @@ pub fn make_chainstate(
datadir: &std::path::Path,
chain_config: Arc<ChainConfig>,
config: ChainstateLauncherConfig,
) -> Result<Box<dyn ChainstateInterface>, Error> {
) -> Result<ChainstateSubsystem, Error> {
let ChainstateLauncherConfig {
storage_backend,
chainstate_config,
Expand Down
12 changes: 12 additions & 0 deletions chainstate/src/interface/chainstate_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,3 +673,15 @@ fn get_output_coin_amount(

Ok(amount)
}

impl subsystem::Subsystem for Box<dyn ChainstateInterface> {
type Interface = dyn ChainstateInterface;

fn interface_ref(&self) -> &Self::Interface {
self.as_ref()
}

fn interface_mut(&mut self) -> &mut Self::Interface {
self.as_mut()
}
}
6 changes: 3 additions & 3 deletions chainstate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ impl HasTxIndexDisabledError for ChainstateError {
}
}

impl subsystem::Subsystem for Box<dyn ChainstateInterface> {}
pub type ChainstateSubsystem = Box<dyn ChainstateInterface>;

pub type ChainstateHandle = subsystem::Handle<Box<dyn ChainstateInterface>>;
pub type ChainstateHandle = subsystem::Handle<dyn ChainstateInterface>;

pub fn make_chainstate<S, V>(
chain_config: Arc<ChainConfig>,
Expand All @@ -84,7 +84,7 @@ pub fn make_chainstate<S, V>(
tx_verification_strategy: V,
custom_orphan_error_hook: Option<Arc<detail::OrphanErrorHandler>>,
time_getter: TimeGetter,
) -> Result<Box<dyn ChainstateInterface>, ChainstateError>
) -> Result<ChainstateSubsystem, ChainstateError>
where
S: chainstate_storage::BlockchainStorage + Sync + 'static,
V: TransactionVerificationStrategy + Sync + 'static,
Expand Down
11 changes: 6 additions & 5 deletions chainstate/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ mod test {
let chain_config = Arc::new(common::chain::config::create_unit_test_config());
let chainstate_config = ChainstateConfig::new();
let mut man = subsystem::Manager::new("rpctest");
let shutdown = man.make_shutdown_trigger();
let handle = man.add_subsystem(
"chainstate",
crate::make_chainstate(
Expand All @@ -354,11 +355,11 @@ mod test {
)
.unwrap(),
);
let _ = man.add_subsystem_with_custom_eventloop(
"test",
move |_: subsystem::CallRequest<()>, _| proc(handle),
);
man.main().await;
let tester = tokio::spawn(async move {
proc(handle);
shutdown.initiate();
});
let _ = tokio::join!(man.main(), tester);
}

#[tokio::test]
Expand Down
Loading

0 comments on commit 788b1ba

Please sign in to comment.