Update metrics (#805)
* Update metrics crate

Also bumps metrics-exporter-prometheus.

The biggest change from 0.21 -> 0.22 is in this PR: metrics-rs/metrics#394

* Allow versions 0.22 and greater

This may change if the metrics API changes _again_ before a major version.
michaeldjeffrey committed May 6, 2024
1 parent db53596 commit 1888c61
Showing 30 changed files with 241 additions and 156 deletions.
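Every file below follows the same mechanical pattern: in metrics 0.22 the counter!/gauge!/histogram! macros no longer take the value as a macro argument, they return a handle whose method records the value. A minimal before/after sketch of that pattern, distilled from the call sites in the diffs below and assuming the metrics 0.22 crate is in scope; the metric names and values here are illustrative only, not taken from the codebase:

// Hypothetical call sites, for illustration only.
fn record_example_metrics(bytes: u64, queue_depth: usize, elapsed: std::time::Duration) {
    // metrics 0.21 style (removed in this PR): the value is a macro argument.
    //   metrics::increment_counter!("example_requests", "status" => "ok");
    //   metrics::counter!("example_bytes", bytes);
    //   metrics::gauge!("example_queue_depth", queue_depth as f64);
    //   metrics::histogram!("example_duration", elapsed);

    // metrics 0.22 style: the macro returns a handle; the value goes to a method.
    metrics::counter!("example_requests", "status" => "ok").increment(1);
    metrics::counter!("example_bytes").increment(bytes);
    metrics::gauge!("example_queue_depth").set(queue_depth as f64);
    metrics::histogram!("example_duration").record(elapsed);
}

Labels still go inside the macro invocation; only the value moves out to .increment/.set/.record, which is why counters that previously took a count now end in .increment(count).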
204 changes: 146 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -71,7 +71,7 @@ spl-token = "3.5.0"
 reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
 beacon = { git = "https://github.com/helium/proto", branch = "master" }
 humantime = "2"
-metrics = "0.21"
+metrics = ">=0.22"
 metrics-exporter-prometheus = "0"
 tracing = "0"
 tracing-subscriber = { version = "0", default-features=false, features = ["env-filter", "registry", "fmt"] }
2 changes: 1 addition & 1 deletion boost_manager/src/telemetry.rs
@@ -16,7 +16,7 @@ pub async fn last_reward_processed_time(
     db: &Pool<Postgres>,
     datetime: DateTime<Utc>,
 ) -> anyhow::Result<()> {
-    metrics::gauge!(LAST_REWARD_PROCESSED_TIME, datetime.timestamp() as f64);
+    metrics::gauge!(LAST_REWARD_PROCESSED_TIME).set(datetime.timestamp() as f64);
     meta::store(db, LAST_REWARD_PROCESSED_TIME, datetime.timestamp()).await?;

     Ok(())
6 changes: 3 additions & 3 deletions boost_manager/src/updater.rs
@@ -143,9 +143,9 @@ where

     async fn check_failed_activations(&self) -> Result<()> {
         let num_marked_failed = db::update_failed_activations(&self.pool).await?;
-        metrics::counter!("failed_activations", num_marked_failed);
+        metrics::counter!("failed_activations").increment(num_marked_failed);
         let total_failed_count = db::get_failed_activations_count(&self.pool).await?;
-        metrics::gauge!("db_failed_row_count", total_failed_count as f64);
+        metrics::gauge!("db_failed_row_count").set(total_failed_count as f64);
         if total_failed_count > 0 {
             tracing::warn!("{} failed status activations ", total_failed_count);
         };
@@ -159,7 +159,7 @@ where
         summed_activations_count: u64,
     ) -> Result<()> {
         tracing::info!("processed batch of {} activations successfully", batch_size);
-        metrics::counter!("success_activations", summed_activations_count);
+        metrics::counter!("success_activations").increment(summed_activations_count);
         db::update_success_batch(&self.pool, ids).await?;
         Ok(())
     }
4 changes: 2 additions & 2 deletions db_store/src/meta.rs
@@ -6,11 +6,11 @@ macro_rules! query_exec_timed {
     ( $name:literal, $query:expr, $meth:ident, $exec:expr ) => {{
         match poc_metrics::record_duration!(concat!($name, "_duration"), $query.$meth($exec).await) {
             Ok(x) => {
-                metrics::increment_counter!(concat!($name, "_count"), "status" => "ok");
+                metrics::counter!(concat!($name, "_count"), "status" => "ok").increment(1);
                 Ok(x)
             }
             Err(e) => {
-                metrics::increment_counter!(concat!($name, "_count"), "status" => "error");
+                metrics::counter!(concat!($name, "_count"), "status" => "error").increment(1);
                 Err(Error::SqlError(e))
             }
         }
4 changes: 2 additions & 2 deletions db_store/src/metric_tracker.rs
@@ -14,7 +14,7 @@ async fn run(size_name: String, idle_name: String, pool: sqlx::Pool<sqlx::Postgr
     loop {
         trigger.tick().await;

-        metrics::gauge!(size_name.clone(), pool.size() as f64);
-        metrics::gauge!(idle_name.clone(), pool.num_idle() as f64);
+        metrics::gauge!(size_name.clone()).set(pool.size() as f64);
+        metrics::gauge!(idle_name.clone()).set(pool.num_idle() as f64);
     }
 }
3 changes: 1 addition & 2 deletions file_store/src/file_info_poller.rs
@@ -68,9 +68,8 @@ where
         let latency = Utc::now() - self.file_info.timestamp;
         metrics::gauge!(
             "file-processing-latency",
-            latency.num_seconds() as f64,
             "file-type" => self.file_info.prefix.clone(), "process-name" => self.process_name.clone(),
-        );
+        ).set(latency.num_seconds() as f64);

         recorder.record(&self.process_name, &self.file_info).await?;
         Ok(futures::stream::iter(self.data.into_iter()).boxed())
10 changes: 5 additions & 5 deletions file_store/src/file_sink.rs
@@ -127,7 +127,7 @@ impl FileSinkBuilder {
             metric: self.metric,
         };

-        metrics::register_counter!(client.metric, vec![OK_LABEL]);
+        metrics::counter!(client.metric, vec![OK_LABEL]);

         let mut sink = FileSink {
             target_path: self.target_path,
@@ -172,22 +172,22 @@ impl FileSinkClient {
         tokio::select! {
             result = self.sender.send_timeout(Message::Data(on_write_tx, bytes), SEND_TIMEOUT) => match result {
                 Ok(_) => {
-                    metrics::increment_counter!(
+                    metrics::counter!(
                         self.metric,
                         labels
                             .chain(std::iter::once(OK_LABEL))
                             .collect::<Vec<Label>>()
-                    );
+                    ).increment(1);
                     tracing::debug!("file_sink write succeeded for {:?}", self.metric);
                     Ok(on_write_rx)
                 }
                 Err(SendTimeoutError::Closed(_)) => {
-                    metrics::increment_counter!(
+                    metrics::counter!(
                         self.metric,
                         labels
                             .chain(std::iter::once(ERROR_LABEL))
                             .collect::<Vec<Label>>()
-                    );
+                    ).increment(1);
                     tracing::error!("file_sink write failed for {:?} channel closed", self.metric);
                     Err(Error::channel())
                 }
31 changes: 16 additions & 15 deletions iot_config/src/telemetry.rs
@@ -14,19 +14,19 @@ const GATEWAY_CHAIN_LOOKUP_DURATION_METRIC: &str =
     concat!(env!("CARGO_PKG_NAME"), "-", "gateway-info-lookup-duration");

 pub fn initialize() {
-    metrics::gauge!(STREAM_METRIC, 0.0);
+    metrics::gauge!(STREAM_METRIC).set(0.0);
 }

 pub fn count_request(service: &'static str, rpc: &'static str) {
-    metrics::increment_counter!(RPC_METRIC, "service" => service, "rpc" => rpc);
+    metrics::counter!(RPC_METRIC, "service" => service, "rpc" => rpc).increment(1);
 }

 pub fn count_gateway_info_lookup(result: &'static str) {
-    metrics::increment_counter!(GATEWAY_CHAIN_LOOKUP_METRIC, "result" => result);
+    metrics::counter!(GATEWAY_CHAIN_LOOKUP_METRIC, "result" => result).increment(1);
 }

 pub fn gauge_hexes(cells: usize) {
-    metrics::gauge!(REGION_HEX_METRIC, cells as f64);
+    metrics::gauge!(REGION_HEX_METRIC).set(cells as f64);
 }

 pub fn count_region_lookup(
@@ -35,37 +35,38 @@ pub fn count_region_lookup(
 ) {
     let reported_region =
         reported_region.map_or_else(|| "LOOKUP_FAILED".to_string(), |region| region.to_string());
-    metrics::increment_counter!(
+    metrics::counter!(
         REGION_LOOKUP_METRIC,
         // per metrics docs, &str should be preferred for performance; should the regions be
        // mapped through a match of region => &'static str of the name?
         "default_region" => default_region.to_string(), "reported_region" => reported_region
-    );
+    )
+    .increment(1);
 }

 pub fn duration_gateway_info_lookup(start: std::time::Instant) {
-    metrics::histogram!(GATEWAY_CHAIN_LOOKUP_DURATION_METRIC, start.elapsed());
+    metrics::histogram!(GATEWAY_CHAIN_LOOKUP_DURATION_METRIC).record(start.elapsed());
 }

 pub fn count_skf_updates(adds: usize, removes: usize) {
-    metrics::counter!(SKF_ADD_COUNT_METRIC, adds as u64);
-    metrics::counter!(SKF_REMOVE_COUNT_METRIC, removes as u64);
+    metrics::counter!(SKF_ADD_COUNT_METRIC).increment(adds as u64);
+    metrics::counter!(SKF_REMOVE_COUNT_METRIC).increment(removes as u64);
 }

 pub fn count_eui_updates(adds: usize, removes: usize) {
-    metrics::counter!(EUI_ADD_COUNT_METRIC, adds as u64);
-    metrics::counter!(EUI_REMOVE_COUNT_METRIC, removes as u64);
+    metrics::counter!(EUI_ADD_COUNT_METRIC).increment(adds as u64);
+    metrics::counter!(EUI_REMOVE_COUNT_METRIC).increment(removes as u64);
 }

 pub fn count_devaddr_updates(adds: usize, removes: usize) {
-    metrics::counter!(DEVADDR_ADD_COUNT_METRIC, adds as u64);
-    metrics::counter!(DEVADDR_REMOVE_COUNT_METRIC, removes as u64);
+    metrics::counter!(DEVADDR_ADD_COUNT_METRIC).increment(adds as u64);
+    metrics::counter!(DEVADDR_REMOVE_COUNT_METRIC).increment(removes as u64);
 }

 pub fn route_stream_subscribe() {
-    metrics::increment_gauge!(STREAM_METRIC, 1.0);
+    metrics::gauge!(STREAM_METRIC).increment(1.0);
 }

 pub fn route_stream_unsubscribe() {
-    metrics::decrement_gauge!(STREAM_METRIC, 1.0);
+    metrics::gauge!(STREAM_METRIC).decrement(1.0);
 }
2 changes: 1 addition & 1 deletion iot_packet_verifier/src/burner.rs
@@ -131,7 +131,7 @@ where
         payer_account.burned = payer_account.burned.saturating_sub(amount);
         payer_account.balance = payer_account.balance.saturating_sub(amount);

-        metrics::counter!("burned", amount, "payer" => payer.to_string());
+        metrics::counter!("burned", "payer" => payer.to_string()).increment(amount);

         Ok(())
     }
2 changes: 1 addition & 1 deletion iot_verifier/src/entropy_loader.rs
@@ -63,7 +63,7 @@ impl EntropyLoader {
                 report.version as i32,
             )
             .await?;
-            metrics::increment_counter!("oracles_iot_verifier_loader_entropy");
+            metrics::counter!("oracles_iot_verifier_loader_entropy").increment(1);
             Ok(transaction)
         })
         .await?
4 changes: 2 additions & 2 deletions iot_verifier/src/gateway_cache.rs
@@ -26,11 +26,11 @@ impl GatewayCache {
     ) -> Result<GatewayInfo, GatewayCacheError> {
         match self.gateway_cache_receiver.borrow().get(address) {
             Some(hit) => {
-                metrics::increment_counter!("oracles_iot_verifier_gateway_cache_hit");
+                metrics::counter!("oracles_iot_verifier_gateway_cache_hit").increment(1);
                 Ok(hit.clone())
             }
             None => {
-                metrics::increment_counter!("oracles_iot_verifier_gateway_cache_miss");
+                metrics::counter!("oracles_iot_verifier_gateway_cache_miss").increment(1);
                 Err(GatewayCacheError::GatewayNotFound(address.clone()))
             }
         }
4 changes: 2 additions & 2 deletions iot_verifier/src/region_cache.rs
@@ -49,12 +49,12 @@ where
     ) -> Result<RegionParamsInfo, RegionCacheError<G::Error>> {
         match self.cache.get(&region).await {
             Some(hit) => {
-                metrics::increment_counter!("oracles_iot_verifier_region_params_cache_hit");
+                metrics::counter!("oracles_iot_verifier_region_params_cache_hit").increment(1);
                 Ok(hit.value().clone())
             }
             _ => match self.gateways.clone().resolve_region_params(region).await {
                 Ok(res) => {
-                    metrics::increment_counter!("oracles_iot_verifier_region_params_cache_miss");
+                    metrics::counter!("oracles_iot_verifier_region_params_cache_miss").increment(1);
                     self.cache
                         .insert(region, res.clone(), self.refresh_interval)
                         .await;
22 changes: 11 additions & 11 deletions iot_verifier/src/telemetry.rs
@@ -26,47 +26,47 @@ pub async fn initialize(db: &Pool<Postgres>) -> anyhow::Result<()> {
 }

 pub fn count_packets(count: u64) {
-    metrics::counter!(PACKET_COUNTER, count);
+    metrics::counter!(PACKET_COUNTER).increment(count);
 }

 pub fn count_non_rewardable_packets(count: u64) {
-    metrics::counter!(NON_REWARDABLE_PACKET_COUNTER, count);
+    metrics::counter!(NON_REWARDABLE_PACKET_COUNTER).increment(count);
 }

 pub fn count_loader_beacons(count: u64) {
-    metrics::counter!(LOADER_BEACON_COUNTER, count);
+    metrics::counter!(LOADER_BEACON_COUNTER).increment(count);
 }

 pub fn count_loader_witnesses(count: u64) {
-    metrics::counter!(LOADER_WITNESS_COUNTER, count);
+    metrics::counter!(LOADER_WITNESS_COUNTER).increment(count);
 }

 pub fn count_loader_dropped_beacons(count: u64, labels: &[(&'static str, &'static str)]) {
-    metrics::counter!(LOADER_DROPPED_BEACON_COUNTER, count, labels);
+    metrics::counter!(LOADER_DROPPED_BEACON_COUNTER, labels).increment(count);
 }

 pub fn count_loader_dropped_witnesses(count: u64, labels: &[(&'static str, &'static str)]) {
-    metrics::counter!(LOADER_DROPPED_WITNESS_COUNTER, count, labels);
+    metrics::counter!(LOADER_DROPPED_WITNESS_COUNTER, labels).increment(count);
 }

 pub fn num_beacons(count: u64) {
-    metrics::gauge!(BEACON_GUAGE, count as f64);
+    metrics::gauge!(BEACON_GUAGE).set(count as f64);
 }

 pub fn increment_num_beacons_by(count: u64) {
-    metrics::increment_gauge!(BEACON_GUAGE, count as f64);
+    metrics::gauge!(BEACON_GUAGE).increment(count as f64);
 }

 pub fn decrement_num_beacons() {
-    metrics::decrement_gauge!(BEACON_GUAGE, 1.0)
+    metrics::gauge!(BEACON_GUAGE).decrement(1.0)
 }

 pub fn increment_invalid_witnesses(labels: &[(&'static str, &'static str)]) {
-    metrics::increment_counter!(INVALID_WITNESS_COUNTER, labels);
+    metrics::counter!(INVALID_WITNESS_COUNTER, labels).increment(1);
 }

 pub fn last_rewarded_end_time(datetime: DateTime<Utc>) {
-    metrics::gauge!(LAST_REWARDED_END_TIME, datetime.timestamp() as f64);
+    metrics::gauge!(LAST_REWARDED_END_TIME).set(datetime.timestamp() as f64);
 }

 #[derive(Default)]
2 changes: 1 addition & 1 deletion iot_verifier/src/witness_updater.rs
@@ -24,7 +24,7 @@ struct Telemetry {

 impl Telemetry {
     fn new() -> Self {
-        let gauge = metrics::register_gauge!("iot_verifier_witness_updater_queue");
+        let gauge = metrics::gauge!("iot_verifier_witness_updater_queue");
         gauge.set(0.0);
         Self { queue_gauge: gauge }
     }
2 changes: 1 addition & 1 deletion metrics/src/client_requests.rs
@@ -93,10 +93,10 @@ impl Timing {
         if let Some(name) = self.name {
             metrics::histogram!(
                 histogram_name,
-                self.start.elapsed().as_secs_f64(),
                 "name" => name,
                 "result" => self.result
             )
+            .record(self.start.elapsed().as_secs_f64())
         }
     }
 }
8 changes: 4 additions & 4 deletions metrics/src/lib.rs
@@ -43,7 +43,7 @@ macro_rules! record_duration {
     ( $metric_name:expr, $e:expr ) => {{
         let timer = std::time::Instant::now();
         let res = $e;
-        ::metrics::histogram!($metric_name, timer.elapsed());
+        ::metrics::histogram!($metric_name).record(timer.elapsed());
         res
     }};
 }
@@ -120,19 +120,19 @@ where
         let metric_name_time = self.metric_name_time;

         let timer = std::time::Instant::now();
-        metrics::increment_gauge!(metric_name_count, 1.0);
+        metrics::gauge!(metric_name_count).increment(1.0);

         let clone = self.inner.clone();
         // take the service that was ready
         let mut inner = std::mem::replace(&mut self.inner, clone);

         Box::pin(async move {
             let res = inner.call(req).await;
-            metrics::decrement_gauge!(metric_name_count, 1.0);
+            metrics::gauge!(metric_name_count).decrement(1.0);
             let elapsed_time = timer.elapsed();
             tracing::debug!("request processed in {elapsed_time:?}");
             // TODO What units to use? Is f64 seconds appropriate?
-            ::metrics::histogram!(metric_name_time, elapsed_time.as_secs_f64());
+            metrics::histogram!(metric_name_time).record(elapsed_time.as_secs_f64());
             res
         })
     }
4 changes: 2 additions & 2 deletions mobile_config/src/telemetry.rs
@@ -3,9 +3,9 @@ const GATEWAY_CHAIN_LOOKUP_METRIC: &str =
     concat!(env!("CARGO_PKG_NAME"), "-", "gateway-chain-lookup");

 pub fn count_request(service: &'static str, rpc: &'static str) {
-    metrics::increment_counter!(RPC_METRIC, "service" => service, "rpc" => rpc);
+    metrics::counter!(RPC_METRIC, "service" => service, "rpc" => rpc).increment(1);
 }

 pub fn count_gateway_chain_lookup(result: &'static str) {
-    metrics::increment_counter!(GATEWAY_CHAIN_LOOKUP_METRIC, "result" => result);
+    metrics::counter!(GATEWAY_CHAIN_LOOKUP_METRIC, "result" => result).increment(1);
 }
6 changes: 4 additions & 2 deletions mobile_packet_verifier/src/burner.rs
@@ -96,13 +96,15 @@ where
             tracing::info!(%total_dcs, %payer, "Burning DC");
             if self.burn_data_credits(&payer, total_dcs).await.is_err() {
                 // We have failed to burn data credits:
-                metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "false");
+                metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false")
+                    .increment(total_dcs);
                 continue;
             }

             // We succesfully managed to burn data credits:

-            metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "true");
+            metrics::counter!("burned", "payer" => payer.to_string(), "success" => "true")
+                .increment(total_dcs);

             // Delete from the data transfer session and write out to S3

8 changes: 4 additions & 4 deletions mobile_verifier/src/coverage.rs
@@ -178,16 +178,16 @@ impl CoverageDaemon {
         self.oracle_boosting_sink.commit().await?;

         loop {
-            #[rustfmt::skip]
             tokio::select! {
                 _ = shutdown.clone() => {
                     tracing::info!("CoverageDaemon shutting down");
                     break;
                 }
                 Some(file) = self.coverage_objs.recv() => {
-                    let start = Instant::now();
-                    self.process_file(file).await?;
-                    metrics::histogram!("coverage_object_processing_time", start.elapsed());
+                    let start = Instant::now();
+                    self.process_file(file).await?;
+                    metrics::histogram!("coverage_object_processing_time")
+                        .record(start.elapsed());
                 }
             }
         }