Skip to content

Commit

Permalink
adds metrics to crossbeam arms (solana-labs#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
segfaultdoc authored Oct 15, 2023
1 parent 71df5c1 commit 2843edc
Showing 1 changed file with 128 additions and 1 deletion.
129 changes: 128 additions & 1 deletion relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ struct RelayerMetrics {
pub num_try_send_channel_full: u64,
pub packet_latencies_us: Histogram,

pub crossbeam_slot_receiver_processing_us: Histogram,
pub crossbeam_delay_packet_receiver_processing_us: Histogram,
pub crossbeam_subscription_receiver_processing_us: Histogram,
pub crossbeam_heartbeat_tick_processing_us: Histogram,
pub crossbeam_metrics_tick_processing_us: Histogram,

// channel stats
pub slot_receiver_max_len: usize,
pub slot_receiver_capacity: usize,
Expand All @@ -81,7 +87,12 @@ impl RelayerMetrics {
max_heartbeat_tick_latency_us: 0,
metrics_latency_us: 0,
num_try_send_channel_full: 0,
packet_latencies_us: Default::default(),
packet_latencies_us: Histogram::default(),
crossbeam_slot_receiver_processing_us: Histogram::default(),
crossbeam_delay_packet_receiver_processing_us: Histogram::default(),
crossbeam_subscription_receiver_processing_us: Histogram::default(),
crossbeam_heartbeat_tick_processing_us: Histogram::default(),
crossbeam_metrics_tick_processing_us: Histogram::default(),
slot_receiver_max_len: 0,
slot_receiver_capacity,
subscription_receiver_max_len: 0,
Expand Down Expand Up @@ -202,6 +213,112 @@ impl RelayerMetrics {
.unwrap_or_default(),
i64
),
// crossbeam arm latencies
(
"crossbeam_subscription_receiver_processing_us_p50",
self.crossbeam_subscription_receiver_processing_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_subscription_receiver_processing_us_p90",
self.crossbeam_subscription_receiver_processing_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_subscription_receiver_processing_us_p99",
self.crossbeam_subscription_receiver_processing_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_slot_receiver_processing_us_p50",
self.crossbeam_slot_receiver_processing_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_slot_receiver_processing_us_p90",
self.crossbeam_slot_receiver_processing_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_slot_receiver_processing_us_p99",
self.crossbeam_slot_receiver_processing_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_metrics_tick_processing_us_p50",
self.crossbeam_metrics_tick_processing_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_metrics_tick_processing_us_p90",
self.crossbeam_metrics_tick_processing_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_metrics_tick_processing_us_p99",
self.crossbeam_metrics_tick_processing_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_delay_packet_receiver_processing_us_p50",
self.crossbeam_delay_packet_receiver_processing_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_delay_packet_receiver_processing_us_p90",
self.crossbeam_delay_packet_receiver_processing_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_delay_packet_receiver_processing_us_p99",
self.crossbeam_delay_packet_receiver_processing_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_heartbeat_tick_processing_us_p50",
self.crossbeam_heartbeat_tick_processing_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_heartbeat_tick_processing_us_p90",
self.crossbeam_heartbeat_tick_processing_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_heartbeat_tick_processing_us_p99",
self.crossbeam_heartbeat_tick_processing_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
// channel lengths
("slot_receiver_len", self.slot_receiver_max_len, i64),
("slot_receiver_capacity", self.slot_receiver_capacity, i64),
Expand Down Expand Up @@ -377,16 +494,23 @@ impl RelayerImpl {
while !exit.load(Ordering::Relaxed) {
crossbeam_channel::select! {
recv(slot_receiver) -> maybe_slot => {
let start = Instant::now();
Self::update_highest_slot(maybe_slot, &mut highest_slot, &mut relayer_metrics)?;
let _ = relayer_metrics.crossbeam_slot_receiver_processing_us.increment(start.elapsed().as_micros() as u64);
},
recv(delay_packet_receiver) -> maybe_packet_batches => {
let start = Instant::now();
let failed_forwards = Self::forward_packets(maybe_packet_batches, &packet_subscriptions, &leader_schedule_cache, &highest_slot, &leader_lookahead, &mut relayer_metrics)?;
Self::drop_connections(failed_forwards, &packet_subscriptions, &mut relayer_metrics);
let _ = relayer_metrics.crossbeam_delay_packet_receiver_processing_us.increment(start.elapsed().as_micros() as u64);
},
recv(subscription_receiver) -> maybe_subscription => {
let start = Instant::now();
Self::handle_subscription(maybe_subscription, &packet_subscriptions, &mut relayer_metrics)?;
let _ = relayer_metrics.crossbeam_subscription_receiver_processing_us.increment(start.elapsed().as_micros() as u64);
}
recv(heartbeat_tick) -> time_generated => {
let start = Instant::now();
if let Ok(time_generated) = time_generated {
relayer_metrics.max_heartbeat_tick_latency_us = std::cmp::max(relayer_metrics.max_heartbeat_tick_latency_us, Instant::now().duration_since(time_generated).as_micros() as u64);
}
Expand All @@ -402,8 +526,10 @@ impl RelayerImpl {
HealthState::Unhealthy => packet_subscriptions.read().unwrap().keys().cloned().collect(),
};
Self::drop_connections(pubkeys_to_drop, &packet_subscriptions, &mut relayer_metrics);
let _ = relayer_metrics.crossbeam_heartbeat_tick_processing_us.increment(start.elapsed().as_micros() as u64);
}
recv(metrics_tick) -> time_generated => {
let start = Instant::now();
let l_packet_subscriptions = packet_subscriptions.read().unwrap();
relayer_metrics.num_current_connections = l_packet_subscriptions.len() as u64;
relayer_metrics.update_packet_subscription_total_capacity(&l_packet_subscriptions);
Expand All @@ -412,6 +538,7 @@ impl RelayerImpl {
if let Ok(time_generated) = time_generated {
relayer_metrics.metrics_latency_us = time_generated.elapsed().as_micros() as u64;
}
let _ = relayer_metrics.crossbeam_metrics_tick_processing_us.increment(start.elapsed().as_micros() as u64);

relayer_metrics.report();
relayer_metrics = RelayerMetrics::new(
Expand Down

0 comments on commit 2843edc

Please sign in to comment.