Skip to content

Commit

Permalink
add db_speed metric
Browse files Browse the repository at this point in the history
  • Loading branch information
xudaquan2003 committed Nov 24, 2023
1 parent 1743da4 commit 09b5ddc
Show file tree
Hide file tree
Showing 22 changed files with 244 additions and 682 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions bin/reth/src/performance_metrics/dashboard_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pub(crate) struct DBSpeedDisplayer {
#[cfg(feature = "enable_db_speed_record")]
impl DBSpeedDisplayer {
pub(crate) fn update_db_speed_record(&mut self, record: DbSpeedRecord) {
self.db_speed_record = record;
self.db_speed_record.add(record);
}

pub(crate) fn print(&self) {
Expand Down Expand Up @@ -390,16 +390,16 @@ impl CacheDBRecordDisplayer {
self.cache_db_record.total_in_block_hash() as f64
}

fn misses_in_load_account_pencentage(&self) -> f64 {
self.cache_db_record.misses.load_account as f64 /
self.cache_db_record.total_in_load_account() as f64
}

/// Fraction of all cache lookups that missed, across every access kind.
///
/// Computed as `total_miss / (total_hits + total_miss)`; when nothing has
/// been recorded yet this is `0.0 / 0.0`, i.e. NaN.
fn total_misses_pencentage(&self) -> f64 {
    let missed = self.cache_db_record.total_miss() as f64;
    let accesses =
        (self.cache_db_record.total_hits() + self.cache_db_record.total_miss()) as f64;
    missed / accesses
}

/// Fraction of `load_account` cache lookups that missed.
///
/// Ratio of `misses.load_account` to the total `load_account` accesses;
/// NaN when no `load_account` access has been recorded yet.
fn misses_in_load_account_pencentage(&self) -> f64 {
    let missed = self.cache_db_record.misses.load_account as f64;
    let accesses = self.cache_db_record.total_in_load_account() as f64;
    missed / accesses
}

pub(crate) fn print(&self) {
println!("===============================Metric of CacheDb========================================================");
println!("===============================Hit in CacheDb===========================================================");
Expand Down
96 changes: 30 additions & 66 deletions crates/stages/src/metrics/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,15 @@ impl ExecutionDurationRecord {
#[derive(Debug, Clone, Copy)]
pub struct DbSpeedRecord {
/// time of read header td from db
pub read_header_td_db_time: (u64, Duration),
pub read_header_td_db_time: Duration,
/// data size of read header td from db
pub read_header_td_db_size: u64,
/// time of read block with senders from db
pub read_block_with_senders_db_time: (u64, Duration),
pub read_block_with_senders_db_time: Duration,
/// data size of read block with senders from db
pub read_block_with_senders_db_size: u64,
/// time of write to db
pub write_to_db_time: (u64, Duration),
pub write_to_db_time: Duration,
/// data size of write to db
pub write_to_db_size: u64,
}
Expand All @@ -195,98 +195,62 @@ pub struct DbSpeedRecord {
impl Default for DbSpeedRecord {
fn default() -> Self {
Self {
read_header_td_db_time: (0, Duration::default()),
read_header_td_db_time: Duration::default(),
read_header_td_db_size: 0,
read_block_with_senders_db_time: (0, Duration::default()),
read_block_with_senders_db_time: Duration::default(),
read_block_with_senders_db_size: 0,
write_to_db_time: (0, Duration::default()),
write_to_db_time: Duration::default(),
write_to_db_size: 0,
}
}
}

#[cfg(feature = "enable_db_speed_record")]
impl DbSpeedRecord {
/// add time of write to db
pub(crate) fn add_read_header_td_db_time(&mut self, add_time: Duration, get_time_count: u64) {
self.read_header_td_db_time.0 =
self.read_header_td_db_time.0.checked_add(get_time_count).expect("overflow");
self.read_header_td_db_time.1 =
self.read_header_td_db_time.1.checked_add(add_time).expect("overflow");
}

/// add time of write to db
pub(crate) fn add_read_header_td_db_size(&mut self, add_size: u64) {
/// add record of read_header_td_db
pub(crate) fn add_read_header_td_db_record(&mut self, size: u64, time: Duration) {
self.read_header_td_db_size =
self.read_header_td_db_size.checked_add(add_size).expect("overflow");
self.read_header_td_db_size.checked_add(size).expect("overflow");
self.read_header_td_db_time =
self.read_header_td_db_time.checked_add(time).expect("overflow");
}

/// add time of write to db
pub(crate) fn add_read_block_with_senders_db_time(
&mut self,
add_time: Duration,
get_time_count: u64,
) {
self.read_block_with_senders_db_time.0 =
self.read_block_with_senders_db_time.0.checked_add(get_time_count).expect("overflow");
self.read_block_with_senders_db_time.1 =
self.read_block_with_senders_db_time.1.checked_add(add_time).expect("overflow");
}

/// add time of write to db
pub(crate) fn add_read_block_with_senders_db_size(&mut self, add_size: u64) {
pub(crate) fn add_read_block_with_senders_db_record(&mut self, size: u64, time: Duration) {
self.read_block_with_senders_db_size =
self.read_block_with_senders_db_size.checked_add(add_size).expect("overflow");
}

/// add time of write to db
pub(crate) fn add_write_to_db_time(&mut self, add_time: Duration, get_time_count: u64) {
self.write_to_db_time.0 =
self.write_to_db_time.0.checked_add(get_time_count).expect("overflow");
self.write_to_db_time.1 = self.write_to_db_time.1.checked_add(add_time).expect("overflow");
self.read_block_with_senders_db_size.checked_add(size).expect("overflow");
self.read_block_with_senders_db_time =
self.read_block_with_senders_db_time.checked_add(time).expect("overflow");
}

/// add time of write to db
pub(crate) fn add_write_to_db_size(&mut self, add_size: u64) {
self.write_to_db_size = self.write_to_db_size.checked_add(add_size).expect("overflow");
/// add record of write to db
pub(crate) fn add_write_to_db_record(&mut self, size: u64, time: Duration) {
self.write_to_db_size = self.write_to_db_size.checked_add(size).expect("overflow");
self.write_to_db_time = self.write_to_db_time.checked_add(time).expect("overflow");
}

/// add
pub fn add(&mut self, other: Self) {
self.read_header_td_db_time = (
self.read_header_td_db_time
.0
.checked_add(other.read_header_td_db_time.0)
.expect("overflow"),
self.read_header_td_db_time =
self.read_header_td_db_time
.1
.checked_add(other.read_header_td_db_time.1)
.expect("overflow"),
);
.checked_add(other.read_header_td_db_time)
.expect("overflow");
self.read_header_td_db_size = self
.read_header_td_db_size
.checked_add(other.read_header_td_db_size)
.expect("overflow");

self.read_block_with_senders_db_time = (
self.read_block_with_senders_db_time
.0
.checked_add(other.read_block_with_senders_db_time.0)
.expect("overflow"),
self.read_block_with_senders_db_time =
self.read_block_with_senders_db_time
.1
.checked_add(other.read_block_with_senders_db_time.1)
.expect("overflow"),
);
.checked_add(other.read_block_with_senders_db_time)
.expect("overflow");
self.read_block_with_senders_db_size = self
.read_block_with_senders_db_size
.checked_add(other.read_block_with_senders_db_size)
.expect("overflow");

self.write_to_db_time = (
self.write_to_db_time.0.checked_add(other.write_to_db_time.0).expect("overflow"),
self.write_to_db_time.1.checked_add(other.write_to_db_time.1).expect("overflow"),
);
self.write_to_db_time =
self.write_to_db_time.checked_add(other.write_to_db_time).expect("overflow");
self.write_to_db_size =
self.write_to_db_size.checked_add(other.write_to_db_size).expect("overflow");
}
Expand All @@ -302,17 +266,17 @@ impl DbSpeedRecord {

let col_len = 15;

let read_header_td_time = self.read_header_td_db_time.1.as_secs_f64();
let read_header_td_time = self.read_header_td_db_time.as_secs_f64();
let read_header_td_size = self.cover_size_bytes_to_m(self.read_header_td_db_size);
let read_header_td_rate = read_header_td_size / read_header_td_time;

let read_block_with_senders_time = self.read_block_with_senders_db_time.1.as_secs_f64();
let read_block_with_senders_time = self.read_block_with_senders_db_time.as_secs_f64();
let read_block_with_senders_size =
self.cover_size_bytes_to_m(self.read_block_with_senders_db_size);
let read_block_with_senders_rate =
read_block_with_senders_size / read_block_with_senders_time;

let write_to_db_time = self.write_to_db_time.1.as_secs_f64();
let write_to_db_time = self.write_to_db_time.as_secs_f64();
let write_to_db_size = self.cover_size_bytes_to_m(self.write_to_db_size);
let write_to_db_rate = write_to_db_size / write_to_db_time;

Expand Down
45 changes: 29 additions & 16 deletions crates/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ use reth_provider::{
use std::{ops::RangeInclusive, time::Instant};
use tracing::*;

#[cfg(feature = "enable_db_speed_record")]
use reth_db::metric::*;

/// The execution stage executes all transactions and
/// update history indexes.
///
Expand Down Expand Up @@ -154,21 +157,33 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {

#[cfg(feature = "enable_db_speed_record")]
let (td, block) = {
let (option_td, db_size, db_time, get_time_count) =
provider.header_td_by_number_with_db_info(block_number)?;
let td =
option_td.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
#[cfg(feature = "enable_db_speed_record")]
start_reth_db_record();

let td = provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

#[cfg(feature = "enable_db_speed_record")]
{
let (read_size, read_time, _, _) = get_reth_db_record();
db_speed_record.add_read_header_td_db_record(read_size as u64, read_time);

db_speed_record.add_read_header_td_db_time(db_time, get_time_count);
db_speed_record.add_read_header_td_db_size(db_size);
start_reth_db_record();
}

let (option_block, db_size, db_time, get_time_count) =
provider.block_with_senders_with_db_info(block_number)?;
let block = option_block
let block = provider
.block_with_senders(block_number)?
.ok_or_else(|| ProviderError::BlockNotFound(block_number.into()))?;

db_speed_record.add_read_block_with_senders_db_size(db_size);
db_speed_record.add_read_block_with_senders_db_time(db_time, get_time_count);
// db_speed_record.add_read_block_with_senders_db_time(db_time, get_time_count);

#[cfg(feature = "enable_db_speed_record")]
{
let (read_size, read_time, _, _) = get_reth_db_record();
db_speed_record
.add_read_block_with_senders_db_record(read_size as u64, read_time);
}

(td, block)
};
Expand Down Expand Up @@ -279,10 +294,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let start = Instant::now();

#[cfg(feature = "enable_db_speed_record")]
let write_to_db_time = {
db_speed_record.add_write_to_db_size(u64::try_from(state.size()).unwrap());
Instant::now()
};
start_reth_db_record();

#[cfg(feature = "enable_execution_duration_record")]
duration_record.start_time_recorder();
Expand All @@ -294,7 +306,8 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {

#[cfg(feature = "enable_db_speed_record")]
{
db_speed_record.add_write_to_db_time(write_to_db_time.elapsed(), 1);
let (_, _, write_size, write_time) = get_reth_db_record();
db_speed_record.add_write_to_db_record(write_size as u64, write_time);

if let Some(metrics_tx) = &mut self.metrics_tx {
let _ = metrics_tx.send(MetricEvent::DBSpeedInfo { db_speed_record });
Expand Down
2 changes: 2 additions & 0 deletions crates/storage/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ arbitrary = [
"dep:proptest-derive",
]

enable_db_speed_record = ["reth-libmdbx/enable_db_speed_record"]

[[bench]]
name = "hash_keys"
harness = false
Expand Down
8 changes: 8 additions & 0 deletions crates/storage/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@
pub mod abstraction;

mod implementation;

#[cfg(feature = "enable_db_speed_record")]
/// for metric
pub mod metric;

pub mod tables;
mod utils;
pub mod version;
Expand All @@ -88,6 +93,9 @@ pub use reth_interfaces::db::{DatabaseError, DatabaseWriteOperation};
pub use tables::*;
pub use utils::is_database_empty;

#[cfg(feature = "enable_db_speed_record")]
pub use metric::*;

#[cfg(feature = "mdbx")]
use mdbx::{Env, EnvKind, NoWriteMap, WriteMap};

Expand Down
12 changes: 12 additions & 0 deletions crates/storage/db/src/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use reth_libmdbx::metric::*;
use std::time::Duration;

/// start db record
pub fn start_reth_db_record() {
start_db_record();
}

/// get reth_db record (read_size, read_time, write_size, write_time)
pub fn get_reth_db_record() -> (usize, Duration, usize, Duration) {
get_db_record()
}
2 changes: 2 additions & 0 deletions crates/storage/libmdbx-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ lifetimed-bytes = { version = "0.1", optional = true }
default = []
return-borrowed = []

enable_db_speed_record = []

[dev-dependencies]
pprof = { version = "0.12", features = ["flamegraph", "frame-pointer", "criterion"] }
criterion = "0.5"
Expand Down
12 changes: 12 additions & 0 deletions crates/storage/libmdbx-rs/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use libc::c_void;
use parking_lot::Mutex;
use std::{borrow::Cow, fmt, marker::PhantomData, mem, ptr, rc::Rc, result};

#[cfg(feature = "enable_db_speed_record")]
use crate::metric::*;

/// A cursor for navigating the items within a database.
pub struct Cursor<'txn, K>
where
Expand Down Expand Up @@ -82,6 +85,13 @@ where
let key_ptr = key_val.iov_base;
let data_ptr = data_val.iov_base;
txn_execute(&self.txn, |txn| {
#[cfg(feature = "enable_db_speed_record")]
let _record = ReadRecord::new(
key_ptr,
&key_val as *const ffi::MDBX_val,
&data_val as *const ffi::MDBX_val,
);

let v = mdbx_result(ffi::mdbx_cursor_get(
self.cursor,
&mut key_val,
Expand Down Expand Up @@ -432,6 +442,8 @@ impl<'txn> Cursor<'txn, RW> {
ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
mdbx_result(unsafe {
txn_execute(&self.txn, |_| {
#[cfg(feature = "enable_db_speed_record")]
let _record = WriteRecord::new(data_val.iov_len);
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
})
})?;
Expand Down
4 changes: 4 additions & 0 deletions crates/storage/libmdbx-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ pub mod ffi {
pub use ffi::{MDBX_dbi as DBI, MDBX_log_level_t as LogLevel};
}

#[cfg(feature = "enable_db_speed_record")]
/// for metric
pub mod metric;

mod codec;
mod cursor;
mod database;
Expand Down
Loading

0 comments on commit 09b5ddc

Please sign in to comment.