From cee99cdf2cd56244bda017858404cf89863e8f9d Mon Sep 17 00:00:00 2001 From: FUNKYE <364176773@qq.com> Date: Thu, 2 Feb 2023 13:57:06 +0800 Subject: [PATCH] bugfix: parallel request handle throw IndexOutOfBoundsException (#5281) --- changes/en-us/develop.md | 2 ++ changes/zh-cn/develop.md | 2 ++ .../server/ServerOnRequestProcessor.java | 15 +++++++-------- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/changes/en-us/develop.md b/changes/en-us/develop.md index 9372318c00f..dcf297f77fa 100644 --- a/changes/en-us/develop.md +++ b/changes/en-us/develop.md @@ -14,6 +14,7 @@ Add changes here for all PR submitted to the develop branch. - [[#5266](https://github.com/seata/seata/pull/5265)] fix server console has queried the released lock - [[#5245](https://github.com/seata/seata/pull/5245)] fix the incomplete dependency of distribution module - [[#5239](https://github.com/seata/seata/pull/5239)] fix `getConfig` throw `ClassCastException` when use JDK proxy +- [[#5281](https://github.com/seata/seata/pull/5281)] parallel request handle throw IndexOutOfBoundsException ### optimize: - [[#5208](https://github.com/seata/seata/pull/5208)] optimize throwable getCause once more @@ -44,6 +45,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [pengten](https://github.com/pengten) - [wangliang181230](https://github.com/wangliang181230) - [GoodBoyCoder](https://github.com/GoodBoyCoder) +- [a364176773](https://github.com/a364176773) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/develop.md b/changes/zh-cn/develop.md index f1cd49437be..74646d2bb0b 100644 --- a/changes/zh-cn/develop.md +++ b/changes/zh-cn/develop.md @@ -14,6 +14,7 @@ - [[#5266](https://github.com/seata/seata/pull/5265)] 修复控制台全局锁查询接口查到了已释放的锁 - [[#5245](https://github.com/seata/seata/pull/5245)] 修复不完整的distribution模块依赖 - [[#5239](https://github.com/seata/seata/pull/5239)] 修复当使用JDK代理时,`getConfig` 方法获取部分配置时抛出 `ClassCastException` 异常的问题 +- [[#5281](https://github.com/seata/seata/pull/5281)] 修复并行rm请求处理时数组索引越界问题 ### optimize: - [[#5208](https://github.com/seata/seata/pull/5208)] 优化多次重复获取Throwable#getCause问题 @@ -44,6 +45,7 @@ - [pengten](https://github.com/pengten) - [wangliang181230](https://github.com/wangliang181230) - [GoodBoyCoder](https://github.com/GoodBoyCoder) +- [a364176773](https://github.com/a364176773) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/core/src/main/java/io/seata/core/rpc/processor/server/ServerOnRequestProcessor.java b/core/src/main/java/io/seata/core/rpc/processor/server/ServerOnRequestProcessor.java index b5933444384..5701de4b711 100644 --- a/core/src/main/java/io/seata/core/rpc/processor/server/ServerOnRequestProcessor.java +++ b/core/src/main/java/io/seata/core/rpc/processor/server/ServerOnRequestProcessor.java @@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -179,18 +178,16 @@ private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) } } } else { - List results = new CopyOnWriteArrayList<>(); - List> completableFutures = null; + List results = new ArrayList<>(); + List> completableFutures = null; for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) { if (PARALLEL_REQUEST_HANDLE) { if (completableFutures == null) { completableFutures = new ArrayList<>(); } int finalI = i; - completableFutures.add(CompletableFuture.runAsync(() -> { - results.add(finalI, handleRequestsByMergedWarpMessage( - ((MergedWarpMessage)message).msgs.get(finalI), rpcContext)); - })); + completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage( + ((MergedWarpMessage)message).msgs.get(finalI), rpcContext))); } else { results.add(i, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext)); @@ -198,7 +195,9 @@ private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) } if (CollectionUtils.isNotEmpty(completableFutures)) { try { - CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get(); + for (CompletableFuture completableFuture : completableFutures) { + results.add(completableFuture.get()); + } } catch (InterruptedException | ExecutionException e) { LOGGER.error("handle request error: {}", e.getMessage(), e); }