Skip to content

Commit

Permalink
Add checks when constructing a BankSnapshotInfo from a directory (sol…
Browse files Browse the repository at this point in the history
…ana-labs#30373)

* Read snapshot directories and find the highest

* format check

* Fix test_get_bank_snapshots

* fix test_bank_fields_from_snapshot

* review changes, is_file etc

* removed incremental snapshot

* SnapshotNewFromDirError

* nit and comments issues

* NewFromDir(#[from] SnapshotNewFromDirError)

* change fill to create

* replace unwrap with map_err and ok_or_else

* Remove BankForks, fix the bank loop
  • Loading branch information
xiangzhu70 authored and nickfrosty committed Mar 12, 2023
1 parent 8cc16e1 commit f095ef8
Showing 1 changed file with 190 additions and 24 deletions.
214 changes: 190 additions & 24 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ pub struct BankSnapshotInfo {
pub snapshot_type: BankSnapshotType,
/// Path to the bank snapshot directory
pub snapshot_dir: PathBuf,
/// Snapshot version
pub snapshot_version: SnapshotVersion,
}

impl PartialOrd for BankSnapshotInfo {
Expand All @@ -162,31 +164,58 @@ impl BankSnapshotInfo {
pub fn new_from_dir(
bank_snapshots_dir: impl AsRef<Path>,
slot: Slot,
) -> Option<BankSnapshotInfo> {
) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
// check this directory to see if there is a BankSnapshotPre and/or
// BankSnapshotPost file
let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
let bank_snapshot_pre_path =
bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);

if bank_snapshot_pre_path.is_file() {
return Some(BankSnapshotInfo {
slot,
snapshot_type: BankSnapshotType::Pre,
snapshot_dir: bank_snapshot_dir,
});
if !bank_snapshot_dir.is_dir() {
return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
bank_snapshot_dir,
));
}

if bank_snapshot_post_path.is_file() {
return Some(BankSnapshotInfo {
slot,
snapshot_type: BankSnapshotType::Post,
snapshot_dir: bank_snapshot_dir,
});
let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
if !fs::metadata(&status_cache_file)?.is_file() {
return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
status_cache_file,
));
}

let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
let version_str = snapshot_version_from_file(&version_path).or(Err(
SnapshotNewFromDirError::MissingVersionFile(version_path),
))?;
let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
.or(Err(SnapshotNewFromDirError::InvalidVersion))?;

// There is a time window from the slot directory being created, and the content being completely
// filled. Check the completion to avoid using a highest found slot directory with missing content.
let completion_flag_file = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
if !fs::metadata(&completion_flag_file)?.is_file() {
return Err(SnapshotNewFromDirError::IncompleteDir(completion_flag_file));
}

None
let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
let bank_snapshot_pre_path =
bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);

let snapshot_type = if bank_snapshot_pre_path.is_file() {
BankSnapshotType::Pre
} else if bank_snapshot_post_path.is_file() {
BankSnapshotType::Post
} else {
return Err(SnapshotNewFromDirError::MissingSnapshotFile(
bank_snapshot_dir,
));
};

Ok(BankSnapshotInfo {
slot,
snapshot_type,
snapshot_dir: bank_snapshot_dir,
snapshot_version,
})
}

pub fn snapshot_path(&self) -> PathBuf {
Expand Down Expand Up @@ -294,12 +323,42 @@ pub enum SnapshotError {
#[error("snapshot slot deltas are invalid: {0}")]
VerifySlotDeltas(#[from] VerifySlotDeltasError),

#[error("bank_snapshot_info new_from_dir failed: {0}")]
NewFromDir(#[from] SnapshotNewFromDirError),

#[error("invalid AppendVec path: {}", .0.display())]
InvalidAppendVecPath(PathBuf),

#[error("invalid account path: {}", .0.display())]
InvalidAccountPath(PathBuf),

#[error("no valid snapshot dir found under {}", .0.display())]
NoSnapshotSlotDir(PathBuf),
}
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),

#[error("invalid bank snapshot directory {}", .0.display())]
InvalidBankSnapshotDir(PathBuf),

#[error("missing status cache file {}", .0.display())]
MissingStatusCacheFile(PathBuf),

#[error("missing version file {}", .0.display())]
MissingVersionFile(PathBuf),

#[error("invalid snapshot version")]
InvalidVersion,

#[error("snapshot directory incomplete {}", .0.display())]
IncompleteDir(PathBuf),

#[error("missing snapshot file {}", .0.display())]
MissingSnapshotFile(PathBuf),
}

pub type Result<T> = std::result::Result<T, SnapshotError>;

/// Errors that can happen in `verify_slot_deltas()`
Expand Down Expand Up @@ -644,13 +703,16 @@ pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnaps
.and_then(|file_name| file_name.parse::<Slot>().ok())
})
})
.for_each(|slot| {
if let Some(snapshot_info) =
BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot)
{
bank_snapshots.push(snapshot_info);
}
}),
.for_each(
|slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
Ok(snapshot_info) => {
bank_snapshots.push(snapshot_info);
}
Err(err) => {
error!("Unable to read bank snapshot for slot {}: {}", slot, err);
}
},
),
}
bank_snapshots
}
Expand Down Expand Up @@ -691,6 +753,13 @@ pub fn get_highest_bank_snapshot_post(
do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
}

/// Get the bank snapshot with the highest slot in a directory
///
/// This function gets the highest bank snapshot of any type
pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
}

fn do_get_highest_bank_snapshot(
mut bank_snapshots: Vec<BankSnapshotInfo>,
) -> Option<BankSnapshotInfo> {
Expand Down Expand Up @@ -1102,6 +1171,7 @@ pub fn add_bank_snapshot(
slot,
snapshot_type: BankSnapshotType::Pre,
snapshot_dir: bank_snapshot_dir,
snapshot_version,
})
}

Expand Down Expand Up @@ -1548,6 +1618,39 @@ fn streaming_unarchive_snapshot(
.collect()
}

/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
/// as a valid one. A dir unpacked from an archive lacks these files. Fill them here to
/// allow new_from_dir() checks to pass. These checks are not needed for unpacked dirs,
/// but it is not clean to add another flag to new_from_dir() to skip them.
fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
let snapshots_dir = unpack_dir.as_ref().join("snapshots");
if !snapshots_dir.is_dir() {
return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
}

// The unpacked dir has a single slot dir, which is the snapshot slot dir.
let slot_dir = fs::read_dir(&snapshots_dir)
.map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
.find(|entry| entry.as_ref().unwrap().path().is_dir())
.ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
.map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
.path();

let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;

let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
fs::hard_link(
status_cache_file,
slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
)?;

let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
fs::File::create(state_complete_file)?;

Ok(())
}

/// Perform the common tasks when unarchiving a snapshot. Handles creating the temporary
/// directories, untaring, reading the version file, and then returning those fields plus the
/// rebuilt storage
Expand Down Expand Up @@ -1593,6 +1696,8 @@ where
);
info!("{}", measure_untar);

create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;

let RebuiltSnapshotStorage {
snapshot_version,
storage,
Expand Down Expand Up @@ -3069,6 +3174,16 @@ mod tests {
let snapshot_filename = get_snapshot_file_name(slot);
let snapshot_path = snapshot_dir.join(snapshot_filename);
File::create(snapshot_path).unwrap();

let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
File::create(status_cache_file).unwrap();

let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
write_snapshot_version_file(version_path, SnapshotVersion::default()).unwrap();

// Mark this directory complete so it can be used. Check this flag first before selecting for deserialization.
let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
fs::File::create(state_complete_path).unwrap();
}
}

Expand Down Expand Up @@ -4522,4 +4637,55 @@ mod tests {

assert!(matches!(ret, Err(SnapshotError::InvalidAppendVecPath(_))));
}

#[test]
fn test_get_highest_bank_snapshot() {
solana_logger::setup();

let genesis_config = GenesisConfig::default();
let mut bank = Arc::new(Bank::new_for_tests(&genesis_config));

let tmp_dir = tempfile::TempDir::new().unwrap();
let bank_snapshots_dir = tmp_dir.path();
let collecter_id = Pubkey::new_unique();
let snapshot_version = SnapshotVersion::default();

for _ in 0..4 {
// prepare the bank
bank = Arc::new(Bank::new_from_parent(&bank, &collecter_id, bank.slot() + 1));
bank.fill_bank_with_ticks_for_tests();
bank.squash();
bank.force_flush_accounts_cache();

// generate the bank snapshot directory for slot+1
let snapshot_storages = bank.get_snapshot_storages(None);
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
add_bank_snapshot(
bank_snapshots_dir,
&bank,
&snapshot_storages,
snapshot_version,
slot_deltas,
)
.unwrap();
}

let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
assert_eq!(snapshot.slot, 4);

let complete_flag_file = snapshot.snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
fs::remove_file(complete_flag_file).unwrap();
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
assert_eq!(snapshot.slot, 3);

let snapshot_version_file = snapshot.snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
fs::remove_file(snapshot_version_file).unwrap();
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
assert_eq!(snapshot.slot, 2);

let status_cache_file = snapshot.snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
fs::remove_file(status_cache_file).unwrap();
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
assert_eq!(snapshot.slot, 1);
}
}

0 comments on commit f095ef8

Please sign in to comment.