implement committed entries pagination #356

Closed
13 changes: 12 additions & 1 deletion harness/src/network.rs
@@ -49,6 +49,9 @@ pub struct Network {
dropm: HashMap<Connection, f64>,
/// Drop messages of type `MessageType`.
ignorem: HashMap<MessageType, bool>,
/// msg_hook is called for each message sent. It may inspect the
/// message and return true to send it or false to drop it.
pub msg_hook: Option<Box<dyn Fn(&Message) -> bool>>,
}

impl Network {
@@ -141,7 +144,15 @@ impl Network {
})
.cloned()
.unwrap_or(0f64);
rand::random::<f64>() >= perc
if rand::random::<f64>() < perc {
return false;
}
if let Some(hook) = &self.msg_hook {
if !hook(&m) {
return false;
}
}
true
})
.collect()
}
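For illustration, here is a minimal sketch of installing the new hook (assuming a `Network` value named `nt`, as in the pagination test below): the closure drops every snapshot message and delivers everything else.

nt.msg_hook = Some(Box::new(|m: &Message| -> bool {
    // Returning false drops the message; true delivers it as usual.
    m.msg_type != MessageType::MsgSnapshot
}));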
165 changes: 165 additions & 0 deletions harness/tests/integration_cases/test_raw_node.rs
@@ -492,3 +492,168 @@ fn test_set_priority() {
assert_eq!(raw_node.raft.priority, p);
}
}

// test_append_pagination ensures that a message is never sent with entries whose total size overflows `max_size_per_msg`.
#[test]
fn test_append_pagination() {
use std::cell::Cell;
use std::rc::Rc;
let l = default_logger();
let mut config = new_test_config(1, 10, 1);
let max_size_per_msg = 2048;
config.max_size_per_msg = max_size_per_msg;
let mut nt = Network::new_with_config(vec![None, None, None], &config, &l);
let seen_full_msg = Rc::new(Cell::new(false));
let b = seen_full_msg.clone();
nt.msg_hook = Some(Box::new(move |m: &Message| -> bool {
if m.msg_type == MessageType::MsgAppend {
let total_size = m.entries.iter().fold(0, |acc, e| acc + e.data.len());
if total_size as u64 > max_size_per_msg {
panic!("sent MsgApp that is too large: {} bytes", total_size);
}
if total_size as u64 > max_size_per_msg / 2 {
b.set(true);
}
}
true
}));
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
nt.isolate(1);
for _ in 0..5 {
let data = "a".repeat(1000);
nt.send(vec![new_message_with_entries(
1,
1,
MessageType::MsgPropose,
vec![new_entry(0, 0, Some(&data))],
)]);
}
nt.recover();
// After the partition recovers, tick the clock to wake everything
// back up and send the messages.
nt.send(vec![new_message(1, 1, MessageType::MsgBeat, 0)]);
assert!(
seen_full_msg.get(),
"didn't see any messages more than half the max size; something is wrong with this test"
);
}

// test_commit_pagination ensures that the total size of the committed entries returned in a single Ready never exceeds `max_committed_size_per_ready`.
#[test]
fn test_commit_pagination() {
let l = default_logger();
let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
let mut config = new_test_config(1, 10, 1);
config.max_committed_size_per_ready = 2048;
let mut raw_node = RawNode::new(&config, storage, &l).unwrap();
raw_node.campaign().unwrap();
let rd = raw_node.ready();
let committed_len = rd.committed_entries.as_ref().unwrap().len();
assert_eq!(
committed_len, 1,
"expected 1 (empty) entry, got {}",
committed_len
);
raw_node.mut_store().wl().append(rd.entries()).unwrap();
raw_node.advance(rd);
let blob = "a".repeat(1000).into_bytes();
for _ in 0..3 {
raw_node.propose(vec![], blob.clone()).unwrap();
}
// The 3 proposals will commit in two batches.
let rd = raw_node.ready();
let committed_len = rd.committed_entries.as_ref().unwrap().len();
assert_eq!(
committed_len, 2,
"expected 2 entries in first batch, got {}",
committed_len
);
raw_node.mut_store().wl().append(rd.entries()).unwrap();
raw_node.advance(rd);

let rd = raw_node.ready();
let committed_len = rd.committed_entries.as_ref().unwrap().len();
assert_eq!(
committed_len, 1,
"expected 1 entry in second batch, got {}",
committed_len
);
raw_node.mut_store().wl().append(rd.entries()).unwrap();
raw_node.advance(rd);
}

// test_commit_pagination_after_restart regression-tests a scenario in which the
// Storage's Entries size limitation is slightly more permissive than Raft's
// internal one.
//
// - node learns that index 11 is committed
// - next_entries returns index 1..10 in committed_entries (but index 10 already
// exceeds maxBytes), which isn't noticed internally by Raft
// - Commit index gets bumped to 10
// - the node persists the HardState, but crashes before applying the entries
// - upon restart, the storage returns the same entries, but `slice` takes a
// different code path and removes the last entry.
// - Raft does not emit a HardState, but when the app calls advance(), it bumps
// its internal applied index cursor to 10 (when it should be 9)
// - the next Ready asks the app to apply index 11 (omitting index 10), losing a
// write.
#[test]
fn test_commit_pagination_after_restart() {
Member: The test case doesn't seem to be the same as upstream.

Member (Author): It is slightly different from the one in etcd due to the difference in MemStorage.

let mut persisted_hard_state = HardState::default();
persisted_hard_state.set_term(1);
persisted_hard_state.set_vote(1);
persisted_hard_state.set_commit(10);
let s = IgnoreSizeHintMemStorage::default();
s.inner.wl().set_hardstate(persisted_hard_state);
let ents_count = 10;
let mut ents = Vec::with_capacity(ents_count);
let mut size = 0u64;
for i in 0..ents_count as u64 {
let e = new_entry(1, i + 1, Some("a"));
size += u64::from(e.compute_size());
ents.push(e);
}
s.inner.wl().append(&ents).unwrap();

let mut cfg = new_test_config(1, 10, 1);
// Set a max_size_per_msg that would suggest to Raft that the last committed entry should
// not be included in the initial rd.committed_entries. However, our storage will ignore
// this and *will* return it (which is how the Commit index ended up being 10 initially).
cfg.max_size_per_msg = size - 1;
Member: I think it should be size - uint64(s.ents[len(s.ents)-1].Size()) - 1. The purpose is to let raft return 9 entries instead of 10, so that the entry at index 10 gets missed.

Member (Author): Actually, the etcd code for this test case is somewhat buggy: firstIndex there is 2, because the test replaces the ents starting at index 1, and the firstIndex of MemoryStorage is always s.ents[0].Index + 1 due to the dummy entry (at index 0). Therefore, len(rd.CommittedEntries) in the first loop is 8 in the etcd version.

And since MemStorage in raft-rs has no such dummy-entry approach, we can correctly use size - 1 here to return 9 committed entries in the first loop.

Though I found I should use cfg.max_committed_size_per_ready here 🤣

s.inner
.wl()
.append(&[new_entry(1, 11, Some("boom"))])
.unwrap();
let mut raw_node = RawNode::with_default_logger(&cfg, s).unwrap();
let mut highest_applied = 0;
while highest_applied != 11 {
let rd = raw_node.ready();
let committed_entries = rd.committed_entries.clone().unwrap();
assert!(
!committed_entries.is_empty(),
"stop applying entries at index {}",
highest_applied
);
let next = committed_entries.first().unwrap().get_index();
if highest_applied != 0 {
assert_eq!(
highest_applied + 1,
next,
"attempting to apply index {} after index {}, leaving a gap",
next,
highest_applied
)
}
highest_applied = rd
.committed_entries
.as_ref()
.unwrap()
.last()
.unwrap()
.get_index();
raw_node.advance(rd);
// node learns commit index is 11
raw_node.raft.r.raft_log.commit_to(11);
}
}
36 changes: 36 additions & 0 deletions harness/tests/test_util/mod.rs
@@ -170,3 +170,39 @@ pub fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {
s.mut_metadata().mut_conf_state().voters = voters;
s
}

#[derive(Default)]
pub struct IgnoreSizeHintMemStorage {
pub inner: MemStorage,
}

impl Storage for IgnoreSizeHintMemStorage {
fn initial_state(&self) -> Result<RaftState> {
self.inner.initial_state()
}

fn entries(
&self,
low: u64,
high: u64,
_max_size: impl Into<Option<u64>>,
) -> Result<Vec<Entry>> {
self.inner.entries(low, high, u64::max_value())
}

fn term(&self, idx: u64) -> Result<u64> {
self.inner.term(idx)
}

fn first_index(&self) -> Result<u64> {
self.inner.first_index()
}

fn last_index(&self) -> Result<u64> {
self.inner.last_index()
}

fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
self.inner.snapshot(request_index)
}
}
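Note that the wrapper forwards every `Storage` call to the inner `MemStorage` unchanged, except that `entries` discards the caller's size hint and passes `u64::max_value()` instead; this is exactly the extra permissiveness that the restart regression test above relies on.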
9 changes: 7 additions & 2 deletions src/config.rs
@@ -17,7 +17,7 @@
pub use super::read_only::{ReadOnlyOption, ReadState};
use super::{
errors::{Error, Result},
INVALID_ID,
INVALID_ID, NO_LIMIT,
};

/// Config contains the parameters to start a raft.
@@ -48,9 +48,13 @@ pub struct Config {
/// Limit the max size of each append message. Smaller value lowers
/// the raft recovery cost (initial probing and messages lost during normal operation).
/// On the other hand, it might affect the throughput during normal replication.
/// Note: math.MaxUusize64 for unlimited, 0 for at most one entry per message.
/// Note: raft::NO_LIMIT for unlimited, 0 for at most one entry per message.
pub max_size_per_msg: u64,

/// max_committed_size_per_ready limits the total size of the committed entries which
/// can be applied per Ready. It defaults to `NO_LIMIT` to stay backward compatible.
pub max_committed_size_per_ready: u64,

/// Limit the max number of in-flight append messages during optimistic
/// replication phase. The application transportation layer usually has its own sending
/// buffer over TCP/UDP. Set to avoid overflowing that sending buffer.
@@ -101,6 +105,7 @@ impl Default for Config {
heartbeat_tick: HEARTBEAT_TICK,
applied: 0,
max_size_per_msg: 0,
max_committed_size_per_ready: NO_LIMIT,
max_inflight_msgs: 256,
check_quorum: false,
pre_vote: false,
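For context, a minimal usage sketch of the new option (the 2048-byte budget is an arbitrary choice; `Config::validate` is assumed available as in raft-rs):

use raft::Config;

let config = Config {
    id: 1,
    // Cap the aggregate byte size of committed entries returned per Ready.
    max_committed_size_per_ready: 2048,
    ..Default::default()
};
assert!(config.validate().is_ok());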
4 changes: 2 additions & 2 deletions src/lib.rs
@@ -417,10 +417,10 @@ use raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::*};
use protobuf::Message as PbMessage;
use slog::{Drain, o};

let mut config = Config { id: 1, ..Default::default() };
let config = Config { id: 1, ..Default::default() };
let store = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
let mut node = RawNode::new(&mut config, store, &logger).unwrap();
let mut node = RawNode::new(&config, store, &logger).unwrap();
node.raft.become_candidate();
node.raft.become_leader();

2 changes: 1 addition & 1 deletion src/log_unstable.rs
@@ -34,7 +34,7 @@ pub struct Unstable {
/// The offset from the vector index.
pub offset: u64,

/// The tag to use when logging.
/// The logger being used.
pub logger: Logger,
}

5 changes: 3 additions & 2 deletions src/raft.rs
@@ -238,14 +238,15 @@ impl<T: Storage> Raft<T> {
let conf_state = &raft_state.conf_state;
let voters = &conf_state.voters;
let learners = &conf_state.learners;

let raft_log =
RaftLog::new_with_size(store, logger.clone(), c.max_committed_size_per_ready);
let mut r = Raft {
prs: ProgressSet::with_capacity(voters.len(), learners.len(), logger.clone()),
msgs: Default::default(),
r: RaftCore {
id: c.id,
read_states: Default::default(),
raft_log: RaftLog::new(store, logger.clone()),
raft_log,
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
pending_request_snapshot: INVALID_INDEX,
15 changes: 13 additions & 2 deletions src/raft_log.rs
@@ -44,6 +44,10 @@ pub struct RaftLog<T: Storage> {
///
/// Invariant: applied <= committed
pub applied: u64,

/// max_next_ents_size is the maximum aggregate byte size of the entries
/// returned from calls to next_entries.
pub max_next_ents_size: u64,
}

impl<T> ToString for RaftLog<T>
@@ -62,8 +66,14 @@ where
}

impl<T: Storage> RaftLog<T> {
/// Creates a new raft log with a given storage and tag.
/// Creates a new raft log with a given storage and default options. It recovers
/// the log to the state where it has just committed and applied the latest snapshot.
pub fn new(store: T, logger: Logger) -> RaftLog<T> {
Self::new_with_size(store, logger, NO_LIMIT)
}

/// Creates a new raft log with the given storage and max next entries size
pub fn new_with_size(store: T, logger: Logger, max_next_ents_size: u64) -> RaftLog<T> {
let first_index = store.first_index().unwrap();
let last_index = store.last_index().unwrap();

@@ -73,6 +83,7 @@ impl<T: Storage> RaftLog<T> {
committed: first_index - 1,
applied: first_index - 1,
unstable: Unstable::new(last_index + 1, logger),
max_next_ents_size,
}
}

@@ -359,7 +370,7 @@ impl<T: Storage> RaftLog<T> {
let offset = cmp::max(since_idx + 1, self.first_index());
let committed = self.committed;
if committed + 1 > offset {
match self.slice(offset, committed + 1, None) {
match self.slice(offset, committed + 1, self.max_next_ents_size) {
Ok(vec) => return Some(vec),
Err(e) => fatal!(self.unstable.logger, "{}", e),
}
Expand Down
Loading
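To make the pagination rule concrete, here is a small self-contained sketch (not the raft-rs implementation itself) of the byte-budget behavior behind `max_next_ents_size`: sizes accumulate until the budget is exceeded, but the first entry is always kept, so progress is made even when a single entry is larger than the limit.

/// Returns how many of the given entry sizes fit within `max_size`,
/// always keeping at least the first entry.
fn paginate(entry_sizes: &[u64], max_size: u64) -> usize {
    let mut total = 0u64;
    for (i, &size) in entry_sizes.iter().enumerate() {
        total += size;
        if total > max_size && i > 0 {
            return i;
        }
    }
    entry_sizes.len()
}

// With a 2048-byte budget and three 1000-byte payloads, the first batch keeps
// two entries and the rest spill into the next one, mirroring
// test_commit_pagination above (ignoring per-entry encoding overhead).
assert_eq!(paginate(&[1000, 1000, 1000], 2048), 2);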