From 7b260d1f7737a894d320830b323331c030bf1a86 Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 29 Jan 2021 21:13:44 +0800 Subject: [PATCH] resolve: detect tombstone correctly (#9593) ### What problem does this PR solve? Issue Number: close #9590 Problem Summary: The PD client filters out tombstone stores and returns an error instead. The resolver should recognize that error and handle it correctly. ### Check List Tests - Unit test - Integration test ### Release note - Fix repeated tombstone logs when sunsetting nodes --- src/server/raft_client.rs | 36 ++++++----- src/server/resolve.rs | 23 +++++-- tests/integrations/server/raft_client.rs | 82 +++++++++++++++++++++--- 3 files changed, 110 insertions(+), 31 deletions(-) diff --git a/src/server/raft_client.rs b/src/server/raft_client.rs index 9724ac95051..5d6e202032f 100644 --- a/src/server/raft_client.rs +++ b/src/server/raft_client.rs @@ -800,25 +800,27 @@ where let (s, pool_len) = { let mut pool = self.pool.lock().unwrap(); if pool.tombstone_stores.contains(&store_id) { + let pool_len = pool.connections.len(); + drop(pool); + self.cache.resize(pool_len); return false; } - ( - pool.connections - .entry((store_id, conn_id)) - .or_insert_with(|| { - let queue = Arc::new(Queue::with_capacity(QUEUE_CAPACITY)); - let back_end = StreamBackEnd { - store_id, - queue: queue.clone(), - builder: self.builder.clone(), - }; - self.future_pool - .spawn(start(back_end, conn_id, self.pool.clone())); - queue - }) - .clone(), - pool.connections.len(), - ) + let conn = pool + .connections + .entry((store_id, conn_id)) + .or_insert_with(|| { + let queue = Arc::new(Queue::with_capacity(QUEUE_CAPACITY)); + let back_end = StreamBackEnd { + store_id, + queue: queue.clone(), + builder: self.builder.clone(), + }; + self.future_pool + .spawn(start(back_end, conn_id, self.pool.clone())); + queue + }) + .clone(); + (conn, pool.connections.len()) }; self.cache.resize(pool_len); self.cache.insert( diff --git a/src/server/resolve.rs b/src/server/resolve.rs index 
4fd9c355cc2..3e356723fe1 100644 --- a/src/server/resolve.rs +++ b/src/server/resolve.rs @@ -6,7 +6,6 @@ use std::time::Instant; use collections::HashMap; use engine_rocks::RocksEngine; -use kvproto::metapb; use kvproto::replication_modepb::ReplicationMode; use pd_client::{take_peer_address, PdClient}; use raftstore::router::RaftStoreRouter; @@ -82,7 +81,16 @@ where fn get_address(&self, store_id: u64) -> Result { let pd_client = Arc::clone(&self.pd_client); - let mut s = box_try!(pd_client.get_store(store_id)); + let mut s = match pd_client.get_store(store_id) { + Ok(s) => s, + // `get_store` will filter tombstone store, so here needs to handle + // it explicitly. + Err(pd_client::Error::StoreTombstone(_)) => { + RESOLVE_STORE_COUNTER_STATIC.tombstone.inc(); + return Err(box_err!("store {} has been removed", store_id)); + } + Err(e) => return Err(box_err!(e)), + }; let mut group_id = None; let mut state = self.state.lock().unwrap(); if state.status().get_mode() == ReplicationMode::DrAutoSync { @@ -97,10 +105,6 @@ where if let Some(group_id) = group_id { self.router.report_resolved(store_id, group_id); } - if s.get_state() == metapb::StoreState::Tombstone { - RESOLVE_STORE_COUNTER_STATIC.tombstone.inc(); - return Err(box_err!("store {} has been removed", store_id)); - } let addr = take_peer_address(&mut s); // In some tests, we use empty address for store first, // so we should ignore here. @@ -191,6 +195,13 @@ mod tests { impl PdClient for MockPdClient { fn get_store(&self, _: u64) -> Result { + if self.store.get_state() == metapb::StoreState::Tombstone { + // Simulate the behavior of `get_store` in pd client. + return Err(pd_client::Error::StoreTombstone(format!( + "{:?}", + self.store + ))); + } // The store address will be changed every millisecond. 
let mut store = self.store.clone(); let mut sock = SocketAddr::from_str(store.get_address()).unwrap(); diff --git a/tests/integrations/server/raft_client.rs b/tests/integrations/server/raft_client.rs index 041cd4ed140..a13b5f2ff46 100644 --- a/tests/integrations/server/raft_client.rs +++ b/tests/integrations/server/raft_client.rs @@ -10,15 +10,18 @@ use futures::{FutureExt, StreamExt, TryStreamExt}; use grpcio::{ ClientStreamingSink, Environment, RequestStream, RpcContext, RpcStatus, RpcStatusCode, Server, }; +use kvproto::metapb; use kvproto::raft_serverpb::{Done, RaftMessage}; use kvproto::tikvpb::BatchRaftMessage; use raft::eraftpb::Entry; +use raftstore::errors::DiscardReason; use raftstore::router::{RaftStoreBlackHole, RaftStoreRouter}; use security::{SecurityConfig, SecurityManager}; use tikv::server::resolve::Callback; use tikv::server::{ - self, Config, ConnectionBuilder, RaftClient, StoreAddrResolver, TestRaftStoreRouter, + self, resolve, Config, ConnectionBuilder, RaftClient, StoreAddrResolver, TestRaftStoreRouter, }; +use tikv_util::worker::Builder as WorkerBuilder; use tikv_util::worker::LazyWorker; use super::{mock_kv_service, MockKv, MockKvService}; @@ -28,6 +31,12 @@ pub struct StaticResolver { port: u16, } +impl StaticResolver { + fn new(port: u16) -> StaticResolver { + StaticResolver { port } + } +} + impl StoreAddrResolver for StaticResolver { fn resolve(&self, _store_id: u64, cb: Callback) -> server::Result<()> { cb(Ok(format!("localhost:{}", self.port))); @@ -35,22 +44,22 @@ impl StoreAddrResolver for StaticResolver { } } -pub fn get_raft_client_with_router(router: R, port: u16) -> RaftClient +fn get_raft_client(router: R, resolver: T) -> RaftClient where R: RaftStoreRouter + Unpin + 'static, + T: StoreAddrResolver + 'static, { let env = Arc::new(Environment::new(2)); let cfg = Arc::new(Config::default()); let security_mgr = Arc::new(SecurityManager::new(&SecurityConfig::default()).unwrap()); - let resolver = StaticResolver { port }; let worker 
= LazyWorker::new("test-raftclient"); let builder = ConnectionBuilder::new(env, cfg, security_mgr, resolver, router, worker.scheduler()); RaftClient::new(builder) } -pub fn get_raft_client(port: u16) -> RaftClient { - get_raft_client_with_router(RaftStoreBlackHole, port) +fn get_raft_client_by_port(port: u16) -> RaftClient { + get_raft_client(RaftStoreBlackHole, StaticResolver::new(port)) } #[derive(Clone)] @@ -127,7 +136,7 @@ fn test_batch_raft_fallback() { let service = MockKvForRaft::new(Arc::clone(&msg_count), Arc::clone(&batch_msg_count), false); let (mock_server, port) = create_mock_server(service, 60000, 60100).unwrap(); - let mut raft_client = get_raft_client(port); + let mut raft_client = get_raft_client_by_port(port); (0..100).for_each(|_| { raft_client.send(RaftMessage::default()).unwrap(); thread::sleep(time::Duration::from_millis(10)); @@ -150,7 +159,7 @@ fn test_raft_client_reconnect() { let (tx, rx) = mpsc::channel(); let (significant_msg_sender, _significant_msg_receiver) = mpsc::channel(); let router = TestRaftStoreRouter::new(tx, significant_msg_sender); - let mut raft_client = get_raft_client_with_router(router, port); + let mut raft_client = get_raft_client(router, StaticResolver::new(port)); (0..50).for_each(|_| raft_client.send(RaftMessage::default()).unwrap()); raft_client.flush(); @@ -186,7 +195,7 @@ fn test_batch_size_limit() { let service = MockKvForRaft::new(Arc::clone(&msg_count), Arc::clone(&batch_msg_count), true); let (mock_server, port) = create_mock_server(service, 60200, 60300).unwrap(); - let mut raft_client = get_raft_client(port); + let mut raft_client = get_raft_client_by_port(port); // `send` should success. for _ in 0..10 { @@ -251,3 +260,60 @@ fn check_msg_count(max_delay_ms: u64, count: &AtomicUsize, expected: usize) { } panic!("check_msg_count wants {}, gets {}", expected, got); } + +/// Check if raft client can add tombstone stores in block list. 
+#[test] +fn test_tombstone_block_list() { + let pd_server = test_pd::Server::new(1); + let eps = pd_server.bind_addrs(); + let pd_client = Arc::new(test_pd::util::new_client(eps, None)); + let bg_worker = WorkerBuilder::new(thd_name!("background")) + .thread_count(2) + .create(); + let resolver = resolve::new_resolver(pd_client, &bg_worker, RaftStoreBlackHole).0; + + let msg_count = Arc::new(AtomicUsize::new(0)); + let batch_msg_count = Arc::new(AtomicUsize::new(0)); + let service = MockKvForRaft::new(Arc::clone(&msg_count), Arc::clone(&batch_msg_count), true); + let (_mock_server, port) = create_mock_server(service, 60200, 60300).unwrap(); + + let mut raft_client = get_raft_client(RaftStoreBlackHole, resolver); + + let mut store1 = metapb::Store::default(); + store1.set_id(1); + store1.set_address(format!("127.0.0.1:{}", port)); + pd_server.default_handler().add_store(store1.clone()); + + // `send` should success. + for _ in 0..10 { + // 5M per RaftMessage. + let mut raft_m = RaftMessage::default(); + raft_m.mut_to_peer().set_store_id(1); + for _ in 0..(5 * 1024) { + let mut e = Entry::default(); + e.set_data(vec![b'a'; 1024]); + raft_m.mut_message().mut_entries().push(e); + } + raft_client.send(raft_m).unwrap(); + } + raft_client.flush(); + + check_msg_count(500, &msg_count, 10); + + let mut store2 = metapb::Store::default(); + store2.set_id(2); + store2.set_address(store1.get_address().to_owned()); + store2.set_state(metapb::StoreState::Tombstone); + pd_server.default_handler().add_store(store2); + let mut message = RaftMessage::default(); + message.mut_to_peer().set_store_id(2); + // First message should be OK. + raft_client.send(message.clone()).unwrap(); + // Wait some time for the resolve result. + thread::sleep(time::Duration::from_millis(50)); + // Second message should fail as the store should be added to block list. + assert_eq!( + DiscardReason::Disconnected, + raft_client.send(message).unwrap_err() + ); +}