Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev-appdata-size-limit' into dev…
Browse files Browse the repository at this point in the history
…-appdata-size-limit

# Conflicts:
#	changes/en-us/2.x.md
#	changes/zh-cn/2.x.md
  • Loading branch information
Bughue committed Nov 27, 2023
2 parents 2288eb1 + effda8c commit 030a5bd
Show file tree
Hide file tree
Showing 15 changed files with 224 additions and 73 deletions.
6 changes: 6 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ Add changes here for all PR submitted to the 2.x branch.

### bugfix:
- [[#6075](https://github.com/seata/seata/pull/6075)] fix missing table alias for on update column of image SQL
- [[#6086](https://github.com/seata/seata/pull/6086)] fix oracle column alias cannot find
- [[#6085](https://github.com/seata/seata/pull/6085)] fix jdk9+ compile error

### optimize:
- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration
- [[#6031](https://github.com/seata/seata/pull/6031)] add a check for the existence of the undolog table
- [[#4473](https://github.com/seata/seata/pull/4473)] rm appdata size limit

Expand All @@ -22,9 +25,12 @@ Thanks to these contributors for their code commits. Please report an unintended

<!-- Please make sure your Github ID is in the list below -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)
- [ptyin](https://github.com/ptyin)
- [laywin](https://github.com/laywin)
- [imcmai](https://github.com/imcmai)
- [DroidEye2ONGU](https://github.com/DroidEye2ONGU)
- [funky-eyes](https://github.com/funky-eyes)
- [Bughue](https://github.com/Bughue)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
7 changes: 7 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@

### bugfix:
- [[#6075](https://github.com/seata/seata/pull/6075)] 修复镜像SQL对于on update列没有添加表别名的问题
- [[#6086](https://github.com/seata/seata/pull/6086)] 修复oracle alias 解析异常
- [[#6085](https://github.com/seata/seata/pull/6085)] 修复jdk9+版本编译后,引入后ByteBuffer#flip NoSuchMethodError的问题

### optimize:
- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长
- [[#6031](https://github.com/seata/seata/pull/6031)] 添加undo_log表的存在性校验
- [[#4473](https://github.com/seata/seata/pull/4473)] rm appdata大小限制


### security:
- [[#6069](https://github.com/seata/seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞

Expand All @@ -22,9 +26,12 @@

<!-- 请确保您的 GitHub ID 在以下列表中 -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)
- [ptyin](https://github.com/ptyin)
- [laywin](https://github.com/laywin)
- [imcmai](https://github.com/imcmai)
- [DroidEye2ONGU](https://github.com/DroidEye2ONGU)
- [funky-eyes](https://github.com/funky-eyes)
- [Bughue](https://github.com/Bughue)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
93 changes: 93 additions & 0 deletions common/src/main/java/io/seata/common/util/BufferUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.seata.common.util;

import java.nio.Buffer;

/**
* Explicit cast to {@link Buffer} parent buffer type. It resolves issues with covariant return types in Java 9+ for
* {@link java.nio.ByteBuffer} and {@link java.nio.CharBuffer}. Explicit casting resolves the NoSuchMethodErrors (e.g
* java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip(I)Ljava/nio/ByteBuffer) when the project is compiled with
* newer Java version and run on Java 8.
* <p/>
* <a href="https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html">Java 8</a> doesn't provide override the
* following Buffer methods in subclasses:
*
* <pre>
* Buffer clear()
* Buffer flip()
* Buffer limit(int newLimit)
* Buffer mark()
* Buffer position(int newPosition)
* Buffer reset()
* Buffer rewind()
* </pre>
*
* <a href="https://docs.oracle.com/javase/9/docs/api/java/nio/ByteBuffer.html">Java 9</a> introduces the overrides in
* child classes (e.g the ByteBuffer), but the return type is the specialized one and not the abstract {@link Buffer}.
* So the code compiled with newer Java is not working on Java 8 unless a workaround with explicit casting is used.
*
* @author funkye
*/
public class BufferUtils {

/**
* @param buffer byteBuffer
*/
public static void flip(Buffer buffer) {
buffer.flip();
}

/**
* @param buffer byteBuffer
*/
public static void clear(Buffer buffer) {
buffer.clear();
}

/**
* @param buffer byteBuffer
*/
public static void limit(Buffer buffer, int newLimit) {
buffer.limit(newLimit);
}

/**
* @param buffer byteBuffer
*/
public static void mark(Buffer buffer) {
buffer.mark();
}

/**
* @param buffer byteBuffer
*/
public static void position(Buffer buffer, int newPosition) {
buffer.position(newPosition);
}

/**
* @param buffer byteBuffer
*/
public static void rewind(Buffer buffer) {
buffer.rewind();
}

/**
* @param buffer byteBuffer
*/
public static void reset(Buffer buffer) {
buffer.reset();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class AbstractNettyRemoting implements Disposable {
/**
* The Is sending.
*/
protected volatile boolean isSending = false;
protected static volatile boolean isSending = false;
private String group = "DEFAULT";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -77,14 +79,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
private static final String MSG_ID_PREFIX = "msgId:";
private static final String FUTURES_PREFIX = "futures:";
private static final String SINGLE_LOG_POSTFIX = ";";
private static final int MAX_MERGE_SEND_MILLS = 1;
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
private static final int MAX_MERGE_SEND_MILLS = 10;
private static final int MAX_MERGE_SEND_THREAD = 1;
private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
protected final Object mergeLock = new Object();
private static final String MERGE_THREAD_NAME = "rpcMergeMessageSend";
protected static final Object MERGE_LOCK = new Object();

/**
* When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.
Expand All @@ -96,29 +97,39 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
* Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable}
* {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()}
*/
protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<String/*serverAddress*/, Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>>> BASKET_MAP = new ConcurrentHashMap<>();
private final NettyClientBootstrap clientBootstrap;
private final NettyClientChannelManager clientChannelManager;
private final NettyPoolKey.TransactionRole transactionRole;
private ExecutorService mergeSendExecutorService;
private static volatile ExecutorService mergeSendExecutorService;
private TransactionMessageHandler transactionMessageHandler;
protected volatile boolean enableClientBatchSendRequest;

@Override
public void init() {
timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
startMergeSendThread();
}
super.init();
clientBootstrap.start();
}

private void startMergeSendThread() {
if (mergeSendExecutorService == null) {
synchronized (AbstractNettyRemoting.class) {
if (mergeSendExecutorService == null) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(MERGE_THREAD_NAME, MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
}
}
}

public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
super(messageExecutor);
Expand Down Expand Up @@ -146,8 +157,14 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
futures.put(rpcMessage.getId(), messageFuture);

// put message into basketMap
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress,
key -> {
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> map = new HashMap<>(2);
map.put(NettyPoolKey.TransactionRole.TMROLE, new LinkedBlockingQueue<>());
map.put(NettyPoolKey.TransactionRole.RMROLE, new LinkedBlockingQueue<>());
return map;
});
BlockingQueue<RpcMessage> basket = roleMessage.get(transactionRole);
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
Expand All @@ -157,8 +174,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
synchronized (MERGE_LOCK) {
MERGE_LOCK.notifyAll();
}
}

Expand Down Expand Up @@ -291,10 +308,6 @@ protected String getXid(Object msg) {
return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid;
}

private String getThreadPrefix() {
return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
}

/**
* Get pool key function.
*
Expand Down Expand Up @@ -326,59 +339,72 @@ private String getThreadPrefix() {
/**
* The type Merged send runnable.
*/
private class MergedSendRunnable implements Runnable {
private static class MergedSendRunnable implements Runnable {

@Override
public void run() {
while (true) {
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
synchronized (MERGE_LOCK) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
try {
MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
}
}
isSending = true;
basketMap.forEach((address, basket) -> {
BASKET_MAP.forEach((address, roleMessage) -> roleMessage.forEach((role, basket) -> {
if (basket.isEmpty()) {
return;
}

AbstractNettyRemotingClient client;
if (role.equals(NettyPoolKey.TransactionRole.RMROLE)) {
client = RmNettyRemotingClient.getInstance();
} else {
client = TmNettyRemotingClient.getInstance();
}

ConcurrentHashMap<Integer, MessageFuture> clientFutures = client.getFutures();

MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
printMergeMessageLog(clientFutures, mergeMessage);
}
Channel sendChannel = null;
try {
// send batch message is sync request, but there is no need to get the return value.
// Since the messageFuture has been created before the message is placed in basketMap,
// the return value will be obtained in ClientOnResponseProcessor.
sendChannel = clientChannelManager.acquireChannel(address);
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
sendChannel = client.getClientChannelManager().acquireChannel(address);
client.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
client.destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
MessageFuture messageFuture = clientFutures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
}));
isSending = false;
}
}

private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
private void printMergeMessageLog(ConcurrentHashMap<Integer, MessageFuture> clientFutures, MergedWarpMessage mergeMessage) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());
for (AbstractMessage cm : mergeMessage.msgs) {
Expand All @@ -389,7 +415,7 @@ private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
sb.append("\n");
for (long l : futures.keySet()) {
for (long l : clientFutures.keySet()) {
sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
LOGGER.debug(sb.toString());
Expand Down
Loading

0 comments on commit 030a5bd

Please sign in to comment.