diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index 2590c9e87d0..61f47501ade 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -22,9 +22,10 @@ use tikv::server::transport::{RaftStoreRouter, ServerRaftStoreRouter}; use tikv::server::Node; use tikv::server::Result as ServerResult; use tikv_util::collections::{HashMap, HashSet}; -use tikv_util::worker::{FutureWorker, Worker}; +use tikv_util::worker::FutureWorker; use super::*; +use tikv::raftstore::store::fsm::store::{StoreMeta, PENDING_VOTES_CAP}; pub struct ChannelTransportCore { snap_paths: HashMap, @@ -164,10 +165,6 @@ impl Simulator for NodeCluster { assert!(node_id == 0 || !self.nodes.contains_key(&node_id)); let pd_worker = FutureWorker::new("test-pd-worker"); - // Create localreader. - let local_reader = Worker::new("test-local-reader"); - let local_ch = local_reader.scheduler(); - let simulate_trans = SimulateTransport::new(self.trans.clone()); let mut node = Node::new( system, @@ -206,12 +203,14 @@ impl Simulator for NodeCluster { Arc::new(SSTImporter::new(dir).unwrap()) }; + let store_meta = Arc::new(Mutex::new(StoreMeta::new(PENDING_VOTES_CAP))); + let local_reader = LocalReader::new(engines.kv.clone(), store_meta.clone(), router.clone()); node.start( engines.clone(), simulate_trans.clone(), snap_mgr.clone(), pd_worker, - local_reader, + store_meta, coprocessor_host, importer, )?; @@ -239,7 +238,7 @@ impl Simulator for NodeCluster { .insert(node_id, (snap_mgr, tmp)); } - let router = ServerRaftStoreRouter::new(router.clone(), local_ch); + let router = ServerRaftStoreRouter::new(router, local_reader); self.trans .core .lock() diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index 31c09086589..d4f8d20eeb8 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -1,7 +1,7 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. use std::path::Path; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use std::{thread, usize}; @@ -16,10 +16,9 @@ use tikv::coprocessor; use tikv::import::{ImportSSTService, SSTImporter}; use tikv::raftstore::coprocessor::{CoprocessorHost, RegionInfoAccessor}; use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter}; -use tikv::raftstore::store::{Callback, SnapManager}; +use tikv::raftstore::store::{Callback, LocalReader, SnapManager}; use tikv::raftstore::Result; use tikv::server::load_statistics::ThreadLoad; -use tikv::server::readpool; use tikv::server::resolve::{self, Task as ResolveTask}; use tikv::server::transport::ServerRaftStoreRouter; use tikv::server::transport::{RaftStoreBlackHole, RaftStoreRouter}; @@ -29,12 +28,13 @@ use tikv::server::{ ServerTransport, }; -use tikv::storage::RaftKv; +use tikv::storage::{self, RaftKv}; use tikv_util::collections::{HashMap, HashSet}; use tikv_util::security::SecurityManager; use tikv_util::worker::{FutureWorker, Worker}; use super::*; +use tikv::raftstore::store::fsm::store::{StoreMeta, PENDING_VOTES_CAP}; type SimulateStoreTransport = SimulateTransport; type SimulateServerTransport = @@ -120,16 +120,17 @@ impl Simulator for ServerCluster { cfg.server.addr = addr.clone(); } - // Create localreader. - let local_reader = Worker::new("test-local-reader"); - let local_ch = local_reader.scheduler(); - - let raft_router = ServerRaftStoreRouter::new(router.clone(), local_ch); + let store_meta = Arc::new(Mutex::new(StoreMeta::new(PENDING_VOTES_CAP))); + let local_reader = LocalReader::new(engines.kv.clone(), store_meta.clone(), router.clone()); + let raft_router = ServerRaftStoreRouter::new(router.clone(), local_reader); let sim_router = SimulateTransport::new(raft_router); + let raft_engine = RaftKv::new(sim_router.clone()); + // Create storage. let pd_worker = FutureWorker::new("test-pd-worker"); - let storage_read_pool = readpool::Builder::build_for_test(); + let storage_read_pool = + storage::readpool_impl::build_read_pool_for_test(raft_engine.clone()); let store = create_raft_storage( RaftKv::new(sim_router.clone()), &cfg.storage, @@ -139,7 +140,7 @@ impl Simulator for ServerCluster { None, None, )?; - self.storages.insert(node_id, store.get_engine()); + self.storages.insert(node_id, raft_engine); // Create import service. let importer = { @@ -216,7 +217,7 @@ impl Simulator for ServerCluster { simulate_trans.clone(), snap_mgr.clone(), pd_worker, - local_reader, + store_meta, coprocessor_host, importer, )?; diff --git a/src/binutil/server.rs b/src/binutil/server.rs index e67d45fbb27..8468d5d0bc2 100644 --- a/src/binutil/server.rs +++ b/src/binutil/server.rs @@ -9,18 +9,18 @@ use crate::fatal; use crate::import::{ImportSSTService, SSTImporter}; use crate::pd::{PdClient, RpcClient}; use crate::raftstore::coprocessor::{CoprocessorHost, RegionInfoAccessor}; -use crate::raftstore::store::fsm; +use crate::raftstore::store::fsm::store::{StoreMeta, PENDING_VOTES_CAP}; +use crate::raftstore::store::{fsm, LocalReader}; use crate::raftstore::store::{new_compaction_listener, SnapManagerBuilder}; use crate::server::resolve; use crate::server::status_server::StatusServer; use crate::server::transport::ServerRaftStoreRouter; use crate::server::DEFAULT_CLUSTER_ID; use crate::server::{create_raft_storage, Node, Server}; -use crate::storage::kv::raftkv::RaftKv; use crate::storage::lock_manager::{ Detector, DetectorScheduler, Service as DeadlockService, WaiterManager, WaiterMgrScheduler, }; -use crate::storage::{self, AutoGCConfig, DEFAULT_ROCKSDB_SUB_DIR}; +use crate::storage::{self, AutoGCConfig, RaftKv, DEFAULT_ROCKSDB_SUB_DIR}; use engine::rocks; use engine::rocks::util::metrics_flusher::{MetricsFlusher, DEFAULT_FLUSHER_INTERVAL}; use engine::rocks::util::security::encrypted_env_from_cipher_file; @@ -28,14 +28,13 @@ use engine::Engines; use fs2::FileExt; use std::fs::File; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use std::time::Duration; -use std::usize; use tikv_util::check_environment_variables; use tikv_util::security::SecurityManager; use tikv_util::time::Monitor; -use tikv_util::worker::{Builder, FutureWorker}; +use tikv_util::worker::FutureWorker; const RESERVED_OPEN_FDS: u64 = 1000; @@ -115,14 +114,6 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc PeerFsmDelegate<'a, T, C> { meta.merge_locks.remove(®ion_id); // Destroy read delegates. - if self - .ctx - .local_reader - .schedule(ReadTask::destroy(region_id)) - .is_err() - { - info!( - "unable to destroy read delegate, are we shutting down?"; - "region_id" => self.fsm.region_id(), - "peer_id" => self.fsm.peer_id(), - ); - } + meta.readers.remove(®ion_id); self.ctx .apply_router .schedule_task(region_id, ApplyTask::destroy(region_id)); @@ -1327,12 +1316,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } { let mut meta = self.ctx.store_meta.lock().unwrap(); - meta.set_region( - &self.ctx.coprocessor_host, - &self.ctx.local_reader, - cp.region, - &mut self.fsm.peer, - ); + meta.set_region(&self.ctx.coprocessor_host, cp.region, &mut self.fsm.peer); } let peer_id = cp.peer.get_id(); @@ -1428,12 +1412,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { let mut guard = self.ctx.store_meta.lock().unwrap(); let meta: &mut StoreMeta = &mut *guard; let region_id = derived.get_id(); - meta.set_region( - &self.ctx.coprocessor_host, - &self.ctx.local_reader, - derived, - &mut self.fsm.peer, - ); + meta.set_region(&self.ctx.coprocessor_host, derived, &mut self.fsm.peer); self.fsm.peer.post_split(); let is_leader = self.fsm.peer.is_leader(); if is_leader { @@ -1537,6 +1516,8 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { new_peer.peer.activate(self.ctx); meta.regions.insert(new_region_id, new_region); + meta.readers + .insert(new_region_id, ReadDelegate::from_peer(new_peer.get_peer())); if last_region_id == new_region_id { // To prevent from big region, the right region needs run split // check again after split. @@ -1743,7 +1724,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { let mut meta = self.ctx.store_meta.lock().unwrap(); meta.set_region( &self.ctx.coprocessor_host, - &self.ctx.local_reader, region.clone(), &mut self.fsm.peer, ); @@ -1833,12 +1813,8 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { meta.region_ranges .insert(enc_end_key(®ion), region.get_id()); assert!(meta.regions.remove(&source.get_id()).is_some()); - meta.set_region( - &self.ctx.coprocessor_host, - &self.ctx.local_reader, - region, - &mut self.fsm.peer, - ); + assert!(meta.readers.remove(&source.get_id()).is_some()); + meta.set_region(&self.ctx.coprocessor_host, region, &mut self.fsm.peer); // make approximate size and keys updated in time. // the reason why follower need to update is that there is a issue that after merge // and then transfer leader, the new leader may have stale size and keys. @@ -1889,12 +1865,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { { let mut meta = self.ctx.store_meta.lock().unwrap(); if let Some(r) = region { - meta.set_region( - &self.ctx.coprocessor_host, - &self.ctx.local_reader, - r, - &mut self.fsm.peer, - ); + meta.set_region(&self.ctx.coprocessor_host, r, &mut self.fsm.peer); } let region = self.fsm.peer.region(); let region_id = region.get_id(); diff --git a/src/raftstore/store/fsm/store.rs b/src/raftstore/store/fsm/store.rs index 454b156d782..1dccfd0193c 100644 --- a/src/raftstore/store/fsm/store.rs +++ b/src/raftstore/store/fsm/store.rs @@ -45,8 +45,8 @@ use crate::raftstore::store::transport::Transport; use crate::raftstore::store::util::is_initial_msg; use crate::raftstore::store::worker::{ CleanupSSTRunner, CleanupSSTTask, CompactRunner, CompactTask, ConsistencyCheckRunner, - ConsistencyCheckTask, LocalReader, RaftlogGcRunner, RaftlogGcTask, ReadTask, RegionRunner, - RegionTask, SplitCheckRunner, SplitCheckTask, + ConsistencyCheckTask, RaftlogGcRunner, RaftlogGcTask, ReadDelegate, RegionRunner, RegionTask, + SplitCheckRunner, SplitCheckTask, }; use crate::raftstore::store::{ util, Callback, CasualMessage, PeerMsg, RaftCommand, SignificantMsg, SnapManager, @@ -67,7 +67,7 @@ type Key = Vec; const KV_WB_SHRINK_SIZE: usize = 256 * 1024; const RAFT_WB_SHRINK_SIZE: usize = 1024 * 1024; -const PENDING_VOTES_CAP: usize = 20; +pub const PENDING_VOTES_CAP: usize = 20; const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); pub struct StoreInfo { @@ -76,10 +76,14 @@ pub struct StoreInfo { } pub struct StoreMeta { + /// store id + pub store_id: Option, /// region_end_key -> region_id pub region_ranges: BTreeMap, u64>, /// region_id -> region pub regions: HashMap, + /// region_id -> reader + pub readers: HashMap, /// `MsgRequestPreVote` or `MsgRequestVote` messages from newly split Regions shouldn't be dropped if there is no /// such Region in this store now. So the messages are recorded temporarily and will be handled later. pub pending_votes: RingQueue, @@ -99,10 +103,12 @@ pub struct StoreMeta { } impl StoreMeta { - fn new(vote_capacity: usize) -> StoreMeta { + pub fn new(vote_capacity: usize) -> StoreMeta { StoreMeta { + store_id: None, region_ranges: BTreeMap::default(), regions: HashMap::default(), + readers: HashMap::default(), pending_votes: RingQueue::with_capacity(vote_capacity), pending_snapshot_regions: Vec::default(), pending_merge_targets: HashMap::default(), @@ -115,7 +121,6 @@ impl StoreMeta { pub fn set_region( &mut self, host: &CoprocessorHost, - reader: &Scheduler, region: Region, peer: &mut crate::raftstore::store::Peer, ) { @@ -124,6 +129,7 @@ impl StoreMeta { // TODO: may not be a good idea to panic when holding a lock. panic!("{} region corrupted", peer.tag); } + let reader = self.readers.get_mut(®ion.get_id()).unwrap(); peer.set_region(host, reader, region); } } @@ -188,7 +194,6 @@ pub struct PollContext { pub consistency_check_scheduler: Scheduler, pub split_check_scheduler: Scheduler, pub cleanup_sst_scheduler: Scheduler, - pub local_reader: Scheduler, pub region_scheduler: Scheduler, pub apply_router: ApplyRouter, pub router: RaftRouter, @@ -671,7 +676,6 @@ pub struct RaftPollerBuilder { consistency_check_scheduler: Scheduler, split_check_scheduler: Scheduler, cleanup_sst_scheduler: Scheduler, - local_reader: Scheduler, pub region_scheduler: Scheduler, apply_router: ApplyRouter, pub router: RaftRouter, @@ -868,7 +872,6 @@ where consistency_check_scheduler: self.consistency_check_scheduler.clone(), split_check_scheduler: self.split_check_scheduler.clone(), cleanup_sst_scheduler: self.cleanup_sst_scheduler.clone(), - local_reader: self.local_reader.clone(), region_scheduler: self.region_scheduler.clone(), apply_router: self.apply_router.clone(), router: self.router.clone(), @@ -915,7 +918,6 @@ struct Workers { consistency_check_worker: Worker, split_check_worker: Worker, cleanup_sst_worker: Worker, - local_reader: Worker, region_worker: Worker, compact_worker: Worker, coprocessor_host: Arc, @@ -944,7 +946,7 @@ impl RaftBatchSystem { pd_client: Arc, mgr: SnapManager, pd_worker: FutureWorker, - local_reader: Worker, + store_meta: Arc>, mut coprocessor_host: CoprocessorHost, importer: Arc, ) -> Result<()> { @@ -963,7 +965,6 @@ impl RaftBatchSystem { raftlog_gc_worker: Worker::new("raft-gc-worker"), compact_worker: Worker::new("compact-worker"), pd_worker, - local_reader, consistency_check_worker: Worker::new("consistency-check"), cleanup_sst_worker: Worker::new("cleanup-sst"), coprocessor_host: Arc::new(coprocessor_host), @@ -985,14 +986,13 @@ impl RaftBatchSystem { consistency_check_scheduler: workers.consistency_check_worker.scheduler(), cleanup_sst_scheduler: workers.cleanup_sst_worker.scheduler(), apply_router: self.apply_router.clone(), - local_reader: workers.local_reader.scheduler(), trans, pd_client, coprocessor_host: workers.coprocessor_host.clone(), importer, snap_mgr: mgr, global_stat: GlobalStoreStat::default(), - store_meta: Arc::new(Mutex::new(StoreMeta::new(PENDING_VOTES_CAP))), + store_meta, applying_snap_count: Arc::new(AtomicUsize::new(0)), future_poller: workers.future_poller.sender().clone(), }; @@ -1024,6 +1024,15 @@ impl RaftBatchSystem { self.apply_system .schedule_all(region_peers.iter().map(|pair| pair.1.get_peer())); + { + let mut meta = builder.store_meta.lock().unwrap(); + for (_, peer_fsm) in ®ion_peers { + let peer = peer_fsm.get_peer(); + meta.readers + .insert(peer_fsm.region_id(), ReadDelegate::from_peer(peer)); + } + } + let router = Mutex::new(self.router.clone()); pd_client.handle_reconnect(move || { router @@ -1032,8 +1041,6 @@ impl RaftBatchSystem { .broadcast_normal(|| PeerMsg::HeartbeatPd); }); - let reader = LocalReader::new(&builder, region_peers.iter().map(|pair| pair.1.get_peer())); - let tag = format!("raftstore-{}", store.get_id()); self.system.spawn(tag, builder); let mut mailboxes = Vec::with_capacity(region_peers.len()); @@ -1055,15 +1062,12 @@ impl RaftBatchSystem { self.apply_system .spawn("apply".to_owned(), apply_poller_builder); - let timer = LocalReader::new_timer(); - box_try!(workers.local_reader.start_with_timer(reader, timer)); let split_check_runner = SplitCheckRunner::new( Arc::clone(&engines.kv), self.router.clone(), Arc::clone(&workers.coprocessor_host), ); - box_try!(workers.split_check_worker.start(split_check_runner)); let region_runner = RegionRunner::new( @@ -1126,7 +1130,6 @@ impl RaftBatchSystem { handles.push(workers.pd_worker.stop()); handles.push(workers.consistency_check_worker.stop()); handles.push(workers.cleanup_sst_worker.stop()); - handles.push(workers.local_reader.stop()); self.apply_system.shutdown(); self.system.shutdown(); for h in handles { diff --git a/src/raftstore/store/mod.rs b/src/raftstore/store/mod.rs index 7c70a0b6eb8..8357e272f10 100644 --- a/src/raftstore/store/mod.rs +++ b/src/raftstore/store/mod.rs @@ -42,7 +42,7 @@ pub use self::snap::{ SnapManagerBuilder, Snapshot, SnapshotDeleter, SnapshotStatistics, }; pub use self::transport::{CasualRouter, ProposalRouter, StoreRouter, Transport}; -pub use self::worker::{KeyEntry, ReadTask, RegionTask}; +pub use self::worker::{KeyEntry, LocalReader, RegionTask}; // Only used in tests #[cfg(test)] diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs index b29e104d236..745a498e1ba 100644 --- a/src/raftstore/store/peer.rs +++ b/src/raftstore/store/peer.rs @@ -34,7 +34,7 @@ use crate::raftstore::store::fsm::{ apply, Apply, ApplyMetrics, ApplyTask, ApplyTaskRes, Proposal, RegionProposal, }; use crate::raftstore::store::keys::{enc_end_key, enc_start_key}; -use crate::raftstore::store::worker::{ReadProgress, ReadTask, RegionTask}; +use crate::raftstore::store::worker::{ReadDelegate, ReadProgress, RegionTask}; use crate::raftstore::store::{keys, Callback, Config, ReadResponse, RegionSnapshot}; use crate::raftstore::{Error, Result}; use tikv_util::collections::HashMap; @@ -387,19 +387,11 @@ impl Peer { Ok(peer) } - /// Register self to apply_scheduler and read_scheduler so that the peer is then usable. + /// Register self to apply_scheduler so that the peer is then usable. /// Also trigger `RegionChangeEvent::Create` here. pub fn activate(&self, ctx: &PollContext) { ctx.apply_router .schedule_task(self.region_id, ApplyTask::register(self)); - if let Err(e) = ctx.local_reader.schedule(ReadTask::register(self)) { - info!( - "failed to schedule local reader, are we shutting down?"; - "region_id" => self.region_id, - "peer_id" => self.peer.get_id(), - "err" => ?e, - ); - } ctx.coprocessor_host.on_region_changed( self.region(), @@ -530,7 +522,7 @@ impl Peer { pub fn set_region( &mut self, host: &CoprocessorHost, - local_reader: &Scheduler, + reader: &mut ReadDelegate, region: metapb::Region, ) { if self.region().get_region_epoch().get_version() < region.get_region_epoch().get_version() @@ -542,7 +534,7 @@ impl Peer { let progress = ReadProgress::region(region); // Always update read delegate's region to avoid stale region info after a follower // becoming a leader. - self.maybe_update_read_progress(local_reader, progress); + self.maybe_update_read_progress(reader, progress); if !self.pending_remove { host.on_region_changed(self.region(), RegionChangeEvent::Update, self.get_role()); @@ -851,9 +843,8 @@ impl Peer { // It is recommended to update the lease expiring time right after // this peer becomes leader because it's more convenient to do it here and // it has no impact on the correctness. - let progress = ReadProgress::term(self.term()); - self.maybe_update_read_progress(&ctx.local_reader, progress); - self.maybe_renew_leader_lease(&ctx.local_reader, monotonic_raw_now()); + let progress_term = ReadProgress::term(self.term()); + self.maybe_renew_leader_lease(monotonic_raw_now(), ctx, Some(progress_term)); debug!( "becomes leader with lease"; "region_id" => self.region_id, @@ -1124,6 +1115,9 @@ impl Peer { if apply_snap_result.is_some() { self.activate(ctx); + let mut meta = ctx.store_meta.lock().unwrap(); + meta.readers + .insert(self.region_id, ReadDelegate::from_peer(self)); } apply_snap_result @@ -1158,7 +1152,7 @@ impl Peer { if lease_to_be_updated { let propose_time = self.find_propose_time(entry.get_index(), entry.get_term()); if let Some(propose_time) = propose_time { - self.maybe_renew_leader_lease(&ctx.local_reader, propose_time); + self.maybe_renew_leader_lease(propose_time, ctx, None); lease_to_be_updated = false; } } @@ -1293,7 +1287,7 @@ impl Peer { if self.leader_lease.inspect(Some(propose_time)) == LeaseState::Suspect { return; } - self.maybe_renew_leader_lease(&ctx.local_reader, propose_time); + self.maybe_renew_leader_lease(propose_time, ctx, None); } } @@ -1347,7 +1341,9 @@ impl Peer { // Only leaders need to update applied_index_term. if progress_to_be_updated && self.is_leader() { let progress = ReadProgress::applied_index_term(applied_index_term); - self.maybe_update_read_progress(&ctx.local_reader, progress); + let mut meta = ctx.store_meta.lock().unwrap(); + let reader = meta.readers.get_mut(&self.region_id).unwrap(); + self.maybe_update_read_progress(reader, progress); } has_ready } @@ -1359,12 +1355,16 @@ impl Peer { } /// Try to renew leader lease. - fn maybe_renew_leader_lease(&mut self, reader: &Scheduler, ts: Timespec) { + fn maybe_renew_leader_lease( + &mut self, + ts: Timespec, + ctx: &mut PollContext, + progress: Option, + ) { // A nonleader peer should never has leader lease. - if !self.is_leader() { - return; - } - if self.is_splitting() { + let read_progress = if !self.is_leader() { + None + } else if self.is_splitting() { // A splitting leader should not renew its lease. // Because we split regions asynchronous, the leader may read stale results // if splitting runs slow on the leader. @@ -1373,9 +1373,8 @@ impl Peer { "region_id" => self.region_id, "peer_id" => self.peer.get_id(), ); - return; - } - if self.is_merging() { + None + } else if self.is_merging() { // A merging leader should not renew its lease. // Because we merge regions asynchronous, the leader may read stale results // if commit merge runs slow on sibling peers. @@ -1384,39 +1383,39 @@ impl Peer { "region_id" => self.region_id, "peer_id" => self.peer.get_id(), ); - return; + None + } else { + self.leader_lease.renew(ts); + let term = self.term(); + if let Some(remote_lease) = self.leader_lease.maybe_new_remote_lease(term) { + Some(ReadProgress::leader_lease(remote_lease)) + } else { + None + } + }; + if let Some(progress) = progress { + let mut meta = ctx.store_meta.lock().unwrap(); + let reader = meta.readers.get_mut(&self.region_id).unwrap(); + self.maybe_update_read_progress(reader, progress); } - self.leader_lease.renew(ts); - let term = self.term(); - if let Some(remote_lease) = self.leader_lease.maybe_new_remote_lease(term) { - let progress = ReadProgress::leader_lease(remote_lease); + if let Some(progress) = read_progress { + let mut meta = ctx.store_meta.lock().unwrap(); + let reader = meta.readers.get_mut(&self.region_id).unwrap(); self.maybe_update_read_progress(reader, progress); } } - fn maybe_update_read_progress( - &self, - local_reader: &Scheduler, - progress: ReadProgress, - ) { + fn maybe_update_read_progress(&self, reader: &mut ReadDelegate, progress: ReadProgress) { if self.pending_remove { return; } - let update = ReadTask::update(self.region_id, progress); debug!( "update read progress"; "region_id" => self.region_id, "peer_id" => self.peer.get_id(), - "update" => %update, + "progress" => ?progress, ); - if let Err(e) = local_reader.schedule(update) { - info!( - "failed to update read progress, are we shutting down?"; - "region_id" => self.region_id, - "peer_id" => self.peer.get_id(), - "err" => ?e, - ); - } + reader.update(progress); } pub fn maybe_campaign(&mut self, parent_is_leader: bool) -> bool { diff --git a/src/raftstore/store/util.rs b/src/raftstore/store/util.rs index 38c5d692cf1..78c47d5b422 100644 --- a/src/raftstore/store/util.rs +++ b/src/raftstore/store/util.rs @@ -427,10 +427,7 @@ impl Lease { term, }; // Clone the remote. - let remote_clone = RemoteLease { - expired_time: Arc::clone(&remote.expired_time), - term, - }; + let remote_clone = remote.clone(); self.remote = Some(remote); Some(remote_clone) } @@ -450,6 +447,7 @@ impl fmt::Debug for Lease { /// A remote lease, it can only be derived by `Lease`. It will be sent /// to the local read thread, so name it remote. If Lease expires, the remote must /// expire too. +#[derive(Clone)] pub struct RemoteLease { expired_time: Arc, term: u64, diff --git a/src/raftstore/store/worker/mod.rs b/src/raftstore/store/worker/mod.rs index 957d776f37e..71c4232adf5 100644 --- a/src/raftstore/store/worker/mod.rs +++ b/src/raftstore/store/worker/mod.rs @@ -13,7 +13,7 @@ pub use self::cleanup_sst::{Runner as CleanupSSTRunner, Task as CleanupSSTTask}; pub use self::compact::{Runner as CompactRunner, Task as CompactTask}; pub use self::consistency_check::{Runner as ConsistencyCheckRunner, Task as ConsistencyCheckTask}; pub use self::raftlog_gc::{Runner as RaftlogGcRunner, Task as RaftlogGcTask}; -pub use self::read::{LocalReader, Progress as ReadProgress, Task as ReadTask}; +pub use self::read::{LocalReader, Progress as ReadProgress, ReadDelegate}; pub use self::region::{ Runner as RegionRunner, Task as RegionTask, PENDING_APPLY_CHECK_INTERVAL, STALE_PEER_CHECK_INTERVAL, diff --git a/src/raftstore/store/worker/read.rs b/src/raftstore/store/worker/read.rs index 1ea2cb73c9e..3846c22b0da 100644 --- a/src/raftstore/store/worker/read.rs +++ b/src/raftstore/store/worker/read.rs @@ -1,8 +1,8 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. -use std::cell::RefCell; +use std::cell::{Cell, RefCell}; use std::fmt::{self, Display, Formatter}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use crossbeam::TrySendError; @@ -13,7 +13,7 @@ use prometheus::local::LocalHistogram; use time::Timespec; use crate::raftstore::errors::RAFTSTORE_IS_BUSY; -use crate::raftstore::store::fsm::{RaftPollerBuilder, RaftRouter}; +use crate::raftstore::store::fsm::RaftRouter; use crate::raftstore::store::util::{self, LeaseState, RemoteLease}; use crate::raftstore::store::{ cmd_resp, Peer, ProposalRouter, RaftCommand, ReadExecutor, ReadResponse, RequestInspector, @@ -22,14 +22,13 @@ use crate::raftstore::store::{ use crate::raftstore::Result; use engine::DB; use tikv_util::collections::HashMap; -use tikv_util::time::duration_to_sec; -use tikv_util::timer::Timer; -use tikv_util::worker::{Runnable, RunnableWithTimer}; +use tikv_util::time::Instant; use super::metrics::*; +use crate::raftstore::store::fsm::store::StoreMeta; /// A read only delegate of `Peer`. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ReadDelegate { region: metapb::Region, peer_id: u64, @@ -42,7 +41,7 @@ pub struct ReadDelegate { } impl ReadDelegate { - fn from_peer(peer: &Peer) -> ReadDelegate { + pub fn from_peer(peer: &Peer) -> ReadDelegate { let region = peer.region().clone(); let region_id = region.get_id(); let peer_id = peer.peer.get_id(); @@ -57,7 +56,7 @@ impl ReadDelegate { } } - fn update(&mut self, progress: Progress) { + pub fn update(&mut self, progress: Progress) { match progress { Progress::Region(region) => { self.region = region; @@ -150,111 +149,30 @@ impl Progress { } } -pub enum Task { - Register(ReadDelegate), - Update((u64, Progress)), - Read(RaftCommand), - Destroy(u64), -} - -impl Task { - pub fn register(peer: &Peer) -> Task { - let delegate = ReadDelegate::from_peer(peer); - Task::Register(delegate) - } - - pub fn update(region_id: u64, progress: Progress) -> Task { - Task::Update((region_id, progress)) - } - - pub fn destroy(region_id: u64) -> Task { - Task::Destroy(region_id) - } - - #[inline] - pub fn read(cmd: RaftCommand) -> Task { - Task::Read(cmd) - } - - /// Task accepts `RaftCmdRequest`s that contain Get/Snap requests. - /// Returns `true`, it can be saftly sent to localreader, - /// Returns `false`, it must not be sent to localreader. - #[inline] - pub fn acceptable(request: &RaftCmdRequest) -> bool { - if request.has_admin_request() || request.has_status_request() { - false - } else { - for r in request.get_requests() { - match r.get_cmd_type() { - CmdType::Get | CmdType::Snap => (), - CmdType::Delete - | CmdType::Put - | CmdType::DeleteRange - | CmdType::Prewrite - | CmdType::IngestSST - | CmdType::ReadIndex - | CmdType::Invalid => return false, - } - } - true - } - } -} - -impl Display for Task { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match *self { - Task::Register(ref delegate) => write!(f, "localreader Task::Register {:?}", delegate), - Task::Read(ref cmd) => write!(f, "localreader Task::Read {:?}", cmd.request), - Task::Update(ref progress) => write!(f, "localreader Task::Update {:?}", progress), - Task::Destroy(region_id) => write!(f, "localreader Task::Destroy region {}", region_id), - } - } -} - pub struct LocalReader { - store_id: u64, + store_id: Cell>, + store_meta: Arc>, kv_engine: Arc, metrics: RefCell, // region id -> ReadDelegate - delegates: HashMap, + delegates: RefCell>>, // A channel to raftstore. router: C, tag: String, } impl LocalReader { - pub fn new<'a, T, P>( - builder: &RaftPollerBuilder, - peers: impl Iterator, - ) -> Self { - let mut delegates = - HashMap::with_capacity_and_hasher(peers.size_hint().0, Default::default()); - for p in peers { - let delegate = ReadDelegate::from_peer(p); - info!( - "create ReadDelegate"; - "tag" => &delegate.tag, - "peer" => delegate.peer_id, - ); - delegates.insert(p.region().get_id(), delegate); - } - let store_id = builder.store.get_id(); + pub fn new(kv_engine: Arc, store_meta: Arc>, router: RaftRouter) -> Self { LocalReader { - delegates, - store_id, - kv_engine: builder.engines.kv.clone(), - router: builder.router.clone(), + store_meta, + kv_engine, + router, + store_id: Cell::new(None), metrics: Default::default(), - tag: format!("[store {}]", store_id), + delegates: RefCell::new(HashMap::default()), + tag: "[local_reader]".to_string(), } } - - pub fn new_timer() -> Timer<()> { - let mut timer = Timer::new(1); - timer.add_task(Duration::from_millis(METRICS_FLUSH_INTERVAL), ()); - timer - } } impl LocalReader { @@ -289,12 +207,15 @@ impl LocalReader { cmd.callback.invoke_read(read_resp); } - fn pre_propose_raft_command<'a>( - &'a self, - req: &RaftCmdRequest, - ) -> Result> { + fn pre_propose_raft_command(&self, req: &RaftCmdRequest) -> Result> { // Check store id. - if let Err(e) = util::check_store_id(req, self.store_id) { + if self.store_id.get().is_none() { + let store_id = self.store_meta.lock().unwrap().store_id; + self.store_id.set(store_id); + } + let store_id = self.store_id.get().unwrap(); + + if let Err(e) = util::check_store_id(req, store_id) { self.metrics.borrow_mut().rejected_by_store_id_mismatch += 1; debug!("rejected by store id not match"; "err" => %e); return Err(e); @@ -302,14 +223,17 @@ impl LocalReader { // Check region id. let region_id = req.get_header().get_region_id(); - let delegate = match self.delegates.get(®ion_id) { + let delegate = match self.delegates.borrow_mut().get_mut(®ion_id) { Some(delegate) => { fail_point!("localreader_on_find_delegate"); - delegate + match delegate.take() { + Some(d) => d, + None => return Ok(None), + } } None => { - self.metrics.borrow_mut().rejected_by_no_region += 1; - debug!("rejected by no region"; "region_id" => region_id); + self.metrics.borrow_mut().rejected_by_cache_miss += 1; + debug!("rejected by cache miss"; "region_id" => region_id); return Ok(None); } }; @@ -339,7 +263,7 @@ impl LocalReader { } let mut inspector = Inspector { - delegate, + delegate: &delegate, metrics: &mut *self.metrics.borrow_mut(), }; match inspector.inspect(req) { @@ -351,32 +275,108 @@ impl LocalReader { } // It can only handle read command. - fn propose_raft_command(&mut self, cmd: RaftCommand, executor: &mut ReadExecutor) { + pub fn propose_raft_command(&self, cmd: RaftCommand) { let region_id = cmd.request.get_header().get_region_id(); - match self.pre_propose_raft_command(&cmd.request) { - Ok(Some(delegate)) => { - let mut metrics = self.metrics.borrow_mut(); - if let Some(resp) = delegate.handle_read(&cmd.request, executor, &mut *metrics) { - cmd.callback.invoke_read(resp); + let mut executor = ReadExecutor::new( + self.kv_engine.clone(), + false, /* dont check region epoch */ + true, /* we need snapshot time */ + ); + + loop { + match self.pre_propose_raft_command(&cmd.request) { + Ok(Some(delegate)) => { + let mut metrics = self.metrics.borrow_mut(); + if let Some(resp) = + delegate.handle_read(&cmd.request, &mut executor, &mut *metrics) + { + cmd.callback.invoke_read(resp); + self.delegates + .borrow_mut() + .insert(region_id, Some(delegate)); + return; + } + break; + } + // It can not handle the request, forwards to raftstore. + Ok(None) => { + if self.delegates.borrow().get(®ion_id).is_some() { + break; + } + let meta = self.store_meta.lock().unwrap(); + match meta.readers.get(®ion_id).cloned() { + Some(reader) => { + self.delegates.borrow_mut().insert(region_id, Some(reader)); + } + None => { + self.metrics.borrow_mut().rejected_by_no_region += 1; + debug!("rejected by no region"; "region_id" => region_id); + break; + } + } + } + Err(e) => { + let mut response = cmd_resp::new_error(e); + if let Some(Some(ref delegate)) = self.delegates.borrow().get(®ion_id) { + cmd_resp::bind_term(&mut response, delegate.term); + } + cmd.callback.invoke_read(ReadResponse { + response, + snapshot: None, + }); + self.delegates.borrow_mut().remove(®ion_id); return; } } - // It can not handle the rquest, forwards to raftstore. - Ok(None) => {} - Err(e) => { - let mut response = cmd_resp::new_error(e); - if let Some(delegate) = self.delegates.get(®ion_id) { - cmd_resp::bind_term(&mut response, delegate.term); + } + // Remove delegate for updating it by next cmd execution. + self.delegates.borrow_mut().remove(®ion_id); + // Forward to raftstore. + self.redirect(cmd); + } + + #[inline] + pub fn execute_raft_command(&self, cmd: RaftCommand) { + self.propose_raft_command(cmd); + self.metrics.borrow_mut().maybe_flush(); + } + + /// Task accepts `RaftCmdRequest`s that contain Get/Snap requests. + /// Returns `true`, it can be safely sent to localreader, + /// Returns `false`, it must not be sent to localreader. + #[inline] + pub fn acceptable(request: &RaftCmdRequest) -> bool { + if request.has_admin_request() || request.has_status_request() { + false + } else { + for r in request.get_requests() { + match r.get_cmd_type() { + CmdType::Get | CmdType::Snap => (), + CmdType::Delete + | CmdType::Put + | CmdType::DeleteRange + | CmdType::Prewrite + | CmdType::IngestSST + | CmdType::ReadIndex + | CmdType::Invalid => return false, } - cmd.callback.invoke_read(ReadResponse { - response, - snapshot: None, - }); - return; } + true } + } +} - self.redirect(cmd); +impl Clone for LocalReader { + fn clone(&self) -> Self { + LocalReader { + store_meta: self.store_meta.clone(), + kv_engine: self.kv_engine.clone(), + router: self.router.clone(), + store_id: self.store_id.clone(), + metrics: Default::default(), + delegates: RefCell::new(HashMap::default()), + tag: self.tag.clone(), + } } } @@ -414,69 +414,9 @@ impl<'r, 'm> RequestInspector for Inspector<'r, 'm> { } } -impl Runnable for LocalReader { - fn run_batch(&mut self, tasks: &mut Vec) { - self.metrics - .borrow() - .batch_requests_size - .observe(tasks.len() as _); - - let mut sent = None; - let mut executor = ReadExecutor::new( - self.kv_engine.clone(), - false, /* dont check region epoch */ - true, /* we need snapshot time */ - ); - - for task in tasks.drain(..) { - match task { - Task::Register(delegate) => { - info!("register ReadDelegate"; "tag" => &delegate.tag); - self.delegates.insert(delegate.region.get_id(), delegate); - } - Task::Read(cmd) => { - if sent.is_none() { - sent = Some(cmd.send_time); - } - self.propose_raft_command(cmd, &mut executor); - } - Task::Update((region_id, progress)) => { - if let Some(delegate) = self.delegates.get_mut(®ion_id) { - delegate.update(progress); - } else { - warn!( - "update unregistered ReadDelegate"; - "region_id" => region_id, - "progress" => ?progress, - ); - } - } - Task::Destroy(region_id) => { - if let Some(delegate) = self.delegates.remove(®ion_id) { - info!("destroy ReadDelegate"; "tag" => &delegate.tag); - } - } - } - } - - if let Some(send_time) = sent { - self.metrics - .borrow_mut() - .requests_wait_duration - .observe(duration_to_sec(send_time.elapsed())); - } - } -} - const METRICS_FLUSH_INTERVAL: u64 = 15_000; // 15s -impl RunnableWithTimer for LocalReader { - fn on_timeout(&mut self, timer: &mut Timer<()>, _: ()) { - self.metrics.borrow_mut().flush(); - timer.add_task(Duration::from_millis(METRICS_FLUSH_INTERVAL), ()); - } -} - +#[derive(Clone)] struct ReadMetrics { requests_wait_duration: LocalHistogram, batch_requests_size: LocalHistogram, @@ -491,6 +431,9 @@ struct ReadMetrics { rejected_by_epoch: i64, rejected_by_appiled_term: i64, rejected_by_channel_full: i64, + rejected_by_cache_miss: i64, + + last_flush_time: Instant, } impl Default for ReadMetrics { @@ -507,11 +450,20 @@ impl Default for ReadMetrics { rejected_by_epoch: 0, rejected_by_appiled_term: 0, rejected_by_channel_full: 0, + rejected_by_cache_miss: 0, + last_flush_time: Instant::now(), } } } impl ReadMetrics { + pub fn maybe_flush(&mut self) { + if self.last_flush_time.elapsed() >= Duration::from_millis(METRICS_FLUSH_INTERVAL) { + self.flush(); + self.last_flush_time = Instant::now(); + } + } + fn flush(&mut self) { self.requests_wait_duration.flush(); self.batch_requests_size.flush(); @@ -569,6 +521,12 @@ impl ReadMetrics { .inc_by(self.rejected_by_channel_full); self.rejected_by_channel_full = 0; } + if self.rejected_by_cache_miss > 0 { + LOCAL_READ_REJECT + .with_label_values(&["cache_miss"]) + .inc_by(self.rejected_by_cache_miss); + self.rejected_by_cache_miss = 0; + } } } @@ -592,6 +550,7 @@ mod tests { fn new_reader( path: &str, store_id: u64, + store_meta: Arc>, ) -> ( TempDir, LocalReader>, @@ -602,10 +561,11 @@ mod tests { rocks::util::new_engine(path.path().to_str().unwrap(), None, ALL_CFS, None).unwrap(); let (ch, rx) = sync_channel(1); let reader = LocalReader { - store_id, + store_meta, + store_id: Cell::new(Some(store_id)), router: ch, kv_engine: Arc::new(db), - delegates: HashMap::default(), + delegates: RefCell::new(HashMap::default()), metrics: Default::default(), tag: "foo".to_owned(), }; @@ -629,13 +589,13 @@ mod tests { rx: &Receiver, cmd: RaftCmdRequest, ) { - let task = Task::read(RaftCommand::new( + let task = RaftCommand::new( cmd.clone(), Callback::Read(Box::new(|resp| { panic!("unexpected invoke, {:?}", resp); })), - )); - reader.run_batch(&mut vec![task]); + ); + reader.propose_raft_command(task); assert_eq!( rx.recv_timeout(Duration::seconds(5).to_std().unwrap()) .unwrap() @@ -644,10 +604,20 @@ mod tests { ); } + fn must_not_redirect( + reader: &mut LocalReader>, + rx: &Receiver, + task: RaftCommand, + ) { + reader.propose_raft_command(task); + assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); + } + #[test] fn test_read() { let store_id = 2; - let (_tmp, mut reader, rx) = new_reader("test-local-reader", store_id); + let store_meta = Arc::new(Mutex::new(StoreMeta::new(0))); + let (_tmp, mut reader, rx) = new_reader("test-local-reader", store_id, store_meta.clone()); // region: 1, // peers: 2, 3, 4, @@ -684,49 +654,57 @@ mod tests { // The region is not register yet. must_redirect(&mut reader, &rx, cmd.clone()); assert_eq!(reader.metrics.borrow().rejected_by_no_region, 1); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 1); // Register region 1 lease.renew(monotonic_raw_now()); let remote = lease.maybe_new_remote_lease(term6).unwrap(); // But the applied_index_term is stale. - let register_region1 = Task::Register(ReadDelegate { - tag: String::new(), - region: region1.clone(), - peer_id: leader2.get_id(), - term: term6, - applied_index_term: term6 - 1, - leader_lease: Some(remote), - last_valid_ts: RefCell::new(Timespec::new(0, 0)), - }); - reader.run_batch(&mut vec![register_region1]); - assert!(reader.delegates.get(&1).is_some()); - assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); + { + let mut meta = store_meta.lock().unwrap(); + let read_delegate = ReadDelegate { + tag: String::new(), + region: region1.clone(), + peer_id: leader2.get_id(), + term: term6, + applied_index_term: term6 - 1, + leader_lease: Some(remote), + last_valid_ts: RefCell::new(Timespec::new(0, 0)), + }; + meta.readers.insert(1, read_delegate); + } // The applied_index_term is stale must_redirect(&mut reader, &rx, cmd.clone()); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 2); assert_eq!(reader.metrics.borrow().rejected_by_appiled_term, 1); + assert!(reader.delegates.borrow().get(&1).is_none()); // Make the applied_index_term matches current term. let pg = Progress::applied_index_term(term6); - let update_region1 = Task::update(1, pg); - reader.run_batch(&mut vec![update_region1]); - assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); + { + let mut meta = store_meta.lock().unwrap(); + meta.readers.get_mut(&1).unwrap().update(pg); + } + let task = RaftCommand::new(cmd.clone(), Callback::Read(Box::new(move |_| {}))); + must_not_redirect(&mut reader, &rx, task); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 3); // Let's read. let region = region1.clone(); - let task = Task::read(RaftCommand::new( + let task = RaftCommand::new( cmd.clone(), Callback::Read(Box::new(move |resp: ReadResponse| { let snap = resp.snapshot.unwrap(); assert_eq!(snap.get_region(), ®ion); })), - )); - reader.run_batch(&mut vec![task]); - assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); + ); + must_not_redirect(&mut reader, &rx, task); // Wait for expiration. thread::sleep(Duration::seconds(1).to_std().unwrap()); must_redirect(&mut reader, &rx, cmd.clone()); + assert_eq!(reader.metrics.borrow().rejected_by_lease_expire, 1); // Renew lease. lease.renew(monotonic_raw_now()); @@ -737,16 +715,17 @@ mod tests { .mut_header() .mut_peer() .set_store_id(store_id + 1); - let task = Task::read(RaftCommand::new( + let task = RaftCommand::new( cmd_store_id, Callback::Read(Box::new(move |resp: ReadResponse| { let err = resp.response.get_header().get_error(); assert!(err.has_store_not_match()); assert!(resp.snapshot.is_none()); })), - )); - reader.run_batch(&mut vec![task]); + ); + reader.propose_raft_command(task); assert_eq!(reader.metrics.borrow().rejected_by_store_id_mismatch, 1); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 3); // metapb::Peer id mismatch. let mut cmd_peer_id = cmd.clone(); @@ -754,7 +733,7 @@ mod tests { .mut_header() .mut_peer() .set_id(leader2.get_id() + 1); - let task = Task::read(RaftCommand::new( + let task = RaftCommand::new( cmd_peer_id, Callback::Read(Box::new(move |resp: ReadResponse| { assert!( @@ -764,28 +743,31 @@ mod tests { ); assert!(resp.snapshot.is_none()); })), - )); - reader.run_batch(&mut vec![task]); + ); + reader.propose_raft_command(task); assert_eq!(reader.metrics.borrow().rejected_by_peer_id_mismatch, 1); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 4); // Read quorum. let mut cmd_read_quorum = cmd.clone(); cmd_read_quorum.mut_header().set_read_quorum(true); must_redirect(&mut reader, &rx, cmd_read_quorum); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 5); // Term mismatch. let mut cmd_term = cmd.clone(); cmd_term.mut_header().set_term(term6 - 2); - let task = Task::read(RaftCommand::new( + let task = RaftCommand::new( cmd_term, Callback::Read(Box::new(move |resp: ReadResponse| { let err = resp.response.get_header().get_error(); assert!(err.has_stale_command(), "{:?}", resp); assert!(resp.snapshot.is_none()); })), - )); - reader.run_batch(&mut vec![task]); + ); + reader.propose_raft_command(task); assert_eq!(reader.metrics.borrow().rejected_by_term_mismatch, 1); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 6); // Stale epoch. let mut epoch12 = epoch13.clone(); @@ -794,6 +776,7 @@ mod tests { cmd_epoch.mut_header().set_region_epoch(epoch12); must_redirect(&mut reader, &rx, cmd_epoch); assert_eq!(reader.metrics.borrow().rejected_by_epoch, 1); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 7); // Expire lease manually, and it can not be renewed. let previous_lease_rejection = reader.metrics.borrow().rejected_by_lease_expire; @@ -804,19 +787,20 @@ mod tests { reader.metrics.borrow().rejected_by_lease_expire, previous_lease_rejection + 1 ); + assert_eq!(reader.metrics.borrow().rejected_by_cache_miss, 8); // Channel full. - let task1 = Task::read(RaftCommand::new(cmd.clone(), Callback::None)); - let task_full = Task::read(RaftCommand::new( + let task1 = RaftCommand::new(cmd.clone(), Callback::None); + let task_full = RaftCommand::new( cmd.clone(), Callback::Read(Box::new(move |resp: ReadResponse| { let err = resp.response.get_header().get_error(); assert!(err.has_server_is_busy(), "{:?}", resp); assert!(resp.snapshot.is_none()); })), - )); - reader.run_batch(&mut vec![task1]); - reader.run_batch(&mut vec![task_full]); + ); + reader.propose_raft_command(task1); + reader.propose_raft_command(task_full); rx.try_recv().unwrap(); assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); assert_eq!(reader.metrics.borrow().rejected_by_channel_full, 1); @@ -825,18 +809,24 @@ mod tests { let previous_term_rejection = reader.metrics.borrow().rejected_by_term_mismatch; let mut cmd9 = cmd.clone(); cmd9.mut_header().set_term(term6 + 3); - let msg = RaftCommand::new( + let task = RaftCommand::new( cmd9.clone(), Callback::Read(Box::new(|resp| { panic!("unexpected invoke, {:?}", resp); })), ); - let mut batch = vec![ - Task::update(1, Progress::term(term6 + 3)), - Task::update(1, Progress::applied_index_term(term6 + 3)), - Task::read(msg), - ]; - reader.run_batch(&mut batch); + { + let mut meta = store_meta.lock().unwrap(); + meta.readers + .get_mut(&1) + .unwrap() + .update(Progress::term(term6 + 3)); + meta.readers + .get_mut(&1) + .unwrap() + .update(Progress::applied_index_term(term6 + 3)); + } + reader.propose_raft_command(task); assert_eq!( rx.recv_timeout(Duration::seconds(5).to_std().unwrap()) .unwrap() @@ -847,11 +837,5 @@ mod tests { reader.metrics.borrow().rejected_by_term_mismatch, previous_term_rejection + 1, ); - - // Destroy region 1. - let destroy_region1 = Task::destroy(1); - reader.run_batch(&mut vec![destroy_region1]); - assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); - assert!(reader.delegates.get(&1).is_none()); } } diff --git a/src/server/node.rs b/src/server/node.rs index 53bdf68205f..8ad92a53ad4 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -1,6 +1,6 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; @@ -9,9 +9,10 @@ use super::Result; use crate::import::SSTImporter; use crate::pd::{Error as PdError, PdClient, PdTask, INVALID_ID}; use crate::raftstore::coprocessor::dispatcher::CoprocessorHost; +use crate::raftstore::store::fsm::store::StoreMeta; use crate::raftstore::store::fsm::{RaftBatchSystem, RaftRouter}; use crate::raftstore::store::{ - self, initial_region, keys, Config as StoreConfig, ReadTask, SnapManager, Transport, + self, initial_region, keys, Config as StoreConfig, SnapManager, Transport, }; use crate::server::readpool::ReadPool; use crate::server::Config as ServerConfig; @@ -24,7 +25,7 @@ use engine::Peekable; use kvproto::metapb; use kvproto::raft_serverpb::StoreIdent; use protobuf::RepeatedField; -use tikv_util::worker::{FutureWorker, Worker}; +use tikv_util::worker::FutureWorker; const MAX_CHECK_CLUSTER_BOOTSTRAPPED_RETRY_COUNT: u64 = 60; const CHECK_CLUSTER_BOOTSTRAPPED_RETRY_SECONDS: u64 = 3; @@ -116,7 +117,7 @@ where trans: T, snap_mgr: SnapManager, pd_worker: FutureWorker, - local_read_worker: Worker, + store_meta: Arc>, coprocessor_host: CoprocessorHost, importer: Arc, ) -> Result<()> @@ -131,7 +132,10 @@ where ))); } self.store.set_id(store_id); - + { + let mut meta = store_meta.lock().unwrap(); + meta.store_id = Some(store_id); + } if let Some(first_region) = self.check_or_prepare_bootstrap_cluster(&engines, store_id)? { info!("try bootstrap cluster"; "store_id" => store_id, "region" => ?first_region); // cluster is not bootstrapped, and we choose first store to bootstrap @@ -150,7 +154,7 @@ where trans, snap_mgr, pd_worker, - local_read_worker, + store_meta, coprocessor_host, importer, )?; @@ -316,7 +320,7 @@ where trans: T, snap_mgr: SnapManager, pd_worker: FutureWorker, - local_read_worker: Worker, + store_meta: Arc>, coprocessor_host: CoprocessorHost, importer: Arc, ) -> Result<()> @@ -340,7 +344,7 @@ where pd_client, snap_mgr, pd_worker, - local_read_worker, + store_meta, coprocessor_host, importer, )?; diff --git a/src/server/transport.rs b/src/server/transport.rs index 2fcb51a6864..6cc3113fd82 100644 --- a/src/server/transport.rs +++ b/src/server/transport.rs @@ -11,7 +11,7 @@ use super::resolve::StoreAddrResolver; use super::snap::Task as SnapTask; use crate::raftstore::store::fsm::RaftRouter; use crate::raftstore::store::{ - Callback, CasualMessage, PeerMsg, RaftCommand, ReadTask, SignificantMsg, StoreMsg, Transport, + Callback, CasualMessage, LocalReader, PeerMsg, RaftCommand, SignificantMsg, StoreMsg, Transport, }; use crate::raftstore::{DiscardReason, Error as RaftStoreError, Result as RaftStoreResult}; use crate::server::raft_client::RaftClient; @@ -95,15 +95,15 @@ impl RaftStoreRouter for RaftStoreBlackHole { #[derive(Clone)] pub struct ServerRaftStoreRouter { router: RaftRouter, - local_reader_ch: Scheduler, + local_reader: LocalReader, } impl ServerRaftStoreRouter { /// Creates a new router. - pub fn new(router: RaftRouter, local_reader_ch: Scheduler) -> ServerRaftStoreRouter { + pub fn new(router: RaftRouter, local_reader: LocalReader) -> ServerRaftStoreRouter { ServerRaftStoreRouter { router, - local_reader_ch, + local_reader, } } @@ -135,10 +135,9 @@ impl RaftStoreRouter for ServerRaftStoreRouter { fn send_command(&self, req: RaftCmdRequest, cb: Callback) -> RaftStoreResult<()> { let cmd = RaftCommand::new(req, cb); - if ReadTask::acceptable(&cmd.request) { - self.local_reader_ch - .schedule(ReadTask::read(cmd)) - .map_err(|e| box_err!(e)) + if LocalReader::::acceptable(&cmd.request) { + self.local_reader.execute_raft_command(cmd); + Ok(()) } else { let region_id = cmd.request.get_header().get_region_id(); self.router diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 16a4acc774a..4143ed9347a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -10,12 +10,10 @@ pub mod readpool_impl; pub mod txn; pub mod types; -use std::cmp; -use std::error; use std::fmt::{self, Debug, Display, Formatter}; use std::io::Error as IoError; -use std::sync::{atomic, Arc}; -use std::u64; +use std::sync::{atomic, Arc, Mutex}; +use std::{cmp, error, u64}; use engine::rocks::DB; use engine::{IterOption, DATA_KEY_PREFIX_LEN}; @@ -35,9 +33,9 @@ pub use self::config::{BlockCacheConfig, Config, DEFAULT_DATA_DIR, DEFAULT_ROCKS pub use self::gc_worker::{AutoGCConfig, GCSafePointProvider}; pub use self::kv::raftkv::RaftKv; pub use self::kv::{ - CFStatistics, Cursor, CursorBuilder, Engine, Error as EngineError, FlowStatistics, Iterator, - Modify, RegionInfoProvider, RocksEngine, ScanMode, Snapshot, Statistics, StatisticsSummary, - TestEngineBuilder, + destroy_tls_engine, set_tls_engine, with_tls_engine, CFStatistics, Cursor, CursorBuilder, + Engine, Error as EngineError, FlowStatistics, Iterator, Modify, RegionInfoProvider, + RocksEngine, ScanMode, Snapshot, Statistics, StatisticsSummary, TestEngineBuilder, }; use self::lock_manager::{DetectorScheduler, WaiterMgrScheduler}; pub use self::mvcc::Scanner as StoreScanner; @@ -497,7 +495,11 @@ impl TestStorageBuilder { /// Build a `Storage`. pub fn build(self) -> Result> { - let read_pool = ReadPoolBuilder::from_config(&readpool::Config::default_for_test()).build(); + let engine = Arc::new(Mutex::new(self.engine.clone())); + let read_pool = ReadPoolBuilder::from_config(&readpool::Config::default_for_test()) + .after_start(move || set_tls_engine(engine.lock().unwrap().clone())) + .before_stop(|| destroy_tls_engine::()) + .build(); Storage::from_engine( self.engine, &self.config, @@ -660,7 +662,7 @@ impl Storage { } /// Get a snapshot of `engine`. - fn async_snapshot(engine: E, ctx: &Context) -> impl Future { + fn async_snapshot(engine: &E, ctx: &Context) -> impl Future { let (callback, future) = tikv_util::future::paired_future_callback(); let val = engine.async_snapshot(ctx, callback); @@ -681,42 +683,43 @@ impl Storage { start_ts: u64, ) -> impl Future, Error = Error> { const CMD: &str = "get"; - let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); let res = self.read_pool.spawn_handle(priority, move || { tls_collect_command_count(CMD, priority); let command_duration = tikv_util::time::Instant::now_coarse(); - Self::async_snapshot(engine, &ctx) - .and_then(move |snapshot: E::Snap| { - tls_processing_read_observe_duration(CMD, || { - let mut statistics = Statistics::default(); - let snap_store = SnapshotStore::new( - snapshot, - start_ts, - ctx.get_isolation_level(), - !ctx.get_not_fill_cache(), - ); - let result = snap_store - .get(&key, &mut statistics) - // map storage::txn::Error -> storage::Error - .map_err(Error::from) - .map(|r| { - tls_collect_key_reads(CMD, 1); - r - }); - - tls_collect_scan_count(CMD, &statistics); - tls_collect_read_flow(ctx.get_region_id(), &statistics); - - result + with_tls_engine(|engine| { + Self::async_snapshot(engine, &ctx) + .and_then(move |snapshot: E::Snap| { + tls_processing_read_observe_duration(CMD, || { + let mut statistics = Statistics::default(); + let snap_store = SnapshotStore::new( + snapshot, + start_ts, + ctx.get_isolation_level(), + !ctx.get_not_fill_cache(), + ); + let result = snap_store + .get(&key, &mut statistics) + // map storage::txn::Error -> storage::Error + .map_err(Error::from) + .map(|r| { + tls_collect_key_reads(CMD, 1); + r + }); + + tls_collect_scan_count(CMD, &statistics); + tls_collect_read_flow(ctx.get_region_id(), &statistics); + + result + }) }) - }) - .then(move |r| { - tls_collect_command_duration(CMD, command_duration.elapsed()); - r - }) + .then(move |r| { + tls_collect_command_duration(CMD, command_duration.elapsed()); + r + }) + }) }); future::result(res) @@ -733,48 +736,49 @@ impl Storage { start_ts: u64, ) -> impl Future>, Error = Error> { const CMD: &str = "batch_get"; - let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); let res = self.read_pool.spawn_handle(priority, move || { tls_collect_command_count(CMD, priority); let command_duration = tikv_util::time::Instant::now_coarse(); - Self::async_snapshot(engine, &ctx) - .and_then(move |snapshot: E::Snap| { - tls_processing_read_observe_duration(CMD, || { - let mut statistics = Statistics::default(); - let snap_store = SnapshotStore::new( - snapshot, - start_ts, - ctx.get_isolation_level(), - !ctx.get_not_fill_cache(), - ); - let kv_pairs: Vec<_> = snap_store - .batch_get(&keys, &mut statistics) - .into_iter() - .zip(keys) - .filter(|&(ref v, ref _k)| { - !(v.is_ok() && v.as_ref().unwrap().is_none()) - }) - .map(|(v, k)| match v { - Ok(Some(x)) => Ok((k.into_raw().unwrap(), x)), - Err(e) => Err(Error::from(e)), - _ => unreachable!(), - }) - .collect(); - - tls_collect_key_reads(CMD, kv_pairs.len()); - tls_collect_scan_count(CMD, &statistics); - tls_collect_read_flow(ctx.get_region_id(), &statistics); - - Ok(kv_pairs) + with_tls_engine(|engine| { + Self::async_snapshot(engine, &ctx) + .and_then(move |snapshot: E::Snap| { + tls_processing_read_observe_duration(CMD, || { + let mut statistics = Statistics::default(); + let snap_store = SnapshotStore::new( + snapshot, + start_ts, + ctx.get_isolation_level(), + !ctx.get_not_fill_cache(), + ); + let kv_pairs: Vec<_> = snap_store + .batch_get(&keys, &mut statistics) + .into_iter() + .zip(keys) + .filter(|&(ref v, ref _k)| { + !(v.is_ok() && v.as_ref().unwrap().is_none()) + }) + .map(|(v, k)| match v { + Ok(Some(x)) => Ok((k.into_raw().unwrap(), x)), + Err(e) => Err(Error::from(e)), + _ => unreachable!(), + }) + .collect(); + + tls_collect_key_reads(CMD, kv_pairs.len()); + tls_collect_scan_count(CMD, &statistics); + tls_collect_read_flow(ctx.get_region_id(), &statistics); + + Ok(kv_pairs) + }) }) - }) - .then(move |r| { - tls_collect_command_duration(CMD, command_duration.elapsed()); - r - }) + .then(move |r| { + tls_collect_command_duration(CMD, command_duration.elapsed()); + r + }) + }) }); future::result(res) @@ -795,58 +799,59 @@ impl Storage { options: Options, ) -> impl Future>, Error = Error> { const CMD: &str = "scan"; - let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); let res = self.read_pool.spawn_handle(priority, move || { tls_collect_command_count(CMD, priority); let command_duration = tikv_util::time::Instant::now_coarse(); - Self::async_snapshot(engine, &ctx) - .and_then(move |snapshot: E::Snap| { - tls_processing_read_observe_duration(CMD, || { - let snap_store = SnapshotStore::new( - snapshot, - start_ts, - ctx.get_isolation_level(), - !ctx.get_not_fill_cache(), - ); - - let mut scanner; - if !options.reverse_scan { - scanner = snap_store.scanner( - false, - options.key_only, - Some(start_key), - end_key, - )?; - } else { - scanner = snap_store.scanner( - true, - options.key_only, - end_key, - Some(start_key), - )?; - }; - let res = scanner.scan(limit); - - let statistics = scanner.take_statistics(); - tls_collect_scan_count(CMD, &statistics); - tls_collect_read_flow(ctx.get_region_id(), &statistics); - - res.map_err(Error::from).map(|results| { - tls_collect_key_reads(CMD, results.len()); - results - .into_iter() - .map(|x| x.map_err(Error::from)) - .collect() + with_tls_engine(|engine| { + Self::async_snapshot(engine, &ctx) + .and_then(move |snapshot: E::Snap| { + tls_processing_read_observe_duration(CMD, || { + let snap_store = SnapshotStore::new( + snapshot, + start_ts, + ctx.get_isolation_level(), + !ctx.get_not_fill_cache(), + ); + + let mut scanner; + if !options.reverse_scan { + scanner = snap_store.scanner( + false, + options.key_only, + Some(start_key), + end_key, + )?; + } else { + scanner = snap_store.scanner( + true, + options.key_only, + end_key, + Some(start_key), + )?; + }; + let res = scanner.scan(limit); + + let statistics = scanner.take_statistics(); + tls_collect_scan_count(CMD, &statistics); + tls_collect_read_flow(ctx.get_region_id(), &statistics); + + res.map_err(Error::from).map(|results| { + tls_collect_key_reads(CMD, results.len()); + results + .into_iter() + .map(|x| x.map_err(Error::from)) + .collect() + }) }) }) - }) - .then(move |r| { - tls_collect_command_duration(CMD, command_duration.elapsed()); - r - }) + .then(move |r| { + tls_collect_command_duration(CMD, command_duration.elapsed()); + r + }) + }) }); future::result(res) @@ -1109,46 +1114,50 @@ impl Storage { key: Vec, ) -> impl Future>, Error = Error> { const CMD: &str = "raw_get"; - let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); - let timer = SCHED_HISTOGRAM_VEC_STATIC.raw_get.start_coarse_timer(); - - let readpool = self.read_pool.clone(); - - Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| { - let res = readpool.spawn_handle(priority, move || { - tls_processing_read_observe_duration(CMD, || { - let cf = match Self::rawkv_cf(&cf) { - Ok(x) => x, - Err(e) => return future::err(e), - }; - // no scan_count for this kind of op. - - let key_len = key.len(); - let result = snapshot - .get_cf(cf, &Key::from_encoded(key)) - // map storage::engine::Error -> storage::Error - .map_err(Error::from) - .map(|r| { - if let Some(ref value) = r { - let mut stats = Statistics::default(); - stats.data.flow_stats.read_keys = 1; - stats.data.flow_stats.read_bytes = key_len + value.len(); - tls_collect_read_flow(ctx.get_region_id(), &stats); - tls_collect_key_reads(CMD, 1); - } - r - }); + let res = self.read_pool.spawn_handle(priority, move || { + tls_collect_command_count(CMD, priority); + let command_duration = tikv_util::time::Instant::now_coarse(); - timer.observe_duration(); - future::result(result) - }) - }); - future::result(res) - .map_err(|_| Error::SchedTooBusy) - .flatten() - }) + with_tls_engine(|engine| { + Self::async_snapshot(engine, &ctx) + .and_then(move |snapshot: E::Snap| { + tls_processing_read_observe_duration(CMD, || { + let cf = match Self::rawkv_cf(&cf) { + Ok(x) => x, + Err(e) => return future::err(e), + }; + // no scan_count for this kind of op. + + let key_len = key.len(); + let result = snapshot + .get_cf(cf, &Key::from_encoded(key)) + // map storage::engine::Error -> storage::Error + .map_err(Error::from) + .map(|r| { + if let Some(ref value) = r { + let mut stats = Statistics::default(); + stats.data.flow_stats.read_keys = 1; + stats.data.flow_stats.read_bytes = key_len + value.len(); + tls_collect_read_flow(ctx.get_region_id(), &stats); + tls_collect_key_reads(CMD, 1); + } + r + }); + future::result(result) + }) + }) + .then(move |r| { + tls_collect_command_duration(CMD, command_duration.elapsed()); + r + }) + }) + }); + + future::result(res) + .map_err(|_| Error::SchedTooBusy) + .flatten() } /// Get the values of some raw keys in a batch. @@ -1159,54 +1168,57 @@ impl Storage { keys: Vec>, ) -> impl Future>, Error = Error> { const CMD: &str = "raw_batch_get"; - let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); - let timer = SCHED_HISTOGRAM_VEC_STATIC - .raw_batch_get - .start_coarse_timer(); - - let readpool = self.read_pool.clone(); - - Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| { - let res = readpool.spawn_handle(priority, move || { - tls_processing_read_observe_duration(CMD, || { - let keys: Vec = keys.into_iter().map(Key::from_encoded).collect(); - let cf = match Self::rawkv_cf(&cf) { - Ok(x) => x, - Err(e) => return future::err(e), - }; - // no scan_count for this kind of op. - let mut stats = Statistics::default(); - let result: Vec> = keys - .into_iter() - .map(|k| { - let v = snapshot.get_cf(cf, &k); - (k, v) - }) - .filter(|&(_, ref v)| !(v.is_ok() && v.as_ref().unwrap().is_none())) - .map(|(k, v)| match v { - Ok(Some(v)) => { - stats.data.flow_stats.read_keys += 1; - stats.data.flow_stats.read_bytes += k.as_encoded().len() + v.len(); - Ok((k.into_encoded(), v)) - } - Err(e) => Err(Error::from(e)), - _ => unreachable!(), - }) - .collect(); + let res = self.read_pool.spawn_handle(priority, move || { + tls_collect_command_count(CMD, priority); + let command_duration = tikv_util::time::Instant::now_coarse(); - tls_collect_key_reads(CMD, stats.data.flow_stats.read_keys as usize); - tls_collect_read_flow(ctx.get_region_id(), &stats); + with_tls_engine(|engine| { + Self::async_snapshot(engine, &ctx) + .and_then(move |snapshot: E::Snap| { + tls_processing_read_observe_duration(CMD, || { + let keys: Vec = keys.into_iter().map(Key::from_encoded).collect(); + let cf = match Self::rawkv_cf(&cf) { + Ok(x) => x, + Err(e) => return future::err(e), + }; + // no scan_count for this kind of op. + let mut stats = Statistics::default(); + let result: Vec> = keys + .into_iter() + .map(|k| { + let v = snapshot.get_cf(cf, &k); + (k, v) + }) + .filter(|&(_, ref v)| !(v.is_ok() && v.as_ref().unwrap().is_none())) + .map(|(k, v)| match v { + Ok(Some(v)) => { + stats.data.flow_stats.read_keys += 1; + stats.data.flow_stats.read_bytes += + k.as_encoded().len() + v.len(); + Ok((k.into_encoded(), v)) + } + Err(e) => Err(Error::from(e)), + _ => unreachable!(), + }) + .collect(); + + tls_collect_key_reads(CMD, stats.data.flow_stats.read_keys as usize); + tls_collect_read_flow(ctx.get_region_id(), &stats); + future::ok(result) + }) + }) + .then(move |r| { + tls_collect_command_duration(CMD, command_duration.elapsed()); + r + }) + }) + }); - timer.observe_duration(); - future::ok(result) - }) - }); - future::result(res) - .map_err(|_| Error::SchedTooBusy) - .flatten() - }) + future::result(res) + .map_err(|_| Error::SchedTooBusy) + .flatten() } /// Write a raw key to the storage. @@ -1439,55 +1451,62 @@ impl Storage { reverse: bool, ) -> impl Future>, Error = Error> { const CMD: &str = "raw_scan"; - let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); - let timer = SCHED_HISTOGRAM_VEC_STATIC.raw_scan.start_coarse_timer(); - - let readpool = self.read_pool.clone(); - - Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| { - let res = readpool.spawn_handle(priority, move || { - tls_processing_read_observe_duration(CMD, || { - let end_key = end_key.map(Key::from_encoded); - - let mut statistics = Statistics::default(); - let result = if reverse { - Self::reverse_raw_scan( - &snapshot, - &cf, - &Key::from_encoded(key), - end_key, - limit, - &mut statistics, - key_only, - ) - .map_err(Error::from) - } else { - Self::raw_scan( - &snapshot, - &cf, - &Key::from_encoded(key), - end_key, - limit, - &mut statistics, - key_only, - ) - .map_err(Error::from) - }; - - tls_collect_read_flow(ctx.get_region_id(), &statistics); - tls_collect_key_reads(CMD, statistics.write.flow_stats.read_keys as usize); - tls_collect_scan_count(CMD, &statistics); - - timer.observe_duration(); - future::result(result) - }) - }); - future::result(res) - .map_err(|_| Error::SchedTooBusy) - .flatten() - }) + let res = self.read_pool.spawn_handle(priority, move || { + tls_collect_command_count(CMD, priority); + let command_duration = tikv_util::time::Instant::now_coarse(); + + with_tls_engine(|engine| { + Self::async_snapshot(engine, &ctx) + .and_then(move |snapshot: E::Snap| { + tls_processing_read_observe_duration(CMD, || { + let end_key = end_key.map(Key::from_encoded); + + let mut statistics = Statistics::default(); + let result = if reverse { + Self::reverse_raw_scan( + &snapshot, + &cf, + &Key::from_encoded(key), + end_key, + limit, + &mut statistics, + key_only, + ) + .map_err(Error::from) + } else { + Self::raw_scan( + &snapshot, + &cf, + &Key::from_encoded(key), + end_key, + limit, + &mut statistics, + key_only, + ) + .map_err(Error::from) + }; + + tls_collect_read_flow(ctx.get_region_id(), &statistics); + tls_collect_key_reads( + CMD, + statistics.write.flow_stats.read_keys as usize, + ); + tls_collect_scan_count(CMD, &statistics); + future::result(result) + }) + }) + .then(move |r| { + tls_collect_command_duration(CMD, command_duration.elapsed()); + r + }) + }) + }); + + future::result(res) + .map_err(|_| Error::SchedTooBusy) + .flatten() } /// Check the given raw kv CF name. Return the CF name, or `Err` if given CF name is invalid. @@ -1537,79 +1556,83 @@ impl Storage { reverse: bool, ) -> impl Future>, Error = Error> { const CMD: &str = "raw_batch_scan"; - let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); - let timer = SCHED_HISTOGRAM_VEC_STATIC - .raw_batch_scan - .start_coarse_timer(); - - let readpool = self.read_pool.clone(); - - Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| { - let res = readpool.spawn_handle(priority, move || { - tls_processing_read_observe_duration(CMD, || { - let mut statistics = Statistics::default(); - if !Self::check_key_ranges(&ranges, reverse) { - return future::result(Err(box_err!("Invalid KeyRanges"))); - }; - let mut result = Vec::new(); - let ranges_len = ranges.len(); - for i in 0..ranges_len { - let start_key = Key::from_encoded(ranges[i].take_start_key()); - let end_key = ranges[i].take_end_key(); - let end_key = if end_key.is_empty() { - if i + 1 == ranges_len { - None - } else { - Some(Key::from_encoded_slice(ranges[i + 1].get_start_key())) - } - } else { - Some(Key::from_encoded(end_key)) - }; - let pairs = if reverse { - match Self::reverse_raw_scan( - &snapshot, - &cf, - &start_key, - end_key, - each_limit, - &mut statistics, - key_only, - ) { - Ok(x) => x, - Err(e) => return future::err(e), - } - } else { - match Self::raw_scan( - &snapshot, - &cf, - &start_key, - end_key, - each_limit, - &mut statistics, - key_only, - ) { - Ok(x) => x, - Err(e) => return future::err(e), - } - }; - result.extend(pairs.into_iter()); - } + let res = self.read_pool.spawn_handle(priority, move || { + tls_collect_command_count(CMD, priority); + let command_duration = tikv_util::time::Instant::now_coarse(); - tls_collect_read_flow(ctx.get_region_id(), &statistics); - tls_collect_key_reads(CMD, statistics.write.flow_stats.read_keys as usize); + with_tls_engine(|engine| { + Self::async_snapshot(engine, &ctx) + .and_then(move |snapshot: E::Snap| { + tls_processing_read_observe_duration(CMD, || { + let mut statistics = Statistics::default(); + if !Self::check_key_ranges(&ranges, reverse) { + return future::result(Err(box_err!("Invalid KeyRanges"))); + }; + let mut result = Vec::new(); + let ranges_len = ranges.len(); + for i in 0..ranges_len { + let start_key = Key::from_encoded(ranges[i].take_start_key()); + let end_key = ranges[i].take_end_key(); + let end_key = if end_key.is_empty() { + if i + 1 == ranges_len { + None + } else { + Some(Key::from_encoded_slice(ranges[i + 1].get_start_key())) + } + } else { + Some(Key::from_encoded(end_key)) + }; + let pairs = if reverse { + match Self::reverse_raw_scan( + &snapshot, + &cf, + &start_key, + end_key, + each_limit, + &mut statistics, + key_only, + ) { + Ok(x) => x, + Err(e) => return future::err(e), + } + } else { + match Self::raw_scan( + &snapshot, + &cf, + &start_key, + end_key, + each_limit, + &mut statistics, + key_only, + ) { + Ok(x) => x, + Err(e) => return future::err(e), + } + }; + result.extend(pairs.into_iter()); + } - tls_collect_scan_count(CMD, &statistics); + tls_collect_read_flow(ctx.get_region_id(), &statistics); + tls_collect_key_reads( + CMD, + statistics.write.flow_stats.read_keys as usize, + ); + tls_collect_scan_count(CMD, &statistics); + future::ok(result) + }) + }) + .then(move |r| { + tls_collect_command_duration(CMD, command_duration.elapsed()); + r + }) + }) + }); - timer.observe_duration(); - future::ok(result) - }) - }); - future::result(res) - .map_err(|_| Error::SchedTooBusy) - .flatten() - }) + future::result(res) + .map_err(|_| Error::SchedTooBusy) + .flatten() } /// Get MVCC info of a transactional key. @@ -3105,9 +3128,10 @@ mod tests { ] .into_iter() .map(|(k, v)| Some((k, v))); + let engine = storage.get_engine(); expect_multi_values( results.clone().collect(), - >::async_snapshot(storage.get_engine(), &ctx) + >::async_snapshot(&engine, &ctx) .and_then(move |snapshot| { >::raw_scan( &snapshot, @@ -3123,7 +3147,7 @@ mod tests { ); expect_multi_values( results.rev().collect(), - >::async_snapshot(storage.get_engine(), &ctx) + >::async_snapshot(&engine, &ctx) .and_then(move |snapshot| { >::reverse_raw_scan( &snapshot, diff --git a/tests/integrations/raftstore/test_bootstrap.rs b/tests/integrations/raftstore/test_bootstrap.rs index 32036386de8..fe5a721e42c 100644 --- a/tests/integrations/raftstore/test_bootstrap.rs +++ b/tests/integrations/raftstore/test_bootstrap.rs @@ -1,7 +1,7 @@ // Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0. use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tempdir::TempDir; @@ -14,9 +14,10 @@ use engine::*; use test_raftstore::*; use tikv::import::SSTImporter; use tikv::raftstore::coprocessor::CoprocessorHost; +use tikv::raftstore::store::fsm::store::StoreMeta; use tikv::raftstore::store::{bootstrap_store, fsm, keys, SnapManager}; use tikv::server::Node; -use tikv_util::worker::{FutureWorker, Worker}; +use tikv_util::worker::FutureWorker; fn test_bootstrap_idempotent(cluster: &mut Cluster) { // assume that there is a node bootstrap the cluster and add region in pd successfully @@ -59,7 +60,6 @@ fn test_node_bootstrap_with_prepared_data() { let mut node = Node::new(system, &cfg.server, &cfg.raft_store, Arc::clone(&pd_client)); let snap_mgr = SnapManager::new(tmp_mgr.path().to_str().unwrap(), Some(node.get_router())); let pd_worker = FutureWorker::new("test-pd-worker"); - let local_reader = Worker::new("test-local-reader"); // assume there is a node has bootstrapped the cluster and add region in pd successfully bootstrap_with_first_region(Arc::clone(&pd_client)).unwrap(); @@ -92,7 +92,7 @@ fn test_node_bootstrap_with_prepared_data() { simulate_trans, snap_mgr, pd_worker, - local_reader, + Arc::new(Mutex::new(StoreMeta::new(0))), coprocessor_host, importer, )