Skip to content

Commit

Permalink
Use std sync primitives instead of parking_lot. (metrics-rs#344)
Browse files Browse the repository at this point in the history
* Use std::sync::Mutex instead of parking_lot::Mutex.
* Bump MSRV to 1.60.0 for CI.

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>
  • Loading branch information
BratSinot and tobz authored Mar 12, 2023
1 parent 96b5455 commit 298aa3c
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust_version: ['1.56.1', 'stable', 'nightly']
rust_version: ['1.60.0', 'stable', 'nightly']
os: [ubuntu-latest, windows-latest, macOS-latest]
steps:
- uses: actions/checkout@v1
Expand Down
1 change: 0 additions & 1 deletion metrics-exporter-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ push-gateway = ["async-runtime", "hyper/client", "tracing"]
[dependencies]
metrics = { version = "^0.20", path = "../metrics" }
metrics-util = { version = "^0.14", path = "../metrics-util", default-features = false, features = ["recency", "registry", "summary"] }
parking_lot = { version = "0.12", default-features = false }
thiserror = { version = "1", default-features = false }
quanta = { version = "0.10.0", default-features = false }
indexmap = { version = "1", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions metrics-exporter-prometheus/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use std::pin::Pin;
use std::sync::RwLock;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use std::thread;
use std::time::Duration;
Expand All @@ -31,7 +32,6 @@ use hyper::{
use indexmap::IndexMap;
#[cfg(feature = "http-listener")]
use ipnet::IpNet;
use parking_lot::RwLock;
use quanta::Clock;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use tokio::runtime;
Expand Down Expand Up @@ -480,7 +480,7 @@ impl PrometheusBuilder {
let body = body
.map_err(|_| ())
.map(|mut b| b.copy_to_bytes(b.remaining()))
.map(|b| (&b[..]).to_vec())
.map(|b| b[..].to_vec())
.and_then(|s| String::from_utf8(s).map_err(|_| ()))
.unwrap_or_else(|_| {
String::from("<failed to read response body>")
Expand Down
14 changes: 8 additions & 6 deletions metrics-exporter-prometheus/src/recorder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{PoisonError, RwLock};

use indexmap::IndexMap;
use metrics::{Counter, Gauge, Histogram, Key, KeyName, Recorder, SharedString, Unit};
use metrics_util::registry::{Recency, Registry};
use parking_lot::RwLock;
use quanta::Instant;

use crate::common::Snapshot;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl Inner {
// is not recent enough and should be/was deleted from the registry, we also need to
// delete it on our side as well.
let (name, labels) = key_to_parts(&key, Some(&self.global_labels));
let mut wg = self.distributions.write();
let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
let delete_by_name = if let Some(by_name) = wg.get_mut(&name) {
by_name.remove(&labels);
by_name.is_empty()
Expand All @@ -83,7 +83,7 @@ impl Inner {

let (name, labels) = key_to_parts(&key, Some(&self.global_labels));

let mut wg = self.distributions.write();
let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
let entry = wg
.entry(name.clone())
.or_insert_with(IndexMap::new)
Expand All @@ -93,7 +93,8 @@ impl Inner {
histogram.get_inner().clear_with(|samples| entry.record_samples(samples));
}

let distributions = self.distributions.read().clone();
let distributions =
self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone();

Snapshot { counters, gauges, distributions }
}
Expand All @@ -102,7 +103,7 @@ impl Inner {
let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics();

let mut output = String::new();
let descriptions = self.descriptions.read();
let descriptions = self.descriptions.read().unwrap_or_else(PoisonError::into_inner);

for (name, mut by_labels) in counters.drain() {
if let Some(desc) = descriptions.get(name.as_str()) {
Expand Down Expand Up @@ -215,7 +216,8 @@ impl PrometheusRecorder {

fn add_description_if_missing(&self, key_name: &KeyName, description: SharedString) {
let sanitized = sanitize_metric_name(key_name.as_str());
let mut descriptions = self.inner.descriptions.write();
let mut descriptions =
self.inner.descriptions.write().unwrap_or_else(PoisonError::into_inner);
descriptions.entry(sanitized).or_insert(description);
}
}
Expand Down
2 changes: 1 addition & 1 deletion metrics-tracing-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ tracing-subscriber = { version = "0.3.1", default-features = false, features = [

[dev-dependencies]
criterion = "0.3"
parking_lot = "0.12"
parking_lot = { version = "0.12.1", defautl-features = false }
tracing = { version = "0.1.29", default-features = false, features = ["std"] }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["registry"] }
5 changes: 2 additions & 3 deletions metrics-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ crossbeam-utils = { version = "0.8", default-features = false, optional = true }
portable-atomic = { version = "0.3", optional = true }
aho-corasick = { version = "0.7", default-features = false, optional = true, features = ["std"] }
indexmap = { version = "1", default-features = false, optional = true }
parking_lot = { version = "0.12", default-features = false, optional = true }
quanta = { version = "0.10.0", default-features = false, optional = true }
sketches-ddsketch = { version = "0.2.0", default-features = false, optional = true }
radix_trie = { version = "0.2", default-features = false, optional = true }
Expand Down Expand Up @@ -89,5 +88,5 @@ layers = ["layer-filter", "layer-router"]
layer-filter = ["aho-corasick"]
layer-router = ["radix_trie"]
summary = ["sketches-ddsketch"]
recency = ["parking_lot", "registry", "quanta"]
registry = ["portable-atomic", "crossbeam-epoch", "crossbeam-utils", "handles", "hashbrown", "num_cpus", "parking_lot"]
recency = ["registry", "quanta"]
registry = ["portable-atomic", "crossbeam-epoch", "crossbeam-utils", "handles", "hashbrown", "num_cpus"]
37 changes: 20 additions & 17 deletions metrics-util/src/registry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//! High-performance metrics storage.

mod storage;
use std::{hash::BuildHasherDefault, iter::repeat};
use std::{
hash::BuildHasherDefault,
iter::repeat,
sync::{PoisonError, RwLock},
};

use hashbrown::{hash_map::RawEntryMut, HashMap};
use metrics::{Key, KeyHasher};
use parking_lot::RwLock;
pub use storage::{AtomicStorage, Storage};

#[cfg(feature = "recency")]
Expand Down Expand Up @@ -143,13 +146,13 @@ where
/// does not ensure that callers will see the registry as entirely empty at any given point.
pub fn clear(&self) {
for shard in &self.counters {
shard.write().clear();
shard.write().unwrap_or_else(PoisonError::into_inner).clear();
}
for shard in &self.gauges {
shard.write().clear();
shard.write().unwrap_or_else(PoisonError::into_inner).clear();
}
for shard in &self.histograms {
shard.write().clear();
shard.write().unwrap_or_else(PoisonError::into_inner).clear();
}
}

Expand All @@ -164,13 +167,13 @@ where
let (hash, shard) = self.get_hash_and_shard_for_counter(key);

// Try and get the handle if it exists, running our operation if we succeed.
let shard_read = shard.read();
let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
op(v)
} else {
// Switch to write guard and insert the handle first.
drop(shard_read);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
{
v
Expand Down Expand Up @@ -198,13 +201,13 @@ where
let (hash, shard) = self.get_hash_and_shard_for_gauge(key);

// Try and get the handle if it exists, running our operation if we succeed.
let shard_read = shard.read();
let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
op(v)
} else {
// Switch to write guard and insert the handle first.
drop(shard_read);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
{
v
Expand Down Expand Up @@ -232,13 +235,13 @@ where
let (hash, shard) = self.get_hash_and_shard_for_histogram(key);

// Try and get the handle if it exists, running our operation if we succeed.
let shard_read = shard.read();
let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
op(v)
} else {
// Switch to write guard and insert the handle first.
drop(shard_read);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
{
v
Expand All @@ -260,7 +263,7 @@ where
/// Returns `true` if the counter existed and was removed, `false` otherwise.
pub fn delete_counter(&self, key: &K) -> bool {
let (hash, shard) = self.get_hash_and_shard_for_counter(key);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
if let RawEntryMut::Occupied(entry) = entry {
let _ = entry.remove_entry();
Expand All @@ -275,7 +278,7 @@ where
/// Returns `true` if the gauge existed and was removed, `false` otherwise.
pub fn delete_gauge(&self, key: &K) -> bool {
let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
if let RawEntryMut::Occupied(entry) = entry {
let _ = entry.remove_entry();
Expand All @@ -290,7 +293,7 @@ where
/// Returns `true` if the histogram existed and was removed, `false` otherwise.
pub fn delete_histogram(&self, key: &K) -> bool {
let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
if let RawEntryMut::Occupied(entry) = entry {
let _ = entry.remove_entry();
Expand All @@ -312,7 +315,7 @@ where
F: FnMut(&K, &S::Counter),
{
for subshard in self.counters.iter() {
let shard_read = subshard.read();
let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
for (key, counter) in shard_read.iter() {
collect(key, counter);
}
Expand All @@ -331,7 +334,7 @@ where
F: FnMut(&K, &S::Gauge),
{
for subshard in self.gauges.iter() {
let shard_read = subshard.read();
let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
for (key, gauge) in shard_read.iter() {
collect(key, gauge);
}
Expand All @@ -350,7 +353,7 @@ where
F: FnMut(&K, &S::Histogram),
{
for subshard in self.histograms.iter() {
let shard_read = subshard.read();
let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
for (key, histogram) in shard_read.iter() {
collect(key, histogram);
}
Expand Down
5 changes: 2 additions & 3 deletions metrics-util/src/registry/recency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
//! observed, to build a complete picture that allows deciding if a given metric has gone "idle" or
//! not, and thus whether it should actually be deleted.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;
use std::{collections::HashMap, ops::DerefMut};

use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn};
use parking_lot::Mutex;
use quanta::{Clock, Instant};

use crate::Hashable;
Expand Down Expand Up @@ -311,7 +310,7 @@ where
{
if let Some(idle_timeout) = self.idle_timeout {
if self.mask.matches(kind) {
let mut guard = self.inner.lock();
let mut guard = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
let (clock, entries) = guard.deref_mut();

let now = clock.now();
Expand Down

0 comments on commit 298aa3c

Please sign in to comment.