Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

paras-scheduler: Fix migration to V1 #1969

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion polkadot/runtime/parachains/src/assigner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ impl<T: Config> Pallet<T> {
fn is_bulk_core(core_idx: &CoreIndex) -> bool {
let parachain_cores =
<ParachainAssigner<T> as AssignmentProvider<BlockNumberFor<T>>>::session_core_count();
(0..parachain_cores).contains(&core_idx.0)

core_idx.0 < parachain_cores
}
}

Expand Down
4 changes: 2 additions & 2 deletions polkadot/runtime/parachains/src/assigner_parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod pallet {

impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
fn session_core_count() -> u32 {
<paras::Pallet<T>>::parachains().len() as u32
paras::Parachains::<T>::decode_len().unwrap_or(0) as u32
}

fn pop_assignment_for_core(
Expand All @@ -62,7 +62,7 @@ impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
max_availability_timeouts: 0,
// The next assignment already goes to the same [`ParaId`], this can be any number
// that's high enough to clear the time it takes to clear backing/availability.
ttl: BlockNumberFor::<T>::from(10u32),
ttl: 10u32.into(),
}
}
}
16 changes: 6 additions & 10 deletions polkadot/runtime/parachains/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,14 +605,10 @@ impl<T: Config> Pallet<T> {
/// Moves all elements in the claimqueue forward.
fn move_claimqueue_forward() {
let mut cq = ClaimQueue::<T>::get();
for (_, core_queue) in cq.iter_mut() {
for core_queue in cq.values_mut() {
// First pop the finished claims from the front.
match core_queue.front() {
None => {},
Some(None) => {
core_queue.pop_front();
},
Some(_) => {},
if let Some(None) = core_queue.front() {
core_queue.pop_front();
}
}

Expand All @@ -628,9 +624,10 @@ impl<T: Config> Pallet<T> {

// This can only happen on new sessions at which we move all assignments back to the
// provider. Hence, there's nothing we need to do here.
if ValidatorGroups::<T>::get().is_empty() {
if ValidatorGroups::<T>::decode_len().map_or(true, |l| l == 0) {
return
}

let n_lookahead = Self::claimqueue_lookahead();
let n_session_cores = T::AssignmentProvider::session_core_count();
let cq = ClaimQueue::<T>::get();
Expand Down Expand Up @@ -686,8 +683,7 @@ impl<T: Config> Pallet<T> {

fn add_to_claimqueue(core_idx: CoreIndex, pe: ParasEntry<BlockNumberFor<T>>) {
ClaimQueue::<T>::mutate(|la| {
let la_deque = la.entry(core_idx).or_insert_with(|| VecDeque::new());
la_deque.push_back(Some(pe));
la.entry(core_idx).or_default().push_back(Some(pe));
});
}

Expand Down
111 changes: 74 additions & 37 deletions polkadot/runtime/parachains/src/scheduler/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,45 @@ use frame_support::{
mod v0 {
use super::*;

use primitives::CollatorId;
use primitives::{CollatorId, Id};

#[storage_alias]
pub(super) type Scheduled<T: Config> = StorageValue<Pallet<T>, Vec<CoreAssignment>, ValueQuery>;

#[derive(Encode, Decode)]
pub struct QueuedParathread {
claim: primitives::ParathreadEntry,
core_offset: u32,
}
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct ParathreadClaim(pub Id, pub CollatorId);

#[derive(Encode, Decode, Default)]
pub struct ParathreadClaimQueue {
queue: Vec<QueuedParathread>,
next_core_offset: u32,
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct ParathreadEntry {
/// The claim.
pub claim: ParathreadClaim,
/// Number of retries.
pub retries: u32,
}

// Only here to facilitate the migration.
impl ParathreadClaimQueue {
pub fn len(self) -> usize {
self.queue.len()
}
/// What is occupying a specific availability core.
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub enum CoreOccupied {
/// A parathread.
Parathread(ParathreadEntry),
/// A parachain.
Parachain,
}

/// The actual type isn't important, as we only delete the key in the state.
#[storage_alias]
pub(super) type ParathreadQueue<T: Config> =
StorageValue<Pallet<T>, ParathreadClaimQueue, ValueQuery>;
pub(crate) type AvailabilityCores<T: Config> =
StorageValue<Pallet<T>, Vec<Option<CoreOccupied>>, ValueQuery>;

/// The actual type isn't important, as we only delete the key in the state.
#[storage_alias]
pub(super) type ParathreadClaimIndex<T: Config> =
StorageValue<Pallet<T>, Vec<ParaId>, ValueQuery>;
pub(super) type ParathreadQueue<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

#[storage_alias]
pub(super) type ParathreadClaimIndex<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

/// The assignment type.
#[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
Expand Down Expand Up @@ -108,30 +117,36 @@ pub mod v1 {

#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
log::trace!(
let n: u32 = v0::Scheduled::<T>::get().len() as u32 +
v0::AvailabilityCores::<T>::get().iter().filter(|c| c.is_some()).count() as u32;

log::info!(
target: crate::scheduler::LOG_TARGET,
"Scheduled before migration: {}",
v0::Scheduled::<T>::get().len()
"Number of scheduled and waiting for availability before: {n}",
);

let bytes = u32::to_be_bytes(v0::Scheduled::<T>::get().len() as u32);

Ok(bytes.to_vec())
Ok(n.encode())
}

#[cfg(feature = "try-runtime")]
fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
log::info!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");

ensure!(
v0::Scheduled::<T>::get().len() == 0,
v0::Scheduled::<T>::get().is_empty(),
"Scheduled should be empty after the migration"
);

let sched_len = u32::from_be_bytes(state.try_into().unwrap());
let expected_len = u32::decode(&mut &state[..]).unwrap();
let availability_cores_waiting = super::AvailabilityCores::<T>::get()
.iter()
.filter(|c| !matches!(c, CoreOccupied::Free))
.count();

ensure!(
Pallet::<T>::claimqueue_len() as u32 == sched_len,
"Scheduled completely moved to ClaimQueue after migration"
Pallet::<T>::claimqueue_len() as u32 + availability_cores_waiting as u32 ==
expected_len,
"ClaimQueue and AvailabilityCores should have the correct length",
);

Ok(())
Expand All @@ -142,11 +157,8 @@ pub mod v1 {
pub fn migrate_to_v1<T: crate::scheduler::Config>() -> Weight {
let mut weight: Weight = Weight::zero();

let pq = v0::ParathreadQueue::<T>::take();
let pq_len = pq.len() as u64;

let pci = v0::ParathreadClaimIndex::<T>::take();
let pci_len = pci.len() as u64;
v0::ParathreadQueue::<T>::kill();
v0::ParathreadClaimIndex::<T>::kill();

let now = <frame_system::Pallet<T>>::block_number();
let scheduled = v0::Scheduled::<T>::take();
Expand All @@ -158,10 +170,35 @@ pub fn migrate_to_v1<T: crate::scheduler::Config>() -> Weight {
Pallet::<T>::add_to_claimqueue(core_idx, pe);
}

let parachains = paras::Pallet::<T>::parachains();
let availability_cores = v0::AvailabilityCores::<T>::take();
let mut new_availability_cores = Vec::new();

for (core_index, core) in availability_cores.into_iter().enumerate() {
let new_core = if let Some(core) = core {
match core {
v0::CoreOccupied::Parachain => CoreOccupied::Paras(ParasEntry::new(
Assignment::new(parachains[core_index]),
now,
)),
v0::CoreOccupied::Parathread(entry) =>
CoreOccupied::Paras(ParasEntry::new(Assignment::new(entry.claim.0), now)),
}
} else {
CoreOccupied::Free
};

new_availability_cores.push(new_core);
}

super::AvailabilityCores::<T>::set(new_availability_cores);

// 2x as once for Scheduled and once for Claimqueue
weight = weight.saturating_add(T::DbWeight::get().reads_writes(2 * sched_len, 2 * sched_len));
weight = weight.saturating_add(T::DbWeight::get().reads_writes(pq_len, pq_len));
weight = weight.saturating_add(T::DbWeight::get().reads_writes(pci_len, pci_len));
// reading parachains + availability_cores, writing AvailabilityCores
weight = weight.saturating_add(T::DbWeight::get().reads_writes(2, 1));
// 2x kill
weight = weight.saturating_add(T::DbWeight::get().writes(2));

weight
}