Skip to content

Commit

Permalink
make tuple::hca::HighContentionAllocator be Send (#214)
Browse files Browse the repository at this point in the history
make tuple::hca::HighContentionAllocator be Send

- thread rng is not `Send`; instead, seed a `SmallRng` from it, which
  is `Send`
- use of allocation mutex was causing non-`Send`ability despite being
  dropped before any `.await` boundaries; wrapped in blocks.
- update related test to use `.boxed()` instead of `.boxed_local()`
  • Loading branch information
zerth authored Dec 21, 2020
1 parent 85474f5 commit f41fa13
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 22 deletions.
2 changes: 1 addition & 1 deletion foundationdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ foundationdb-gen = { version = "0.5.1", path = "../foundationdb-gen", default-fe
foundationdb-sys = { version = "0.5.1", path = "../foundationdb-sys", default-features = false }
futures = "0.3.1"
memchr = "2.2.1"
rand = "0.7.2"
rand = { version = "0.7.2", features = ["default", "small_rng"] }
static_assertions = "1.1.0"
uuid = { version = "0.8.1", optional = true }
num-bigint = { version = "0.3.0", optional = true }
Expand Down
48 changes: 29 additions & 19 deletions foundationdb/src/tuple/hca.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::fmt;
use std::sync::{Mutex, PoisonError};

use futures::future;
use rand::Rng;
use rand::{self, rngs::SmallRng, Error as RandError, Rng, SeedableRng};

use crate::options::{ConflictRangeType, MutationType, TransactionOption};
use crate::tuple::{PackError, Subspace};
Expand All @@ -41,6 +41,7 @@ pub enum HcaError {
PackError(PackError),
InvalidDirectoryLayerMetadata,
PoisonError,
RandError(RandError),
}

impl fmt::Debug for HcaError {
Expand All @@ -52,6 +53,7 @@ impl fmt::Debug for HcaError {
write!(f, "invalid directory layer metadata")
}
HcaError::PoisonError => write!(f, "mutex poisoned"),
HcaError::RandError(err) => err.fmt(f),
}
}
}
Expand All @@ -71,6 +73,11 @@ impl<T> From<PoisonError<T>> for HcaError {
Self::PoisonError
}
}
impl From<RandError> for HcaError {
fn from(err: RandError) -> Self {
Self::RandError(err)
}
}

impl TransactError for HcaError {
fn try_into_fdb_error(self) -> Result<FdbError, Self> {
Expand Down Expand Up @@ -114,7 +121,7 @@ impl HighContentionAllocator {
reverse: true,
..RangeOption::default()
};
let mut rng = rand::thread_rng();
let mut rng = SmallRng::from_rng(&mut rand::thread_rng())?;

loop {
let kvs = trx.get_range(&counters_range, 1, true).await?;
Expand All @@ -130,17 +137,18 @@ impl HighContentionAllocator {
let window = loop {
let counters_start = self.counters.subspace(&start);

let mutex_guard = self.allocation_mutex.lock()?;
if window_advanced {
trx.clear_range(self.counters.bytes(), counters_start.bytes());
trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
trx.clear_range(self.recent.bytes(), self.recent.subspace(&start).bytes());
}

// Increment the allocation count for the current window
trx.atomic_op(counters_start.bytes(), ONE_BYTES, MutationType::Add);
let count_future = trx.get(counters_start.bytes(), true);
drop(mutex_guard);
let count_future = {
let _mutex_guard = self.allocation_mutex.lock()?;
if window_advanced {
trx.clear_range(self.counters.bytes(), counters_start.bytes());
trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
trx.clear_range(self.recent.bytes(), self.recent.subspace(&start).bytes());
};

// Increment the allocation count for the current window
trx.atomic_op(counters_start.bytes(), ONE_BYTES, MutationType::Add);
trx.get(counters_start.bytes(), true)
};

let count_value = count_future.await?;
let count = if let Some(count_value) = count_value {
Expand Down Expand Up @@ -171,12 +179,14 @@ impl HighContentionAllocator {
let candidate: i64 = rng.gen_range(start, start + window);
let recent_candidate = self.recent.subspace(&candidate);

let mutex_guard = self.allocation_mutex.lock()?;
let latest_counter = trx.get_range(&counters_range, 1, true);
let candidate_value = trx.get(recent_candidate.bytes(), false);
trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
trx.set(recent_candidate.bytes(), &[]);
drop(mutex_guard);
let (latest_counter, candidate_value) = {
let _mutex_guard = self.allocation_mutex.lock()?;
let latest_counter = trx.get_range(&counters_range, 1, true);
let candidate_value = trx.get(recent_candidate.bytes(), false);
trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
trx.set(recent_candidate.bytes(), &[]);
(latest_counter, candidate_value)
};

let (latest_counter, candidate_value) =
future::try_join(latest_counter, candidate_value).await?;
Expand Down
4 changes: 2 additions & 2 deletions foundationdb/tests/hca.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ async fn test_hca_concurrent_allocations_async() -> FdbResult<()> {
let hca = HighContentionAllocator::new(Subspace::from_bytes(KEY));

let all_ints: Vec<i64> = future::try_join_all((0..N).map(|_| {
db.transact_boxed_local(
db.transact_boxed(
&hca,
move |tx, hca| hca.allocate(tx).boxed_local(),
move |tx, hca| hca.allocate(tx).boxed(),
TransactOption::default(),
)
}))
Expand Down

0 comments on commit f41fa13

Please sign in to comment.