From 6f42e5649ed688186e0e47ac6e06e50a9b00013f Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 7 Feb 2020 17:30:56 +0800 Subject: [PATCH] raftstore: cleanup storage import (#6538) Signed-off-by: Neil Shen --- src/raftstore/coprocessor/properties.rs | 6 ++-- src/raftstore/coprocessor/split_check/keys.rs | 3 +- src/raftstore/store/mod.rs | 2 +- src/raftstore/store/worker/compact.rs | 3 +- src/raftstore/store/worker/mod.rs | 2 +- src/raftstore/store/worker/pd.rs | 30 ++++++++++++++++- src/storage/kv/stats.rs | 33 +------------------ 7 files changed, 36 insertions(+), 43 deletions(-) diff --git a/src/raftstore/coprocessor/properties.rs b/src/raftstore/coprocessor/properties.rs index 20ff87ac220..f5542b6fde4 100644 --- a/src/raftstore/coprocessor/properties.rs +++ b/src/raftstore/coprocessor/properties.rs @@ -4,7 +4,6 @@ use std::cmp; use std::collections::HashMap; use std::u64; -use crate::storage::mvcc::{TimeStamp, Write, WriteType}; use engine::rocks::{ CFHandle, DBEntryType, Range, TablePropertiesCollector, TablePropertiesCollectorFactory, DB, }; @@ -13,7 +12,7 @@ pub use engine_rocks::{ RangePropertiesCollector, RangePropertiesCollectorFactory, UserProperties, }; use tikv_util::codec::Result; -use txn_types::Key; +use txn_types::{Key, TimeStamp, Write, WriteType}; const PROP_NUM_ERRORS: &str = "tikv.num_errors"; const PROP_MIN_TS: &str = "tikv.min_ts"; @@ -223,11 +222,10 @@ mod tests { use test::Bencher; use crate::raftstore::coprocessor::properties::MvccPropertiesCollectorFactory; - use crate::storage::mvcc::{Write, WriteType}; use engine::rocks; use engine::rocks::util::CFOptions; use engine::{CF_WRITE, LARGE_CFS}; - use txn_types::Key; + use txn_types::{Key, Write, WriteType}; use super::*; diff --git a/src/raftstore/coprocessor/split_check/keys.rs b/src/raftstore/coprocessor/split_check/keys.rs index 05e9d242a5d..3bfdf1b1dcb 100644 --- a/src/raftstore/coprocessor/split_check/keys.rs +++ b/src/raftstore/coprocessor/split_check/keys.rs @@ -206,7 +206,6 @@ mod tests { }; use crate::raftstore::coprocessor::{Config, CoprocessorHost}; use crate::raftstore::store::{CasualMessage, SplitCheckRunner, SplitCheckTask}; - use crate::storage::mvcc::{TimeStamp, Write, WriteType}; use engine::rocks; use engine::rocks::util::{new_engine_opt, CFOptions}; use engine::rocks::{ColumnFamilyOptions, DBOptions, Writable}; @@ -219,7 +218,7 @@ mod tests { use std::u64; use tempfile::Builder; use tikv_util::worker::Runnable; - use txn_types::Key; + use txn_types::{Key, TimeStamp, Write, WriteType}; use super::*; diff --git a/src/raftstore/store/mod.rs b/src/raftstore/store/mod.rs index 32137528af5..7e362334581 100644 --- a/src/raftstore/store/mod.rs +++ b/src/raftstore/store/mod.rs @@ -44,6 +44,6 @@ pub use self::snap::{ SnapManagerBuilder, Snapshot, SnapshotDeleter, SnapshotStatistics, }; pub use self::transport::{CasualRouter, ProposalRouter, StoreRouter, Transport}; -pub use self::worker::PdTask; +pub use self::worker::{FlowStatistics, FlowStatsReporter, PdTask}; pub use self::worker::{KeyEntry, LocalReader, RegionTask}; pub use self::worker::{SplitCheckRunner, SplitCheckTask}; diff --git a/src/raftstore/store/worker/compact.rs b/src/raftstore/store/worker/compact.rs index 43213934428..0c30d113b7d 100644 --- a/src/raftstore/store/worker/compact.rs +++ b/src/raftstore/store/worker/compact.rs @@ -261,9 +261,8 @@ mod tests { use crate::raftstore::coprocessor::properties::get_range_entries_and_versions; use crate::raftstore::coprocessor::properties::MvccPropertiesCollectorFactory; - use crate::storage::mvcc::{TimeStamp, Write, WriteType}; use keys::data_key; - use txn_types::Key; + use txn_types::{Key, TimeStamp, Write, WriteType}; use super::*; diff --git a/src/raftstore/store/worker/mod.rs b/src/raftstore/store/worker/mod.rs index 2d1d76342d9..b5a24ae2d9b 100644 --- a/src/raftstore/store/worker/mod.rs +++ b/src/raftstore/store/worker/mod.rs @@ -15,7 +15,7 @@ pub use self::cleanup::{Runner as CleanupRunner, Task as CleanupTask}; pub use self::cleanup_sst::{Runner as CleanupSSTRunner, Task as CleanupSSTTask}; pub use self::compact::{Runner as CompactRunner, Task as CompactTask}; pub use self::consistency_check::{Runner as ConsistencyCheckRunner, Task as ConsistencyCheckTask}; -pub use self::pd::{Runner as PdRunner, Task as PdTask}; +pub use self::pd::{FlowStatistics, FlowStatsReporter, Runner as PdRunner, Task as PdTask}; pub use self::raftlog_gc::{Runner as RaftlogGcRunner, Task as RaftlogGcTask}; pub use self::read::{LocalReader, Progress as ReadProgress, ReadDelegate}; pub use self::region::{ diff --git a/src/raftstore/store/worker/pd.rs b/src/raftstore/store/worker/pd.rs index ff45830d0a7..070b5b8876c 100644 --- a/src/raftstore/store/worker/pd.rs +++ b/src/raftstore/store/worker/pd.rs @@ -31,7 +31,6 @@ use crate::raftstore::store::util::KeysInfoFormatter; use crate::raftstore::store::Callback; use crate::raftstore::store::StoreInfo; use crate::raftstore::store::{CasualMessage, PeerMsg, RaftCommand, RaftRouter}; -use crate::storage::FlowStatistics; use pd_client::metrics::*; use pd_client::{ConfigClient, Error, PdClient, RegionStat}; use tikv_util::collections::HashMap; @@ -41,6 +40,35 @@ use tikv_util::worker::{FutureRunnable as Runnable, FutureScheduler as Scheduler type RecordPairVec = Vec; +#[derive(Default, Debug, Clone)] +pub struct FlowStatistics { + pub read_keys: usize, + pub read_bytes: usize, +} + +impl FlowStatistics { + pub fn add(&mut self, other: &Self) { + self.read_bytes = self.read_bytes.saturating_add(other.read_bytes); + self.read_keys = self.read_keys.saturating_add(other.read_keys); + } +} + +// Reports flow statistics to outside. +pub trait FlowStatsReporter: Send + Clone + Sync + 'static { + // Reports read flow statistics, the argument `read_stats` is a hash map + // saves the flow statistics of different region. + // TODO: maybe we need to return a Result later? + fn report_read_stats(&self, read_stats: HashMap); +} + +impl FlowStatsReporter for Scheduler { + fn report_read_stats(&self, read_stats: HashMap) { + if let Err(e) = self.schedule(Task::ReadStats { read_stats }) { + error!("Failed to send read flow statistics"; "err" => ?e); + } + } +} + /// Uses an asynchronous thread to tell PD something. pub enum Task { AskSplit { diff --git a/src/storage/kv/stats.rs b/src/storage/kv/stats.rs index 251cf371262..256899f3d25 100644 --- a/src/storage/kv/stats.rs +++ b/src/storage/kv/stats.rs @@ -2,10 +2,8 @@ use engine::{CF_DEFAULT, CF_LOCK, CF_WRITE}; use kvproto::kvrpcpb::{ScanDetail, ScanInfo}; -use tikv_util::collections::HashMap; -use tikv_util::worker::FutureScheduler; -use crate::raftstore::store::PdTask; +pub use crate::raftstore::store::{FlowStatistics, FlowStatsReporter}; const STAT_TOTAL: &str = "total"; const STAT_PROCESSED: &str = "processed"; @@ -31,35 +29,6 @@ pub struct CfStatistics { pub flow_stats: FlowStatistics, } -#[derive(Default, Debug, Clone)] -pub struct FlowStatistics { - pub read_keys: usize, - pub read_bytes: usize, -} - -// Reports flow statistics to outside. -pub trait FlowStatsReporter: Send + Clone + Sync + 'static { - // Reports read flow statistics, the argument `read_stats` is a hash map - // saves the flow statistics of different region. - // TODO: maybe we need to return a Result later? - fn report_read_stats(&self, read_stats: HashMap); -} - -impl FlowStatsReporter for FutureScheduler { - fn report_read_stats(&self, read_stats: HashMap) { - if let Err(e) = self.schedule(PdTask::ReadStats { read_stats }) { - error!("Failed to send read flow statistics"; "err" => ?e); - } - } -} - -impl FlowStatistics { - pub fn add(&mut self, other: &Self) { - self.read_bytes = self.read_bytes.saturating_add(other.read_bytes); - self.read_keys = self.read_keys.saturating_add(other.read_keys); - } -} - impl CfStatistics { #[inline] pub fn total_op_count(&self) -> usize {