Skip to content

Commit

Permalink
feat(rpc): rpc rate limiter impl (paradigmxyz#11952)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
kamiyaa and mattsse authored Oct 25, 2024
1 parent e676d71 commit d91cacd
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/rpc/rpc-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ metrics.workspace = true
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
tracing.workspace = true
tokio-util = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }

[dev-dependencies]
reth-chainspec.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ pub use eth::EthHandlers;
mod metrics;
pub use metrics::{MeteredRequestFuture, RpcRequestMetricsService};

// Rpc rate limiter
pub mod rate_limiter;

/// Convenience function for starting a server in one step.
#[allow(clippy::too_many_arguments)]
pub async fn launch<Provider, Pool, Network, Tasks, Events, EvmConfig, EthApi, BlockExecutor>(
Expand Down
116 changes: 116 additions & 0 deletions crates/rpc/rpc-builder/src/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//! [`jsonrpsee`] helper layer for rate limiting certain methods.
use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
use tower::Layer;

/// Rate limiter for the RPC server.
///
/// Rate limits expensive calls such as debug_ and trace_.
#[derive(Debug, Clone)]
pub struct RpcRequestRateLimiter {
inner: Arc<RpcRequestRateLimiterInner>,
}

impl RpcRequestRateLimiter {
/// Create a new rate limit layer with the given number of permits.
pub fn new(rate_limit: usize) -> Self {
Self {
inner: Arc::new(RpcRequestRateLimiterInner {
call_guard: PollSemaphore::new(Arc::new(Semaphore::new(rate_limit))),
}),
}
}
}

impl<S> Layer<S> for RpcRequestRateLimiter {
type Service = RpcRequestRateLimitingService<S>;

fn layer(&self, inner: S) -> Self::Service {
RpcRequestRateLimitingService::new(inner, self.clone())
}
}

/// Rate Limiter for the RPC server
#[derive(Debug, Clone)]
struct RpcRequestRateLimiterInner {
/// Semaphore to rate limit calls
call_guard: PollSemaphore,
}

/// A [`RpcServiceT`] middleware that rate limits RPC calls to the server.
#[derive(Debug, Clone)]
pub struct RpcRequestRateLimitingService<S> {
/// The rate limiter for RPC requests
rate_limiter: RpcRequestRateLimiter,
/// The inner service being wrapped
inner: S,
}

impl<S> RpcRequestRateLimitingService<S> {
/// Create a new rate limited service.
pub const fn new(service: S, rate_limiter: RpcRequestRateLimiter) -> Self {
Self { inner: service, rate_limiter }
}
}

impl<'a, S> RpcServiceT<'a> for RpcRequestRateLimitingService<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
{
type Future = RateLimitingRequestFuture<S::Future>;

fn call(&self, req: Request<'a>) -> Self::Future {
let method_name = req.method_name();
if method_name.starts_with("trace_") || method_name.starts_with("debug_") {
RateLimitingRequestFuture {
fut: self.inner.call(req),
guard: Some(self.rate_limiter.inner.call_guard.clone()),
permit: None,
}
} else {
// if we don't need to rate limit, then there
// is no need to get a semaphore permit
RateLimitingRequestFuture { fut: self.inner.call(req), guard: None, permit: None }
}
}
}

/// Response future.
#[pin_project::pin_project]
pub struct RateLimitingRequestFuture<F> {
#[pin]
fut: F,
guard: Option<PollSemaphore>,
permit: Option<OwnedSemaphorePermit>,
}

impl<F> std::fmt::Debug for RateLimitingRequestFuture<F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("RateLimitingRequestFuture")
}
}

impl<F: Future<Output = MethodResponse>> Future for RateLimitingRequestFuture<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Some(guard) = this.guard.as_mut() {
*this.permit = ready!(guard.poll_acquire(cx));
*this.guard = None;
}
let res = this.fut.poll(cx);
if res.is_ready() {
*this.permit = None;
}
res
}
}

0 comments on commit d91cacd

Please sign in to comment.