Skip to content

Commit

Permalink
Added observability of instance datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
cikzh committed Feb 9, 2024
1 parent 7af7e0c commit efeca5c
Show file tree
Hide file tree
Showing 29 changed files with 407 additions and 103 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 23 additions & 3 deletions statime-linux/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use rand::{rngs::StdRng, SeedableRng};
use statime::{
config::{ClockIdentity, InstanceConfig, SdoId, TimePropertiesDS, TimeSource},
filters::{Filter, KalmanConfiguration, KalmanFilter},
observability::ObservableInstanceState,
port::{
InBmca, Measurement, Port, PortAction, PortActionIterator, TimestampContext, MAX_DATA_LEN,
},
Expand Down Expand Up @@ -258,6 +259,16 @@ async fn actual_main() {
time_properties_ds,
)));

// The observer for the metrics exporter
let (instance_state_sender, instance_state_receiver) =
tokio::sync::watch::channel(ObservableInstanceState {
default_ds: instance.default_ds(),
current_ds: instance.current_ds(),
parent_ds: instance.parent_ds(),
time_properties_ds: instance.time_properties_ds(),
});
statime_linux::observer::spawn(&config, instance_state_receiver).await;

let (bmca_notify_sender, bmca_notify_receiver) = tokio::sync::watch::channel(false);

let mut main_task_senders = Vec::with_capacity(config.ports.len());
Expand Down Expand Up @@ -293,6 +304,7 @@ async fn actual_main() {
(LinuxClock::CLOCK_TAI, InterfaceTimestampMode::SoftwareAll)
}
};

let rng = StdRng::from_entropy();
let port = instance.add_port(
port_config.into(),
Expand Down Expand Up @@ -374,12 +386,10 @@ async fn actual_main() {
.expect("space in channel buffer");
}

// The observer for the metrics exporter
statime_linux::observer::spawn(&config.observability).await;

run(
instance,
bmca_notify_sender,
instance_state_sender,
main_task_receivers,
main_task_senders,
internal_sync_senders,
Expand All @@ -391,6 +401,7 @@ async fn actual_main() {
async fn run(
instance: &'static PtpInstance<KalmanFilter>,
bmca_notify_sender: tokio::sync::watch::Sender<bool>,
instance_state_sender: tokio::sync::watch::Sender<ObservableInstanceState>,
mut main_task_receivers: Vec<Receiver<BmcaPort>>,
main_task_senders: Vec<Sender<BmcaPort>>,
internal_sync_senders: Vec<tokio::sync::watch::Sender<ClockSyncMode>>,
Expand Down Expand Up @@ -430,6 +441,15 @@ async fn run(

instance.bmca(&mut mut_bmca_ports);

// Update instance state for observability
// We don't care if isn't anybody on the other side
let _ = instance_state_sender.send(ObservableInstanceState {
default_ds: instance.default_ds(),
current_ds: instance.current_ds(),
parent_ds: instance.parent_ds(),
time_properties_ds: instance.time_properties_ds(),
});

let mut clock_states = vec![ClockSyncMode::FromSystem; internal_sync_senders.len()];
for (idx, port) in mut_bmca_ports.iter().enumerate() {
if port.is_steering() {
Expand Down
113 changes: 92 additions & 21 deletions statime-linux/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,23 @@ use tokio::{
};

use crate::config::Config;
use statime::{
config::TimePropertiesDS,
observability::{default::DefaultDS, ObservableInstanceState},
};

#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableState {
pub program: ProgramData,
pub instance: ObservableInstanceState,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ProgramData {
pub version: String,
pub build_commit: String,
pub build_commit_date: String,
pub uptime_seconds: f64,
}

impl ProgramData {
Expand All @@ -26,14 +39,6 @@ impl ProgramData {
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ProgramData {
pub version: String,
pub build_commit: String,
pub build_commit_date: String,
pub uptime_seconds: f64,
}

impl Default for ProgramData {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -154,16 +159,6 @@ struct Measurement<T> {
value: T,
}

#[allow(dead_code)]
impl<T> Measurement<T> {
fn simple(value: T) -> Vec<Measurement<T>> {
vec![Measurement {
labels: Default::default(),
value,
}]
}
}

#[derive(PartialEq, Eq, Clone, Copy)]
enum Unit {
Seconds,
Expand All @@ -190,10 +185,83 @@ impl MetricType {
}
}

fn format_default_ds(w: &mut impl std::fmt::Write, default_ds: &DefaultDS) -> std::fmt::Result {
let clock_identity = format!("{}", default_ds.clock_identity);

format_metric(
w,
"number_ports",
"The amount of ports assigned",
MetricType::Gauge,
None,
vec![Measurement {
labels: vec![("clock_identity", clock_identity.clone())],
value: default_ds.number_ports,
}],
)?;

format_metric(
w,
"quality_class",
"The PTP clock class",
MetricType::Gauge,
None,
vec![Measurement {
labels: vec![("clock_identity", clock_identity.clone())],
value: default_ds.clock_quality.clock_class,
}],
)?;

format_metric(
w,
"quality_accuracy",
"The quality of the clock",
MetricType::Gauge,
None,
vec![Measurement {
labels: vec![("clock_identity", clock_identity.clone())],
value: default_ds.clock_quality.clock_accuracy.to_primitive(),
}],
)?;

format_metric(
w,
"quality_offset_scaled_log_variance",
"2-log of the variance (in seconds^2) of the clock when not synchronized",
MetricType::Gauge,
None,
vec![Measurement {
labels: vec![("clock_identity", clock_identity.clone())],
value: default_ds.clock_quality.offset_scaled_log_variance,
}],
)?;

Ok(())
}

pub fn format_time_properties_ds(
w: &mut impl std::fmt::Write,
time_properties_ds: &TimePropertiesDS,
) -> std::fmt::Result {
format_metric(
w,
"current_utc_offset",
"Current offset from UTC",
MetricType::Gauge,
None,
vec![Measurement {
labels: vec![],
value: time_properties_ds.current_utc_offset.unwrap_or(0),
}],
)?;

Ok(())
}

pub fn format_state(w: &mut impl std::fmt::Write, state: &ObservableState) -> std::fmt::Result {
format_metric(
w,
"statime_uptime",
"uptime",
"The time that statime has been running",
MetricType::Gauge,
Some(Unit::Seconds),
Expand All @@ -207,6 +275,9 @@ pub fn format_state(w: &mut impl std::fmt::Write, state: &ObservableState) -> st
}],
)?;

format_default_ds(w, &state.instance.default_ds)?;
format_time_properties_ds(w, &state.instance.time_properties_ds)?;

w.write_str("# EOF\n")?;
Ok(())
}
Expand All @@ -220,9 +291,9 @@ fn format_metric<T: std::fmt::Display>(
measurements: Vec<Measurement<T>>,
) -> std::fmt::Result {
let name = if let Some(unit) = unit {
format!("{}_{}", name, unit.as_str())
format!("statime_{}_{}", name, unit.as_str())
} else {
name.to_owned()
format!("statime_{}", name)
};

// write help text
Expand Down
28 changes: 19 additions & 9 deletions statime-linux/src/observer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use statime::observability::ObservableInstanceState;
use std::{fs::Permissions, os::unix::prelude::PermissionsExt, path::Path, time::Instant};

use tokio::{io::AsyncWriteExt, net::UnixStream, task::JoinHandle};

use crate::metrics::exporter::{ObservableState, ProgramData};
use crate::{
config::Config,
metrics::exporter::{ObservableState, ProgramData},
};

pub async fn spawn(config: &super::config::ObservabilityConfig) -> JoinHandle<std::io::Result<()>> {
pub async fn spawn(
config: &Config,
instance_state_receiver: tokio::sync::watch::Receiver<ObservableInstanceState>,
) -> JoinHandle<std::io::Result<()>> {
let config = config.clone();
tokio::spawn(async move {
let result = observer(config).await;
let result = observer(config, instance_state_receiver).await;
if let Err(ref e) = result {
log::warn!("Abnormal termination of the state observer: {e}");
log::warn!("The state observer will not be available");
Expand All @@ -16,11 +22,14 @@ pub async fn spawn(config: &super::config::ObservabilityConfig) -> JoinHandle<st
})
}

async fn observer(config: super::config::ObservabilityConfig) -> std::io::Result<()> {
async fn observer(
config: Config,
instance_state_receiver: tokio::sync::watch::Receiver<ObservableInstanceState>,
) -> std::io::Result<()> {
let start_time = Instant::now();

let path = match config.observation_path {
Some(path) => path,
let path = match config.observability.observation_path {
Some(ref path) => path,
None => return Ok(()),
};

Expand All @@ -29,15 +38,16 @@ async fn observer(config: super::config::ObservabilityConfig) -> std::io::Result
// need elevated permissions to read from the socket. So we explicitly set
// the permissions
let permissions: std::fs::Permissions =
PermissionsExt::from_mode(config.observation_permissions);
PermissionsExt::from_mode(config.observability.observation_permissions);

let peers_listener = create_unix_socket_with_permissions(&path, permissions)?;
let peers_listener = create_unix_socket_with_permissions(path, permissions)?;

loop {
let (mut stream, _addr) = peers_listener.accept().await?;

let observe = ObservableState {
program: ProgramData::with_uptime(start_time.elapsed().as_secs_f64()),
instance: instance_state_receiver.borrow().to_owned(),
};

write_json(&mut stream, &observe).await?;
Expand Down
4 changes: 3 additions & 1 deletion statime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ publish.workspace = true
rust-version.workspace = true

[features]
default = ["std"]
default = ["std", "serde"]
std = []
fuzz = ["std"]
serde = ["dep:serde"]

[dependencies]
arrayvec.workspace = true
Expand All @@ -24,3 +25,4 @@ libm.workspace = true
log = { workspace = true, default-features = false}
rand = { workspace = true, default-features = false }
atomic_refcell.workspace = true
serde = { workspace = true, optional = true }
18 changes: 9 additions & 9 deletions statime/src/bmc/bmca.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
use crate::{
datastructures::{
common::{PortIdentity, TimeInterval},
datasets::DefaultDS,
datasets::InternalDefaultDS,
messages::{AnnounceMessage, Header},
},
port::state::PortState,
Expand Down Expand Up @@ -106,7 +106,7 @@ impl<A> Bmca<A> {
/// If None is returned, then the port should remain in the same state as it
/// is now.
pub(crate) fn calculate_recommended_state<F>(
own_data: &DefaultDS,
own_data: &InternalDefaultDS,
best_global_announce_message: Option<BestAnnounceMessage>,
best_port_announce_message: Option<BestAnnounceMessage>,
port_state: &PortState<F>,
Expand All @@ -130,7 +130,7 @@ impl<A> Bmca<A> {
}

fn calculate_recommended_state_low_class(
own_data: &DefaultDS,
own_data: &InternalDefaultDS,
best_port_announce_message: Option<BestAnnounceMessage>,
) -> RecommendedState {
let d0 = ComparisonDataset::from_own_data(own_data);
Expand All @@ -143,7 +143,7 @@ impl<A> Bmca<A> {
}

fn calculate_recommended_state_high_class(
own_data: &DefaultDS,
own_data: &InternalDefaultDS,
best_global_announce_message: Option<BestAnnounceMessage>,
best_port_announce_message: Option<BestAnnounceMessage>,
) -> RecommendedState {
Expand Down Expand Up @@ -287,8 +287,8 @@ impl BestAnnounceMessage {

#[derive(Debug, PartialEq, Eq)]
pub(crate) enum RecommendedState {
M1(DefaultDS),
M2(DefaultDS),
M1(InternalDefaultDS),
M2(InternalDefaultDS),
M3(AnnounceMessage),
P1(AnnounceMessage),
P2(AnnounceMessage),
Expand Down Expand Up @@ -443,15 +443,15 @@ mod tests {
assert_eq!(message1.compare(&message2), Ordering::Less)
}

fn default_own_data() -> DefaultDS {
fn default_own_data() -> InternalDefaultDS {
let clock_identity = Default::default();
let priority_1 = 0;
let priority_2 = 0;
let domain_number = 0;
let slave_only = false;
let sdo_id = Default::default();

DefaultDS::new(InstanceConfig {
InternalDefaultDS::new(InstanceConfig {
clock_identity,
priority_1,
priority_2,
Expand Down Expand Up @@ -492,7 +492,7 @@ mod tests {
let slave_only = false;
let sdo_id = Default::default();

let mut own_data = DefaultDS::new(InstanceConfig {
let mut own_data = InternalDefaultDS::new(InstanceConfig {
clock_identity,
priority_1,
priority_2,
Expand Down
Loading

0 comments on commit efeca5c

Please sign in to comment.