Skip to content

Commit

Permalink
Fix race in remove_unrooted_race and flush_slot_cache (#18785)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin authored Jul 22, 2021
1 parent 0682d2d commit 5cabb5b
Showing 1 changed file with 124 additions and 36 deletions.
160 changes: 124 additions & 36 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3691,6 +3691,7 @@ impl AccountsDb {

{
// Slots that are currently being flushed by flush_slot_cache()

let mut currently_contended_slots = slots_under_contention.lock().unwrap();

// Slots that are currently being flushed by flush_slot_cache() AND
Expand Down Expand Up @@ -4229,42 +4230,53 @@ impl AccountsDb {
slot: Slot,
should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
) -> Option<FlushStats> {
self.accounts_cache.slot_cache(slot).and_then(|slot_cache| {
let is_being_purged = {
let mut slots_under_contention = self
.remove_unrooted_slots_synchronization
.slots_under_contention
.lock()
.unwrap();
// If we're purging this slot, don't flush it here
if slots_under_contention.contains(&slot) {
true
} else {
slots_under_contention.insert(slot);
false
}
};
if !is_being_purged {
let flush_stats = self.do_flush_slot_cache(slot, &slot_cache, should_flush_f);
// Nobody else should have been purging this slot, so should not have been removed
// from `self.remove_unrooted_slots_synchronization`.
assert!(self
.remove_unrooted_slots_synchronization
.slots_under_contention
.lock()
.unwrap()
.remove(&slot));

// Signal to any threads blocked on `remove_unrooted_slots(slot)` that we have finished
// flushing
self.remove_unrooted_slots_synchronization
.signal
.notify_all();
Some(flush_stats)
let is_being_purged = {
let mut slots_under_contention = self
.remove_unrooted_slots_synchronization
.slots_under_contention
.lock()
.unwrap();
// If we're purging this slot, don't flush it here
if slots_under_contention.contains(&slot) {
true
} else {
None
slots_under_contention.insert(slot);
false
}
})
};

if !is_being_purged {
let flush_stats = self.accounts_cache.slot_cache(slot).map(|slot_cache| {
#[cfg(test)]
{
// Give some time for cache flushing to occur here for unit tests
sleep(Duration::from_millis(self.load_delay));
}
// Since we added the slot to `slots_under_contention` AND this slot
// still exists in the cache, we know the slot cannot be removed
// by any other threads past this point. We are now responsible for
// flushing this slot.
self.do_flush_slot_cache(slot, &slot_cache, should_flush_f)
});

// Nobody else should have been purging this slot, so should not have been removed
// from `self.remove_unrooted_slots_synchronization`.
assert!(self
.remove_unrooted_slots_synchronization
.slots_under_contention
.lock()
.unwrap()
.remove(&slot));

// Signal to any threads blocked on `remove_unrooted_slots(slot)` that we have finished
// flushing
self.remove_unrooted_slots_synchronization
.signal
.notify_all();
flush_stats
} else {
None
}
}

fn write_accounts_to_cache(
Expand Down Expand Up @@ -11516,7 +11528,79 @@ pub mod tests {
}

#[test]
fn test_cache_flush_remove_unrooted_race() {
fn test_cache_flush_delayed_remove_unrooted_race() {
let caching_enabled = true;
let mut db = AccountsDb::new_with_config(
Vec::new(),
&ClusterType::Development,
AccountSecondaryIndexes::default(),
caching_enabled,
AccountShrinkThreshold::default(),
);
db.load_delay = RACY_SLEEP_MS;
let db = Arc::new(db);
let slot = 10;
let bank_id = 10;

let lamports = 42;
let mut account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
account.set_lamports(lamports);

// Start up a thread to flush the accounts cache
let (flush_trial_start_sender, flush_trial_start_receiver) = unbounded();
let (flush_done_sender, flush_done_receiver) = unbounded();
let t_flush_cache = {
let db = db.clone();
std::thread::Builder::new()
.name("account-cache-flush".to_string())
.spawn(move || loop {
// Wait for the signal to start a trial
if flush_trial_start_receiver.recv().is_err() {
return;
}
db.flush_slot_cache(10, None::<&mut fn(&_, &_) -> bool>);
flush_done_sender.send(()).unwrap();
})
.unwrap()
};

// Start up a thread remove the slot
let (remove_trial_start_sender, remove_trial_start_receiver) = unbounded();
let (remove_done_sender, remove_done_receiver) = unbounded();
let t_remove = {
let db = db.clone();
std::thread::Builder::new()
.name("account-remove".to_string())
.spawn(move || loop {
// Wait for the signal to start a trial
if remove_trial_start_receiver.recv().is_err() {
return;
}
db.remove_unrooted_slots(&[(slot, bank_id)]);
remove_done_sender.send(()).unwrap();
})
.unwrap()
};

let num_trials = 10;
for _ in 0..num_trials {
let pubkey = Pubkey::new_unique();
db.store_cached(slot, &[(&pubkey, &account)]);
// Wait for both threads to finish
flush_trial_start_sender.send(()).unwrap();
remove_trial_start_sender.send(()).unwrap();
let _ = flush_done_receiver.recv();
let _ = remove_done_receiver.recv();
}

drop(flush_trial_start_sender);
drop(remove_trial_start_sender);
t_flush_cache.join().unwrap();
t_remove.join().unwrap();
}

#[test]
fn test_cache_flush_remove_unrooted_race_multiple_slots() {
let caching_enabled = true;
let db = AccountsDb::new_with_config(
Vec::new(),
Expand Down Expand Up @@ -11606,15 +11690,17 @@ pub mod tests {
// in which case flush should ignore/move past the slot to be dumped
//
// Hence, we split into chunks to get the dumping of each chunk to race with the
// flushes. If we were to dump the entire chunk at once, then this lessens the possibility
// flushes. If we were to dump the entire chunk at once, then this reduces the possibility
// of the flush occurring first since the dumping logic reserves all the slots it's about
// to dump immediately.

for chunks in slots_to_dump.chunks(slots_to_dump.len() / 2) {
db.remove_unrooted_slots(chunks);
}

// Check that all the slots in `slots_to_dump` were completely removed from the
// cache, storage, and index

for (slot, _) in slots_to_dump {
assert!(db.storage.get_slot_storage_entries(*slot).is_none());
assert!(db.accounts_cache.slot_cache(*slot).is_none());
Expand All @@ -11626,6 +11712,7 @@ pub mod tests {
}

// Wait for flush to finish before starting next trial

flush_done_receiver.recv().unwrap();

for (slot, bank_id) in slots_to_keep {
Expand All @@ -11646,6 +11733,7 @@ pub mod tests {
exit.store(true, Ordering::Relaxed);
drop(new_trial_start_sender);
t_flush_cache.join().unwrap();

t_spurious_signal.join().unwrap();
}

Expand Down

0 comments on commit 5cabb5b

Please sign in to comment.