Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

chainHead: Limit ongoing operations #14699

Merged
merged 28 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4787dba
chainHead/api: Make storage/body/call pure RPC methods
lexnv Jul 27, 2023
8050811
chainHead: Add mpsc channel between RPC methods
lexnv Jul 27, 2023
3b30eb3
chainHead/subscriptions: Extract mpsc::Sender via BlockGuard
lexnv Jul 27, 2023
d5305a9
chainHead/subscriptions: Generate and provide the method operation ID
lexnv Jul 27, 2023
50c840d
chainHead: Generate `chainHead_body` response
lexnv Jul 27, 2023
6c5940c
chainHead: Generate `chainHead_call` response
lexnv Jul 27, 2023
589efa8
chainHead: Generate `chainHead_storage` responses
lexnv Jul 27, 2023
8062837
chainHead: Propagate responses of methods to chainHead_follow
lexnv Jul 27, 2023
4f7b445
chainHead/tests: Adjust `chainHead_body` responses
lexnv Jul 31, 2023
53fcd99
chainHead/tests: Adjust `chainHead_call` responses
lexnv Jul 31, 2023
167dd72
chainHead/tests: Adjust `chainHead_call` responses
lexnv Jul 31, 2023
35aca6d
chainHead/tests: Ensure unique operation IDs across methods
lexnv Jul 31, 2023
008fa22
chainHead/events: Remove old method events
lexnv Jul 31, 2023
6c30aef
chainHead/subscriptions: Add limit helper
lexnv Aug 1, 2023
a29d677
chainHead/subscription: Expose limits to `BlockGuard`
lexnv Aug 1, 2023
a33d80c
chainHead/tests: Adjust testing to ongoing operations
lexnv Aug 1, 2023
40a73fb
chainHead: Make limits configurable via `ChainHeadConfig`
lexnv Aug 1, 2023
eb15293
chainHead/tests: Adjust testing to `ChainHeadConfig`
lexnv Aug 1, 2023
a026fe0
chainHead/tests: Ensure operation limits discards items
lexnv Aug 1, 2023
4e4cca8
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_limits
lexnv Aug 9, 2023
defc196
chainHead: Improve documentation
lexnv Aug 9, 2023
4cacbc1
chainHead: Rename `OngoingOperations` -> `LimitOperations`
lexnv Aug 9, 2023
09c743c
chainHead: Rename reserve -> reserve_at_most
lexnv Aug 9, 2023
06fec3e
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_limits
Aug 10, 2023
ab481e8
chainHead: Use duration const instead of u64
lexnv Aug 14, 2023
09e7372
chainHead/subscription: Use tokio::sync::Semaphore for limits
lexnv Aug 14, 2023
8954c33
Merge remote-tracking branch 'origin/lexnv/chainhead_limits' into lex…
lexnv Aug 14, 2023
a3df109
Update client/rpc-spec-v2/src/chain_head/subscription/inner.rs
lexnv Aug 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ hex = "0.4"
futures = "0.3.21"
parking_lot = "0.12.1"
tokio-stream = { version = "0.1", features = ["sync"] }
tokio = { version = "1.22.0", features = ["sync"] }
array-bytes = "6.1"
log = "0.4.17"
futures-util = { version = "0.3.19", default-features = false }
Expand Down
96 changes: 67 additions & 29 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@ use std::{marker::PhantomData, sync::Arc, time::Duration};

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// The configuration of [`ChainHead`].
pub struct ChainHeadConfig {
/// The maximum number of pinned blocks across all subscriptions.
pub global_max_pinned_blocks: usize,
/// The maximum duration that a block is allowed to be pinned per subscription.
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
}

/// Maximum pinned blocks across all connections.
/// This number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;

/// Any block of any subscription should not be pinned more than
/// this constant. When a subscription contains a block older than this,
/// the subscription becomes subject to termination.
/// Note: This should be enough for immediate blocks.
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);

/// The maximum number of ongoing operations per subscription.
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
}
}
}

/// An API for chain head RPC calls.
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
Expand All @@ -76,17 +111,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
max_pinned_duration: Duration,
config: ChainHeadConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend: backend.clone(),
executor,
subscriptions: Arc::new(SubscriptionManagement::new(
max_pinned_blocks,
max_pinned_duration,
config.global_max_pinned_blocks,
config.subscription_max_pinned_duration,
config.subscription_max_ongoing_operations,
backend,
)),
genesis_hash,
Expand Down Expand Up @@ -197,12 +232,10 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<MethodResponse> {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
Expand Down Expand Up @@ -252,12 +285,10 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(None)
},
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
Expand Down Expand Up @@ -306,21 +337,27 @@ where
.transpose()?
.map(ChildInfo::new_default_from_vec);

let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let operation_id = block_guard.operation_id();

// The number of operations we are allowed to execute.
let num_operations = block_guard.num_reserved();
let discarded = items.len().saturating_sub(num_operations);
skunert marked this conversation as resolved.
Show resolved Hide resolved
let mut items = items;
items.truncate(num_operations);

let fut = async move {
storage_client.generate_events(block_guard, hash, items, child_trie);
};
Expand All @@ -329,7 +366,7 @@ where
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id,
discarded_items: Some(0),
discarded_items: Some(discarded),
}))
}

Expand All @@ -342,9 +379,10 @@ where
) -> RpcResult<MethodResponse> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);

let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Expand Down
2 changes: 1 addition & 1 deletion client/rpc-spec-v2/src/chain_head/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod chain_head_storage;
mod subscription;

pub use api::ChainHeadApiServer;
pub use chain_head::ChainHead;
pub use chain_head::{ChainHead, ChainHeadConfig};
pub use event::{
BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
Expand Down
8 changes: 4 additions & 4 deletions client/rpc-spec-v2/src/chain_head/subscription/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use sp_blockchain::Error;
/// Subscription management error.
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionManagementError {
/// The block cannot be pinned into memory because
/// the subscription has exceeded the maximum number
/// of blocks pinned.
#[error("Exceeded pinning limits")]
/// The subscription has exceeded the internal limits
/// regarding the number of pinned blocks in memory or
/// the number of ongoing operations.
#[error("Exceeded pinning or operation limits")]
ExceededLimits,
/// Error originated from the blockchain (client or backend).
#[error("Blockchain error {0}")]
Expand Down
Loading