Removes support for loading snapshots with > 1 append vec per slot (#474)
brooksprumo authored Mar 28, 2024
1 parent f36a45a commit d5c0c0b
Showing 3 changed files with 17 additions and 139 deletions.
70 changes: 0 additions & 70 deletions accounts-db/src/accounts_db.rs
@@ -6318,49 +6318,6 @@ impl AccountsDb {
self.flush_slot_cache_with_clean(slot, None::<&mut fn(&_, &_) -> bool>, None)
}

/// 1.13 and some 1.14 could produce legal snapshots with more than 1 append vec per slot.
/// This is now illegal at runtime in the validator.
/// However, there is a clear path to be able to support this.
/// So, combine all accounts from 'slot_stores' into a new storage and return it.
/// This runs prior to the storages being put in AccountsDb.storage
pub fn combine_multiple_slots_into_one_at_startup(
path: &Path,
id: AccountsFileId,
slot: Slot,
slot_stores: &HashMap<AccountsFileId, Arc<AccountStorageEntry>>,
) -> Arc<AccountStorageEntry> {
let size = slot_stores.values().map(|storage| storage.capacity()).sum();
let storage = AccountStorageEntry::new(path, slot, id, size);

// get unique accounts, most recent version by write_version
let mut accum = HashMap::<Pubkey, StoredAccountMeta<'_>>::default();
slot_stores.iter().for_each(|(_id, store)| {
store.accounts.account_iter().for_each(|loaded_account| {
match accum.entry(*loaded_account.pubkey()) {
hash_map::Entry::Occupied(mut occupied_entry) => {
if loaded_account.write_version() > occupied_entry.get().write_version() {
occupied_entry.insert(loaded_account);
}
}
hash_map::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(loaded_account);
}
}
});
});

// store all unique accounts into new storage
let accounts = accum.values().collect::<Vec<_>>();
let to_store = (slot, &accounts[..]);
let storable =
StorableAccountsWithHashesAndWriteVersions::<'_, '_, _, _, &AccountHash>::new(
&to_store,
);
storage.accounts.append_accounts(&storable, 0);

Arc::new(storage)
}

/// `should_flush_f` is an optional closure that determines whether a given
/// account should be flushed. Passing `None` will by default flush all
/// accounts
@@ -10107,33 +10064,6 @@ pub mod tests {
}
}

#[test]
fn test_combine_multiple_slots_into_one_at_startup() {
solana_logger::setup();
let (db, slot1) = create_db_with_storages_and_index(false, 2, None);
let slot2 = slot1 + 1;

let initial_accounts = get_all_accounts(&db, slot1..(slot2 + 1));

let tf = TempDir::new().unwrap();
let stores = db
.storage
.all_slots()
.into_iter()
.map(|slot| {
let storage = db.storage.get_slot_storage_entry(slot).unwrap();
(storage.append_vec_id(), storage)
})
.collect::<HashMap<_, _>>();
let new_storage =
AccountsDb::combine_multiple_slots_into_one_at_startup(tf.path(), 1000, slot1, &stores);

compare_all_accounts(
&initial_accounts,
&get_all_accounts_from_storages(std::iter::once(&new_storage)),
);
}

#[test]
fn test_accountsdb_scan_snapshot_stores_hash_not_stored() {
let accounts_db = AccountsDb::new_single_for_tests();
3 changes: 3 additions & 0 deletions runtime/src/snapshot_utils.rs
@@ -356,6 +356,9 @@ pub enum SnapshotError {

#[error("failed to archive snapshot package: {0}")]
ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

#[error("failed to rebuild snapshot storages: {0}")]
RebuildStorages(String),
}

#[derive(Error, Debug)]
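The new `RebuildStorages(String)` variant carries a plain message describing why snapshot storages could not be rebuilt; the next file shows where it is returned. As a minimal sketch (illustration only, not part of this commit), a simplified stand-in for the error type and the rebuild step might be exercised like this:

// Simplified stand-in for solana_runtime::snapshot_utils::SnapshotError,
// reduced to the variant added in this commit (illustration only).
#[derive(Debug)]
enum SnapshotError {
    RebuildStorages(String),
}

// Hypothetical rebuild step; in the real code this work is driven by
// SnapshotStorageRebuilder while loading a snapshot.
fn rebuild_slot_storage(slot: u64, storages_in_slot: usize) -> Result<(), SnapshotError> {
    if storages_in_slot != 1 {
        // Mirrors the new check: exactly one storage per slot is required.
        return Err(SnapshotError::RebuildStorages(format!(
            "there must be exactly one storage per slot, but slot {slot} has {storages_in_slot} storages"
        )));
    }
    Ok(())
}

fn main() {
    // A snapshot written by a 1.13 / early-1.14 validator could contain two
    // append vecs for one slot; after this change such a snapshot is rejected.
    match rebuild_slot_storage(100, 2) {
        Ok(()) => println!("storages rebuilt"),
        Err(SnapshotError::RebuildStorages(msg)) => {
            eprintln!("failed to rebuild snapshot storages: {msg}");
        }
    }
}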
83 changes: 14 additions & 69 deletions runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs
@@ -16,15 +16,14 @@ use {
regex::Regex,
solana_accounts_db::{
account_storage::{AccountStorageMap, AccountStorageReference},
accounts_db::{AccountStorageEntry, AccountsDb, AccountsFileId, AtomicAccountsFileId},
append_vec::AppendVec,
accounts_db::{AccountStorageEntry, AccountsFileId, AtomicAccountsFileId},
},
solana_sdk::clock::Slot,
std::{
collections::HashMap,
fs::File,
io::{BufReader, Error as IoError},
path::{Path, PathBuf},
path::PathBuf,
str::FromStr as _,
sync::{
atomic::{AtomicUsize, Ordering},
@@ -333,52 +332,19 @@ impl SnapshotStorageRebuilder {
.collect::<Result<HashMap<AccountsFileId, Arc<AccountStorageEntry>>, SnapshotError>>(
)?;

let storage = if slot_stores.len() > 1 {
let remapped_append_vec_folder = lock.first().unwrap().parent().unwrap();
let remapped_append_vec_id = Self::get_unique_append_vec_id(
&self.next_append_vec_id,
remapped_append_vec_folder,
slot,
);
AccountsDb::combine_multiple_slots_into_one_at_startup(
remapped_append_vec_folder,
remapped_append_vec_id,
slot,
&slot_stores,
)
} else {
slot_stores
.into_values()
.next()
.expect("at least 1 storage per slot required")
};

self.storage.insert(
slot,
AccountStorageReference {
id: storage.append_vec_id(),
storage,
},
);
Ok(())
}

/// increment `next_append_vec_id` until there is no file in `parent_folder` with this id and slot
/// return the id
fn get_unique_append_vec_id(
next_append_vec_id: &Arc<AtomicAccountsFileId>,
parent_folder: &Path,
slot: Slot,
) -> AccountsFileId {
loop {
let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::AcqRel);
let remapped_file_name = AppendVec::file_name(slot, remapped_append_vec_id);
let remapped_append_vec_path = parent_folder.join(remapped_file_name);
if std::fs::metadata(&remapped_append_vec_path).is_err() {
// getting an err here means that there is no existing file here
return remapped_append_vec_id;
}
if slot_stores.len() != 1 {
return Err(SnapshotError::RebuildStorages(format!(
"there must be exactly one storage per slot, but slot {slot} has {} storages",
slot_stores.len()
)));
}
// SAFETY: The check above guarantees there is one item in slot_stores,
// so `.next()` will always return `Some`
let (id, storage) = slot_stores.into_iter().next().unwrap();

self.storage
.insert(slot, AccountStorageReference { id, storage });
Ok(())
}

/// Wait for the completion of the rebuilding threads
@@ -462,27 +428,6 @@ mod tests {
solana_accounts_db::append_vec::AppendVec,
};

#[test]
fn test_get_unique_append_vec_id() {
let folder = tempfile::TempDir::new().unwrap();
let folder = folder.path();
let next_id = Arc::default();
let slot = 1;
let append_vec_id =
SnapshotStorageRebuilder::get_unique_append_vec_id(&next_id, folder, slot);
assert_eq!(append_vec_id, 0);
let file_name = AppendVec::file_name(slot, append_vec_id);
let append_vec_path = folder.join(file_name);

// create a file at this path
_ = File::create(append_vec_path).unwrap();
next_id.store(0, Ordering::Release);
let append_vec_id =
SnapshotStorageRebuilder::get_unique_append_vec_id(&next_id, folder, slot);
// should have found a conflict with 0
assert_eq!(append_vec_id, 1);
}

#[test]
fn test_get_snapshot_file_kind() {
assert_eq!(None, get_snapshot_file_kind("file.txt"));
