From 1f190cad7965377ed4a5a5575c06a3f0bc9596b4 Mon Sep 17 00:00:00 2001 From: Anwar Date: Fri, 2 Dec 2022 21:48:21 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E3=80=90fix=E3=80=91forking=20cluster?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/ForkingClusterInvoker.java | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java index eaf8f69353b..7a640fbc170 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java @@ -29,11 +29,9 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; import static org.apache.dubbo.common.constants.CommonConstants.FORKS_KEY; @@ -83,26 +81,29 @@ public Result doInvoke(final Invocation invocation, List> invokers, L } RpcContext.getServiceContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); - final BlockingQueue ref = new LinkedBlockingQueue<>(1); - for (final Invoker invoker : selected) { + final CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference result = new AtomicReference<>(); + selected.forEach(invoker -> { URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl(); - executor.execute(() -> { - try { - if (ref.size() > 0) { - return; - } - Result result = invokeWithContextAsync(invoker, invocation, consumerUrl); - ref.offer(result); - } catch (Throwable e) { + CompletableFuture.supplyAsync(() -> { + if (result.get() != null) { + return null; + } + return invokeWithContextAsync(invoker, invocation, consumerUrl); + }, executor).whenComplete((v, t) -> { + if (v != null && result.compareAndSet(null, v)) { + countDownLatch.countDown(); + } else if (t != null) { int value = count.incrementAndGet(); - if (value >= selected.size()) { - ref.offer(e); + if (value >= selected.size() && result.compareAndSet(null, t)) { + countDownLatch.countDown(); } } }); - } + }); try { - Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); + countDownLatch.await(timeout, TimeUnit.MILLISECONDS); + Object ret = result.get(); if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : RpcException.UNKNOWN_EXCEPTION, From ea988163c6697f11ace323584d7ea4fd70598848 Mon Sep 17 00:00:00 2001 From: Anwar Date: Mon, 5 Dec 2022 17:13:39 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=20=E3=80=90fix=E3=80=91fix=20code=20style?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dubbo/rpc/cluster/support/ForkingClusterInvoker.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java index 7a640fbc170..da71f7f3529 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java @@ -29,7 +29,10 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; From 67391297de9ba4387881429b2e1a6e1dec8bab43 Mon Sep 17 00:00:00 2001 From: Anwar Date: Tue, 6 Dec 2022 20:37:29 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=20=E3=80=90fix=E3=80=91unit=20test=20unwra?= =?UTF-8?q?p=20exception?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dubbo/rpc/cluster/support/ForkingClusterInvoker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java index da71f7f3529..f6726bcd2f1 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -108,7 +109,7 @@ public Result doInvoke(final Invocation invocation, List> invokers, L countDownLatch.await(timeout, TimeUnit.MILLISECONDS); Object ret = result.get(); if (ret instanceof Throwable) { - Throwable e = (Throwable) ret; + Throwable e = ret instanceof CompletionException ? ((CompletionException) ret).getCause() : (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : RpcException.UNKNOWN_EXCEPTION, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. " + "Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); From 643a18efca0231eb1bb0cc233c1391fb03a2032b Mon Sep 17 00:00:00 2001 From: "kanji.ainiwaer" Date: Tue, 6 Dec 2022 22:16:59 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E3=80=90fix=E3=80=91unit=20test=20unwrap?= =?UTF-8?q?=20CompletionException?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dubbo/rpc/cluster/support/ForkingClusterInvoker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java index da71f7f3529..f6726bcd2f1 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -108,7 +109,7 @@ public Result doInvoke(final Invocation invocation, List> invokers, L countDownLatch.await(timeout, TimeUnit.MILLISECONDS); Object ret = result.get(); if (ret instanceof Throwable) { - Throwable e = (Throwable) ret; + Throwable e = ret instanceof CompletionException ? ((CompletionException) ret).getCause() : (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : RpcException.UNKNOWN_EXCEPTION, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. " + "Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); From 70fa0a01e1a086a161b8d8ea989bd4bc0cc1c053 Mon Sep 17 00:00:00 2001 From: Anwar Date: Wed, 14 Dec 2022 00:25:21 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=20=E3=80=90fix=E3=80=91code=20review?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/ForkingClusterInvoker.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java index f6726bcd2f1..a1334aab078 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java @@ -29,13 +29,13 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; import static org.apache.dubbo.common.constants.CommonConstants.FORKS_KEY; @@ -85,29 +85,27 @@ public Result doInvoke(final Invocation invocation, List> invokers, L } RpcContext.getServiceContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); - final CountDownLatch countDownLatch = new CountDownLatch(1); - AtomicReference result = new AtomicReference<>(); + final BlockingQueue ref = new LinkedBlockingQueue<>(1); selected.forEach(invoker -> { URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl(); CompletableFuture.supplyAsync(() -> { - if (result.get() != null) { + if (ref.size() > 0) { return null; } return invokeWithContextAsync(invoker, invocation, consumerUrl); }, executor).whenComplete((v, t) -> { - if (v != null && result.compareAndSet(null, v)) { - countDownLatch.countDown(); - } else if (t != null) { + if (t == null) { + ref.offer(v); + } else { int value = count.incrementAndGet(); - if (value >= selected.size() && result.compareAndSet(null, t)) { - countDownLatch.countDown(); + if (value >= selected.size()) { + ref.offer(t); } } }); }); try { - countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - Object ret = result.get(); + Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = ret instanceof CompletionException ? ((CompletionException) ret).getCause() : (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : RpcException.UNKNOWN_EXCEPTION,