From f41fa13eb57aa883944476e29ee81a6708e67b4d Mon Sep 17 00:00:00 2001 From: Mike Watters Date: Mon, 21 Dec 2020 10:13:53 -0700 Subject: [PATCH] make tuple::hca::HighContentionAllocator be Send (#214) make tuple::hca::HighContentionAllocator be Send - thread rng is not `Send`; instead, seed a `SmallRng` from it, which is `Send` - use of allocation mutex was causing non-`Send`ability despite being dropped before any `.await` boundaries; wrapped in blocks. - update related test to use `.boxed()` instead of `.boxed_local()` --- foundationdb/Cargo.toml | 2 +- foundationdb/src/tuple/hca.rs | 48 +++++++++++++++++++++-------------- foundationdb/tests/hca.rs | 4 +-- 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/foundationdb/Cargo.toml b/foundationdb/Cargo.toml index 844407dc..a244ca0c 100644 --- a/foundationdb/Cargo.toml +++ b/foundationdb/Cargo.toml @@ -45,7 +45,7 @@ foundationdb-gen = { version = "0.5.1", path = "../foundationdb-gen", default-fe foundationdb-sys = { version = "0.5.1", path = "../foundationdb-sys", default-features = false } futures = "0.3.1" memchr = "2.2.1" -rand = "0.7.2" +rand = { version = "0.7.2", features = ["default", "small_rng"] } static_assertions = "1.1.0" uuid = { version = "0.8.1", optional = true } num-bigint = { version = "0.3.0", optional = true } diff --git a/foundationdb/src/tuple/hca.rs b/foundationdb/src/tuple/hca.rs index 4e8326bd..59b05e2c 100644 --- a/foundationdb/src/tuple/hca.rs +++ b/foundationdb/src/tuple/hca.rs @@ -28,7 +28,7 @@ use std::fmt; use std::sync::{Mutex, PoisonError}; use futures::future; -use rand::Rng; +use rand::{self, rngs::SmallRng, Error as RandError, Rng, SeedableRng}; use crate::options::{ConflictRangeType, MutationType, TransactionOption}; use crate::tuple::{PackError, Subspace}; @@ -41,6 +41,7 @@ pub enum HcaError { PackError(PackError), InvalidDirectoryLayerMetadata, PoisonError, + RandError(RandError), } impl fmt::Debug for HcaError { @@ -52,6 +53,7 @@ impl fmt::Debug for HcaError { write!(f, "invalid directory layer metadata") } HcaError::PoisonError => write!(f, "mutex poisoned"), + HcaError::RandError(err) => err.fmt(f), } } } @@ -71,6 +73,11 @@ impl From> for HcaError { Self::PoisonError } } +impl From for HcaError { + fn from(err: RandError) -> Self { + Self::RandError(err) + } +} impl TransactError for HcaError { fn try_into_fdb_error(self) -> Result { @@ -114,7 +121,7 @@ impl HighContentionAllocator { reverse: true, ..RangeOption::default() }; - let mut rng = rand::thread_rng(); + let mut rng = SmallRng::from_rng(&mut rand::thread_rng())?; loop { let kvs = trx.get_range(&counters_range, 1, true).await?; @@ -130,17 +137,18 @@ impl HighContentionAllocator { let window = loop { let counters_start = self.counters.subspace(&start); - let mutex_guard = self.allocation_mutex.lock()?; - if window_advanced { - trx.clear_range(self.counters.bytes(), counters_start.bytes()); - trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?; - trx.clear_range(self.recent.bytes(), self.recent.subspace(&start).bytes()); - } - - // Increment the allocation count for the current window - trx.atomic_op(counters_start.bytes(), ONE_BYTES, MutationType::Add); - let count_future = trx.get(counters_start.bytes(), true); - drop(mutex_guard); + let count_future = { + let _mutex_guard = self.allocation_mutex.lock()?; + if window_advanced { + trx.clear_range(self.counters.bytes(), counters_start.bytes()); + trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?; + trx.clear_range(self.recent.bytes(), self.recent.subspace(&start).bytes()); + }; + + // Increment the allocation count for the current window + trx.atomic_op(counters_start.bytes(), ONE_BYTES, MutationType::Add); + trx.get(counters_start.bytes(), true) + }; let count_value = count_future.await?; let count = if let Some(count_value) = count_value { @@ -171,12 +179,14 @@ impl HighContentionAllocator { let candidate: i64 = rng.gen_range(start, start + window); let recent_candidate = self.recent.subspace(&candidate); - let mutex_guard = self.allocation_mutex.lock()?; - let latest_counter = trx.get_range(&counters_range, 1, true); - let candidate_value = trx.get(recent_candidate.bytes(), false); - trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?; - trx.set(recent_candidate.bytes(), &[]); - drop(mutex_guard); + let (latest_counter, candidate_value) = { + let _mutex_guard = self.allocation_mutex.lock()?; + let latest_counter = trx.get_range(&counters_range, 1, true); + let candidate_value = trx.get(recent_candidate.bytes(), false); + trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?; + trx.set(recent_candidate.bytes(), &[]); + (latest_counter, candidate_value) + }; let (latest_counter, candidate_value) = future::try_join(latest_counter, candidate_value).await?; diff --git a/foundationdb/tests/hca.rs b/foundationdb/tests/hca.rs index d1be639f..bf37b7c1 100644 --- a/foundationdb/tests/hca.rs +++ b/foundationdb/tests/hca.rs @@ -69,9 +69,9 @@ async fn test_hca_concurrent_allocations_async() -> FdbResult<()> { let hca = HighContentionAllocator::new(Subspace::from_bytes(KEY)); let all_ints: Vec = future::try_join_all((0..N).map(|_| { - db.transact_boxed_local( + db.transact_boxed( &hca, - move |tx, hca| hca.allocate(tx).boxed_local(), + move |tx, hca| hca.allocate(tx).boxed(), TransactOption::default(), ) }))