Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use std sync primitives instead of parking_lot. #344

Merged
merged 4 commits into from
Mar 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust_version: ['1.56.1', 'stable', 'nightly']
rust_version: ['1.60.0', 'stable', 'nightly']
os: [ubuntu-latest, windows-latest, macOS-latest]
steps:
- uses: actions/checkout@v1
Expand Down
1 change: 0 additions & 1 deletion metrics-exporter-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ push-gateway = ["async-runtime", "hyper/client", "tracing"]
[dependencies]
metrics = { version = "^0.20", path = "../metrics" }
metrics-util = { version = "^0.14", path = "../metrics-util", default-features = false, features = ["recency", "registry", "summary"] }
parking_lot = { version = "0.12", default-features = false }
thiserror = { version = "1", default-features = false }
quanta = { version = "0.10.0", default-features = false }
indexmap = { version = "1", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions metrics-exporter-prometheus/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use std::pin::Pin;
use std::sync::RwLock;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use std::thread;
use std::time::Duration;
Expand All @@ -31,7 +32,6 @@ use hyper::{
use indexmap::IndexMap;
#[cfg(feature = "http-listener")]
use ipnet::IpNet;
use parking_lot::RwLock;
use quanta::Clock;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use tokio::runtime;
Expand Down Expand Up @@ -480,7 +480,7 @@ impl PrometheusBuilder {
let body = body
.map_err(|_| ())
.map(|mut b| b.copy_to_bytes(b.remaining()))
.map(|b| (&b[..]).to_vec())
.map(|b| b[..].to_vec())
.and_then(|s| String::from_utf8(s).map_err(|_| ()))
.unwrap_or_else(|_| {
String::from("<failed to read response body>")
Expand Down
14 changes: 8 additions & 6 deletions metrics-exporter-prometheus/src/recorder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{PoisonError, RwLock};

use indexmap::IndexMap;
use metrics::{Counter, Gauge, Histogram, Key, KeyName, Recorder, SharedString, Unit};
use metrics_util::registry::{Recency, Registry};
use parking_lot::RwLock;
use quanta::Instant;

use crate::common::Snapshot;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl Inner {
// is not recent enough and should be/was deleted from the registry, we also need to
// delete it on our side as well.
let (name, labels) = key_to_parts(&key, Some(&self.global_labels));
let mut wg = self.distributions.write();
let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
let delete_by_name = if let Some(by_name) = wg.get_mut(&name) {
by_name.remove(&labels);
by_name.is_empty()
Expand All @@ -83,7 +83,7 @@ impl Inner {

let (name, labels) = key_to_parts(&key, Some(&self.global_labels));

let mut wg = self.distributions.write();
let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
let entry = wg
.entry(name.clone())
.or_insert_with(IndexMap::new)
Expand All @@ -93,7 +93,8 @@ impl Inner {
histogram.get_inner().clear_with(|samples| entry.record_samples(samples));
}

let distributions = self.distributions.read().clone();
let distributions =
self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone();

Snapshot { counters, gauges, distributions }
}
Expand All @@ -102,7 +103,7 @@ impl Inner {
let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics();

let mut output = String::new();
let descriptions = self.descriptions.read();
let descriptions = self.descriptions.read().unwrap_or_else(PoisonError::into_inner);

for (name, mut by_labels) in counters.drain() {
if let Some(desc) = descriptions.get(name.as_str()) {
Expand Down Expand Up @@ -215,7 +216,8 @@ impl PrometheusRecorder {

fn add_description_if_missing(&self, key_name: &KeyName, description: SharedString) {
let sanitized = sanitize_metric_name(key_name.as_str());
let mut descriptions = self.inner.descriptions.write();
let mut descriptions =
self.inner.descriptions.write().unwrap_or_else(PoisonError::into_inner);
descriptions.entry(sanitized).or_insert(description);
}
}
Expand Down
2 changes: 1 addition & 1 deletion metrics-tracing-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ tracing-subscriber = { version = "0.3.1", default-features = false, features = [

[dev-dependencies]
criterion = "0.3"
parking_lot = "0.12"
parking_lot = { version = "0.12.1", defautl-features = false }
tracing = { version = "0.1.29", default-features = false, features = ["std"] }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["registry"] }
5 changes: 2 additions & 3 deletions metrics-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ crossbeam-utils = { version = "0.8", default-features = false, optional = true }
portable-atomic = { version = "0.3", optional = true }
aho-corasick = { version = "0.7", default-features = false, optional = true, features = ["std"] }
indexmap = { version = "1", default-features = false, optional = true }
parking_lot = { version = "0.12", default-features = false, optional = true }
quanta = { version = "0.10.0", default-features = false, optional = true }
sketches-ddsketch = { version = "0.2.0", default-features = false, optional = true }
radix_trie = { version = "0.2", default-features = false, optional = true }
Expand Down Expand Up @@ -89,5 +88,5 @@ layers = ["layer-filter", "layer-router"]
layer-filter = ["aho-corasick"]
layer-router = ["radix_trie"]
summary = ["sketches-ddsketch"]
recency = ["parking_lot", "registry", "quanta"]
registry = ["portable-atomic", "crossbeam-epoch", "crossbeam-utils", "handles", "hashbrown", "num_cpus", "parking_lot"]
recency = ["registry", "quanta"]
registry = ["portable-atomic", "crossbeam-epoch", "crossbeam-utils", "handles", "hashbrown", "num_cpus"]
37 changes: 20 additions & 17 deletions metrics-util/src/registry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//! High-performance metrics storage.

mod storage;
use std::{hash::BuildHasherDefault, iter::repeat};
use std::{
hash::BuildHasherDefault,
iter::repeat,
sync::{PoisonError, RwLock},
};

use hashbrown::{hash_map::RawEntryMut, HashMap};
use metrics::{Key, KeyHasher};
use parking_lot::RwLock;
pub use storage::{AtomicStorage, Storage};

#[cfg(feature = "recency")]
Expand Down Expand Up @@ -143,13 +146,13 @@ where
/// does not ensure that callers will see the registry as entirely empty at any given point.
pub fn clear(&self) {
for shard in &self.counters {
shard.write().clear();
shard.write().unwrap_or_else(PoisonError::into_inner).clear();
}
for shard in &self.gauges {
shard.write().clear();
shard.write().unwrap_or_else(PoisonError::into_inner).clear();
}
for shard in &self.histograms {
shard.write().clear();
shard.write().unwrap_or_else(PoisonError::into_inner).clear();
}
}

Expand All @@ -164,13 +167,13 @@ where
let (hash, shard) = self.get_hash_and_shard_for_counter(key);

// Try and get the handle if it exists, running our operation if we succeed.
let shard_read = shard.read();
let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
op(v)
} else {
// Switch to write guard and insert the handle first.
drop(shard_read);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
{
v
Expand Down Expand Up @@ -198,13 +201,13 @@ where
let (hash, shard) = self.get_hash_and_shard_for_gauge(key);

// Try and get the handle if it exists, running our operation if we succeed.
let shard_read = shard.read();
let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
op(v)
} else {
// Switch to write guard and insert the handle first.
drop(shard_read);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
{
v
Expand Down Expand Up @@ -232,13 +235,13 @@ where
let (hash, shard) = self.get_hash_and_shard_for_histogram(key);

// Try and get the handle if it exists, running our operation if we succeed.
let shard_read = shard.read();
let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
op(v)
} else {
// Switch to write guard and insert the handle first.
drop(shard_read);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
{
v
Expand All @@ -260,7 +263,7 @@ where
/// Returns `true` if the counter existed and was removed, `false` otherwise.
pub fn delete_counter(&self, key: &K) -> bool {
let (hash, shard) = self.get_hash_and_shard_for_counter(key);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
if let RawEntryMut::Occupied(entry) = entry {
let _ = entry.remove_entry();
Expand All @@ -275,7 +278,7 @@ where
/// Returns `true` if the gauge existed and was removed, `false` otherwise.
pub fn delete_gauge(&self, key: &K) -> bool {
let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
if let RawEntryMut::Occupied(entry) = entry {
let _ = entry.remove_entry();
Expand All @@ -290,7 +293,7 @@ where
/// Returns `true` if the histogram existed and was removed, `false` otherwise.
pub fn delete_histogram(&self, key: &K) -> bool {
let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
let mut shard_write = shard.write();
let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
if let RawEntryMut::Occupied(entry) = entry {
let _ = entry.remove_entry();
Expand All @@ -312,7 +315,7 @@ where
F: FnMut(&K, &S::Counter),
{
for subshard in self.counters.iter() {
let shard_read = subshard.read();
let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
for (key, counter) in shard_read.iter() {
collect(key, counter);
}
Expand All @@ -331,7 +334,7 @@ where
F: FnMut(&K, &S::Gauge),
{
for subshard in self.gauges.iter() {
let shard_read = subshard.read();
let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
for (key, gauge) in shard_read.iter() {
collect(key, gauge);
}
Expand All @@ -350,7 +353,7 @@ where
F: FnMut(&K, &S::Histogram),
{
for subshard in self.histograms.iter() {
let shard_read = subshard.read();
let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
for (key, histogram) in shard_read.iter() {
collect(key, histogram);
}
Expand Down
5 changes: 2 additions & 3 deletions metrics-util/src/registry/recency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
//! observed, to build a complete picture that allows deciding if a given metric has gone "idle" or
//! not, and thus whether it should actually be deleted.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;
use std::{collections::HashMap, ops::DerefMut};

use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn};
use parking_lot::Mutex;
use quanta::{Clock, Instant};

use crate::Hashable;
Expand Down Expand Up @@ -311,7 +310,7 @@ where
{
if let Some(idle_timeout) = self.idle_timeout {
if self.mask.matches(kind) {
let mut guard = self.inner.lock();
let mut guard = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
let (clock, entries) = guard.deref_mut();

let now = clock.now();
Expand Down