Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Per subsystem CPU usage tracking #4239

Merged
merged 28 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4eafe81
SubsystemContext: add subsystem name str
sandreim Nov 5, 2021
255c147
Overseer builder proc macro changes
sandreim Nov 5, 2021
6dca05c
Update ToOverseer enum
sandreim Nov 5, 2021
c0a332c
Assign subsystem names to orphan tasks
sandreim Nov 5, 2021
26a6aac
cargo fmt
sandreim Nov 5, 2021
5afa046
SubsystemContext: add subsystem name str
sandreim Nov 5, 2021
c88911c
Overseer builder proc macro changes
sandreim Nov 5, 2021
ba6f57a
Update ToOverseer enum
sandreim Nov 5, 2021
710941e
Assign subsystem names to orphan tasks
sandreim Nov 5, 2021
4a3bbbd
cargo fmt
sandreim Nov 5, 2021
d7a4a5e
Rebase changes for new spawn() group param
sandreim Nov 8, 2021
61f27e6
Add subsystem constant in JobTrait
sandreim Nov 8, 2021
5494f18
Add subsystem string
sandreim Nov 8, 2021
266645a
Fix tests
sandreim Nov 8, 2021
38fe1e8
Fix spawn() calls
sandreim Nov 8, 2021
cdfe97a
cargo fmt
sandreim Nov 8, 2021
b9f986f
Merge branch 'master' of github.com:paritytech/polkadot into sandreim…
sandreim Nov 8, 2021
610abf4
Merge branch 'sandreim/per_subsystem_task_metrics' of github.com:pari…
sandreim Nov 8, 2021
81d8af5
Fix
sandreim Nov 8, 2021
95c50eb
Fix tests
sandreim Nov 8, 2021
8b8e5dc
fix
sandreim Nov 8, 2021
cf80a1f
Fix more tests
sandreim Nov 9, 2021
393fb66
Address PR review feedback #1
sandreim Nov 9, 2021
3b4d098
Merge branch 'master' of github.com:paritytech/polkadot into sandreim…
sandreim Nov 9, 2021
5fd25dd
Address PR review round 2
sandreim Nov 10, 2021
dae09bc
Fixes
sandreim Nov 10, 2021
57a2e32
Merge branch 'master' into sandreim/per_subsystem_task_metrics
ordian Nov 11, 2021
9cc4620
update Cargo.lock
ordian Nov 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
let mut task_sender = sender.clone();
let metrics = metrics.clone();
ctx.spawn(
"collation generation collation builder",
"collation-builder",
sandreim marked this conversation as resolved.
Show resolved Hide resolved
Box::pin(async move {
let persisted_validation_data_hash = validation_data.hash();

Expand Down
5 changes: 3 additions & 2 deletions node/core/av-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,9 @@ impl Error {
fn trace(&self) {
match self {
// don't spam the log with spurious errors
Self::RuntimeApi(_) | Self::Oneshot(_) =>
tracing::debug!(target: LOG_TARGET, err = ?self),
Self::RuntimeApi(_) | Self::Oneshot(_) => {
tracing::debug!(target: LOG_TARGET, err = ?self)
},
// it's worth reporting otherwise
_ => tracing::warn!(target: LOG_TARGET, err = ?self),
}
Expand Down
15 changes: 9 additions & 6 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ impl CandidateBackingJob {
}
};
sender
.send_command(FromJobCommand::Spawn("Backing Validation", bg.boxed()))
.send_command(FromJobCommand::Spawn("backing-validation", bg.boxed()))
.await?;
}

Expand Down Expand Up @@ -900,11 +900,13 @@ impl CandidateBackingJob {
.await;

match confirmation_rx.await {
Err(oneshot::Canceled) =>
tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",),
Err(oneshot::Canceled) => {
tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",)
},
Ok(ImportStatementsResult::ValidImport) => {},
Ok(ImportStatementsResult::InvalidImport) =>
tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",),
Ok(ImportStatementsResult::InvalidImport) => {
tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",)
},
}
}

Expand Down Expand Up @@ -1168,7 +1170,8 @@ impl util::JobTrait for CandidateBackingJob {
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;

const NAME: &'static str = "CandidateBackingJob";
const NAME: &'static str = "candidate-backing-job";
const SUBSYSTEM: &'static str = "candidate-backing";

fn run<S: SubsystemSender>(
parent: Hash,
Expand Down
3 changes: 2 additions & 1 deletion node/core/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ impl JobTrait for BitfieldSigningJob {
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;

const NAME: &'static str = "BitfieldSigningJob";
const NAME: &'static str = "bitfield-signing-job";
const SUBSYSTEM: &'static str = "bitfield-signing";

/// Run a job for the parent block indicated
fn run<S: SubsystemSender>(
Expand Down
3 changes: 2 additions & 1 deletion node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ impl JobTrait for ProvisioningJob {
type RunArgs = ();
type Metrics = Metrics;

const NAME: &'static str = "ProvisioningJob";
const NAME: &'static str = "provisioning-job";
const SUBSYSTEM: &'static str = "provisioner";

/// Run a job for the parent block indicated
//
Expand Down
14 changes: 12 additions & 2 deletions node/core/pvf/src/executor_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,21 @@ impl TaskExecutor {
}

impl sp_core::traits::SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_task_name: &'static str,
_subsystem_name: &'static str,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}

fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_task_name: &'static str,
_subsystem_name: &'static str,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
}
Expand Down
4 changes: 2 additions & 2 deletions node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ where
)
}
} else {
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request);
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, "", request);
drahnr marked this conversation as resolved.
Show resolved Hide resolved
self.active_requests.push(receiver);
}
}
Expand All @@ -288,7 +288,7 @@ where
}

if let Some((req, recv)) = self.waiting_requests.pop_front() {
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, "", req);
sandreim marked this conversation as resolved.
Show resolved Hide resolved
self.active_requests.push(recv);
}
}
Expand Down
1 change: 1 addition & 0 deletions node/jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl Jaeger {
// Spawn a background task that pulls span information and sends it over the network.
spawner.spawn(
"jaeger-collector",
"jaeger",
sandreim marked this conversation as resolved.
Show resolved Hide resolved
Box::pin(async move {
match async_std::net::UdpSocket::bind("0.0.0.0:0").await {
Ok(udp_socket) => loop {
Expand Down
6 changes: 4 additions & 2 deletions node/network/availability-distribution/src/tests/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ impl TestState {
// lock ;-)
let update_tx = tx.clone();
harness.pool.spawn(
"Sending active leaves updates",
"sending-active-leaves-updates",
"",
async move {
for update in updates {
overseer_signal(update_tx.clone(), OverseerSignal::ActiveLeaves(update)).await;
Expand Down Expand Up @@ -308,7 +309,8 @@ fn to_incoming_req(
let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>) =
oneshot::channel();
executor.spawn(
"Message forwarding",
"message-forwarding",
"",
async {
let response = rx.await;
let payload = response.expect("Unexpected canceled request").result;
Expand Down
2 changes: 1 addition & 1 deletion node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ where
awaiting: vec![response_sender],
});

if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)) {
if let Err(e) = ctx.spawn("recovery-interaction", Box::pin(remote)) {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
Expand Down
14 changes: 12 additions & 2 deletions node/overseer/overseer-gen/examples/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,21 @@ struct Xxx {
struct DummySpawner;

impl SpawnNamed for DummySpawner {
fn spawn_blocking(&self, name: &'static str, _future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_task_name: &'static str,
_subsystem_name: &'static str,
_future: futures::future::BoxFuture<'static, ()>,
) {
unimplemented!("spawn blocking {}", name)
}

fn spawn(&self, name: &'static str, _future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_task_name: &'static str,
_subsystem_name: &'static str,
_future: futures::future::BoxFuture<'static, ()>,
) {
unimplemented!("spawn {}", name)
}
}
Expand Down
23 changes: 16 additions & 7 deletions node/overseer/overseer-gen/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
// TODO generate a builder pattern that ensures this
// TODO https://github.com/paritytech/polkadot/issues/3427
let #subsystem_name = match self. #subsystem_name {
FieldInitMethod::Fn(func) => func(handle.clone())?,
FieldInitMethod::Fn(func) => func(handle.clone())?,
FieldInitMethod::Value(val) => val,
FieldInitMethod::Uninitialized =>
panic!("All subsystems must exist with the builder pattern."),
Expand All @@ -349,11 +349,18 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#channel_name_rx, #channel_name_unbounded_rx
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(SIGNAL_CHANNEL_CAPACITY);

// Generate subsystem name based on overseer field name.
let mut subsystem_string = String::from(stringify!(#subsystem_name));
// Convert owned `snake case` string to a `kebab case` static str.
let subsystem_static_str = Box::leak(subsystem_string.replace("_", "-").into_boxed_str());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a fan of this, but seems correct


let ctx = #subsyste_ctx_name::< #consumes >::new(
signal_rx,
message_rx,
channels_out.clone(),
to_overseer_tx.clone(),
subsystem_static_str
);

let #subsystem_name: OverseenSubsystem< #consumes > =
Expand All @@ -364,6 +371,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
unbounded_meter,
ctx,
#subsystem_name,
subsystem_static_str,
&mut running_subsystems,
)?;
)*
Expand Down Expand Up @@ -489,22 +497,22 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
/// Task kind to launch.
pub trait TaskKind {
/// Spawn a task, it depends on the implementer if this is blocking or not.
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>);
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>);
}

#[allow(missing_docs)]
struct Regular;
impl TaskKind for Regular {
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn(name, future)
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn(task_name, subsystem_name, future)
}
}

#[allow(missing_docs)]
struct Blocking;
impl TaskKind for Blocking {
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn_blocking(name, future)
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn(task_name, subsystem_name, future)
}
}

Expand All @@ -517,6 +525,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
unbounded_meter: #support_crate ::metered::Meter,
ctx: Ctx,
s: SubSys,
subsystem_name: &'static str,
futures: &mut #support_crate ::FuturesUnordered<BoxFuture<'static, ::std::result::Result<(), #error_ty> >>,
) -> ::std::result::Result<OverseenSubsystem<M>, #error_ty >
where
Expand All @@ -540,7 +549,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
let _ = tx.send(());
});

<TK as TaskKind>::launch_task(spawner, name, fut);
<TK as TaskKind>::launch_task(spawner, name, subsystem_name, fut);

futures.push(Box::pin(
rx.map(|e| {
Expand Down
9 changes: 9 additions & 0 deletions node/overseer/overseer-gen/proc-macro/src/impl_misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
>,
signals_received: SignalsReceived,
pending_incoming: Option<(usize, M)>,
name: &'static str
}

impl<M> #subsystem_ctx_name<M> {
Expand All @@ -121,6 +122,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
messages: SubsystemIncomingMessages<M>,
to_subsystems: ChannelsOut,
to_overseer: #support_crate ::metered::UnboundedMeteredSender<#support_crate:: ToOverseer>,
name: &'static str
) -> Self {
let signals_received = SignalsReceived::default();
#subsystem_ctx_name {
Expand All @@ -133,8 +135,13 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
to_overseer,
signals_received,
pending_incoming: None,
name
}
}

fn name(&self) -> &'static str {
self.name
}
}

#[#support_crate ::async_trait]
Expand Down Expand Up @@ -229,6 +236,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
{
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnJob {
name,
subsystem: self.name(),
s,
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
Ok(())
Expand All @@ -239,6 +247,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
{
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnBlockingJob {
name,
subsystem: self.name(),
s,
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
Ok(())
Expand Down
12 changes: 10 additions & 2 deletions node/overseer/overseer-gen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ pub enum ToOverseer {
SpawnJob {
/// Name of the task to spawn, which will be shown in jaeger and tracing logs.
name: &'static str,
/// Subsystem of the task to spawn, which will be shown in jaeger and tracing logs.
subsystem: &'static str,
/// The future to execute.
s: BoxFuture<'static, ()>,
},
Expand All @@ -120,6 +122,8 @@ pub enum ToOverseer {
SpawnBlockingJob {
/// Name of the task to spawn, which will be shown in jaeger and tracing logs.
name: &'static str,
/// Subsystem of the task to spawn, which will be shown in jaeger and tracing logs.
subsystem: &'static str,
/// The future to execute.
s: BoxFuture<'static, ()>,
},
Expand All @@ -128,8 +132,12 @@ pub enum ToOverseer {
impl fmt::Debug for ToOverseer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SpawnJob { name, .. } => writeln!(f, "SpawnJob{{ {}, ..}}", name),
Self::SpawnBlockingJob { name, .. } => writeln!(f, "SpawnBlockingJob{{ {}, ..}}", name),
Self::SpawnJob { name, subsystem, .. } => {
writeln!(f, "SpawnJob{{ {}, {} ..}}", name, subsystem)
},
Self::SpawnBlockingJob { name, subsystem, .. } => {
writeln!(f, "SpawnBlockingJob{{ {}, {} ..}}", name, subsystem)
},
}
}
}
Expand Down
29 changes: 20 additions & 9 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,8 @@ where

futures::future::ready(())
});
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
overseer.spawner().spawn("metrics-metronome", "overseer", Box::pin(metronome));

Ok(())
}

Expand Down Expand Up @@ -616,11 +617,11 @@ where
},
msg = self.to_overseer_rx.select_next_some() => {
match msg {
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
ToOverseer::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
}
ToOverseer::SpawnBlockingJob { name, s } => {
self.spawn_blocking_job(name, s);
ToOverseer::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
}
}
},
Expand Down Expand Up @@ -772,11 +773,21 @@ where
}
}

fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.spawner.spawn(name, j);
fn spawn_job(
&mut self,
task_name: &'static str,
subsystem_name: &'static str,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn(task_name, subsystem_name, j);
}

fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.spawner.spawn_blocking(name, j);
fn spawn_blocking_job(
&mut self,
task_name: &'static str,
subsystem_name: &'static str,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn_blocking(task_name, subsystem_name, j);
}
}
Loading