Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature : multi-version protocol control #5738

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5bff2f2
encoder
Bughue Jul 18, 2023
c7497c1
protocol
Bughue Jul 19, 2023
ed03135
protocol
Bughue Jul 19, 2023
fec3060
Merge branch '2.x' into dev_protocal
Bughue Jul 19, 2023
53a98ea
protocol
Bughue Jul 19, 2023
a1f0287
Merge remote-tracking branch 'origin/dev_protocal' into dev_protocal
Bughue Jul 19, 2023
b909dde
protocol
Bughue Aug 17, 2023
bbef2f9
Merge branch '2.x' of https://github.com/seata/seata into dev_protocal
Bughue Aug 17, 2023
6c08982
protocol
Bughue Aug 21, 2023
e85501f
protocol
Bughue Aug 21, 2023
900428f
protocol
Bughue Aug 21, 2023
45fc67e
protocol
Bughue Aug 21, 2023
74382e5
protocol
Bughue Aug 21, 2023
6cffd4a
protocol
Bughue Aug 28, 2023
9207d2f
protocol
Bughue Sep 21, 2023
1bec9bb
Merge branch '2.x' of https://github.com/seata/seata into dev_protocal
Bughue Sep 21, 2023
67c5a30
application.example.yml
Bughue Nov 13, 2023
45d1cf1
mock
Bughue Nov 14, 2023
eaf1e2d
Merge branch '2.x' of https://github.com/seata/seata into dev_protocal
Bughue Nov 14, 2023
a2fe483
classloader-version fix
Bughue Nov 15, 2023
be10f07
fix test and format
Bughue Nov 15, 2023
8577d01
test fix
Bughue Nov 15, 2023
e0bf2d0
code style
Bughue Nov 16, 2023
aa92694
Merge branch '2.x' into dev_protocal
Bughue Nov 16, 2023
bcec6ca
code style
Bughue Nov 16, 2023
af2599c
Merge remote-tracking branch 'origin/dev_protocal' into dev_protocal
Bughue Nov 16, 2023
5e5d5d7
classloader-version fix
Bughue Nov 30, 2023
8413a41
Merge branch '2.x' of https://github.com/seata/seata into dev_protocal
Bughue Nov 30, 2023
9338645
mock test
Bughue Nov 30, 2023
bb41c7f
log
Bughue Nov 30, 2023
93de15a
Merge branch '2.x' into dev_protocal
Bughue Dec 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ public interface ProtocolConstants {
*/
byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda};

/**
* Old protocol version
*/
byte VERSION_0 = 0;

/**
* Protocol version
*/
byte VERSION = 1;
byte VERSION_1 = 1;

byte VERSION_CURRENT = VERSION_1;

/**
* Max frame length
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/seata/core/protocol/RpcMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class RpcMessage implements Serializable {
private Map<String, String> headMap = new HashMap<>();
private Object body;

private String version;

/**
* Gets id.
*
Expand Down Expand Up @@ -169,6 +171,14 @@ public void setMessageType(byte messageType) {
this.messageType = messageType;
}

public String getVersion() {
return version;
}

public void setVersion(String version) {
this.version = version;
}

@Override
public String toString() {
return StringUtils.toString(this);
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/io/seata/core/protocol/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class Version {
* The constant CURRENT.
*/
private static final String CURRENT = VersionInfo.VERSION;
private static final String VERSION_0_7_1 = "0.7.1";
public static final String VERSION_0_7_1 = "0.7.1";
private static final String VERSION_1_5_0 = "1.5.0";
private static final int MAX_VERSION_DOT = 3;

Expand Down Expand Up @@ -89,7 +89,7 @@ public static void checkVersion(String version) throws IncompatibleVersionExcept
long current = convertVersion(CURRENT);
long clientVersion = convertVersion(version);
long divideVersion = convertVersion(VERSION_0_7_1);
if ((current > divideVersion && clientVersion < divideVersion) || (current < divideVersion && clientVersion > divideVersion)) {
if (current < divideVersion && clientVersion > divideVersion) {
throw new IncompatibleVersionException("incompatible client version:" + version);
}
}
Expand Down Expand Up @@ -150,6 +150,16 @@ public static long convertVersionNotThrowException(String version) {
return -1;
}

public static byte calcProtocolVersion(String sdkVersion) throws IncompatibleVersionException {
long version = convertVersion(sdkVersion);
long v0 = convertVersion(VERSION_0_7_1);
if (version <= v0) {
return ProtocolConstants.VERSION_0;
} else {
return ProtocolConstants.VERSION_1;
}
}

private static long calculatePartValue(String partNumeric, int size, int index) {
return Long.parseLong(partNumeric) * Double.valueOf(Math.pow(100, size - index)).longValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.seata.core.protocol.MessageTypeAware;
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.protocol.Version;
import io.seata.core.rpc.Disposable;
import io.seata.core.rpc.hook.RpcHook;
import io.seata.core.rpc.processor.Pair;
Expand All @@ -63,7 +64,7 @@ public abstract class AbstractNettyRemoting implements Disposable {
* The Timer executor.
*/
protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("timeoutChecker", 1, true));
new NamedThreadFactory("timeoutChecker", 1, true));
/**
* The Message executor.
*/
Expand Down Expand Up @@ -113,7 +114,7 @@ public void run() {
futures.remove(entry.getKey());
RpcMessage rpcMessage = future.getRequestMessage();
future.setResultMessage(new TimeoutException(String
.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
Expand Down Expand Up @@ -200,7 +201,7 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi
return result;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
rpcMessage.getBody());
rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
Expand All @@ -219,7 +220,7 @@ protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
channelWritableCheck(channel, rpcMessage.getBody());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
+ channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
+ channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
}

doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
Expand All @@ -232,22 +233,32 @@ protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
}

protected RpcMessage buildRequestMessage(Object msg, byte messageType) {
return buildRequestMessage(msg, messageType, Version.getCurrent());
}

protected RpcMessage buildRequestMessage(Object msg, byte messageType, String version) {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setId(getNextMessageId());
rpcMessage.setMessageType(messageType);
rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC);
rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
rpcMessage.setBody(msg);
rpcMessage.setVersion(version);
return rpcMessage;
}

protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) {
return buildResponseMessage(rpcMessage, msg, messageType, Version.getCurrent());
}

protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType, String version) {
RpcMessage rpcMsg = new RpcMessage();
rpcMsg.setMessageType(messageType);
rpcMsg.setCodec(rpcMessage.getCodec()); // same with request
rpcMsg.setCompressor(rpcMessage.getCompressor());
rpcMsg.setBody(msg);
rpcMsg.setId(rpcMessage.getId());
rpcMsg.setVersion(version);
return rpcMsg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,13 @@ class ClientHandler extends ChannelDuplexHandler {

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
} else {
LOGGER.error("rpcMessage type error");
}
processMessage(ctx, (RpcMessage) msg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,20 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo
if (channel == null) {
throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
}
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC, rpcContext.getVersion());
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}



@Override
public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
if (channel == null) {
throw new RuntimeException("client is not connected");
}
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC, rpcContext.getVersion());
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}

Expand All @@ -87,7 +91,8 @@ public void sendAsyncRequest(Channel channel, Object msg) {
if (channel == null) {
throw new RuntimeException("client is not connected");
}
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY, rpcContext.getVersion());
super.sendAsync(channel, rpcMessage);
}

Expand All @@ -98,9 +103,10 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg
clientChannel = ChannelManager.getSameClientChannel(channel);
}
if (clientChannel != null) {
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
: ProtocolConstants.MSGTYPE_RESPONSE);
: ProtocolConstants.MSGTYPE_RESPONSE, rpcContext.getVersion());
super.sendAsync(clientChannel, rpcMsg);
} else {
throw new RuntimeException("channel is error.");
Expand Down Expand Up @@ -163,10 +169,13 @@ class ServerHandler extends ChannelDuplexHandler {
*/
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
} else {
LOGGER.error("rpcMessage type error");
}
processMessage(ctx, (RpcMessage) msg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.rpc.netty;

import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.seata.core.exception.DecodeException;
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.rpc.netty.v0.ProtocolDecoderV0;
import io.seata.core.rpc.netty.v1.ProtocolDecoderV1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* <pre>
* (> 0.7.0)
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
* | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId |
* | code |colVer| (head+body) | Length |Type |lizer|ess | |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
*
* (<= 0.7.0)
* 0 1 2 3 4 6 8 10 12 14
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
* | 0xdada | flag | typecode/ | requestid |
* | | | bodylength| |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+
*
* </pre>
* <p>
* <li>Full Length: include all data </li>
* <li>Head Length: include head data from magic code to head map. </li>
* <li>Body Length: Full Length - Head Length</li>
* </p>
* https://github.com/seata/seata/issues/893
*
* @author Bughue
* @since 2.0.0
*/
public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder {

private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class);
private static Map<Byte, ProtocolDecoder> protocolDecoderMap;

public CompatibleProtocolDecoder() {
// default is 8M
this(ProtocolConstants.MAX_FRAME_LENGTH);
}

public CompatibleProtocolDecoder(int maxFrameLength) {
/*
int maxFrameLength,
int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3
int lengthFieldLength, FullLength is int(4B). so values is 4
int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7
int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
*/
super(maxFrameLength, 3, 4, -7, 0);
protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder()
.put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1())
.build();
}

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame;
Object decoded;
byte version;
try {
if (isV0(in)) {
decoded = in;
version = ProtocolConstants.VERSION_0;
} else {
decoded = super.decode(ctx, in);
version = decideVersion(decoded);
}

if (decoded instanceof ByteBuf) {
frame = (ByteBuf) decoded;
try {
ProtocolDecoder decoder = protocolDecoderMap.get(version);
if (decoder == null) {
throw new IllegalArgumentException("Unknown version: " + version);
}
return decoder.decodeFrame(frame);
} finally {
if (!isV0(version)) {
frame.release();
}
}
}
} catch (Exception exx) {
LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
throw new DecodeException(exx);
}
return decoded;
}

protected byte decideVersion(Object in) {
if (in instanceof ByteBuf) {
ByteBuf frame = (ByteBuf) in;
frame.markReaderIndex();
byte b0 = frame.readByte();
byte b1 = frame.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
|| ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
}

byte version = frame.readByte();
frame.resetReaderIndex();
return version;
}
return -1;
}


protected boolean isV0(ByteBuf in) {
boolean isV0 = false;
in.markReaderIndex();
byte b0 = in.readByte();
byte b1 = in.readByte();
byte version = in.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0
&& ProtocolConstants.MAGIC_CODE_BYTES[1] == b1
&& isV0(version)) {
isV0 = true;
}

in.resetReaderIndex();
return isV0;
}

protected boolean isV0(byte version) {
return version == ProtocolConstants.VERSION_0;
}
}
Loading
Loading