From 9999827271da5f66e6dc3e6718c1906222aa59b2 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 13 Jun 2024 16:48:46 +0300 Subject: [PATCH 1/5] When applying pendingAcquireTimeout check for current pending Borrowers Fixes #219 --- .../main/java/reactor/pool/AbstractPool.java | 22 ++++++++------- .../java/reactor/pool/SimpleDequePool.java | 19 +++++++++++-- .../java/reactor/pool/CommonPoolTest.java | 27 +++++++++++++++++++ 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java index 822c4b21..ab5c8867 100644 --- a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java +++ b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java @@ -122,6 +122,18 @@ public boolean isInactiveForMoreThan(Duration duration) { // == common methods to interact with idle/pending queues == + void applyPendingAcquireTimeout(Borrower borrower) { + boolean noIdle = idleSize() == 0; + boolean noPermits = poolConfig.allocationStrategy().estimatePermitCount() == 0; + + if (noIdle && noPermits) { + borrower.pendingAcquireStart = clock.millis(); + if (!borrower.pendingAcquireTimeout.isZero()) { + borrower.timeoutTask = config().pendingAcquireTimer().apply(borrower, borrower.pendingAcquireTimeout); + } + } + } + abstract boolean elementOffer(POOLABLE element); //used in tests /** @@ -423,16 +435,8 @@ public void run() { public void request(long n) { if (Operators.validate(n)) { //start the countdown + pool.applyPendingAcquireTimeout(this); - boolean noIdle = pool.idleSize() == 0; - boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0; - - if (noIdle && noPermits) { - pendingAcquireStart = pool.clock.millis(); - if (!pendingAcquireTimeout.isZero()) { - timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout); - } - } //doAcquire should interrupt the countdown if there is either an available //resource or the pool can allocate one pool.doAcquire(this); diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 135622d6..d4f8cd5e 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -508,6 +508,11 @@ public int pendingAcquireSize() { return pendingSize; } + @Override + void applyPendingAcquireTimeout(Borrower borrower) { + // Pending acquire timeout is applied when pendingOffer() is invoked + } + @Override boolean elementOffer(POOLABLE element) { @SuppressWarnings("unchecked") @@ -571,7 +576,6 @@ final void maybeRecycleAndDrain(QueuePooledRef poolSlot, CoreSubscribe * @param pending a new {@link reactor.pool.AbstractPool.Borrower} to add to the queue and later either serve or consider pending */ void pendingOffer(Borrower pending) { - int maxPending = poolConfig.maxPending(); ConcurrentLinkedDeque> pendingQueue = this.pending; if (pendingQueue == TERMINATED) { return; @@ -581,6 +585,17 @@ void pendingOffer(Borrower pending) { postOffer = PENDING_SIZE.incrementAndGet(this); } + boolean noIdle = idleSize == 0; + int estimatePermitCount = poolConfig.allocationStrategy().estimatePermitCount(); + + if (noIdle && (estimatePermitCount == 0 || estimatePermitCount < postOffer)) { + pending.pendingAcquireStart = clock.millis(); + if (!pending.pendingAcquireTimeout.isZero()) { + pending.timeoutTask = config().pendingAcquireTimer().apply(pending, pending.pendingAcquireTimeout); + } + } + + int maxPending = poolConfig.maxPending(); if (WIP.getAndIncrement(this) == 0) { if (maxPending >= 0 && postOffer > maxPending && idleSize == 0 && poolConfig.allocationStrategy().estimatePermitCount() == 0) { //fail fast. differentiate slightly special case of maxPending == 0 diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index 6760b9d0..70956421 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -31,6 +31,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1117,6 +1119,31 @@ void discardCloseableWhenCloseFailureLogs(PoolStyle configAdjuster) { } } + @ParameterizedTestWithName + @MethodSource("allPools") + void pendingTimeout(PoolStyle configAdjuster) throws Exception { + PoolBuilder builder = PoolBuilder + .from(Mono.just("pendingTimeout")) + .sizeBetween(0, 1) + .maxPendingAcquire(10); + AbstractPool pool = configAdjuster.apply(builder); + + CountDownLatch latch = new CountDownLatch(3); + ExecutorService executorService = Executors.newFixedThreadPool(20); + CompletableFuture[] completableFutures = new CompletableFuture[4]; + for (int i = 0; i < completableFutures.length; i++) { + completableFutures[i] = CompletableFuture.runAsync( + () -> pool.acquire(Duration.ofMillis(10)) + .doOnError(t -> latch.countDown()) + .onErrorResume(PoolAcquireTimeoutException.class, t -> Mono.empty()) + .block(), + executorService); + } + + Mono.fromCompletionStage(CompletableFuture.allOf(completableFutures)).block(Duration.ofSeconds(5)); + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + } + @ParameterizedTestWithName @MethodSource("allPools") void pendingTimeoutNotImpactedByLongAllocation(PoolStyle configAdjuster) { From e86b5c8bc0101bee0c4b2e5e0ca80757f5cf78f5 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 14 Jun 2024 09:21:46 +0300 Subject: [PATCH 2/5] Simplify --- reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index d4f8cd5e..34c7e165 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -585,10 +585,10 @@ void pendingOffer(Borrower pending) { postOffer = PENDING_SIZE.incrementAndGet(this); } - boolean noIdle = idleSize == 0; + int idle = idleSize; int estimatePermitCount = poolConfig.allocationStrategy().estimatePermitCount(); - if (noIdle && (estimatePermitCount == 0 || estimatePermitCount < postOffer)) { + if (idle + estimatePermitCount < postOffer) { pending.pendingAcquireStart = clock.millis(); if (!pending.pendingAcquireTimeout.isZero()) { pending.timeoutTask = config().pendingAcquireTimer().apply(pending, pending.pendingAcquireTimeout); From b4af764f9e652ee7fc5ee8867a47403b2390b10c Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 14 Jun 2024 11:34:33 +0300 Subject: [PATCH 3/5] AbstractPool is not public - no need to keep backwards compatibility --- .../main/java/reactor/pool/AbstractPool.java | 18 +----------------- .../java/reactor/pool/SimpleDequePool.java | 5 ----- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java index ab5c8867..ea55551a 100644 --- a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java +++ b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java @@ -122,18 +122,6 @@ public boolean isInactiveForMoreThan(Duration duration) { // == common methods to interact with idle/pending queues == - void applyPendingAcquireTimeout(Borrower borrower) { - boolean noIdle = idleSize() == 0; - boolean noPermits = poolConfig.allocationStrategy().estimatePermitCount() == 0; - - if (noIdle && noPermits) { - borrower.pendingAcquireStart = clock.millis(); - if (!borrower.pendingAcquireTimeout.isZero()) { - borrower.timeoutTask = config().pendingAcquireTimer().apply(borrower, borrower.pendingAcquireTimeout); - } - } - } - abstract boolean elementOffer(POOLABLE element); //used in tests /** @@ -434,11 +422,7 @@ public void run() { @Override public void request(long n) { if (Operators.validate(n)) { - //start the countdown - pool.applyPendingAcquireTimeout(this); - - //doAcquire should interrupt the countdown if there is either an available - //resource or the pool can allocate one + // doAcquire will check for acquire timeout pool.doAcquire(this); } } diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 34c7e165..a370814c 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -508,11 +508,6 @@ public int pendingAcquireSize() { return pendingSize; } - @Override - void applyPendingAcquireTimeout(Borrower borrower) { - // Pending acquire timeout is applied when pendingOffer() is invoked - } - @Override boolean elementOffer(POOLABLE element) { @SuppressWarnings("unchecked") From 9088627cb477e6ca159755eed79cf798991667ce Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 14 Jun 2024 11:35:39 +0300 Subject: [PATCH 4/5] Address feedback --- .../java/reactor/pool/CommonPoolTest.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index 70956421..f617949e 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -1123,25 +1123,30 @@ void discardCloseableWhenCloseFailureLogs(PoolStyle configAdjuster) { @MethodSource("allPools") void pendingTimeout(PoolStyle configAdjuster) throws Exception { PoolBuilder builder = PoolBuilder - .from(Mono.just("pendingTimeout")) + .from(Mono.just("pooled")) .sizeBetween(0, 1) .maxPendingAcquire(10); AbstractPool pool = configAdjuster.apply(builder); CountDownLatch latch = new CountDownLatch(3); ExecutorService executorService = Executors.newFixedThreadPool(20); - CompletableFuture[] completableFutures = new CompletableFuture[4]; - for (int i = 0; i < completableFutures.length; i++) { - completableFutures[i] = CompletableFuture.runAsync( - () -> pool.acquire(Duration.ofMillis(10)) - .doOnError(t -> latch.countDown()) - .onErrorResume(PoolAcquireTimeoutException.class, t -> Mono.empty()) - .block(), - executorService); - } + try { + CompletableFuture[] completableFutures = new CompletableFuture[4]; + for (int i = 0; i < completableFutures.length; i++) { + completableFutures[i] = CompletableFuture.runAsync( + () -> pool.acquire(Duration.ofMillis(10)) + .doOnError(t -> latch.countDown()) + .onErrorResume(PoolAcquireTimeoutException.class, t -> Mono.empty()) + .block(), + executorService); + } - Mono.fromCompletionStage(CompletableFuture.allOf(completableFutures)).block(Duration.ofSeconds(5)); - assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + CompletableFuture.allOf(completableFutures).join(); + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + } + finally { + executorService.shutdown(); + } } @ParameterizedTestWithName From de3911bcf97d44045630efa769ce12ba24247756 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 14 Jun 2024 12:30:24 +0300 Subject: [PATCH 5/5] Address feedback --- reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index a370814c..eaa2b25e 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -583,6 +583,7 @@ void pendingOffer(Borrower pending) { int idle = idleSize; int estimatePermitCount = poolConfig.allocationStrategy().estimatePermitCount(); + // This is "best effort" if (idle + estimatePermitCount < postOffer) { pending.pendingAcquireStart = clock.millis(); if (!pending.pendingAcquireTimeout.isZero()) { @@ -590,8 +591,8 @@ void pendingOffer(Borrower pending) { } } - int maxPending = poolConfig.maxPending(); if (WIP.getAndIncrement(this) == 0) { + int maxPending = poolConfig.maxPending(); if (maxPending >= 0 && postOffer > maxPending && idleSize == 0 && poolConfig.allocationStrategy().estimatePermitCount() == 0) { //fail fast. differentiate slightly special case of maxPending == 0 Borrower toCull = pendingQueue.pollLast();