feat: create NodeStorage from checkpoint (near#8876)
This should be safe to use with an already open db.
Not sure about easy, though: home_dir, config, and the archive flag are not available everywhere right now without some hassle.
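For orientation, here is a minimal sketch of how the helper added in this commit (`checkpoint_hot_storage_and_cleanup_columns`, defined in core/store/src/opener.rs below) might be called. The `node_storage` and `home_dir` bindings and the chosen column list are illustrative, not part of the change:

// Hypothetical call site: checkpoint the hot storage under
// home_dir/migration-checkpoint and keep only the Block and BlockHeader
// columns in the copy.
let checkpoint_storage = checkpoint_hot_storage_and_cleanup_columns(
    &node_storage,
    home_dir,
    std::path::PathBuf::from("migration-checkpoint"),
    Some(vec![DBCol::Block, DBCol::BlockHeader]),
    false, // `archive` -- whether the hot storage is archival
)?;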
posvyatokum authored and nikurt committed Apr 5, 2023
1 parent b26d202 commit ef4ae2b
Showing 7 changed files with 145 additions and 2 deletions.
3 changes: 3 additions & 0 deletions core/store/src/db.rs
@@ -224,6 +224,9 @@ pub trait Database: Sync + Send {

/// Returns statistics about the database if available.
fn get_store_statistics(&self) -> Option<StoreStatistics>;

/// Creates a checkpoint of the database in the provided path.
fn create_checkpoint(&self, path: &std::path::Path) -> anyhow::Result<()>;
}

fn assert_no_overwrite(col: DBCol, key: &[u8], value: &[u8], old_value: &[u8]) {
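Because the new method is declared on the `Database` trait itself, every backend below implements it and a caller holding a trait object can checkpoint whatever storage it was given. A hedged sketch, assuming the trait is usable as `dyn Database` as elsewhere in this crate; the `backup_db` helper is illustrative, not part of the commit:

fn backup_db(db: &dyn Database, dir: &std::path::Path) -> anyhow::Result<()> {
    // Delegates to whichever backend implements the trait (RocksDB, ColdDB, TestDB, ...).
    db.create_checkpoint(dir)
}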
4 changes: 4 additions & 0 deletions core/store/src/db/colddb.rs
@@ -110,6 +110,10 @@ impl Database for ColdDB {
fn get_store_statistics(&self) -> Option<crate::StoreStatistics> {
self.cold.get_store_statistics()
}

fn create_checkpoint(&self, path: &std::path::Path) -> anyhow::Result<()> {
self.cold.create_checkpoint(path)
}
}

/// Adjust database operation to be performed on cold storage.
6 changes: 6 additions & 0 deletions core/store/src/db/rocksdb.rs
@@ -385,6 +385,12 @@ impl Database for RocksDB {
Some(result)
}
}

fn create_checkpoint(&self, path: &std::path::Path) -> anyhow::Result<()> {
let cp = ::rocksdb::checkpoint::Checkpoint::new(&self.db)?;
cp.create_checkpoint(path)?;
Ok(())
}
}

/// DB level options
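The RocksDB implementation relies on RocksDB's built-in checkpoint feature, which produces a consistent point-in-time copy and hard-links SST files when the target directory is on the same filesystem, so the operation is cheap and safe while the database stays open (which is what the commit message alludes to). A standalone sketch of the same `rocksdb` crate API, with placeholder paths:

use rocksdb::{checkpoint::Checkpoint, DB};

fn main() -> Result<(), rocksdb::Error> {
    let db = DB::open_default("/tmp/source-db")?;
    let cp = Checkpoint::new(&db)?;
    // The target directory must not exist yet; RocksDB creates it and
    // hard-links or copies the live files into it.
    cp.create_checkpoint("/tmp/source-db-checkpoint")?;
    Ok(())
}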
5 changes: 5 additions & 0 deletions core/store/src/db/splitdb.rs
@@ -197,6 +197,11 @@ impl Database for SplitDB {
log_assert_fail!("get_store_statistics is not allowed - the split storage has two stores");
None
}

fn create_checkpoint(&self, _path: &std::path::Path) -> anyhow::Result<()> {
log_assert_fail!("create_checkpoint is not allowed - the split storage has two stores");
Ok(())
}
}

#[cfg(test)]
4 changes: 4 additions & 0 deletions core/store/src/db/testdb.rs
@@ -127,4 +127,8 @@ impl Database for TestDB {
fn get_store_statistics(&self) -> Option<StoreStatistics> {
self.stats.read().unwrap().clone()
}

fn create_checkpoint(&self, _path: &std::path::Path) -> anyhow::Result<()> {
Ok(())
}
}
4 changes: 3 additions & 1 deletion core/store/src/lib.rs
@@ -54,7 +54,9 @@ pub mod test_utils;
mod trie;

pub use crate::config::{Mode, StoreConfig};
pub use crate::opener::{StoreMigrator, StoreOpener, StoreOpenerError};
pub use crate::opener::{
checkpoint_hot_storage_and_cleanup_columns, StoreMigrator, StoreOpener, StoreOpenerError,
};

/// Specifies temperature of a storage.
///
121 changes: 120 additions & 1 deletion core/store/src/opener.rs
@@ -1,9 +1,10 @@
use std::sync::Arc;
use strum::IntoEnumIterator;

use crate::db::rocksdb::snapshot::{Snapshot, SnapshotError, SnapshotRemoveError};
use crate::db::rocksdb::RocksDB;
use crate::metadata::{DbKind, DbMetadata, DbVersion, DB_VERSION};
use crate::{Mode, NodeStorage, Store, StoreConfig, Temperature};
use crate::{DBCol, DBTransaction, Mode, NodeStorage, Store, StoreConfig, Temperature};

#[derive(Debug, thiserror::Error)]
pub enum StoreOpenerError {
@@ -569,3 +570,121 @@ pub trait StoreMigrator {
/// equal to [`DB_VERSION`].
fn migrate(&self, store: &Store, version: DbVersion) -> anyhow::Result<()>;
}

/// Creates a checkpoint of the hot storage in `home_dir.join(checkpoint_relative_path)`.
///
/// If `columns_to_keep` is `None`, no columns are cleaned up.
/// Otherwise, deletes all columns that are not in `columns_to_keep`.
///
/// Returns the `NodeStorage` of the checkpoint db.
/// `archive` -- whether the hot storage is archival (needed to open the checkpoint).
#[allow(dead_code)]
pub fn checkpoint_hot_storage_and_cleanup_columns(
db_storage: &NodeStorage,
home_dir: &std::path::Path,
checkpoint_relative_path: std::path::PathBuf,
columns_to_keep: Option<Vec<DBCol>>,
archive: bool,
) -> anyhow::Result<NodeStorage> {
let checkpoint_path = home_dir.join(checkpoint_relative_path);

db_storage.hot_storage.create_checkpoint(&checkpoint_path)?;

// Since StoreOpener only uses the path from the config, a default config with a custom path will do.
let mut config = StoreConfig::default();
config.path = Some(checkpoint_path);
let opener = StoreOpener::new(home_dir, archive, &config, None);
let node_storage = opener.open()?;

if let Some(columns_to_keep) = columns_to_keep {
let columns_to_keep_set: std::collections::HashSet<DBCol> =
std::collections::HashSet::from_iter(columns_to_keep.into_iter());
let mut transaction = DBTransaction::new();

for col in DBCol::iter() {
if !columns_to_keep_set.contains(&col) {
transaction.delete_all(col);
}
}

node_storage.hot_storage.write(transaction)?;
}

Ok(node_storage)
}

#[cfg(test)]
mod tests {
use super::*;

fn check_keys_existence(store: &Store, column: &DBCol, keys: &Vec<Vec<u8>>, expected: bool) {
for key in keys {
assert_eq!(store.exists(*column, &key).unwrap(), expected, "Column {:?}", column);
}
}

#[test]
fn test_checkpoint_hot_storage_and_cleanup_columns() {
let (home_dir, opener) = NodeStorage::test_opener();
let node_storage = opener.open().unwrap();

let keys = vec![vec![0], vec![1], vec![2], vec![3]];
let columns = vec![DBCol::Block, DBCol::Chunks, DBCol::BlockHeader];

let mut store_update = node_storage.get_hot_store().store_update();
for column in columns {
for key in &keys {
store_update.insert(column, key, &vec![42]);
}
}
store_update.commit().unwrap();

let store = checkpoint_hot_storage_and_cleanup_columns(
&node_storage,
&home_dir.path(),
std::path::PathBuf::from("checkpoint_none"),
None,
false,
)
.unwrap();
check_keys_existence(&store.get_hot_store(), &DBCol::Block, &keys, true);
check_keys_existence(&store.get_hot_store(), &DBCol::Chunks, &keys, true);
check_keys_existence(&store.get_hot_store(), &DBCol::BlockHeader, &keys, true);

let store = checkpoint_hot_storage_and_cleanup_columns(
&node_storage,
&home_dir.path(),
std::path::PathBuf::from("checkpoint_some"),
Some(vec![DBCol::Block]),
false,
)
.unwrap();
check_keys_existence(&store.get_hot_store(), &DBCol::Block, &keys, true);
check_keys_existence(&store.get_hot_store(), &DBCol::Chunks, &keys, false);
check_keys_existence(&store.get_hot_store(), &DBCol::BlockHeader, &keys, false);

let store = checkpoint_hot_storage_and_cleanup_columns(
&node_storage,
&home_dir.path(),
std::path::PathBuf::from("checkpoint_all"),
Some(vec![DBCol::Block, DBCol::Chunks, DBCol::BlockHeader]),
false,
)
.unwrap();
check_keys_existence(&store.get_hot_store(), &DBCol::Block, &keys, true);
check_keys_existence(&store.get_hot_store(), &DBCol::Chunks, &keys, true);
check_keys_existence(&store.get_hot_store(), &DBCol::BlockHeader, &keys, true);

let store = checkpoint_hot_storage_and_cleanup_columns(
&node_storage,
&home_dir.path(),
std::path::PathBuf::from("checkpoint_empty"),
Some(vec![]),
false,
)
.unwrap();
check_keys_existence(&store.get_hot_store(), &DBCol::Block, &keys, false);
check_keys_existence(&store.get_hot_store(), &DBCol::Chunks, &keys, false);
check_keys_existence(&store.get_hot_store(), &DBCol::BlockHeader, &keys, false);
}
}
