Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subsystem: Remove custom event loop support #1256

Merged
merged 3 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 35 additions & 50 deletions blockprod/src/detail/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ use crypto::{
use mempool::{
error::{Error, TxValidationError},
tx_accumulator::DefaultTxAccumulator,
MempoolInterface, MempoolSubsystemInterface,
};
use mocks::{MockChainstateInterface, MockMempoolInterface};
use rstest::rstest;
use subsystem::{error::ResponseError, subsystem::CallRequest};
use subsystem::error::ResponseError;
use test_utils::{
mock_time_getter::mocked_time_getter_seconds,
random::{make_seedable_rng, Seed},
Expand Down Expand Up @@ -86,39 +85,35 @@ mod collect_transactions {
)))
});

let mock_mempool_subsystem = manager.add_subsystem_with_custom_eventloop("mock-mempool", {
move |call, shutdn| async move {
mock_mempool.run(call, shutdn).await;
}
});
let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

let current_tip = Id::new(H256::zero());

manager.add_subsystem_with_custom_eventloop(
"test-call",
move |_: CallRequest<()>, _| async move {
let block_production = BlockProduction::new(
chain_config,
Arc::new(test_blockprod_config()),
chainstate,
mock_mempool_subsystem,
p2p,
Default::default(),
prepare_thread_pool(1),
)
.expect("Error initializing blockprod");
let shutdown = manager.make_shutdown_trigger();
let tester = tokio::spawn(async move {
let block_production = BlockProduction::new(
chain_config,
Arc::new(test_blockprod_config()),
chainstate,
mock_mempool_subsystem,
p2p,
Default::default(),
prepare_thread_pool(1),
)
.expect("Error initializing blockprod");

let accumulator =
block_production.collect_transactions(current_tip, DUMMY_TIMESTAMP).await;
let accumulator =
block_production.collect_transactions(current_tip, DUMMY_TIMESTAMP).await;

match accumulator {
Err(BlockProductionError::MempoolChannelClosed) => {}
_ => panic!("Expected collect_tx() to fail"),
};
},
);
match accumulator {
Err(BlockProductionError::MempoolChannelClosed) => {}
_ => panic!("Expected collect_tx() to fail"),
};

manager.main().await;
shutdown.initiate();
});

let _ = tokio::join!(manager.main(), tester);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand All @@ -127,19 +122,14 @@ mod collect_transactions {
setup_blockprod_test(None, None);

let mock_mempool = MockMempoolInterface::default();

let mock_mempool_subsystem = manager.add_subsystem_with_custom_eventloop("mock-mempool", {
move |call: CallRequest<dyn MempoolInterface>, shutdn| async move {
mock_mempool.run(call, shutdn).await;
}
});
let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

mock_mempool_subsystem
.call({
.as_submit_only()
.submit({
let shutdown = manager.make_shutdown_trigger();
move |_| shutdown.initiate()
})
.submit_only()
.unwrap();

// shutdown straight after startup, *then* call collect_transactions()
Expand Down Expand Up @@ -190,11 +180,7 @@ mod collect_transactions {
})
.times(1);

let mock_mempool_subsystem = manager.add_subsystem_with_custom_eventloop("mock-mempool", {
move |call, shutdn| async move {
mock_mempool.run(call, shutdn).await;
}
});
let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

let current_tip = Id::new(H256::zero());

Expand Down Expand Up @@ -237,16 +223,18 @@ mod produce_block {
use utils::atomics::SeqCstAtomicU64;

use super::*;
use chainstate::chainstate_interface::ChainstateInterface;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn initial_block_download() {
let (mut manager, chain_config, _, mempool, p2p) = setup_blockprod_test(None, None);

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_is_initial_block_download().returning(|| true);

mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());

manager.add_subsystem("mock-chainstate", mock_chainstate)
};

Expand Down Expand Up @@ -346,6 +334,7 @@ mod produce_block {
))
});

let mock_chainstate: Box<dyn ChainstateInterface> = Box::new(mock_chainstate);
manager.add_subsystem("mock-chainstate", mock_chainstate)
};

Expand Down Expand Up @@ -691,7 +680,7 @@ mod produce_block {
let (mut manager, chain_config, _, mempool, p2p) = setup_blockprod_test(None, None);

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

Expand Down Expand Up @@ -756,7 +745,7 @@ mod produce_block {
let (mut manager, chain_config, _, mempool, p2p) = setup_blockprod_test(None, None);

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

Expand Down Expand Up @@ -827,11 +816,7 @@ mod produce_block {
)))
});

let mempool_subsystem = manager.add_subsystem_with_custom_eventloop("mock-mempool", {
move |call, shutdn| async move {
mock_mempool.run(call, shutdn).await;
}
});
let mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

let join_handle = tokio::spawn({
let shutdown_trigger = manager.make_shutdown_trigger();
Expand Down
12 changes: 12 additions & 0 deletions blockprod/src/interface/blockprod_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,15 @@ impl BlockProductionInterface for BlockProduction {
Ok(block)
}
}

impl subsystem::Subsystem for Box<dyn BlockProductionInterface> {
type Interface = dyn BlockProductionInterface;

fn interface_ref(&self) -> &Self::Interface {
self.as_ref()
}

fn interface_mut(&mut self) -> &mut Self::Interface {
self.as_mut()
}
}
29 changes: 12 additions & 17 deletions blockprod/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ pub enum BlockProductionError {
JobManagerError(#[from] JobManagerError),
}

impl subsystem::Subsystem for Box<dyn BlockProductionInterface> {}

pub type BlockProductionHandle = subsystem::Handle<Box<dyn BlockProductionInterface>>;
pub type BlockProductionSubsystem = Box<dyn BlockProductionInterface>;
pub type BlockProductionHandle = subsystem::Handle<dyn BlockProductionInterface>;

fn prepare_thread_pool(thread_count: u16) -> Arc<slave_pool::ThreadPool> {
let mining_thread_pool = Arc::new(slave_pool::ThreadPool::new());
Expand All @@ -88,7 +87,7 @@ pub fn make_blockproduction(
mempool_handle: MempoolHandle,
p2p_handle: P2pHandle,
time_getter: TimeGetter,
) -> Result<Box<dyn BlockProductionInterface>, BlockProductionError> {
) -> Result<BlockProductionSubsystem, BlockProductionError> {
// TODO: make the number of threads configurable
let thread_count = 2;
let mining_thread_pool = prepare_thread_pool(thread_count);
Expand Down Expand Up @@ -139,7 +138,7 @@ mod tests {
random::Rng,
vrf::{VRFKeyKind, VRFPrivateKey},
};
use mempool::{MempoolHandle, MempoolSubsystemInterface};
use mempool::MempoolHandle;
use p2p::{
peer_manager::peerdb::storage_impl::PeerDbStorageImpl, testing_utils::test_p2p_config,
};
Expand Down Expand Up @@ -195,8 +194,9 @@ mod tests {
MempoolHandle,
P2pHandle,
) {
let mut manager = Manager::new("blockprod-unit-test");
manager.install_signal_handlers();
let manager_config =
subsystem::ManagerConfig::new("blockprod-unit-test").enable_signal_handlers();
let mut manager = Manager::new_with_config(manager_config);

let chain_config = Arc::new(chain_config.unwrap_or_else(create_unit_test_config));

Expand Down Expand Up @@ -228,26 +228,21 @@ mod tests {
subsystem::Handle::clone(&chainstate),
time_getter.clone(),
);
let mempool = manager.add_subsystem_with_custom_eventloop("mempool", {
move |call, shutdn| mempool.run(call, shutdn)
});
let mempool = manager.add_custom_subsystem("mempool", |hdl| mempool.init(hdl));

let mut p2p_config = test_p2p_config();
p2p_config.bind_addresses = vec!["127.0.0.1:0".to_owned()];

let p2p = p2p::make_p2p(
Arc::clone(&chain_config),
Arc::new(p2p_config),
chainstate.clone(),
subsystem::Handle::clone(&chainstate),
mempool.clone(),
time_getter,
PeerDbStorageImpl::new(InMemory::new()).unwrap(),
)
.expect("P2p initialization was successful");

let p2p = manager.add_subsystem_with_custom_eventloop("p2p", {
move |call, shutdown| p2p.run(call, shutdown)
});
.expect("P2p initialization was successful")
.add_to_manager("p2p", &mut manager);

(manager, chain_config, chainstate, mempool, p2p)
}
Expand Down Expand Up @@ -341,7 +336,7 @@ mod tests {
)
.expect("Error initializing blockprod");

let blockprod = manager.add_subsystem("blockprod", blockprod);
let blockprod = manager.add_direct_subsystem("blockprod", blockprod);
let shutdown = manager.make_shutdown_trigger();

tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions chainstate/launcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ logging = {path = '../../logging'}
storage = { path = "../../storage" }
storage-inmemory = { path = "../../storage/inmemory" }
storage-lmdb = { path = "../../storage/lmdb" }
subsystem = { path = "../../subsystem" }
utils = { path = '../../utils' }
6 changes: 3 additions & 3 deletions chainstate/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use storage_lmdb::resize_callback::MapResizeCallback;
// Some useful reexports
pub use chainstate::{
chainstate_interface::ChainstateInterface, ChainstateConfig, ChainstateError as Error,
DefaultTransactionVerificationStrategy,
ChainstateSubsystem, DefaultTransactionVerificationStrategy,
};
pub use common::chain::ChainConfig;
pub use config::{ChainstateLauncherConfig, StorageBackendConfig};
Expand All @@ -38,7 +38,7 @@ fn make_chainstate_and_storage_impl<B: 'static + storage::Backend>(
storage_backend: B,
chain_config: Arc<ChainConfig>,
chainstate_config: ChainstateConfig,
) -> Result<Box<dyn ChainstateInterface>, Error> {
) -> Result<ChainstateSubsystem, Error> {
let storage = chainstate_storage::Store::new(storage_backend, &chain_config)
.map_err(|e| Error::FailedToInitializeChainstate(e.into()))?;

Expand All @@ -61,7 +61,7 @@ pub fn make_chainstate(
datadir: &std::path::Path,
chain_config: Arc<ChainConfig>,
config: ChainstateLauncherConfig,
) -> Result<Box<dyn ChainstateInterface>, Error> {
) -> Result<ChainstateSubsystem, Error> {
let ChainstateLauncherConfig {
storage_backend,
chainstate_config,
Expand Down
12 changes: 12 additions & 0 deletions chainstate/src/interface/chainstate_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,3 +673,15 @@ fn get_output_coin_amount(

Ok(amount)
}

impl subsystem::Subsystem for Box<dyn ChainstateInterface> {
type Interface = dyn ChainstateInterface;

fn interface_ref(&self) -> &Self::Interface {
self.as_ref()
}

fn interface_mut(&mut self) -> &mut Self::Interface {
self.as_mut()
}
}
6 changes: 3 additions & 3 deletions chainstate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ impl HasTxIndexDisabledError for ChainstateError {
}
}

impl subsystem::Subsystem for Box<dyn ChainstateInterface> {}
pub type ChainstateSubsystem = Box<dyn ChainstateInterface>;

pub type ChainstateHandle = subsystem::Handle<Box<dyn ChainstateInterface>>;
pub type ChainstateHandle = subsystem::Handle<dyn ChainstateInterface>;

pub fn make_chainstate<S, V>(
chain_config: Arc<ChainConfig>,
Expand All @@ -84,7 +84,7 @@ pub fn make_chainstate<S, V>(
tx_verification_strategy: V,
custom_orphan_error_hook: Option<Arc<detail::OrphanErrorHandler>>,
time_getter: TimeGetter,
) -> Result<Box<dyn ChainstateInterface>, ChainstateError>
) -> Result<ChainstateSubsystem, ChainstateError>
where
S: chainstate_storage::BlockchainStorage + Sync + 'static,
V: TransactionVerificationStrategy + Sync + 'static,
Expand Down
11 changes: 6 additions & 5 deletions chainstate/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ mod test {
let chain_config = Arc::new(common::chain::config::create_unit_test_config());
let chainstate_config = ChainstateConfig::new();
let mut man = subsystem::Manager::new("rpctest");
let shutdown = man.make_shutdown_trigger();
let handle = man.add_subsystem(
"chainstate",
crate::make_chainstate(
Expand All @@ -354,11 +355,11 @@ mod test {
)
.unwrap(),
);
let _ = man.add_subsystem_with_custom_eventloop(
"test",
move |_: subsystem::CallRequest<()>, _| proc(handle),
);
man.main().await;
let tester = tokio::spawn(async move {
proc(handle);
shutdown.initiate();
});
let _ = tokio::join!(man.main(), tester);
}

#[tokio::test]
Expand Down
Loading
Loading