Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Adding Dispute Participation Metrics #6838

Merged
merged 15 commits into from
Mar 11, 2023
Merged
9 changes: 7 additions & 2 deletions node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Initialized {
let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;

let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender);
let participation = Participation::new(participation_sender, metrics.clone());
let highest_session = rolling_session_window.latest_session();

Self {
Expand Down Expand Up @@ -916,12 +916,17 @@ impl Initialized {
} else {
self.metrics.on_queued_best_effort_participation();
}
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
let r = self
.participation
.queue_participation(
ctx,
priority,
ParticipationRequest::new(new_state.candidate_receipt().clone(), session),
ParticipationRequest::new(
new_state.candidate_receipt().clone(),
session,
request_timer,
),
)
.await;
log_error(r)?;
Expand Down
2 changes: 2 additions & 0 deletions node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem {
?candidate_hash,
"Found valid dispute, with no vote from us on startup - participating."
);
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
gilescope marked this conversation as resolved.
Show resolved Hide resolved
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
vote_state.votes().candidate_receipt.clone(),
session,
request_timer,
),
));
}
Expand Down
68 changes: 68 additions & 0 deletions node/core/dispute-coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ struct MetricsInner {
vote_cleanup_time: prometheus::Histogram,
/// Number of refrained participations.
refrained_participations: prometheus::Counter<prometheus::U64>,
/// Distribution of participation durations.
participation_durations: prometheus::Histogram,
/// Measures the duration of the full participation pipeline: From when
/// a participation request is first queued to when participation in the
/// requested dispute is complete.
participation_pipeline_durations: prometheus::Histogram,
/// Size of participation priority queue
participation_priority_queue_size: prometheus::Gauge<prometheus::U64>,
/// Size of participation best effort queue
participation_best_effort_queue_size: prometheus::Gauge<prometheus::U64>,
}

/// Candidate validation metrics.
Expand Down Expand Up @@ -96,6 +106,36 @@ impl Metrics {
metrics.refrained_participations.inc();
}
}

/// Provide a timer for participation durations which updates on drop.
pub(crate) fn time_participation(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer())
}

/// Provide a timer for participation pipeline durations which updates on drop.
pub(crate) fn time_participation_pipeline(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0
.as_ref()
.map(|metrics| metrics.participation_pipeline_durations.start_timer())
}

/// Set the priority_queue_size metric
pub fn report_priority_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
metrics.participation_priority_queue_size.set(size);
}
}

/// Set the best_effort_queue_size metric
pub fn report_best_effort_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
metrics.participation_best_effort_queue_size.set(size);
}
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -163,6 +203,34 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
participation_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_durations",
"Time spent within fn Participation::participate",
sandreim marked this conversation as resolved.
Show resolved Hide resolved
)
)?,
registry,
)?,
participation_pipeline_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_pipeline_durations",
"Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.",
)
)?,
registry,
)?,
participation_priority_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_participation_priority_queue_size",
"Number of disputes waiting for local participation in the priority queue.")?,
registry,
)?,
participation_best_effort_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_participation_best_effort_queue_size",
"Number of disputes waiting for local participation in the best effort queue.")?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
Expand Down
23 changes: 19 additions & 4 deletions node/core/dispute-coordinator/src/participation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ mod queues;
use queues::Queues;
pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

/// How many participation processes do we want to run in parallel the most.
///
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
Expand All @@ -71,6 +74,8 @@ pub struct Participation {
worker_sender: WorkerMessageSender,
/// Some recent block for retrieving validation code from chain.
recent_block: Option<(BlockNumber, Hash)>,
/// Metrics handle cloned from Initialized
metrics: Metrics,
}

/// Message from worker tasks.
Expand Down Expand Up @@ -135,12 +140,13 @@ impl Participation {
/// The passed in sender will be used by background workers to communicate back their results.
/// The calling context should make sure to call `Participation::on_worker_message()` for the
/// received messages.
pub fn new(sender: WorkerMessageSender) -> Self {
pub fn new(sender: WorkerMessageSender, metrics: Metrics) -> Self {
Self {
running_participations: HashSet::new(),
queue: Queues::new(),
queue: Queues::new(metrics.clone()),
eskimor marked this conversation as resolved.
Show resolved Hide resolved
worker_sender: sender,
recent_block: None,
metrics,
}
}

Expand Down Expand Up @@ -253,11 +259,19 @@ impl Participation {
req: ParticipationRequest,
recent_head: Hash,
) -> FatalResult<()> {
let participation_timer = self.metrics.time_participation();
if self.running_participations.insert(*req.candidate_hash()) {
let sender = ctx.sender().clone();
ctx.spawn(
"participation-worker",
participate(self.worker_sender.clone(), sender, recent_head, req).boxed(),
participate(
self.worker_sender.clone(),
sender,
recent_head,
req,
participation_timer,
)
.boxed(),
)
.map_err(FatalError::SpawnFailed)?;
}
Expand All @@ -269,7 +283,8 @@ async fn participate(
mut result_sender: WorkerMessageSender,
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
block_hash: Hash,
req: ParticipationRequest,
req: ParticipationRequest, // Sends metric data via request_timer field when dropped
_participation_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
) {
#[cfg(test)]
// Hack for tests, so we get recovery messages not too early.
Expand Down
60 changes: 53 additions & 7 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand All @@ -25,6 +25,9 @@ use crate::{
LOG_TARGET,
};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -56,14 +59,18 @@ pub struct Queues {

/// Priority queue.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Handle for recording queues data in metrics
metrics: Metrics,
}

/// A dispute participation request that can be queued.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Clone)]
tdimitrov marked this conversation as resolved.
Show resolved Hide resolved
pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Arc<Option<prometheus::HistogramTimer>>, // Sends metric data when request is dropped
}

/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
Expand Down Expand Up @@ -107,8 +114,17 @@ pub enum QueueError {

impl ParticipationRequest {
/// Create a new `ParticipationRequest` to be queued.
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session }
pub fn new(
candidate_receipt: CandidateReceipt,
session: SessionIndex,
request_timer: Arc<Option<prometheus::HistogramTimer>>,
) -> Self {
Self {
candidate_hash: candidate_receipt.hash(),
candidate_receipt,
session,
_request_timer: request_timer,
}
}

pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
Expand All @@ -126,10 +142,29 @@ impl ParticipationRequest {
}
}

// We want to compare participation requests in unit tests, so we
// only implement Eq for tests.
#[cfg(test)]
impl PartialEq for ParticipationRequest {
fn eq(&self, other: &Self) -> bool {
let ParticipationRequest {
candidate_receipt,
candidate_hash,
session: _session,
_request_timer,
} = self;
candidate_receipt == other.candidate_receipt() &&
candidate_hash == other.candidate_hash() &&
self.session == other.session()
}
}
#[cfg(test)]
impl Eq for ParticipationRequest {}

impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
pub fn new(metrics: Metrics) -> Self {
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), metrics }
}

/// Will put message in queue, either priority or best effort depending on priority.
Expand All @@ -154,9 +189,14 @@ impl Queues {
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
self.metrics.report_priority_queue_size(self.priority.len() as u64);
return Some(req.1)
}
if let Some(req) = self.pop_best_effort() {
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
return Some(req.1)
}
self.pop_best_effort().map(|d| d.1)
None
}

/// Reprioritizes any participation requests pertaining to the
Expand All @@ -180,6 +220,9 @@ impl Queues {
}
if let Some(request) = self.best_effort.remove(&comparator) {
self.priority.insert(comparator, request);
// Report changes to both queue sizes
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}
Expand All @@ -197,6 +240,8 @@ impl Queues {
// Remove any best effort entry:
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
} else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
Expand All @@ -207,6 +252,7 @@ impl Queues {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}
Expand Down
Loading