Skip to content

Commit

Permalink
feat(storage): custom tables
Browse files Browse the repository at this point in the history
  • Loading branch information
Vid201 committed Apr 25, 2024
1 parent d312dbb commit 920c123
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 60 deletions.
2 changes: 1 addition & 1 deletion bin/reth/src/commands/db/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ where
T::Key: Hash,
T::Value: PartialEq,
{
let table = T::TABLE;
let table = T::NAME;

info!("Analyzing table {table}...");
let result = find_diffs_advanced::<T>(&primary_tx, &secondary_tx)?;
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/commands/db/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Command {
]);

tool.provider_factory.db_ref().view(|tx| {
let mut db_tables = Tables::ALL.iter().map(|table| table.name()).collect::<Vec<_>>();
let mut db_tables = Tables::ALL.iter().map(|table| table.name()).collect::<Vec<_>>(); // TODO: custom tables
db_tables.sort();
let mut total_size = 0;
for db_table in db_tables {
Expand Down Expand Up @@ -318,7 +318,7 @@ impl Command {
table.load_preset(comfy_table::presets::ASCII_MARKDOWN);
table.set_header(vec![Cell::new("Table"), Cell::new("Checksum"), Cell::new("Elapsed")]);

let db_tables = Tables::ALL;
let db_tables = Tables::ALL; // TODO: custom tables
let mut total_elapsed = Duration::default();

for db_table in db_tables {
Expand Down
6 changes: 4 additions & 2 deletions bin/reth/src/commands/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use clap::{value_parser, Args, Parser};
use reth_cli_runner::CliContext;
use reth_db::{init_db, DatabaseEnv};
use reth_db::{init_db, DatabaseEnv, Tables};
use reth_node_builder::{NodeBuilder, WithLaunchContext};
use reth_node_core::{node_config::NodeConfig, version};
use reth_primitives::ChainSpec;
Expand Down Expand Up @@ -183,7 +183,9 @@ impl<Ext: clap::Args + fmt::Debug> NodeCommand<Ext> {
let db_path = data_dir.db_path();

tracing::info!(target: "reth::cli", path = ?db_path, "Opening database");
let database = Arc::new(init_db(db_path.clone(), self.db.database_args())?.with_metrics());
let tables = Tables::ALL.iter().map(|table| table.name()).collect::<Vec<_>>();
let database =
Arc::new(init_db(db_path.clone(), self.db.database_args())?.with_metrics(tables));

if with_unused_ports {
node_config = node_config.with_unused_ports();
Expand Down
11 changes: 7 additions & 4 deletions crates/storage/db/src/abstraction/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@ impl<T> Value for T where T: Compress + Decompress + Serialize {}
/// It allows for the use of codecs. See [`crate::models::ShardedKey`] for a custom
/// implementation.
pub trait Table: Send + Sync + Debug + 'static {
/// The dynamic type of the table.
const TABLE: crate::Tables;

/// The table's name.
const NAME: &'static str = Self::TABLE.name();
const NAME: &'static str;

/// Index of the table.
const INDEX: usize;

/// If the table allows for duplicate keys.
const DUPSORT: bool;

/// Key element of `Table`.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/db/src/implementation/mdbx/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
f: impl FnOnce(&mut Self) -> R,
) -> R {
if let Some(metrics) = self.metrics.as_ref().cloned() {
metrics.record_operation(T::TABLE, operation, value_size, || f(self))
metrics.record_operation(T::NAME, operation, value_size, || f(self))
} else {
f(self)
}
Expand Down
68 changes: 57 additions & 11 deletions crates/storage/db/src/implementation/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const DEFAULT_MAX_READERS: u64 = 32_000;
#[cfg(not(windows))]
const MAX_SAFE_READER_SPACE: usize = 10 * GIGABYTE;

/// Maximum number of allowed databases/tables.
const MAX_DBS: usize = 256;

/// Environment used when opening a MDBX environment. RO/RW.
#[derive(Debug)]
pub enum DatabaseEnvKind {
Expand All @@ -58,7 +61,7 @@ impl DatabaseEnvKind {
}

/// Arguments for database initialization.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct DatabaseArguments {
/// Client version that accesses the database.
client_version: ClientVersion,
Expand Down Expand Up @@ -87,6 +90,26 @@ pub struct DatabaseArguments {
///
/// This flag affects only at environment opening but can't be changed after.
exclusive: Option<bool>,
/// Creates default tables.
///
/// If 'true', the default tables are created when the database is opened. If [None], the
/// default value is used.
///
/// If 'false', the default tables are not created when the database is opened.
/// This is useful when using the database only for custom tables.
default_tables: Option<bool>,
}

impl Default for DatabaseArguments {
fn default() -> Self {
Self {
client_version: ClientVersion::default(),
log_level: None,
max_read_transaction_duration: None,
exclusive: None,
default_tables: Some(true),
}
}
}

impl DatabaseArguments {
Expand All @@ -97,6 +120,7 @@ impl DatabaseArguments {
log_level: None,
max_read_transaction_duration: None,
exclusive: None,
default_tables: Some(true),
}
}

Expand All @@ -121,10 +145,21 @@ impl DatabaseArguments {
self
}

/// Set the default tables flag.
pub fn with_default_tables(mut self, default_tables: Option<bool>) -> Self {
self.default_tables = default_tables;
self
}

/// Returns the client version if any.
pub fn client_version(&self) -> &ClientVersion {
&self.client_version
}

/// Returns the default tables flag if any.
pub fn default_tables(&self) -> Option<bool> {
self.default_tables
}
}

/// Wrapper for the libmdbx environment: [Environment]
Expand Down Expand Up @@ -169,6 +204,7 @@ impl DatabaseMetrics for DatabaseEnv {

let _ = self
.view(|tx| {
// TODO: custom tables
for table in Tables::ALL.iter().map(Tables::name) {
let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;

Expand Down Expand Up @@ -245,7 +281,7 @@ impl DatabaseMetadata for DatabaseEnv {
impl DatabaseEnv {
/// Opens the database at the specified path with the given `EnvKind`.
///
/// It does not create the tables, for that call [`DatabaseEnv::create_tables`].
/// It does not create the tables, for that call [`DatabaseEnv::create_default_tables`].
pub fn open(
path: &Path,
kind: DatabaseEnvKind,
Expand All @@ -264,8 +300,7 @@ impl DatabaseEnv {

// Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
// environment creation.
debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
inner_env.set_max_dbs(256);
inner_env.set_max_dbs(MAX_DBS);
inner_env.set_geometry(Geometry {
// Maximum database size of 4 terabytes
size: Some(0..(4 * TERABYTE)),
Expand Down Expand Up @@ -371,7 +406,7 @@ impl DatabaseEnv {
LogLevel::Extra => 7,
});
} else {
return Err(DatabaseError::LogLevelUnavailable(log_level))
return Err(DatabaseError::LogLevelUnavailable(log_level));
}
}

Expand All @@ -388,13 +423,24 @@ impl DatabaseEnv {
}

/// Enables metrics on the database.
pub fn with_metrics(mut self) -> Self {
self.metrics = Some(DatabaseEnvMetrics::new().into());
pub fn with_metrics(mut self, tables: Vec<&'static str>) -> Self {
self.metrics = Some(DatabaseEnvMetrics::new(tables).into());
self
}

/// Creates all the defined tables, if necessary.
pub fn create_tables(&self) -> Result<(), DatabaseError> {
/// Creates a table.
pub fn create_table(&self, name: &'static str, dupsort: bool) -> Result<(), DatabaseError> {
let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

let flags = if dupsort { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };
tx.create_db(Some(name), flags).map_err(|e| DatabaseError::CreateTable(e.into()))?;
tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;

Ok(())
}

/// Creates all the defined default tables, if necessary.
pub fn create_default_tables(&self) -> Result<(), DatabaseError> {
let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

for table in Tables::ALL {
Expand All @@ -415,7 +461,7 @@ impl DatabaseEnv {
/// Records version that accesses the database with write privileges.
pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
if version.is_empty() {
return Ok(())
return Ok(());
}

let tx = self.tx_mut()?;
Expand Down Expand Up @@ -473,7 +519,7 @@ mod tests {
fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
let env = DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
.expect(ERROR_DB_CREATION);
env.create_tables().expect(ERROR_TABLE_CREATION);
env.create_default_tables().expect(ERROR_TABLE_CREATION);
env
}

Expand Down
24 changes: 14 additions & 10 deletions crates/storage/db/src/implementation/mdbx/tx.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Transaction wrapper for libmdbx-sys.
use super::cursor::Cursor;
use super::{cursor::Cursor, MAX_DBS};
use crate::{
metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
table::{Compress, DupSort, Encode, Table, TableImporter},
tables::{utils::decode_one, Tables},
tables::utils::decode_one,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
Expand Down Expand Up @@ -39,7 +39,7 @@ pub struct Tx<K: TransactionKind> {

/// Database table handle cache.
// TODO: Use `std::sync::OnceLock` once `get_or_try_init` is stable.
db_handles: [OnceCell<DBI>; Tables::COUNT],
db_handles: [OnceCell<DBI>; MAX_DBS],
}

impl<K: TransactionKind> Tx<K> {
Expand Down Expand Up @@ -75,7 +75,7 @@ impl<K: TransactionKind> Tx<K> {
#[allow(clippy::declare_interior_mutable_const)]
const ONCECELL_DBI_NEW: OnceCell<DBI> = OnceCell::new();
#[allow(clippy::declare_interior_mutable_const)]
const DB_HANDLES: [OnceCell<DBI>; Tables::COUNT] = [ONCECELL_DBI_NEW; Tables::COUNT];
const DB_HANDLES: [OnceCell<DBI>; MAX_DBS] = [ONCECELL_DBI_NEW; MAX_DBS];
Self { inner, db_handles: DB_HANDLES, metrics_handler }
}

Expand All @@ -86,7 +86,7 @@ impl<K: TransactionKind> Tx<K> {

/// Gets a table database handle if it exists, otherwise creates it.
pub fn get_dbi<T: Table>(&self) -> Result<DBI, DatabaseError> {
self.db_handles[T::TABLE as usize]
self.db_handles[T::INDEX]
.get_or_try_init(|| {
self.inner
.open_db(Some(T::NAME))
Expand Down Expand Up @@ -170,7 +170,7 @@ impl<K: TransactionKind> Tx<K> {
metrics_handler.log_backtrace_on_long_read_transaction();
metrics_handler
.env_metrics
.record_operation(T::TABLE, operation, value_size, || f(&self.inner))
.record_operation(T::NAME, operation, value_size, || f(&self.inner))
} else {
f(&self.inner)
}
Expand Down Expand Up @@ -239,7 +239,7 @@ impl<K: TransactionKind> MetricsHandler<K> {
/// NOTE: Backtrace is recorded using [Backtrace::force_capture], so `RUST_BACKTRACE` env var is
/// not needed.
fn log_backtrace_on_long_read_transaction(&self) {
if self.record_backtrace &&
if self.record_backtrace &&
!self.backtrace_recorded.load(Ordering::Relaxed) &&
self.transaction_mode().is_read_only()
{
Expand Down Expand Up @@ -393,7 +393,7 @@ impl DbTxMut for Tx<RW> {
mod tests {
use crate::{
database::Database, mdbx::DatabaseArguments, models::client_version::ClientVersion, tables,
transaction::DbTx, DatabaseEnv, DatabaseEnvKind,
transaction::DbTx, DatabaseEnv, DatabaseEnvKind, Tables,
};
use reth_interfaces::db::DatabaseError;
use reth_libmdbx::MaxReadTransactionDuration;
Expand All @@ -409,7 +409,9 @@ mod tests {
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
MAX_DURATION,
)));
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
let tables = Tables::ALL.iter().map(|table| table.name()).collect::<Vec<_>>();
let db =
DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics(tables);

let mut tx = db.tx().unwrap();
tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
Expand All @@ -435,7 +437,9 @@ mod tests {
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
MAX_DURATION,
)));
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
let tables = Tables::ALL.iter().map(|table| table.name()).collect::<Vec<_>>();
let db =
DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics(tables);

let mut tx = db.tx().unwrap();
tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
Expand Down
5 changes: 4 additions & 1 deletion crates/storage/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ pub fn init_db<P: AsRef<Path>>(path: P, args: DatabaseArguments) -> eyre::Result
#[cfg(feature = "mdbx")]
{
let client_version = args.client_version().clone();
let default_tables = args.default_tables().unwrap_or(true);
let db = create_db(path, args)?;
db.create_tables()?;
if default_tables {
db.create_default_tables()?;
}
db.record_client_version(client_version)?;
Ok(db)
}
Expand Down
Loading

0 comments on commit 920c123

Please sign in to comment.