Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
refactor: only load one solution at a time
Browse files Browse the repository at this point in the history
This increases the database read load, because we read one solution
at a time. On the other hand, it substantially decreases the overall
memory load, because we _only_ read one solution at a time instead
of reading all of them.
  • Loading branch information
coriolinus committed Jun 8, 2021
1 parent 4ec3a3a commit 3d7dc0b
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 71 deletions.
49 changes: 36 additions & 13 deletions frame/election-provider-multi-phase/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ pub mod unsigned;
pub mod weights;

pub use signed::{
SignedSubmission, BalanceOf, NegativeImbalanceOf, PositiveImbalanceOf, SignedSubmissionsOf,
BalanceOf, NegativeImbalanceOf, PositiveImbalanceOf, SignedSubmission, SignedSubmissionOf,
SignedSubmissions, SubmissionIndicesOf,
};
pub use weights::WeightInfo;

Expand Down Expand Up @@ -884,18 +885,16 @@ pub mod pallet {

// ensure solution claims is better.
let mut signed_submissions = Self::signed_submissions();
let ejected_a_solution = signed_submissions.len()
== T::SignedMaxSubmissions::get().saturated_into::<usize>();

let deposit_amount =
Self::insert_submission(&who, &mut signed_submissions, solution, size)?;
let (maybe_deposit, ejected_a_solution) =
Self::insert_submission(&who, &mut signed_submissions, solution, size);

// collect deposit. Thereafter, the function cannot fail.
T::Currency::reserve(&who, deposit_amount)
.map_err(|_| Error::<T>::SignedCannotPayDeposit)?;
if let Some(deposit_amount) = maybe_deposit {
// collect deposit. Thereafter, the function cannot fail.
T::Currency::reserve(&who, deposit_amount)
.map_err(|_| Error::<T>::SignedCannotPayDeposit)?;
}

// store the new signed submission.
<SignedSubmissions<T>>::put(signed_submissions);
Self::deposit_event(Event::SolutionStored(ElectionCompute::Signed, ejected_a_solution));
Ok(())
}
Expand Down Expand Up @@ -1049,10 +1048,34 @@ pub mod pallet {
#[pallet::getter(fn snapshot_metadata)]
pub type SnapshotMetadata<T: Config> = StorageValue<_, SolutionOrSnapshotSize>;

/// Sorted set of unchecked, signed solutions.
/// The next index to be assigned to an incoming signed submission.
///
/// We can't just use `SignedSubmissionIndices.len()`, because that's a bounded set; past its
/// capacity, it will simply saturate. We can't just iterate over `SignedSubmissionsMap`,
/// because iteration is slow. Instead, we store the value here.
#[pallet::storage]
pub(crate) type SignedSubmissionNextIndex<T: Config> = StorageValue<_, u32, ValueQuery>;

/// A sorted, bounded set of `(score, index)`, where each `index` points to a value in
/// `SignedSubmissions`.
///
/// We never need to process more than a single signed submission at a time. Signed submissions
/// can be quite large, so we're willing to pay the cost of multiple database accesses to access
/// them one at a time instead of reading and decoding all of them at once.
#[pallet::storage]
pub(crate) type SignedSubmissionIndices<T: Config> =
StorageValue<_, SubmissionIndicesOf<T>, ValueQuery>;

/// Unchecked, signed solutions.
///
/// Together with `SubmissionIndices`, this stores a bounded set of `SignedSubmissions` while
/// allowing us to keep only a single one in memory at a time.
///
/// Twox note: the key of the map is an auto-incrementing index which users cannot inspect or
/// affect; we shouldn't need a cryptographically secure hasher.
#[pallet::storage]
#[pallet::getter(fn signed_submissions)]
pub type SignedSubmissions<T: Config> = StorageValue<_, SignedSubmissionsOf<T>, ValueQuery>;
pub(crate) type SignedSubmissionsMap<T: Config> =
StorageMap<_, Twox64Concat, u32, SignedSubmissionOf<T>, ValueQuery>;

/// The minimum score that each 'untrusted' solution must attain in order to be considered
/// feasible.
Expand Down
193 changes: 135 additions & 58 deletions frame/election-provider-multi-phase/src/signed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@

use crate::{
CompactOf, Config, ElectionCompute, Pallet, RawSolution, ReadySolution, SolutionOrSnapshotSize,
Weight, WeightInfo, QueuedSolution, SignedSubmissions,
Weight, WeightInfo, QueuedSolution, SignedSubmissionsMap, SignedSubmissionIndices,
SignedSubmissionNextIndex,
};
use codec::{Encode, Decode, HasCompact};
use frame_support::{
storage::bounded_btree_set::BoundedBTreeSet,
storage::bounded_btree_map::BoundedBTreeMap,
traits::{Currency, Get, OnUnbalanced, ReservableCurrency},
};
use sp_arithmetic::traits::SaturatedConversion;
use sp_npos_elections::{is_score_better, CompactSolution};
use sp_npos_elections::{is_score_better, CompactSolution, ElectionScore};
use sp_runtime::{
RuntimeDebug,
traits::{Saturating, Zero},
};
use sp_std::{cmp::Ordering, collections::btree_set::BTreeSet};
use sp_std::{cmp::Ordering, ops::Deref};

/// A raw, unchecked signed submission.
///
Expand Down Expand Up @@ -89,17 +90,121 @@ pub type PositiveImbalanceOf<T> = <<T as Config>::Currency as Currency<
pub type NegativeImbalanceOf<T> = <<T as Config>::Currency as Currency<
<T as frame_system::Config>::AccountId,
>>::NegativeImbalance;
pub type SignedSubmissionOf<T> = SignedSubmission<
<T as frame_system::Config>::AccountId,
BalanceOf<T>,
CompactOf<T>,
>;
pub type SignedSubmissionsOf<T> = BoundedBTreeSet<
SignedSubmissionOf<T>,
<T as Config>::SignedMaxSubmissions,
>;
pub type SignedSubmissionOf<T> =
SignedSubmission<<T as frame_system::Config>::AccountId, BalanceOf<T>, CompactOf<T>>;

pub type SubmissionIndicesOf<T> =
BoundedBTreeMap<ElectionScore, u32, <T as Config>::SignedMaxSubmissions>;

/// Mask type which pretends to be a set of `SignedSubmissionOf<T>`, while in fact delegating to the
/// actual implementations in `SignedSubmissionIndices<T>`, `SignedSubmissionsMap<T>`, and
/// `SignedSubmissionNextIndex<T>`.
pub struct SignedSubmissions<T: Config>(SubmissionIndicesOf<T>);

impl<T: Config> SignedSubmissions<T> {
/// Empty the set of signed submissions, returning an iterator of signed submissions
pub fn drain(&mut self) -> impl Iterator<Item = SignedSubmissionOf<T>> {
self.0.clear();
SignedSubmissionNextIndex::<T>::kill();
SignedSubmissionsMap::<T>::drain().map(|(_k, v)| v)
}

/// Decode the length of the signed submissions without actually reading the entire struct into
/// memory.
pub fn decode_len() -> Option<usize> {
SignedSubmissionIndices::<T>::decode_len()
}

/// Insert a new signed submission into the set.
///
/// Returns `(inserted, removed)`. `inserted` is true when the submission was inserted.
/// `removed` is the removed weakest submission, if any.
///
/// In the event that the new submission is not better than the current weakest according
/// to `is_score_better`, we do not change anything.
pub fn insert(
&mut self,
submission: SignedSubmissionOf<T>,
) -> (bool, Option<SignedSubmissionOf<T>>) {
let insert_idx = SignedSubmissionNextIndex::<T>::get();
let weakest = match self.0.try_insert(submission.solution.score, insert_idx) {
Ok(Some(prev_idx)) => {
// a submission of equal score was already present in the set;
// no point editing the actual backing map as we know that the newer solution can't
// be better than the old. However, we do need to put the old value back.
self.0
.try_insert(submission.solution.score, prev_idx)
.expect("didn't change the map size; qed");
return (false, None);
}
Ok(None) => {
// successfully inserted into the set; no need to take out weakest member
None
}
Err((score, insert_idx)) => {
// could not insert into the set because it is full.
// note that we short-circuit return here in case the iteration produces `None`.
// If there wasn't a weakest entry to remove, then there must be a capacity of 0,
// which means that we can't meaningfully proceed.
let (weakest_score, weakest_idx) = match self.0.iter().next() {
None => return (false, None),
Some((score, idx)) => (*score, *idx),
};
let threshold = T::SolutionImprovementThreshold::get();

// if we haven't improved on the weakest score, don't change anything.
if !is_score_better(score, weakest_score, threshold) {
return (false, None);
}

self.0.remove(&weakest_score);
self.0
.try_insert(score, insert_idx)
.expect("just removed an item, we must be under capacity; qed");

// ensure that SignedSubmissionsMap never grows past capacity by taking out the
// weakest member here.
Some(SignedSubmissionsMap::<T>::take(weakest_idx))
}
};

// we've taken out the weakest, so update the storage map and the next index
SignedSubmissionsMap::<T>::insert(insert_idx, submission);
SignedSubmissionNextIndex::<T>::put(insert_idx + 1);
(true, weakest)
}

/// Remove the signed submission with the highest score from the set.
pub fn pop_last(&mut self) -> Option<SignedSubmissionOf<T>> {
let (highest_score, idx) = self.0.iter().rev().next()?;
let (highest_score, idx) = (*highest_score, *idx);
self.0.remove(&highest_score);
Some(SignedSubmissionsMap::<T>::take(idx))
}
}

// ensure that we update the storage once we're done with this wrapper struct.
impl<T: Config> Drop for SignedSubmissions<T> {
fn drop(&mut self) {
let indices = sp_std::mem::take(&mut self.0);
SignedSubmissionIndices::<T>::put(indices);
}
}

impl<T: Config> Deref for SignedSubmissions<T> {
type Target = SubmissionIndicesOf<T>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<T: Config> Pallet<T> {
/// `Self` accessor for `SignedSubmission<T>`.
pub fn signed_submissions() -> SignedSubmissions<T> {
SignedSubmissions(SignedSubmissionIndices::<T>::get())
}

/// Finish the signed phase. Process the signed submissions from best to worse until a valid one
/// is found, rewarding the best one and slashing the invalid ones along the way.
///
Expand All @@ -108,17 +213,11 @@ impl<T: Config> Pallet<T> {
/// This drains the [`SignedSubmissions`], potentially storing the best valid one in
/// [`QueuedSolution`].
pub fn finalize_signed_phase() -> (bool, Weight) {
let mut all_submissions: BTreeSet<_> = <SignedSubmissions<T>>::take().into_inner();
let mut all_submissions = Self::signed_submissions();
let mut found_solution = false;
let mut weight = T::DbWeight::get().reads(1);

// `BTreeSet::pop_last` is what we really want, but unfortunately, it's still nightly-only.
let take_highest = |set: &mut BTreeSet<SignedSubmissionOf<T>>| {
let highest = set.iter().rev().next()?.clone();
set.remove(&highest);
Some(highest)
};
while let Some(best) = take_highest(&mut all_submissions) {
while let Some(best) = all_submissions.pop_last() {
let SignedSubmission { solution, who, deposit, reward } = best;
let active_voters = solution.compact.voter_count() as u32;
let feasibility_weight = {
Expand Down Expand Up @@ -159,12 +258,11 @@ impl<T: Config> Pallet<T> {

// Any unprocessed solution is pointless to even consider. Feasible or malicious,
// they didn't end up being used. Unreserve the bonds.
for not_processed in all_submissions {
let SignedSubmission { who, deposit, .. } = not_processed;
for SignedSubmission { who, deposit, .. } in all_submissions.drain() {
let _remaining = T::Currency::unreserve(&who, deposit);
weight = weight.saturating_add(T::DbWeight::get().writes(1));
debug_assert!(_remaining.is_zero());
};
}

(found_solution, weight)
}
Expand Down Expand Up @@ -207,50 +305,29 @@ impl<T: Config> Pallet<T> {
///
/// If insertion was successful, the required deposit amount is returned.
///
/// Additionally returns a bool indicating whether an existing solution was ejected.
///
/// Note: in the event that the queue is full, this function will drop the weakest element as
/// long as the new solution sufficiently improves on the weakest solution.
pub fn insert_submission(
who: &T::AccountId,
queue: &mut SignedSubmissionsOf<T>,
queue: &mut SignedSubmissions<T>,
solution: RawSolution<CompactOf<T>>,
size: SolutionOrSnapshotSize,
) -> Result<BalanceOf<T>, crate::Error<T>> {
use crate::Error;

let score = solution.score;
) -> (Option<BalanceOf<T>>, bool) {
let reward = T::SignedRewardBase::get();
let deposit = Self::deposit_for(&solution, size);
let submission = SignedSubmission { who: who.clone(), deposit, reward, solution };

queue
.try_insert(submission)
.or_else(|submission| {
let threshold = T::SolutionImprovementThreshold::get();
// This shouldn't ever fail, becuase it means that the queue was simultaneously full
// and empty. It can still happen if the queue has a max size of 0, though.
let weakest = queue.iter().next().ok_or(Error::<T>::SignedQueueFull)?;
if is_score_better(score, weakest.solution.score, threshold) {
// remove the previous weakest element and unreserve its deposit

// unfortunately, because weakest is currently tied to an immutable borrow of
// queue, we now have to clone it in order to remove it.
let weakest = weakest.clone();
queue.remove(&weakest);
let _remainder = T::Currency::unreserve(&weakest.who, weakest.deposit);
debug_assert!(_remainder.is_zero());

// This should really never ever fail, because we've just removed an item, so
// inserting one should be totally ok. We're not going to take it for granted
// though.
queue.try_insert(submission).map_err(|_| Error::<T>::SignedQueueFull)?;

// the particular value here doesn't matter, as it's overwritten by the following map.
Ok(false)
} else {
Err(Error::<T>::SignedQueueFull)
}
})
.map(|_| deposit)
// if we had to remove the weakest solution, unreserve its deposit
let (inserted, maybe_weakest) = queue.insert(submission);
let ejected_weakest = maybe_weakest.is_some();
if let Some(weakest) = maybe_weakest {
let _remainder = T::Currency::unreserve(&weakest.who, weakest.deposit);
debug_assert!(_remainder.is_zero());
}

(inserted.then(move || deposit), ejected_weakest)
}

/// The feasibility weight of the given raw solution.
Expand Down

0 comments on commit 3d7dc0b

Please sign in to comment.