Skip to content

Commit

Permalink
ref(redis): Expose redis pool state via statsd (#3812)
Browse files Browse the repository at this point in the history
Need more insight into the Redis pool, we should make sure it's never
exhausted.
  • Loading branch information
Dav1dde committed Jul 11, 2024
1 parent 6dc184f commit 1ad9f6d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 4 deletions.
21 changes: 21 additions & 0 deletions relay-redis/src/real.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,25 @@ impl RedisPool {
inner,
})
}

/// Returns information about the current state of the pool.
pub fn stats(&self) -> Stats {
let s = match &self.inner {
RedisPoolInner::Cluster(p) => p.state(),
RedisPoolInner::Single(p) => p.state(),
};

Stats {
connections: s.connections,
idle_connections: s.idle_connections,
}
}
}

/// Stats about how the [`RedisPool`] is performing.
pub struct Stats {
/// The number of connections currently being managed by the pool.
pub connections: u32,
/// The number of idle connections.
pub idle_connections: u32,
}
10 changes: 8 additions & 2 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl ServiceState {
buffer_guard.clone(),
project_cache_services,
metric_outcomes,
redis_pool,
redis_pool.clone(),
)
.spawn_handler(project_cache_rx);

Expand All @@ -210,7 +210,13 @@ impl ServiceState {
)
.start();

RelayStats::new(config.clone(), upstream_relay.clone()).start();
RelayStats::new(
config.clone(),
upstream_relay.clone(),
#[cfg(feature = "processing")]
redis_pool,
)
.start();

let relay_cache = RelayCacheService::new(config.clone(), upstream_relay.clone()).start();

Expand Down
32 changes: 30 additions & 2 deletions relay-server/src/services/stats.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;

use relay_config::{Config, RelayMode};
#[cfg(feature = "processing")]
use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{Addr, Service};
use tokio::time::interval;
Expand All @@ -14,13 +16,21 @@ use crate::statsd::{RelayGauges, TokioGauges};
pub struct RelayStats {
config: Arc<Config>,
upstream_relay: Addr<UpstreamRelay>,
#[cfg(feature = "processing")]
redis_pool: Option<RedisPool>,
}

impl RelayStats {
pub fn new(config: Arc<Config>, upstream_relay: Addr<UpstreamRelay>) -> Self {
pub fn new(
config: Arc<Config>,
upstream_relay: Addr<UpstreamRelay>,
#[cfg(feature = "processing")] redis_pool: Option<RedisPool>,
) -> Self {
Self {
config,
upstream_relay,
#[cfg(feature = "processing")]
redis_pool,
}
}

Expand Down Expand Up @@ -89,6 +99,20 @@ impl RelayStats {
}
}
}

#[cfg(not(feature = "processing"))]
async fn redis_pool(&self) {}

#[cfg(feature = "processing")]
async fn redis_pool(&self) {
let Some(ref redis_pool) = self.redis_pool else {
return;
};

let state = redis_pool.stats();
metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections));
metric!(gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections));
}
}

impl Service for RelayStats {
Expand All @@ -101,7 +125,11 @@ impl Service for RelayStats {

tokio::spawn(async move {
loop {
let _ = tokio::join!(self.tokio_metrics(), self.upstream_status());
let _ = tokio::join!(
self.upstream_status(),
self.tokio_metrics(),
self.redis_pool(),
);
ticker.tick().await;
}
});
Expand Down
10 changes: 10 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ pub enum RelayGauges {
///
/// Relay uses the same value for its memory health check.
SystemMemoryTotal,
/// The number of connections currently being managed by the Redis Pool.
#[cfg(feature = "processing")]
RedisPoolConnections,
/// The number of idle connections in the Redis Pool.
#[cfg(feature = "processing")]
RedisPoolIdleConnections,
}

impl GaugeMetric for RelayGauges {
Expand All @@ -46,6 +52,10 @@ impl GaugeMetric for RelayGauges {
RelayGauges::BufferPeriodicUnspool => "buffer.unspool.periodic",
RelayGauges::SystemMemoryUsed => "health.system_memory.used",
RelayGauges::SystemMemoryTotal => "health.system_memory.total",
#[cfg(feature = "processing")]
RelayGauges::RedisPoolConnections => "redis.pool.connections",
#[cfg(feature = "processing")]
RelayGauges::RedisPoolIdleConnections => "redis.pool.idle_connections",
}
}
}
Expand Down

0 comments on commit 1ad9f6d

Please sign in to comment.