raftstore: remove the local reader thread (tikv#4558)
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
5kbpers authored and hicqu committed May 23, 2019
1 parent fa292bc commit fdbaf9b
Showing 14 changed files with 695 additions and 719 deletions.
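
Every call site in this diff follows the same pattern: instead of spawning a dedicated local-reader worker thread and handing its scheduler to ServerRaftStoreRouter, the caller now builds a shared StoreMeta, constructs a LocalReader over the kv engine, that meta, and the raft router, and passes store_meta (rather than the worker) to node.start. Below is a minimal sketch of that wiring, condensed from the hunks that follow; the helper name build_read_path is hypothetical, and the engines/router arguments are assumed to come from Engines::new and fsm::create_raft_batch_system as in src/binutil/server.rs.

use std::sync::{Arc, Mutex};

use engine::Engines;
use tikv::raftstore::store::fsm::store::{StoreMeta, PENDING_VOTES_CAP};
use tikv::raftstore::store::fsm::RaftRouter;
use tikv::raftstore::store::LocalReader;
use tikv::server::transport::ServerRaftStoreRouter;

// Hypothetical helper condensing the wiring repeated in node.rs, server.rs and
// binutil/server.rs below; not part of the commit itself.
fn build_read_path(
    engines: &Engines,
    router: RaftRouter,
) -> (Arc<Mutex<StoreMeta>>, ServerRaftStoreRouter) {
    // Shared store metadata replaces the dedicated "local-reader" worker.
    let store_meta = Arc::new(Mutex::new(StoreMeta::new(PENDING_VOTES_CAP)));

    // The reader serves local reads from the read delegates registered in
    // StoreMeta and forwards everything else through the raftstore router.
    let local_reader = LocalReader::new(engines.kv.clone(), store_meta.clone(), router.clone());
    let raft_router = ServerRaftStoreRouter::new(router, local_reader);

    // store_meta (rather than the old worker handle) is what node.start(...)
    // receives after this change.
    (store_meta, raft_router)
}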
13 changes: 6 additions & 7 deletions components/test_raftstore/src/node.rs
@@ -22,9 +22,10 @@ use tikv::server::transport::{RaftStoreRouter, ServerRaftStoreRouter};
use tikv::server::Node;
use tikv::server::Result as ServerResult;
use tikv_util::collections::{HashMap, HashSet};
use tikv_util::worker::{FutureWorker, Worker};
use tikv_util::worker::FutureWorker;

use super::*;
use tikv::raftstore::store::fsm::store::{StoreMeta, PENDING_VOTES_CAP};

pub struct ChannelTransportCore {
snap_paths: HashMap<u64, (SnapManager, TempDir)>,
@@ -164,10 +165,6 @@ impl Simulator for NodeCluster {
assert!(node_id == 0 || !self.nodes.contains_key(&node_id));
let pd_worker = FutureWorker::new("test-pd-worker");

// Create localreader.
let local_reader = Worker::new("test-local-reader");
let local_ch = local_reader.scheduler();

let simulate_trans = SimulateTransport::new(self.trans.clone());
let mut node = Node::new(
system,
@@ -206,12 +203,14 @@ impl Simulator for NodeCluster {
Arc::new(SSTImporter::new(dir).unwrap())
};

let store_meta = Arc::new(Mutex::new(StoreMeta::new(PENDING_VOTES_CAP)));
let local_reader = LocalReader::new(engines.kv.clone(), store_meta.clone(), router.clone());
node.start(
engines.clone(),
simulate_trans.clone(),
snap_mgr.clone(),
pd_worker,
local_reader,
store_meta,
coprocessor_host,
importer,
)?;
@@ -239,7 +238,7 @@ impl Simulator for NodeCluster {
.insert(node_id, (snap_mgr, tmp));
}

let router = ServerRaftStoreRouter::new(router.clone(), local_ch);
let router = ServerRaftStoreRouter::new(router, local_reader);
self.trans
.core
.lock()
25 changes: 13 additions & 12 deletions components/test_raftstore/src/server.rs
@@ -1,7 +1,7 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

use std::path::Path;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use std::{thread, usize};

@@ -16,10 +16,9 @@ use tikv::coprocessor;
use tikv::import::{ImportSSTService, SSTImporter};
use tikv::raftstore::coprocessor::{CoprocessorHost, RegionInfoAccessor};
use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::{Callback, SnapManager};
use tikv::raftstore::store::{Callback, LocalReader, SnapManager};
use tikv::raftstore::Result;
use tikv::server::load_statistics::ThreadLoad;
use tikv::server::readpool;
use tikv::server::resolve::{self, Task as ResolveTask};
use tikv::server::transport::ServerRaftStoreRouter;
use tikv::server::transport::{RaftStoreBlackHole, RaftStoreRouter};
@@ -29,12 +28,13 @@ use tikv::server::{
ServerTransport,
};

use tikv::storage::RaftKv;
use tikv::storage::{self, RaftKv};
use tikv_util::collections::{HashMap, HashSet};
use tikv_util::security::SecurityManager;
use tikv_util::worker::{FutureWorker, Worker};

use super::*;
use tikv::raftstore::store::fsm::store::{StoreMeta, PENDING_VOTES_CAP};

type SimulateStoreTransport = SimulateTransport<ServerRaftStoreRouter>;
type SimulateServerTransport =
@@ -120,16 +120,17 @@ impl Simulator for ServerCluster {
cfg.server.addr = addr.clone();
}

// Create localreader.
let local_reader = Worker::new("test-local-reader");
let local_ch = local_reader.scheduler();

let raft_router = ServerRaftStoreRouter::new(router.clone(), local_ch);
let store_meta = Arc::new(Mutex::new(StoreMeta::new(PENDING_VOTES_CAP)));
let local_reader = LocalReader::new(engines.kv.clone(), store_meta.clone(), router.clone());
let raft_router = ServerRaftStoreRouter::new(router.clone(), local_reader);
let sim_router = SimulateTransport::new(raft_router);

let raft_engine = RaftKv::new(sim_router.clone());

// Create storage.
let pd_worker = FutureWorker::new("test-pd-worker");
let storage_read_pool = readpool::Builder::build_for_test();
let storage_read_pool =
storage::readpool_impl::build_read_pool_for_test(raft_engine.clone());
let store = create_raft_storage(
RaftKv::new(sim_router.clone()),
&cfg.storage,
@@ -139,7 +140,7 @@
None,
None,
)?;
self.storages.insert(node_id, store.get_engine());
self.storages.insert(node_id, raft_engine);

// Create import service.
let importer = {
@@ -216,7 +217,7 @@ impl Simulator for ServerCluster {
simulate_trans.clone(),
snap_mgr.clone(),
pd_worker,
local_reader,
store_meta,
coprocessor_host,
importer,
)?;
24 changes: 9 additions & 15 deletions src/binutil/server.rs
@@ -9,33 +9,32 @@ use crate::fatal;
use crate::import::{ImportSSTService, SSTImporter};
use crate::pd::{PdClient, RpcClient};
use crate::raftstore::coprocessor::{CoprocessorHost, RegionInfoAccessor};
use crate::raftstore::store::fsm;
use crate::raftstore::store::fsm::store::{StoreMeta, PENDING_VOTES_CAP};
use crate::raftstore::store::{fsm, LocalReader};
use crate::raftstore::store::{new_compaction_listener, SnapManagerBuilder};
use crate::server::resolve;
use crate::server::status_server::StatusServer;
use crate::server::transport::ServerRaftStoreRouter;
use crate::server::DEFAULT_CLUSTER_ID;
use crate::server::{create_raft_storage, Node, Server};
use crate::storage::kv::raftkv::RaftKv;
use crate::storage::lock_manager::{
Detector, DetectorScheduler, Service as DeadlockService, WaiterManager, WaiterMgrScheduler,
};
use crate::storage::{self, AutoGCConfig, DEFAULT_ROCKSDB_SUB_DIR};
use crate::storage::{self, AutoGCConfig, RaftKv, DEFAULT_ROCKSDB_SUB_DIR};
use engine::rocks;
use engine::rocks::util::metrics_flusher::{MetricsFlusher, DEFAULT_FLUSHER_INTERVAL};
use engine::rocks::util::security::encrypted_env_from_cipher_file;
use engine::Engines;
use fs2::FileExt;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use std::usize;
use tikv_util::check_environment_variables;
use tikv_util::security::SecurityManager;
use tikv_util::time::Monitor;
use tikv_util::worker::{Builder, FutureWorker};
use tikv_util::worker::FutureWorker;

const RESERVED_OPEN_FDS: u64 = 1000;

@@ -115,14 +114,6 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
// Initialize raftstore channels.
let (router, system) = fsm::create_raft_batch_system(&cfg.raft_store);

// Create Local Reader.
let local_reader = Builder::new("local-reader")
.batch_size(cfg.raft_store.local_read_batch_size as usize)
.create();
let local_ch = local_reader.scheduler();

// Create router.
let raft_router = ServerRaftStoreRouter::new(router.clone(), local_ch);
let compaction_listener = new_compaction_listener(router.clone());

// Create pd client and pd worker
@@ -185,6 +176,9 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
.unwrap_or_else(|s| fatal!("failed to create kv engine: {}", s));

let engines = Engines::new(Arc::new(kv_engine), Arc::new(raft_engine), cache.is_some());
let store_meta = Arc::new(Mutex::new(StoreMeta::new(PENDING_VOTES_CAP)));
let local_reader = LocalReader::new(engines.kv.clone(), store_meta.clone(), router.clone());
let raft_router = ServerRaftStoreRouter::new(router.clone(), local_reader);

let engine = RaftKv::new(raft_router.clone());

@@ -284,7 +278,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
trans,
snap_mgr,
pd_worker,
local_reader,
store_meta,
coprocessor_host,
importer,
)
47 changes: 9 additions & 38 deletions src/raftstore/store/fsm/peer.rs
@@ -49,7 +49,7 @@ use crate::raftstore::store::peer_storage::{ApplySnapResult, InvokeContext};
use crate::raftstore::store::transport::Transport;
use crate::raftstore::store::util::KeysInfoFormatter;
use crate::raftstore::store::worker::{
CleanupSSTTask, ConsistencyCheckTask, RaftlogGcTask, ReadTask, RegionTask, SplitCheckTask,
CleanupSSTTask, ConsistencyCheckTask, RaftlogGcTask, ReadDelegate, RegionTask, SplitCheckTask,
};
use crate::raftstore::store::{
util, CasualMessage, Config, PeerMsg, PeerTicks, RaftCommand, SignificantMsg, SnapKey,
@@ -1262,18 +1262,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
meta.merge_locks.remove(&region_id);

// Destroy read delegates.
if self
.ctx
.local_reader
.schedule(ReadTask::destroy(region_id))
.is_err()
{
info!(
"unable to destroy read delegate, are we shutting down?";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
);
}
meta.readers.remove(&region_id);
self.ctx
.apply_router
.schedule_task(region_id, ApplyTask::destroy(region_id));
@@ -1327,12 +1316,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}
{
let mut meta = self.ctx.store_meta.lock().unwrap();
meta.set_region(
&self.ctx.coprocessor_host,
&self.ctx.local_reader,
cp.region,
&mut self.fsm.peer,
);
meta.set_region(&self.ctx.coprocessor_host, cp.region, &mut self.fsm.peer);
}

let peer_id = cp.peer.get_id();
@@ -1428,12 +1412,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let mut guard = self.ctx.store_meta.lock().unwrap();
let meta: &mut StoreMeta = &mut *guard;
let region_id = derived.get_id();
meta.set_region(
&self.ctx.coprocessor_host,
&self.ctx.local_reader,
derived,
&mut self.fsm.peer,
);
meta.set_region(&self.ctx.coprocessor_host, derived, &mut self.fsm.peer);
self.fsm.peer.post_split();
let is_leader = self.fsm.peer.is_leader();
if is_leader {
@@ -1537,6 +1516,8 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {

new_peer.peer.activate(self.ctx);
meta.regions.insert(new_region_id, new_region);
meta.readers
.insert(new_region_id, ReadDelegate::from_peer(new_peer.get_peer()));
if last_region_id == new_region_id {
// To prevent from big region, the right region needs run split
// check again after split.
@@ -1743,7 +1724,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let mut meta = self.ctx.store_meta.lock().unwrap();
meta.set_region(
&self.ctx.coprocessor_host,
&self.ctx.local_reader,
region.clone(),
&mut self.fsm.peer,
);
@@ -1833,12 +1813,8 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
meta.region_ranges
.insert(enc_end_key(&region), region.get_id());
assert!(meta.regions.remove(&source.get_id()).is_some());
meta.set_region(
&self.ctx.coprocessor_host,
&self.ctx.local_reader,
region,
&mut self.fsm.peer,
);
assert!(meta.readers.remove(&source.get_id()).is_some());
meta.set_region(&self.ctx.coprocessor_host, region, &mut self.fsm.peer);
// make approximate size and keys updated in time.
// the reason why follower need to update is that there is a issue that after merge
// and then transfer leader, the new leader may have stale size and keys.
@@ -1889,12 +1865,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
{
let mut meta = self.ctx.store_meta.lock().unwrap();
if let Some(r) = region {
meta.set_region(
&self.ctx.coprocessor_host,
&self.ctx.local_reader,
r,
&mut self.fsm.peer,
);
meta.set_region(&self.ctx.coprocessor_host, r, &mut self.fsm.peer);
}
let region = self.fsm.peer.region();
let region_id = region.get_id();