Skip to content

Commit

Permalink
codec version
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Nov 7, 2024
1 parent 8df9eb8 commit 0853ece
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
import org.apache.seata.core.exception.DecodeException;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.netty.ChannelManager;
import org.apache.seata.core.rpc.netty.ProtocolDecoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.serializer.SerializerServiceLoader;
Expand Down Expand Up @@ -86,6 +90,10 @@ int lengthFieldLength, FullLength is int(4B). so values is 4

@Override
public RpcMessage decodeFrame(ByteBuf frame) {
return decodeFrame(null, frame);
}

public RpcMessage decodeFrame(ChannelHandlerContext ctx, ByteBuf frame) {
byte b0 = frame.readByte();
byte b1 = frame.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
Expand Down Expand Up @@ -129,7 +137,12 @@ public RpcMessage decodeFrame(ByteBuf frame) {
bs = compressor.decompress(bs);
SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec());
if (this.supportDeSerializerTypes.contains(protocolType)) {
Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
String sdkVersion = "";
if(ctx != null && ctx.channel() != null){
sdkVersion = Version.getChannelVersion(ctx.channel());
sdkVersion = StringUtils.isBlank(sdkVersion) ? "" : sdkVersion;
}
Serializer serializer = SerializerServiceLoader.load(protocolType, sdkVersion);
rpcMessage.setBody(serializer.deserialize(bs));
} else {
throw new IllegalArgumentException("SerializerType not match");
Expand All @@ -148,7 +161,7 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
if (decoded instanceof ByteBuf) {
ByteBuf frame = (ByteBuf)decoded;
try {
return decodeFrame(frame);
return decodeFrame(ctx, frame);
} finally {
frame.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.netty.ChannelManager;
import org.apache.seata.core.rpc.netty.ProtocolEncoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.compressor.Compressor;
Expand Down Expand Up @@ -64,7 +68,12 @@ public class ProtocolEncoderV1 extends MessageToByteEncoder implements ProtocolE
private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV1.class);


@Override
public void encode(RpcMessage message, ByteBuf out) {
doEncode(null, message, out);
}

public void doEncode(ChannelHandlerContext ctx, RpcMessage message, ByteBuf out) {
try {
ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
rpcMessage.rpcMsg2ProtocolMsg(message);
Expand Down Expand Up @@ -94,7 +103,12 @@ public void encode(RpcMessage message, ByteBuf out) {
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
// heartbeat has no body
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
String sdkVersion = "";
if(ctx != null && ctx.channel() != null){
sdkVersion = Version.getChannelVersion(ctx.channel());
sdkVersion = StringUtils.isBlank(sdkVersion) ? "" : sdkVersion;
}
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), sdkVersion);
bodyBytes = serializer.serialize(rpcMessage.getBody());
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
bodyBytes = compressor.compress(bodyBytes);
Expand Down Expand Up @@ -125,7 +139,7 @@ public void encode(RpcMessage message, ByteBuf out) {
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
try {
if (msg instanceof RpcMessage) {
this.encode((RpcMessage)msg, out);
this.doEncode(ctx, (RpcMessage)msg, out);
} else {
throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.common.loader.Scope;
import org.apache.seata.common.util.BufferUtils;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.AbstractMessage;
import org.apache.seata.core.protocol.IncompatibleVersionException;
import org.apache.seata.core.protocol.ProtocolConstants;
Expand All @@ -39,7 +40,7 @@ public class SeataSerializer implements Serializer {
Serializer versionSeataSerializer;

public SeataSerializer(String version) {
if (version == null || Version.isAboveOrEqualVersion071(version)) {
if (StringUtils.isBlank(version) || Version.isAboveOrEqualVersion071(version)) {
versionSeataSerializer = SeataSerializerV1.getInstance(version);
} else if (!Version.isAboveOrEqualVersion071(version)) {
versionSeataSerializer = SeataSerializerV0.getInstance();
Expand Down

0 comments on commit 0853ece

Please sign in to comment.