Skip to content

Commit

Permalink
Implement rate limiting for Key Image requests in Fog Ledger Router
Browse files Browse the repository at this point in the history
  • Loading branch information
awygle committed Apr 26, 2023
1 parent a5d5175 commit 5802dbb
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 21 deletions.
71 changes: 71 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion fog/ledger/connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ mod untrusted;
pub use untrusted::FogUntrustedLedgerGrpcClient;

mod router_client;
pub use router_client::LedgerGrpcClient;
pub use router_client::{Error as RouterClientError, LedgerGrpcClient};
21 changes: 17 additions & 4 deletions fog/ledger/connection/src/router_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct LedgerGrpcClient {
response_receiver: ClientDuplexReceiver<LedgerResponse>,

/// Low-lever ledger API client
_client: LedgerApiClient,
client: LedgerApiClient,
}

impl LedgerGrpcClient {
Expand Down Expand Up @@ -76,14 +76,27 @@ impl LedgerGrpcClient {
Self {
logger,
attest_cipher: None,
_client: client,
client,
request_sender,
response_receiver,
uri,
verifier,
}
}

/// Need ability to reconnect in case of rate limiting
pub fn reconnect(&mut self) {
self.deattest();

let (request_sender, response_receiver) = self
.client
.request()
.expect("Could not retrieve grpc sender and receiver.");

self.request_sender = request_sender;
self.response_receiver = response_receiver;
}

fn is_attested(&self) -> bool {
self.attest_cipher.is_some()
}
Expand Down Expand Up @@ -136,7 +149,6 @@ impl LedgerGrpcClient {
&mut self,
key_images: &[KeyImage],
) -> Result<CheckKeyImagesResponse, Error> {
log::trace!(self.logger, "Check key images was called");
if !self.is_attested() {
let verification_report = self.attest().await;
verification_report?;
Expand Down Expand Up @@ -202,7 +214,8 @@ impl LedgerGrpcClient {

impl Drop for LedgerGrpcClient {
fn drop(&mut self) {
block_on(self.request_sender.close()).expect("Couldn't close the router request sender");
// closing streams that have received an error result will fail, but that's OK
let _ = block_on(self.request_sender.close());
}
}

Expand Down
1 change: 1 addition & 0 deletions fog/ledger/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ mc-fog-uri = { path = "../../uri" }
clap = { version = "4.1", features = ["derive", "env"] }
displaydoc = { version = "0.2", default-features = false }
futures = "0.3"
governor = "0.5"
grpcio = "0.12.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
13 changes: 11 additions & 2 deletions fog/ledger/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use clap::Parser;
use mc_attest_core::ProviderId;
use mc_common::ResponderId;
use mc_fog_uri::{FogLedgerUri, KeyImageStoreUri};
use mc_util_parse::parse_duration_in_seconds;
use mc_util_parse::{parse_duration_in_milliseconds, parse_duration_in_seconds};
use mc_util_uri::AdminUri;
use serde::Serialize;
use std::{path::PathBuf, str::FromStr, time::Duration};
use std::{num::NonZeroU32, path::PathBuf, str::FromStr, time::Duration};

/// Configuration parameters for the Fog Ledger Router service.
#[derive(Clone, Parser, Serialize)]
Expand Down Expand Up @@ -65,6 +65,15 @@ pub struct LedgerRouterConfig {
#[clap(long, default_value = "86400", value_parser = parse_duration_in_seconds, env = "MC_CLIENT_AUTH_TOKEN_MAX_LIFETIME")]
pub client_auth_token_max_lifetime: Duration,

/// Rate limiting burst period, in milliseconds. Defaults to 10000 (10 sec)
#[clap(long, default_value = "10000", value_parser = parse_duration_in_milliseconds, env = "MC_RATE_LIMIT_BURST_PERIOD")]
pub rate_limit_burst_period: Duration,

/// Rate limiting maximum burst. Defaults to 80 requests.
#[clap(long, default_value = "80", env = "MC_RATE_LIMIT_MAX_BURST")]
pub rate_limit_max_burst: NonZeroU32,

/// Path to ledger db (lmdb)
/// Path to ledger db (lmdb)
#[clap(long, env = "MC_LEDGER_DB")]
pub ledger_db: PathBuf,
Expand Down
49 changes: 38 additions & 11 deletions fog/ledger/server/src/router_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ use crate::{
SVC_COUNTERS,
};
use futures::{future::try_join_all, SinkExt, TryStreamExt};
use governor::{
clock::{Clock, DefaultClock},
state::keyed::DefaultKeyedStateStore,
RateLimiter,
};
use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcStatus, WriteFlags};
use mc_attest_api::attest;
use mc_attest_enclave_api::{EnclaveMessage, NonceSession};
Expand All @@ -21,7 +26,9 @@ use mc_fog_api::{
};
use mc_fog_ledger_enclave::LedgerEnclaveProxy;
use mc_fog_uri::{ConnectionUri, KeyImageStoreUri};
use mc_util_grpc::{rpc_invalid_arg_error, ConnectionUriGrpcioChannel, ResponseStatus};
use mc_util_grpc::{
rpc_invalid_arg_error, rpc_resource_exhausted_error, ConnectionUriGrpcioChannel, ResponseStatus,
};
use mc_util_metrics::GrpcMethodName;
use mc_util_telemetry::{create_context, tracer, BoxedTracer, FutureExt, Tracer};
use std::{collections::BTreeMap, str::FromStr, sync::Arc};
Expand All @@ -35,6 +42,7 @@ pub async fn handle_requests<E>(
mut requests: RequestStream<LedgerRequest>,
mut responses: DuplexSink<LedgerResponse>,
query_retries: usize,
rate_limit_context: Arc<RateLimiter<Vec<u8>, DefaultKeyedStateStore<Vec<u8>>, DefaultClock>>,
logger: Logger,
) -> Result<(), grpcio::Error>
where
Expand All @@ -52,6 +60,7 @@ where
shard_clients.clone(),
enclave.clone(),
query_retries,
rate_limit_context.clone(),
logger.clone(),
)
.await;
Expand All @@ -76,6 +85,7 @@ pub async fn handle_request<E>(
shard_clients: Vec<Arc<KeyImageStoreApiClient>>,
enclave: E,
query_retries: usize,
rate_limit_context: Arc<RateLimiter<Vec<u8>, DefaultKeyedStateStore<Vec<u8>>, DefaultClock>>,
logger: Logger,
) -> Result<LedgerResponse, RpcStatus>
where
Expand All @@ -87,16 +97,33 @@ where
tracer.in_span("auth", |_cx| handle_auth_request(enclave, request, logger))
}
Some(LedgerRequest_oneof_request_data::check_key_images(request)) => {
handle_query_request(
request,
enclave,
shard_clients,
query_retries,
logger,
&tracer,
)
.with_context(create_context(&tracer, "check_key_images"))
.await
match rate_limit_context.check_key(&request.channel_id) {
Ok(()) => {
handle_query_request(
request,
enclave,
shard_clients,
query_retries,
logger,
&tracer,
)
.with_context(create_context(&tracer, "check_key_images"))
.await
}
Err(not_until) => {
let rpc_status = rpc_resource_exhausted_error(
"Key image rate limit exceeded",
format!(
"Try again in {} milliseconds",
not_until
.wait_time_from(DefaultClock::default().now())
.as_millis()
),
&logger,
);
Err(rpc_status)
}
}
}
None => {
let rpc_status = rpc_invalid_arg_error(
Expand Down
2 changes: 2 additions & 0 deletions fog/ledger/server/src/router_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ where
enclave.clone(),
ledger_store_grpc_clients.clone(),
config.query_retries,
config.rate_limit_burst_period,
config.rate_limit_max_burst,
logger.clone(),
);

Expand Down
13 changes: 13 additions & 0 deletions fog/ledger/server/src/router_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
SVC_COUNTERS,
};
use futures::{FutureExt, TryFutureExt};
use governor::{clock::DefaultClock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
use grpcio::{DuplexSink, RequestStream, RpcContext, UnarySink};
use mc_attest_api::attest::{AuthMessage, Message};
use mc_common::logger::{log, Logger};
Expand All @@ -20,7 +21,9 @@ use mc_util_telemetry::tracer;

use std::{
collections::HashMap,
num::NonZeroU32,
sync::{Arc, RwLock},
time::Duration,
};

#[derive(Clone)]
Expand All @@ -31,6 +34,7 @@ where
enclave: E,
shards: Arc<RwLock<HashMap<KeyImageStoreUri, Arc<ledger_grpc::KeyImageStoreApiClient>>>>,
query_retries: usize,
rate_limit_context: Arc<RateLimiter<Vec<u8>, DefaultKeyedStateStore<Vec<u8>>, DefaultClock>>,
logger: Logger,
}

Expand All @@ -41,12 +45,20 @@ impl<E: LedgerEnclaveProxy> LedgerRouterService<E> {
enclave: E,
shards: Arc<RwLock<HashMap<KeyImageStoreUri, Arc<ledger_grpc::KeyImageStoreApiClient>>>>,
query_retries: usize,
burst_period: Duration,
max_burst: NonZeroU32,
logger: Logger,
) -> Self {
let rate_limiter = RateLimiter::keyed(
Quota::with_period(burst_period / max_burst.get())
.unwrap()
.allow_burst(max_burst),
);
Self {
enclave,
shards,
query_retries,
rate_limit_context: Arc::new(rate_limiter),
logger,
}
}
Expand Down Expand Up @@ -80,6 +92,7 @@ where
requests,
responses,
self.query_retries,
self.rate_limit_context.clone(),
logger.clone(),
)
.map_err(move |err| log::error!(&logger, "failed to reply: {}", err))
Expand Down
Loading

0 comments on commit 5802dbb

Please sign in to comment.