Introduce pre transfer leader (tikv#6539)
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
BusyJay committed Feb 8, 2020
1 parent ca57531 commit cffe822
Showing 10 changed files with 190 additions and 77 deletions.
components/test_raftstore/src/transport_simulate.rs (23 changes: 15 additions & 8 deletions)
@@ -311,14 +311,15 @@ impl Direction {
 
 /// Drop specified messages for the store with special region.
 ///
-/// If `msg_type` is None, all message will be filtered.
+/// If `drop_type` is empty, all messages will be dropped.
 #[derive(Clone)]
 pub struct RegionPacketFilter {
     region_id: u64,
     store_id: u64,
     direction: Direction,
     block: Either<Arc<AtomicUsize>, Arc<AtomicBool>>,
-    msg_type: Option<MessageType>,
+    drop_type: Vec<MessageType>,
+    skip_type: Vec<MessageType>,
     dropped_messages: Option<Arc<Mutex<Vec<RaftMessage>>>>,
     msg_callback: Option<Arc<dyn Fn(&RaftMessage) + Send + Sync>>,
 }
@@ -329,14 +330,13 @@ impl Filter for RegionPacketFilter {
         let region_id = m.get_region_id();
         let from_store_id = m.get_from_peer().get_store_id();
         let to_store_id = m.get_to_peer().get_store_id();
+        let msg_type = m.get_message().get_msg_type();
 
         if self.region_id == region_id
             && (self.direction.is_send() && self.store_id == from_store_id
                 || self.direction.is_recv() && self.store_id == to_store_id)
-            && self
-                .msg_type
-                .as_ref()
-                .map_or(true, |t| t == &m.get_message().get_msg_type())
+            && (self.drop_type.is_empty() || self.drop_type.contains(&msg_type))
+            && !self.skip_type.contains(&msg_type)
         {
             if let Some(f) = self.msg_callback.as_ref() {
                 f(m)
@@ -372,7 +372,8 @@ impl RegionPacketFilter {
             region_id,
             store_id,
             direction: Direction::Both,
-            msg_type: None,
+            drop_type: vec![],
+            skip_type: vec![],
             block: Either::Right(Arc::new(AtomicBool::new(true))),
             dropped_messages: None,
             msg_callback: None,
@@ -384,8 +385,14 @@ impl RegionPacketFilter {
         self
     }
 
+    // TODO: rename it to `drop`.
     pub fn msg_type(mut self, m_type: MessageType) -> RegionPacketFilter {
-        self.msg_type = Some(m_type);
+        self.drop_type.push(m_type);
         self
     }
+
+    pub fn skip(mut self, m_type: MessageType) -> RegionPacketFilter {
+        self.skip_type.push(m_type);
+        self
+    }
 
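The new builder methods give the filter drop-list and skip-list semantics: `msg_type` pushes onto the drop list, while `skip` exempts a message type even when a blanket rule would drop it. A minimal sketch of how a test might combine the two (the region id, store id, and the surrounding `cluster` harness are illustrative assumptions, not part of this commit):

use raft::eraftpb::MessageType;
use test_raftstore::{CloneFilterFactory, Direction, RegionPacketFilter};

// Illustrative: drop MsgAppend received by store 2 for region 1,
// but always let MsgTransferLeader through.
let filter = RegionPacketFilter::new(1, 2)
    .direction(Direction::Recv)
    .msg_type(MessageType::MsgAppend) // pushed onto `drop_type`
    .skip(MessageType::MsgTransferLeader); // pushed onto `skip_type`
cluster.add_send_filter(CloneFilterFactory(filter));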
src/raftstore/store/fsm/apply.rs (14 changes: 5 additions & 9 deletions)
@@ -2388,11 +2388,7 @@ impl ApplyFsm {
             apply_ctx.timer = Some(SlowTimer::new());
         }
 
-        fail_point!(
-            "on_handle_apply_1000_1003",
-            self.delegate.region_id() == 1000 && self.delegate.id() == 1003,
-            |_| {}
-        );
+        fail_point!("on_handle_apply_1003", self.delegate.id() == 1003, |_| {});
         fail_point!("on_handle_apply", |_| {});
 
         if apply.entries.is_empty() || self.delegate.pending_remove || self.delegate.stopped {
@@ -2451,8 +2447,8 @@ impl ApplyFsm {
             ctx.flush();
         }
         fail_point!(
-            "before_peer_destroy_1000_1003",
-            self.delegate.region_id() == 1000 && self.delegate.id() == 1003,
+            "before_peer_destroy_1003",
+            self.delegate.id() == 1003,
             |_| {}
         );
         info!(
@@ -2556,8 +2552,8 @@ impl ApplyFsm {
 
         fail_point!("after_handle_catch_up_logs_for_merge");
         fail_point!(
-            "after_handle_catch_up_logs_for_merge_1000_1003",
-            self.delegate.region_id() == 1000 && self.delegate.id() == 1003,
+            "after_handle_catch_up_logs_for_merge_1003",
+            self.delegate.id() == 1003,
             |_| {}
         );
 
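These renamed failpoints are driven from the failpoint tests further down through the fail-rs API. The usual pattern, sketched here with one renamed point (the traffic in the middle comes from the surrounding test):

// Block apply on peer 1003 so entries commit but are not applied;
// "pause" and "return()" are standard fail-rs actions.
fail::cfg("on_handle_apply_1003", "pause").unwrap();
// ... drive writes that must commit but stay unapplied ...
fail::remove("on_handle_apply_1003");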
src/raftstore/store/fsm/peer.rs (4 changes: 1 addition & 3 deletions)
@@ -899,7 +899,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
 
         let from_peer_id = msg.get_from_peer().get_id();
         self.fsm.peer.insert_peer_cache(msg.take_from_peer());
-        self.fsm.peer.step(msg.take_message())?;
+        self.fsm.peer.step(self.ctx, msg.take_message())?;
 
         if self.fsm.peer.any_new_peer_catch_up(from_peer_id) {
             self.fsm.peer.heartbeat_pd(self.ctx);
@@ -1440,7 +1440,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
                 if self.fsm.peer.is_leader() {
                     self.fsm.peer.peers_start_pending_time.push((id, now));
                 }
-                self.fsm.peer.recent_conf_change_time = now;
                 self.fsm.peer.insert_peer_cache(peer);
             }
             ConfChangeType::RemoveNode => {
@@ -1453,7 +1452,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
                     .retain(|&(p, _)| p != peer_id);
                 }
                 self.fsm.peer.remove_peer_from_cache(peer_id);
-                self.fsm.peer.recent_conf_change_time = now;
             }
         }
 
src/raftstore/store/peer.rs (151 changes: 118 additions & 33 deletions)
@@ -203,7 +203,6 @@ pub struct Peer {
     pub peers_start_pending_time: Vec<(u64, Instant)>,
     /// An inaccurate cache about which peer is marked as down.
     down_peer_ids: Vec<u64>,
-    pub recent_conf_change_time: Instant,
 
     /// An inaccurate difference in region size since last reset.
     /// It is used to decide whether split check is needed.
@@ -294,7 +293,6 @@ impl Peer {
             peer_heartbeats: HashMap::default(),
             peers_start_pending_time: vec![],
             down_peer_ids: vec![],
-            recent_conf_change_time: Instant::now(),
             size_diff_hint: 0,
             delete_keys_hint: 0,
             approximate_size: None,
@@ -702,7 +700,7 @@ impl Peer {
     }
 
     /// Steps the raft message.
-    pub fn step(&mut self, mut m: eraftpb::Message) -> Result<()> {
+    pub fn step<T, C>(&mut self, ctx: &mut PollContext<T, C>, m: eraftpb::Message) -> Result<()> {
         fail_point!(
             "step_message_3_1",
             { self.peer.get_store_id() == 3 && self.region_id == 1 },
@@ -736,6 +734,11 @@
             }
         }
 
+        if msg_type == MessageType::MsgTransferLeader {
+            self.execute_transfer_leader(ctx, &m);
+            return Ok(());
+        }
+
         self.raft_group.step(m)?;
         Ok(())
     }
@@ -1737,43 +1740,68 @@ impl Peer {
         self.raft_group.transfer_leader(peer.get_id());
     }
 
+    fn pre_transfer_leader(&mut self, peer: &metapb::Peer) -> bool {
+        // Checks if safe to transfer leader.
+        if self.raft_group.raft.has_pending_conf() {
+            info!(
+                "reject transfer leader due to pending conf change";
+                "region_id" => self.region_id,
+                "peer_id" => self.peer.get_id(),
+                "peer" => ?peer,
+            );
+            return false;
+        }
+
+        // Broadcast heartbeat to make sure followers commit the entries immediately.
+        // It's only necessary to ping the target peer, but ping all for simplicity.
+        self.raft_group.ping();
+        let mut msg = eraftpb::Message::new();
+        msg.set_to(peer.get_id());
+        msg.set_msg_type(eraftpb::MessageType::MsgTransferLeader);
+        msg.set_from(self.peer_id());
+        msg.set_term(self.term());
+        self.raft_group.raft.msgs.push(msg);
+        true
+    }
+
     fn ready_to_transfer_leader<T, C>(
         &self,
         ctx: &mut PollContext<T, C>,
+        mut index: u64,
         peer: &metapb::Peer,
-    ) -> bool {
+    ) -> Option<&'static str> {
         let peer_id = peer.get_id();
         let status = self.raft_group.status_ref();
         let progress = status.progress.unwrap();
 
         if !progress.voters().contains_key(&peer_id) {
-            return false;
+            return Some("non voter");
         }
 
-        for progress in progress.voters().values() {
+        for (id, progress) in progress.voters() {
             if progress.state == ProgressState::Snapshot {
-                return false;
+                return Some("pending snapshot");
             }
+            if *id == peer_id && index == 0 {
+                // index will be zero if it's sent from an instance without
+                // pre-transfer-leader feature. Set it to matched to make it
+                // possible to transfer leader to an older version. It may be
+                // useful during rolling restart.
+                index = progress.matched;
+            }
         }
 
-        // Checks if safe to transfer leader.
-        // Check `has_pending_conf` is necessary because `recent_conf_change_time` is updated
-        // on applied. TODO: fix the transfer leader issue in Raft.
         if self.raft_group.raft.has_pending_conf()
-            || duration_to_sec(self.recent_conf_change_time.elapsed())
-                < ctx.cfg.raft_reject_transfer_leader_duration.as_secs() as f64
+            || self.raft_group.raft.pending_conf_index > index
         {
-            debug!(
-                "reject transfer leader due to the region was config changed recently";
-                "region_id" => self.region_id,
-                "peer_id" => self.peer.get_id(),
-                "peer" => ?peer,
-            );
-            return false;
+            return Some("pending conf change");
         }
 
         let last_index = self.get_store().last_index();
-        last_index <= progress.voters()[&peer_id].matched + ctx.cfg.leader_transfer_max_log_lag
+        if last_index >= index + ctx.cfg.leader_transfer_max_log_lag {
+            return Some("log gap");
+        }
+        None
     }
 
     fn read_local<T, C>(&mut self, ctx: &mut PollContext<T, C>, req: RaftCmdRequest, cb: Callback) {
@@ -2089,7 +2117,75 @@ impl Peer {
         Ok(propose_index)
     }
 
-    // Return true to if the transfer leader request is accepted.
+    fn execute_transfer_leader<T, C>(
+        &mut self,
+        ctx: &mut PollContext<T, C>,
+        msg: &eraftpb::Message,
+    ) {
+        if msg.get_term() != self.term() {
+            return;
+        }
+
+        if self.is_leader() {
+            let from = match self.get_peer_from_cache(msg.get_from()) {
+                Some(p) => p,
+                None => return,
+            };
+            match self.ready_to_transfer_leader(ctx, msg.get_index(), &from) {
+                Some(reason) => {
+                    info!(
+                        "reject to transfer leader";
+                        "region_id" => self.region_id,
+                        "peer_id" => self.peer.get_id(),
+                        "to" => ?from,
+                        "reason" => reason,
+                        "index" => msg.get_index(),
+                        "last_index" => self.get_store().last_index(),
+                    );
+                }
+                None => self.transfer_leader(&from),
+            }
+            return;
+        }
+
+        if self.is_applying_snapshot()
+            || self.has_pending_snapshot()
+            || msg.get_from() != self.leader_id()
+        {
+            info!(
+                "reject transferring leader";
+                "region_id" => self.region_id,
+                "peer_id" => self.peer.get_id(),
+                "from" => msg.get_from(),
+            );
+            return;
+        }
+
+        let mut msg = eraftpb::Message::new();
+        msg.set_from(self.peer_id());
+        msg.set_to(self.leader_id());
+        msg.set_msg_type(eraftpb::MessageType::MsgTransferLeader);
+        msg.set_index(self.get_store().applied_index());
+        msg.set_term(self.term());
+        self.raft_group.raft.msgs.push(msg);
+    }
+
+    /// Returns true if the transfer leader request is accepted.
+    ///
+    /// When transferring leadership begins, the leader first sends a pre-transfer
+    /// message to the target follower to ensure it's ready to become leader.
+    /// After that, the real transfer leader process begins.
+    ///
+    /// 1. pre_transfer_leader on leader:
+    ///    Leader will send a MsgTransferLeader to follower.
+    /// 2. execute_transfer_leader on follower:
+    ///    If the follower passes all necessary checks, it will reply an
+    ///    ACK with type MsgTransferLeader and its promised persistent index.
+    /// 3. execute_transfer_leader on leader:
+    ///    Leader checks if it's appropriate to transfer leadership. If it
+    ///    is, it calls the raft transfer_leader API to do the remaining work.
+    ///
+    /// See also: tikv/rfcs#37.
     fn propose_transfer_leader<T, C>(
         &mut self,
         ctx: &mut PollContext<T, C>,
@@ -2101,18 +2197,7 @@
         let transfer_leader = get_transfer_leader_cmd(&req).unwrap();
         let peer = transfer_leader.get_peer();
 
-        let transferred = if self.ready_to_transfer_leader(ctx, peer) {
-            self.transfer_leader(peer);
-            true
-        } else {
-            info!(
-                "transfer leader message ignored directly";
-                "region_id" => self.region_id,
-                "peer_id" => self.peer.get_id(),
-                "message" => ?req,
-            );
-            false
-        };
+        let transferred = self.pre_transfer_leader(peer);
 
         // transfer leader command doesn't need to replicate log and apply, so we
         // return immediately. Note that this command may fail, we can view it just as an advice
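The doc comment above describes the handshake in three steps. As a self-contained illustration (not TiKV code: the types are simplified stand-ins, and the checks are reduced to the term check from execute_transfer_leader plus the log-gap check from ready_to_transfer_leader), the flow can be modeled like this:

// Toy model of the pre-transfer-leader handshake. Illustrative only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Msg {
    PreTransfer { term: u64 },       // step 1: leader -> follower
    Ack { term: u64, applied: u64 }, // step 2: follower -> leader, promised index
    Transfer,                        // step 3: leader calls raft's transfer_leader
}

// Follower side: only ACK a pre-transfer from the current term.
fn follower_step(msg: Msg, my_term: u64, applied_index: u64) -> Option<Msg> {
    match msg {
        Msg::PreTransfer { term } if term == my_term => {
            Some(Msg::Ack { term, applied: applied_index })
        }
        _ => None,
    }
}

// Leader side: transfer only if the follower's log is close enough
// (mirrors the `last_index >= index + leader_transfer_max_log_lag` rejection).
fn leader_step(msg: Msg, my_term: u64, last_index: u64, max_log_lag: u64) -> Option<Msg> {
    match msg {
        Msg::Ack { term, applied } if term == my_term && last_index < applied + max_log_lag => {
            Some(Msg::Transfer)
        }
        _ => None,
    }
}

fn main() {
    let (term, applied, last_index, max_log_lag) = (7, 100, 105, 10);
    let ack = follower_step(Msg::PreTransfer { term }, term, applied).unwrap();
    assert_eq!(leader_step(ack, term, last_index, max_log_lag), Some(Msg::Transfer));
    println!("handshake completed: {:?}", ack);
}

Running main walks the three steps: the follower ACKs the pre-transfer with its applied index, and the leader transfers only because last_index (105) is within max_log_lag (10) of the promised index (100).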
tests/failpoints/cases/mod.rs (1 change: 1 addition & 0 deletions)
@@ -11,5 +11,6 @@ mod test_split_region;
 mod test_stale_peer;
 mod test_stale_read;
 mod test_storage;
+mod test_transfer_leader;
 // TODO: enable these tests.
 // mod test_upgrade;
tests/failpoints/cases/test_merge.rs (12 changes: 6 additions & 6 deletions)
@@ -237,12 +237,12 @@ fn test_node_merge_catch_up_logs_restart() {
     must_get_none(&cluster.get_engine(3), b"k11");
 
     // after source peer is applied but before set it to tombstone
-    fail::cfg("after_handle_catch_up_logs_for_merge_1000_1003", "return()").unwrap();
+    fail::cfg("after_handle_catch_up_logs_for_merge_1003", "return()").unwrap();
     pd_client.must_merge(left.get_id(), right.get_id());
     thread::sleep(Duration::from_millis(100));
     cluster.shutdown();
 
-    fail::remove("after_handle_catch_up_logs_for_merge_1000_1003");
+    fail::remove("after_handle_catch_up_logs_for_merge_1003");
     cluster.start().unwrap();
     must_get_equal(&cluster.get_engine(3), b"k11", b"v11");
 }
@@ -273,7 +273,7 @@ fn test_node_merge_catch_up_logs_leader_election() {
 
     let state1 = cluster.truncated_state(1000, 1);
     // let the entries committed but not applied
-    fail::cfg("on_handle_apply_1000_1003", "pause").unwrap();
+    fail::cfg("on_handle_apply_1003", "pause").unwrap();
     for i in 2..20 {
         cluster.must_put(format!("k1{}", i).as_bytes(), b"v");
     }
@@ -296,13 +296,13 @@ fn test_node_merge_catch_up_logs_leader_election() {
     must_get_none(&cluster.get_engine(3), b"k11");
 
     // let peer not destroyed before election timeout
-    fail::cfg("before_peer_destroy_1000_1003", "pause").unwrap();
-    fail::remove("on_handle_apply_1000_1003");
+    fail::cfg("before_peer_destroy_1003", "pause").unwrap();
+    fail::remove("on_handle_apply_1003");
     pd_client.must_merge(left.get_id(), right.get_id());
 
     // wait election timeout
     thread::sleep(Duration::from_millis(500));
-    fail::remove("before_peer_destroy_1000_1003");
+    fail::remove("before_peer_destroy_1003");
 
     must_get_equal(&cluster.get_engine(3), b"k11", b"v11");
 }
tests/failpoints/cases/test_stale_read.rs (1 change: 1 addition & 0 deletions)
@@ -374,6 +374,7 @@ fn test_read_index_when_transfer_leader_2() {
     let filter = Box::new(
         RegionPacketFilter::new(r1.get_id(), old_leader.get_store_id())
             .direction(Direction::Recv)
+            .skip(MessageType::MsgTransferLeader)
             .when(Arc::new(AtomicBool::new(true)))
             .reserve_dropped(Arc::clone(&dropped_msgs)),
     );