Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: swap Dashmap for StaticFileWriters on StaticFileProvider #10089

Merged
merged 7 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 9 additions & 20 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::{
metrics::StaticFileProviderMetrics, LoadedJar, StaticFileJarProvider, StaticFileProviderRW,
StaticFileProviderRWRefMut, BLOCKS_PER_STATIC_FILE,
metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
BLOCKS_PER_STATIC_FILE,
};
use crate::{
to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, DatabaseProvider,
HeaderProvider, ReceiptProvider, RequestsProvider, StageCheckpointReader, StatsReader,
TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider,
};
use dashmap::{mapref::entry::Entry as DashMapEntry, DashMap};
use dashmap::DashMap;
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_db::{
Expand Down Expand Up @@ -114,8 +115,8 @@ pub struct StaticFileProviderInner {
/// Whether [`StaticFileJarProvider`] loads filters into memory. If not, `by_hash` queries
/// won't be able to be queried directly.
load_filters: bool,
/// Maintains a map of `StaticFile` writers for each [`StaticFileSegment`]
writers: DashMap<StaticFileSegment, StaticFileProviderRW>,
/// Maintains a writer set of [`StaticFileSegment`].
writers: StaticFileWriters,
metrics: Option<Arc<StaticFileProviderMetrics>>,
/// Access rights of the provider.
access: StaticFileAccess,
Expand Down Expand Up @@ -1055,17 +1056,8 @@ impl StaticFileWriter for StaticFileProvider {
}

trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
Ok(match self.writers.entry(segment) {
DashMapEntry::Occupied(entry) => entry.into_ref(),
DashMapEntry::Vacant(entry) => {
let writer = StaticFileProviderRW::new(
segment,
block,
Arc::downgrade(&self.0),
self.metrics.clone(),
)?;
entry.insert(writer)
}
self.writers.get_or_create(segment, || {
StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
})
}

Expand All @@ -1077,10 +1069,7 @@ impl StaticFileWriter for StaticFileProvider {
}

fn commit(&self) -> ProviderResult<()> {
for mut writer in self.writers.iter_mut() {
writer.commit()?;
}
Ok(())
self.writers.commit()
}

fn ensure_file_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
Expand Down
66 changes: 63 additions & 3 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{
manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
};
use crate::providers::static_file::metrics::StaticFileProviderOperation;
use dashmap::mapref::one::RefMut;
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{ConsistencyFailStrategy, NippyJar, NippyJarError, NippyJarWriter};
Expand All @@ -20,8 +20,68 @@ use std::{
};
use tracing::debug;

/// Mutable reference to a dashmap element of [`StaticFileProviderRW`].
pub type StaticFileProviderRWRefMut<'a> = RefMut<'a, StaticFileSegment, StaticFileProviderRW>;
/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug, Default)]
pub(crate) struct StaticFileWriters {
headers: RwLock<Option<StaticFileProviderRW>>,
transactions: RwLock<Option<StaticFileProviderRW>>,
receipts: RwLock<Option<StaticFileProviderRW>>,
}

impl StaticFileWriters {
pub(crate) fn get_or_create(
&self,
segment: StaticFileSegment,
create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW>,
) -> ProviderResult<StaticFileProviderRWRefMut<'_>> {
let mut write_guard = match segment {
StaticFileSegment::Headers => self.headers.write(),
StaticFileSegment::Transactions => self.transactions.write(),
StaticFileSegment::Receipts => self.receipts.write(),
};

if write_guard.is_none() {
*write_guard = Some(create_fn()?);
}

Ok(StaticFileProviderRWRefMut(write_guard))
}

pub(crate) fn commit(&self) -> ProviderResult<()> {
for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
let mut writer = writer_lock.write();
if let Some(writer) = writer.as_mut() {
writer.commit()?;
}
}
Ok(())
}
}

/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a>(
pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW>>,
);

impl<'a> std::ops::DerefMut for StaticFileProviderRWRefMut<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
// This is always created by [`StaticFileWriters::get_or_create`]
self.0.as_mut().expect("static file writer provider should be init")
}
}

impl<'a> std::ops::Deref for StaticFileProviderRWRefMut<'a> {
type Target = StaticFileProviderRW;

fn deref(&self) -> &Self::Target {
// This is always created by [`StaticFileWriters::get_or_create`]
self.0.as_ref().expect("static file writer provider should be init")
}
}

#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities
Expand Down
25 changes: 10 additions & 15 deletions crates/storage/provider/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ where

debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage");

// Only write receipts to static files if there is no receipt pruning configured.
let mut state_writer = if self.database().prune_modes_ref().has_receipts_pruning() {
UnifiedStorageWriter::from_database(self.database())
} else {
UnifiedStorageWriter::from(
self.database(),
self.static_file().get_writer(first_block.number, StaticFileSegment::Receipts)?,
)
};

// TODO: remove all the clones and do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
Expand All @@ -175,21 +185,6 @@ where
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();

// Only write receipts to static files if there is no receipt pruning configured.
let mut state_writer = if self.database().prune_modes_ref().has_receipts_pruning() {
UnifiedStorageWriter::from_database(self.database())
} else {
// This should be inside the hotloop, because preferably there should only be one
// mutable reference to a static file writer, since there's a 3 in 100 chance that
// another segment shares the same shard as the `Receipts` one. Which would result
// in a deadlock.
UnifiedStorageWriter::from(
self.database(),
self.static_file()
.get_writer(first_block.number, StaticFileSegment::Receipts)?,
)
};
state_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;

// insert hashes and intermediate merkle nodes
Expand Down
Loading