Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Support overweight messages in XCMP queue #799

Merged
merged 8 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 124 additions & 15 deletions pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#![cfg_attr(not(feature = "std"), no_std)]

pub mod migration;

#[cfg(test)]
mod mock;

Expand All @@ -36,7 +38,7 @@ use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
};
use frame_support::weights::Weight;
use frame_support::weights::{constants::WEIGHT_PER_MILLIS, Weight};
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaChaRng,
Expand All @@ -48,6 +50,11 @@ use xcm::{latest::prelude::*, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};

pub use pallet::*;

/// Index used to identify overweight XCMs.
pub type OverweightIndex = u64;

const LOG_TARGET: &str = "xcmp_queue";

#[frame_support::pallet]
pub mod pallet {
use super::*;
Expand All @@ -56,6 +63,7 @@ pub mod pallet {

#[pallet::pallet]
#[pallet::generate_store(pub(super) trait Store)]
#[pallet::storage_version(migration::STORAGE_VERSION)]
pub struct Pallet<T>(_);

#[pallet::config]
Expand All @@ -70,30 +78,58 @@ pub mod pallet {

/// Means of converting an `Xcm` into a `VersionedXcm`.
type VersionWrapper: WrapVersion;
}

impl Default for QueueConfigData {
fn default() -> Self {
Self {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: 100_000,
weight_restrict_decay: 2,
}
}
/// The origin that is allowed to execute overweight messages.
type ExecuteOverweightOrigin: EnsureOrigin<Self::Origin>;
}

#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn on_runtime_upgrade() -> Weight {
migration::migrate_to_latest::<T>()
}

fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight {
// on_idle processes additional messages with any remaining block weight.
Self::service_xcmp_queue(max_weight)
}
}

#[pallet::call]
impl<T: Config> Pallet<T> {}
impl<T: Config> Pallet<T> {
/// Services a single overweight XCM.
///
/// - `origin`: Must pass `ExecuteOverweightOrigin`.
/// - `index`: The index of the overweight XCM to service
/// - `weight_limit`: The amount of weight that XCM execution may take.
///
/// Errors:
/// - `BadOverweightIndex`: XCM under `index` is not found in the `Overweight` storage map.
/// - `BadXcm`: XCM under `index` cannot be properly decoded into a valid XCM format.
/// - `WeightOverLimit`: XCM execution may use greater `weight_limit`.
///
/// Events:
/// - `OverweightServiced`: On success.
#[pallet::weight(weight_limit.saturating_add(1_000_000))]
pub fn service_overweight(
origin: OriginFor<T>,
index: OverweightIndex,
weight_limit: Weight,
) -> DispatchResultWithPostInfo {
T::ExecuteOverweightOrigin::ensure_origin(origin)?;

let (sender, sent_at, data) =
Overweight::<T>::get(index).ok_or(Error::<T>::BadOverweightIndex)?;
let xcm =
VersionedXcm::<T::Call>::decode_all_with_depth_limit(MAX_XCM_DECODE_DEPTH, &data)
.map_err(|_| Error::<T>::BadXcm)?;
let used = Self::handle_xcm_message(sender, sent_at, xcm, weight_limit)
.map_err(|_| Error::<T>::WeightOverLimit)?;
Overweight::<T>::remove(index);
Self::deposit_event(Event::OverweightServiced(index, used));
Ok(Some(used.saturating_add(1_000_000)).into())
}
}

#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
Expand All @@ -110,6 +146,10 @@ pub mod pallet {
UpwardMessageSent(Option<T::Hash>),
/// An HRMP message was sent to a sibling parachain.
XcmpMessageSent(Option<T::Hash>),
/// An XCM exceeded the individual message weight budget.
OverweightEnqueued(ParaId, RelayBlockNumber, OverweightIndex, Weight),
/// An XCM from the overweight queue was executed with the given actual weight used.
OverweightServiced(OverweightIndex, Weight),
}

#[pallet::error]
Expand All @@ -120,6 +160,10 @@ pub mod pallet {
BadXcmOrigin,
/// Bad XCM data.
BadXcm,
/// Bad overweight index.
BadOverweightIndex,
/// Provided weight is possibly not enough to execute the message.
WeightOverLimit,
}

/// Status of the inbound XCMP channels.
Expand Down Expand Up @@ -163,6 +207,19 @@ pub mod pallet {
/// The configuration which controls the dynamics of the outbound queue.
#[pallet::storage]
pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;

/// The messages that exceeded max individual message weight budget.
///
/// These message stay in this storage map until they are manually dispatched via
/// `service_overweight`.
#[pallet::storage]
pub(super) type Overweight<T: Config> =
StorageMap<_, Twox64Concat, OverweightIndex, (ParaId, RelayBlockNumber, Vec<u8>)>;

/// The number of overweight messages ever recorded in `Overweight`. Also doubles as the next
/// available free overweight index.
#[pallet::storage]
pub(super) type OverweightCount<T: Config> = StorageValue<_, OverweightIndex, ValueQuery>;
}

#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug, TypeInfo)]
Expand Down Expand Up @@ -244,6 +301,22 @@ pub struct QueueConfigData {
/// The speed to which the available weight approaches the maximum weight. A lower number
/// results in a faster progression. A value of 1 makes the entire weight available initially.
weight_restrict_decay: Weight,
/// The maximum amount of weight any individual message may consume. Messages above this weight
/// go into the overweight queue and may only be serviced explicitly.
xcmp_max_individual_weight: Weight,
KiChjang marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for QueueConfigData {
fn default() -> Self {
Self {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: 100_000,
weight_restrict_decay: 2,
xcmp_max_individual_weight: 20 * WEIGHT_PER_MILLIS,
}
}
}

#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
Expand Down Expand Up @@ -414,6 +487,7 @@ impl<T: Config> Pallet<T> {
sender: ParaId,
(sent_at, format): (RelayBlockNumber, XcmpMessageFormat),
max_weight: Weight,
max_individual_weight: Weight,
) -> (Weight, bool) {
let data = <InboundXcmpMessages<T>>::get(sender, sent_at);
let mut last_remaining_fragments;
Expand All @@ -430,6 +504,19 @@ impl<T: Config> Pallet<T> {
let weight = max_weight - weight_used;
match Self::handle_xcm_message(sender, sent_at, xcm, weight) {
Ok(used) => weight_used = weight_used.saturating_add(used),
Err(XcmError::WeightLimitReached(required))
if required > max_individual_weight =>
KiChjang marked this conversation as resolved.
Show resolved Hide resolved
{
// overweight - add to overweight queue and continue with message
// execution consuming the message.
let msg_len = last_remaining_fragments
.len()
.saturating_sub(remaining_fragments.len());
let overweight_xcm = last_remaining_fragments[..msg_len].to_vec();
let index = Self::stash_overweight(sender, sent_at, overweight_xcm);
let e = Event::OverweightEnqueued(sender, sent_at, index, required);
Self::deposit_event(e);
},
Err(XcmError::WeightLimitReached(required))
if required <= max_weight =>
{
Expand Down Expand Up @@ -488,6 +575,22 @@ impl<T: Config> Pallet<T> {
(weight_used, is_empty)
}

/// Puts a given XCM into the list of overweight messages, allowing it to be executed later.
fn stash_overweight(
sender: ParaId,
sent_at: RelayBlockNumber,
xcm: Vec<u8>,
) -> OverweightIndex {
let index = <Self as Store>::OverweightCount::mutate(|count| {
let index = *count;
*count += 1;
index
});

<Self as Store>::Overweight::insert(index, (sender, sent_at, xcm));
index
}

/// Service the incoming XCMP message queue attempting to execute up to `max_weight` execution
/// weight of messages.
///
Expand Down Expand Up @@ -521,8 +624,13 @@ impl<T: Config> Pallet<T> {
return 0
}

let QueueConfigData { resume_threshold, threshold_weight, weight_restrict_decay, .. } =
<QueueConfig<T>>::get();
let QueueConfigData {
resume_threshold,
threshold_weight,
weight_restrict_decay,
xcmp_max_individual_weight,
..
} = <QueueConfig<T>>::get();

let mut shuffled = Self::create_shuffle(status.len());
let mut weight_used = 0;
Expand Down Expand Up @@ -568,6 +676,7 @@ impl<T: Config> Pallet<T> {
sender,
status[index].message_metadata[0],
weight_remaining,
xcmp_max_individual_weight,
);
if is_empty {
status[index].message_metadata.remove(0);
Expand Down
128 changes: 128 additions & 0 deletions pallets/xcmp-queue/src/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! A module that is responsible for migration of storage.

use crate::{Config, Pallet, Store};
use frame_support::{pallet_prelude::*, traits::StorageVersion, weights::Weight};

/// The current storage version.
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);

/// Migrates the pallet storage to the most recent version, checking and setting the
/// `StorageVersion`.
pub fn migrate_to_latest<T: Config>() -> Weight {
let mut weight = 0;

if StorageVersion::get::<Pallet<T>>() == 0 {
weight += migrate_to_v1::<T>();
StorageVersion::new(1).put::<Pallet<T>>();
}

weight
}

mod v0 {
use super::*;
use codec::{Decode, Encode};

#[derive(Encode, Decode, Debug)]
pub struct QueueConfigData {
pub suspend_threshold: u32,
pub drop_threshold: u32,
pub resume_threshold: u32,
pub threshold_weight: Weight,
pub weight_restrict_decay: Weight,
}

impl Default for QueueConfigData {
fn default() -> Self {
QueueConfigData {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: 100_000,
weight_restrict_decay: 2,
}
}
}
}

/// Migrates `QueueConfigData` from v0 (without the `xcmp_max_individual_weight` field) to v1 (with
/// max individual weight).
/// Uses the `Default` implementation of `QueueConfigData` to choose a value for
/// `xcmp_max_individual_weight`.
///
/// NOTE: Only use this function if you know what you're doing. Default to using
/// `migrate_to_latest`.
pub fn migrate_to_v1<T: Config>() -> Weight {
let translate = |pre: v0::QueueConfigData| -> super::QueueConfigData {
super::QueueConfigData {
suspend_threshold: pre.suspend_threshold,
drop_threshold: pre.drop_threshold,
resume_threshold: pre.resume_threshold,
threshold_weight: pre.threshold_weight,
weight_restrict_decay: pre.weight_restrict_decay,
xcmp_max_individual_weight: super::QueueConfigData::default()
.xcmp_max_individual_weight,
}
};
Comment on lines +71 to +82
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to parameterize the migration function to take a xcmp_max_individual_weight so that people can migrate to whatever config value they want?
Alternatively we'd want to expose some knob somewhere to update the queue config, no?
Maybe a candidate for a follow-up issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added an issue: #801


if let Err(_) = <Pallet<T> as Store>::QueueConfig::translate(|pre| pre.map(translate)) {
log::error!(
target: super::LOG_TARGET,
"unexpected error when performing translation of the QueueConfig type during storage upgrade to v1"
);
}

T::DbWeight::get().reads_writes(1, 1)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::mock::{new_test_ext, Test};

#[test]
fn test_migration_to_v1() {
let v0 = v0::QueueConfigData {
suspend_threshold: 5,
drop_threshold: 12,
resume_threshold: 3,
threshold_weight: 333_333,
weight_restrict_decay: 1,
};

new_test_ext().execute_with(|| {
// Put the v0 version in the state
frame_support::storage::unhashed::put_raw(
&crate::QueueConfig::<Test>::hashed_key(),
&v0.encode(),
);

migrate_to_v1::<Test>();

let v1 = crate::QueueConfig::<Test>::get();

assert_eq!(v0.suspend_threshold, v1.suspend_threshold);
assert_eq!(v0.drop_threshold, v1.drop_threshold);
assert_eq!(v0.resume_threshold, v1.resume_threshold);
assert_eq!(v0.threshold_weight, v1.threshold_weight);
assert_eq!(v0.weight_restrict_decay, v1.weight_restrict_decay);
assert_eq!(v1.xcmp_max_individual_weight, 20_000_000_000);
});
}
}
2 changes: 2 additions & 0 deletions pallets/xcmp-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use super::*;
use crate as xcmp_queue;
use frame_support::parameter_types;
use frame_system::EnsureRoot;
use sp_core::H256;
use sp_runtime::{
testing::Header,
Expand Down Expand Up @@ -157,6 +158,7 @@ impl Config for Test {
type XcmExecutor = xcm_executor::XcmExecutor<XcmConfig>;
type ChannelInfo = ParachainSystem;
type VersionWrapper = ();
type ExecuteOverweightOrigin = EnsureRoot<AccountId>;
}

pub fn new_test_ext() -> sp_io::TestExternalities {
Expand Down
Loading