From ec6c1edc3c05c5a307bec7648d4c3f5e7cda568b Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Mon, 25 Dec 2023 17:07:48 +0800 Subject: [PATCH 01/15] mock server --- pom.xml | 1 + test-mock-server/pom.xml | 62 ++++++++ .../io/seata/mockserver/MockCoordinator.java | 134 ++++++++++++++++++ .../mockserver/MockNettyRemotingServer.java | 109 ++++++++++++++ .../java/io/seata/mockserver/MockServer.java | 70 +++++++++ .../java/io/seata/mockserver/call/CallRm.java | 106 ++++++++++++++ .../processor/MockHeartbeatProcessor.java | 25 ++++ .../processor/MockOnReqProcessor.java | 78 ++++++++++ .../processor/MockOnRespProcessor.java | 55 +++++++ .../processor/MockRegisterProcessor.java | 78 ++++++++++ .../processor/MockRemotingProcessor.java | 52 +++++++ .../src/main/resources/application.yml | 53 +++++++ .../src/main/resources/logback-spring.xml | 110 ++++++++++++++ 13 files changed, 933 insertions(+) create mode 100644 test-mock-server/pom.xml create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/MockServer.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java create mode 100644 test-mock-server/src/main/resources/application.yml create mode 100644 test-mock-server/src/main/resources/logback-spring.xml diff --git a/pom.xml b/pom.xml index 049bfccac5b..c70b6d4e2e9 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,7 @@ spring tcc test + test-mock-server tm metrics serializer diff --git a/test-mock-server/pom.xml b/test-mock-server/pom.xml new file mode 100644 index 00000000000..53644b17224 --- /dev/null +++ b/test-mock-server/pom.xml @@ -0,0 +1,62 @@ + + + + + io.seata + seata-parent + ${revision} + + 4.0.0 + seata-mock-server + jar + seata-mock-server + Seata mock server + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + + + + + io.seata + seata-server + ${project.version} + + + slf4j-log4j12 + org.slf4j + + + log4j + log4j + + + + + + + \ No newline at end of file diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java new file mode 100644 index 00000000000..196b1f6872c --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java @@ -0,0 +1,134 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver; + +import io.seata.core.exception.TransactionException; +import io.seata.core.model.BranchStatus; +import io.seata.core.model.GlobalStatus; +import io.seata.core.protocol.AbstractMessage; +import io.seata.core.protocol.AbstractResultMessage; +import io.seata.core.protocol.ProtocolConstants; +import io.seata.core.protocol.ResultCode; +import io.seata.core.protocol.Version; +import io.seata.core.protocol.transaction.*; +import io.seata.core.rpc.Disposable; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.RpcContext; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.core.rpc.netty.ChannelManager; +import io.seata.mockserver.call.CallRm; +import io.seata.serializer.seata.MessageCodecFactory; +import io.seata.serializer.seata.MessageSeataCodec; +import io.seata.serializer.seata.protocol.v1.MessageCodecFactoryV1; +import io.seata.server.AbstractTCInboundHandler; + +/** + * Mock Coordinator + * + * @author minghua.xie + * @date 2023/11/14 + **/ +public class MockCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { + + RemotingServer remotingServer; + @Override + public void destroy() { + + } + + @Override + public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) { + if (!(request instanceof AbstractTransactionRequestToTC)) { + throw new IllegalArgumentException(); + } + AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; + transactionRequest.setTCInboundHandler(this); + + return transactionRequest.handle(context); + } + + @Override + public void onResponse(AbstractResultMessage response, RpcContext context) { + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { + response.setXid("666"); + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException { + response.setGlobalStatus(GlobalStatus.Committed); + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException { + response.setGlobalStatus(GlobalStatus.Rollbacked); + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { + response.setBranchId(9L); + response.setResultCode(ResultCode.Success); + + String resourceId = request.getResourceId(); + String clientId = rpcContext.getClientId(); + + Thread thread = new Thread(() -> { + try { + Thread.sleep(1000); + BranchStatus commit = CallRm.branchCommit(remotingServer, resourceId, clientId); + BranchStatus rollback = CallRm.branchRollback(remotingServer, resourceId, clientId); + if (ProtocolConstants.VERSION_0 != Version.calcProtocolVersion(rpcContext.getVersion())) { + CallRm.deleteUndoLog(remotingServer, resourceId, clientId); + } + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + } + + @Override + protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext) throws TransactionException { + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response, RpcContext rpcContext) throws TransactionException { + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext) throws TransactionException { + response.setGlobalStatus(GlobalStatus.Committed); + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response, RpcContext rpcContext) throws TransactionException { + response.setGlobalStatus(GlobalStatus.Committed); + response.setResultCode(ResultCode.Success); + } + + public void setRemotingServer(RemotingServer remotingServer) { + this.remotingServer = remotingServer; + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java new file mode 100644 index 00000000000..cf51d7218db --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java @@ -0,0 +1,109 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver; + +import io.netty.channel.Channel; +import io.seata.core.protocol.MessageType; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.core.rpc.netty.AbstractNettyRemotingServer; +import io.seata.core.rpc.netty.NettyServerConfig; +import io.seata.core.rpc.processor.server.ServerHeartbeatProcessor; +import io.seata.mockserver.processor.MockHeartbeatProcessor; +import io.seata.mockserver.processor.MockOnReqProcessor; +import io.seata.mockserver.processor.MockOnRespProcessor; +import io.seata.mockserver.processor.MockRemotingProcessor; +import io.seata.mockserver.processor.MockRegisterProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * The mock netty remoting server. + * + * @author Bughue + */ +public class MockNettyRemotingServer extends AbstractNettyRemotingServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(MockNettyRemotingServer.class); + + private TransactionMessageHandler handler; + + public void setHandler(TransactionMessageHandler transactionMessageHandler) { + this.handler = transactionMessageHandler; + } + + @Override + public void init() { + // registry processor + registerProcessor(); + super.init(); + } + + /** + * Instantiates a new Rpc remoting server. + * + * @param messageExecutor the message executor + */ + public MockNettyRemotingServer(ThreadPoolExecutor messageExecutor) { + super(messageExecutor, new NettyServerConfig()); + } + + @Override + public void destroyChannel(String serverAddress, Channel channel) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("will destroy channel:{},address:{}", channel, serverAddress); + } + channel.disconnect(); + channel.close(); + } + + private void registerProcessor() { + // 1. registry on request message processor + MockOnReqProcessor onRequestProcessor = new MockOnReqProcessor(this, handler); + super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); + + // 2. registry on response message processor + MockOnRespProcessor onResponseProcessor = new MockOnRespProcessor(this, handler,getFutures()); + super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor); + + // 3. registry rm reg processor + MockRegisterProcessor regRmProcessor = new MockRegisterProcessor(this, MockRegisterProcessor.Role.RM); + super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor); + + // 4. registry tm reg processor + MockRegisterProcessor regTmProcessor = new MockRegisterProcessor(this, MockRegisterProcessor.Role.TM); + super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); + + // 5. registry heartbeat message processor + MockHeartbeatProcessor heartbeatMessageProcessor = new MockHeartbeatProcessor(this,handler); + super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null); + } + + @Override + public void destroy() { + super.destroy(); + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java new file mode 100644 index 00000000000..3a7e6297f38 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java @@ -0,0 +1,70 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver; + +import io.seata.common.XID; +import io.seata.common.thread.NamedThreadFactory; +import io.seata.common.util.NetUtil; +import io.seata.server.UUIDGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * The type Mock Server. + * + * @author Bughue + */ +@SpringBootApplication +public class MockServer { + + protected static final Logger LOGGER = LoggerFactory.getLogger(MockServer.class); + + /** + * The entry point of application. + * + * @param args the input arguments + */ + public static void main(String[] args) { + SpringApplication.run(MockServer.class, args); + + ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(50, + 50, 500, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(20000), + new NamedThreadFactory("ServerHandlerThread", 500), new ThreadPoolExecutor.CallerRunsPolicy()); + + MockNettyRemotingServer nettyRemotingServer = new MockNettyRemotingServer(workingThreads); + + // set registry + XID.setIpAddress(NetUtil.getLocalIp()); + XID.setPort(8092); + // init snowflake for transactionId, branchId + UUIDGenerator.init(1L); + + MockCoordinator coordinator = new MockCoordinator(); + coordinator.setRemotingServer(nettyRemotingServer); + nettyRemotingServer.setHandler(coordinator); + nettyRemotingServer.init(); + + LOGGER.info("pid info: "+ ManagementFactory.getRuntimeMXBean().getName()); + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java b/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java new file mode 100644 index 00000000000..b693fb9a3a0 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java @@ -0,0 +1,106 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.call; + +import io.netty.channel.Channel; +import io.seata.core.model.BranchStatus; +import io.seata.core.model.BranchType; +import io.seata.core.protocol.transaction.AbstractBranchEndRequest; +import io.seata.core.protocol.transaction.BranchCommitRequest; +import io.seata.core.protocol.transaction.BranchCommitResponse; +import io.seata.core.protocol.transaction.BranchRollbackRequest; +import io.seata.core.protocol.transaction.BranchRollbackResponse; +import io.seata.core.protocol.transaction.UndoLogDeleteRequest; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.netty.ChannelManager; + +import java.util.concurrent.TimeoutException; + +/** + * call rm + * + * @author minghua.xie + * @date 2023/11/21 + **/ +public class CallRm { + + /** + * call branchCommit :TYPE_BRANCH_COMMIT = 3 , TYPE_BRANCH_COMMIT_RESULT = 4 + * + * @param remotingServer + * @return + */ + public static BranchStatus branchCommit(RemotingServer remotingServer, String resourceId, String clientId) { + BranchCommitRequest request = new BranchCommitRequest(); + setReq(request, resourceId); + + try { + BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest( + resourceId, clientId, request, false); + return response.getBranchStatus(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + + /** + * call branchRollback :TYPE_BRANCH_ROLLBACK = 5 , TYPE_BRANCH_ROLLBACK_RESULT = 6 + * + * @param remotingServer + * @return + */ + public static BranchStatus branchRollback(RemotingServer remotingServer, String resourceId, String clientId) { + BranchRollbackRequest request = new BranchRollbackRequest(); + setReq(request, resourceId); + + try { + BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest( + resourceId, clientId, request, false); + return response.getBranchStatus(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + /** + * call deleteUndoLog :TYPE_RM_DELETE_UNDOLOG = 111 + * + * @param remotingServer + * @return + */ + public static void deleteUndoLog(RemotingServer remotingServer, String resourceId, String clientId) { + UndoLogDeleteRequest request = new UndoLogDeleteRequest(); + request.setResourceId(resourceId); + request.setSaveDays((short) 1); + request.setBranchType(BranchType.TCC); + try { + Channel channel = ChannelManager.getChannel(resourceId, clientId, false); + remotingServer.sendAsyncRequest(channel, request); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void setReq(AbstractBranchEndRequest request, String resourceId) { + request.setXid("1"); + request.setBranchId(1L); + request.setResourceId(resourceId); + request.setApplicationData("{\"k\":\"v\"}"); + request.setBranchType(BranchType.TCC); + // todo AT SAGA + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java new file mode 100644 index 00000000000..ba49135ebd9 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java @@ -0,0 +1,25 @@ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.HeartbeatMessage; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.TransactionMessageHandler; + +/** + * ? + * + * @author minghua.xie + * @date 2023/11/29 + **/ +public class MockHeartbeatProcessor extends MockRemotingProcessor{ + public MockHeartbeatProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { + super(remotingServer, handler); + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + super.process(ctx, rpcMessage); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), HeartbeatMessage.PONG); + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java new file mode 100644 index 00000000000..4e72dbd2d83 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java @@ -0,0 +1,78 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.AbstractMessage; +import io.seata.core.protocol.AbstractResultMessage; +import io.seata.core.protocol.MergeResultMessage; +import io.seata.core.protocol.MergedWarpMessage; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.RpcContext; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.core.rpc.netty.ChannelManager; +import io.seata.core.rpc.processor.server.ServerOnRequestProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Mock Remoting Processor + * + * @author minghua.xie + * @date 2023/11/14 + **/ +public class MockOnReqProcessor extends MockRemotingProcessor { + protected static final Logger LOGGER = LoggerFactory.getLogger(MockOnReqProcessor.class); + + + public MockOnReqProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { + super(remotingServer, handler); + } + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + super.process(ctx, rpcMessage); + Object message = rpcMessage.getBody(); + + RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel()); + + // the batch send request message + if (message instanceof MergedWarpMessage) { + MergedWarpMessage mmsg = (MergedWarpMessage) message; + MergeResultMessage resultMessage = new MergeResultMessage(); + List resList = new ArrayList<>(); + for (int i = 0; i < mmsg.msgs.size(); i++) { + AbstractMessage msg = mmsg.msgs.get(i); + resList.add(handler.onRequest(msg, rpcContext)); + } + AbstractResultMessage[] resultMsgs = Arrays.copyOf(resList.toArray(), resList.size(), AbstractResultMessage[].class); + resultMessage.setMsgs(resultMsgs); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage); + LOGGER.info("sendAsyncResponse: {}", resultMessage); + } else { + final AbstractMessage msg = (AbstractMessage) message; + AbstractResultMessage result = handler.onRequest(msg, rpcContext); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result); + LOGGER.info("sendAsyncResponse: {}", result); + } + } + + +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java new file mode 100644 index 00000000000..a88b312473e --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java @@ -0,0 +1,55 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.MessageFuture; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.core.rpc.netty.ChannelManager; +import io.seata.core.rpc.processor.RemotingProcessor; + +import java.util.concurrent.ConcurrentMap; + +/** + * Mock Remoting Processor + * + * @author minghua.xie + * @date 2023/11/14 + **/ +public class MockOnRespProcessor extends MockRemotingProcessor { + + private ConcurrentMap futures; + + + public MockOnRespProcessor(RemotingServer remotingServer, TransactionMessageHandler handler + , ConcurrentMap futures) { + super(remotingServer, handler); + this.futures = futures; + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + super.process(ctx, rpcMessage); + MessageFuture messageFuture = futures.remove(rpcMessage.getId()); + if (messageFuture != null) { + messageFuture.setResultMessage(rpcMessage.getBody()); + } + } + + +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java new file mode 100644 index 00000000000..fea7a6a8bf0 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java @@ -0,0 +1,78 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.HeartbeatMessage; +import io.seata.core.protocol.RegisterRMRequest; +import io.seata.core.protocol.RegisterRMResponse; +import io.seata.core.protocol.RegisterTMRequest; +import io.seata.core.protocol.RegisterTMResponse; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.protocol.Version; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.netty.ChannelManager; +import io.seata.core.rpc.processor.RemotingProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock Remoting Processor + * + * @author minghua.xie + * @date 2023/11/14 + **/ +public class MockRegisterProcessor implements RemotingProcessor { + + protected static final Logger LOGGER = LoggerFactory.getLogger(MockRegisterProcessor.class); + private RemotingServer remotingServer; + private Role role; + + public MockRegisterProcessor(RemotingServer remotingServer, Role role) { + this.remotingServer = remotingServer; + this.role = role; + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + if (role == Role.TM) { + RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody(); + LOGGER.info("message = " + message); + + ChannelManager.registerTMChannel(message, ctx.channel()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + + RegisterTMResponse resp = new RegisterTMResponse(); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp); + LOGGER.info("sendAsyncResponse: {}", resp); + } else if (role == Role.RM) { + RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody(); + LOGGER.info("message = " + message); + + ChannelManager.registerRMChannel(message, ctx.channel()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + + RegisterRMResponse resp = new RegisterRMResponse(); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp); + LOGGER.info("sendAsyncResponse: {}", resp); + } + } + + + public static enum Role { + TM, RM + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java new file mode 100644 index 00000000000..011c0cb7afd --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java @@ -0,0 +1,52 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.core.rpc.processor.RemotingProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock Remoting Processor + * + * @author minghua.xie + * @date 2023/11/14 + **/ +public class MockRemotingProcessor implements RemotingProcessor { + + protected static final Logger LOGGER = LoggerFactory.getLogger(MockRemotingProcessor.class); + protected RemotingServer remotingServer; + protected final TransactionMessageHandler handler; + + + public MockRemotingProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { + this.remotingServer = remotingServer; + this.handler = handler; + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + Object message = rpcMessage.getBody(); + LOGGER.info("process message : " + message); + + } + + +} diff --git a/test-mock-server/src/main/resources/application.yml b/test-mock-server/src/main/resources/application.yml new file mode 100644 index 00000000000..37cee3e20bf --- /dev/null +++ b/test-mock-server/src/main/resources/application.yml @@ -0,0 +1,53 @@ +# Copyright 1999-2019 Seata.io Group. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +server: + port: 7091 + +spring: + application: + name: seata-server + +logging: + config: classpath:logback-spring.xml + file: + path: ${log.home:${user.home}/logs/seata} + extend: + logstash-appender: + destination: 127.0.0.1:4560 + kafka-appender: + bootstrap-servers: 127.0.0.1:9092 + topic: logback_to_logstash + +console: + user: + username: seata + password: seata +seata: + config: + # support: nacos, consul, apollo, zk, etcd3 + type: file + registry: + # support: nacos, eureka, redis, zk, consul, etcd3, sofa + type: file + store: + # support: file 、 db 、 redis 、 raft + mode: file + # server: + # service-port: 8091 #If not configured, the default is '${server.port} + 1000' + security: + secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 + tokenValidityInMilliseconds: 1800000 + ignore: + urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/** diff --git a/test-mock-server/src/main/resources/logback-spring.xml b/test-mock-server/src/main/resources/logback-spring.xml new file mode 100644 index 00000000000..490141ac6c5 --- /dev/null +++ b/test-mock-server/src/main/resources/logback-spring.xml @@ -0,0 +1,110 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + 0 + 2048 + true + + + + + true + 0 + 2048 + true + + + + true + 0 + 1024 + true + + + + true + 0 + 1024 + true + + + + + + + + + + + + + + + + + + + From 34ae9f71d8c42c7d3360b521ea1d1842fe6e9f95 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 26 Dec 2023 14:43:20 +0800 Subject: [PATCH 02/15] license --- .../io/seata/mockserver/MockCoordinator.java | 35 ++++++++----------- .../mockserver/MockNettyRemotingServer.java | 24 ++++++------- .../java/io/seata/mockserver/MockServer.java | 24 ++++++------- .../java/io/seata/mockserver/call/CallRm.java | 25 +++++++------ .../processor/MockHeartbeatProcessor.java | 20 +++++++++-- .../processor/MockOnReqProcessor.java | 25 +++++++------ .../processor/MockOnRespProcessor.java | 25 +++++++------ .../processor/MockRegisterProcessor.java | 25 +++++++------ .../processor/MockRemotingProcessor.java | 26 +++++++------- 9 files changed, 116 insertions(+), 113 deletions(-) diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java index 196b1f6872c..5ebe54ae653 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.mockserver; @@ -28,18 +29,12 @@ import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.RpcContext; import io.seata.core.rpc.TransactionMessageHandler; -import io.seata.core.rpc.netty.ChannelManager; import io.seata.mockserver.call.CallRm; -import io.seata.serializer.seata.MessageCodecFactory; -import io.seata.serializer.seata.MessageSeataCodec; -import io.seata.serializer.seata.protocol.v1.MessageCodecFactoryV1; import io.seata.server.AbstractTCInboundHandler; /** * Mock Coordinator * - * @author minghua.xie - * @date 2023/11/14 **/ public class MockCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { @@ -96,9 +91,9 @@ protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterRes Thread.sleep(1000); BranchStatus commit = CallRm.branchCommit(remotingServer, resourceId, clientId); BranchStatus rollback = CallRm.branchRollback(remotingServer, resourceId, clientId); - if (ProtocolConstants.VERSION_0 != Version.calcProtocolVersion(rpcContext.getVersion())) { - CallRm.deleteUndoLog(remotingServer, resourceId, clientId); - } +// if (ProtocolConstants.VERSION_0 != Version.calcProtocolVersion(rpcContext.getVersion())) { +// CallRm.deleteUndoLog(remotingServer, resourceId, clientId); +// } } catch (Exception e) { e.printStackTrace(); } diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java index cf51d7218db..53e13624599 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.mockserver; @@ -34,7 +35,6 @@ /** * The mock netty remoting server. * - * @author Bughue */ public class MockNettyRemotingServer extends AbstractNettyRemotingServer { diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java index 3a7e6297f38..9650df1360d 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.mockserver; @@ -32,7 +33,6 @@ /** * The type Mock Server. * - * @author Bughue */ @SpringBootApplication public class MockServer { diff --git a/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java b/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java index b693fb9a3a0..5ba1a2aee4d 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.mockserver.call; @@ -32,8 +33,6 @@ /** * call rm * - * @author minghua.xie - * @date 2023/11/21 **/ public class CallRm { diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java index ba49135ebd9..eaa80684858 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.seata.mockserver.processor; import io.netty.channel.ChannelHandlerContext; @@ -7,10 +23,8 @@ import io.seata.core.rpc.TransactionMessageHandler; /** - * ? + * Mock Heartbeat Processor * - * @author minghua.xie - * @date 2023/11/29 **/ public class MockHeartbeatProcessor extends MockRemotingProcessor{ public MockHeartbeatProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java index 4e72dbd2d83..4d7b0c3c1a5 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.mockserver.processor; @@ -36,8 +37,6 @@ /** * Mock Remoting Processor * - * @author minghua.xie - * @date 2023/11/14 **/ public class MockOnReqProcessor extends MockRemotingProcessor { protected static final Logger LOGGER = LoggerFactory.getLogger(MockOnReqProcessor.class); diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java index a88b312473e..1ec439bdcd2 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.mockserver.processor; @@ -28,8 +29,6 @@ /** * Mock Remoting Processor * - * @author minghua.xie - * @date 2023/11/14 **/ public class MockOnRespProcessor extends MockRemotingProcessor { diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java index fea7a6a8bf0..5c65e25ef06 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.mockserver.processor; @@ -32,8 +33,6 @@ /** * Mock Remoting Processor * - * @author minghua.xie - * @date 2023/11/14 **/ public class MockRegisterProcessor implements RemotingProcessor { diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java index 011c0cb7afd..c95ea4b19a3 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java @@ -1,17 +1,18 @@ /* - * Copyright 1999-2019 Seata.io Group. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.seata.mockserver.processor; @@ -25,9 +26,6 @@ /** * Mock Remoting Processor - * - * @author minghua.xie - * @date 2023/11/14 **/ public class MockRemotingProcessor implements RemotingProcessor { From 45e85f5978d1e730453775bd081499a9b825f2f1 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 26 Dec 2023 14:54:51 +0800 Subject: [PATCH 03/15] style --- .../io/seata/mockserver/MockCoordinator.java | 22 +++++++++++++++---- .../mockserver/MockNettyRemotingServer.java | 7 ++---- .../java/io/seata/mockserver/MockServer.java | 3 +-- .../processor/MockOnReqProcessor.java | 3 +-- .../processor/MockOnRespProcessor.java | 3 --- .../processor/MockRegisterProcessor.java | 2 -- 6 files changed, 22 insertions(+), 18 deletions(-) diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java index 5ebe54ae653..aae61226896 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java @@ -21,10 +21,24 @@ import io.seata.core.model.GlobalStatus; import io.seata.core.protocol.AbstractMessage; import io.seata.core.protocol.AbstractResultMessage; -import io.seata.core.protocol.ProtocolConstants; import io.seata.core.protocol.ResultCode; -import io.seata.core.protocol.Version; -import io.seata.core.protocol.transaction.*; +import io.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import io.seata.core.protocol.transaction.BranchRegisterRequest; +import io.seata.core.protocol.transaction.BranchRegisterResponse; +import io.seata.core.protocol.transaction.BranchReportRequest; +import io.seata.core.protocol.transaction.BranchReportResponse; +import io.seata.core.protocol.transaction.GlobalBeginRequest; +import io.seata.core.protocol.transaction.GlobalBeginResponse; +import io.seata.core.protocol.transaction.GlobalCommitRequest; +import io.seata.core.protocol.transaction.GlobalCommitResponse; +import io.seata.core.protocol.transaction.GlobalLockQueryRequest; +import io.seata.core.protocol.transaction.GlobalLockQueryResponse; +import io.seata.core.protocol.transaction.GlobalReportRequest; +import io.seata.core.protocol.transaction.GlobalReportResponse; +import io.seata.core.protocol.transaction.GlobalRollbackRequest; +import io.seata.core.protocol.transaction.GlobalRollbackResponse; +import io.seata.core.protocol.transaction.GlobalStatusRequest; +import io.seata.core.protocol.transaction.GlobalStatusResponse; import io.seata.core.rpc.Disposable; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.RpcContext; @@ -34,11 +48,11 @@ /** * Mock Coordinator - * **/ public class MockCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { RemotingServer remotingServer; + @Override public void destroy() { diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java index 53e13624599..43ba0263114 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java @@ -21,11 +21,9 @@ import io.seata.core.rpc.TransactionMessageHandler; import io.seata.core.rpc.netty.AbstractNettyRemotingServer; import io.seata.core.rpc.netty.NettyServerConfig; -import io.seata.core.rpc.processor.server.ServerHeartbeatProcessor; import io.seata.mockserver.processor.MockHeartbeatProcessor; import io.seata.mockserver.processor.MockOnReqProcessor; import io.seata.mockserver.processor.MockOnRespProcessor; -import io.seata.mockserver.processor.MockRemotingProcessor; import io.seata.mockserver.processor.MockRegisterProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +32,6 @@ /** * The mock netty remoting server. - * */ public class MockNettyRemotingServer extends AbstractNettyRemotingServer { @@ -85,7 +82,7 @@ private void registerProcessor() { super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); // 2. registry on response message processor - MockOnRespProcessor onResponseProcessor = new MockOnRespProcessor(this, handler,getFutures()); + MockOnRespProcessor onResponseProcessor = new MockOnRespProcessor(this, handler, getFutures()); super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor); @@ -98,7 +95,7 @@ private void registerProcessor() { super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); // 5. registry heartbeat message processor - MockHeartbeatProcessor heartbeatMessageProcessor = new MockHeartbeatProcessor(this,handler); + MockHeartbeatProcessor heartbeatMessageProcessor = new MockHeartbeatProcessor(this, handler); super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null); } diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java index 9650df1360d..ab2e34a16ac 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java @@ -32,7 +32,6 @@ /** * The type Mock Server. - * */ @SpringBootApplication public class MockServer { @@ -65,6 +64,6 @@ public static void main(String[] args) { nettyRemotingServer.setHandler(coordinator); nettyRemotingServer.init(); - LOGGER.info("pid info: "+ ManagementFactory.getRuntimeMXBean().getName()); + LOGGER.info("pid info: " + ManagementFactory.getRuntimeMXBean().getName()); } } diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java index 4d7b0c3c1a5..d5227515ffc 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java @@ -26,7 +26,6 @@ import io.seata.core.rpc.RpcContext; import io.seata.core.rpc.TransactionMessageHandler; import io.seata.core.rpc.netty.ChannelManager; -import io.seata.core.rpc.processor.server.ServerOnRequestProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +35,6 @@ /** * Mock Remoting Processor - * **/ public class MockOnReqProcessor extends MockRemotingProcessor { protected static final Logger LOGGER = LoggerFactory.getLogger(MockOnReqProcessor.class); @@ -45,6 +43,7 @@ public class MockOnReqProcessor extends MockRemotingProcessor { public MockOnReqProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { super(remotingServer, handler); } + @Override public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { super.process(ctx, rpcMessage); diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java index 1ec439bdcd2..5856f3e0c6b 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java @@ -21,14 +21,11 @@ import io.seata.core.protocol.RpcMessage; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.TransactionMessageHandler; -import io.seata.core.rpc.netty.ChannelManager; -import io.seata.core.rpc.processor.RemotingProcessor; import java.util.concurrent.ConcurrentMap; /** * Mock Remoting Processor - * **/ public class MockOnRespProcessor extends MockRemotingProcessor { diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java index 5c65e25ef06..225bbbfd830 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java @@ -17,7 +17,6 @@ package io.seata.mockserver.processor; import io.netty.channel.ChannelHandlerContext; -import io.seata.core.protocol.HeartbeatMessage; import io.seata.core.protocol.RegisterRMRequest; import io.seata.core.protocol.RegisterRMResponse; import io.seata.core.protocol.RegisterTMRequest; @@ -32,7 +31,6 @@ /** * Mock Remoting Processor - * **/ public class MockRegisterProcessor implements RemotingProcessor { From ad4064dcb6b1dcd96641ee12e8c52e472ec94514 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 26 Dec 2023 15:05:07 +0800 Subject: [PATCH 04/15] style --- .../io/seata/mockserver/processor/MockHeartbeatProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java index eaa80684858..84f0c5f0650 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java @@ -24,9 +24,8 @@ /** * Mock Heartbeat Processor - * **/ -public class MockHeartbeatProcessor extends MockRemotingProcessor{ +public class MockHeartbeatProcessor extends MockRemotingProcessor { public MockHeartbeatProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { super(remotingServer, handler); } From 777bdb7814cb9f334b15024c40f63a242f2d7379 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 26 Dec 2023 15:10:18 +0800 Subject: [PATCH 05/15] license --- .../src/main/resources/application.yml | 24 +++++++------- .../src/main/resources/logback-spring.xml | 31 ++++++++++--------- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/test-mock-server/src/main/resources/application.yml b/test-mock-server/src/main/resources/application.yml index 37cee3e20bf..062444485bf 100644 --- a/test-mock-server/src/main/resources/application.yml +++ b/test-mock-server/src/main/resources/application.yml @@ -1,17 +1,19 @@ -# Copyright 1999-2019 Seata.io Group. # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - server: port: 7091 diff --git a/test-mock-server/src/main/resources/logback-spring.xml b/test-mock-server/src/main/resources/logback-spring.xml index 490141ac6c5..7f9f51ee841 100644 --- a/test-mock-server/src/main/resources/logback-spring.xml +++ b/test-mock-server/src/main/resources/logback-spring.xml @@ -1,19 +1,22 @@ + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> From 8b92b8d52dd2d9656472743e229351952cfdba13 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 26 Dec 2023 15:12:37 +0800 Subject: [PATCH 06/15] license --- test-mock-server/pom.xml | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/test-mock-server/pom.xml b/test-mock-server/pom.xml index 53644b17224..6b816c2a1ea 100644 --- a/test-mock-server/pom.xml +++ b/test-mock-server/pom.xml @@ -1,19 +1,22 @@ + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> From cf0bfbb5cb243b99f3f93308e5d6ffbf2e0ab1c1 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 29 Dec 2023 14:42:21 +0800 Subject: [PATCH 07/15] fix --- test-mock-server/pom.xml | 3 --- .../main/java/io/seata/mockserver/MockCoordinator.java | 8 ++++++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/test-mock-server/pom.xml b/test-mock-server/pom.xml index 6b816c2a1ea..f40078b5ec4 100644 --- a/test-mock-server/pom.xml +++ b/test-mock-server/pom.xml @@ -36,9 +36,6 @@ org.apache.maven.plugins maven-deploy-plugin - - true - diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java index aae61226896..4f1852d641a 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java @@ -45,6 +45,8 @@ import io.seata.core.rpc.TransactionMessageHandler; import io.seata.mockserver.call.CallRm; import io.seata.server.AbstractTCInboundHandler; +import io.seata.server.UUIDGenerator; +import io.seata.server.session.GlobalSession; /** * Mock Coordinator @@ -76,7 +78,9 @@ public void onResponse(AbstractResultMessage response, RpcContext context) { @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { - response.setXid("666"); + GlobalSession session = GlobalSession.createGlobalSession(rpcContext.getApplicationId(), + rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout()); + response.setXid(session.getXid()); response.setResultCode(ResultCode.Success); } @@ -94,7 +98,7 @@ protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackRes @Override protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { - response.setBranchId(9L); + response.setBranchId(UUIDGenerator.generateUUID()); response.setResultCode(ResultCode.Success); String resourceId = request.getResourceId(); From c8455da1a682da9a2ecf0534576731fc11cc2243 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 29 Dec 2023 15:53:21 +0800 Subject: [PATCH 08/15] expect --- .../mockserver/ExpectTransactionResult.java | 51 +++++++ .../io/seata/mockserver/MockCoordinator.java | 132 ++++++++++++++---- .../java/io/seata/mockserver/MockServer.java | 2 +- .../controller/MockHelpController.java | 55 ++++++++ 4 files changed, 208 insertions(+), 32 deletions(-) create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java create mode 100644 test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java diff --git a/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java b/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java new file mode 100644 index 00000000000..550b0a640c4 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver; + + +public enum ExpectTransactionResult { + + AllCommitted(0, "all success"), + AllRollbacked(1, "all rollback"), + PhaseOneTimeoutRollbacked(2, "phase one failed"); + + private final int code; + private final String desc; + + ExpectTransactionResult(int code, String desc) { + this.code = code; + this.desc = desc; + } + + /** + * Gets code. + * + * @return the code + */ + public int getCode() { + return code; + } + + public static ExpectTransactionResult covert(int code) { + for (ExpectTransactionResult result : ExpectTransactionResult.values()) { + if (result.getCode() == code) { + return result; + } + } + return null; + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java index 4f1852d641a..a8f24bb4a7f 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java @@ -16,29 +16,14 @@ */ package io.seata.mockserver; +import io.seata.common.util.CollectionUtils; import io.seata.core.exception.TransactionException; import io.seata.core.model.BranchStatus; import io.seata.core.model.GlobalStatus; import io.seata.core.protocol.AbstractMessage; import io.seata.core.protocol.AbstractResultMessage; import io.seata.core.protocol.ResultCode; -import io.seata.core.protocol.transaction.AbstractTransactionRequestToTC; -import io.seata.core.protocol.transaction.BranchRegisterRequest; -import io.seata.core.protocol.transaction.BranchRegisterResponse; -import io.seata.core.protocol.transaction.BranchReportRequest; -import io.seata.core.protocol.transaction.BranchReportResponse; -import io.seata.core.protocol.transaction.GlobalBeginRequest; -import io.seata.core.protocol.transaction.GlobalBeginResponse; -import io.seata.core.protocol.transaction.GlobalCommitRequest; -import io.seata.core.protocol.transaction.GlobalCommitResponse; -import io.seata.core.protocol.transaction.GlobalLockQueryRequest; -import io.seata.core.protocol.transaction.GlobalLockQueryResponse; -import io.seata.core.protocol.transaction.GlobalReportRequest; -import io.seata.core.protocol.transaction.GlobalReportResponse; -import io.seata.core.protocol.transaction.GlobalRollbackRequest; -import io.seata.core.protocol.transaction.GlobalRollbackResponse; -import io.seata.core.protocol.transaction.GlobalStatusRequest; -import io.seata.core.protocol.transaction.GlobalStatusResponse; +import io.seata.core.protocol.transaction.*; import io.seata.core.rpc.Disposable; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.RpcContext; @@ -46,15 +31,52 @@ import io.seata.mockserver.call.CallRm; import io.seata.server.AbstractTCInboundHandler; import io.seata.server.UUIDGenerator; +import io.seata.server.coordinator.DefaultCoordinator; +import io.seata.server.session.BranchSession; import io.seata.server.session.GlobalSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.IntStream; /** * Mock Coordinator **/ public class MockCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { + protected static final Logger LOGGER = LoggerFactory.getLogger(MockCoordinator.class); + RemotingServer remotingServer; + private static MockCoordinator coordinator; + + private Map expectTransactionResultMap; + private Map expectRetryTimesMap; + private Map> branchMap; + + private MockCoordinator() { + } + + + public static MockCoordinator getInstance() { + if (coordinator == null) { + synchronized (MockCoordinator.class) { + if (coordinator == null) { + coordinator = new MockCoordinator(); + coordinator.expectTransactionResultMap = new ConcurrentHashMap<>(); + coordinator.expectRetryTimesMap = new ConcurrentHashMap<>(); + coordinator.branchMap = new ConcurrentHashMap<>(); + } + } + } + return coordinator; + } + + @Override public void destroy() { @@ -80,6 +102,7 @@ public void onResponse(AbstractResultMessage response, RpcContext context) { protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { GlobalSession session = GlobalSession.createGlobalSession(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout()); + expectTransactionResultMap.putIfAbsent(session.getXid(), ExpectTransactionResult.AllCommitted); response.setXid(session.getXid()); response.setResultCode(ResultCode.Success); } @@ -88,35 +111,73 @@ protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse res protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException { response.setGlobalStatus(GlobalStatus.Committed); response.setResultCode(ResultCode.Success); + + int retry = expectRetryTimesMap.getOrDefault(request.getXid(),0); + List branchSessions = branchMap.get(request.getXid()); + if(CollectionUtils.isEmpty(branchSessions)){ + LOGGER.info("branchSessions is empty,XID=" + request.getXid()); + } + branchSessions.forEach(branch -> { + CallRm.branchCommit(remotingServer, branch.getResourceId(), branch.getClientId()); + IntStream.range(0, retry).forEach(i -> + CallRm.branchCommit(remotingServer, branch.getResourceId(), branch.getClientId())); + }); } @Override protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException { response.setGlobalStatus(GlobalStatus.Rollbacked); response.setResultCode(ResultCode.Success); + + + int retry = expectRetryTimesMap.getOrDefault(request.getXid(),0); + List branchSessions = branchMap.get(request.getXid()); + if(CollectionUtils.isEmpty(branchSessions)){ + LOGGER.info("branchSessions is empty,XID=" + request.getXid()); + } + branchSessions.forEach(branch -> { + CallRm.branchRollback(remotingServer, branch.getResourceId(), branch.getClientId()); + IntStream.range(0, retry).forEach(i -> + CallRm.branchRollback(remotingServer, branch.getResourceId(), branch.getClientId())); + }); } @Override protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { - response.setBranchId(UUIDGenerator.generateUUID()); - response.setResultCode(ResultCode.Success); - String resourceId = request.getResourceId(); - String clientId = rpcContext.getClientId(); + BranchSession branchSession = new BranchSession(request.getBranchType()); + + String xid = request.getXid(); + branchSession.setXid(xid); +// branchSession.setTransactionId(request.getTransactionId()); + branchSession.setBranchId(UUIDGenerator.generateUUID()); + branchSession.setResourceId(request.getResourceId()); + branchSession.setLockKey(request.getLockKey()); + branchSession.setClientId(rpcContext.getClientId()); + branchSession.setApplicationData(request.getApplicationData()); + branchSession.setStatus(BranchStatus.Registered); + branchMap.compute(xid, (key, val) -> { + if (val == null) { + val = new ArrayList<>(); + } + val.add(branchSession); + return val; + }); + + response.setBranchId(branchSession.getBranchId()); + response.setResultCode(ResultCode.Success); - Thread thread = new Thread(() -> { - try { - Thread.sleep(1000); - BranchStatus commit = CallRm.branchCommit(remotingServer, resourceId, clientId); - BranchStatus rollback = CallRm.branchRollback(remotingServer, resourceId, clientId); +// Thread thread = new Thread(() -> { +// try { +// Thread.sleep(1000); // if (ProtocolConstants.VERSION_0 != Version.calcProtocolVersion(rpcContext.getVersion())) { // CallRm.deleteUndoLog(remotingServer, resourceId, clientId); // } - } catch (Exception e) { - e.printStackTrace(); - } - }); - thread.start(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }); +// thread.start(); } @Override @@ -144,4 +205,13 @@ protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse public void setRemotingServer(RemotingServer remotingServer) { this.remotingServer = remotingServer; } + + + public void setExpectedResult(String xid, ExpectTransactionResult expected) { + expectTransactionResultMap.put(xid, expected); + } + + public void setExpectedRetry(String xid, int times) { + expectRetryTimesMap.put(xid, times); + } } diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java index ab2e34a16ac..8795f04523f 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java @@ -59,7 +59,7 @@ public static void main(String[] args) { // init snowflake for transactionId, branchId UUIDGenerator.init(1L); - MockCoordinator coordinator = new MockCoordinator(); + MockCoordinator coordinator = MockCoordinator.getInstance(); coordinator.setRemotingServer(nettyRemotingServer); nettyRemotingServer.setHandler(coordinator); nettyRemotingServer.init(); diff --git a/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java new file mode 100644 index 00000000000..1af2f0f8715 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.controller; + +import io.seata.mockserver.ExpectTransactionResult; +import io.seata.mockserver.MockCoordinator; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; + +/** + * ? + * + * @author minghua.xie + * @date 2023/12/29 + **/ +@RequestMapping("/help") +public class MockHelpController { + + static String OK = "ok"; + + @GetMapping("/health") + public String health() { + return OK; + } + + @PostMapping("/expect/status") + public String expectTransactionResult(@RequestParam String xid, @RequestParam int code) { + MockCoordinator.getInstance().setExpectedResult(xid, ExpectTransactionResult.covert(code)); + return OK; + } + + @PostMapping("/expect/retry") + public String expectTransactionRetry(@RequestParam String xid, @RequestParam int times) { + MockCoordinator.getInstance().setExpectedRetry(xid, times); + return OK; + } + + +} From a27ba755d655c568f3baecf3f3a683b64a474af8 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 29 Dec 2023 15:54:04 +0800 Subject: [PATCH 09/15] expect --- .../java/io/seata/mockserver/ExpectTransactionResult.java | 5 ++++- .../io/seata/mockserver/controller/MockHelpController.java | 4 +--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java b/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java index 550b0a640c4..c0060836385 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java @@ -16,7 +16,10 @@ */ package io.seata.mockserver; - +/** + * The enum Expect transaction result. + * + */ public enum ExpectTransactionResult { AllCommitted(0, "all success"), diff --git a/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java index 1af2f0f8715..1ee21db7285 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java @@ -24,10 +24,8 @@ import org.springframework.web.bind.annotation.RequestParam; /** - * ? + * Mock Help Controller * - * @author minghua.xie - * @date 2023/12/29 **/ @RequestMapping("/help") public class MockHelpController { From 48f047a95edd88a871ba716e2f12a0135422c3af Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Fri, 29 Dec 2023 16:01:42 +0800 Subject: [PATCH 10/15] style --- .../io/seata/mockserver/MockCoordinator.java | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java index a8f24bb4a7f..e1bc613dd28 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java @@ -23,7 +23,23 @@ import io.seata.core.protocol.AbstractMessage; import io.seata.core.protocol.AbstractResultMessage; import io.seata.core.protocol.ResultCode; -import io.seata.core.protocol.transaction.*; +import io.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import io.seata.core.protocol.transaction.BranchRegisterRequest; +import io.seata.core.protocol.transaction.BranchRegisterResponse; +import io.seata.core.protocol.transaction.BranchReportRequest; +import io.seata.core.protocol.transaction.BranchReportResponse; +import io.seata.core.protocol.transaction.GlobalBeginRequest; +import io.seata.core.protocol.transaction.GlobalBeginResponse; +import io.seata.core.protocol.transaction.GlobalCommitRequest; +import io.seata.core.protocol.transaction.GlobalCommitResponse; +import io.seata.core.protocol.transaction.GlobalLockQueryRequest; +import io.seata.core.protocol.transaction.GlobalLockQueryResponse; +import io.seata.core.protocol.transaction.GlobalReportRequest; +import io.seata.core.protocol.transaction.GlobalReportResponse; +import io.seata.core.protocol.transaction.GlobalRollbackRequest; +import io.seata.core.protocol.transaction.GlobalRollbackResponse; +import io.seata.core.protocol.transaction.GlobalStatusRequest; +import io.seata.core.protocol.transaction.GlobalStatusResponse; import io.seata.core.rpc.Disposable; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.RpcContext; @@ -31,7 +47,6 @@ import io.seata.mockserver.call.CallRm; import io.seata.server.AbstractTCInboundHandler; import io.seata.server.UUIDGenerator; -import io.seata.server.coordinator.DefaultCoordinator; import io.seata.server.session.BranchSession; import io.seata.server.session.GlobalSession; import org.slf4j.Logger; @@ -112,9 +127,9 @@ protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response.setGlobalStatus(GlobalStatus.Committed); response.setResultCode(ResultCode.Success); - int retry = expectRetryTimesMap.getOrDefault(request.getXid(),0); + int retry = expectRetryTimesMap.getOrDefault(request.getXid(), 0); List branchSessions = branchMap.get(request.getXid()); - if(CollectionUtils.isEmpty(branchSessions)){ + if (CollectionUtils.isEmpty(branchSessions)) { LOGGER.info("branchSessions is empty,XID=" + request.getXid()); } branchSessions.forEach(branch -> { @@ -130,9 +145,9 @@ protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackRes response.setResultCode(ResultCode.Success); - int retry = expectRetryTimesMap.getOrDefault(request.getXid(),0); + int retry = expectRetryTimesMap.getOrDefault(request.getXid(), 0); List branchSessions = branchMap.get(request.getXid()); - if(CollectionUtils.isEmpty(branchSessions)){ + if (CollectionUtils.isEmpty(branchSessions)) { LOGGER.info("branchSessions is empty,XID=" + request.getXid()); } branchSessions.forEach(branch -> { From 63c737142c54465ee5775214a9fadf3dd4333522 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 2 Jan 2024 16:24:59 +0800 Subject: [PATCH 11/15] changes --- changes/en-us/2.x.md | 2 +- changes/zh-cn/2.x.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 93dd17f4ffd..fd8759bc8a8 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -3,7 +3,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: -- [[#PR_NO](https://github.com/apache/incubator-seata/pull/PR_NO)] A brief and accurate description of PR +- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server ### bugfix: - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] fix the TCC aspect exception handling process, do not wrapping the internal call exceptions diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 632833f9473..4b6722b54b5 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -3,7 +3,7 @@ ### feature: -- [[#PR_NO](https://github.com/apache/incubator-seata/pull/PR_NO)] 准确简要的PR描述 +- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server ### bugfix: - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] 修复tcc切面异常处理过程,不对内部调用异常做包装处理,直接向外抛出 From a05f7a66ee1933b9a27cd44a6cb670f2f8b1cc43 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 10 Jan 2024 18:13:49 +0800 Subject: [PATCH 12/15] test --- .../io/seata/mockserver/MockCoordinator.java | 60 +++++++--- .../java/io/seata/mockserver/MockServer.java | 16 ++- .../java/io/seata/mockserver/call/CallRm.java | 28 ++--- .../controller/MockHelpController.java | 9 +- test/pom.xml | 5 + .../rpc/netty/ChannelManagerTestHelper.java | 46 +++----- .../core/rpc/netty/mockserver/Action1.java | 45 ++++++++ .../rpc/netty/mockserver/Action1Impl.java | 66 +++++++++++ .../rpc/netty/mockserver/MockServerTest.java | 98 ++++++++++++++++ .../mockserver/ProtocolTestConstants.java | 27 +++++ .../rpc/netty/mockserver/RmClientTest.java | 102 +++++++++++++++++ .../rpc/netty/mockserver/TmClientTest.java | 105 ++++++++++++++++++ 12 files changed, 540 insertions(+), 67 deletions(-) rename test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java => test/src/test/java/io/seata/core/rpc/netty/ChannelManagerTestHelper.java (51%) create mode 100644 test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1.java create mode 100644 test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1Impl.java create mode 100644 test/src/test/java/io/seata/core/rpc/netty/mockserver/MockServerTest.java create mode 100644 test/src/test/java/io/seata/core/rpc/netty/mockserver/ProtocolTestConstants.java create mode 100644 test/src/test/java/io/seata/core/rpc/netty/mockserver/RmClientTest.java create mode 100644 test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java index e1bc613dd28..3dda61e1bab 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java @@ -18,6 +18,7 @@ import io.seata.common.util.CollectionUtils; import io.seata.core.exception.TransactionException; +import io.seata.core.exception.TransactionExceptionCode; import io.seata.core.model.BranchStatus; import io.seata.core.model.GlobalStatus; import io.seata.core.protocol.AbstractMessage; @@ -69,7 +70,10 @@ public class MockCoordinator extends AbstractTCInboundHandler implements Transac private static MockCoordinator coordinator; - private Map expectTransactionResultMap; + private static String AllBeginFailXid = "0"; + + private Map globalStatusMap; + private Map expectedResultMap; private Map expectRetryTimesMap; private Map> branchMap; @@ -82,7 +86,8 @@ public static MockCoordinator getInstance() { synchronized (MockCoordinator.class) { if (coordinator == null) { coordinator = new MockCoordinator(); - coordinator.expectTransactionResultMap = new ConcurrentHashMap<>(); + coordinator.expectedResultMap = new ConcurrentHashMap<>(); + coordinator.globalStatusMap = new ConcurrentHashMap<>(); coordinator.expectRetryTimesMap = new ConcurrentHashMap<>(); coordinator.branchMap = new ConcurrentHashMap<>(); } @@ -115,53 +120,59 @@ public void onResponse(AbstractResultMessage response, RpcContext context) { @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(AllBeginFailXid); GlobalSession session = GlobalSession.createGlobalSession(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout()); - expectTransactionResultMap.putIfAbsent(session.getXid(), ExpectTransactionResult.AllCommitted); + globalStatusMap.putIfAbsent(session.getXid(), GlobalStatus.Begin); response.setXid(session.getXid()); response.setResultCode(ResultCode.Success); } + @Override protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); response.setGlobalStatus(GlobalStatus.Committed); response.setResultCode(ResultCode.Success); + globalStatusMap.put(request.getXid(), GlobalStatus.Committed); int retry = expectRetryTimesMap.getOrDefault(request.getXid(), 0); List branchSessions = branchMap.get(request.getXid()); if (CollectionUtils.isEmpty(branchSessions)) { - LOGGER.info("branchSessions is empty,XID=" + request.getXid()); + LOGGER.warn("[doGlobalCommit]branchSessions is empty,XID=" + request.getXid()); + return; } branchSessions.forEach(branch -> { - CallRm.branchCommit(remotingServer, branch.getResourceId(), branch.getClientId()); + CallRm.branchCommit(remotingServer, branch); IntStream.range(0, retry).forEach(i -> - CallRm.branchCommit(remotingServer, branch.getResourceId(), branch.getClientId())); + CallRm.branchCommit(remotingServer, branch)); }); } @Override protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); response.setGlobalStatus(GlobalStatus.Rollbacked); response.setResultCode(ResultCode.Success); - + globalStatusMap.put(request.getXid(), GlobalStatus.Rollbacked); int retry = expectRetryTimesMap.getOrDefault(request.getXid(), 0); List branchSessions = branchMap.get(request.getXid()); if (CollectionUtils.isEmpty(branchSessions)) { - LOGGER.info("branchSessions is empty,XID=" + request.getXid()); + LOGGER.warn("[doGlobalRollback]branchSessions is empty,XID=" + request.getXid()); + return; } branchSessions.forEach(branch -> { - CallRm.branchRollback(remotingServer, branch.getResourceId(), branch.getClientId()); + CallRm.branchRollback(remotingServer, branch); IntStream.range(0, retry).forEach(i -> - CallRm.branchRollback(remotingServer, branch.getResourceId(), branch.getClientId())); + CallRm.branchRollback(remotingServer, branch)); }); } @Override protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { - + checkMockActionFail(request.getXid()); BranchSession branchSession = new BranchSession(request.getBranchType()); - String xid = request.getXid(); branchSession.setXid(xid); // branchSession.setTransactionId(request.getTransactionId()); @@ -197,23 +208,34 @@ protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterRes @Override protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); response.setResultCode(ResultCode.Success); } @Override protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); + response.setLockable(true); response.setResultCode(ResultCode.Success); } @Override protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext) throws TransactionException { - response.setGlobalStatus(GlobalStatus.Committed); + checkMockActionFail(request.getXid()); + GlobalStatus globalStatus = globalStatusMap.get(request.getXid()); + if (globalStatus == null) { + globalStatus = GlobalStatus.UnKnown; + } + response.setGlobalStatus(globalStatus); response.setResultCode(ResultCode.Success); } @Override protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response, RpcContext rpcContext) throws TransactionException { - response.setGlobalStatus(GlobalStatus.Committed); + checkMockActionFail(request.getXid()); + GlobalStatus globalStatus = request.getGlobalStatus(); + globalStatusMap.put(request.getXid(), globalStatus); + response.setGlobalStatus(globalStatus); response.setResultCode(ResultCode.Success); } @@ -222,11 +244,17 @@ public void setRemotingServer(RemotingServer remotingServer) { } - public void setExpectedResult(String xid, ExpectTransactionResult expected) { - expectTransactionResultMap.put(xid, expected); + public void setExepectedResult(String xid, ResultCode expected) { + expectedResultMap.put(xid, expected); } public void setExpectedRetry(String xid, int times) { expectRetryTimesMap.put(xid, times); } + + private void checkMockActionFail(String xid) throws TransactionException { + if (expectedResultMap.get(xid) == ResultCode.Failed) { + throw new TransactionException(TransactionExceptionCode.Broken, "mock action expect fail"); + } + } } diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java index 8795f04523f..3319799d025 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java @@ -38,6 +38,9 @@ public class MockServer { protected static final Logger LOGGER = LoggerFactory.getLogger(MockServer.class); + static ThreadPoolExecutor workingThreads; + static MockNettyRemotingServer nettyRemotingServer; + /** * The entry point of application. * @@ -45,13 +48,15 @@ public class MockServer { */ public static void main(String[] args) { SpringApplication.run(MockServer.class, args); + start(); + } - ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(50, + public static void start() { + workingThreads = new ThreadPoolExecutor(50, 50, 500, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20000), new NamedThreadFactory("ServerHandlerThread", 500), new ThreadPoolExecutor.CallerRunsPolicy()); - - MockNettyRemotingServer nettyRemotingServer = new MockNettyRemotingServer(workingThreads); + nettyRemotingServer = new MockNettyRemotingServer(workingThreads); // set registry XID.setIpAddress(NetUtil.getLocalIp()); @@ -66,4 +71,9 @@ public static void main(String[] args) { LOGGER.info("pid info: " + ManagementFactory.getRuntimeMXBean().getName()); } + + public static void close(){ + workingThreads.shutdown(); + nettyRemotingServer.destroy(); + } } diff --git a/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java b/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java index 5ba1a2aee4d..914906495a6 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java @@ -27,12 +27,12 @@ import io.seata.core.protocol.transaction.UndoLogDeleteRequest; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.netty.ChannelManager; +import io.seata.server.session.BranchSession; import java.util.concurrent.TimeoutException; /** * call rm - * **/ public class CallRm { @@ -42,13 +42,13 @@ public class CallRm { * @param remotingServer * @return */ - public static BranchStatus branchCommit(RemotingServer remotingServer, String resourceId, String clientId) { + public static BranchStatus branchCommit(RemotingServer remotingServer, BranchSession branchSession) { BranchCommitRequest request = new BranchCommitRequest(); - setReq(request, resourceId); + setReq(request, branchSession); try { BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest( - resourceId, clientId, request, false); + branchSession.getResourceId(), branchSession.getClientId(), request, false); return response.getBranchStatus(); } catch (TimeoutException e) { throw new RuntimeException(e); @@ -62,13 +62,13 @@ public static BranchStatus branchCommit(RemotingServer remotingServer, String re * @param remotingServer * @return */ - public static BranchStatus branchRollback(RemotingServer remotingServer, String resourceId, String clientId) { + public static BranchStatus branchRollback(RemotingServer remotingServer, BranchSession branchSession) { BranchRollbackRequest request = new BranchRollbackRequest(); - setReq(request, resourceId); + setReq(request, branchSession); try { BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest( - resourceId, clientId, request, false); + branchSession.getResourceId(), branchSession.getClientId(), request, false); return response.getBranchStatus(); } catch (TimeoutException e) { throw new RuntimeException(e); @@ -81,23 +81,23 @@ public static BranchStatus branchRollback(RemotingServer remotingServer, String * @param remotingServer * @return */ - public static void deleteUndoLog(RemotingServer remotingServer, String resourceId, String clientId) { + public static void deleteUndoLog(RemotingServer remotingServer, BranchSession branchSession) { UndoLogDeleteRequest request = new UndoLogDeleteRequest(); - request.setResourceId(resourceId); + request.setResourceId(branchSession.getResourceId()); request.setSaveDays((short) 1); request.setBranchType(BranchType.TCC); try { - Channel channel = ChannelManager.getChannel(resourceId, clientId, false); + Channel channel = ChannelManager.getChannel(branchSession.getResourceId(), branchSession.getClientId(), false); remotingServer.sendAsyncRequest(channel, request); } catch (Exception e) { throw new RuntimeException(e); } } - private static void setReq(AbstractBranchEndRequest request, String resourceId) { - request.setXid("1"); - request.setBranchId(1L); - request.setResourceId(resourceId); + private static void setReq(AbstractBranchEndRequest request, BranchSession branchSession) { + request.setXid(branchSession.getXid()); + request.setBranchId(branchSession.getBranchId()); + request.setResourceId(branchSession.getResourceId()); request.setApplicationData("{\"k\":\"v\"}"); request.setBranchType(BranchType.TCC); // todo AT SAGA diff --git a/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java index 1ee21db7285..a9826241fe0 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java @@ -16,7 +16,8 @@ */ package io.seata.mockserver.controller; -import io.seata.mockserver.ExpectTransactionResult; +import io.seata.core.model.GlobalStatus; +import io.seata.core.protocol.ResultCode; import io.seata.mockserver.MockCoordinator; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; @@ -37,9 +38,9 @@ public String health() { return OK; } - @PostMapping("/expect/status") - public String expectTransactionResult(@RequestParam String xid, @RequestParam int code) { - MockCoordinator.getInstance().setExpectedResult(xid, ExpectTransactionResult.covert(code)); + @PostMapping("/expect/result") + public String expectResult(@RequestParam String xid, @RequestParam int code) { + MockCoordinator.getInstance().setExepectedResult(xid, ResultCode.get(code)); return OK; } diff --git a/test/pom.xml b/test/pom.xml index 86984fdc62b..30a21415e01 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -70,6 +70,11 @@ seata-saga-engine-store ${project.version} + + ${project.groupId} + seata-mock-server + ${project.version} + ${project.groupId} seata-spring diff --git a/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java b/test/src/test/java/io/seata/core/rpc/netty/ChannelManagerTestHelper.java similarity index 51% rename from test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java rename to test/src/test/java/io/seata/core/rpc/netty/ChannelManagerTestHelper.java index c0060836385..258d59d574c 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/ExpectTransactionResult.java +++ b/test/src/test/java/io/seata/core/rpc/netty/ChannelManagerTestHelper.java @@ -14,41 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.seata.mockserver; +package io.seata.core.rpc.netty; -/** - * The enum Expect transaction result. - * - */ -public enum ExpectTransactionResult { - - AllCommitted(0, "all success"), - AllRollbacked(1, "all rollback"), - PhaseOneTimeoutRollbacked(2, "phase one failed"); +import io.netty.channel.Channel; +import io.seata.core.rpc.netty.mockserver.ProtocolTestConstants; - private final int code; - private final String desc; +import java.util.concurrent.ConcurrentMap; - ExpectTransactionResult(int code, String desc) { - this.code = code; - this.desc = desc; +/** + * Channel Manager Test Helper + * + **/ +public class ChannelManagerTestHelper { + public static ConcurrentMap getChannelConcurrentMap(AbstractNettyRemotingClient remotingClient) { + return getChannelManager(remotingClient).getChannels(); } - /** - * Gets code. - * - * @return the code - */ - public int getCode() { - return code; + public static Channel getChannel(TmNettyRemotingClient client) { + return getChannelManager(client) + .acquireChannel(ProtocolTestConstants.SERVER_ADDRESS); } - - public static ExpectTransactionResult covert(int code) { - for (ExpectTransactionResult result : ExpectTransactionResult.values()) { - if (result.getCode() == code) { - return result; - } - } - return null; + private static NettyClientChannelManager getChannelManager(AbstractNettyRemotingClient remotingClient) { + return remotingClient.getClientChannelManager(); } } diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1.java new file mode 100644 index 00000000000..0c6b1d80ffd --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + +import io.seata.rm.tcc.api.BusinessActionContext; +import io.seata.rm.tcc.api.BusinessActionContextParameter; +import io.seata.rm.tcc.api.LocalTCC; +import io.seata.rm.tcc.api.TwoPhaseBusinessAction; + +import java.util.Map; + +/** + * The interface Action1. + * + */ +@LocalTCC +public interface Action1 { + + @TwoPhaseBusinessAction(name = "mock-action", commitMethod = "commitTcc", rollbackMethod = "cancel" +// , useTCCFence = true + ) + String insert(@BusinessActionContextParameter Long reqId, + @BusinessActionContextParameter(paramName = "params") Map params + ); + + + boolean commitTcc(BusinessActionContext actionContext); + + + boolean cancel(BusinessActionContext actionContext); +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1Impl.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1Impl.java new file mode 100644 index 00000000000..5ad30d6a09d --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1Impl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + + +import io.seata.rm.tcc.api.BusinessActionContext; +import org.springframework.stereotype.Service; +import vlsi.utils.CompactHashMap; + +import java.util.Map; + +/** + * The type Action1. + */ +@Service +public class Action1Impl implements Action1 { + + private static Map commitMap = new CompactHashMap<>(); + private static Map rollbackMap = new CompactHashMap<>(); + + @Override + public String insert(Long reqId, Map params) { + System.out.println("prepare"); + return "prepare"; + } + + + @Override + public boolean commitTcc(BusinessActionContext actionContext) { + String xid = actionContext.getXid(); + System.out.println("commitTcc:" + xid); + commitMap.compute(xid, (k, v) -> v == null ? 1 : v + 1); + return true; + } + + @Override + public boolean cancel(BusinessActionContext actionContext) { + String xid = actionContext.getXid(); + System.out.println("commitTcc:" + xid); + rollbackMap.compute(xid, (k, v) -> v == null ? 1 : v + 1); + System.out.println("cancel"); + return true; + } + + public static int getCommitTimes(String xid) { + return commitMap.getOrDefault(xid, 0); + } + + public static int getRollbackTimes(String xid) { + return rollbackMap.getOrDefault(xid, 0); + } +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/MockServerTest.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/MockServerTest.java new file mode 100644 index 00000000000..eb70ab42bfe --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/MockServerTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + +import io.seata.core.exception.TransactionException; +import io.seata.core.model.BranchType; +import io.seata.core.model.GlobalStatus; +import io.seata.core.model.TransactionManager; +import io.seata.mockserver.MockCoordinator; +import io.seata.mockserver.MockServer; +import io.seata.rm.DefaultResourceManager; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class MockServerTest { + + static String RESOURCE_ID = "mock-action"; + + @BeforeAll + public static void before() { + MockServer.start(); + } + + @AfterAll + public static void after() { + MockServer.close(); + } + + @Test + public void testCommit() throws TransactionException { + String xid = doTestCommit(0); + Assertions.assertEquals(Action1Impl.getCommitTimes(xid), 1); + Assertions.assertEquals(Action1Impl.getRollbackTimes(xid), 0); + } + + @Test + public void testCommitRetry() throws TransactionException { + String xid = doTestCommit(2); + Assertions.assertEquals(Action1Impl.getCommitTimes(xid), 3); + Assertions.assertEquals(Action1Impl.getRollbackTimes(xid), 0); + } + + @Test + public void testRollback() throws TransactionException { + String xid = doTestRollback(0); + Assertions.assertEquals(Action1Impl.getCommitTimes(xid), 0); + Assertions.assertEquals(Action1Impl.getRollbackTimes(xid), 1); + } + + @Test + public void testRollbackRetry() throws TransactionException { + String xid = doTestRollback(2); + Assertions.assertEquals(Action1Impl.getCommitTimes(xid), 0); + Assertions.assertEquals(Action1Impl.getRollbackTimes(xid), 3); + } + + private static String doTestCommit(int times) throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); + + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "test", 60000); + MockCoordinator.getInstance().setExpectedRetry(xid, times); + Long branchId = rm.branchRegister(BranchType.AT, RESOURCE_ID, "1", xid, "1", "1"); + GlobalStatus commit = tm.commit(xid); + Assertions.assertEquals(commit, GlobalStatus.Committed); + return xid; + + } + + private static String doTestRollback(int times) throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); + + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "test", 60000); + MockCoordinator.getInstance().setExpectedRetry(xid, times); + Long branchId = rm.branchRegister(BranchType.AT, RESOURCE_ID, "1", xid, "1", "1"); + GlobalStatus rollback = tm.rollback(xid); + Assertions.assertEquals(rollback, GlobalStatus.Rollbacked); + return xid; + + } +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/ProtocolTestConstants.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/ProtocolTestConstants.java new file mode 100644 index 00000000000..3d33226e0bf --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/ProtocolTestConstants.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + +/** + * Mock Constants + * + **/ +public class ProtocolTestConstants { + public static final String APPLICATION_ID = "my_app_test"; + public static final String SERVICE_GROUP = "default_tx_group"; + public static final String SERVER_ADDRESS = "0.0.0.0:8091"; +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/RmClientTest.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/RmClientTest.java new file mode 100644 index 00000000000..49766dd33f4 --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/RmClientTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + +import io.netty.channel.Channel; +import io.seata.core.context.RootContext; +import io.seata.core.exception.TransactionException; +import io.seata.core.model.BranchStatus; +import io.seata.core.model.BranchType; +import io.seata.core.protocol.HeartbeatMessage; +import io.seata.core.rpc.netty.RmNettyRemotingClient; +import io.seata.integration.tx.api.interceptor.parser.DefaultResourceRegisterParser; +import io.seata.mockserver.MockCoordinator; +import io.seata.mockserver.MockServer; +import io.seata.rm.DefaultResourceManager; +import io.seata.rm.RMClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentMap; + +import static io.seata.core.rpc.netty.ChannelManagerTestHelper.getChannelConcurrentMap; + +/** + * rm client test + **/ +public class RmClientTest { + + + protected static final Logger LOGGER = LoggerFactory.getLogger(RmClientTest.class); + + @BeforeAll + public static void before() { + MockServer.start(); + } + + @AfterAll + public static void after() { + MockServer.close(); + } + + @Test + public void testRm() throws TransactionException { + String resourceId = "mock-action"; + String xid = "1111"; + + DefaultResourceManager rm = getRm(resourceId); + + //branchRegister:TYPE_BRANCH_REGISTER = 11 , TYPE_BRANCH_REGISTER_RESULT = 12 + Long branchId = rm.branchRegister(BranchType.AT, resourceId, "1", xid, "1", "1"); + Assertions.assertTrue(branchId > 0); + + + // branchReport:TYPE_BRANCH_STATUS_REPORT = 13 , TYPE_BRANCH_STATUS_REPORT_RESULT = 14 + // TYPE_SEATA_MERGE = 59 , TYPE_SEATA_MERGE_RESULT = 60 + rm.branchReport(BranchType.AT, xid, branchId, BranchStatus.PhaseTwo_Committed, ""); + LOGGER.info("branchReport ok"); + + //lockQuery:TYPE_GLOBAL_LOCK_QUERY = 21 , TYPE_GLOBAL_LOCK_QUERY_RESULT = 22 + RootContext.bind(xid); + boolean b = rm.lockQuery(BranchType.AT, resourceId, xid, "1"); + LOGGER.info("lockQuery ok, result=" + b); + Assertions.assertTrue(b); + + RmNettyRemotingClient remotingClient = RmNettyRemotingClient.getInstance(); + ConcurrentMap channels = getChannelConcurrentMap(remotingClient); + channels.forEach( + (key, value) -> RmNettyRemotingClient.getInstance().sendAsyncRequest(value, HeartbeatMessage.PING)); + + } + + public static DefaultResourceManager getRm(String resourceId) { + RMClient.init(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP); + DefaultResourceManager rm = DefaultResourceManager.get(); + + //register:TYPE_REG_RM = 103 , TYPE_REG_RM_RESULT = 104 + Action1 target = new Action1Impl(); + DefaultResourceRegisterParser.get().registerResource(target, resourceId); + LOGGER.info("registerResource ok"); + return rm; + } + + +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java new file mode 100644 index 00000000000..b5df95e524f --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + + +import io.netty.channel.Channel; +import io.seata.core.model.GlobalStatus; +import io.seata.core.model.TransactionManager; +import io.seata.core.protocol.ResultCode; +import io.seata.core.rpc.netty.TmNettyRemotingClient; +import io.seata.mockserver.MockCoordinator; +import io.seata.mockserver.MockServer; +import io.seata.tm.DefaultTransactionManager; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.seata.core.rpc.netty.ChannelManagerTestHelper.getChannel; + +/** + * TmClient Test + **/ +public class TmClientTest { + + protected static final Logger LOGGER = LoggerFactory.getLogger(TmClientTest.class); + + @BeforeAll + public static void before() { + MockServer.start(); + } + + @AfterAll + public static void after() { + MockServer.close(); + } + + @Test + public void testTm() throws Exception { + + TransactionManager tm = getTm(); + + //globalBegin:TYPE_GLOBAL_BEGIN = 1 , TYPE_GLOBAL_BEGIN_RESULT = 2 + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, + ProtocolTestConstants.SERVICE_GROUP, "test", 60000); + LOGGER.info("globalBegin ok:xid=" + xid); + + //globalCommit:TYPE_GLOBAL_COMMIT = 7 , TYPE_GLOBAL_COMMIT_RESULT = 8 + GlobalStatus commit = tm.commit(xid); + LOGGER.info("globalCommit ok:" + commit); + Assertions.assertEquals(commit, GlobalStatus.Committed); + + //globalRollback:TYPE_GLOBAL_ROLLBACK = 9 , TYPE_GLOBAL_ROLLBACK_RESULT = 10 + GlobalStatus rollback = tm.rollback(xid); + LOGGER.info("globalRollback ok:" + rollback); + Assertions.assertEquals(rollback, GlobalStatus.Rollbacked); + + //getStatus:TYPE_GLOBAL_STATUS = 15 , TYPE_GLOBAL_STATUS_RESULT = 16 + GlobalStatus status = tm.getStatus(xid); + LOGGER.info("getStatus ok:" + status); + Assertions.assertEquals(status, GlobalStatus.Rollbacked); + + //globalReport:TYPE_GLOBAL_REPORT = 17 , TYPE_GLOBAL_REPORT_RESULT = 18 + GlobalStatus globalReport = tm.globalReport(xid, GlobalStatus.Committed); + LOGGER.info("globalReport ok:" + globalReport); + Assertions.assertEquals(globalReport, GlobalStatus.Committed); + + MockCoordinator.getInstance().setExepectedResult(xid, ResultCode.Failed); + GlobalStatus globalReport2 = tm.globalReport(xid, GlobalStatus.Committed); + // TODO expected response fail , but DefaultTransactionManager ignore resultCode + } + + @NotNull + public static TransactionManager getTm() { + TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance( + ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP); + tmNettyRemotingClient.init(); + TransactionManager tm = new DefaultTransactionManager(); + + //register:TYPE_REG_CLT = 101 , TYPE_REG_CLT_RESULT = 102 + TmNettyRemotingClient client = TmNettyRemotingClient.getInstance(); + Channel channel = getChannel(client); + LOGGER.info("TM register ok:channel=" + channel); + return tm; + } + + +} From 8fc08674bff4398e10fcaf0d86be4d544759eb3a Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 10 Jan 2024 18:44:03 +0800 Subject: [PATCH 13/15] test --- .../src/main/java/io/seata/mockserver/MockServer.java | 10 +++++----- .../seata/core/rpc/netty/mockserver/TmClientTest.java | 5 ++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java index 3319799d025..6c88f1b38ad 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java @@ -38,8 +38,8 @@ public class MockServer { protected static final Logger LOGGER = LoggerFactory.getLogger(MockServer.class); - static ThreadPoolExecutor workingThreads; - static MockNettyRemotingServer nettyRemotingServer; + private static ThreadPoolExecutor workingThreads; + private static MockNettyRemotingServer nettyRemotingServer; /** * The entry point of application. @@ -52,11 +52,11 @@ public static void main(String[] args) { } public static void start() { - workingThreads = new ThreadPoolExecutor(50, + workingThreads = new ThreadPoolExecutor(50, 50, 500, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20000), new NamedThreadFactory("ServerHandlerThread", 500), new ThreadPoolExecutor.CallerRunsPolicy()); - nettyRemotingServer = new MockNettyRemotingServer(workingThreads); + nettyRemotingServer = new MockNettyRemotingServer(workingThreads); // set registry XID.setIpAddress(NetUtil.getLocalIp()); @@ -72,7 +72,7 @@ public static void start() { LOGGER.info("pid info: " + ManagementFactory.getRuntimeMXBean().getName()); } - public static void close(){ + public static void close() { workingThreads.shutdown(); nettyRemotingServer.destroy(); } diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java index b5df95e524f..875e71ed278 100644 --- a/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java @@ -83,7 +83,10 @@ public void testTm() throws Exception { Assertions.assertEquals(globalReport, GlobalStatus.Committed); MockCoordinator.getInstance().setExepectedResult(xid, ResultCode.Failed); - GlobalStatus globalReport2 = tm.globalReport(xid, GlobalStatus.Committed); +// GlobalStatus globalReport2 = tm.globalReport(xid, GlobalStatus.Committed); + + GlobalStatus rollback2 = tm.rollback(xid); + LOGGER.info("globalRollback ok:" + rollback2); // TODO expected response fail , but DefaultTransactionManager ignore resultCode } From 818c1dbf974e6d5117c7d204adb18a899086588c Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Thu, 11 Jan 2024 10:51:35 +0800 Subject: [PATCH 14/15] test --- .../java/io/seata/mockserver/controller/MockHelpController.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java index a9826241fe0..c4fa54c3b35 100644 --- a/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java +++ b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java @@ -16,7 +16,6 @@ */ package io.seata.mockserver.controller; -import io.seata.core.model.GlobalStatus; import io.seata.core.protocol.ResultCode; import io.seata.mockserver.MockCoordinator; import org.springframework.web.bind.annotation.GetMapping; From ea1d9c49eb18cf6e5c9a3a6b7e7834d67b78c740 Mon Sep 17 00:00:00 2001 From: xingfudeshi Date: Thu, 18 Jan 2024 14:15:31 +0800 Subject: [PATCH 15/15] Update pom.xml --- test-mock-server/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test-mock-server/pom.xml b/test-mock-server/pom.xml index f40078b5ec4..f302069c407 100644 --- a/test-mock-server/pom.xml +++ b/test-mock-server/pom.xml @@ -28,7 +28,7 @@ 4.0.0 seata-mock-server jar - seata-mock-server + seata-mock-server ${project.version} Seata mock server @@ -59,4 +59,4 @@ - \ No newline at end of file +