From fa45c5e0a6cab670d34741589ff7ed9282fd8863 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Fri, 14 Jul 2023 17:02:28 +0200 Subject: [PATCH] The pool asynchronously removes connection slots when they are identified as removable. This create races, e.g a connection can be evicted from the pool and then acquired before the removal happens. Connection slots are now removed synchronously when the pool identifies connections to be removed immediately within the pool logic. The post action removal are now enqueued in the main post action (when there is one), the combiner executor has been modified to take in account the fact that a post action might next actions to execute. --- .../core/net/impl/pool/CombinerExecutor.java | 5 ++- .../core/net/impl/pool/SemaphoreExecutor.java | 3 +- .../net/impl/pool/SimpleConnectionPool.java | 27 ++++++++++----- .../io/vertx/core/net/impl/pool/Task.java | 4 +++ .../net/impl/pool/ConnectionPoolTest.java | 34 +++++++++++++++++++ 5 files changed, 63 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/vertx/core/net/impl/pool/CombinerExecutor.java b/src/main/java/io/vertx/core/net/impl/pool/CombinerExecutor.java index 4cbba5142e3..69366fe3731 100644 --- a/src/main/java/io/vertx/core/net/impl/pool/CombinerExecutor.java +++ b/src/main/java/io/vertx/core/net/impl/pool/CombinerExecutor.java @@ -54,11 +54,14 @@ public void submit(Action action) { if (a == null) { break; } - Task task = a.execute(state); + final Task task = a.execute(state); if (task != null) { if (head == null) { assert tail == null; tail = task; + for (Task next = tail.next();next != null;next = tail.next()) { + tail = tail.next(); + } head = task; } else { tail = tail.next(task); diff --git a/src/main/java/io/vertx/core/net/impl/pool/SemaphoreExecutor.java b/src/main/java/io/vertx/core/net/impl/pool/SemaphoreExecutor.java index 22adb16cd3c..b9defc2cdad 100644 --- a/src/main/java/io/vertx/core/net/impl/pool/SemaphoreExecutor.java +++ b/src/main/java/io/vertx/core/net/impl/pool/SemaphoreExecutor.java @@ -30,8 +30,9 @@ public void submit(Action action) { post = action.execute(state); } finally { lock.unlock(); - if (post != null) { + while (post != null) { post.run(); + post = post.next(); } } } diff --git a/src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java b/src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java index 0c599893f51..4263ea26563 100644 --- a/src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java +++ b/src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java @@ -368,10 +368,7 @@ public Task execute(SimpleConnectionPool pool) { } else { waiter.disposed = true; } - if (!pool.closed) { - pool.remove(removed); - } - return new Task() { + Task task = new Task() { @Override public void run() { if (waiter != null) { @@ -386,6 +383,14 @@ public void run() { removed.result.fail(cause); } }; + if (!pool.closed) { + Task removeTask = new Remove<>(removed).execute(pool); + if (removeTask != null) { + removeTask.next(task); + task = removeTask; + } + } + return task; } } @@ -522,15 +527,21 @@ public void run() { res.add(slot.connection); } } - for (Slot slot : removed) { - pool.remove(slot); - } - return new Task() { + Task head = new Task() { @Override public void run() { handler.handle(Future.succeededFuture(res)); } }; + Task tail = head; + for (Slot slot : removed) { + Task next = new Remove<>(slot).execute(pool); + if (next != null) { + tail.next(next); + tail = next; + } + } + return head; } } diff --git a/src/main/java/io/vertx/core/net/impl/pool/Task.java b/src/main/java/io/vertx/core/net/impl/pool/Task.java index 55f756d0053..7d02b6598f4 100644 --- a/src/main/java/io/vertx/core/net/impl/pool/Task.java +++ b/src/main/java/io/vertx/core/net/impl/pool/Task.java @@ -20,6 +20,10 @@ public Task replaceNext(Task next) { return oldNext; } + public Task next() { + return next; + } + public Task next(Task next) { this.next = next; return next; diff --git a/src/test/java/io/vertx/core/net/impl/pool/ConnectionPoolTest.java b/src/test/java/io/vertx/core/net/impl/pool/ConnectionPoolTest.java index d00201e5561..6f065fc1b29 100644 --- a/src/test/java/io/vertx/core/net/impl/pool/ConnectionPoolTest.java +++ b/src/test/java/io/vertx/core/net/impl/pool/ConnectionPoolTest.java @@ -444,6 +444,40 @@ public void testRemoveEvicted() throws Exception { assertEquals(0, pool.size()); } + @Test + public void testSynchronousEviction() throws Exception { + ConnectionManager mgr = new ConnectionManager(); + ConnectionPool pool = ConnectionPool.pool(mgr, new int[] { 1 }, 1); + EventLoopContext ctx = vertx.createEventLoopContext(); + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch latch3 = new CountDownLatch(1); + pool.acquire(ctx, 0, onSuccess(lease -> { + lease.recycle(); + latch1.countDown(); + })); + ConnectionRequest request = mgr.assertRequest(); + Connection conn1 = new Connection(); + request.connect(conn1, 0); + awaitLatch(latch1); + Connection conn2 = new Connection(); + pool.evict(candidate -> { + assertSame(candidate, conn1); + pool.acquire(ctx, 0, onSuccess(lease -> { + Connection c2 = lease.get(); + assertSame(conn2, c2); + latch3.countDown(); + })); + return true; + }, onSuccess(list -> { + latch2.countDown(); + })); + awaitLatch(latch2); + request = mgr.assertRequest(); + request.connect(conn2, 0); + awaitLatch(latch3); + } + @Test public void testConnectionInProgressShouldNotBeEvicted() { ConnectionManager mgr = new ConnectionManager();