diff --git a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java index 822c4b2..ea55551 100644 --- a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java +++ b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java @@ -422,19 +422,7 @@ public void run() { @Override public void request(long n) { if (Operators.validate(n)) { - //start the countdown - - boolean noIdle = pool.idleSize() == 0; - boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0; - - if (noIdle && noPermits) { - pendingAcquireStart = pool.clock.millis(); - if (!pendingAcquireTimeout.isZero()) { - timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout); - } - } - //doAcquire should interrupt the countdown if there is either an available - //resource or the pool can allocate one + // doAcquire will check for acquire timeout pool.doAcquire(this); } } diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 135622d..eaa2b25 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -571,7 +571,6 @@ final void maybeRecycleAndDrain(QueuePooledRef poolSlot, CoreSubscribe * @param pending a new {@link reactor.pool.AbstractPool.Borrower} to add to the queue and later either serve or consider pending */ void pendingOffer(Borrower pending) { - int maxPending = poolConfig.maxPending(); ConcurrentLinkedDeque> pendingQueue = this.pending; if (pendingQueue == TERMINATED) { return; @@ -581,7 +580,19 @@ void pendingOffer(Borrower pending) { postOffer = PENDING_SIZE.incrementAndGet(this); } + int idle = idleSize; + int estimatePermitCount = poolConfig.allocationStrategy().estimatePermitCount(); + + // This is "best effort" + if (idle + estimatePermitCount < postOffer) { + pending.pendingAcquireStart = clock.millis(); + if (!pending.pendingAcquireTimeout.isZero()) { + pending.timeoutTask = config().pendingAcquireTimer().apply(pending, pending.pendingAcquireTimeout); + } + } + if (WIP.getAndIncrement(this) == 0) { + int maxPending = poolConfig.maxPending(); if (maxPending >= 0 && postOffer > maxPending && idleSize == 0 && poolConfig.allocationStrategy().estimatePermitCount() == 0) { //fail fast. differentiate slightly special case of maxPending == 0 Borrower toCull = pendingQueue.pollLast(); diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index 6760b9d..f617949 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -31,6 +31,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1117,6 +1119,36 @@ void discardCloseableWhenCloseFailureLogs(PoolStyle configAdjuster) { } } + @ParameterizedTestWithName + @MethodSource("allPools") + void pendingTimeout(PoolStyle configAdjuster) throws Exception { + PoolBuilder builder = PoolBuilder + .from(Mono.just("pooled")) + .sizeBetween(0, 1) + .maxPendingAcquire(10); + AbstractPool pool = configAdjuster.apply(builder); + + CountDownLatch latch = new CountDownLatch(3); + ExecutorService executorService = Executors.newFixedThreadPool(20); + try { + CompletableFuture[] completableFutures = new CompletableFuture[4]; + for (int i = 0; i < completableFutures.length; i++) { + completableFutures[i] = CompletableFuture.runAsync( + () -> pool.acquire(Duration.ofMillis(10)) + .doOnError(t -> latch.countDown()) + .onErrorResume(PoolAcquireTimeoutException.class, t -> Mono.empty()) + .block(), + executorService); + } + + CompletableFuture.allOf(completableFutures).join(); + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + } + finally { + executorService.shutdown(); + } + } + @ParameterizedTestWithName @MethodSource("allPools") void pendingTimeoutNotImpactedByLongAllocation(PoolStyle configAdjuster) {