Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing for shared locks in id_lock_map #7618

Merged
merged 5 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions storage_controller/src/id_lock_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use crate::service::RECONCILE_TIMEOUT;

const LOCK_TIMEOUT_ALERT_THRESHOLD: Duration = RECONCILE_TIMEOUT;

/// A wrapper around `OwnedRwLockWriteGuard` that when dropped changes the
/// current holding operation in lock.
pub struct WrappedWriteGuard<T: Display> {
/// A wrapper around `OwnedRwLockWriteGuard` used for tracking the
/// operation that holds the lock, and printing a warning if it is held
/// longer than the LOCK_TIMEOUT_ALERT_THRESHOLD time
pub struct TracingWriteGuard<T: Display> {
guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>,
start: Instant,
}

impl<T: Display> WrappedWriteGuard<T> {
impl<T: Display> TracingWriteGuard<T> {
VladLazar marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>) -> Self {
Self {
guard,
Expand All @@ -24,12 +25,12 @@ impl<T: Display> WrappedWriteGuard<T> {
}
}

impl<T: Display> Drop for WrappedWriteGuard<T> {
impl<T: Display> Drop for TracingWriteGuard<T> {
fn drop(&mut self) {
let duration = self.start.elapsed();
if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
tracing::warn!(
"Exclusive lock on {} was held for {:?}",
"Exclusive lock by {} was held for {:?}",
self.guard.as_ref().unwrap(),
duration
);
Expand All @@ -38,15 +39,16 @@ impl<T: Display> Drop for WrappedWriteGuard<T> {
}
}

/// A wrapper around `OwnedRwLockReadGuard` that when dropped changes the
/// current holding operation in lock.
pub struct WrappedReadGuard<T: Display> {
/// A wrapper around `OwnedRwLockReadGuard` used for tracking the
/// operation that holds the lock, and printing a warning if it is held
/// longer than the LOCK_TIMEOUT_ALERT_THRESHOLD time
pub struct TracingReadGuard<T: Display> {
_guard: tokio::sync::OwnedRwLockReadGuard<Option<T>>,
operation: T,
start: Instant,
}

impl<T: Display> WrappedReadGuard<T> {
impl<T: Display> TracingReadGuard<T> {
pub fn new(guard: tokio::sync::OwnedRwLockReadGuard<Option<T>>, operation: T) -> Self {
Self {
_guard: guard,
Expand All @@ -56,12 +58,12 @@ impl<T: Display> WrappedReadGuard<T> {
}
}

impl<T: Display> Drop for WrappedReadGuard<T> {
impl<T: Display> Drop for TracingReadGuard<T> {
fn drop(&mut self) {
let duration = self.start.elapsed();
if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
tracing::warn!(
"Shared lock on {} was held for {:?}",
"Shared lock by {} was held for {:?}",
self.operation,
duration
);
Expand Down Expand Up @@ -90,21 +92,21 @@ where
&self,
key: T,
operation: I,
) -> impl std::future::Future<Output = WrappedReadGuard<I>> {
) -> impl std::future::Future<Output = TracingReadGuard<I>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default().clone();
async move { WrappedReadGuard::new(entry.read_owned().await, operation) }
async move { TracingReadGuard::new(entry.read_owned().await, operation) }
}

pub(crate) fn exclusive(
&self,
key: T,
operation: I,
) -> impl std::future::Future<Output = WrappedWriteGuard<I>> {
) -> impl std::future::Future<Output = TracingWriteGuard<I>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default().clone();
async move {
let mut guard = WrappedWriteGuard::new(entry.write_owned().await);
let mut guard = TracingWriteGuard::new(entry.write_owned().await);
*guard.guard = Some(operation);
guard
}
Expand Down Expand Up @@ -136,7 +138,7 @@ pub async fn trace_exclusive_lock<
op_locks: &IdLockMap<T, I>,
key: T,
operation: I,
) -> WrappedWriteGuard<I> {
) -> TracingWriteGuard<I> {
let start = Instant::now();
let guard = op_locks.exclusive(key.clone(), operation.clone()).await;

Expand All @@ -160,7 +162,7 @@ pub async fn trace_shared_lock<
op_locks: &IdLockMap<T, I>,
key: T,
operation: I,
) -> WrappedReadGuard<I> {
) -> TracingReadGuard<I> {
let start = Instant::now();
let guard = op_locks.shared(key.clone(), operation.clone()).await;

Expand Down
7 changes: 4 additions & 3 deletions storage_controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use crate::{
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingWriteGuard},
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
Expand Down Expand Up @@ -330,7 +330,7 @@ struct TenantShardSplitAbort {
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
/// Until this abort op is complete, no other operations may be done on the tenant
_tenant_lock: WrappedWriteGuard<TenantOperations>,
_tenant_lock: TracingWriteGuard<TenantOperations>,
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -1363,7 +1363,7 @@ impl Service {
async fn node_activate_reconcile(
&self,
mut node: Node,
_lock: &WrappedWriteGuard<NodeOperations>,
_lock: &TracingWriteGuard<NodeOperations>,
) -> Result<(), ApiError> {
// This Node is a mutable local copy: we will set it active so that we can use its
// API client to reconcile with the node. The Node in [`Self::nodes`] will get updated
Expand Down Expand Up @@ -2533,6 +2533,7 @@ impl Service {
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");

self.ensure_attached_wait(tenant_id).await?;

Expand Down
8 changes: 4 additions & 4 deletions test_runner/regress/test_storage_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1273,8 +1273,8 @@ def test_lock_time_tracing(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
env.storage_controller.allowed_errors.extend(
[
".*Exclusive lock on.*",
".*Shared lock on.*",
".*Exclusive lock by.*",
".*Shared lock by.*",
".*Scheduling is disabled by policy.*",
f".*Operation TimelineCreate on key {tenant_id} has waited.*",
]
Expand Down Expand Up @@ -1306,7 +1306,7 @@ def update_tenent_policy():
)
thread_update_tenant_policy.join(timeout=10)

env.storage_controller.assert_log_contains("Exclusive lock on UpdatePolicy was held for")
env.storage_controller.assert_log_contains("Exclusive lock by UpdatePolicy was held for")
_, last_log_cursor = env.storage_controller.assert_log_contains(
f"Operation TimelineCreate on key {tenant_id} has waited"
)
Expand All @@ -1322,7 +1322,7 @@ def update_tenent_policy():
pg_version=PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=timeline_id
)
env.storage_controller.assert_log_contains(
"Shared lock on TimelineCreate was held for", offset=last_log_cursor
"Shared lock by TimelineCreate was held for", offset=last_log_cursor
)


Expand Down
Loading