Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Use a lock instead of atomics for snapshot Progress (#11197)
Browse files Browse the repository at this point in the history
* WIP. Typos and logging.

* Format todos

* Pause pruning while a snapshot is under way
Logs, docs and todos

* Allocate memory for the full chunk

* Name snapshotting threads

* Ensure `taking_snapshot` is set to false whenever and however `take_snapshot`returns
Rename `take_at` to `request_snapshot_at`
Cleanup

* Let "in_progress" deletion fail
Fix tests

* Just use an atomic

* Review grumbles

* Finish the sentence

* Resolve a few todos and clarify comments.

* Calculate progress rate since last update

* Lockfile

* Fix tests

* typo

* Reinstate default snapshotting frequency
Cut down on the logging noise

* Use a lock instead of atomics for snapshot Progress

* Update ethcore/types/src/snapshot.rs

Co-Authored-By: Andronik Ordian <write@reusable.software>

* Avoid truncating cast
Cleanup
  • Loading branch information
dvdplm authored Oct 28, 2019
1 parent 293e06e commit 0d3423c
Show file tree
Hide file tree
Showing 18 changed files with 105 additions and 111 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions ethcore/snapshot/snapshot-tests/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use ethereum_types::{H256, Address};
use hash_db::{HashDB, EMPTY_PREFIX};
use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak};
use kvdb::DBValue;
use parking_lot::RwLock;
use rlp::Rlp;
use snapshot::test_helpers::{ACC_EMPTY, to_fat_rlps, from_fat_rlp};

Expand All @@ -48,7 +49,7 @@ fn encoding_basic() {

let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::new();
let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
Expand All @@ -69,7 +70,7 @@ fn encoding_version() {

let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::new();
let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
Expand All @@ -96,7 +97,7 @@ fn encoding_storage() {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let p = Progress::new();
let p = RwLock::new(Progress::new());

let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
Expand Down Expand Up @@ -124,7 +125,7 @@ fn encoding_storage_split() {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let p = Progress::new();
let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), 500, 1000, &p).unwrap();
let mut root = KECCAK_NULL_RLP;
let mut restored_account = None;
Expand Down Expand Up @@ -170,8 +171,8 @@ fn encoding_code() {
};

let mut used_code = HashSet::new();
let p1 = Progress::new();
let p2 = Progress::new();
let p1 = RwLock::new(Progress::new());
let p2 = RwLock::new(Progress::new());
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::from_hash(db.as_hash_db(), keccak(addr1)), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::from_hash(db.as_hash_db(), keccak(addr2)), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
assert_eq!(used_code.len(), 1);
Expand Down
3 changes: 2 additions & 1 deletion ethcore/snapshot/snapshot-tests/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use keccak_hash::{KECCAK_NULL_RLP};
use keccak_hasher::KeccakHasher;
use kvdb::DBValue;
use log::trace;
use parking_lot::RwLock;
use rand::Rng;
use rlp;
use snapshot::{
Expand Down Expand Up @@ -146,7 +147,7 @@ pub fn snap(client: &Client) -> (Box<dyn SnapshotReader>, TempDir) {
let tempdir = TempDir::new("").unwrap();
let path = tempdir.path().join("file");
let writer = PackedWriter::new(&path).unwrap();
let progress = Progress::new();
let progress = RwLock::new(Progress::new());

let hash = client.chain_info().best_block_hash;
client.take_snapshot(writer, BlockId::Hash(hash), &progress).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions ethcore/snapshot/snapshot-tests/src/proof_of_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use snapshot::{
io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter},
PowSnapshot,
};
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use snappy;
use keccak_hash::KECCAK_NULL_RLP;
use kvdb::DBTransaction;
Expand Down Expand Up @@ -74,7 +74,7 @@ fn chunk_and_restore(amount: u64) {
&bc,
best_hash,
&writer,
&Progress::new()
&RwLock::new(Progress::new())
).unwrap();

let manifest = ManifestData {
Expand Down
6 changes: 3 additions & 3 deletions ethcore/snapshot/snapshot-tests/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use ethcore::{
test_helpers::{new_db, new_temp_db, generate_dummy_client_with_spec_and_data, restoration_db_handler}
};

use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use ethcore_io::{IoChannel, IoService};
use kvdb_rocksdb::DatabaseConfig;
use journaldb::Algorithm;
Expand Down Expand Up @@ -278,7 +278,7 @@ fn keep_ancient_blocks() {
&bc,
best_hash,
&writer,
&Progress::new()
&RwLock::new(Progress::new())
).unwrap();
let state_db = client.state_db().journal_db().boxed_clone();
let start_header = bc.block_header_data(&best_hash).unwrap();
Expand All @@ -287,7 +287,7 @@ fn keep_ancient_blocks() {
state_db.as_hash_db(),
&state_root,
&writer,
&Progress::new(),
&RwLock::new(Progress::new()),
None,
0
).unwrap();
Expand Down
20 changes: 15 additions & 5 deletions ethcore/snapshot/snapshot-tests/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use rand_xorshift::XorShiftRng;
use ethereum_types::H256;
use journaldb::{self, Algorithm};
use kvdb_rocksdb::{Database, DatabaseConfig};
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use tempdir::TempDir;

use crate::helpers::StateProducer;
Expand All @@ -61,8 +61,9 @@ fn snap_and_restore() {
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());

let mut state_hashes = Vec::new();
let progress = RwLock::new(Progress::new());
for part in 0..SNAPSHOT_SUBPARTS {
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), Some(part), 0).unwrap();
let mut hashes = chunk_state(&old_db, &state_root, &writer, &progress, Some(part), 0).unwrap();
state_hashes.append(&mut hashes);
}

Expand Down Expand Up @@ -133,8 +134,16 @@ fn get_code_from_prev_chunk() {
let mut make_chunk = |acc, hash| {
let mut db = journaldb::new_memory_db();
AccountDBMut::from_hash(&mut db, hash).insert(EMPTY_PREFIX, &code[..]);
let p = Progress::new();
let fat_rlp = to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap();
let p = RwLock::new(Progress::new());
let fat_rlp = to_fat_rlps(
&hash,
&acc,
&AccountDB::from_hash(&db, hash),
&mut used_code,
usize::max_value(),
usize::max_value(),
&p
).unwrap();
let mut stream = RlpStream::new_list(1);
stream.append_raw(&fat_rlp[0], 1);
stream.out()
Expand Down Expand Up @@ -177,8 +186,9 @@ fn checks_flag() {

let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let progress = RwLock::new(Progress::new());

let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), None, 0).unwrap();
let state_hashes = chunk_state(&old_db, &state_root, &writer, &progress, None, 0).unwrap();

writer.into_inner().finish(ManifestData {
version: 2,
Expand Down
6 changes: 3 additions & 3 deletions ethcore/snapshot/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
//! Account state encoding and decoding

use std::collections::HashSet;
use std::sync::atomic::Ordering;

use account_db::{AccountDB, AccountDBMut};
use bytes::Bytes;
Expand All @@ -31,6 +30,7 @@ use ethtrie::{TrieDB, TrieDBMut};
use hash_db::HashDB;
use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP};
use log::{trace, warn};
use parking_lot::RwLock;
use rlp::{RlpStream, Rlp};
use trie_db::{Trie, TrieMut};

Expand Down Expand Up @@ -79,7 +79,7 @@ pub fn to_fat_rlps(
used_code: &mut HashSet<H256>,
first_chunk_size: usize,
max_chunk_size: usize,
p: &Progress,
p: &RwLock<Progress>,
) -> Result<Vec<Bytes>, Error> {
let db = &(acct_db as &dyn HashDB<_,_>);
let db = TrieDB::new(db, &acc.storage_root)?;
Expand Down Expand Up @@ -135,7 +135,7 @@ pub fn to_fat_rlps(
}

loop {
if p.abort.load(Ordering::SeqCst) {
if p.read().abort {
trace!(target: "snapshot", "to_fat_rlps: aborting snapshot");
return Err(Error::SnapshotAborted);
}
Expand Down
3 changes: 2 additions & 1 deletion ethcore/snapshot/src/consensus/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use ethereum_types::{H256, U256};
use itertools::{Position, Itertools};
use kvdb::KeyValueDB;
use log::trace;
use parking_lot::RwLock;
use rlp::{RlpStream, Rlp};

use crate::{SnapshotComponents, Rebuilder};
Expand All @@ -62,7 +63,7 @@ impl SnapshotComponents for PoaSnapshot {
chain: &BlockChain,
block_at: H256,
sink: &mut ChunkSink,
_progress: &Progress,
_progress: &RwLock<Progress>,
preferred_size: usize,
) -> Result<(), SnapshotError> {
let number = chain.block_number(&block_at)
Expand Down
7 changes: 4 additions & 3 deletions ethcore/snapshot/src/consensus/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use engine::Engine;
use ethereum_types::{H256, U256};
use kvdb::KeyValueDB;
use log::trace;
use parking_lot::RwLock;
use rand::rngs::OsRng;
use rlp::{RlpStream, Rlp};
use triehash::ordered_trie_root;
Expand Down Expand Up @@ -72,7 +73,7 @@ impl SnapshotComponents for PowSnapshot {
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &Progress,
progress: &RwLock<Progress>,
preferred_size: usize,
) -> Result<(), SnapshotError> {
PowWorker {
Expand Down Expand Up @@ -110,7 +111,7 @@ struct PowWorker<'a> {
rlps: VecDeque<Bytes>,
current_hash: H256,
writer: &'a mut ChunkSink<'a>,
progress: &'a Progress,
progress: &'a RwLock<Progress>,
preferred_size: usize,
}

Expand Down Expand Up @@ -153,7 +154,7 @@ impl<'a> PowWorker<'a> {

last = self.current_hash;
self.current_hash = block.header_view().parent_hash();
self.progress.blocks.fetch_add(1, Ordering::SeqCst);
self.progress.write().blocks += 1;
}

if loaded_size != 0 {
Expand Down
18 changes: 9 additions & 9 deletions ethcore/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use ethtrie::{TrieDB, TrieDBMut};
use hash_db::HashDB;
use journaldb::{self, Algorithm, JournalDB};
use keccak_hasher::KeccakHasher;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use kvdb::{KeyValueDB, DBValue};
use log::{debug, info, trace};
use num_cpus;
Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
block_hash: H256,
state_db: &dyn HashDB<KeccakHasher, DBValue>,
writer: W,
p: &Progress,
p: &RwLock<Progress>,
processing_threads: usize,
) -> Result<(), Error> {
let start_header = chain.block_header_data(&block_hash)
Expand Down Expand Up @@ -168,7 +168,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
state_hashes.extend(part_state_hashes);
}

info!("Took a snapshot at #{} of {} accounts", block_number, p.accounts());
info!("Took a snapshot at #{} of {} accounts", block_number, p.read().accounts());

Ok((state_hashes, block_hashes))
}).expect("Sub-thread never panics; qed")?;
Expand All @@ -186,7 +186,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(

writer.into_inner().finish(manifest_data)?;

p.done.store(true, Ordering::SeqCst);
p.write().done = true;

Ok(())
}
Expand All @@ -202,7 +202,7 @@ pub fn chunk_secondary<'a>(
chain: &'a BlockChain,
start_hash: H256,
writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress
progress: &'a RwLock<Progress>
) -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();
let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)];
Expand All @@ -218,7 +218,7 @@ pub fn chunk_secondary<'a>(
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
hash, size, raw_data.len());

progress.update(0, size);
progress.write().update(0, size as u64);
chunk_hashes.push(hash);
Ok(())
};
Expand All @@ -242,7 +242,7 @@ struct StateChunker<'a> {
cur_size: usize,
snappy_buffer: Vec<u8>,
writer: &'a Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress,
progress: &'a RwLock<Progress>,
thread_idx: usize,
}

Expand Down Expand Up @@ -275,7 +275,7 @@ impl<'a> StateChunker<'a> {
self.writer.lock().write_state_chunk(hash, compressed)?;
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());

self.progress.update(num_entries, compressed_size);
self.progress.write().update(num_entries as u64, compressed_size as u64);

self.hashes.push(hash);
self.cur_size = 0;
Expand All @@ -300,7 +300,7 @@ pub fn chunk_state<'a>(
db: &dyn HashDB<KeccakHasher, DBValue>,
root: &H256,
writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress,
progress: &'a RwLock<Progress>,
part: Option<usize>,
thread_idx: usize,
) -> Result<Vec<H256>, Error> {
Expand Down
12 changes: 6 additions & 6 deletions ethcore/snapshot/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub struct Service<C: Send + Sync + 'static> {
state_chunks: AtomicUsize,
block_chunks: AtomicUsize,
client: Arc<C>,
progress: Progress,
progress: RwLock<Progress>,
taking_snapshot: AtomicBool,
restoring_snapshot: AtomicBool,
}
Expand All @@ -280,7 +280,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0),
client: params.client,
progress: Progress::new(),
progress: RwLock::new(Progress::new()),
taking_snapshot: AtomicBool::new(false),
restoring_snapshot: AtomicBool::new(false),
};
Expand Down Expand Up @@ -483,9 +483,9 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
/// Tick the snapshot service. This will log any active snapshot
/// being taken.
pub fn tick(&self) {
if self.progress.done() || !self.taking_snapshot.load(Ordering::SeqCst) { return }
if self.progress.read().done() || !self.taking_snapshot.load(Ordering::SeqCst) { return }

let p = &self.progress;
let p = &self.progress.read();
info!("Snapshot: {} accounts, {} blocks, {} bytes", p.accounts(), p.blocks(), p.bytes());
let rate = p.rate();
debug!(target: "snapshot", "Current progress rate: {:.0} acc/s, {:.0} bytes/s (compressed)", rate.0, rate.1);
Expand All @@ -507,7 +507,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
self.taking_snapshot.store(false, Ordering::SeqCst);
}}
let start_time = std::time::Instant::now();
self.progress.reset();
*self.progress.write() = Progress::new();

let temp_dir = self.temp_snapshot_dir();
let snapshot_dir = self.snapshot_dir();
Expand Down Expand Up @@ -893,7 +893,7 @@ impl<C: Send + Sync> SnapshotService for Service<C> {
fn abort_snapshot(&self) {
if self.taking_snapshot.load(Ordering::SeqCst) {
trace!(target: "snapshot", "Aborting snapshot – Snapshot under way");
self.progress.abort.store(true, Ordering::SeqCst);
self.progress.write().abort = true;
}
}

Expand Down
Loading

0 comments on commit 0d3423c

Please sign in to comment.