Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta/sled): simplify SledTree APIs #8203

Merged
merged 1 commit into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/meta/service/src/store/store_bare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,13 +669,15 @@ impl RaftStoreBare {
None => return Ok(vec![]),
};

let nodes = sm.nodes().range_kvs(..)?;
let nodes = sm.nodes().range(..)?;
let mut ns = vec![];

let ns = nodes
.into_iter()
.filter(|(node_id, _)| predicate(&membership, node_id))
.map(|(_, node)| node)
.collect();
for x in nodes {
let item = x?;
if predicate(&membership, &item.key()?) {
ns.push(item.value()?);
}
}

Ok(ns)
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/sled-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use sled_serde::SledSerde;
pub use sled_tree::AsKeySpace;
pub use sled_tree::AsTxnKeySpace;
pub use sled_tree::SledAsRef;
pub use sled_tree::SledItem;
pub use sled_tree::SledTree;
pub use sled_tree::TransactionSledTree;
pub use store::Store;
Expand Down
221 changes: 74 additions & 147 deletions src/meta/sled-store/src/sled_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt::Display;
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Deref;
use std::ops::RangeBounds;

Expand All @@ -24,11 +23,13 @@ use common_meta_types::anyerror::AnyError;
use sled::transaction::ConflictableTransactionError;
use sled::transaction::TransactionResult;
use sled::transaction::TransactionalTree;
use sled::IVec;
use tracing::debug;
use tracing::warn;

use crate::sled::transaction::TransactionError;
use crate::store::Store;
use crate::SledBytesError;
use crate::SledKeySpace;

/// Get a ref to the key or to the value.
Expand Down Expand Up @@ -68,6 +69,38 @@ pub struct SledTree {
pub tree: sled::Tree,
}

/// A key-value item stored in sled store.
pub struct SledItem<KV: SledKeySpace> {
key: IVec,
value: IVec,
_p: PhantomData<KV>,
}

impl<KV: SledKeySpace> SledItem<KV> {
pub fn new(key: IVec, value: IVec) -> Self {
//
Self {
key,
value,
_p: Default::default(),
}
}

pub fn key(&self) -> Result<KV::K, SledBytesError> {
KV::deserialize_key(&self.key)
}

pub fn value(&self) -> Result<KV::V, SledBytesError> {
KV::deserialize_value(&self.value)
}

pub fn kv(&self) -> Result<(KV::K, KV::V), SledBytesError> {
let k = self.key()?;
let v = self.value()?;
Ok((k, v))
}
}

#[allow(clippy::type_complexity)]
impl SledTree {
/// Open SledTree
Expand Down Expand Up @@ -169,7 +202,10 @@ impl SledTree {
}

/// Retrieve the value of key.
pub fn get<KV: SledKeySpace>(&self, key: &KV::K) -> Result<Option<KV::V>, MetaStorageError> {
pub(crate) fn get<KV: SledKeySpace>(
&self,
key: &KV::K,
) -> Result<Option<KV::V>, MetaStorageError> {
let got = self
.tree
.get(KV::serialize_key(key)?)
Expand All @@ -183,55 +219,13 @@ impl SledTree {
Ok(v)
}

/// Retrieve the last key value pair.
pub fn last<KV>(&self) -> Result<Option<(KV::K, KV::V)>, MetaStorageError>
where KV: SledKeySpace {
let range = KV::serialize_range(&(Bound::Unbounded::<KV::K>, Bound::Unbounded::<KV::K>))?;

let mut it = self.tree.range(range).rev();
let last = it.next();
let last = match last {
None => {
return Ok(None);
}
Some(res) => res,
};

let last = last.context(|| "last")?;

let (k, v) = last;
let key = KV::deserialize_key(k)?;
let value = KV::deserialize_value(v)?;
Ok(Some((key, value)))
}

#[tracing::instrument(level = "debug", skip(self))]
pub async fn remove<KV>(
&self,
key: &KV::K,
flush: bool,
) -> Result<Option<KV::V>, MetaStorageError>
where
KV: SledKeySpace,
{
let removed = self
.tree
.remove(KV::serialize_key(key)?)
.context(|| format!("remove: {}", key,))?;

self.flush_async(flush).await?;

let removed = match removed {
Some(x) => Some(KV::deserialize_value(x)?),
None => None,
};

Ok(removed)
}

/// Delete kvs that are in `range`.
#[tracing::instrument(level = "debug", skip(self, range))]
pub async fn range_remove<KV, R>(&self, range: R, flush: bool) -> Result<(), MetaStorageError>
pub(crate) async fn range_remove<KV, R>(
&self,
range: R,
flush: bool,
) -> Result<(), MetaStorageError>
where
KV: SledKeySpace,
R: RangeBounds<KV::K>,
Expand All @@ -257,57 +251,12 @@ impl SledTree {
Ok(())
}

/// Get keys in `range`
pub fn range_keys<KV, R>(&self, range: R) -> Result<Vec<KV::K>, MetaStorageError>
where
KV: SledKeySpace,
R: RangeBounds<KV::K>,
{
let mut res = vec![];

let range_mes = self.range_message::<KV, _>(&range);

// Convert K range into sled::IVec range
let range = KV::serialize_range(&range)?;
for item in self.tree.range(range) {
let (k, _) = item.context(|| format!("range_get: {}", range_mes,))?;

let key = KV::deserialize_key(k)?;
res.push(key);
}

Ok(res)
}

/// Get key-values in `range`
pub fn range_kvs<KV, R>(&self, range: R) -> Result<Vec<(KV::K, KV::V)>, MetaStorageError>
where
KV: SledKeySpace,
R: RangeBounds<KV::K>,
{
let mut res = vec![];

let range_mes = self.range_message::<KV, _>(&range);

// Convert K range into sled::IVec range
let range = KV::serialize_range(&range)?;
for item in self.tree.range(range) {
let (k, v) = item.context(|| format!("range_get: {}", range_mes,))?;

let key = KV::deserialize_key(k)?;
let value = KV::deserialize_value(v)?;
res.push((key, value));
}

Ok(res)
}

/// Get key-values in `range`
pub fn range<KV, R>(
pub(crate) fn range<KV, R>(
&self,
range: R,
) -> Result<
impl DoubleEndedIterator<Item = Result<(KV::K, KV::V), MetaStorageError>>,
impl DoubleEndedIterator<Item = Result<SledItem<KV>, MetaStorageError>>,
MetaStorageError,
>
where
Expand All @@ -323,18 +272,21 @@ impl SledTree {
let it = it.map(move |item| {
let (k, v) = item.context(|| format!("range_get: {}", range_mes,))?;

let key = KV::deserialize_key(k)?;
let value = KV::deserialize_value(v)?;

Ok((key, value))
let item = SledItem::new(k, v);
Ok(item)
});

Ok(it)
}

/// Get key-values in with the same prefix
pub fn scan_prefix<KV>(&self, prefix: &KV::K) -> Result<Vec<(KV::K, KV::V)>, MetaStorageError>
where KV: SledKeySpace {
pub(crate) fn scan_prefix<KV>(
&self,
prefix: &KV::K,
) -> Result<Vec<(KV::K, KV::V)>, MetaStorageError>
where
KV: SledKeySpace,
{
let mut res = vec![];

let mes = || format!("scan_prefix: {}", prefix);
Expand All @@ -351,31 +303,8 @@ impl SledTree {
Ok(res)
}

/// Get values of key in `range`
pub fn range_values<KV, R>(&self, range: R) -> Result<Vec<KV::V>, MetaStorageError>
where
KV: SledKeySpace,
R: RangeBounds<KV::K>,
{
let mut res = vec![];

let mes = || format!("range_get: {}", self.range_message::<KV, _>(&range));

// Convert K range into sled::IVec range
let range = KV::serialize_range(&range)?;

for item in self.tree.range(range) {
let (_, v) = item.context(mes)?;

let ent = KV::deserialize_value(v)?;
res.push(ent);
}

Ok(res)
}

/// Append many key-values into SledTree.
pub async fn append<KV, T>(&self, kvs: &[T]) -> Result<(), MetaStorageError>
pub(crate) async fn append<KV, T>(&self, kvs: &[T]) -> Result<(), MetaStorageError>
where
KV: SledKeySpace,
T: SledAsRef<KV::K, KV::V>,
Expand All @@ -401,7 +330,7 @@ impl SledTree {

/// Insert a single kv.
/// Returns the last value if it is set.
pub async fn insert<KV>(
pub(crate) async fn insert<KV>(
&self,
key: &KV::K,
value: &KV::V,
Expand Down Expand Up @@ -563,15 +492,15 @@ impl<'a, KV: SledKeySpace> AsKeySpace<'a, KV> {
}

pub fn last(&self) -> Result<Option<(KV::K, KV::V)>, MetaStorageError> {
self.inner.last::<KV>()
}
let mut it = self.inner.range::<KV, _>(..)?.rev();
let last = it.next();
let last = match last {
None => return Ok(None),
Some(x) => x,
};

pub async fn remove(
&self,
key: &KV::K,
flush: bool,
) -> Result<Option<KV::V>, MetaStorageError> {
self.inner.remove::<KV>(key, flush).await
let kv = last?.kv()?;
Ok(Some(kv))
}

pub async fn range_remove<R>(&self, range: R, flush: bool) -> Result<(), MetaStorageError>
Expand All @@ -587,16 +516,11 @@ impl<'a, KV: SledKeySpace> AsKeySpace<'a, KV> {
}
}

pub fn range_keys<R>(&self, range: R) -> Result<Vec<KV::K>, MetaStorageError>
where R: RangeBounds<KV::K> {
self.inner.range_keys::<KV, R>(range)
}

pub fn range<R>(
&self,
range: R,
) -> Result<
impl DoubleEndedIterator<Item = Result<(KV::K, KV::V), MetaStorageError>>,
impl DoubleEndedIterator<Item = Result<SledItem<KV>, MetaStorageError>>,
MetaStorageError,
>
where
Expand All @@ -605,18 +529,21 @@ impl<'a, KV: SledKeySpace> AsKeySpace<'a, KV> {
self.inner.range::<KV, R>(range)
}

pub fn range_kvs<R>(&self, range: R) -> Result<Vec<(KV::K, KV::V)>, MetaStorageError>
where R: RangeBounds<KV::K> {
self.inner.range_kvs::<KV, R>(range)
}

pub fn scan_prefix(&self, prefix: &KV::K) -> Result<Vec<(KV::K, KV::V)>, MetaStorageError> {
self.inner.scan_prefix::<KV>(prefix)
}

pub fn range_values<R>(&self, range: R) -> Result<Vec<KV::V>, MetaStorageError>
where R: RangeBounds<KV::K> {
self.inner.range_values::<KV, R>(range)
let it = self.inner.range::<KV, R>(range)?;
let mut res = vec![];
for r in it {
let item = r?;
let v = item.value()?;
res.push(v);
}

Ok(res)
}

pub async fn append<T>(&self, kvs: &[T]) -> Result<(), MetaStorageError>
Expand Down
Loading