[reshardingV3] State ShardUIdMapping - initial implementation (#12084)
Tracking issue: #12050
#### Summary
Currently the changes should be almost a no-op, as we do not explicitly
save anything to `DBCol::StateShardUIdMapping`.
The only difference is that we make an additional read from the
`DBCol::StateShardUIdMapping` column every time we access the `State` column.
The main logic is in `TrieStoreAdapter::get()` and
`read_shard_uid_mapping_from_db()` in `core/store/src/adapter/trie_store.rs`.
These changes implement the mapping for reads; writes will be handled in the
next PR.
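
Below is a minimal, self-contained sketch of that read path (toy stand-ins, not the real nearcore `Store` API): a `State` read first resolves the `ShardUId` through the mapping column, falls back to the shard itself when no entry exists, and only then builds the `ShardUId`-prefix + hash key.

```rust
use std::collections::HashMap;

/// Toy stand-ins for illustration only; the real types are nearcore's
/// `ShardUId` (8 bytes) and `CryptoHash` (32 bytes), and the two maps play
/// the roles of `DBCol::StateShardUIdMapping` and `DBCol::State`.
type ShardUId = [u8; 8];
type CryptoHash = [u8; 32];

struct ToyStore {
    shard_uid_mapping: HashMap<ShardUId, ShardUId>, // DBCol::StateShardUIdMapping
    state: HashMap<Vec<u8>, Vec<u8>>,               // DBCol::State
}

impl ToyStore {
    /// A missing mapping entry means the shard maps to itself.
    fn map_shard_uid(&self, shard_uid: ShardUId) -> ShardUId {
        self.shard_uid_mapping.get(&shard_uid).copied().unwrap_or(shard_uid)
    }

    /// State reads resolve the mapping first, then prefix the node hash with
    /// the *mapped* ShardUId, mirroring `TrieStoreAdapter::get` in this commit.
    fn get_state(&self, shard_uid: ShardUId, hash: &CryptoHash) -> Option<&[u8]> {
        let mapped = self.map_shard_uid(shard_uid);
        let mut key = Vec::with_capacity(mapped.len() + hash.len());
        key.extend_from_slice(&mapped);
        key.extend_from_slice(hash);
        self.state.get(&key).map(Vec::as_slice)
    }
}
```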

#### Changes:
- Added `DBCol::StateShardUIdMapping`, which is initially empty and will be
populated on future resharding events.
- Slight refactor: only allow `Store` to create a `StoreUpdate`.
- `TrieStoreAdapter::get()` - a special `get()` implementation for the State
column that resolves the ShardUId mapping before building the key.

#### Next steps (see tracking issue
#12050):
- Use mapping for writes to db.
- Handle `copy_state_from_store` in `cold_storage.rs`.
- Integration.
- State clean-up (e.g. GC parent state when it is no longer referenced
by any child).
- Tests.
staffik authored Oct 11, 2024
1 parent 30e4ace commit 189222e
Showing 8 changed files with 83 additions and 47 deletions.
1 change: 1 addition & 0 deletions chain/chain/src/garbage_collection.rs
@@ -997,6 +997,7 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::EpochSyncProof
| DBCol::Misc
| DBCol::_ReceiptIdToShardId
| DBCol::StateShardUIdMapping
=> unreachable!(),
}
self.merge(store_update);
18 changes: 17 additions & 1 deletion core/store/src/adapter/trie_store.rs
@@ -31,8 +31,24 @@ impl TrieStoreAdapter {
TrieStoreUpdateAdapter { store_update: StoreUpdateHolder::Owned(self.store.store_update()) }
}

/// Reads shard_uid mapping for given shard.
/// If the mapping does not exist, it means that `shard_uid` maps to itself.
pub(crate) fn read_shard_uid_mapping_from_db(
&self,
shard_uid: ShardUId,
) -> Result<ShardUId, StorageError> {
let mapped_shard_uid =
self.store.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &shard_uid.to_bytes());
let mapped_shard_uid = mapped_shard_uid
.map_err(|err| StorageError::StorageInconsistentState(err.to_string()))?;
Ok(mapped_shard_uid.unwrap_or(shard_uid))
}

/// Replaces shard_uid prefix with a mapped value according to mapping strategy in Resharding V3.
/// For this, it does extra read from `DBCol::StateShardUIdMapping`.
pub fn get(&self, shard_uid: ShardUId, hash: &CryptoHash) -> Result<Arc<[u8]>, StorageError> {
let key = get_key_from_shard_uid_and_hash(shard_uid, hash);
let mapped_shard_uid = self.read_shard_uid_mapping_from_db(shard_uid)?;
let key = get_key_from_shard_uid_and_hash(mapped_shard_uid, hash);
let val = self
.store
.get(DBCol::State, key.as_ref())
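
Writes to the mapping are out of scope for this commit (see "Next steps" above), but for orientation, a future resharding event would presumably populate the column with something along these lines. This is a hedged sketch: the helper name and call site are hypothetical, while `Store::store_update`, `StoreUpdate::set_ser`, `ShardUId::to_bytes`, and `DBCol::StateShardUIdMapping` are existing APIs (the last one introduced here).

```rust
use near_primitives::shard_layout::ShardUId;
use near_store::{DBCol, Store};

/// Hypothetical helper: record that `child`'s State keys should be read under
/// `parent`'s prefix. Nothing in this commit calls anything like this yet.
fn map_child_to_parent(store: &Store, child: ShardUId, parent: ShardUId) -> std::io::Result<()> {
    let mut update = store.store_update();
    // Key: the child ShardUId bytes; value: the borsh-serialized parent ShardUId,
    // matching the read side's `get_ser::<ShardUId>(DBCol::StateShardUIdMapping, ...)`.
    update.set_ser(DBCol::StateShardUIdMapping, &child.to_bytes(), &parent)?;
    update.commit()
}
```
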
1 change: 1 addition & 0 deletions core/store/src/cold_storage.rs
@@ -186,6 +186,7 @@ fn copy_state_from_store(

let Some(trie_changes) = trie_changes else { continue };
for op in trie_changes.insertions() {
// TODO(reshardingV3) Handle shard_uid not mapped there
let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
let value = op.payload().to_vec();

11 changes: 10 additions & 1 deletion core/store/src/columns.rs
@@ -300,6 +300,12 @@ pub enum DBCol {
/// - *Rows*: only one key with 0 bytes.
/// - *Column type*: `EpochSyncProof`
EpochSyncProof,
/// Mapping of ShardUId to the ShardUId that should be used as the database key prefix for the State column.
/// The mapped ShardUId value can be the parent shard after resharding, an ancestor shard after many resharding
/// or just map shard to itself if there was no resharding or the mapping was removed after node stopped tracking the shard.
/// - *Rows*: `ShardUId`
/// - *Column type*: `ShardUId`
StateShardUIdMapping,
}

/// Defines different logical parts of a db key.
@@ -444,7 +450,9 @@ impl DBCol {
| DBCol::StateChangesForSplitStates
| DBCol::StateHeaders
| DBCol::TransactionResultForBlock
| DBCol::Transactions => true,
| DBCol::Transactions
// TODO(reshardingV3) How the mapping will work with split storage?
| DBCol::StateShardUIdMapping => true,

// TODO
DBCol::ChallengedBlocks => false,
@@ -575,6 +583,7 @@ impl DBCol {
DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey],
DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex],
DBCol::EpochSyncProof => &[DBKeyType::Empty],
DBCol::StateShardUIdMapping => &[DBKeyType::ShardUId],
}
}
}
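
To make the `StateShardUIdMapping` doc comment above concrete: the column stores a direct ancestor rather than a parent chain, since the read path does a single lookup with no chain walking, and a missing entry means the shard maps to itself. A hedged illustration with made-up shard identifiers (plain integers instead of real `ShardUId` values):

```rust
use std::collections::HashMap;

// Single lookup, falling back to the shard itself when there is no entry.
fn resolve(mapping: &HashMap<u64, u64>, shard: u64) -> u64 {
    mapping.get(&shard).copied().unwrap_or(shard)
}

fn main() {
    let mut mapping = HashMap::new();
    // After two reshardings where State was never copied out of shard 1,
    // both descendants would map directly to shard 1, not to each other.
    mapping.insert(2, 1); // child      -> original ancestor
    mapping.insert(3, 1); // grandchild -> same ancestor
    assert_eq!(resolve(&mapping, 3), 1);
    // A shard untouched by resharding has no entry and maps to itself.
    assert_eq!(resolve(&mapping, 7), 7);
}
```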
30 changes: 15 additions & 15 deletions core/store/src/db/rocksdb.rs
@@ -811,41 +811,41 @@ mod tests {
let store = opener.open().unwrap().get_hot_store();
let ptr = (&*store.storage) as *const (dyn Database + 'static);
let rocksdb = unsafe { &*(ptr as *const RocksDB) };
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.commit().unwrap();
}
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[1, 2, 0, 0, 0, 0, 0, 0, 0][..])
);
{
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[1, 1, 0, 0, 0, 0, 0, 0, 0][..])
);
{
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.commit().unwrap();
}
// Refcount goes to 0 -> get() returns None
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
// Internally there is an empty value
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(), Some(&[][..]));
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[][..]));

// single_thread_rocksdb makes compact hang forever
if !cfg!(feature = "single_thread_rocksdb") {
@@ -858,14 +858,14 @@ mod tests {
// empty values.
rocksdb.db.compact_range_cf(cf, none, none);
assert_eq!(
rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap().as_deref(),
rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap().as_deref(),
Some(&[][..])
);
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);

rocksdb.db.compact_range_cf(cf, none, none);
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(rocksdb.get_raw_bytes(DBCol::State, &[1; 8]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
}
}

55 changes: 29 additions & 26 deletions core/store/src/lib.rs
@@ -304,9 +304,9 @@ impl Store {
/// provides conversion into a vector or an Arc.
pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
let value = if column.is_rc() {
self.storage.get_with_rc_stripped(column, key)
self.storage.get_with_rc_stripped(column, &key)
} else {
self.storage.get_raw_bytes(column, key)
self.storage.get_raw_bytes(column, &key)
}?;
tracing::trace!(
target: "store",
@@ -327,7 +327,7 @@ impl Store {
}

pub fn store_update(&self) -> StoreUpdate {
StoreUpdate::new(Arc::clone(&self.storage))
StoreUpdate { transaction: DBTransaction::new(), storage: Arc::clone(&self.storage) }
}

pub fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
@@ -345,6 +345,7 @@ impl Store {
}

pub fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> {
assert!(col != DBCol::State, "can't iter prefix of State column");
self.storage.iter_prefix(col, key_prefix)
}

@@ -355,6 +356,8 @@
lower_bound: Option<&[u8]>,
upper_bound: Option<&[u8]>,
) -> DBIterator<'a> {
// That would fail if called `ScanDbColumnCmd`` for the `State` column.
assert!(col != DBCol::State, "can't range iter State column");
self.storage.iter_range(col, lower_bound, upper_bound)
}

@@ -363,6 +366,7 @@
col: DBCol,
key_prefix: &'a [u8],
) -> impl Iterator<Item = io::Result<(Box<[u8]>, T)>> + 'a {
assert!(col != DBCol::State, "can't iter prefix ser of State column");
self.storage
.iter_prefix(col, key_prefix)
.map(|item| item.and_then(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?))))
@@ -470,10 +474,6 @@ impl StoreUpdate {
None => panic!(),
};

pub(crate) fn new(db: Arc<dyn Database>) -> Self {
StoreUpdate { transaction: DBTransaction::new(), storage: db }
}

/// Inserts a new value into the database.
///
/// It is a programming error if `insert` overwrites an existing, different
@@ -598,6 +598,7 @@ impl StoreUpdate {
/// Must not be used for reference-counted columns; use
/// ['Self::increment_refcount'] or [`Self::decrement_refcount`] instead.
pub fn delete(&mut self, column: DBCol, key: &[u8]) {
// It would panic if called with `State` column, as it is refcounted.
assert!(!column.is_rc(), "can't delete: {column}");
self.transaction.delete(column, key.to_vec());
}
@@ -609,6 +610,7 @@ impl StoreUpdate {
/// Deletes the given key range from the database including `from`
/// and excluding `to` keys.
pub fn delete_range(&mut self, column: DBCol, from: &[u8], to: &[u8]) {
assert!(column != DBCol::State, "can't range delete State column");
self.transaction.delete_range(column, from.to_vec(), to.to_vec());
}

@@ -733,6 +735,7 @@ impl StoreUpdate {
}
}
}
// TODO(reshardingV3) Map shard_uid for ops referencing State column.
self.storage.write(self.transaction)
}
}
@@ -1171,21 +1174,21 @@ mod tests {
}

fn test_clear_column(store: Store) {
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
{
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[3], &[3]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.increment_refcount(DBCol::State, &[3; 8], &[3]);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap().as_deref(), Some(&[1][..]));
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap().as_deref(), Some(&[1][..]));
{
let mut store_update = store.store_update();
store_update.delete_all(DBCol::State);
store_update.commit().unwrap();
}
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
assert_eq!(store.get(DBCol::State, &[1; 8]).unwrap(), None);
}

#[test]
@@ -1301,9 +1304,9 @@ mod tests {
{
let store = crate::test_utils::create_test_store();
let mut store_update = store.store_update();
store_update.increment_refcount(DBCol::State, &[1], &[1]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[2], &[2]);
store_update.increment_refcount(DBCol::State, &[1; 8], &[1]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.increment_refcount(DBCol::State, &[2; 8], &[2]);
store_update.commit().unwrap();
store.save_state_to_file(tmp.path()).unwrap();
}
@@ -1314,9 +1317,9 @@ mod tests {
std::io::Read::read_to_end(tmp.as_file_mut(), &mut buffer).unwrap();
#[rustfmt::skip]
assert_eq!(&[
/* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 1,
/* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 1, 1, 1, 1, 1, 1, 1, 1,
/* val len: */ 9, 0, 0, 0, /* val: */ 1, 1, 0, 0, 0, 0, 0, 0, 0,
/* column: */ 0, /* key len: */ 1, 0, 0, 0, /* key: */ 2,
/* column: */ 0, /* key len: */ 8, 0, 0, 0, /* key: */ 2, 2, 2, 2, 2, 2, 2, 2,
/* val len: */ 9, 0, 0, 0, /* val: */ 2, 2, 0, 0, 0, 0, 0, 0, 0,
/* end mark: */ 255,
][..], buffer.as_slice());
@@ -1325,22 +1328,22 @@ mod tests {
{
// Fresh storage, should have no data.
let store = crate::test_utils::create_test_store();
assert_eq!(None, store.get(DBCol::State, &[1]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[2]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap());
assert_eq!(None, store.get(DBCol::State, &[2; 8]).unwrap());

// Read data from file.
store.load_state_from_file(tmp.path()).unwrap();
assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1]).unwrap().as_deref());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref());
assert_eq!(Some(&[1u8][..]), store.get(DBCol::State, &[1; 8]).unwrap().as_deref());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref());

// Key &[2] should have refcount of two so once decreased it should
// still exist.
let mut store_update = store.store_update();
store_update.decrement_refcount(DBCol::State, &[1]);
store_update.decrement_refcount(DBCol::State, &[2]);
store_update.decrement_refcount(DBCol::State, &[1; 8]);
store_update.decrement_refcount(DBCol::State, &[2; 8]);
store_update.commit().unwrap();
assert_eq!(None, store.get(DBCol::State, &[1]).unwrap());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2]).unwrap().as_deref());
assert_eq!(None, store.get(DBCol::State, &[1; 8]).unwrap());
assert_eq!(Some(&[2u8][..]), store.get(DBCol::State, &[2; 8]).unwrap().as_deref());
}

// Verify detection of corrupt file.
1 change: 1 addition & 0 deletions core/store/src/trie/mem/parallel_loader.rs
@@ -191,6 +191,7 @@ impl ParallelMemTrieLoader {
arena: &mut impl ArenaMut,
) -> Result<MemTrieNodeId, StorageError> {
// Figure out which range corresponds to the prefix of this subtree.
// TODO(reshardingV3) This seems fragile, potentially does not work with mapping.
let (start, end) = subtree_to_load.to_iter_range(self.shard_uid);

// Load all the keys in this range from the FlatState column.
13 changes: 9 additions & 4 deletions integration-tests/src/tests/client/cold_storage.rs
@@ -186,10 +186,11 @@ fn test_storage_after_commit_of_cold_update() {
let cold_store = &storage.get_cold_store().unwrap();
let num_checks = check_iter(client_store, cold_store, col, &no_check_rules);
// assert that this test actually checks something
// apart from StateChangesForSplitStates and StateHeaders, that are empty
// apart from StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping, that are empty
assert!(
col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::StateShardUIdMapping
|| num_checks > 0
);
}
@@ -308,10 +309,11 @@ fn test_cold_db_copy_with_height_skips() {
let cold_store = storage.get_cold_store().unwrap();
let num_checks = check_iter(&client_store, &cold_store, col, &no_check_rules);
// assert that this test actually checks something
// apart from StateChangesForSplitStates and StateHeaders, that are empty
// apart from StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping, that are empty
assert!(
col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::StateShardUIdMapping
|| num_checks > 0
);
}
@@ -361,8 +363,11 @@ fn test_initial_copy_to_cold(batch_size: usize) {
continue;
}
let num_checks = check_iter(&client_store, &cold_store, col, &vec![]);
// StateChangesForSplitStates and StateHeaders are empty
if col == DBCol::StateChangesForSplitStates || col == DBCol::StateHeaders {
// StateChangesForSplitStates, StateHeaders, and StateShardUIdMapping are empty
if col == DBCol::StateChangesForSplitStates
|| col == DBCol::StateHeaders
|| col == DBCol::StateShardUIdMapping
{
continue;
}
// assert that this test actually checks something
