Skip to content

Commit

Permalink
fix test case
Browse files Browse the repository at this point in the history
Signed-off-by: Fullstop000 <fullstop1005@gmail.com>
  • Loading branch information
Fullstop000 committed Jun 11, 2020
1 parent a4dde24 commit 60d7771
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 76 deletions.
2 changes: 1 addition & 1 deletion benches/suites/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn quick_raw_node(logger: &slog::Logger) -> RawNode<MemStorage> {
let conf_state = ConfState::from((vec![1], vec![]));
let storage = MemStorage::new_with_conf_state(conf_state);
let config = Config::new(id);
RawNode::new(&config, storage, logger).unwrap()
RawNode::new(config, storage, logger).unwrap()
}

pub fn bench_raw_node_new(c: &mut Criterion) {
Expand Down
4 changes: 2 additions & 2 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl Node {
s.mut_metadata().mut_conf_state().voters = vec![1];
let storage = MemStorage::new();
storage.wl().apply_snapshot(s).unwrap();
let raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());
let raft_group = Some(RawNode::new(cfg, storage, &logger).unwrap());
Node {
raft_group,
my_mailbox,
Expand Down Expand Up @@ -214,7 +214,7 @@ impl Node {
cfg.id = msg.to;
let logger = logger.new(o!("tag" => format!("peer_{}", msg.to)));
let storage = MemStorage::new();
self.raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());
self.raft_group = Some(RawNode::new(cfg, storage, &logger).unwrap());
}

// Step a raft message, initialize the raft if need.
Expand Down
2 changes: 1 addition & 1 deletion examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn main() {
};

// Create the Raft node.
let mut r = RawNode::new(&cfg, storage, &logger).unwrap();
let mut r = RawNode::new(cfg, storage, &logger).unwrap();

let (sender, receiver) = mpsc::channel();

Expand Down
33 changes: 25 additions & 8 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,10 @@ fn test_commit_pagination() {
raw_node.advance(rd);
}


// test_commit_pagination_after_restart regression tests a scenario in which the
// Storage's Entries size limitation is slightly more permissive than Raft's
// internal one
//
//
// - node learns that index 11 is committed
// - next_entries returns index 1..10 in committed_entries (but index 10 already
// exceeds maxBytes), which isn't noticed internally by Raft
Expand Down Expand Up @@ -614,19 +613,37 @@ fn test_commit_pagination_after_restart() {
// this and *will* return it (which is how the Commit index ended up being 10 initially).
cfg.max_size_per_msg = size - 1;

s.inner.wl().append(&vec![new_entry(1, 11, Some("boom"))]).unwrap();
s.inner
.wl()
.append(&vec![new_entry(1, 11, Some("boom"))])
.unwrap();
let mut raw_node = RawNode::with_default_logger(cfg, s).unwrap();
let mut highest_applied = 0;
while highest_applied != 11 {
while highest_applied != 11 {
let rd = raw_node.ready();
dbg!(&rd);
let committed_entries = rd.committed_entries.clone().unwrap();
assert!(committed_entries.len() > 0, "stop applying entries at index {}", highest_applied);
assert!(
committed_entries.len() > 0,
"stop applying entries at index {}",
highest_applied
);
let next = committed_entries.first().unwrap().get_index();
if highest_applied != 0 {
assert_eq!(highest_applied + 1, next, "attempting to apply index {} after index {}, leaving a gap", next, highest_applied)
assert_eq!(
highest_applied + 1,
next,
"attempting to apply index {} after index {}, leaving a gap",
next,
highest_applied
)
}
highest_applied = rd.committed_entries.as_ref().unwrap().last().unwrap().get_index();
highest_applied = rd
.committed_entries
.as_ref()
.unwrap()
.last()
.unwrap()
.get_index();
raw_node.advance(rd);
let mut m = new_message(1, 1, MessageType::MsgHeartbeat, 0);
m.set_term(1);
Expand Down
10 changes: 7 additions & 3 deletions harness/tests/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,20 @@ pub fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {

#[derive(Default)]
pub struct IgnoreSizeHintMemStorage {
pub inner: MemStorage
pub inner: MemStorage,
}

impl Storage for IgnoreSizeHintMemStorage {
fn initial_state(&self) -> Result<RaftState> {
self.inner.initial_state()
}

fn entries(&self, low: u64, high: u64, _max_size: impl Into<Option<u64>>) -> Result<Vec<Entry>> {
fn entries(
&self,
low: u64,
high: u64,
_max_size: impl Into<Option<u64>>,
) -> Result<Vec<Entry>> {
self.inner.entries(low, high, u64::MAX)
}

Expand All @@ -194,4 +199,3 @@ impl Storage for IgnoreSizeHintMemStorage {
self.inner.snapshot(request_index)
}
}

38 changes: 19 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use raft::{
use slog::{Drain, o};
// Select some defaults, then change what we need.
let config = Config {
let mut config = Config {
id: 1,
..Default::default()
};
Expand All @@ -44,7 +44,7 @@ config.validate().unwrap();
// We'll use the built-in `MemStorage`, but you will likely want your own.
// Finally, create our Raft node!
let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
let mut node = RawNode::new(&config, storage, &logger).unwrap();
let mut node = RawNode::new(config, storage, &logger).unwrap();
// We will coax it into being the lead of a single node cluster for exploration.
node.raft.become_candidate();
node.raft.become_leader();
Expand All @@ -62,7 +62,7 @@ channel `recv_timeout` to drive the Raft node at least every 100ms, calling
# let config = Config { id: 1, ..Default::default() };
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
# node.raft.become_candidate();
# node.raft.become_leader();
use std::{sync::mpsc::{channel, RecvTimeoutError}, time::{Instant, Duration}};
Expand Down Expand Up @@ -129,7 +129,7 @@ Here is a simple example to use `propose` and `step`:
# let config = Config { id: 1, ..Default::default() };
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
# node.raft.become_candidate();
# node.raft.become_leader();
#
Expand Down Expand Up @@ -188,11 +188,11 @@ state:
# use slog::{Drain, o};
# use raft::{Config, storage::MemStorage, raw_node::RawNode};
#
# let config = Config { id: 1, ..Default::default() };
# let mut config = Config { id: 1, ..Default::default() };
# config.validate().unwrap();
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
#
if !node.has_ready() {
return;
Expand All @@ -212,11 +212,11 @@ a Raft snapshot from the leader and we must apply the snapshot:
# use slog::{Drain, o};
# use raft::{Config, storage::MemStorage, raw_node::RawNode};
#
# let config = Config { id: 1, ..Default::default() };
# let mut config = Config { id: 1, ..Default::default() };
# config.validate().unwrap();
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
#
# if !node.has_ready() {
# return;
Expand All @@ -240,11 +240,11 @@ entries but has not been committed yet, we must append the entries to the Raft l
# use slog::{Drain, o};
# use raft::{Config, storage::MemStorage, raw_node::RawNode};
#
# let config = Config { id: 1, ..Default::default() };
# let mut config = Config { id: 1, ..Default::default() };
# config.validate().unwrap();
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
#
# if !node.has_ready() {
# return;
Expand All @@ -266,11 +266,11 @@ We must persist the changed `HardState`:
# use slog::{Drain, o};
# use raft::{Config, storage::MemStorage, raw_node::RawNode};
#
# let config = Config { id: 1, ..Default::default() };
# let mut config = Config { id: 1, ..Default::default() };
# config.validate().unwrap();
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
#
# if !node.has_ready() {
# return;
Expand All @@ -292,11 +292,11 @@ messages to the leader after appending the Raft entries:
# use slog::{Drain, o};
# use raft::{Config, storage::MemStorage, raw_node::RawNode, StateRole};
#
# let config = Config { id: 1, ..Default::default() };
# let mut config = Config { id: 1, ..Default::default() };
# config.validate().unwrap();
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
#
# if !node.has_ready() {
# return;
Expand All @@ -322,11 +322,11 @@ need to update the applied index and resume `apply` later:
# use slog::{Drain, o};
# use raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::EntryType};
#
# let config = Config { id: 1, ..Default::default() };
# let mut config = Config { id: 1, ..Default::default() };
# config.validate().unwrap();
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
#
# if !node.has_ready() {
# return;
Expand Down Expand Up @@ -366,11 +366,11 @@ need to update the applied index and resume `apply` later:
# use slog::{Drain, o};
# use raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::EntryType};
#
# let config = Config { id: 1, ..Default::default() };
# let mut config = Config { id: 1, ..Default::default() };
# config.validate().unwrap();
# let store = MemStorage::new_with_conf_state((vec![1], vec![]));
# let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
# let mut node = RawNode::new(&config, store, &logger).unwrap();
# let mut node = RawNode::new(config, store, &logger).unwrap();
#
# if !node.has_ready() {
# return;
Expand Down Expand Up @@ -420,7 +420,7 @@ use slog::{Drain, o};
let mut config = Config { id: 1, ..Default::default() };
let store = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
let mut node = RawNode::new(&mut config, store, &logger).unwrap();
let mut node = RawNode::new(config, store, &logger).unwrap();
node.raft.become_candidate();
node.raft.become_leader();
Expand Down
5 changes: 3 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ impl<T: Storage> Raft<T> {
let conf_state = &raft_state.conf_state;
let voters = &conf_state.voters;
let learners = &conf_state.learners;
let raft_log = RaftLog::new_with_size(store, logger.clone(), c.max_committed_size_per_ready);
let raft_log =
RaftLog::new_with_size(store, logger.clone(), c.max_committed_size_per_ready);
let mut r = Raft {
id: c.id,
read_states: Default::default(),
Expand Down Expand Up @@ -292,7 +293,7 @@ impl<T: Storage> Raft<T> {
"term" => r.term,
"commit" => r.raft_log.committed,
"applied" => r.raft_log.applied,
"last index" => dbg!(r.raft_log.last_index()),
"last index" => r.raft_log.last_index(),
"last term" => r.raft_log.last_term(),
"peers" => ?r.prs().voters().collect::<Vec<_>>(),
);
Expand Down
10 changes: 1 addition & 9 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,14 @@ where
}

impl<T: Storage> RaftLog<T> {

/// Creates a new raft log with a given storage and default options. It recovers
/// the log to the state that it just commits and applies the latest snapshot
pub fn new(store: T, logger: Logger) -> RaftLog<T> {
Self::new_with_size(store, logger, NO_LIMIT)
}

/// Creates a new raft log with the given storage and max next entries size
pub fn new_with_size(store: T, logger:Logger, max_next_ents_size: u64) -> RaftLog<T> {
pub fn new_with_size(store: T, logger: Logger, max_next_ents_size: u64) -> RaftLog<T> {
let first_index = store.first_index().unwrap();
let last_index = store.last_index().unwrap();

Expand All @@ -88,7 +87,6 @@ impl<T: Storage> RaftLog<T> {
}
}


/// Grabs the term from the last entry.
///
/// # Panics
Expand Down Expand Up @@ -372,7 +370,6 @@ impl<T: Storage> RaftLog<T> {
let offset = cmp::max(since_idx + 1, self.first_index());
let committed = self.committed;
if committed + 1 > offset {
dbg!(self.max_next_ents_size);
match self.slice(offset, committed + 1, self.max_next_ents_size) {
Ok(vec) => return Some(vec),
Err(e) => fatal!(self.unstable.logger, "{}", e),
Expand All @@ -385,7 +382,6 @@ impl<T: Storage> RaftLog<T> {
/// If applied is smaller than the index of snapshot, it returns all committed
/// entries after the index of snapshot.
pub fn next_entries(&self) -> Option<Vec<Entry>> {
dbg!(self.applied);
self.next_entries_since(self.applied)
}

Expand Down Expand Up @@ -456,8 +452,6 @@ impl<T: Storage> RaftLog<T> {
high: u64,
max_size: impl Into<Option<u64>>,
) -> Result<Vec<Entry>> {
dbg!(low);
dbg!(high);
let max_size = max_size.into();
if let Some(err) = self.must_check_outofbounds(low, high) {
return Err(err);
Expand All @@ -484,14 +478,12 @@ impl<T: Storage> RaftLog<T> {
Ok(entries) => {
ents = entries;
if (ents.len() as u64) < unstable_high - low {
dbg!("from storage");
return Ok(ents);
}
}
}
}

dbg!("from unstable");
if high > self.unstable.offset {
let offset = self.unstable.offset;
let unstable = self.unstable.slice(cmp::max(low, offset), high);
Expand Down
Loading

0 comments on commit 60d7771

Please sign in to comment.