Skip to content

Commit

Permalink
feat(bindingtester,foundationdb): bindingtester is almost ok
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreZ committed Apr 16, 2021
1 parent a6164c6 commit a9e07cd
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 209 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
toolchain: ["1.40.0", "stable"]
toolchain: ["1.51.0", "stable"]

runs-on: ${{ matrix.os }}

Expand Down
200 changes: 108 additions & 92 deletions foundationdb-bindingtester/src/main.rs

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion foundationdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ num-bigint = { version = "0.3.0", optional = true }
byteorder = "1.3.2"
async-trait = "0.1.48"
async-recursion = "0.3.2"
parking_lot = "0.11.1"

[dev-dependencies]
lazy_static = "1.4.0"
Expand Down
157 changes: 85 additions & 72 deletions foundationdb/src/directory/directory_layer.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
use std::cmp::Ordering;

use async_recursion::async_recursion;
use async_trait::async_trait;
use byteorder::{LittleEndian, WriteBytesExt};

use crate::directory::directory_partition::DirectoryPartition;
use crate::directory::directory_subspace::DirectorySubspace;
use crate::directory::error::DirectoryError;
use crate::directory::node::Node;
use crate::directory::{compare_slice, Directory, DirectoryOutput};
use crate::future::{FdbKeyValue, FdbSlice, FdbValue, FdbValuesIter};
use crate::future::FdbSlice;
use crate::tuple::hca::HighContentionAllocator;
use crate::tuple::{Subspace, TuplePack};
use crate::RangeOption;
use crate::{FdbResult, Transaction};
use futures::prelude::stream::{Iter, Next};
use futures::stream::StreamExt;
use async_recursion::async_recursion;
use async_trait::async_trait;
use byteorder::{LittleEndian, WriteBytesExt};
use futures::try_join;
use futures::{join, TryStreamExt};
use parking_lot::{FairMutex, RawMutex};
use std::cmp::Ordering;
use std::option::Option::Some;
use std::rc::Rc;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

pub(crate) const DEFAULT_SUB_DIRS: i64 = 0;
const MAJOR_VERSION: u32 = 1;
const MINOR_VERSION: u32 = 0;
const PATCH_VERSION: u32 = 0;
pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE";
pub const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE";
const DEFAULT_HCA_PREFIX: &[u8] = b"hca";
pub(crate) const PARTITION_LAYER: &[u8] = b"partition";

/// Directories are identified by hierarchical paths analogous to the paths
/// in a Unix-like file system. A path is represented as a List of strings.
/// Each directory has an associated subspace used to store its content. The
/// layer maps each path to a short prefix used for the corresponding
/// subspace. In effect, directories provide a level of indirection for
/// access to subspaces.
#[derive(Debug, Clone)]
pub struct DirectoryLayer {
pub root_node: Subspace,
Expand Down Expand Up @@ -71,6 +69,10 @@ impl DirectoryLayer {
}
}

pub fn get_path(&self) -> Vec<String> {
self.path.clone()
}

fn node_with_optional_prefix(&self, prefix: Option<FdbSlice>) -> Option<Subspace> {
match prefix {
None => None,
Expand All @@ -94,10 +96,13 @@ impl DirectoryLayer {
// walking through the provided path
for path_name in path.iter() {
node.current_path.push(path_name.clone());
let key = node
.subspace
.unwrap()
.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned()));
let node_subspace = match node.subspace {
// unreachable because on first iteration, it is set to root_node,
// on other iteration, `node.exists` is checking for the subspace's value
None => unreachable!("node's subspace is not set"),
Some(s) => s,
};
let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned()));

// finding the next node
let fdb_slice_value = trx.get(key.bytes(), false).await?;
Expand Down Expand Up @@ -141,8 +146,6 @@ impl DirectoryLayer {
) -> Result<DirectoryOutput, DirectoryError> {
let prefix: Vec<u8> = self.node_subspace.unpack(node.bytes())?;

println!("prefix: {:?}", &prefix);

if layer.eq(PARTITION_LAYER) {
Ok(DirectoryOutput::DirectoryPartition(
DirectoryPartition::new(self.to_absolute_path(&path), prefix, self.clone()),
Expand Down Expand Up @@ -188,8 +191,13 @@ impl DirectoryLayer {
// TODO true or false?
if node.is_in_partition(false) {
let sub_path = node.get_partition_subpath();
let subspace_node = match node.subspace {
// not reachable because `self.find` is creating a node with a subspace,
None => unreachable!("node's subspace is not set"),
Some(s) => s,
};
let dir_space =
self.contents_of_node(node.subspace.unwrap(), node.current_path, node.layer)?;
self.contents_of_node(subspace_node, node.current_path, node.layer)?;
dir_space
.create_or_open(trx, sub_path.to_owned(), prefix, layer)
.await?;
Expand Down Expand Up @@ -223,8 +231,14 @@ impl DirectoryLayer {
},
}

let subspace_node = match node.to_owned().subspace {
// not reachable because `self.find` is creating a node with a subspace,
None => unreachable!("node's subspace is not set"),
Some(s) => s,
};

self.contents_of_node(
node.subspace.as_ref().unwrap().clone(),
subspace_node.clone(),
node.target_path.to_owned(),
layer.unwrap_or(vec![]),
)
Expand All @@ -241,14 +255,11 @@ impl DirectoryLayer {
if !allow_create {
return Err(DirectoryError::DirectoryDoesNotExists);
}

let layer = layer.unwrap_or(vec![]);

self.check_version(trx, allow_create).await?;
let new_prefix = self.get_prefix(trx, prefix.clone()).await?;

println!("new_prefix: {:?}", &new_prefix);

let is_prefix_free = self
.is_prefix_free(trx, new_prefix.to_owned(), !prefix.is_some())
.await?;
Expand All @@ -258,18 +269,14 @@ impl DirectoryLayer {
}

let parent_node = self.get_parent_node(trx, path.to_owned()).await?;
println!("parent_node: {:?}", &parent_node);
let node = self.node_with_prefix(&new_prefix);
println!("node: {:?}", &node);

let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path.last().unwrap()));

trx.set(&key.bytes(), &new_prefix);
trx.set(node.subspace(&b"layer".to_vec()).bytes(), &layer);
println!(
"writing layer in row {:?}",
node.subspace(&b"layer".to_vec()).bytes()
);

println!("@@@ created directory on path {:?}", &path);

self.contents_of_node(node, path.to_owned(), layer.to_owned())
}
Expand All @@ -282,12 +289,9 @@ impl DirectoryLayer {
if path.len() > 1 {
let (_, list) = path.split_last().unwrap();

println!("searching for parent");

let parent = self
.create_or_open_internal(trx, list.to_vec(), None, None, true, true)
.await?;
println!("found a parent: {:?}", parent.bytes());
Ok(self.node_with_prefix(&parent.bytes().to_vec()))
} else {
Ok(self.root_node.clone())
Expand Down Expand Up @@ -329,27 +333,21 @@ impl DirectoryLayer {
if key.starts_with(self.node_subspace.bytes()) {
return Ok(Some(self.root_node.clone()));
}
// FIXME: got sometimes an error where the scan include another layer...
// https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectoryLayer.actor.cpp#L186-L194
let (begin_range, _) = self.node_subspace.range();
let mut end_range = self.node_subspace.pack(&key);
// simulate keyAfter
end_range.push(0);

// checking range
let result = trx
.get_range(&RangeOption::from((begin_range, end_range)), 1, snapshot)
.await?;
let key = self.node_subspace.pack(&key);

if result.len() > 0 {
let previous_prefix: (String) =
self.node_subspace.unpack(result.get(0).unwrap().key())?;
if key.starts_with(previous_prefix.as_bytes()) {
return Ok(Some(self.node_with_prefix(&(previous_prefix))));
// checking range
match trx.get(&key, snapshot).await? {
None => Ok(None),
Some(_) => {
let previous_prefix: Vec<u8> = self.node_subspace.unpack(key.as_slice())?;
if key.starts_with(&previous_prefix) {
Ok(Some(self.node_with_prefix(&previous_prefix)))
} else {
Ok(None)
}
}
}

Ok(None)
}

async fn get_prefix(
Expand Down Expand Up @@ -450,19 +448,21 @@ impl DirectoryLayer {
) -> Result<bool, DirectoryError> {
self.check_version(trx, false).await?;

if path.is_empty() {
return Err(DirectoryError::NoPathProvided);
}

let node = self.find(trx, path.to_owned()).await?;

if !node.exists() {
return Ok(false);
}

if node.is_in_partition(false) {
let subspace_node = match node.subspace {
// not reachable because `self.find` is creating a node with a subspace,
None => unreachable!("node's subspace is not set"),
Some(ref s) => s.clone(),
};

let directory_partition = self.contents_of_node(
node.clone().subspace.unwrap(),
subspace_node,
node.current_path.to_owned(),
node.layer.to_owned(),
)?;
Expand All @@ -486,8 +486,14 @@ impl DirectoryLayer {
return Err(DirectoryError::PathDoesNotExists);
}
if node.is_in_partition(true) {
let subspace_node = match node.subspace {
// not reachable because `self.find` is creating a node with a subspace.
None => unreachable!("node's subspace is not set"),
Some(ref s) => s.clone(),
};

let directory_partition = self.contents_of_node(
node.clone().subspace.unwrap(),
subspace_node,
node.current_path.to_owned(),
node.layer.to_owned(),
)?;
Expand Down Expand Up @@ -528,8 +534,14 @@ impl DirectoryLayer {
return Err(DirectoryError::CannotMoveBetweenPartition);
}

let subspace_new_node = match new_node.subspace {
// not reachable because `self.find` is creating a node with a subspace,
None => unreachable!("node's subspace is not set"),
Some(ref s) => s.clone(),
};

let directory_partition = self.contents_of_node(
new_node.clone().subspace.unwrap(),
subspace_new_node,
new_node.current_path.to_owned(),
new_node.layer.to_owned(),
)?;
Expand Down Expand Up @@ -557,10 +569,14 @@ impl DirectoryLayer {
return Err(DirectoryError::ParentDirDoesNotExists);
}

let key = parent_node
.subspace
.unwrap()
.subspace(&(DEFAULT_SUB_DIRS, new_path.to_owned().last().unwrap()));
let subspace_parent_node = match parent_node.subspace {
// not reachable because `self.find` is creating a node with a subspace,
None => unreachable!("node's subspace is not set"),
Some(ref s) => s.clone(),
};

let key =
subspace_parent_node.subspace(&(DEFAULT_SUB_DIRS, new_path.to_owned().last().unwrap()));
let value: Vec<u8> = self
.node_subspace
.unpack(old_node.subspace.clone().unwrap().bytes())?;
Expand Down Expand Up @@ -611,6 +627,7 @@ impl DirectoryLayer {
}

let node = self.find(&trx, path.to_owned()).await?;
dbg!(&node);
if !node.exists() {
return if fail_on_nonexistent {
Err(DirectoryError::DirectoryDoesNotExists)
Expand All @@ -628,7 +645,7 @@ impl DirectoryLayer {
try_join!(
self.remove_recursive(trx, node.subspace.unwrap().clone()),
self.remove_from_parent(trx, path.to_owned())
);
)?;

Ok(true)
}
Expand All @@ -647,16 +664,12 @@ impl DirectoryLayer {

let range = trx.get_range(&range_option, 1, false).await?;
let has_more = range.more();
let range: Arc<FairMutex<FdbValuesIter>> = Arc::new(FairMutex::new(range.into_iter()));

loop {
let value_row = match range.lock().next() {
None => break,
Some(next_key_value) => next_key_value.value().to_vec(),
};

let sub_node = self.node_with_prefix(&value_row);
for row_key in range {
let sub_node = self.node_with_prefix(&row_key.value());
self.remove_recursive(trx, sub_node).await?;
begin = row_key.key().pack_to_vec();
begin.push(0);
}

if !has_more {
Expand All @@ -667,7 +680,7 @@ impl DirectoryLayer {
let mut node_prefix: Vec<u8> = self.node_subspace.unpack(node_sub.bytes())?;

// equivalent of strinc?
node_prefix.remove(node_prefix.len());
node_prefix.remove(node_prefix.len() - 1);

trx.clear_range(node_prefix.as_slice(), node_prefix.as_slice());
trx.clear_subspace_range(&node_sub);
Expand Down
22 changes: 18 additions & 4 deletions foundationdb/src/directory/directory_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ use crate::directory::directory_layer::{DirectoryLayer, PARTITION_LAYER};
use crate::directory::directory_subspace::DirectorySubspace;
use crate::directory::error::DirectoryError;
use crate::directory::{Directory, DirectoryOutput};
use crate::tuple::Subspace;
use crate::tuple::{Subspace, TuplePack};
use crate::Transaction;
use async_trait::async_trait;

/// A `DirectoryPartition` is a DirectorySubspace whose prefix is preprended to all of its descendant
/// directories's prefixes. It cannot be used as a Subspace. Instead, you must create at
/// least one subdirectory to store content.
#[derive(Debug, Clone)]
pub struct DirectoryPartition {
directory_subspace: DirectorySubspace,
parent_directory_layer: DirectoryLayer,
Expand All @@ -18,11 +22,11 @@ impl DirectoryPartition {

let mut d = DirectoryPartition {
directory_subspace: DirectorySubspace::new(
path,
prefix,
path.clone(),
prefix.clone(),
&DirectoryLayer::new(
Subspace::from_bytes(&node_subspace_bytes),
Subspace::from_bytes(prefix.as_slice()),
Subspace::from_bytes(prefix.clone().as_slice()),
false,
),
Vec::from(PARTITION_LAYER),
Expand All @@ -36,6 +40,16 @@ impl DirectoryPartition {
}
}

impl DirectoryPartition {
pub fn get_path(&self) -> Vec<String> {
self.directory_subspace.get_path()
}

pub fn get_layer(&self) -> Vec<u8> {
"partition".pack_to_vec()
}
}

#[async_trait]
impl Directory for DirectoryPartition {
async fn create_or_open(
Expand Down
Loading

0 comments on commit a9e07cd

Please sign in to comment.