Skip to content

Commit

Permalink
refactor: new curp conflict pool
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
  • Loading branch information
bsbds authored and Phoenix500526 committed Apr 3, 2024
1 parent 4c0c9e6 commit 2e6844e
Show file tree
Hide file tree
Showing 10 changed files with 732 additions and 9 deletions.
41 changes: 41 additions & 0 deletions crates/curp-external-api/src/conflict.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#![allow(clippy::module_name_repetitions)]

/// Common operations for conflict pools
pub trait ConflictPoolOp {
/// Entry of the pool
type Entry;

/// Removes a command from the pool
fn remove(&mut self, entry: Self::Entry);

/// Returns all commands in the pool
fn all(&self) -> Vec<Self::Entry>;

/// Clears all entries in the pool
fn clear(&mut self);

/// Returns the number of commands in the pool
fn len(&self) -> usize;

/// Checks if the pool contains some commands that will conflict with all other commands
fn is_empty(&self) -> bool;
}

/// Speculative pool operations
pub trait SpeculativePoolOp: ConflictPoolOp {
/// Inserts a command in to the pool
///
/// Returns the entry if a conflict is detected
fn insert_if_not_conflict(&mut self, entry: Self::Entry) -> Option<Self::Entry>;
}

/// Uncommitted pool operations
pub trait UncommittedPoolOp: ConflictPoolOp {
/// Inserts a command in to the pool
///
/// Returns `true` if a conflict is detected
fn insert(&mut self, entry: Self::Entry) -> bool;

/// Returns all commands in the pool that conflicts with the given command
fn all_conflict(&self, entry: &Self::Entry) -> Vec<Self::Entry>;
}
3 changes: 3 additions & 0 deletions crates/curp-external-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,6 @@ pub type InflightId = u64;
pub mod cmd;
/// The command to be executed
pub mod role_change;

/// Conflict trait
pub mod conflict;
12 changes: 8 additions & 4 deletions crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ impl From<CurpError> for tonic::Status {

/// Entry of speculative pool
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub(crate) struct PoolEntry<C> {
/// Propose id
pub(crate) id: ProposeId,
Expand All @@ -844,25 +845,28 @@ pub(crate) struct PoolEntry<C> {

/// Inner entry of speculative pool
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum PoolEntryInner<C> {
/// Command entry
Command(Arc<C>),
/// ConfChange entry
ConfChange(Vec<ConfChange>),
}

impl<C> PoolEntry<C>
where
C: Command,
{
impl<C> PoolEntry<C> {
/// Create a new pool entry
pub(crate) fn new(id: ProposeId, inner: impl Into<PoolEntryInner<C>>) -> Self {
Self {
id,
inner: inner.into(),
}
}
}

impl<C> PoolEntry<C>
where
C: Command,
{
/// Check if the entry is conflict with the command
pub(crate) fn is_conflict_with_cmd(&self, c: &C) -> bool {
match self.inner {
Expand Down
101 changes: 101 additions & 0 deletions crates/curp/src/server/conflict/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#![allow(unused)]
#![allow(unreachable_pub)]

/// Speculative pool
pub(crate) mod spec_pool_new;

/// Uncommitted pool
pub(crate) mod uncommitted_pool;

#[cfg(test)]
mod tests;

use std::{ops::Deref, sync::Arc};

use crate::rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId};

// TODO: relpace `PoolEntry` with this
/// Entry stored in conflict pools
pub(super) enum SplitEntry<C> {
/// A command entry
Command(CommandEntry<C>),
/// A conf change entry
ConfChange(ConfChangeEntry),
}

impl<C> From<PoolEntry<C>> for SplitEntry<C> {
fn from(entry: PoolEntry<C>) -> Self {
match entry.inner {
PoolEntryInner::Command(c) => SplitEntry::Command(CommandEntry {
id: entry.id,
cmd: c,
}),
PoolEntryInner::ConfChange(c) => SplitEntry::ConfChange(ConfChangeEntry {
id: entry.id,
conf_change: c,
}),
}
}
}

/// Command entry type
#[derive(Debug)]
pub struct CommandEntry<C> {
/// The propose id
id: ProposeId,
/// The command
cmd: Arc<C>,
}

impl<C> Clone for CommandEntry<C> {
#[inline]
fn clone(&self) -> Self {
Self {
id: self.id,
cmd: Arc::clone(&self.cmd),
}
}
}

impl<C> Deref for CommandEntry<C> {
type Target = C;

#[inline]
fn deref(&self) -> &Self::Target {
&self.cmd
}
}

impl<C> AsRef<C> for CommandEntry<C> {
#[inline]
fn as_ref(&self) -> &C {
self.cmd.as_ref()
}
}

impl<C> From<CommandEntry<C>> for PoolEntry<C> {
fn from(entry: CommandEntry<C>) -> Self {
PoolEntry {
id: entry.id,
inner: PoolEntryInner::Command(entry.cmd),
}
}
}

/// Conf change entry type
#[derive(Clone, PartialEq)]
pub(super) struct ConfChangeEntry {
/// The propose id
id: ProposeId,
/// The conf change entry
conf_change: Vec<ConfChange>,
}

impl<C> From<ConfChangeEntry> for PoolEntry<C> {
fn from(entry: ConfChangeEntry) -> Self {
PoolEntry {
id: entry.id,
inner: PoolEntryInner::ConfChange(entry.conf_change),
}
}
}
131 changes: 131 additions & 0 deletions crates/curp/src/server/conflict/spec_pool_new.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use curp_external_api::conflict::{ConflictPoolOp, SpeculativePoolOp};

use crate::rpc::PoolEntry;

use super::{CommandEntry, ConfChangeEntry, SplitEntry};

/// A speculative pool object
pub type SpObject<C> = Box<dyn SpeculativePoolOp<Entry = CommandEntry<C>> + Send + 'static>;

/// Union type of `SpeculativePool` objects
pub(crate) struct SpeculativePool<C> {
/// Command speculative pools
command_sps: Vec<SpObject<C>>,
/// Conf change speculative pool
conf_change_sp: ConfChangeSp,
}

impl<C> SpeculativePool<C> {
/// Creates a new pool
pub(crate) fn new(command_sps: Vec<SpObject<C>>) -> Self {
Self {
command_sps,
conf_change_sp: ConfChangeSp::default(),
}
}

/// Inserts an entry into the pool
pub(crate) fn insert(&mut self, entry: PoolEntry<C>) -> Option<PoolEntry<C>> {
if !self.conf_change_sp.is_empty() {
return Some(entry);
}

match SplitEntry::from(entry) {
SplitEntry::Command(c) => {
for csp in &mut self.command_sps {
if let Some(e) = csp.insert_if_not_conflict(c.clone()) {
return Some(e.into());
}
}
}
SplitEntry::ConfChange(c) => {
if !self
.command_sps
.iter()
.map(AsRef::as_ref)
.all(ConflictPoolOp::is_empty)
{
return Some(c.into());
}
let _ignore = self.conf_change_sp.insert_if_not_conflict(c);
}
}

None
}

// TODO: Use reference instead of clone
/// Removes an entry from the pool
pub(crate) fn remove(&mut self, entry: PoolEntry<C>) {
match SplitEntry::from(entry) {
SplitEntry::Command(c) => {
for csp in &mut self.command_sps {
csp.remove(c.clone());
}
}
SplitEntry::ConfChange(c) => {
self.conf_change_sp.remove(c);
}
}
}

/// Returns all entries in the pool
pub(crate) fn all(&self) -> Vec<PoolEntry<C>> {
let mut entries = Vec::new();
for csp in &self.command_sps {
entries.extend(csp.all().into_iter().map(Into::into));
}
entries.extend(self.conf_change_sp.all().into_iter().map(Into::into));
entries
}

/// Returns the number of entries in the pool
#[allow(clippy::arithmetic_side_effects)] // Pool sizes can't overflow a `usize`
pub(crate) fn len(&self) -> usize {
self.command_sps
.iter()
.fold(0, |sum, pool| sum + pool.len())
+ self.conf_change_sp.len()
}
}

/// Speculative pool for conf change entries
#[derive(Default)]
struct ConfChangeSp {
/// Store current conf change
change: Option<ConfChangeEntry>,
}

impl ConflictPoolOp for ConfChangeSp {
type Entry = ConfChangeEntry;

fn is_empty(&self) -> bool {
self.change.is_none()
}

fn remove(&mut self, _entry: Self::Entry) {
self.change = None;
}

fn all(&self) -> Vec<Self::Entry> {
self.change.clone().into_iter().collect()
}

fn clear(&mut self) {
self.change = None;
}

fn len(&self) -> usize {
self.change.iter().count()
}
}

impl SpeculativePoolOp for ConfChangeSp {
fn insert_if_not_conflict(&mut self, entry: Self::Entry) -> Option<Self::Entry> {
if self.change.is_some() {
return Some(entry);
}
self.change = Some(entry);
None
}
}
Loading

0 comments on commit 2e6844e

Please sign in to comment.