Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race in remove_unrooted_race and flush_slot_cache #18785

Merged
merged 2 commits into from
Jul 22, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 104 additions & 22 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4229,22 +4229,32 @@ impl AccountsDb {
slot: Slot,
should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
) -> Option<FlushStats> {
self.accounts_cache.slot_cache(slot).and_then(|slot_cache| {
let is_being_purged = {
let mut slots_under_contention = self
.remove_unrooted_slots_synchronization
.slots_under_contention
.lock()
.unwrap();
// If we're purging this slot, don't flush it here
if slots_under_contention.contains(&slot) {
true
} else {
slots_under_contention.insert(slot);
false
let is_being_purged = {
let mut slots_under_contention = self
.remove_unrooted_slots_synchronization
.slots_under_contention
.lock()
.unwrap();
// If we're purging this slot, don't flush it here
if slots_under_contention.contains(&slot) {
true
} else {
slots_under_contention.insert(slot);
false
}
};

if !is_being_purged {
self.accounts_cache.slot_cache(slot).map(|slot_cache| {
#[cfg(test)]
{
// Give some time for cache flushing to occur here for unit tests
sleep(Duration::from_millis(self.load_delay));
}
};
if !is_being_purged {
// Since we added the slot to `slots_under_contention` AND this slot
// still exists in the cache, we know the slot cannot be removed
// by any other threads past this point. We are now responsible for
// flushing this slot.
let flush_stats = self.do_flush_slot_cache(slot, &slot_cache, should_flush_f);
// Nobody else should have been purging this slot, so should not have been removed
// from `self.remove_unrooted_slots_synchronization`.
Expand All @@ -4260,11 +4270,11 @@ impl AccountsDb {
self.remove_unrooted_slots_synchronization
.signal
.notify_all();
Some(flush_stats)
} else {
None
}
})
flush_stats
})
} else {
None
}
}

fn write_accounts_to_cache(
Expand Down Expand Up @@ -11516,7 +11526,79 @@ pub mod tests {
}

#[test]
fn test_cache_flush_remove_unrooted_race() {
fn test_cache_flush_delayed_remove_unrooted_race() {
let caching_enabled = true;
let mut db = AccountsDb::new_with_config(
Vec::new(),
&ClusterType::Development,
AccountSecondaryIndexes::default(),
caching_enabled,
AccountShrinkThreshold::default(),
);
db.load_delay = RACY_SLEEP_MS;
let db = Arc::new(db);
let slot = 10;
let bank_id = 10;

let lamports = 42;
let mut account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
account.set_lamports(lamports);

// Start up a thread to flush the accounts cache
let (flush_trial_start_sender, flush_trial_start_receiver) = unbounded();
let (flush_done_sender, flush_done_receiver) = unbounded();
let t_flush_cache = {
let db = db.clone();
std::thread::Builder::new()
.name("account-cache-flush".to_string())
.spawn(move || loop {
// Wait for the signal to start a trial
if flush_trial_start_receiver.recv().is_err() {
return;
}
db.flush_slot_cache(10, None::<&mut fn(&_, &_) -> bool>);
flush_done_sender.send(()).unwrap();
})
.unwrap()
};

// Start up a thread remove the slot
let (remove_trial_start_sender, remove_trial_start_receiver) = unbounded();
let (remove_done_sender, remove_done_receiver) = unbounded();
let t_remove = {
let db = db.clone();
std::thread::Builder::new()
.name("account-remove".to_string())
.spawn(move || loop {
// Wait for the signal to start a trial
if remove_trial_start_receiver.recv().is_err() {
return;
}
db.remove_unrooted_slots(&[(slot, bank_id)]);
remove_done_sender.send(()).unwrap();
})
.unwrap()
};

let num_trials = 10;
for _ in 0..num_trials {
let pubkey = Pubkey::new_unique();
db.store_cached(slot, &[(&pubkey, &account)]);
// Wait for both threads to finish
flush_trial_start_sender.send(()).unwrap();
remove_trial_start_sender.send(()).unwrap();
let _ = flush_done_receiver.recv();
let _ = remove_done_receiver.recv();
}

drop(flush_trial_start_sender);
drop(remove_trial_start_sender);
t_flush_cache.join().unwrap();
t_remove.join().unwrap();
}

#[test]
fn test_cache_flush_remove_unrooted_race_multiple_slots() {
let caching_enabled = true;
let db = AccountsDb::new_with_config(
Vec::new(),
Expand Down Expand Up @@ -11606,7 +11688,7 @@ pub mod tests {
// in which case flush should ignore/move past the slot to be dumped
//
// Hence, we split into chunks to get the dumping of each chunk to race with the
// flushes. If we were to dump the entire chunk at once, then this lessens the possibility
// flushes. If we were to dump the entire chunk at once, then this reduces the possibility
// of the flush occurring first since the dumping logic reserves all the slots it's about
// to dump immediately.
for chunks in slots_to_dump.chunks(slots_to_dump.len() / 2) {
Expand Down