diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index ad813d6b81fe..523fdf18fa7a 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -1,13 +1,14 @@ use super::{ - metrics::StaticFileProviderMetrics, LoadedJar, StaticFileJarProvider, StaticFileProviderRW, - StaticFileProviderRWRefMut, BLOCKS_PER_STATIC_FILE, + metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar, + StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut, + BLOCKS_PER_STATIC_FILE, }; use crate::{ to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, DatabaseProvider, HeaderProvider, ReceiptProvider, RequestsProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider, }; -use dashmap::{mapref::entry::Entry as DashMapEntry, DashMap}; +use dashmap::DashMap; use parking_lot::RwLock; use reth_chainspec::ChainInfo; use reth_db::{ @@ -114,8 +115,8 @@ pub struct StaticFileProviderInner { /// Whether [`StaticFileJarProvider`] loads filters into memory. If not, `by_hash` queries /// won't be able to be queried directly. load_filters: bool, - /// Maintains a map of `StaticFile` writers for each [`StaticFileSegment`] - writers: DashMap, + /// Maintains a writer set of [`StaticFileSegment`]. + writers: StaticFileWriters, metrics: Option>, /// Access rights of the provider. access: StaticFileAccess, @@ -1055,17 +1056,8 @@ impl StaticFileWriter for StaticFileProvider { } trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer."); - Ok(match self.writers.entry(segment) { - DashMapEntry::Occupied(entry) => entry.into_ref(), - DashMapEntry::Vacant(entry) => { - let writer = StaticFileProviderRW::new( - segment, - block, - Arc::downgrade(&self.0), - self.metrics.clone(), - )?; - entry.insert(writer) - } + self.writers.get_or_create(segment, || { + StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone()) }) } @@ -1077,10 +1069,7 @@ impl StaticFileWriter for StaticFileProvider { } fn commit(&self) -> ProviderResult<()> { - for mut writer in self.writers.iter_mut() { - writer.commit()?; - } - Ok(()) + self.writers.commit() } fn ensure_file_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> { diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index f63912512e47..6df36ab45fdb 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -2,7 +2,7 @@ use super::{ manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider, }; use crate::providers::static_file::metrics::StaticFileProviderOperation; -use dashmap::mapref::one::RefMut; +use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock}; use reth_codecs::Compact; use reth_db_api::models::CompactU256; use reth_nippy_jar::{ConsistencyFailStrategy, NippyJar, NippyJarError, NippyJarWriter}; @@ -20,8 +20,68 @@ use std::{ }; use tracing::debug; -/// Mutable reference to a dashmap element of [`StaticFileProviderRW`]. -pub type StaticFileProviderRWRefMut<'a> = RefMut<'a, StaticFileSegment, StaticFileProviderRW>; +/// Static file writers for every known [`StaticFileSegment`]. +/// +/// WARNING: Trying to use more than one writer for the same segment type **will result in a +/// deadlock**. +#[derive(Debug, Default)] +pub(crate) struct StaticFileWriters { + headers: RwLock>, + transactions: RwLock>, + receipts: RwLock>, +} + +impl StaticFileWriters { + pub(crate) fn get_or_create( + &self, + segment: StaticFileSegment, + create_fn: impl FnOnce() -> ProviderResult, + ) -> ProviderResult> { + let mut write_guard = match segment { + StaticFileSegment::Headers => self.headers.write(), + StaticFileSegment::Transactions => self.transactions.write(), + StaticFileSegment::Receipts => self.receipts.write(), + }; + + if write_guard.is_none() { + *write_guard = Some(create_fn()?); + } + + Ok(StaticFileProviderRWRefMut(write_guard)) + } + + pub(crate) fn commit(&self) -> ProviderResult<()> { + for writer_lock in [&self.headers, &self.transactions, &self.receipts] { + let mut writer = writer_lock.write(); + if let Some(writer) = writer.as_mut() { + writer.commit()?; + } + } + Ok(()) + } +} + +/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`]. +#[derive(Debug)] +pub struct StaticFileProviderRWRefMut<'a>( + pub(crate) RwLockWriteGuard<'a, RawRwLock, Option>, +); + +impl<'a> std::ops::DerefMut for StaticFileProviderRWRefMut<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + // This is always created by [`StaticFileWriters::get_or_create`] + self.0.as_mut().expect("static file writer provider should be init") + } +} + +impl<'a> std::ops::Deref for StaticFileProviderRWRefMut<'a> { + type Target = StaticFileProviderRW; + + fn deref(&self) -> &Self::Target { + // This is always created by [`StaticFileWriters::get_or_create`] + self.0.as_ref().expect("static file writer provider should be init") + } +} #[derive(Debug)] /// Extends `StaticFileProvider` with writing capabilities diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index eb4c610e5f03..b3250bef062e 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -157,6 +157,16 @@ where debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage"); + // Only write receipts to static files if there is no receipt pruning configured. + let mut state_writer = if self.database().prune_modes_ref().has_receipts_pruning() { + UnifiedStorageWriter::from_database(self.database()) + } else { + UnifiedStorageWriter::from( + self.database(), + self.static_file().get_writer(first_block.number, StaticFileSegment::Receipts)?, + ) + }; + // TODO: remove all the clones and do performant / batched writes for each type of object // instead of a loop over all blocks, // meaning: @@ -175,21 +185,6 @@ where // Write state and changesets to the database. // Must be written after blocks because of the receipt lookup. let execution_outcome = block.execution_outcome().clone(); - - // Only write receipts to static files if there is no receipt pruning configured. - let mut state_writer = if self.database().prune_modes_ref().has_receipts_pruning() { - UnifiedStorageWriter::from_database(self.database()) - } else { - // This should be inside the hotloop, because preferably there should only be one - // mutable reference to a static file writer, since there's a 3 in 100 chance that - // another segment shares the same shard as the `Receipts` one. Which would result - // in a deadlock. - UnifiedStorageWriter::from( - self.database(), - self.static_file() - .get_writer(first_block.number, StaticFileSegment::Receipts)?, - ) - }; state_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?; // insert hashes and intermediate merkle nodes