diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index e8cd9a5bc6b..de6f599f9b5 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -3,6 +3,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: +- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] full support for states in the refactored state machine designer ### bugfix: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index c77fd33ee43..4458b26de7d 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -3,6 +3,7 @@ ### feature: +- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] 支持新版本状态机设计器 ### bugfix: diff --git a/pom.xml b/pom.xml index 049bfccac5b..c70b6d4e2e9 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,7 @@ spring tcc test + test-mock-server tm metrics serializer diff --git a/test-mock-server/pom.xml b/test-mock-server/pom.xml new file mode 100644 index 00000000000..f302069c407 --- /dev/null +++ b/test-mock-server/pom.xml @@ -0,0 +1,62 @@ + + + + + io.seata + seata-parent + ${revision} + + 4.0.0 + seata-mock-server + jar + seata-mock-server ${project.version} + Seata mock server + + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + + + + io.seata + seata-server + ${project.version} + + + slf4j-log4j12 + org.slf4j + + + log4j + log4j + + + + + + + diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java new file mode 100644 index 00000000000..3dda61e1bab --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockCoordinator.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver; + +import io.seata.common.util.CollectionUtils; +import io.seata.core.exception.TransactionException; +import io.seata.core.exception.TransactionExceptionCode; +import io.seata.core.model.BranchStatus; +import io.seata.core.model.GlobalStatus; +import io.seata.core.protocol.AbstractMessage; +import io.seata.core.protocol.AbstractResultMessage; +import io.seata.core.protocol.ResultCode; +import io.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import io.seata.core.protocol.transaction.BranchRegisterRequest; +import io.seata.core.protocol.transaction.BranchRegisterResponse; +import io.seata.core.protocol.transaction.BranchReportRequest; +import io.seata.core.protocol.transaction.BranchReportResponse; +import io.seata.core.protocol.transaction.GlobalBeginRequest; +import io.seata.core.protocol.transaction.GlobalBeginResponse; +import io.seata.core.protocol.transaction.GlobalCommitRequest; +import io.seata.core.protocol.transaction.GlobalCommitResponse; +import io.seata.core.protocol.transaction.GlobalLockQueryRequest; +import io.seata.core.protocol.transaction.GlobalLockQueryResponse; +import io.seata.core.protocol.transaction.GlobalReportRequest; +import io.seata.core.protocol.transaction.GlobalReportResponse; +import io.seata.core.protocol.transaction.GlobalRollbackRequest; +import io.seata.core.protocol.transaction.GlobalRollbackResponse; +import io.seata.core.protocol.transaction.GlobalStatusRequest; +import io.seata.core.protocol.transaction.GlobalStatusResponse; +import io.seata.core.rpc.Disposable; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.RpcContext; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.mockserver.call.CallRm; +import io.seata.server.AbstractTCInboundHandler; +import io.seata.server.UUIDGenerator; +import io.seata.server.session.BranchSession; +import io.seata.server.session.GlobalSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.IntStream; + +/** + * Mock Coordinator + **/ +public class MockCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { + + protected static final Logger LOGGER = LoggerFactory.getLogger(MockCoordinator.class); + + RemotingServer remotingServer; + + private static MockCoordinator coordinator; + + private static String AllBeginFailXid = "0"; + + private Map globalStatusMap; + private Map expectedResultMap; + private Map expectRetryTimesMap; + private Map> branchMap; + + private MockCoordinator() { + } + + + public static MockCoordinator getInstance() { + if (coordinator == null) { + synchronized (MockCoordinator.class) { + if (coordinator == null) { + coordinator = new MockCoordinator(); + coordinator.expectedResultMap = new ConcurrentHashMap<>(); + coordinator.globalStatusMap = new ConcurrentHashMap<>(); + coordinator.expectRetryTimesMap = new ConcurrentHashMap<>(); + coordinator.branchMap = new ConcurrentHashMap<>(); + } + } + } + return coordinator; + } + + + @Override + public void destroy() { + + } + + @Override + public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) { + if (!(request instanceof AbstractTransactionRequestToTC)) { + throw new IllegalArgumentException(); + } + AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; + transactionRequest.setTCInboundHandler(this); + + return transactionRequest.handle(context); + } + + @Override + public void onResponse(AbstractResultMessage response, RpcContext context) { + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(AllBeginFailXid); + GlobalSession session = GlobalSession.createGlobalSession(rpcContext.getApplicationId(), + rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout()); + globalStatusMap.putIfAbsent(session.getXid(), GlobalStatus.Begin); + response.setXid(session.getXid()); + response.setResultCode(ResultCode.Success); + } + + + @Override + protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); + response.setGlobalStatus(GlobalStatus.Committed); + response.setResultCode(ResultCode.Success); + globalStatusMap.put(request.getXid(), GlobalStatus.Committed); + + int retry = expectRetryTimesMap.getOrDefault(request.getXid(), 0); + List branchSessions = branchMap.get(request.getXid()); + if (CollectionUtils.isEmpty(branchSessions)) { + LOGGER.warn("[doGlobalCommit]branchSessions is empty,XID=" + request.getXid()); + return; + } + branchSessions.forEach(branch -> { + CallRm.branchCommit(remotingServer, branch); + IntStream.range(0, retry).forEach(i -> + CallRm.branchCommit(remotingServer, branch)); + }); + } + + @Override + protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); + response.setGlobalStatus(GlobalStatus.Rollbacked); + response.setResultCode(ResultCode.Success); + globalStatusMap.put(request.getXid(), GlobalStatus.Rollbacked); + + int retry = expectRetryTimesMap.getOrDefault(request.getXid(), 0); + List branchSessions = branchMap.get(request.getXid()); + if (CollectionUtils.isEmpty(branchSessions)) { + LOGGER.warn("[doGlobalRollback]branchSessions is empty,XID=" + request.getXid()); + return; + } + branchSessions.forEach(branch -> { + CallRm.branchRollback(remotingServer, branch); + IntStream.range(0, retry).forEach(i -> + CallRm.branchRollback(remotingServer, branch)); + }); + } + + @Override + protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); + BranchSession branchSession = new BranchSession(request.getBranchType()); + String xid = request.getXid(); + branchSession.setXid(xid); +// branchSession.setTransactionId(request.getTransactionId()); + branchSession.setBranchId(UUIDGenerator.generateUUID()); + branchSession.setResourceId(request.getResourceId()); + branchSession.setLockKey(request.getLockKey()); + branchSession.setClientId(rpcContext.getClientId()); + branchSession.setApplicationData(request.getApplicationData()); + branchSession.setStatus(BranchStatus.Registered); + branchMap.compute(xid, (key, val) -> { + if (val == null) { + val = new ArrayList<>(); + } + val.add(branchSession); + return val; + }); + + response.setBranchId(branchSession.getBranchId()); + response.setResultCode(ResultCode.Success); + +// Thread thread = new Thread(() -> { +// try { +// Thread.sleep(1000); +// if (ProtocolConstants.VERSION_0 != Version.calcProtocolVersion(rpcContext.getVersion())) { +// CallRm.deleteUndoLog(remotingServer, resourceId, clientId); +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }); +// thread.start(); + } + + @Override + protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); + response.setLockable(true); + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); + GlobalStatus globalStatus = globalStatusMap.get(request.getXid()); + if (globalStatus == null) { + globalStatus = GlobalStatus.UnKnown; + } + response.setGlobalStatus(globalStatus); + response.setResultCode(ResultCode.Success); + } + + @Override + protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response, RpcContext rpcContext) throws TransactionException { + checkMockActionFail(request.getXid()); + GlobalStatus globalStatus = request.getGlobalStatus(); + globalStatusMap.put(request.getXid(), globalStatus); + response.setGlobalStatus(globalStatus); + response.setResultCode(ResultCode.Success); + } + + public void setRemotingServer(RemotingServer remotingServer) { + this.remotingServer = remotingServer; + } + + + public void setExepectedResult(String xid, ResultCode expected) { + expectedResultMap.put(xid, expected); + } + + public void setExpectedRetry(String xid, int times) { + expectRetryTimesMap.put(xid, times); + } + + private void checkMockActionFail(String xid) throws TransactionException { + if (expectedResultMap.get(xid) == ResultCode.Failed) { + throw new TransactionException(TransactionExceptionCode.Broken, "mock action expect fail"); + } + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java new file mode 100644 index 00000000000..43ba0263114 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockNettyRemotingServer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver; + +import io.netty.channel.Channel; +import io.seata.core.protocol.MessageType; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.core.rpc.netty.AbstractNettyRemotingServer; +import io.seata.core.rpc.netty.NettyServerConfig; +import io.seata.mockserver.processor.MockHeartbeatProcessor; +import io.seata.mockserver.processor.MockOnReqProcessor; +import io.seata.mockserver.processor.MockOnRespProcessor; +import io.seata.mockserver.processor.MockRegisterProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * The mock netty remoting server. + */ +public class MockNettyRemotingServer extends AbstractNettyRemotingServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(MockNettyRemotingServer.class); + + private TransactionMessageHandler handler; + + public void setHandler(TransactionMessageHandler transactionMessageHandler) { + this.handler = transactionMessageHandler; + } + + @Override + public void init() { + // registry processor + registerProcessor(); + super.init(); + } + + /** + * Instantiates a new Rpc remoting server. + * + * @param messageExecutor the message executor + */ + public MockNettyRemotingServer(ThreadPoolExecutor messageExecutor) { + super(messageExecutor, new NettyServerConfig()); + } + + @Override + public void destroyChannel(String serverAddress, Channel channel) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("will destroy channel:{},address:{}", channel, serverAddress); + } + channel.disconnect(); + channel.close(); + } + + private void registerProcessor() { + // 1. registry on request message processor + MockOnReqProcessor onRequestProcessor = new MockOnReqProcessor(this, handler); + super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); + + // 2. registry on response message processor + MockOnRespProcessor onResponseProcessor = new MockOnRespProcessor(this, handler, getFutures()); + super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor); + + // 3. registry rm reg processor + MockRegisterProcessor regRmProcessor = new MockRegisterProcessor(this, MockRegisterProcessor.Role.RM); + super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor); + + // 4. registry tm reg processor + MockRegisterProcessor regTmProcessor = new MockRegisterProcessor(this, MockRegisterProcessor.Role.TM); + super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); + + // 5. registry heartbeat message processor + MockHeartbeatProcessor heartbeatMessageProcessor = new MockHeartbeatProcessor(this, handler); + super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null); + } + + @Override + public void destroy() { + super.destroy(); + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java new file mode 100644 index 00000000000..6c88f1b38ad --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/MockServer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver; + +import io.seata.common.XID; +import io.seata.common.thread.NamedThreadFactory; +import io.seata.common.util.NetUtil; +import io.seata.server.UUIDGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * The type Mock Server. + */ +@SpringBootApplication +public class MockServer { + + protected static final Logger LOGGER = LoggerFactory.getLogger(MockServer.class); + + private static ThreadPoolExecutor workingThreads; + private static MockNettyRemotingServer nettyRemotingServer; + + /** + * The entry point of application. + * + * @param args the input arguments + */ + public static void main(String[] args) { + SpringApplication.run(MockServer.class, args); + start(); + } + + public static void start() { + workingThreads = new ThreadPoolExecutor(50, + 50, 500, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(20000), + new NamedThreadFactory("ServerHandlerThread", 500), new ThreadPoolExecutor.CallerRunsPolicy()); + nettyRemotingServer = new MockNettyRemotingServer(workingThreads); + + // set registry + XID.setIpAddress(NetUtil.getLocalIp()); + XID.setPort(8092); + // init snowflake for transactionId, branchId + UUIDGenerator.init(1L); + + MockCoordinator coordinator = MockCoordinator.getInstance(); + coordinator.setRemotingServer(nettyRemotingServer); + nettyRemotingServer.setHandler(coordinator); + nettyRemotingServer.init(); + + LOGGER.info("pid info: " + ManagementFactory.getRuntimeMXBean().getName()); + } + + public static void close() { + workingThreads.shutdown(); + nettyRemotingServer.destroy(); + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java b/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java new file mode 100644 index 00000000000..914906495a6 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/call/CallRm.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.call; + +import io.netty.channel.Channel; +import io.seata.core.model.BranchStatus; +import io.seata.core.model.BranchType; +import io.seata.core.protocol.transaction.AbstractBranchEndRequest; +import io.seata.core.protocol.transaction.BranchCommitRequest; +import io.seata.core.protocol.transaction.BranchCommitResponse; +import io.seata.core.protocol.transaction.BranchRollbackRequest; +import io.seata.core.protocol.transaction.BranchRollbackResponse; +import io.seata.core.protocol.transaction.UndoLogDeleteRequest; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.netty.ChannelManager; +import io.seata.server.session.BranchSession; + +import java.util.concurrent.TimeoutException; + +/** + * call rm + **/ +public class CallRm { + + /** + * call branchCommit :TYPE_BRANCH_COMMIT = 3 , TYPE_BRANCH_COMMIT_RESULT = 4 + * + * @param remotingServer + * @return + */ + public static BranchStatus branchCommit(RemotingServer remotingServer, BranchSession branchSession) { + BranchCommitRequest request = new BranchCommitRequest(); + setReq(request, branchSession); + + try { + BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest( + branchSession.getResourceId(), branchSession.getClientId(), request, false); + return response.getBranchStatus(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + + /** + * call branchRollback :TYPE_BRANCH_ROLLBACK = 5 , TYPE_BRANCH_ROLLBACK_RESULT = 6 + * + * @param remotingServer + * @return + */ + public static BranchStatus branchRollback(RemotingServer remotingServer, BranchSession branchSession) { + BranchRollbackRequest request = new BranchRollbackRequest(); + setReq(request, branchSession); + + try { + BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest( + branchSession.getResourceId(), branchSession.getClientId(), request, false); + return response.getBranchStatus(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + /** + * call deleteUndoLog :TYPE_RM_DELETE_UNDOLOG = 111 + * + * @param remotingServer + * @return + */ + public static void deleteUndoLog(RemotingServer remotingServer, BranchSession branchSession) { + UndoLogDeleteRequest request = new UndoLogDeleteRequest(); + request.setResourceId(branchSession.getResourceId()); + request.setSaveDays((short) 1); + request.setBranchType(BranchType.TCC); + try { + Channel channel = ChannelManager.getChannel(branchSession.getResourceId(), branchSession.getClientId(), false); + remotingServer.sendAsyncRequest(channel, request); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void setReq(AbstractBranchEndRequest request, BranchSession branchSession) { + request.setXid(branchSession.getXid()); + request.setBranchId(branchSession.getBranchId()); + request.setResourceId(branchSession.getResourceId()); + request.setApplicationData("{\"k\":\"v\"}"); + request.setBranchType(BranchType.TCC); + // todo AT SAGA + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java new file mode 100644 index 00000000000..c4fa54c3b35 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/controller/MockHelpController.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.controller; + +import io.seata.core.protocol.ResultCode; +import io.seata.mockserver.MockCoordinator; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; + +/** + * Mock Help Controller + * + **/ +@RequestMapping("/help") +public class MockHelpController { + + static String OK = "ok"; + + @GetMapping("/health") + public String health() { + return OK; + } + + @PostMapping("/expect/result") + public String expectResult(@RequestParam String xid, @RequestParam int code) { + MockCoordinator.getInstance().setExepectedResult(xid, ResultCode.get(code)); + return OK; + } + + @PostMapping("/expect/retry") + public String expectTransactionRetry(@RequestParam String xid, @RequestParam int times) { + MockCoordinator.getInstance().setExpectedRetry(xid, times); + return OK; + } + + +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java new file mode 100644 index 00000000000..84f0c5f0650 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockHeartbeatProcessor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.HeartbeatMessage; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.TransactionMessageHandler; + +/** + * Mock Heartbeat Processor + **/ +public class MockHeartbeatProcessor extends MockRemotingProcessor { + public MockHeartbeatProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { + super(remotingServer, handler); + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + super.process(ctx, rpcMessage); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), HeartbeatMessage.PONG); + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java new file mode 100644 index 00000000000..d5227515ffc --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnReqProcessor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.AbstractMessage; +import io.seata.core.protocol.AbstractResultMessage; +import io.seata.core.protocol.MergeResultMessage; +import io.seata.core.protocol.MergedWarpMessage; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.RpcContext; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.core.rpc.netty.ChannelManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Mock Remoting Processor + **/ +public class MockOnReqProcessor extends MockRemotingProcessor { + protected static final Logger LOGGER = LoggerFactory.getLogger(MockOnReqProcessor.class); + + + public MockOnReqProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { + super(remotingServer, handler); + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + super.process(ctx, rpcMessage); + Object message = rpcMessage.getBody(); + + RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel()); + + // the batch send request message + if (message instanceof MergedWarpMessage) { + MergedWarpMessage mmsg = (MergedWarpMessage) message; + MergeResultMessage resultMessage = new MergeResultMessage(); + List resList = new ArrayList<>(); + for (int i = 0; i < mmsg.msgs.size(); i++) { + AbstractMessage msg = mmsg.msgs.get(i); + resList.add(handler.onRequest(msg, rpcContext)); + } + AbstractResultMessage[] resultMsgs = Arrays.copyOf(resList.toArray(), resList.size(), AbstractResultMessage[].class); + resultMessage.setMsgs(resultMsgs); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage); + LOGGER.info("sendAsyncResponse: {}", resultMessage); + } else { + final AbstractMessage msg = (AbstractMessage) message; + AbstractResultMessage result = handler.onRequest(msg, rpcContext); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result); + LOGGER.info("sendAsyncResponse: {}", result); + } + } + + +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java new file mode 100644 index 00000000000..5856f3e0c6b --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockOnRespProcessor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.MessageFuture; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.TransactionMessageHandler; + +import java.util.concurrent.ConcurrentMap; + +/** + * Mock Remoting Processor + **/ +public class MockOnRespProcessor extends MockRemotingProcessor { + + private ConcurrentMap futures; + + + public MockOnRespProcessor(RemotingServer remotingServer, TransactionMessageHandler handler + , ConcurrentMap futures) { + super(remotingServer, handler); + this.futures = futures; + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + super.process(ctx, rpcMessage); + MessageFuture messageFuture = futures.remove(rpcMessage.getId()); + if (messageFuture != null) { + messageFuture.setResultMessage(rpcMessage.getBody()); + } + } + + +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java new file mode 100644 index 00000000000..225bbbfd830 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRegisterProcessor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.RegisterRMRequest; +import io.seata.core.protocol.RegisterRMResponse; +import io.seata.core.protocol.RegisterTMRequest; +import io.seata.core.protocol.RegisterTMResponse; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.protocol.Version; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.netty.ChannelManager; +import io.seata.core.rpc.processor.RemotingProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock Remoting Processor + **/ +public class MockRegisterProcessor implements RemotingProcessor { + + protected static final Logger LOGGER = LoggerFactory.getLogger(MockRegisterProcessor.class); + private RemotingServer remotingServer; + private Role role; + + public MockRegisterProcessor(RemotingServer remotingServer, Role role) { + this.remotingServer = remotingServer; + this.role = role; + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + if (role == Role.TM) { + RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody(); + LOGGER.info("message = " + message); + + ChannelManager.registerTMChannel(message, ctx.channel()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + + RegisterTMResponse resp = new RegisterTMResponse(); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp); + LOGGER.info("sendAsyncResponse: {}", resp); + } else if (role == Role.RM) { + RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody(); + LOGGER.info("message = " + message); + + ChannelManager.registerRMChannel(message, ctx.channel()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + + RegisterRMResponse resp = new RegisterRMResponse(); + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp); + LOGGER.info("sendAsyncResponse: {}", resp); + } + } + + + public static enum Role { + TM, RM + } +} diff --git a/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java new file mode 100644 index 00000000000..c95ea4b19a3 --- /dev/null +++ b/test-mock-server/src/main/java/io/seata/mockserver/processor/MockRemotingProcessor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.mockserver.processor; + +import io.netty.channel.ChannelHandlerContext; +import io.seata.core.protocol.RpcMessage; +import io.seata.core.rpc.RemotingServer; +import io.seata.core.rpc.TransactionMessageHandler; +import io.seata.core.rpc.processor.RemotingProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock Remoting Processor + **/ +public class MockRemotingProcessor implements RemotingProcessor { + + protected static final Logger LOGGER = LoggerFactory.getLogger(MockRemotingProcessor.class); + protected RemotingServer remotingServer; + protected final TransactionMessageHandler handler; + + + public MockRemotingProcessor(RemotingServer remotingServer, TransactionMessageHandler handler) { + this.remotingServer = remotingServer; + this.handler = handler; + } + + @Override + public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + Object message = rpcMessage.getBody(); + LOGGER.info("process message : " + message); + + } + + +} diff --git a/test-mock-server/src/main/resources/application.yml b/test-mock-server/src/main/resources/application.yml new file mode 100644 index 00000000000..062444485bf --- /dev/null +++ b/test-mock-server/src/main/resources/application.yml @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +server: + port: 7091 + +spring: + application: + name: seata-server + +logging: + config: classpath:logback-spring.xml + file: + path: ${log.home:${user.home}/logs/seata} + extend: + logstash-appender: + destination: 127.0.0.1:4560 + kafka-appender: + bootstrap-servers: 127.0.0.1:9092 + topic: logback_to_logstash + +console: + user: + username: seata + password: seata +seata: + config: + # support: nacos, consul, apollo, zk, etcd3 + type: file + registry: + # support: nacos, eureka, redis, zk, consul, etcd3, sofa + type: file + store: + # support: file 、 db 、 redis 、 raft + mode: file + # server: + # service-port: 8091 #If not configured, the default is '${server.port} + 1000' + security: + secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 + tokenValidityInMilliseconds: 1800000 + ignore: + urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/** diff --git a/test-mock-server/src/main/resources/logback-spring.xml b/test-mock-server/src/main/resources/logback-spring.xml new file mode 100644 index 00000000000..7f9f51ee841 --- /dev/null +++ b/test-mock-server/src/main/resources/logback-spring.xml @@ -0,0 +1,113 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + 0 + 2048 + true + + + + + true + 0 + 2048 + true + + + + true + 0 + 1024 + true + + + + true + 0 + 1024 + true + + + + + + + + + + + + + + + + + + + diff --git a/test/pom.xml b/test/pom.xml index 86984fdc62b..30a21415e01 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -70,6 +70,11 @@ seata-saga-engine-store ${project.version} + + ${project.groupId} + seata-mock-server + ${project.version} + ${project.groupId} seata-spring diff --git a/test/src/test/java/io/seata/core/rpc/netty/ChannelManagerTestHelper.java b/test/src/test/java/io/seata/core/rpc/netty/ChannelManagerTestHelper.java new file mode 100644 index 00000000000..258d59d574c --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/ChannelManagerTestHelper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty; + +import io.netty.channel.Channel; +import io.seata.core.rpc.netty.mockserver.ProtocolTestConstants; + +import java.util.concurrent.ConcurrentMap; + +/** + * Channel Manager Test Helper + * + **/ +public class ChannelManagerTestHelper { + public static ConcurrentMap getChannelConcurrentMap(AbstractNettyRemotingClient remotingClient) { + return getChannelManager(remotingClient).getChannels(); + } + + public static Channel getChannel(TmNettyRemotingClient client) { + return getChannelManager(client) + .acquireChannel(ProtocolTestConstants.SERVER_ADDRESS); + } + private static NettyClientChannelManager getChannelManager(AbstractNettyRemotingClient remotingClient) { + return remotingClient.getClientChannelManager(); + } +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1.java new file mode 100644 index 00000000000..0c6b1d80ffd --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + +import io.seata.rm.tcc.api.BusinessActionContext; +import io.seata.rm.tcc.api.BusinessActionContextParameter; +import io.seata.rm.tcc.api.LocalTCC; +import io.seata.rm.tcc.api.TwoPhaseBusinessAction; + +import java.util.Map; + +/** + * The interface Action1. + * + */ +@LocalTCC +public interface Action1 { + + @TwoPhaseBusinessAction(name = "mock-action", commitMethod = "commitTcc", rollbackMethod = "cancel" +// , useTCCFence = true + ) + String insert(@BusinessActionContextParameter Long reqId, + @BusinessActionContextParameter(paramName = "params") Map params + ); + + + boolean commitTcc(BusinessActionContext actionContext); + + + boolean cancel(BusinessActionContext actionContext); +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1Impl.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1Impl.java new file mode 100644 index 00000000000..5ad30d6a09d --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/Action1Impl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + + +import io.seata.rm.tcc.api.BusinessActionContext; +import org.springframework.stereotype.Service; +import vlsi.utils.CompactHashMap; + +import java.util.Map; + +/** + * The type Action1. + */ +@Service +public class Action1Impl implements Action1 { + + private static Map commitMap = new CompactHashMap<>(); + private static Map rollbackMap = new CompactHashMap<>(); + + @Override + public String insert(Long reqId, Map params) { + System.out.println("prepare"); + return "prepare"; + } + + + @Override + public boolean commitTcc(BusinessActionContext actionContext) { + String xid = actionContext.getXid(); + System.out.println("commitTcc:" + xid); + commitMap.compute(xid, (k, v) -> v == null ? 1 : v + 1); + return true; + } + + @Override + public boolean cancel(BusinessActionContext actionContext) { + String xid = actionContext.getXid(); + System.out.println("commitTcc:" + xid); + rollbackMap.compute(xid, (k, v) -> v == null ? 1 : v + 1); + System.out.println("cancel"); + return true; + } + + public static int getCommitTimes(String xid) { + return commitMap.getOrDefault(xid, 0); + } + + public static int getRollbackTimes(String xid) { + return rollbackMap.getOrDefault(xid, 0); + } +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/MockServerTest.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/MockServerTest.java new file mode 100644 index 00000000000..eb70ab42bfe --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/MockServerTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + +import io.seata.core.exception.TransactionException; +import io.seata.core.model.BranchType; +import io.seata.core.model.GlobalStatus; +import io.seata.core.model.TransactionManager; +import io.seata.mockserver.MockCoordinator; +import io.seata.mockserver.MockServer; +import io.seata.rm.DefaultResourceManager; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class MockServerTest { + + static String RESOURCE_ID = "mock-action"; + + @BeforeAll + public static void before() { + MockServer.start(); + } + + @AfterAll + public static void after() { + MockServer.close(); + } + + @Test + public void testCommit() throws TransactionException { + String xid = doTestCommit(0); + Assertions.assertEquals(Action1Impl.getCommitTimes(xid), 1); + Assertions.assertEquals(Action1Impl.getRollbackTimes(xid), 0); + } + + @Test + public void testCommitRetry() throws TransactionException { + String xid = doTestCommit(2); + Assertions.assertEquals(Action1Impl.getCommitTimes(xid), 3); + Assertions.assertEquals(Action1Impl.getRollbackTimes(xid), 0); + } + + @Test + public void testRollback() throws TransactionException { + String xid = doTestRollback(0); + Assertions.assertEquals(Action1Impl.getCommitTimes(xid), 0); + Assertions.assertEquals(Action1Impl.getRollbackTimes(xid), 1); + } + + @Test + public void testRollbackRetry() throws TransactionException { + String xid = doTestRollback(2); + Assertions.assertEquals(Action1Impl.getCommitTimes(xid), 0); + Assertions.assertEquals(Action1Impl.getRollbackTimes(xid), 3); + } + + private static String doTestCommit(int times) throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); + + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "test", 60000); + MockCoordinator.getInstance().setExpectedRetry(xid, times); + Long branchId = rm.branchRegister(BranchType.AT, RESOURCE_ID, "1", xid, "1", "1"); + GlobalStatus commit = tm.commit(xid); + Assertions.assertEquals(commit, GlobalStatus.Committed); + return xid; + + } + + private static String doTestRollback(int times) throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); + + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "test", 60000); + MockCoordinator.getInstance().setExpectedRetry(xid, times); + Long branchId = rm.branchRegister(BranchType.AT, RESOURCE_ID, "1", xid, "1", "1"); + GlobalStatus rollback = tm.rollback(xid); + Assertions.assertEquals(rollback, GlobalStatus.Rollbacked); + return xid; + + } +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/ProtocolTestConstants.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/ProtocolTestConstants.java new file mode 100644 index 00000000000..3d33226e0bf --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/ProtocolTestConstants.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + +/** + * Mock Constants + * + **/ +public class ProtocolTestConstants { + public static final String APPLICATION_ID = "my_app_test"; + public static final String SERVICE_GROUP = "default_tx_group"; + public static final String SERVER_ADDRESS = "0.0.0.0:8091"; +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/RmClientTest.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/RmClientTest.java new file mode 100644 index 00000000000..49766dd33f4 --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/RmClientTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + +import io.netty.channel.Channel; +import io.seata.core.context.RootContext; +import io.seata.core.exception.TransactionException; +import io.seata.core.model.BranchStatus; +import io.seata.core.model.BranchType; +import io.seata.core.protocol.HeartbeatMessage; +import io.seata.core.rpc.netty.RmNettyRemotingClient; +import io.seata.integration.tx.api.interceptor.parser.DefaultResourceRegisterParser; +import io.seata.mockserver.MockCoordinator; +import io.seata.mockserver.MockServer; +import io.seata.rm.DefaultResourceManager; +import io.seata.rm.RMClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentMap; + +import static io.seata.core.rpc.netty.ChannelManagerTestHelper.getChannelConcurrentMap; + +/** + * rm client test + **/ +public class RmClientTest { + + + protected static final Logger LOGGER = LoggerFactory.getLogger(RmClientTest.class); + + @BeforeAll + public static void before() { + MockServer.start(); + } + + @AfterAll + public static void after() { + MockServer.close(); + } + + @Test + public void testRm() throws TransactionException { + String resourceId = "mock-action"; + String xid = "1111"; + + DefaultResourceManager rm = getRm(resourceId); + + //branchRegister:TYPE_BRANCH_REGISTER = 11 , TYPE_BRANCH_REGISTER_RESULT = 12 + Long branchId = rm.branchRegister(BranchType.AT, resourceId, "1", xid, "1", "1"); + Assertions.assertTrue(branchId > 0); + + + // branchReport:TYPE_BRANCH_STATUS_REPORT = 13 , TYPE_BRANCH_STATUS_REPORT_RESULT = 14 + // TYPE_SEATA_MERGE = 59 , TYPE_SEATA_MERGE_RESULT = 60 + rm.branchReport(BranchType.AT, xid, branchId, BranchStatus.PhaseTwo_Committed, ""); + LOGGER.info("branchReport ok"); + + //lockQuery:TYPE_GLOBAL_LOCK_QUERY = 21 , TYPE_GLOBAL_LOCK_QUERY_RESULT = 22 + RootContext.bind(xid); + boolean b = rm.lockQuery(BranchType.AT, resourceId, xid, "1"); + LOGGER.info("lockQuery ok, result=" + b); + Assertions.assertTrue(b); + + RmNettyRemotingClient remotingClient = RmNettyRemotingClient.getInstance(); + ConcurrentMap channels = getChannelConcurrentMap(remotingClient); + channels.forEach( + (key, value) -> RmNettyRemotingClient.getInstance().sendAsyncRequest(value, HeartbeatMessage.PING)); + + } + + public static DefaultResourceManager getRm(String resourceId) { + RMClient.init(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP); + DefaultResourceManager rm = DefaultResourceManager.get(); + + //register:TYPE_REG_RM = 103 , TYPE_REG_RM_RESULT = 104 + Action1 target = new Action1Impl(); + DefaultResourceRegisterParser.get().registerResource(target, resourceId); + LOGGER.info("registerResource ok"); + return rm; + } + + +} diff --git a/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java b/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java new file mode 100644 index 00000000000..875e71ed278 --- /dev/null +++ b/test/src/test/java/io/seata/core/rpc/netty/mockserver/TmClientTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty.mockserver; + + +import io.netty.channel.Channel; +import io.seata.core.model.GlobalStatus; +import io.seata.core.model.TransactionManager; +import io.seata.core.protocol.ResultCode; +import io.seata.core.rpc.netty.TmNettyRemotingClient; +import io.seata.mockserver.MockCoordinator; +import io.seata.mockserver.MockServer; +import io.seata.tm.DefaultTransactionManager; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.seata.core.rpc.netty.ChannelManagerTestHelper.getChannel; + +/** + * TmClient Test + **/ +public class TmClientTest { + + protected static final Logger LOGGER = LoggerFactory.getLogger(TmClientTest.class); + + @BeforeAll + public static void before() { + MockServer.start(); + } + + @AfterAll + public static void after() { + MockServer.close(); + } + + @Test + public void testTm() throws Exception { + + TransactionManager tm = getTm(); + + //globalBegin:TYPE_GLOBAL_BEGIN = 1 , TYPE_GLOBAL_BEGIN_RESULT = 2 + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, + ProtocolTestConstants.SERVICE_GROUP, "test", 60000); + LOGGER.info("globalBegin ok:xid=" + xid); + + //globalCommit:TYPE_GLOBAL_COMMIT = 7 , TYPE_GLOBAL_COMMIT_RESULT = 8 + GlobalStatus commit = tm.commit(xid); + LOGGER.info("globalCommit ok:" + commit); + Assertions.assertEquals(commit, GlobalStatus.Committed); + + //globalRollback:TYPE_GLOBAL_ROLLBACK = 9 , TYPE_GLOBAL_ROLLBACK_RESULT = 10 + GlobalStatus rollback = tm.rollback(xid); + LOGGER.info("globalRollback ok:" + rollback); + Assertions.assertEquals(rollback, GlobalStatus.Rollbacked); + + //getStatus:TYPE_GLOBAL_STATUS = 15 , TYPE_GLOBAL_STATUS_RESULT = 16 + GlobalStatus status = tm.getStatus(xid); + LOGGER.info("getStatus ok:" + status); + Assertions.assertEquals(status, GlobalStatus.Rollbacked); + + //globalReport:TYPE_GLOBAL_REPORT = 17 , TYPE_GLOBAL_REPORT_RESULT = 18 + GlobalStatus globalReport = tm.globalReport(xid, GlobalStatus.Committed); + LOGGER.info("globalReport ok:" + globalReport); + Assertions.assertEquals(globalReport, GlobalStatus.Committed); + + MockCoordinator.getInstance().setExepectedResult(xid, ResultCode.Failed); +// GlobalStatus globalReport2 = tm.globalReport(xid, GlobalStatus.Committed); + + GlobalStatus rollback2 = tm.rollback(xid); + LOGGER.info("globalRollback ok:" + rollback2); + // TODO expected response fail , but DefaultTransactionManager ignore resultCode + } + + @NotNull + public static TransactionManager getTm() { + TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance( + ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP); + tmNettyRemotingClient.init(); + TransactionManager tm = new DefaultTransactionManager(); + + //register:TYPE_REG_CLT = 101 , TYPE_REG_CLT_RESULT = 102 + TmNettyRemotingClient client = TmNettyRemotingClient.getInstance(); + Channel channel = getChannel(client); + LOGGER.info("TM register ok:channel=" + channel); + return tm; + } + + +}