Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Support overweight messages in XCMP queue #799

Merged
merged 8 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 122 additions & 18 deletions pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
};
use frame_support::weights::Weight;
use frame_support::weights::{constants::WEIGHT_PER_MILLIS, Weight};
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaChaRng,
Expand All @@ -48,6 +48,9 @@ use xcm::{latest::prelude::*, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};

pub use pallet::*;

/// Index used to identify overweight XCMs.
pub type OverweightIndex = u64;

#[frame_support::pallet]
pub mod pallet {
use super::*;
Expand All @@ -70,18 +73,9 @@ pub mod pallet {

/// Means of converting an `Xcm` into a `VersionedXcm`.
type VersionWrapper: WrapVersion;
}

impl Default for QueueConfigData {
fn default() -> Self {
Self {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: 100_000,
weight_restrict_decay: 2,
}
}
/// The origin that is allowed to execute overweight messages.
type ExecuteOverweightOrigin: EnsureOrigin<Self::Origin>;
}

#[pallet::hooks]
Expand All @@ -93,7 +87,40 @@ pub mod pallet {
}

#[pallet::call]
impl<T: Config> Pallet<T> {}
impl<T: Config> Pallet<T> {
/// Services a single overweight XCM.
///
/// - `origin`: Must pass `ExecuteOverweightOrigin`.
/// - `index`: The index of the overweight XCM to service
/// - `weight_limit`: The amount of weight that XCM execution may take.
///
/// Errors:
/// - `BadOverweightIndex`: XCM under `index` is not found in the `Overweight` storage map.
/// - `BadXcm`: XCM under `index` cannot be properly decoded into a valid XCM format.
/// - `WeightOverLimit`: XCM execution may use greater `weight_limit`.
///
/// Events:
/// - `OverweightServiced`: On success.
#[pallet::weight(weight_limit.saturating_add(1_000_000))]
pub fn service_overweight(
origin: OriginFor<T>,
index: OverweightIndex,
weight_limit: Weight,
) -> DispatchResultWithPostInfo {
T::ExecuteOverweightOrigin::ensure_origin(origin)?;

let (sender, sent_at, data) =
Overweight::<T>::get(index).ok_or(Error::<T>::BadOverweightIndex)?;
let xcm =
VersionedXcm::<T::Call>::decode_all_with_depth_limit(MAX_XCM_DECODE_DEPTH, &data)
.map_err(|_| Error::<T>::BadXcm)?;
let used = Self::handle_xcm_message(sender, sent_at, xcm, weight_limit)
.map_err(|_| Error::<T>::WeightOverLimit)?;
Overweight::<T>::remove(index);
Self::deposit_event(Event::OverweightServiced(index, used));
Ok(Some(used.saturating_add(1_000_000)).into())
}
}

#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
Expand All @@ -110,6 +137,10 @@ pub mod pallet {
UpwardMessageSent(Option<T::Hash>),
/// An HRMP message was sent to a sibling parachain.
XcmpMessageSent(Option<T::Hash>),
/// An XCM exceeded the individual message weight budget.
OverweightEnqueued(ParaId, RelayBlockNumber, OverweightIndex, Weight),
/// An XCM from the overweight queue was executed with the given actual weight used.
OverweightServiced(OverweightIndex, Weight),
}

#[pallet::error]
Expand All @@ -120,6 +151,10 @@ pub mod pallet {
BadXcmOrigin,
/// Bad XCM data.
BadXcm,
/// Bad overweight index.
BadOverweightIndex,
/// Provided weight is possibly not enough to execute the message.
WeightOverLimit,
}

/// Status of the inbound XCMP channels.
Expand Down Expand Up @@ -166,6 +201,19 @@ pub mod pallet {
/// The configuration which controls the dynamics of the outbound queue.
#[pallet::storage]
pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;

/// The messages that exceeded max individual message weight budget.
///
/// These message stay in this storage map until they are manually dispatched via
/// `service_overweight`.
#[pallet::storage]
pub(super) type Overweight<T: Config> =
StorageMap<_, Twox64Concat, OverweightIndex, (ParaId, RelayBlockNumber, Vec<u8>)>;

/// The number of overweight messages ever recorded in `Overweight`. Also doubles as the next
/// available free overweight index.
#[pallet::storage]
pub(super) type OverweightCount<T: Config> = StorageValue<_, OverweightIndex, ValueQuery>;
}

#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug, TypeInfo)]
Expand Down Expand Up @@ -196,6 +244,22 @@ pub struct QueueConfigData {
/// The speed to which the available weight approaches the maximum weight. A lower number
/// results in a faster progression. A value of 1 makes the entire weight available initially.
weight_restrict_decay: Weight,
/// The maximum amount of weight any individual message may consume. Messages above this weight
/// go into the overweight queue and may only be serviced explicitly.
xcmp_max_individual_weight: Weight,
KiChjang marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for QueueConfigData {
fn default() -> Self {
Self {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: 100_000,
weight_restrict_decay: 2,
xcmp_max_individual_weight: 20 * WEIGHT_PER_MILLIS,
}
}
}

#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
Expand Down Expand Up @@ -339,7 +403,7 @@ impl<T: Config> Pallet<T> {

fn handle_xcm_message(
sender: ParaId,
_sent_at: RelayBlockNumber,
_sent_at: RelayBlockNumber, // Review Q: why is this arg still here?
KiChjang marked this conversation as resolved.
Show resolved Hide resolved
xcm: VersionedXcm<T::Call>,
max_weight: Weight,
) -> Result<Weight, XcmError> {
Expand All @@ -366,6 +430,7 @@ impl<T: Config> Pallet<T> {
sender: ParaId,
(sent_at, format): (RelayBlockNumber, XcmpMessageFormat),
max_weight: Weight,
max_individual_weight: Weight,
) -> (Weight, bool) {
let data = <InboundXcmpMessages<T>>::get(sender, sent_at);
let mut last_remaining_fragments;
Expand All @@ -390,6 +455,20 @@ impl<T: Config> Pallet<T> {
remaining_fragments = last_remaining_fragments;
break
},
Err(XcmError::WeightLimitReached(required))
if required > max_individual_weight =>
KiChjang marked this conversation as resolved.
Show resolved Hide resolved
{
// overweight - add to overweight queue and continue with message
// execution consuming the message.
let msg_len = last_remaining_fragments
.len()
.saturating_sub(remaining_fragments.len());
let overweight_xcm = last_remaining_fragments[..msg_len].to_vec();
let index = Self::stash_overweight(sender, sent_at, overweight_xcm);
Self::deposit_event(Event::OverweightEnqueued(
sender, sent_at, index, required,
));
gavofyork marked this conversation as resolved.
Show resolved Hide resolved
},
Err(_) => {
// Message looks invalid; don't attempt to retry
},
Expand Down Expand Up @@ -440,6 +519,22 @@ impl<T: Config> Pallet<T> {
(weight_used, is_empty)
}

/// Puts a given XCM into the list of overweight messages, allowing it to be executed later.
fn stash_overweight(
sender: ParaId,
sent_at: RelayBlockNumber,
xcm: Vec<u8>,
) -> OverweightIndex {
let index = <Self as Store>::OverweightCount::mutate(|count| {
let index = *count;
*count += 1;
index
});

<Self as Store>::Overweight::insert(index, (sender, sent_at, xcm));
index
}

/// Service the incoming XCMP message queue attempting to execute up to `max_weight` execution
/// weight of messages.
///
Expand Down Expand Up @@ -473,8 +568,13 @@ impl<T: Config> Pallet<T> {
return 0
}

let QueueConfigData { resume_threshold, threshold_weight, weight_restrict_decay, .. } =
<QueueConfig<T>>::get();
let QueueConfigData {
resume_threshold,
threshold_weight,
weight_restrict_decay,
xcmp_max_individual_weight,
..
} = <QueueConfig<T>>::get();

let mut shuffled = Self::create_shuffle(status.len());
let mut weight_used = 0;
Expand Down Expand Up @@ -516,8 +616,12 @@ impl<T: Config> Pallet<T> {
} else {
// Process up to one block's worth for now.
let weight_remaining = weight_available.saturating_sub(weight_used);
let (weight_processed, is_empty) =
Self::process_xcmp_message(sender, status[index].2[0], weight_remaining);
let (weight_processed, is_empty) = Self::process_xcmp_message(
sender,
status[index].2[0],
weight_remaining,
xcmp_max_individual_weight,
);
if is_empty {
status[index].2.remove(0);
}
Expand Down
2 changes: 2 additions & 0 deletions pallets/xcmp-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use super::*;
use crate as xcmp_queue;
use frame_support::parameter_types;
use frame_system::EnsureRoot;
use sp_core::H256;
use sp_runtime::{
testing::Header,
Expand Down Expand Up @@ -157,6 +158,7 @@ impl Config for Test {
type XcmExecutor = xcm_executor::XcmExecutor<XcmConfig>;
type ChannelInfo = ParachainSystem;
type VersionWrapper = ();
type ExecuteOverweightOrigin = EnsureRoot<AccountId>;
}

pub fn new_test_ext() -> sp_io::TestExternalities {
Expand Down
4 changes: 2 additions & 2 deletions pallets/xcmp-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn bad_message_is_handled() {
InboundXcmpMessages::<Test>::insert(ParaId::from(1000), 1, bad_data);
let format = XcmpMessageFormat::ConcatenatedEncodedBlob;
// This should exit with an error.
XcmpQueue::process_xcmp_message(1000.into(), (1, format), 10_000_000_000);
XcmpQueue::process_xcmp_message(1000.into(), (1, format), 10_000_000_000, 10_000_000_000);
});
}

Expand All @@ -60,6 +60,6 @@ fn other_bad_message_is_handled() {
InboundXcmpMessages::<Test>::insert(ParaId::from(1000), 1, bad_data);
let format = XcmpMessageFormat::ConcatenatedEncodedBlob;
// This should exit with an error.
XcmpQueue::process_xcmp_message(1000.into(), (1, format), 10_000_000_000);
XcmpQueue::process_xcmp_message(1000.into(), (1, format), 10_000_000_000, 10_000_000_000);
});
}
1 change: 1 addition & 0 deletions parachain-template/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ impl cumulus_pallet_xcmp_queue::Config for Runtime {
type XcmExecutor = XcmExecutor<XcmConfig>;
type ChannelInfo = ParachainSystem;
type VersionWrapper = ();
type ExecuteOverweightOrigin = EnsureRoot<AccountId>;
}

impl cumulus_pallet_dmp_queue::Config for Runtime {
Expand Down
1 change: 1 addition & 0 deletions polkadot-parachains/rococo-parachain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ impl cumulus_pallet_xcmp_queue::Config for Runtime {
type XcmExecutor = XcmExecutor<XcmConfig>;
type ChannelInfo = ParachainSystem;
type VersionWrapper = ();
type ExecuteOverweightOrigin = EnsureRoot<AccountId>;
}

impl cumulus_pallet_dmp_queue::Config for Runtime {
Expand Down
1 change: 1 addition & 0 deletions polkadot-parachains/statemine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ impl cumulus_pallet_xcmp_queue::Config for Runtime {
type XcmExecutor = XcmExecutor<XcmConfig>;
type ChannelInfo = ParachainSystem;
type VersionWrapper = PolkadotXcm;
type ExecuteOverweightOrigin = EnsureRoot<AccountId>;
}

impl cumulus_pallet_dmp_queue::Config for Runtime {
Expand Down
1 change: 1 addition & 0 deletions polkadot-parachains/statemint/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ impl cumulus_pallet_xcmp_queue::Config for Runtime {
type XcmExecutor = XcmExecutor<XcmConfig>;
type ChannelInfo = ParachainSystem;
type VersionWrapper = PolkadotXcm;
type ExecuteOverweightOrigin = EnsureRoot<AccountId>;
}

impl cumulus_pallet_dmp_queue::Config for Runtime {
Expand Down
1 change: 1 addition & 0 deletions polkadot-parachains/westmint/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ impl cumulus_pallet_xcmp_queue::Config for Runtime {
type XcmExecutor = XcmExecutor<XcmConfig>;
type ChannelInfo = ParachainSystem;
type VersionWrapper = PolkadotXcm;
type ExecuteOverweightOrigin = EnsureRoot<AccountId>;
}

impl cumulus_pallet_dmp_queue::Config for Runtime {
Expand Down