Skip to content

Commit

Permalink
resolve: detect tombstone correctly (tikv#9593)
Browse files Browse the repository at this point in the history
<!--
Thank you for contributing to TiKV!

If you haven't already, please read TiKV's [CONTRIBUTING](https://github.com/tikv/tikv/blob/master/CONTRIBUTING.md) document.

If you're unsure about anything, just ask; somebody should be along to answer within a day or two.

PR Title Format:
1. module [, module2, module3]: what's changed
2. *: what's changed

If you want to open the **Challenge Program** pull request, please use the following template:
https://raw.githubusercontent.com/tikv/.github/master/.github/PULL_REQUEST_TEMPLATE/challenge-program.md
You can use it with query parameters: https://github.com/tikv/tikv/compare/master...${you branch}?template=challenge-program.md
-->

### What problem does this PR solve?

Issue Number: close tikv#9590

Problem Summary:

PD client filter tombstone store and return error instead. Resolver should recognize the error and handle it correctly.

### Check List <!--REMOVE the items that are not applicable-->

Tests <!-- At least one of them must be included. -->

- Unit test
- Integration test

### Release note <!-- bugfixes or new feature need a release note -->
- Fix repeated tombstone logs when sunset nodes
  • Loading branch information
BusyJay authored Jan 29, 2021
1 parent a2c7aac commit 7b260d1
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 31 deletions.
36 changes: 19 additions & 17 deletions src/server/raft_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,25 +800,27 @@ where
let (s, pool_len) = {
let mut pool = self.pool.lock().unwrap();
if pool.tombstone_stores.contains(&store_id) {
let pool_len = pool.connections.len();
drop(pool);
self.cache.resize(pool_len);
return false;
}
(
pool.connections
.entry((store_id, conn_id))
.or_insert_with(|| {
let queue = Arc::new(Queue::with_capacity(QUEUE_CAPACITY));
let back_end = StreamBackEnd {
store_id,
queue: queue.clone(),
builder: self.builder.clone(),
};
self.future_pool
.spawn(start(back_end, conn_id, self.pool.clone()));
queue
})
.clone(),
pool.connections.len(),
)
let conn = pool
.connections
.entry((store_id, conn_id))
.or_insert_with(|| {
let queue = Arc::new(Queue::with_capacity(QUEUE_CAPACITY));
let back_end = StreamBackEnd {
store_id,
queue: queue.clone(),
builder: self.builder.clone(),
};
self.future_pool
.spawn(start(back_end, conn_id, self.pool.clone()));
queue
})
.clone();
(conn, pool.connections.len())
};
self.cache.resize(pool_len);
self.cache.insert(
Expand Down
23 changes: 17 additions & 6 deletions src/server/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::time::Instant;

use collections::HashMap;
use engine_rocks::RocksEngine;
use kvproto::metapb;
use kvproto::replication_modepb::ReplicationMode;
use pd_client::{take_peer_address, PdClient};
use raftstore::router::RaftStoreRouter;
Expand Down Expand Up @@ -82,7 +81,16 @@ where

fn get_address(&self, store_id: u64) -> Result<String> {
let pd_client = Arc::clone(&self.pd_client);
let mut s = box_try!(pd_client.get_store(store_id));
let mut s = match pd_client.get_store(store_id) {
Ok(s) => s,
// `get_store` will filter tombstone store, so here needs to handle
// it explicitly.
Err(pd_client::Error::StoreTombstone(_)) => {
RESOLVE_STORE_COUNTER_STATIC.tombstone.inc();
return Err(box_err!("store {} has been removed", store_id));
}
Err(e) => return Err(box_err!(e)),
};
let mut group_id = None;
let mut state = self.state.lock().unwrap();
if state.status().get_mode() == ReplicationMode::DrAutoSync {
Expand All @@ -97,10 +105,6 @@ where
if let Some(group_id) = group_id {
self.router.report_resolved(store_id, group_id);
}
if s.get_state() == metapb::StoreState::Tombstone {
RESOLVE_STORE_COUNTER_STATIC.tombstone.inc();
return Err(box_err!("store {} has been removed", store_id));
}
let addr = take_peer_address(&mut s);
// In some tests, we use empty address for store first,
// so we should ignore here.
Expand Down Expand Up @@ -191,6 +195,13 @@ mod tests {

impl PdClient for MockPdClient {
fn get_store(&self, _: u64) -> Result<metapb::Store> {
if self.store.get_state() == metapb::StoreState::Tombstone {
// Simulate the behavior of `get_store` in pd client.
return Err(pd_client::Error::StoreTombstone(format!(
"{:?}",
self.store
)));
}
// The store address will be changed every millisecond.
let mut store = self.store.clone();
let mut sock = SocketAddr::from_str(store.get_address()).unwrap();
Expand Down
82 changes: 74 additions & 8 deletions tests/integrations/server/raft_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ use futures::{FutureExt, StreamExt, TryStreamExt};
use grpcio::{
ClientStreamingSink, Environment, RequestStream, RpcContext, RpcStatus, RpcStatusCode, Server,
};
use kvproto::metapb;
use kvproto::raft_serverpb::{Done, RaftMessage};
use kvproto::tikvpb::BatchRaftMessage;
use raft::eraftpb::Entry;
use raftstore::errors::DiscardReason;
use raftstore::router::{RaftStoreBlackHole, RaftStoreRouter};
use security::{SecurityConfig, SecurityManager};
use tikv::server::resolve::Callback;
use tikv::server::{
self, Config, ConnectionBuilder, RaftClient, StoreAddrResolver, TestRaftStoreRouter,
self, resolve, Config, ConnectionBuilder, RaftClient, StoreAddrResolver, TestRaftStoreRouter,
};
use tikv_util::worker::Builder as WorkerBuilder;
use tikv_util::worker::LazyWorker;

use super::{mock_kv_service, MockKv, MockKvService};
Expand All @@ -28,29 +31,35 @@ pub struct StaticResolver {
port: u16,
}

impl StaticResolver {
fn new(port: u16) -> StaticResolver {
StaticResolver { port }
}
}

impl StoreAddrResolver for StaticResolver {
fn resolve(&self, _store_id: u64, cb: Callback) -> server::Result<()> {
cb(Ok(format!("localhost:{}", self.port)));
Ok(())
}
}

pub fn get_raft_client_with_router<R>(router: R, port: u16) -> RaftClient<StaticResolver, R>
fn get_raft_client<R, T>(router: R, resolver: T) -> RaftClient<T, R>
where
R: RaftStoreRouter<RocksEngine> + Unpin + 'static,
T: StoreAddrResolver + 'static,
{
let env = Arc::new(Environment::new(2));
let cfg = Arc::new(Config::default());
let security_mgr = Arc::new(SecurityManager::new(&SecurityConfig::default()).unwrap());
let resolver = StaticResolver { port };
let worker = LazyWorker::new("test-raftclient");
let builder =
ConnectionBuilder::new(env, cfg, security_mgr, resolver, router, worker.scheduler());
RaftClient::new(builder)
}

pub fn get_raft_client(port: u16) -> RaftClient<StaticResolver, RaftStoreBlackHole> {
get_raft_client_with_router(RaftStoreBlackHole, port)
fn get_raft_client_by_port(port: u16) -> RaftClient<StaticResolver, RaftStoreBlackHole> {
get_raft_client(RaftStoreBlackHole, StaticResolver::new(port))
}

#[derive(Clone)]
Expand Down Expand Up @@ -127,7 +136,7 @@ fn test_batch_raft_fallback() {
let service = MockKvForRaft::new(Arc::clone(&msg_count), Arc::clone(&batch_msg_count), false);
let (mock_server, port) = create_mock_server(service, 60000, 60100).unwrap();

let mut raft_client = get_raft_client(port);
let mut raft_client = get_raft_client_by_port(port);
(0..100).for_each(|_| {
raft_client.send(RaftMessage::default()).unwrap();
thread::sleep(time::Duration::from_millis(10));
Expand All @@ -150,7 +159,7 @@ fn test_raft_client_reconnect() {
let (tx, rx) = mpsc::channel();
let (significant_msg_sender, _significant_msg_receiver) = mpsc::channel();
let router = TestRaftStoreRouter::new(tx, significant_msg_sender);
let mut raft_client = get_raft_client_with_router(router, port);
let mut raft_client = get_raft_client(router, StaticResolver::new(port));
(0..50).for_each(|_| raft_client.send(RaftMessage::default()).unwrap());
raft_client.flush();

Expand Down Expand Up @@ -186,7 +195,7 @@ fn test_batch_size_limit() {
let service = MockKvForRaft::new(Arc::clone(&msg_count), Arc::clone(&batch_msg_count), true);
let (mock_server, port) = create_mock_server(service, 60200, 60300).unwrap();

let mut raft_client = get_raft_client(port);
let mut raft_client = get_raft_client_by_port(port);

// `send` should success.
for _ in 0..10 {
Expand Down Expand Up @@ -251,3 +260,60 @@ fn check_msg_count(max_delay_ms: u64, count: &AtomicUsize, expected: usize) {
}
panic!("check_msg_count wants {}, gets {}", expected, got);
}

/// Check if raft client can add tombstone stores in block list.
#[test]
fn test_tombstone_block_list() {
let pd_server = test_pd::Server::new(1);
let eps = pd_server.bind_addrs();
let pd_client = Arc::new(test_pd::util::new_client(eps, None));
let bg_worker = WorkerBuilder::new(thd_name!("background"))
.thread_count(2)
.create();
let resolver = resolve::new_resolver(pd_client, &bg_worker, RaftStoreBlackHole).0;

let msg_count = Arc::new(AtomicUsize::new(0));
let batch_msg_count = Arc::new(AtomicUsize::new(0));
let service = MockKvForRaft::new(Arc::clone(&msg_count), Arc::clone(&batch_msg_count), true);
let (_mock_server, port) = create_mock_server(service, 60200, 60300).unwrap();

let mut raft_client = get_raft_client(RaftStoreBlackHole, resolver);

let mut store1 = metapb::Store::default();
store1.set_id(1);
store1.set_address(format!("127.0.0.1:{}", port));
pd_server.default_handler().add_store(store1.clone());

// `send` should success.
for _ in 0..10 {
// 5M per RaftMessage.
let mut raft_m = RaftMessage::default();
raft_m.mut_to_peer().set_store_id(1);
for _ in 0..(5 * 1024) {
let mut e = Entry::default();
e.set_data(vec![b'a'; 1024]);
raft_m.mut_message().mut_entries().push(e);
}
raft_client.send(raft_m).unwrap();
}
raft_client.flush();

check_msg_count(500, &msg_count, 10);

let mut store2 = metapb::Store::default();
store2.set_id(2);
store2.set_address(store1.get_address().to_owned());
store2.set_state(metapb::StoreState::Tombstone);
pd_server.default_handler().add_store(store2);
let mut message = RaftMessage::default();
message.mut_to_peer().set_store_id(2);
// First message should be OK.
raft_client.send(message.clone()).unwrap();
// Wait some time for the resolve result.
thread::sleep(time::Duration::from_millis(50));
// Second message should fail as the store should be added to block list.
assert_eq!(
DiscardReason::Disconnected,
raft_client.send(message).unwrap_err()
);
}

0 comments on commit 7b260d1

Please sign in to comment.