This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Adding Dispute Participation Metrics #6838

Merged: 15 commits, merged on Mar 11, 2023
2 changes: 1 addition & 1 deletion node/core/dispute-coordinator/src/initialized.rs
@@ -96,7 +96,7 @@ impl Initialized {
let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;

let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender);
let participation = Participation::new(participation_sender, metrics.clone());
let highest_session = rolling_session_window.latest_session();

Self {
68 changes: 68 additions & 0 deletions node/core/dispute-coordinator/src/metrics.rs
@@ -32,6 +32,16 @@ struct MetricsInner {
vote_cleanup_time: prometheus::Histogram,
/// Number of refrained participations.
refrained_participations: prometheus::Counter<prometheus::U64>,
/// Distribution of participation durations.
participation_durations: prometheus::Histogram,
/// Measures the duration of the full participation pipeline: From when
/// a participation request is first queued to when participation in the
/// requested dispute is complete.
participation_pipeline_durations: prometheus::Histogram,
/// Size of the participation priority queue.
participation_priority_queue_size: prometheus::Gauge<prometheus::U64>,
/// Size of the participation best effort queue.
participation_best_effort_queue_size: prometheus::Gauge<prometheus::U64>,
}

/// Dispute coordinator metrics.
Expand Down Expand Up @@ -96,6 +106,36 @@ impl Metrics {
metrics.refrained_participations.inc();
}
}

/// Provide a timer for participation durations which updates on drop.
pub(crate) fn time_participation(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer())
}

/// Provide a timer for participation pipeline durations which updates on drop.
pub(crate) fn time_participation_pipeline(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0
.as_ref()
.map(|metrics| metrics.participation_pipeline_durations.start_timer())
}

/// Set the priority_queue_size metric.
pub fn report_priority_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
metrics.participation_priority_queue_size.set(size);
}
}

/// Set the best_effort_queue_size metric.
pub fn report_best_effort_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
metrics.participation_best_effort_queue_size.set(size);
}
}
}
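
Both helpers return a prometheus::HistogramTimer, which records the elapsed time into its histogram when it is dropped. Below is a minimal, self-contained sketch of that drop-based pattern against the prometheus crate on its own; the histogram name and the time_work helper are illustrative and not part of this PR:

use prometheus::{Histogram, HistogramOpts, HistogramTimer};

// Mirrors the shape of `time_participation`: hand the caller a timer to hold.
fn time_work(hist: &Histogram) -> Option<HistogramTimer> {
    Some(hist.start_timer())
}

fn main() {
    let hist = Histogram::with_opts(HistogramOpts::new(
        "demo_participation_seconds",
        "Illustrative histogram",
    ))
    .unwrap();

    {
        let _timer = time_work(&hist);
        // ... the work being measured ...
    } // `_timer` is dropped here and the elapsed time is observed into `hist`.

    assert_eq!(hist.get_sample_count(), 1);
}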

impl metrics::Metrics for Metrics {
@@ -163,6 +203,34 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
participation_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_durations",
"Time spent within fn Participation::participate",
)
)?,
registry,
)?,
participation_pipeline_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_pipeline_durations",
"Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.",
)
)?,
registry,
)?,
participation_priority_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size",
"Number of disputes waiting for local participation in the priority queue.")?,
registry,
)?,

Contributor: nit: Should the metric names be updated to match the new variable names? e.g. "polkadot_parachain_dispute_participation_priority_queue_size"

participation_best_effort_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_best_effort_queue_size",
"Number of disputes waiting for local participation in the best effort queue.")?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
35 changes: 28 additions & 7 deletions node/core/dispute-coordinator/src/participation/mod.rs
@@ -46,6 +46,9 @@ mod queues;
use queues::Queues;
pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

/// How many participation processes we want to run in parallel at most.
///
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
@@ -69,6 +72,8 @@ pub struct Participation {
worker_sender: WorkerMessageSender,
/// Some recent block for retrieving validation code from chain.
recent_block: Option<(BlockNumber, Hash)>,
/// Metrics handle cloned from Initialized.
metrics: Metrics,
}

/// Message from worker tasks.
@@ -133,12 +138,13 @@ impl Participation {
/// The passed in sender will be used by background workers to communicate back their results.
/// The calling context should make sure to call `Participation::on_worker_message()` for the
/// received messages.
pub fn new(sender: WorkerMessageSender) -> Self {
pub fn new(sender: WorkerMessageSender, metrics: Metrics) -> Self {
Self {
running_participations: HashSet::new(),
queue: Queues::new(),
queue: Queues::new(metrics.clone()),
worker_sender: sender,
recent_block: None,
metrics,
}
}

@@ -154,19 +160,21 @@ impl Participation {
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
let request_timer = self.metrics.time_participation_pipeline();

// Participation already running - we can ignore that request:
if self.running_participations.contains(req.candidate_hash()) {
return Ok(())
}
// Available capacity - participate right away (if we already have a recent block):
if let Some((_, h)) = self.recent_block {
if self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
self.fork_participation(ctx, req, h)?;
self.fork_participation(ctx, req, h, request_timer)?;
return Ok(())
}
}
// Out of capacity/no recent block yet - queue:
self.queue.queue(ctx.sender(), priority, req).await
self.queue.queue(ctx.sender(), priority, req, request_timer).await
}

/// Message from a worker task was received - get the outcome.
@@ -235,8 +243,9 @@ impl Participation {
recent_head: Hash,
) -> FatalResult<()> {
while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
if let Some(req) = self.queue.dequeue() {
self.fork_participation(ctx, req, recent_head)?;
let (maybe_req, maybe_timer) = self.queue.dequeue();
if let Some(req) = maybe_req {
self.fork_participation(ctx, req, recent_head, maybe_timer)?;
} else {
break
}
@@ -250,12 +259,22 @@
ctx: &mut Context,
req: ParticipationRequest,
recent_head: Hash,
request_timer: Option<prometheus::HistogramTimer>,
) -> FatalResult<()> {
let participation_timer = self.metrics.time_participation();
if self.running_participations.insert(*req.candidate_hash()) {
let sender = ctx.sender().clone();
ctx.spawn(
"participation-worker",
participate(self.worker_sender.clone(), sender, recent_head, req).boxed(),
participate(
self.worker_sender.clone(),
sender,
recent_head,
req,
request_timer,
participation_timer,
)
.boxed(),
)
.map_err(FatalError::SpawnFailed)?;
}
@@ -268,6 +287,8 @@ async fn participate(
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
block_hash: Hash,
req: ParticipationRequest,
_request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
_participation_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
) {
#[cfg(test)]
// Hack for tests, so we get recovery messages not too early.
58 changes: 50 additions & 8 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
@@ -25,6 +25,9 @@ use crate::{
LOG_TARGET,
};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

#[cfg(test)]
mod tests;

@@ -56,6 +59,14 @@ pub struct Queues {

/// Priority queue.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Timer handle for each participation request. Stored to measure full request
/// completion time. Ideally these would have been stored in the participation
/// request itself, but HistogramTimer doesn't implement the Clone trait.
request_timers: BTreeMap<CandidateComparator, Option<prometheus::HistogramTimer>>,
Contributor:
> HistogramTimer doesn't implement the Clone trait.
I think you can put it behind a smart pointer like Rc. Might simplify the code. 🙂

Contributor: Rc doesn't work because we have a multithreaded tokio runtime; we would need Arc.

Contributor (Author): Placed the timer in ParticipationRequest. It does indeed simplify the changes 👍 Please check the refactor when you have a moment!


/// Handle for recording queues data in metrics.
metrics: Metrics,
}
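
As the review thread above notes, HistogramTimer does not implement Clone, which is why the timers are kept in the separate request_timers map rather than inside ParticipationRequest. The suggested alternative is to share the timer behind a smart pointer (Arc, given the multithreaded runtime). A minimal sketch of that idea follows; DemoRequest is a stand-in type and not the refactor this PR eventually landed on:

use std::sync::Arc;
use prometheus::{Histogram, HistogramOpts, HistogramTimer};

// Sketch only: a cloneable request that carries its own drop-based timer.
#[derive(Clone)]
struct DemoRequest {
    // The elapsed time is observed when the *last* clone of the Arc is dropped.
    timer: Option<Arc<HistogramTimer>>,
}

fn main() {
    let hist = Histogram::with_opts(HistogramOpts::new(
        "demo_pipeline_seconds",
        "Illustrative histogram",
    ))
    .unwrap();

    let req = DemoRequest { timer: Some(Arc::new(hist.start_timer())) };
    let clone = req.clone(); // cloning shares the timer; it is not restarted

    drop(req);
    drop(clone); // last Arc dropped -> duration recorded into `hist`

    assert_eq!(hist.get_sample_count(), 1);
}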

/// A dispute participation request that can be queued.
@@ -128,8 +139,13 @@ impl ParticipationRequest {

impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
pub fn new(metrics: Metrics) -> Self {
Self {
best_effort: BTreeMap::new(),
priority: BTreeMap::new(),
request_timers: BTreeMap::new(),
metrics,
}
}

/// Will put message in queue, either priority or best effort depending on priority.
@@ -143,20 +159,35 @@ impl Queues {
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
priority: ParticipationPriority,
req: ParticipationRequest,
timer: Option<prometheus::HistogramTimer>,
) -> Result<()> {
let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;

self.queue_with_comparator(comparator, priority, req)?;
self.queue_with_comparator(comparator, priority, req, timer)?;
Ok(())
}

/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
return Some(req.1)
/// We also get the corresponding request timer, if any.
pub fn dequeue(
&mut self,
) -> (Option<ParticipationRequest>, Option<prometheus::HistogramTimer>) {
if let Some((comp, req)) = self.pop_priority() {
self.metrics.report_priority_queue_size(self.priority.len() as u64);
if let Some(maybe_timer) = self.request_timers.remove(&comp) {
return (Some(req), maybe_timer)
}
return (Some(req), None)
}
if let Some((comp, req)) = self.pop_best_effort() {
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
if let Some(maybe_timer) = self.request_timers.remove(&comp) {
return (Some(req), maybe_timer)
}
return (Some(req), None)
}
self.pop_best_effort().map(|d| d.1)
(None, None)
}

/// Reprioritizes any participation requests pertaining to the
@@ -180,6 +211,9 @@
}
if let Some(request) = self.best_effort.remove(&comparator) {
self.priority.insert(comparator, request);
// Report changes to both queue sizes
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}
@@ -189,14 +223,20 @@
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
timer: Option<prometheus::HistogramTimer>,
) -> std::result::Result<(), QueueError> {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&comparator);
if let None = self.best_effort.remove(&comparator) {
// Only insert new timer if request wasn't in either queue
self.request_timers.insert(comparator, timer);
}
self.priority.insert(comparator, req);
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
} else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
Expand All @@ -207,6 +247,8 @@ impl Queues {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
self.request_timers.insert(comparator, timer);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}