Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: parallel request handle throw IndexOutOfBoundsException #5281

Merged
merged 3 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/en-us/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#5266](https://github.com/seata/seata/pull/5265)] fix server console has queried the released lock
- [[#5245](https://github.com/seata/seata/pull/5245)] fix the incomplete dependency of distribution module
- [[#5239](https://github.com/seata/seata/pull/5239)] fix `getConfig` throw `ClassCastException` when use JDK proxy
- [[#5281](https://github.com/seata/seata/pull/5281)] parallel request handle throw IndexOutOfBoundsException

### optimize:
- [[#5208](https://github.com/seata/seata/pull/5208)] optimize throwable getCause once more
Expand Down Expand Up @@ -44,6 +45,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [pengten](https://github.com/pengten)
- [wangliang181230](https://github.com/wangliang181230)
- [GoodBoyCoder](https://github.com/GoodBoyCoder)
- [a364176773](https://github.com/a364176773)


Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
2 changes: 2 additions & 0 deletions changes/zh-cn/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [[#5266](https://github.com/seata/seata/pull/5265)] 修复控制台全局锁查询接口查到了已释放的锁
- [[#5245](https://github.com/seata/seata/pull/5245)] 修复不完整的distribution模块依赖
- [[#5239](https://github.com/seata/seata/pull/5239)] 修复当使用JDK代理时,`getConfig` 方法获取部分配置时抛出 `ClassCastException` 异常的问题
- [[#5281](https://github.com/seata/seata/pull/5281)] 修复并行rm请求处理时数组索引越界问题

### optimize:
- [[#5208](https://github.com/seata/seata/pull/5208)] 优化多次重复获取Throwable#getCause问题
Expand Down Expand Up @@ -44,6 +45,7 @@
- [pengten](https://github.com/pengten)
- [wangliang181230](https://github.com/wangliang181230)
- [GoodBoyCoder](https://github.com/GoodBoyCoder)
- [a364176773](https://github.com/a364176773)


同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -179,26 +178,26 @@ private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)
}
}
} else {
List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();
List<CompletableFuture<Void>> completableFutures = null;
List<AbstractResultMessage> results = new ArrayList<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不在多线程中add,不存在线程安全问题,改为ArrayList

List<CompletableFuture<AbstractResultMessage>> completableFutures = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

每个future的返回值都为AbstractResultMessage

for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
if (PARALLEL_REQUEST_HANDLE) {
if (completableFutures == null) {
completableFutures = new ArrayList<>();
}
int finalI = i;
completableFutures.add(CompletableFuture.runAsync(() -> {
results.add(finalI, handleRequestsByMergedWarpMessage(
((MergedWarpMessage)message).msgs.get(finalI), rpcContext));
}));
completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有序的将每个执行request处理的future放入completableFutures

((MergedWarpMessage)message).msgs.get(finalI), rpcContext)));
} else {
results.add(i,
handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
}
}
if (CollectionUtils.isNotEmpty(completableFutures)) {
try {
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
for (CompletableFuture<AbstractResultMessage> completableFuture : completableFutures) {
results.add(completableFuture.get());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

按顺序读取再加回去,completableFutures的数量等于msgs,顺序也一致

}
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("handle request error: {}", e.getMessage(), e);
}
Expand Down