Merge pull request #8226 from dantengsky/fix-deadlock-semaphore
fix: deadlock caused by incorrect semaphore permit control
BohuTANG authored Oct 15, 2022
2 parents 7c0d1f3 + 0f33a05 commit 6b8fa0f
Showing 6 changed files with 79 additions and 41 deletions.
46 changes: 37 additions & 9 deletions src/common/base/src/base/runtime.rs
@@ -24,6 +24,7 @@ use common_exception::Result;
use tokio::runtime::Builder;
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

@@ -191,17 +192,45 @@ impl Runtime {
self.handle.block_on(future).flatten()
}

// This function makes the futures always run within `max_concurrent`, using a Semaphore as a latch.
// This is not the same as `futures::stream::iter(futures).buffer_unordered(max_concurrent)`:
// for a comparison of the two, see https://github.com/BohuTANG/joint
pub async fn try_spawn_batch<F>(
// For each future in `futures`, a permit is acquired from the semaphore before
// the future is executed, and released when it is done.
pub async fn try_spawn_batch<Fut>(
&self,
semaphore: Semaphore,
futures: impl IntoIterator<Item = Fut>,
) -> Result<Vec<JoinHandle<Fut::Output>>>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let semaphore = Arc::new(semaphore);
let iter = futures.into_iter().map(|v| {
|permit| {
let _permit = permit;
v
}
});
self.try_spawn_batch_with_owned_semaphore(semaphore, iter)
.await
}

// For each future produced by `futures`, a permit is acquired from the semaphore before
// the future is executed, and released when it is done.

// Please take care when using the `semaphore`:
// if sub-tasks may be spawned inside the `futures` and acquire permits from a clone of
// this semaphore, release those permits promptly, or grant enough (but not excessive)
// permits to cover the maximum degree of parallelism; otherwise it may lead to deadlock.
pub async fn try_spawn_batch_with_owned_semaphore<F, Fut>(
&self,
semaphore: Arc<Semaphore>,
futures: impl IntoIterator<Item = F>,
) -> Result<Vec<JoinHandle<F::Output>>>
) -> Result<Vec<JoinHandle<Fut::Output>>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
F: FnOnce(OwnedSemaphorePermit) -> Fut + Send + 'static,
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let iter = futures.into_iter();
let mut handlers =
@@ -220,8 +249,7 @@
})?;
let handler = self.handle.spawn(async move {
// pass ownership of the permit to the task; it is (implicitly) dropped when the task is done
let _pin = permit;
fut.await
fut(permit).await
});
handlers.push(handler)
}
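For reference, a usage sketch of the two entry points above, assuming the `Runtime` from this file and the re-exports under `common_base::base::tokio`; `fetch` is a hypothetical IO-bound task, not part of this commit:

```rust
use std::sync::Arc;

use common_base::base::tokio::sync::OwnedSemaphorePermit;
use common_base::base::tokio::sync::Semaphore;
use common_base::base::Runtime;
use common_exception::Result;

async fn demo(runtime: &Runtime) -> Result<()> {
    // Simple form: plain futures, the permits are handled internally.
    let futs = (0..20).map(fetch);
    let handlers = runtime.try_spawn_batch(Semaphore::new(3), futs).await?;
    let _pages = futures::future::try_join_all(handlers).await.expect("join");

    // Owned form: each task receives its permit and may drop it early, e.g.
    // before spawning sub-tasks that draw from the same semaphore.
    let semaphore = Arc::new(Semaphore::new(3));
    let tasks = (0..20).map(|i| {
        move |permit: OwnedSemaphorePermit| async move {
            let page = fetch(i).await;
            drop(permit); // release as soon as the permit-guarded work is done
            page
        }
    });
    let handlers = runtime
        .try_spawn_batch_with_owned_semaphore(semaphore, tasks)
        .await?;
    let _pages = futures::future::try_join_all(handlers).await.expect("join");
    Ok(())
}

// Placeholder for an IO-bound operation.
async fn fetch(i: usize) -> usize {
    i
}
```

The owned form is what the pruning code below relies on: the task decides when its permit is released, instead of holding it for the task's whole lifetime.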
2 changes: 1 addition & 1 deletion src/common/base/tests/it/runtime.rs
@@ -114,7 +114,7 @@ async fn test_runtime_try_spawn_batch() -> Result<()> {
futs.push(mock_get_page(i));
}

let max_concurrency = Arc::new(Semaphore::new(3));
let max_concurrency = Semaphore::new(3);
let handlers = runtime.try_spawn_batch(max_concurrency, futs).await?;
let result = futures::future::try_join_all(handlers).await.unwrap();
assert_eq!(result.len(), 20);
6 changes: 2 additions & 4 deletions src/query/storages/fuse/src/fuse_file.rs
@@ -56,16 +56,14 @@ impl FuseFile {
});

// 1.2 build the runtime.
let semaphore = Arc::new(Semaphore::new(max_io_requests));
let semaphore = Semaphore::new(max_io_requests);
let file_runtime = Arc::new(Runtime::with_worker_threads(
max_runtime_threads,
Some("fuse-req-remove-files-worker".to_owned()),
)?);

// 1.3 spawn all the tasks to the runtime.
let join_handlers = file_runtime
.try_spawn_batch(semaphore.clone(), tasks)
.await?;
let join_handlers = file_runtime.try_spawn_batch(semaphore, tasks).await?;

// 1.4 get all the result.
future::try_join_all(join_handlers).await.map_err(|e| {
6 changes: 2 additions & 4 deletions src/query/storages/fuse/src/fuse_segment.rs
@@ -75,16 +75,14 @@ impl FuseSegmentIO {
});

// 1.2 build the runtime.
let semaphore = Arc::new(Semaphore::new(max_io_requests));
let semaphore = Semaphore::new(max_io_requests);
let segments_runtime = Arc::new(Runtime::with_worker_threads(
max_runtime_threads,
Some("fuse-req-segments-worker".to_owned()),
)?);

// 1.3 spawn all the tasks to the runtime.
let join_handlers = segments_runtime
.try_spawn_batch(semaphore.clone(), tasks)
.await?;
let join_handlers = segments_runtime.try_spawn_batch(semaphore, tasks).await?;

// 1.4 get all the result.
let joint: Vec<Result<Arc<SegmentInfo>>> = future::try_join_all(join_handlers)
6 changes: 2 additions & 4 deletions src/query/storages/fuse/src/fuse_snapshot.rs
@@ -91,16 +91,14 @@ impl FuseSnapshotIO {
});

// 1.2 build the runtime.
let semaphore = Arc::new(Semaphore::new(max_io_requests));
let semaphore = Semaphore::new(max_io_requests);
let snapshot_runtime = Arc::new(Runtime::with_worker_threads(
max_runtime_threads,
Some("fuse-req-snapshots-worker".to_owned()),
)?);

// 1.3 spawn all the tasks to the runtime.
let join_handlers = snapshot_runtime
.try_spawn_batch(semaphore.clone(), tasks)
.await?;
let join_handlers = snapshot_runtime.try_spawn_batch(semaphore, tasks).await?;

// 1.4 get all the result.
future::try_join_all(join_handlers)
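The three fuse call sites above now share the same shape; here is a condensed sketch of it, assuming the crate paths shown in this commit. The thread name, the helper name `run_io_batch`, and the `expect` call are placeholders; the real call sites map the join error into the crate's error type:

```rust
use std::future::Future;
use std::sync::Arc;

use common_base::base::tokio::sync::Semaphore;
use common_base::base::Runtime;
use common_exception::Result;
use futures::future;

async fn run_io_batch<Fut>(
    tasks: impl IntoIterator<Item = Fut>,
    max_io_requests: usize,
    max_runtime_threads: usize,
) -> Result<Vec<Fut::Output>>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    // 1.2 build the runtime; the semaphore is now handed over by value and
    // wrapped in an Arc inside `try_spawn_batch`.
    let semaphore = Semaphore::new(max_io_requests);
    let io_runtime = Arc::new(Runtime::with_worker_threads(
        max_runtime_threads,
        Some("fuse-io-worker".to_owned()),
    )?);

    // 1.3 spawn all the tasks to the runtime, at most `max_io_requests` in flight.
    let join_handlers = io_runtime.try_spawn_batch(semaphore, tasks).await?;

    // 1.4 wait for all the results; `io_runtime` must outlive the join.
    let results = future::try_join_all(join_handlers)
        .await
        .expect("io task panicked or was cancelled");
    Ok(results)
}
```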
54 changes: 35 additions & 19 deletions src/query/storages/fuse/src/pruning/pruning_executor.rs
@@ -16,6 +16,7 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use common_base::base::tokio::sync::OwnedSemaphorePermit;
use common_base::base::tokio::sync::Semaphore;
use common_base::base::tokio::task::JoinHandle;
use common_base::base::Runtime;
@@ -95,11 +96,6 @@ impl BlockPruner {
let max_concurrency = {
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
// Guard against a mis-configured max_storage_io_requests setting, e.g. 0
//
// note that inside segment pruning, the SAME Semaphore is used to
// control the concurrency of block pruning; to prevent us from waiting for
// a permit while holding the last permit, at least 2 permits should be
// granted to this semaphore.
let v = std::cmp::max(max_io_requests, 10);
if v > max_io_requests {
warn!(
@@ -136,19 +132,20 @@ impl BlockPruner {
None
} else {
segments.next().map(|(segment_idx, segment_location)| {
Self::prune_segment(
dal.clone(),
pruning_ctx.clone(),
segment_idx,
segment_location,
)
let segment_location = segment_location;
let dal = dal.clone();
let pruning_ctx = pruning_ctx.clone();
move |permit| async move {
Self::prune_segment(permit, dal, pruning_ctx, segment_idx, segment_location)
.await
}
})
}
});

// 4.2 spawns the segment pruning tasks, with concurrency control
let join_handlers = pruning_runtime
.try_spawn_batch(semaphore.clone(), tasks)
.try_spawn_batch_with_owned_semaphore(semaphore.clone(), tasks)
.await?;

// 4.3 flatten the results
@@ -173,6 +170,7 @@ impl BlockPruner {
#[inline]
#[tracing::instrument(level = "debug", skip_all)]
async fn prune_segment(
permit: OwnedSemaphorePermit,
dal: Operator,
pruning_ctx: Arc<PruningContext>,
segment_idx: SegmentIndex,
@@ -183,6 +181,16 @@

let (path, ver) = segment_location;
let segment_info = segment_reader.read(path, None, ver).await?;

// The IO job of reading the segment is done; release the permit to allow more concurrent pruners.
// Note:
// This permit must be released explicitly before pruning blocks, to avoid deadlock.
//
// Otherwise, 1) the whole pruning job would have to be divided into chunks of segment-pruning
// tasks, each containing at most (max_concurrency - 1) segments, and 2) those chunks would have
// to be executed sequentially.
drop(permit);

let result = if pruning_ctx.range_pruner.should_keep(
&segment_info.summary.col_stats,
segment_info.summary.row_count,
@@ -210,12 +218,15 @@
let mut blocks = segment_info.blocks.iter().enumerate();
let pruning_runtime = &pruning_ctx.rt;
let semaphore = &pruning_ctx.semaphore;

let tasks = std::iter::from_fn(|| {
// check limit speculatively
if pruning_ctx.limiter.exceeded() {
return None;
}
type BlockPruningFuture = Pin<Box<dyn Future<Output = (usize, bool)> + Send>>;
type BlockPruningFutureReturn = Pin<Box<dyn Future<Output = (usize, bool)> + Send>>;
type BlockPruningFuture =
Box<dyn FnOnce(OwnedSemaphorePermit) -> BlockPruningFutureReturn + Send + 'static>;
blocks.next().map(|(block_idx, block_meta)| {
let row_count = block_meta.row_count;
if pruning_ctx
@@ -227,20 +238,25 @@
let filter_pruner = filter_pruner.clone();
let index_location = block_meta.bloom_filter_index_location.clone();
let index_size = block_meta.bloom_filter_index_size;
let v: BlockPruningFuture = Box::pin(async move {
let keep = filter_pruner.should_keep(&index_location, index_size).await
&& ctx.limiter.within_limit(row_count);
(block_idx, keep)
let v: BlockPruningFuture = Box::new(move |_: OwnedSemaphorePermit| {
Box::pin(async move {
let keep = filter_pruner.should_keep(&index_location, index_size).await
&& ctx.limiter.within_limit(row_count);
(block_idx, keep)
})
});
v
} else {
Box::pin(async move { (block_idx, false) })
let v: BlockPruningFuture = Box::new(move |_: OwnedSemaphorePermit| {
Box::pin(async move { (block_idx, false) })
});
v
}
})
});

let join_handlers = pruning_runtime
.try_spawn_batch(semaphore.clone(), tasks)
.try_spawn_batch_with_owned_semaphore(semaphore.clone(), tasks)
.await?;

let joint = future::try_join_all(join_handlers)
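To make the hazard concrete, here is a tokio-only sketch, independent of the code above, of the shape `prune_segment` now has: the segment-level permit is dropped before block-level sub-tasks compete for permits from the same semaphore. If the `drop(permit)` line were removed, every parent task could hold a permit while waiting on children that each need one, and the program would hang. The names and sleep durations are placeholders:

```rust
use std::sync::Arc;

use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;

async fn prune_one_segment(permit: OwnedSemaphorePermit, semaphore: Arc<Semaphore>) {
    // Stand-in for the segment IO that the permit was acquired for.
    tokio::time::sleep(std::time::Duration::from_millis(5)).await;

    // Release the segment-level permit before the block-level sub-tasks
    // compete for permits from the same semaphore.
    drop(permit);

    let mut children = Vec::new();
    for _block in 0..4 {
        let child_permit = semaphore.clone().acquire_owned().await.unwrap();
        children.push(tokio::spawn(async move {
            let _held = child_permit; // dropped when the block task finishes
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }));
    }
    futures::future::join_all(children).await;
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2));
    let mut segments = Vec::new();
    for _segment in 0..8 {
        // At most 2 segment tasks hold permits at any time.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        segments.push(tokio::spawn(prune_one_segment(permit, semaphore.clone())));
    }
    futures::future::join_all(segments).await;
}
```

This is the scenario the removed "at least 2 permits" note tried to mitigate; releasing the parent's permit before the children acquire makes the concurrency bound independent of the nesting.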

1 comment on commit 6b8fa0f


@vercel vercel bot commented on 6b8fa0f Oct 15, 2022


Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend.vercel.app
databend-git-main-databend.vercel.app
databend.rs
