Skip to content

Commit

Permalink
raftstore: cleanup storage import (tikv#6538)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored and c1ay committed May 9, 2020
1 parent 8a0f47c commit 6f42e56
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 43 deletions.
6 changes: 2 additions & 4 deletions src/raftstore/coprocessor/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::cmp;
use std::collections::HashMap;
use std::u64;

use crate::storage::mvcc::{TimeStamp, Write, WriteType};
use engine::rocks::{
CFHandle, DBEntryType, Range, TablePropertiesCollector, TablePropertiesCollectorFactory, DB,
};
Expand All @@ -13,7 +12,7 @@ pub use engine_rocks::{
RangePropertiesCollector, RangePropertiesCollectorFactory, UserProperties,
};
use tikv_util::codec::Result;
use txn_types::Key;
use txn_types::{Key, TimeStamp, Write, WriteType};

const PROP_NUM_ERRORS: &str = "tikv.num_errors";
const PROP_MIN_TS: &str = "tikv.min_ts";
Expand Down Expand Up @@ -223,11 +222,10 @@ mod tests {
use test::Bencher;

use crate::raftstore::coprocessor::properties::MvccPropertiesCollectorFactory;
use crate::storage::mvcc::{Write, WriteType};
use engine::rocks;
use engine::rocks::util::CFOptions;
use engine::{CF_WRITE, LARGE_CFS};
use txn_types::Key;
use txn_types::{Key, Write, WriteType};

use super::*;

Expand Down
3 changes: 1 addition & 2 deletions src/raftstore/coprocessor/split_check/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ mod tests {
};
use crate::raftstore::coprocessor::{Config, CoprocessorHost};
use crate::raftstore::store::{CasualMessage, SplitCheckRunner, SplitCheckTask};
use crate::storage::mvcc::{TimeStamp, Write, WriteType};
use engine::rocks;
use engine::rocks::util::{new_engine_opt, CFOptions};
use engine::rocks::{ColumnFamilyOptions, DBOptions, Writable};
Expand All @@ -219,7 +218,7 @@ mod tests {
use std::u64;
use tempfile::Builder;
use tikv_util::worker::Runnable;
use txn_types::Key;
use txn_types::{Key, TimeStamp, Write, WriteType};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ pub use self::snap::{
SnapManagerBuilder, Snapshot, SnapshotDeleter, SnapshotStatistics,
};
pub use self::transport::{CasualRouter, ProposalRouter, StoreRouter, Transport};
pub use self::worker::PdTask;
pub use self::worker::{FlowStatistics, FlowStatsReporter, PdTask};
pub use self::worker::{KeyEntry, LocalReader, RegionTask};
pub use self::worker::{SplitCheckRunner, SplitCheckTask};
3 changes: 1 addition & 2 deletions src/raftstore/store/worker/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,8 @@ mod tests {

use crate::raftstore::coprocessor::properties::get_range_entries_and_versions;
use crate::raftstore::coprocessor::properties::MvccPropertiesCollectorFactory;
use crate::storage::mvcc::{TimeStamp, Write, WriteType};
use keys::data_key;
use txn_types::Key;
use txn_types::{Key, TimeStamp, Write, WriteType};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/store/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use self::cleanup::{Runner as CleanupRunner, Task as CleanupTask};
pub use self::cleanup_sst::{Runner as CleanupSSTRunner, Task as CleanupSSTTask};
pub use self::compact::{Runner as CompactRunner, Task as CompactTask};
pub use self::consistency_check::{Runner as ConsistencyCheckRunner, Task as ConsistencyCheckTask};
pub use self::pd::{Runner as PdRunner, Task as PdTask};
pub use self::pd::{FlowStatistics, FlowStatsReporter, Runner as PdRunner, Task as PdTask};
pub use self::raftlog_gc::{Runner as RaftlogGcRunner, Task as RaftlogGcTask};
pub use self::read::{LocalReader, Progress as ReadProgress, ReadDelegate};
pub use self::region::{
Expand Down
30 changes: 29 additions & 1 deletion src/raftstore/store/worker/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::raftstore::store::util::KeysInfoFormatter;
use crate::raftstore::store::Callback;
use crate::raftstore::store::StoreInfo;
use crate::raftstore::store::{CasualMessage, PeerMsg, RaftCommand, RaftRouter};
use crate::storage::FlowStatistics;
use pd_client::metrics::*;
use pd_client::{ConfigClient, Error, PdClient, RegionStat};
use tikv_util::collections::HashMap;
Expand All @@ -41,6 +40,35 @@ use tikv_util::worker::{FutureRunnable as Runnable, FutureScheduler as Scheduler

type RecordPairVec = Vec<pdpb::RecordPair>;

#[derive(Default, Debug, Clone)]
pub struct FlowStatistics {
pub read_keys: usize,
pub read_bytes: usize,
}

impl FlowStatistics {
pub fn add(&mut self, other: &Self) {
self.read_bytes = self.read_bytes.saturating_add(other.read_bytes);
self.read_keys = self.read_keys.saturating_add(other.read_keys);
}
}

// Reports flow statistics to outside.
pub trait FlowStatsReporter: Send + Clone + Sync + 'static {
// Reports read flow statistics, the argument `read_stats` is a hash map
// saves the flow statistics of different region.
// TODO: maybe we need to return a Result later?
fn report_read_stats(&self, read_stats: HashMap<u64, FlowStatistics>);
}

impl FlowStatsReporter for Scheduler<Task> {
fn report_read_stats(&self, read_stats: HashMap<u64, FlowStatistics>) {
if let Err(e) = self.schedule(Task::ReadStats { read_stats }) {
error!("Failed to send read flow statistics"; "err" => ?e);
}
}
}

/// Uses an asynchronous thread to tell PD something.
pub enum Task {
AskSplit {
Expand Down
33 changes: 1 addition & 32 deletions src/storage/kv/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

use engine::{CF_DEFAULT, CF_LOCK, CF_WRITE};
use kvproto::kvrpcpb::{ScanDetail, ScanInfo};
use tikv_util::collections::HashMap;
use tikv_util::worker::FutureScheduler;

use crate::raftstore::store::PdTask;
pub use crate::raftstore::store::{FlowStatistics, FlowStatsReporter};

const STAT_TOTAL: &str = "total";
const STAT_PROCESSED: &str = "processed";
Expand All @@ -31,35 +29,6 @@ pub struct CfStatistics {
pub flow_stats: FlowStatistics,
}

#[derive(Default, Debug, Clone)]
pub struct FlowStatistics {
pub read_keys: usize,
pub read_bytes: usize,
}

// Reports flow statistics to outside.
pub trait FlowStatsReporter: Send + Clone + Sync + 'static {
// Reports read flow statistics, the argument `read_stats` is a hash map
// saves the flow statistics of different region.
// TODO: maybe we need to return a Result later?
fn report_read_stats(&self, read_stats: HashMap<u64, FlowStatistics>);
}

impl FlowStatsReporter for FutureScheduler<PdTask> {
fn report_read_stats(&self, read_stats: HashMap<u64, FlowStatistics>) {
if let Err(e) = self.schedule(PdTask::ReadStats { read_stats }) {
error!("Failed to send read flow statistics"; "err" => ?e);
}
}
}

impl FlowStatistics {
pub fn add(&mut self, other: &Self) {
self.read_bytes = self.read_bytes.saturating_add(other.read_bytes);
self.read_keys = self.read_keys.saturating_add(other.read_keys);
}
}

impl CfStatistics {
#[inline]
pub fn total_op_count(&self) -> usize {
Expand Down

0 comments on commit 6f42e56

Please sign in to comment.