feat/simplify: impl a storage lock guard, use the StorageValueRef API for simplicity
Bernhard Schuster committed May 12, 2020
1 parent a037d25 commit 9537827
Showing 4 changed files with 127 additions and 66 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock


6 changes: 6 additions & 0 deletions primitives/runtime/src/offchain/storage.rs
@@ -61,6 +61,12 @@ impl<'a> StorageValueRef<'a> {
.map(|val| T::decode(&mut &*val).ok())
}


/// Remove the value from storage.
pub fn remove(&self) {
sp_io::offchain::local_storage_remove(self.kind, self.key)
}

/// Retrieve & decode the value and set it to a new one atomically.
///
/// Function `f` should return a new value that we should attempt to write to storage.
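The patch replaces direct `sp_io::offchain::local_storage_*` calls with this typed wrapper, so a brief, self-contained sketch of the `StorageValueRef` API (including the new `remove`) may be useful. The key name and counter logic are made up for illustration and assume the code runs inside an offchain worker:

use sp_runtime::offchain::storage::StorageValueRef;

fn bump_counter() -> u32 {
    let value_ref = StorageValueRef::persistent(b"example::counter");
    // `get` yields `None` when nothing is stored and `Some(None)` when the
    // stored bytes cannot be decoded as the requested type.
    let next = value_ref.get::<u32>().flatten().unwrap_or(0) + 1;
    // `set` SCALE-encodes the value and writes it to persistent offchain storage.
    value_ref.set(&next);
    next
}

fn reset_counter() {
    // The new `remove` erases the entry entirely instead of overwriting it
    // with an empty or sentinel value.
    StorageValueRef::persistent(b"example::counter").remove();
}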
6 changes: 2 additions & 4 deletions primitives/session/Cargo.toml
@@ -17,13 +17,11 @@ sp-api = { version = "2.0.0-dev", default-features = false, path = "../api" }
sp-core = { version = "2.0.0-dev", default-features = false, path = "../core" }
sp-std = { version = "2.0.0-dev", default-features = false, path = "../std" }
sp-staking = { version = "2.0.0-dev", default-features = false, path = "../staking" }
sp-runtime = { version = "2.0.0-dev", optional = true, path = "../runtime" }
sp-runtime = { version = "2.0.0-dev", default-features = false, path = "../runtime" }

# x
codec = { package = "parity-scale-codec", version = "1.3.0", default-features = false }
hash-db = { version = "0.15.2", default-features = false }
trie-db = { version = "0.20.1", default-features = false }
sp-io ={ path = "../../primitives/io", default-features = false , version = "2.0.0-dev"}
sp-io = { version = "2.0.0-dev", default-features = false, path = "../../primitives/io"}

[features]
default = [ "std" ]
175 changes: 113 additions & 62 deletions primitives/session/src/offchain_hashdb.rs
@@ -40,11 +40,77 @@ use trie_db::TrieLayout;
use hash_db::{AsHashDB, HashDB, Hasher, Prefix};

use codec::{Decode, Encode};
use sp_core::offchain::StorageKind;
use sp_core::offchain::{Duration, StorageKind, Timestamp};

use sp_runtime::offchain::storage::StorageValueRef;

use sp_io::offchain;
use sp_std::prelude::*;
use sp_std::collections::btree_set::BTreeSet;
use sp_std::prelude::*;


/// Fallback lock lifetime in milliseconds: a lock is treated as released once
/// its stored deadline (acquisition time plus this duration) has passed,
/// even if the holder never dropped it.
const STORAGE_TRACKING_GUARD_FALLBACK_DURATION: u64 = 30_000;


/// A persisted lock guard.
///
/// An offchain-DB-backed mutex guarding items that are read, modified and
/// written back concurrently, e.g. vecs or sets (a usage sketch follows the
/// `Drop` impl below).
pub struct StorageTrackingGuard<'a> {
key: &'a [u8],
locked_until: Option<Timestamp>,
}

impl<'a> StorageTrackingGuard<'a> {
pub fn lock<'b>(key: &'b [u8]) -> Self
where
'b: 'a,
{
// @todo lock this scope I guess
let value_ref = StorageValueRef::persistent(key);
loop {
let now = offchain::timestamp();
let valid_until_unix_timestamp = match value_ref.get::<Timestamp>() {
None => break, // no data at all
Some(None) => break, // garbage data could not be decoded
Some(Some(valid_until_unix_timestamp)) => {
if valid_until_unix_timestamp < now {
break;
}
valid_until_unix_timestamp
}
};
// do not snooze for the full remaining duration, but at most 100ms at a time
// @todo add some additive jitter
let remainder = valid_until_unix_timestamp.diff(&now);
let snooze = offchain::timestamp().add(Duration::from_millis(core::cmp::min(
remainder.millis(),
100,
)));
offchain::sleep_until(snooze);
}
let locked_at_most_until = offchain::timestamp().add(Duration::from_millis(STORAGE_TRACKING_GUARD_FALLBACK_DURATION));
value_ref.set(&locked_at_most_until);
// @todo unlock
Self {
key,
locked_until: Some(locked_at_most_until),
}
}
}

impl<'a> core::ops::Drop for StorageTrackingGuard<'a> {
fn drop(&mut self) {
let Self { key, locked_until } = self;

if let Some(_locked_until) = locked_until {
let value_ref = StorageValueRef::persistent(key);
value_ref.remove();
}
}
}
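A minimal usage sketch of the guard (not part of this commit): it serializes offchain workers that read, modify and write back a shared value. The storage keys and the tracked set are assumptions for illustration; the imports match the ones at the top of this file:

fn track_key(new_key: Vec<u8>) {
    // Sleeps in short intervals until the previous holder dropped its guard
    // or its fallback deadline expired, then claims the lock.
    let _lock = StorageTrackingGuard::lock(b"example::tracked_keys::lock");

    let value_ref = StorageValueRef::persistent(b"example::tracked_keys");
    let mut keys: BTreeSet<Vec<u8>> = value_ref.get().flatten().unwrap_or_default();
    keys.insert(new_key);
    value_ref.set(&keys);

    // `_lock` is dropped here, which removes the lock entry again.
}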

/// Implementation of a key value store with pruning.
#[derive(Default)]
pub struct AlternativeDB<L>
where
@@ -56,63 +56,122 @@ where

// @todo requires some internal mutability concept to work properly
// @todo which is yet to be hashed out
unsafe impl<L> core::marker::Sync for AlternativeDB<L> where
L: TrieLayout + Send,{}
unsafe impl<L> core::marker::Sync for AlternativeDB<L> where L: TrieLayout + Send {}




const INDEX_DB_LOCK_KEY: &'static [u8] = b"alternate_db::tracking::lock";


/// Session-aware, offchain-DB-backed HashDB.
///
/// Creates two indices:
/// index: session -> [(key,tree_prefix),(key,tree_prefix),..]
impl<L> AlternativeDB<L>
where
L: TrieLayout + Send,
<L as TrieLayout>::Hash: Default,
<L as trie_db::TrieLayout>::Hash: core::default::Default + core::hash::BuildHasher,
<<L as trie_db::TrieLayout>::Hash as Hasher>::Out: AsRef<[u8]> + Default,
L: TrieLayout + Send,
<L as TrieLayout>::Hash: Default,
<L as trie_db::TrieLayout>::Hash: core::default::Default + core::hash::BuildHasher,
<<L as trie_db::TrieLayout>::Hash as Hasher>::Out: AsRef<[u8]> + Default,
{

/// Additional prefix to distinguish the data used by this DB.
const PREFIX: &'static [u8] = b"__TO_BE_DEFINED__";

/// Additional tracking prefix to separate index-tracking data from
/// the actual key-value data being tracked.
const TRACKING_PREFIX: &'static [u8] = b"__TRACKING__";


/// Prune all data associated with a particular `session_index`.
///
/// If `prefix` is set, only items having a particular tree prefix
/// will be pruned.
pub fn prune_session(&mut self, session_index: &[u8], prefix: Option<Prefix>) {
let _lock = StorageTrackingGuard::lock(INDEX_DB_LOCK_KEY);
let index = if let Some(prefix) = prefix {

let tree_prefix = prefix.encode();
let tree_prefix = tree_prefix.as_slice();

self.get_index(session_index)
.into_iter()
.filter_map(move |(derived_key, prefix)| {
if tree_prefix == prefix.as_slice() {
// @todo FIXME erase
offchain::local_storage_set(
StorageKind::PERSISTENT,
derived_key.as_slice(),
&[]);
StorageValueRef::persistent(derived_key.as_slice()).remove();
None
} else {
Some((derived_key, prefix))
}
})
.collect::<BTreeSet<_>>()

} else {

let index = self.get_index(session_index);
index.into_iter().for_each(move |(derived_key, _prefix)| {
offchain::local_storage_set(StorageKind::PERSISTENT, derived_key.as_slice(), &[]);
StorageValueRef::persistent(derived_key.as_ref()).remove();
});
BTreeSet::new()

};
self.set_index(session_index, index);
}
@@ -128,16 +128,13 @@ L: TrieLayout + Send,
fn add_to_index(&self, derived_key: &[u8], tree_prefix: &[u8], session_index: &[u8]) {
let per_session_key: Vec<u8> = Self::derive_tracking_index_key(session_index);
let mut mapping: BTreeSet<(Vec<u8>, Vec<u8>)> =
offchain::local_storage_get(StorageKind::PERSISTENT, derived_key)
.map(|bytes| {
<BTreeSet<(Vec<u8>, Vec<u8>)> as Decode>::decode(&mut &bytes[..]).unwrap()
})
StorageValueRef::persistent(per_session_key.as_ref())
.get::<BTreeSet<(Vec<u8>, Vec<u8>)>>()
.flatten()
.unwrap_or_else(|| BTreeSet::new());

mapping.insert((derived_key.to_vec(), tree_prefix.to_vec()));

let encoded = mapping.encode();
offchain::local_storage_set(StorageKind::PERSISTENT, per_session_key.as_ref(), encoded.as_slice());
StorageValueRef::persistent(per_session_key.as_ref()).set(&mapping);
}

/// Forget `key` from the persistent index.
@@ -156,22 +156,19 @@ L: TrieLayout + Send,
fn get_index(&self, session_index: &[u8]) -> BTreeSet<(Vec<u8>, Vec<u8>)> {
let tracking_key: Vec<u8> = Self::derive_tracking_index_key(session_index);
let mapping: BTreeSet<(Vec<u8>, Vec<u8>)> =
offchain::local_storage_get(StorageKind::PERSISTENT, tracking_key.as_ref())
.map(|bytes| {
<BTreeSet<(Vec<u8>, Vec<u8>)> as Decode>::decode(&mut &bytes[..]).unwrap()
})
StorageValueRef::persistent(tracking_key.as_ref())
.get::<BTreeSet<(Vec<u8>, Vec<u8>)>>()
.flatten()
.unwrap_or_else(|| BTreeSet::new());
mapping
}

/// Set the (modified) key tracking index.
fn set_index(&self, session_index: &[u8], val: BTreeSet<(Vec<u8>, Vec<u8>)>) {
let tracking_key: Vec<u8> = Self::derive_tracking_index_key(session_index);
let val = val.encode();
offchain::local_storage_set(StorageKind::PERSISTENT, tracking_key.as_ref(), val.as_slice());
StorageValueRef::persistent(tracking_key.as_ref()).set(&val);
}


fn derive_tracking_index_key(session_index: &[u8]) -> Vec<u8> {
//@todo probably waaaaay too slow
// _ + _ + _ + _
@@ -184,7 +184,6 @@ L: TrieLayout + Send,
final_key
}


/// Concatenate the static prefix with a tree prefix.
fn derive_key(key: &[u8], tree_prefix: &[u8], session_index: &[u8]) -> Vec<u8> {
//@todo probably waaaaay too slow
@@ -201,7 +201,6 @@ L: TrieLayout + Send,
final_key.push(b'+');
final_key
}

}

impl<L, T> HashDB<L::Hash, T> for AlternativeDB<L>
@@ -223,23 +223,28 @@ where
prefix.encode().as_slice(),
self.session_index.as_slice(),
);
offchain::local_storage_get(StorageKind::PERSISTENT, derived_key.as_slice())
.map(|v| Decode::decode(&mut &v[..]).unwrap())

StorageValueRef::persistent(derived_key.as_ref())
.get::<T>()
.flatten()
}

fn contains(
&self,
key: &<<L as trie_db::TrieLayout>::Hash as Hasher>::Out,
prefix: Prefix,
) -> bool {
let key: &[u8] = AsRef::<[u8]>::as_ref(&*key);
// Locking is not necessary since this is really just a read
// (which MUST already be guarded at the DB level)
// and the index is not involved here.
let key: &[u8] = AsRef::<[u8]>::as_ref(key);
let derived_key: Vec<u8> = Self::derive_key(
key,
prefix.encode().as_slice(),
self.session_index.as_slice(),
);
offchain::local_storage_get(StorageKind::PERSISTENT, derived_key.as_slice()).is_some()
StorageValueRef::persistent(derived_key.as_ref())
.get::<Vec<u8>>()
.is_some()
}

fn insert(
@@ -250,14 +250,11 @@ where
let digest = <<L as trie_db::TrieLayout>::Hash as Hasher>::hash(value);
let prefix = prefix.encode();
let prefix = prefix.as_slice();
let derived_key: Vec<u8> = Self::derive_key(
digest.as_ref(),
prefix,
self.session_index.as_slice(),
);

offchain::local_storage_set(StorageKind::PERSISTENT, derived_key.as_ref(), value);
let derived_key: Vec<u8> =
Self::derive_key(digest.as_ref(), prefix, self.session_index.as_slice());

let _lock = StorageTrackingGuard::lock(INDEX_DB_LOCK_KEY);
StorageValueRef::persistent(derived_key.as_ref()).set(&value);
self.add_to_index(derived_key.as_ref(), prefix, self.session_index.as_slice());
digest
}
@@ -270,28 +270,23 @@ where
) {
let prefix = prefix.encode();
let prefix = prefix.as_slice();
let derived_key: Vec<u8> = Self::derive_key(
key.as_ref(),
prefix,
self.session_index.as_slice(),
);
let derived_key: Vec<u8> =
Self::derive_key(key.as_ref(), prefix, self.session_index.as_slice());

let value: Vec<u8> = <T as Encode>::encode(&value);
offchain::local_storage_set(StorageKind::PERSISTENT, derived_key.as_ref(), value.as_slice());
let _lock = StorageTrackingGuard::lock(INDEX_DB_LOCK_KEY);
StorageValueRef::persistent(derived_key.as_ref()).set(&value);
self.add_to_index(derived_key.as_ref(), prefix, self.session_index.as_slice());
}

fn remove(&mut self, key: &<<L as trie_db::TrieLayout>::Hash as Hasher>::Out, prefix: Prefix) {
let prefix = prefix.encode();
let prefix = prefix.as_slice();
let derived_key: Vec<u8> = Self::derive_key(
key.as_ref(),
prefix,
self.session_index.as_slice(),
);
let derived_key: Vec<u8> =
Self::derive_key(key.as_ref(), prefix, self.session_index.as_slice());

let _lock = StorageTrackingGuard::lock(INDEX_DB_LOCK_KEY);
self.remove_from_index(derived_key.as_ref(), prefix, self.session_index.as_slice());
// @todo it would be great if we could just erase this directly, but for now setting it to an empty slice must suffice
offchain::local_storage_set(StorageKind::PERSISTENT, derived_key.as_ref(), &[]);
StorageValueRef::persistent(derived_key.as_ref()).remove();
}
}

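To show how the pieces fit together, here is a rough caller sketch (not part of this commit). Values inserted through the `HashDB` interface are recorded in the per-session tracking index, and `prune_session` later removes everything recorded for that session. The function name, the generic bounds, and the assumption that `session_index` matches the DB's current session are made up for illustration:

use hash_db::{HashDB, Hasher, EMPTY_PREFIX};
use trie_db::TrieLayout;

fn end_of_session_cleanup<L>(db: &mut AlternativeDB<L>, session_index: &[u8])
where
    L: TrieLayout + Send,
    <L as TrieLayout>::Hash: Default + core::hash::BuildHasher,
    <<L as TrieLayout>::Hash as Hasher>::Out: AsRef<[u8]> + Default,
    AlternativeDB<L>: HashDB<<L as TrieLayout>::Hash, Vec<u8>>,
{
    // Storing a value derives a session- and prefix-aware storage key and
    // records (derived_key, tree_prefix) in the per-session tracking index.
    let digest = db.insert(EMPTY_PREFIX, b"payload kept for this session");
    assert!(db.contains(&digest, EMPTY_PREFIX));

    // Pruning with `prefix: None` removes every value tracked for the
    // session and resets the tracking index itself.
    db.prune_session(session_index, None);
    assert!(!db.contains(&digest, EMPTY_PREFIX));
}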
