Skip to content

Commit

Permalink
【fix】code review
Browse files Browse the repository at this point in the history
  • Loading branch information
zeusbee committed Dec 13, 2022
1 parent 7132a67 commit 70fa0a0
Showing 1 changed file with 10 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.FORKS_KEY;
Expand Down Expand Up @@ -85,29 +85,27 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
}
RpcContext.getServiceContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Object> result = new AtomicReference<>();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>(1);
selected.forEach(invoker -> {
URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl();
CompletableFuture.<Object>supplyAsync(() -> {
if (result.get() != null) {
if (ref.size() > 0) {
return null;
}
return invokeWithContextAsync(invoker, invocation, consumerUrl);
}, executor).whenComplete((v, t) -> {
if (v != null && result.compareAndSet(null, v)) {
countDownLatch.countDown();
} else if (t != null) {
if (t == null) {
ref.offer(v);
} else {
int value = count.incrementAndGet();
if (value >= selected.size() && result.compareAndSet(null, t)) {
countDownLatch.countDown();
if (value >= selected.size()) {
ref.offer(t);
}
}
});
});
try {
countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
Object ret = result.get();
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = ret instanceof CompletionException ? ((CompletionException) ret).getCause() : (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : RpcException.UNKNOWN_EXCEPTION,
Expand Down

0 comments on commit 70fa0a0

Please sign in to comment.