Enable parallel key scraping #1985

Merged · 21 commits · Nov 29, 2023
Changes from 5 commits
4 changes: 3 additions & 1 deletion cumulus/pallets/xcmp-queue/src/bridging.rs
@@ -55,7 +55,9 @@ impl<SiblingBridgeHubParaId: Get<ParaId>, Runtime: crate::Config>
		let sibling_bridge_hub_id: ParaId = SiblingBridgeHubParaId::get();

		// let's find the channel's state with the sibling parachain,
-		let Some((outbound_state, queued_pages)) = pallet::Pallet::<Runtime>::outbound_channel_state(sibling_bridge_hub_id) else {
+		let Some((outbound_state, queued_pages)) =
+			pallet::Pallet::<Runtime>::outbound_channel_state(sibling_bridge_hub_id)
+		else {
			return false
		};
		// suspended channel => it is congested
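The hunk above is purely a rustfmt re-wrap of a `let ... else` early return. For readers unfamiliar with the pattern, here is a minimal sketch of the same shape; the `Option<(bool, u32)>` state and the queue-depth threshold are stand-ins for illustration, not the pallet's real types:

```rust
// Sketch of a `let ... else` congestion check. The tuple type and the
// threshold 4 are illustrative stand-ins, not the pallet's channel state.
fn is_congested(channel_state: Option<(bool, u32)>) -> bool {
    let Some((suspended, queued_pages)) = channel_state else {
        // No channel with the sibling yet, so nothing can be congested.
        return false;
    };
    // A suspended channel, or a deep enough queue, counts as congested.
    suspended || queued_pages > 4
}
```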
22 changes: 14 additions & 8 deletions cumulus/pallets/xcmp-queue/src/tests.rs
@@ -410,9 +410,11 @@ fn verify_fee_factor_increase_and_decrease() {
	assert_eq!(DeliveryFeeFactor::<Test>::get(sibling_para_id), initial);

	// Sending the message right now is cheap
-	let (_, delivery_fees) = validate_send::<XcmpQueue>(destination, xcm.clone())
-		.expect("message can be sent; qed");
-	let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else { unreachable!("asset is fungible; qed"); };
+	let (_, delivery_fees) =
+		validate_send::<XcmpQueue>(destination, xcm.clone()).expect("message can be sent; qed");
+	let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else {
+		unreachable!("asset is fungible; qed");
+	};
	assert_eq!(delivery_fee_amount, 402_000_000);

	let smaller_xcm = Xcm(vec![ClearOrigin; 30]);
@@ -422,19 +424,23 @@
	assert_ok!(send_xcm::<XcmpQueue>(destination, xcm.clone())); // Size 520
	assert_eq!(DeliveryFeeFactor::<Test>::get(sibling_para_id), FixedU128::from_float(1.05));

-	for _ in 0..12 { // We finish at size 929
+	for _ in 0..12 {
+		// We finish at size 929
		assert_ok!(send_xcm::<XcmpQueue>(destination, smaller_xcm.clone()));
	}
	assert!(DeliveryFeeFactor::<Test>::get(sibling_para_id) > FixedU128::from_float(1.88));

	// Sending the message right now is expensive
-	let (_, delivery_fees) = validate_send::<XcmpQueue>(destination, xcm.clone())
-		.expect("message can be sent; qed");
-	let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else { unreachable!("asset is fungible; qed"); };
+	let (_, delivery_fees) =
+		validate_send::<XcmpQueue>(destination, xcm.clone()).expect("message can be sent; qed");
+	let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else {
+		unreachable!("asset is fungible; qed");
+	};
	assert_eq!(delivery_fee_amount, 758_030_955);

	// Fee factor only decreases in `take_outbound_messages`
-	for _ in 0..5 { // We take 5 100 byte pages
+	for _ in 0..5 {
+		// We take 5 100 byte pages
		XcmpQueue::take_outbound_messages(1);
	}
	assert!(DeliveryFeeFactor::<Test>::get(sibling_para_id) < FixedU128::from_float(1.72));
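The thresholds this test pins down come from a multiplicative fee factor: congested sends scale it up, and `take_outbound_messages` scales it back down. A toy model of that dynamic follows; the 1.05 base and the loop counts are assumptions for illustration, not pallet-xcmp-queue's actual parameters:

```rust
// Toy model of a multiplicative delivery-fee factor; the base 1.05 and the
// iteration counts are made up, not taken from the pallet.
fn main() {
    let mut factor: f64 = 1.0;
    // Each congested send multiplies the factor up...
    for _ in 0..14 {
        factor *= 1.05;
    }
    assert!(factor > 1.88); // ~1.98 after 14 bumps
    // ...and each page taken off the queue eases it back down.
    for _ in 0..5 {
        factor /= 1.05;
    }
    assert!(factor < 1.72); // ~1.55 after 5 reductions
}
```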
@@ -33,9 +33,7 @@ pub(crate) fn weight_witness_warning(
	if dev_mode {
		return
	}
-	let CallWeightDef::Immediate(w) = &method.weight else {
-		return
-	};
+	let CallWeightDef::Immediate(w) = &method.weight else { return };

	let partial_warning = Warning::new_deprecated("UncheckedWeightWitness")
		.old("not check weight witness data")
@@ -66,9 +64,7 @@ pub(crate) fn weight_constant_warning(
	if dev_mode {
		return
	}
-	let syn::Expr::Lit(lit) = weight else {
-		return
-	};
+	let syn::Expr::Lit(lit) = weight else { return };

	let warning = Warning::new_deprecated("ConstantWeight")
		.index(warnings.len())
126 changes: 123 additions & 3 deletions substrate/utils/frame/remote-externalities/src/lib.rs
@@ -439,6 +439,37 @@
		Ok(keys)
	}

+	/// Get keys at `prefix` in `block` in parallel.
+	async fn rpc_get_keys_parallel(
+		&self,
+		prefix: StorageKey,
+		block: B::Hash,
+		parallel: u16,
+	) -> Result<Vec<StorageKey>, &'static str> {
+		let prefixes = extend_prefix(&prefix, parallel);
+		let batch = prefixes.into_iter().map(|prefix| self.rpc_get_keys_paged(prefix, block));
+
+		let keys = futures::future::join_all(batch)
+			.await
+			.into_iter()
+			.filter_map(|res| match res {
+				Ok(keys) => Some(keys),
+				Err(err) => {
+					log::warn!(
+						target: LOG_TARGET,
+						"{} when fetching keys at block {:?}",
+						err,
+						block,
+					);
+					None
+				},
+			})
+			.flatten()
+			.collect::<Vec<StorageKey>>();
+
+		Ok(keys)
+	}
+
	/// Fetches storage data from a node using a dynamic batch size.
	///
	/// This function adjusts the batch size on the fly to help prevent overwhelming the node with
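The new `rpc_get_keys_parallel` is a plain fan-out/join: split the keyspace into disjoint sub-prefixes via `extend_prefix`, scan each concurrently, and merge the results. A self-contained sketch of the same pattern, with a hypothetical `fetch_keys` standing in for `rpc_get_keys_paged`:

```rust
use futures::future::join_all;

// Hypothetical stand-in for a paged RPC scan of every key under `prefix`.
async fn fetch_keys(prefix: Vec<u8>) -> Result<Vec<Vec<u8>>, &'static str> {
    Ok(vec![prefix]) // a real implementation would page through node responses
}

// Fan `prefix` out by one hex nibble, scan the 16 disjoint sub-prefixes
// concurrently, and flatten the results, dropping any failed scan.
async fn fetch_keys_parallel(prefix: &[u8]) -> Vec<Vec<u8>> {
    let batch = (0u8..16).map(|nibble| {
        let mut sub = prefix.to_vec();
        sub.push(nibble);
        fetch_keys(sub)
    });
    join_all(batch).await.into_iter().filter_map(Result::ok).flatten().collect()
}
```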
@@ -590,16 +621,17 @@
	/// map them to values one by one.
	///
	/// This can work with public nodes. But, expect it to be darn slow.
-	pub(crate) async fn rpc_get_pairs_paged(
+	pub(crate) async fn rpc_get_pairs(
		&self,
		prefix: StorageKey,
		at: B::Hash,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
+		parallel: u16,
	) -> Result<Vec<KeyValue>, &'static str> {
		let start = Instant::now();
		let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
		let keys = self
-			.rpc_get_keys_paged(prefix.clone(), at)
+			.rpc_get_keys_parallel(prefix.clone(), at, parallel)
			.await?
			.into_iter()
			.collect::<Vec<_>>();
@@ -759,6 +791,64 @@
	}
}

+// Create a batch of storage key prefixes each starting with `prefix`, meant to be used for key
+// scraping. Given the prefix 00, the return can be 000-00F or 0000-00FF, depending on `size`.
+// `size` will be rounded to power of 16 if not already, so is the returned batch size.

Contributor suggested change (use `///` doc comments):
-// Create a batch of storage key prefixes each starting with `prefix`, meant to be used for key
-// scraping. Given the prefix 00, the return can be 000-00F or 0000-00FF, depending on `size`.
-// `size` will be rounded to power of 16 if not already, so is the returned batch size.
+/// Create a batch of storage key prefixes each starting with `prefix`, meant to be used for key
+/// scraping. Given the prefix 00, the return can be 000-00F or 0000-00FF, depending on `size`.
+/// `size` will be rounded to power of 16 if not already, so is the returned batch size.

+fn extend_prefix(prefix: &StorageKey, size: u16) -> Vec<StorageKey> {
+	const MAX_EXT_LEN: usize = 3;
+	const MAX_BATCH_SIZE: u16 = 16u16.pow(MAX_EXT_LEN as u32);
+	const POW_OF_SIXTEEN: [u16; MAX_EXT_LEN] = [1, 16, 256];
+
+	// round to power of 16
+	// up to MAX_BATCH_SIZE
+	fn round(n: u16) -> (u16, usize) {
+		if n <= 1 {
+			return (1, 0)
+		} else if n <= 16 {
+			return (16, 1)
+		}
+
+		let mut pow: u16 = 16;
+		let mut exp: usize = 1;
+
+		while pow < n {
+			if pow == MAX_BATCH_SIZE {
+				break
+			}
+
+			pow = pow.saturating_mul(16);
Contributor: What stops `pow` becoming > MAX_BATCH_SIZE here?
+			exp += 1;
+		}
+
+		debug_assert!(pow <= MAX_BATCH_SIZE);
+		debug_assert!(exp <= MAX_EXT_LEN);
+
+		// round down if below threshold
+		if n * 4 <= pow {
+			(pow / 16, exp - 1)
+		} else {
+			(pow, exp)
+		}
+	}
+
+	let (size, len) = round(size);
+	let mut ext = vec![0; len];
+
+	(0..size)
+		.map(|idx| {
+			// 0-f | 00-ff | 000-fff
+			// relatively static, use OnceCell if turned out to be hot
+			for i in 0..len {
+				ext[len - i - 1] = (idx / POW_OF_SIXTEEN[i] % 16) as u8;
+			}
+
+			let mut prefix = prefix.as_ref().to_vec();
+			prefix.extend(&ext);
+			StorageKey(prefix)
+		})
+		.collect()
+}
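On the reviewer's question above: in the `round` loop, `pow` is only multiplied after the `pow == MAX_BATCH_SIZE` check, so it steps through 16, 256, 4096 and stops there. A standalone trace of that loop, with the constant inlined as an assumption:

```rust
// Standalone trace of the `round` loop's `pow` values (constant inlined).
fn main() {
    const MAX_BATCH_SIZE: u16 = 4096; // 16^3
    let n = u16::MAX; // worst case: keeps the loop running as long as possible
    let mut pow: u16 = 16;
    let mut seen = vec![pow];
    while pow < n {
        if pow == MAX_BATCH_SIZE {
            break;
        }
        pow = pow.saturating_mul(16);
        seen.push(pow);
    }
    assert_eq!(seen, vec![16, 256, 4096]); // never exceeds MAX_BATCH_SIZE
}
```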

impl<B: BlockT + DeserializeOwned> Builder<B>
where
B::Hash: DeserializeOwned,
@@ -846,7 +936,7 @@
		for prefix in &config.hashed_prefixes {
			let now = std::time::Instant::now();
			let additional_key_values =
-				self.rpc_get_pairs_paged(StorageKey(prefix.to_vec()), at, pending_ext).await?;
+				self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext, 16).await?;
			let elapsed = now.elapsed();
			log::info!(
				target: LOG_TARGET,
@@ -1440,4 +1530,34 @@ mod remote_tests {
			.unwrap()
			.execute_with(|| {});
	}
+
+	#[test]
+	fn prefixes_for_scraping_keys() {
+		let prefix = StorageKey(vec![0, 0]);
+
+		assert_eq!(extend_prefix(&prefix, 0), vec![StorageKey(vec![0, 0])]);
+		assert_eq!(extend_prefix(&prefix, 1), vec![StorageKey(vec![0, 0])]);
+		assert_eq!(extend_prefix(&prefix, 16), (0..16).map(|i| StorageKey(vec![0, 0, i])).collect::<Vec<_>>());
+
+		let prefixes = extend_prefix(&prefix, 256);
+		assert_eq!(prefixes, (0..256u32).map(|i| StorageKey(vec![0, 0, (i / 16 % 16) as u8, (i % 16) as u8])).collect::<Vec<_>>());
+		assert_eq!(prefixes[0], StorageKey(vec![0, 0, 0, 0]));
+		assert_eq!(prefixes[1], StorageKey(vec![0, 0, 0, 1]));
+		assert_eq!(prefixes[15], StorageKey(vec![0, 0, 0, 15]));
+		assert_eq!(prefixes[16], StorageKey(vec![0, 0, 1, 0]));
+		assert_eq!(prefixes[254], StorageKey(vec![0, 0, 15, 14]));
+		assert_eq!(prefixes[255], StorageKey(vec![0, 0, 15, 15]));
+
+		let prefixes = extend_prefix(&prefix, 4096);
+		assert_eq!(prefixes, (0..4096u32).map(|i| StorageKey(vec![0, 0, (i / 256 % 16) as u8, (i / 16 % 16) as u8, (i % 16) as u8])).collect::<Vec<_>>());
+		assert_eq!(prefixes[0], StorageKey(vec![0, 0, 0, 0, 0]));
+		assert_eq!(prefixes[1], StorageKey(vec![0, 0, 0, 0, 1]));
+		assert_eq!(prefixes[4094], StorageKey(vec![0, 0, 15, 15, 14]));
+		assert_eq!(prefixes[4095], StorageKey(vec![0, 0, 15, 15, 15]));
+
+		// rounding
+		assert_eq!(extend_prefix(&prefix, 2), extend_prefix(&prefix, 16));
+		assert_eq!(extend_prefix(&prefix, 65), extend_prefix(&prefix, 256));
+		assert_eq!(extend_prefix(&prefix, 1025), extend_prefix(&prefix, 4096));
+	}
}