diff --git a/components/test_raftstore/src/transport_simulate.rs b/components/test_raftstore/src/transport_simulate.rs index 623317935e7..bf4c8a4d806 100644 --- a/components/test_raftstore/src/transport_simulate.rs +++ b/components/test_raftstore/src/transport_simulate.rs @@ -312,14 +312,15 @@ impl Direction { /// Drop specified messages for the store with special region. /// -/// If `msg_type` is None, all message will be filtered. +/// If `drop_type` is empty, all message will be dropped. #[derive(Clone)] pub struct RegionPacketFilter { region_id: u64, store_id: u64, direction: Direction, block: Either, Arc>, - msg_type: Option, + drop_type: Vec, + skip_type: Vec, dropped_messages: Option>>>, msg_callback: Option>, } @@ -330,14 +331,13 @@ impl Filter for RegionPacketFilter { let region_id = m.get_region_id(); let from_store_id = m.get_from_peer().get_store_id(); let to_store_id = m.get_to_peer().get_store_id(); + let msg_type = m.get_message().get_msg_type(); if self.region_id == region_id && (self.direction.is_send() && self.store_id == from_store_id || self.direction.is_recv() && self.store_id == to_store_id) - && self - .msg_type - .as_ref() - .map_or(true, |t| t == &m.get_message().get_msg_type()) + && (self.drop_type.is_empty() || self.drop_type.contains(&msg_type)) + && !self.skip_type.contains(&msg_type) { if let Some(f) = self.msg_callback.as_ref() { f(m) @@ -373,7 +373,8 @@ impl RegionPacketFilter { region_id, store_id, direction: Direction::Both, - msg_type: None, + drop_type: vec![], + skip_type: vec![], block: Either::Right(Arc::new(AtomicBool::new(true))), dropped_messages: None, msg_callback: None, @@ -385,8 +386,14 @@ impl RegionPacketFilter { self } + // TODO: rename it to `drop`. pub fn msg_type(mut self, m_type: MessageType) -> RegionPacketFilter { - self.msg_type = Some(m_type); + self.drop_type.push(m_type); + self + } + + pub fn skip(mut self, m_type: MessageType) -> RegionPacketFilter { + self.skip_type.push(m_type); self } diff --git a/src/raftstore/store/fsm/apply.rs b/src/raftstore/store/fsm/apply.rs index 92a7ca999c6..94a3bc2344e 100644 --- a/src/raftstore/store/fsm/apply.rs +++ b/src/raftstore/store/fsm/apply.rs @@ -2435,11 +2435,7 @@ impl ApplyFsm { apply_ctx.timer = Some(SlowTimer::new()); } - fail_point!( - "on_handle_apply_1000_1003", - self.delegate.region_id() == 1000 && self.delegate.id() == 1003, - |_| {} - ); + fail_point!("on_handle_apply_1003", self.delegate.id() == 1003, |_| {}); fail_point!("on_handle_apply", |_| {}); if apply.entries.is_empty() || self.delegate.pending_remove || self.delegate.stopped { @@ -2498,8 +2494,8 @@ impl ApplyFsm { ctx.flush(); } fail_point!( - "before_peer_destroy_1000_1003", - self.delegate.region_id() == 1000 && self.delegate.id() == 1003, + "before_peer_destroy_1003", + self.delegate.id() == 1003, |_| {} ); info!( @@ -2603,8 +2599,8 @@ impl ApplyFsm { fail_point!("after_handle_catch_up_logs_for_merge"); fail_point!( - "after_handle_catch_up_logs_for_merge_1000_1003", - self.delegate.region_id() == 1000 && self.delegate.id() == 1003, + "after_handle_catch_up_logs_for_merge_1003", + self.delegate.id() == 1003, |_| {} ); diff --git a/src/raftstore/store/fsm/peer.rs b/src/raftstore/store/fsm/peer.rs index 3b95adcef09..d077b0c8cfc 100644 --- a/src/raftstore/store/fsm/peer.rs +++ b/src/raftstore/store/fsm/peer.rs @@ -918,7 +918,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { let from_peer_id = msg.get_from_peer().get_id(); self.fsm.peer.insert_peer_cache(msg.take_from_peer()); - self.fsm.peer.step(msg.take_message())?; + self.fsm.peer.step(self.ctx, msg.take_message())?; if self.fsm.peer.any_new_peer_catch_up(from_peer_id) { self.fsm.peer.heartbeat_pd(self.ctx); @@ -1488,7 +1488,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { self.fsm.has_ready = true; self.fsm.peer.peers_start_pending_time.push((id, now)); } - self.fsm.peer.recent_conf_change_time = now; self.fsm.peer.insert_peer_cache(peer); } ConfChangeType::RemoveNode => { @@ -1501,7 +1500,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { .retain(|&(p, _)| p != peer_id); } self.fsm.peer.remove_peer_from_cache(peer_id); - self.fsm.peer.recent_conf_change_time = now; } ConfChangeType::BeginMembershipChange | ConfChangeType::FinalizeMembershipChange => { unimplemented!() diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs index acd484ce745..e8535905e0c 100644 --- a/src/raftstore/store/peer.rs +++ b/src/raftstore/store/peer.rs @@ -205,7 +205,6 @@ pub struct Peer { pub peers_start_pending_time: Vec<(u64, Instant)>, /// A inaccurate cache about which peer is marked as down. down_peer_ids: Vec, - pub recent_conf_change_time: Instant, /// An inaccurate difference in region size since last reset. /// It is used to decide whether split check is needed. @@ -299,7 +298,6 @@ impl Peer { peer_heartbeats: HashMap::default(), peers_start_pending_time: vec![], down_peer_ids: vec![], - recent_conf_change_time: Instant::now(), size_diff_hint: 0, delete_keys_hint: 0, approximate_size: None, @@ -714,7 +712,11 @@ impl Peer { } /// Steps the raft message. - pub fn step(&mut self, mut m: eraftpb::Message) -> Result<()> { + pub fn step( + &mut self, + ctx: &mut PollContext, + mut m: eraftpb::Message, + ) -> Result<()> { fail_point!( "step_message_3_1", { self.peer.get_store_id() == 3 && self.region_id == 1 }, @@ -747,6 +749,10 @@ impl Peer { return Ok(()); } } + if msg_type == MessageType::MsgTransferLeader { + self.execute_transfer_leader(ctx, &m); + return Ok(()); + } self.raft_group.step(m)?; Ok(()) @@ -1800,43 +1806,68 @@ impl Peer { self.raft_group.transfer_leader(peer.get_id()); } + fn pre_transfer_leader(&mut self, peer: &metapb::Peer) -> bool { + // Checks if safe to transfer leader. + if self.raft_group.raft.has_pending_conf() { + info!( + "reject transfer leader due to pending conf change"; + "region_id" => self.region_id, + "peer_id" => self.peer.get_id(), + "peer" => ?peer, + ); + return false; + } + + // Broadcast heartbeat to make sure followers commit the entries immediately. + // It's only necessary to ping the target peer, but ping all for simplicity. + self.raft_group.ping(); + let mut msg = eraftpb::Message::new(); + msg.set_to(peer.get_id()); + msg.set_msg_type(eraftpb::MessageType::MsgTransferLeader); + msg.set_from(self.peer_id()); + msg.set_term(self.term()); + self.raft_group.raft.msgs.push(msg); + true + } + fn ready_to_transfer_leader( &self, ctx: &mut PollContext, + mut index: u64, peer: &metapb::Peer, - ) -> bool { + ) -> Option<&'static str> { let peer_id = peer.get_id(); let status = self.raft_group.status_ref(); let progress = status.progress.unwrap(); if !progress.voter_ids().contains(&peer_id) { - return false; + return Some("non voter"); } - for (_, progress) in progress.voters() { + for (id, progress) in progress.voters() { if progress.state == ProgressState::Snapshot { - return false; + return Some("pending snapshot"); + } + if *id == peer_id && index == 0 { + // index will be zero if it's sent from an instance without + // pre-transfer-leader feature. Set it to matched to make it + // possible to transfer leader to an older version. It may be + // useful during rolling restart. + index = progress.matched; } } - // Checks if safe to transfer leader. - // Check `has_pending_conf` is necessary because `recent_conf_change_time` is updated - // on applied. TODO: fix the transfer leader issue in Raft. if self.raft_group.raft.has_pending_conf() - || duration_to_sec(self.recent_conf_change_time.elapsed()) - < ctx.cfg.raft_reject_transfer_leader_duration.as_secs() as f64 + || self.raft_group.raft.pending_conf_index > index { - debug!( - "reject transfer leader due to the region was config changed recently"; - "region_id" => self.region_id, - "peer_id" => self.peer.get_id(), - "peer" => ?peer, - ); - return false; + return Some("pending conf change"); } let last_index = self.get_store().last_index(); - last_index <= progress.get(peer_id).unwrap().matched + ctx.cfg.leader_transfer_max_log_lag + if last_index >= index + ctx.cfg.leader_transfer_max_log_lag { + return Some("log gap"); + } + None } fn read_local( @@ -2183,7 +2214,75 @@ impl Peer { Ok(propose_index) } - // Return true to if the transfer leader request is accepted. + fn execute_transfer_leader( + &mut self, + ctx: &mut PollContext, + msg: &eraftpb::Message, + ) { + if msg.get_term() != self.term() { + return; + } + + if self.is_leader() { + let from = match self.get_peer_from_cache(msg.get_from()) { + Some(p) => p, + None => return, + }; + match self.ready_to_transfer_leader(ctx, msg.get_index(), &from) { + Some(reason) => { + info!( + "reject to transfer leader"; + "region_id" => self.region_id, + "peer_id" => self.peer.get_id(), + "to" => ?from, + "reason" => reason, + "index" => msg.get_index(), + "last_index" => self.get_store().last_index(), + ); + } + None => self.transfer_leader(&from), + } + return; + } + + if self.is_applying_snapshot() + || self.has_pending_snapshot() + || msg.get_from() != self.leader_id() + { + info!( + "reject transferring leader"; + "region_id" =>self.region_id, + "peer_id" => self.peer.get_id(), + "from" => msg.get_from(), + ); + return; + } + + let mut msg = eraftpb::Message::new(); + msg.set_from(self.peer_id()); + msg.set_to(self.leader_id()); + msg.set_msg_type(eraftpb::MessageType::MsgTransferLeader); + msg.set_index(self.get_store().applied_index()); + msg.set_term(self.term()); + self.raft_group.raft.msgs.push(msg); + } + + /// Return true to if the transfer leader request is accepted. + /// + /// When transferring leadership begins, leader sends a pre-transfer + /// to target follower first to ensures it's ready to become leader. + /// After that the real transfer leader process begin. + /// + /// 1. pre_transfer_leader on leader: + /// Leader will send a MsgTransferLeader to follower. + /// 2. execute_transfer_leader on follower + /// If follower passes all necessary checks, it will reply an + /// ACK with type MsgTransferLeader and its promised persistent index. + /// 3. execute_transfer_leader on leader: + /// Leader checks if it's appropriate to transfer leadership. If it + /// does, it calls raft transfer_leader API to do the remaining work. + /// + /// See also: tikv/rfcs#37. fn propose_transfer_leader( &mut self, ctx: &mut PollContext, @@ -2195,18 +2294,7 @@ impl Peer { let transfer_leader = get_transfer_leader_cmd(&req).unwrap(); let peer = transfer_leader.get_peer(); - let transferred = if self.ready_to_transfer_leader(ctx, peer) { - self.transfer_leader(peer); - true - } else { - info!( - "transfer leader message ignored directly"; - "region_id" => self.region_id, - "peer_id" => self.peer.get_id(), - "message" => ?req, - ); - false - }; + let transferred = self.pre_transfer_leader(peer); // transfer leader command doesn't need to replicate log and apply, so we // return immediately. Note that this command may fail, we can view it just as an advice diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs index f9b0c00dbb6..21da42038f9 100644 --- a/tests/failpoints/cases/mod.rs +++ b/tests/failpoints/cases/mod.rs @@ -12,5 +12,6 @@ mod test_split_region; mod test_stale_peer; mod test_stale_read; mod test_storage; +mod test_transfer_leader; // TODO: enable these tests. // mod test_upgrade; diff --git a/tests/failpoints/cases/test_merge.rs b/tests/failpoints/cases/test_merge.rs index 8e096778982..25da9fefe99 100644 --- a/tests/failpoints/cases/test_merge.rs +++ b/tests/failpoints/cases/test_merge.rs @@ -238,12 +238,12 @@ fn test_node_merge_catch_up_logs_restart() { must_get_none(&cluster.get_engine(3), b"k11"); // after source peer is applied but before set it to tombstone - fail::cfg("after_handle_catch_up_logs_for_merge_1000_1003", "return()").unwrap(); + fail::cfg("after_handle_catch_up_logs_for_merge_1003", "return()").unwrap(); pd_client.must_merge(left.get_id(), right.get_id()); thread::sleep(Duration::from_millis(100)); cluster.shutdown(); - fail::remove("after_handle_catch_up_logs_for_merge_1000_1003"); + fail::remove("after_handle_catch_up_logs_for_merge_1003"); cluster.start().unwrap(); must_get_equal(&cluster.get_engine(3), b"k11", b"v11"); } @@ -274,7 +274,7 @@ fn test_node_merge_catch_up_logs_leader_election() { let state1 = cluster.truncated_state(1000, 1); // let the entries committed but not applied - fail::cfg("on_handle_apply_1000_1003", "pause").unwrap(); + fail::cfg("on_handle_apply_1003", "pause").unwrap(); for i in 2..20 { cluster.must_put(format!("k1{}", i).as_bytes(), b"v"); } @@ -297,13 +297,13 @@ fn test_node_merge_catch_up_logs_leader_election() { must_get_none(&cluster.get_engine(3), b"k11"); // let peer not destroyed before election timeout - fail::cfg("before_peer_destroy_1000_1003", "pause").unwrap(); - fail::remove("on_handle_apply_1000_1003"); + fail::cfg("before_peer_destroy_1003", "pause").unwrap(); + fail::remove("on_handle_apply_1003"); pd_client.must_merge(left.get_id(), right.get_id()); // wait election timeout thread::sleep(Duration::from_millis(500)); - fail::remove("before_peer_destroy_1000_1003"); + fail::remove("before_peer_destroy_1003"); must_get_equal(&cluster.get_engine(3), b"k11", b"v11"); } diff --git a/tests/failpoints/cases/test_stale_read.rs b/tests/failpoints/cases/test_stale_read.rs index 6f83d229467..df78fc02205 100644 --- a/tests/failpoints/cases/test_stale_read.rs +++ b/tests/failpoints/cases/test_stale_read.rs @@ -376,6 +376,7 @@ fn test_read_index_when_transfer_leader_2() { let filter = Box::new( RegionPacketFilter::new(r1.get_id(), old_leader.get_store_id()) .direction(Direction::Recv) + .skip(MessageType::MsgTransferLeader) .when(Arc::new(AtomicBool::new(true))) .reserve_dropped(Arc::clone(&dropped_msgs)), ); diff --git a/tests/failpoints/cases/test_transfer_leader.rs b/tests/failpoints/cases/test_transfer_leader.rs new file mode 100644 index 00000000000..7645a7157cf --- /dev/null +++ b/tests/failpoints/cases/test_transfer_leader.rs @@ -0,0 +1,41 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +use fail; +use test_raftstore::*; + +/// When a follower applies log slowly, leader should not transfer leader +/// to it. Otherwise, new leader may wait a long time to serve read/write +/// requests. +#[test] +fn test_transfer_leader_slow_apply() { + let _guard = crate::setup(); + + // 3 nodes cluster. + let mut cluster = new_node_cluster(0, 3); + + let pd_client = cluster.pd_client.clone(); + pd_client.disable_default_operator(); + + let r1 = cluster.run_conf_change(); + pd_client.must_add_peer(r1, new_peer(2, 1002)); + pd_client.must_add_peer(r1, new_peer(3, 1003)); + + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(3), b"k1", b"v1"); + + let fp = "on_handle_apply_1003"; + fail::cfg(fp, "pause").unwrap(); + for i in 0..cluster.cfg.raft_store.leader_transfer_max_log_lag + 1 { + let bytes = format!("k{:03}", i).into_bytes(); + cluster.must_put(&bytes, &bytes); + } + cluster.transfer_leader(r1, new_peer(3, 1003)); + cluster.must_put(b"k2", b"v2"); + must_get_equal(&cluster.get_engine(1), b"k2", b"v2"); + assert_ne!(cluster.leader_of_region(r1).unwrap(), new_peer(3, 1003)); + fail::remove(fp); + cluster.must_transfer_leader(r1, new_peer(3, 1003)); + cluster.must_put(b"k3", b"v3"); + must_get_equal(&cluster.get_engine(3), b"k3", b"v3"); +} diff --git a/tests/integrations/raftstore/test_lease_read.rs b/tests/integrations/raftstore/test_lease_read.rs index a8cbf97aeb7..68336e79134 100644 --- a/tests/integrations/raftstore/test_lease_read.rs +++ b/tests/integrations/raftstore/test_lease_read.rs @@ -389,6 +389,7 @@ fn test_read_index_when_transfer_leader_1() { let filter = Box::new( RegionPacketFilter::new(r1.get_id(), old_leader.get_store_id()) .direction(Direction::Recv) + .skip(MessageType::MsgTransferLeader) .when(Arc::new(AtomicBool::new(true))) .reserve_dropped(Arc::clone(&dropped_msgs)), ); diff --git a/tests/integrations/raftstore/test_transfer_leader.rs b/tests/integrations/raftstore/test_transfer_leader.rs index b45eddbc008..591799a8744 100644 --- a/tests/integrations/raftstore/test_transfer_leader.rs +++ b/tests/integrations/raftstore/test_transfer_leader.rs @@ -59,12 +59,6 @@ fn test_server_basic_transfer_leader() { test_basic_transfer_leader(&mut cluster); } -#[test] -fn test_node_basic_transfer_leader() { - let mut cluster = new_node_cluster(0, 3); - test_basic_transfer_leader(&mut cluster); -} - fn test_pd_transfer_leader(cluster: &mut Cluster) { let pd_client = Arc::clone(&cluster.pd_client); pd_client.disable_default_operator(); @@ -119,12 +113,6 @@ fn test_server_pd_transfer_leader() { test_pd_transfer_leader(&mut cluster); } -#[test] -fn test_node_pd_transfer_leader() { - let mut cluster = new_node_cluster(0, 3); - test_pd_transfer_leader(&mut cluster); -} - fn test_transfer_leader_during_snapshot(cluster: &mut Cluster) { let pd_client = Arc::clone(&cluster.pd_client); // Disable default max peer count check. @@ -165,6 +153,7 @@ fn test_transfer_leader_during_snapshot(cluster: &mut Cluster) let resp = cluster.call_command_on_leader(put, Duration::from_secs(5)); // if it's transferring leader, resp will timeout. assert!(resp.is_ok(), "{:?}", resp); + must_get_equal(&cluster.get_engine(1), b"k1", b"v1"); } #[test] @@ -172,9 +161,3 @@ fn test_server_transfer_leader_during_snapshot() { let mut cluster = new_server_cluster(0, 3); test_transfer_leader_during_snapshot(&mut cluster); } - -#[test] -fn test_node_transfer_leader_during_snapshot() { - let mut cluster = new_node_cluster(0, 3); - test_transfer_leader_during_snapshot(&mut cluster); -}