Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
remove connected disconnected state, 3rd attempt (#3898)
Browse files Browse the repository at this point in the history
* overseer: remove mut in connector

* rename SelectRelayChainWFallback -> SelectRelayChain

* split Basics

* introduce the OverseerConnector, use it

* introduce is_relay_chain to RelayChainSelection

* chore: rename var

* avoid dummy import in subsystem

* actually remove Disconnecte/Connected enum

* extract DummySubsystem into mod dummy.

* Handle::Connected -> Handle::new

* chore: fmt

* fix test

* select relay chain takes no arg, simplification

* fmt

* Update node/service/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* chore: improve malus tests

* avoid the deferred setting of `is_relay_chain` in `RelayChainSelection`

* positive assertion is not mandated, only the negative one, to avoid a stall

* chore: fmt

* assure the `RelayChainSelection` is not used before the overseer is up and running

Co-authored-by: Andronik Ordian <write@reusable.software>
  • Loading branch information
drahnr and ordian authored Sep 28, 2021
1 parent fdf3dfd commit e80340c
Show file tree
Hide file tree
Showing 18 changed files with 641 additions and 331 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion doc/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error>
where
Expand Down Expand Up @@ -213,7 +214,7 @@ impl OverseerGen for BehaveMaleficient {
),
);

Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
.map_err(|e| e.into())

// A builder pattern will simplify this further
Expand Down
94 changes: 72 additions & 22 deletions node/malus/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use polkadot_node_subsystem_test_helpers::*;

use polkadot_node_subsystem::{
messages::{AllMessages, AvailabilityStoreMessage},
overseer::{gen::TimeoutExt, Subsystem},
DummySubsystem,
overseer::{dummy::DummySubsystem, gen::TimeoutExt, Subsystem},
SubsystemError,
};

#[derive(Clone, Debug)]
Expand All @@ -48,34 +48,38 @@ where
}
}

#[derive(Clone, Debug)]
struct PassInterceptor;

impl<Sender> MessageInterceptor<Sender> for PassInterceptor
where
Sender: overseer::SubsystemSender<AllMessages>
+ overseer::SubsystemSender<AvailabilityStoreMessage>
+ Clone
+ 'static,
{
type Message = AvailabilityStoreMessage;
}

async fn overseer_send<T: Into<AllMessages>>(overseer: &mut TestSubsystemContextHandle<T>, msg: T) {
overseer.send(FromOverseer::Communication { msg }).await;
}

#[test]
fn integrity_test() {
fn launch_harness<F, M, Sub, G>(test_gen: G)
where
F: Future<Output = TestSubsystemContextHandle<M>> + Send,
M: Into<AllMessages> + std::fmt::Debug + Send + 'static,
AllMessages: From<M>,
Sub: Subsystem<TestSubsystemContext<M, sp_core::testing::TaskExecutor>, SubsystemError>,
G: Fn(TestSubsystemContextHandle<M>) -> (F, Sub),
{
let pool = sp_core::testing::TaskExecutor::new();
let (context, mut overseer) = make_subsystem_context(pool);

let sub = DummySubsystem;

let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor);
let (context, overseer) = make_subsystem_context(pool);

// Try to send a message we know is going to be filtered.
let test_fut = async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
};
let (test_fut, subsystem) = test_gen(overseer);
let subsystem = async move {
sub_intercepted.start(context).future.await.unwrap();
subsystem.start(context).future.await.unwrap();
};

futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);

Expand All @@ -88,3 +92,49 @@ fn integrity_test() {
))
.1;
}

#[test]
fn integrity_test_intercept() {
launch_harness(|mut overseer| {
let sub = DummySubsystem;

let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor);

(
async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
},
sub_intercepted,
)
})
}

#[test]
fn integrity_test_pass() {
launch_harness(|mut overseer| {
let sub = DummySubsystem;

let sub_intercepted = InterceptedSubsystem::new(sub, PassInterceptor);

(
async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
},
sub_intercepted,
)
})
}
5 changes: 3 additions & 2 deletions node/malus/src/variant-a.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use polkadot_cli::{
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateValidationMessage},
overseer::{self, OverseerHandle},
overseer::{self, OverseerConnector, OverseerHandle},
FromOverseer,
};

Expand Down Expand Up @@ -86,6 +86,7 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where
Expand Down Expand Up @@ -113,7 +114,7 @@ impl OverseerGen for BehaveMaleficient {
},
);

Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
.map_err(|e| e.into())
}
}
Expand Down
14 changes: 11 additions & 3 deletions node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use polkadot_node_subsystem_types::messages::{
use polkadot_overseer::{
self as overseer,
gen::{FromOverseer, SpawnedSubsystem},
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError,
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector,
OverseerSignal, SubsystemError,
};
use polkadot_primitives::v1::Hash;

Expand Down Expand Up @@ -173,8 +174,15 @@ fn main() {
.replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig);

let (overseer, _handle) =
Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap();
let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
AlwaysSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;

Expand Down
6 changes: 5 additions & 1 deletion node/overseer/overseer-gen/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,13 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
&mut self.handle
}
/// Obtain access to the overseer handle.
pub fn as_handle(&mut self) -> &#handle {
pub fn as_handle(&self) -> &#handle {
&self.handle
}
/// Obtain a clone of the handle.
pub fn handle(&self) -> #handle {
self.handle.clone()
}
}

impl ::std::default::Default for #connector {
Expand Down
54 changes: 54 additions & 0 deletions node/overseer/src/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{AllMessages, OverseerSignal};
use polkadot_node_subsystem_types::errors::SubsystemError;
use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};

/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
#[derive(Clone, Copy, Debug)]
pub struct DummySubsystem;

impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
where
Context: SubsystemContext<
Signal = OverseerSignal,
Error = SubsystemError,
AllMessages = AllMessages,
>,
{
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Err(_) => return Ok(()),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(overseer_msg) => {
tracing::debug!(
target: "dummy-subsystem",
"Discarding a message sent from overseer {:?}",
overseer_msg
);
continue
},
}
}
});

SpawnedSubsystem { name: "dummy-subsystem", future }
}
}
Loading

0 comments on commit e80340c

Please sign in to comment.