From 7e4116f55c921ec61db071bf328331ffef8c478d Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Wed, 22 Jun 2022 19:54:28 +0300 Subject: [PATCH 1/2] IGNITE-16183 wip --- .../ignite/raft/jraft/core/ItNodeTest.java | 7 +- .../raft/server/ItJraftCounterServerTest.java | 153 ++++++++++++++++-- .../raft/server/impl/JraftServerImpl.java | 9 +- .../apache/ignite/raft/jraft/NodeManager.java | 22 +++ .../ignite/raft/jraft/RaftGroupService.java | 13 +- .../ignite/raft/jraft/RaftMessageGroup.java | 6 + .../ignite/raft/jraft/core/NodeImpl.java | 3 +- .../ignite/raft/jraft/core/Replicator.java | 36 +++-- .../raft/jraft/core/ReplicatorGroupImpl.java | 2 +- .../ignite/raft/jraft/option/NodeOptions.java | 44 +++-- .../raft/jraft/rpc/RaftClientService.java | 15 +- .../ignite/raft/jraft/rpc/RpcClient.java | 2 +- .../ignite/raft/jraft/rpc/RpcRequests.java | 10 ++ .../raft/jraft/rpc/impl/IgniteRpcServer.java | 2 + .../core/AppendEntriesRequestProcessor.java | 8 +- .../impl/core/DefaultRaftClientService.java | 30 ++++ .../impl/core/HeartbeatRequestProcessor.java | 65 ++++++++ .../raft/jraft/core/ReplicatorTest.java | 7 +- .../ignite/raft/jraft/core/TestCluster.java | 8 +- .../ignite/raft/jraft/rpc/IgniteRpcTest.java | 3 +- .../raft/jraft/rpc/TestIgniteRpcServer.java | 5 +- 21 files changed, 370 insertions(+), 80 deletions(-) create mode 100644 modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java index 3087963b980..97fd6f5cc7a 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java @@ -3776,6 +3776,7 @@ private NodeOptions createNodeOptions(int nodeIdx) { log.start(); options.setServiceFactory(new DefaultJRaftServiceFactory(log)); + options.setNodeManager(new NodeManager()); return options; } @@ -3864,8 +3865,6 @@ private RaftGroupService createService(String groupId, PeerId peerId, NodeOption .map(JRaftUtils::addressFromEndpoint) .collect(toList()); - var nodeManager = new NodeManager(); - ClusterService clusterService = ClusterServiceTestUtils.clusterService( testInfo, peerId.getEndpoint().getPort(), @@ -3876,13 +3875,13 @@ private RaftGroupService createService(String groupId, PeerId peerId, NodeOption executors.add(requestExecutor); - IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor); + IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeOptions, requestExecutor); nodeOptions.setRpcClient(new IgniteRpcClient(clusterService)); clusterService.start(); - var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager) { + var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer) { @Override public synchronized void shutdown() { rpcServer.shutdown(); diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java index cd36eb2e1c7..427fab80297 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java @@ -37,7 +37,11 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -45,6 +49,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -53,6 +58,7 @@ import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.thread.NamedThreadFactory; @@ -66,14 +72,27 @@ import org.apache.ignite.raft.client.WriteCommand; import org.apache.ignite.raft.client.service.CommandClosure; import org.apache.ignite.raft.client.service.RaftGroupService; +import org.apache.ignite.raft.jraft.ReplicatorGroup; +import org.apache.ignite.raft.jraft.Status; +import org.apache.ignite.raft.jraft.core.ElectionPriority; import org.apache.ignite.raft.jraft.core.NodeImpl; -import org.apache.ignite.raft.jraft.core.StateMachineAdapter; +import org.apache.ignite.raft.jraft.entity.PeerId; import org.apache.ignite.raft.jraft.option.NodeOptions; +import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatRequestBuilder; +import org.apache.ignite.raft.jraft.rpc.InvokeCallback; +import org.apache.ignite.raft.jraft.rpc.Message; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatResponse; +import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter; +import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient; import org.apache.ignite.raft.jraft.rpc.impl.RaftException; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl; +import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor.PeerPair; import org.apache.ignite.raft.jraft.test.TestUtils; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -694,7 +713,7 @@ public void testTimerThreadsCount() { for (int i = 0; i < groupsCnt; i++) { String grp = "counter" + i; - assertTrue(waitForCondition(() -> hasLeader(grp), 30_000)); + assertTrue(waitForCondition(() -> getLeader(grp) != null, 30_000)); } Set threads = Thread.getAllStackTraces().keySet(); @@ -707,6 +726,122 @@ public void testTimerThreadsCount() { "All timer threads: " + timerThreads.toString()); } + /** Tests if a starting a new group in shared pools mode doesn't increases timer threads count. */ + @Test + public void testTimerThreadsCount2() throws Exception { + JraftServerImpl srv0 = startServer(0, x -> {}, opts -> {}); + JraftServerImpl srv1 = startServer(1, x -> {}, opts -> opts.setElectionPriority(ElectionPriority.NotElected)); + JraftServerImpl srv2 = startServer(2, x -> {}, opts -> opts.setElectionPriority(ElectionPriority.NotElected)); + + waitForTopology(srv0.clusterService(), 3, 5_000); + + final int grps = 2; + + for (int i = 0; i < grps; i++) { + String grp = "testGrp" + i; + srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF); + srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF); + srv2.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF); + } + + // Allow the parallel start. + for (int i = 0; i < grps; i++) { + final String grp = "testGrp" + i; + assertTrue(waitForCondition(() -> getLeader(grp) != null, 30_000)); + } + + // Allow the parallel start. + for (int i = 0; i < grps; i++) { + final String grp = "testGrp" + i; + + NodeImpl leader = getLeader(grp); + ReplicatorGroup replicatorGroup = IgniteTestUtils.getFieldValue(leader, "replicatorGroup"); + List peers = leader.getCurrentConf().getPeers(); + + CountDownLatch l = new CountDownLatch(2); + + for (PeerId peer : peers) { + if (peer.equals(leader.getServerId())) + continue; + + replicatorGroup.sendHeartbeat(peer, new RpcResponseClosureAdapter() { + @Override + public void run(Status status) { + AppendEntriesResponse response = getResponse(); + + assertNotNull(response); + + l.countDown(); + } + }); + } + } + + RaftGroupService service = startClient("testGrp0"); + service.refreshLeader().get(); + + // TODO iterate over all leaders per server + NodeImpl leader = getLeader("testGrp0"); + + service.run(new IncrementAndGetCommand(1)).get(); + + ConcurrentMap> coalesced = leader.getOptions().getNodeManager().getCoalesced(); + + assertEquals(grps, coalesced.size()); + + IgniteRpcClient sender = new IgniteRpcClient(srv0.clusterService()); + + for (PeerPair peerPair : coalesced.keySet()) { + coalesced.compute(peerPair, new BiFunction, Queue>() { + @Override + public Queue apply(PeerPair peerPair, Queue queue) { + if (!queue.isEmpty()) { + CoalescedHeartbeatRequestBuilder builder = leader.getOptions().getRaftMessagesFactory().coalescedHeartbeatRequest(); + builder.messages(new ArrayList<>()); + + Object[] req; + + List> futs = new ArrayList<>(); + + while((req = queue.poll()) != null) { + builder.messages().add((AppendEntriesRequest) req[0]); + + futs.add((CompletableFuture) req[1]); + } + + // TODO asch store parsed. + PeerId peerId = PeerId.parsePeer(peerPair.remote); + + sender.invokeAsync(peerId.getEndpoint(), builder.build(), null, new InvokeCallback() { + @Override + public void complete(Object result, Throwable err) { + CoalescedHeartbeatResponse resp = (CoalescedHeartbeatResponse) result; + + assert resp.messages().size() == futs.size(); + + // TODO asch complete futures in parallel + int i = 0; + for (Message message : resp.messages()) { + futs.get(i++).complete(message); + } + } + }, 5_000); + } + + // TODO asch return null + return queue; + } + }); + } + + Thread.sleep(2000); + + System.out.println(); + + + //assertTrue(l.await(5_000, TimeUnit.MILLISECONDS)); + } + /** * Returns {@code true} if thread is related to timers. * @@ -724,16 +859,12 @@ private boolean isTimer(Thread thread) { * Returns {@code true} if a raft group has elected a leader for a some term. * * @param grpId Group id. - * @return {@code True} if a leader is elected. + * @return The leader or null. */ - private boolean hasLeader(String grpId) { - return servers.stream().anyMatch(s -> { - NodeImpl node = (NodeImpl) s.raftGroupService(grpId).getRaftNode(); - - StateMachineAdapter fsm = (StateMachineAdapter) node.getOptions().getFsm(); - - return node.isLeader() && fsm.getLeaderTerm() == node.getCurrentTerm(); - }); + private @Nullable NodeImpl getLeader(String grpId) { + return servers.stream().map(s -> (NodeImpl) s.raftGroupService(grpId).getRaftNode()).filter(node -> { + return node.isLeader(); + }).findFirst().orElse(null); } /** diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index d1569316415..7e5f982e882 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -94,9 +94,6 @@ public class JraftServerImpl implements RaftServer { /** Started groups. */ private ConcurrentMap groups = new ConcurrentHashMap<>(); - /** Node manager. */ - private final NodeManager nodeManager; - /** Options. */ private final NodeOptions opts; @@ -123,7 +120,6 @@ public JraftServerImpl(ClusterService service, Path dataPath) { public JraftServerImpl(ClusterService service, Path dataPath, NodeOptions opts) { this.service = service; this.dataPath = dataPath; - this.nodeManager = new NodeManager(); this.logStorageFactory = new DefaultLogStorageFactory(dataPath.resolve("log")); this.opts = opts; @@ -132,6 +128,7 @@ public JraftServerImpl(ClusterService service, Path dataPath, NodeOptions opts) this.opts.setRpcDefaultTimeout(this.opts.getElectionTimeoutMs() / 2); this.opts.setSharedPools(true); this.opts.setServiceFactory(new DefaultJRaftServiceFactory(logStorageFactory)); + this.opts.setNodeManager(new NodeManager()); if (opts.getServerName() == null) { this.opts.setServerName(service.localConfiguration().getName()); @@ -192,7 +189,7 @@ public void start() { rpcServer = new IgniteRpcServer( service, - nodeManager, + opts.getNodeManager(), opts.getRaftMessagesFactory(), requestExecutor ); @@ -360,7 +357,7 @@ public synchronized boolean startRaftGroup(String groupId, @NotNull RaftGroupEve var peerId = new PeerId(addr.host(), addr.port(), 0, ElectionPriority.DISABLED); - var server = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager); + var server = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer); server.start(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java index c66f7359c72..dc502f773ee 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java @@ -16,14 +16,23 @@ */ package org.apache.ignite.raft.jraft; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; import java.util.stream.Collectors; import org.apache.ignite.raft.jraft.entity.NodeId; import org.apache.ignite.raft.jraft.entity.PeerId; +import org.apache.ignite.raft.jraft.rpc.Message; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse; +import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor.PeerPair; import org.apache.ignite.raft.jraft.util.OnlyForTest; /** @@ -32,6 +41,7 @@ public class NodeManager { private final ConcurrentMap nodeMap = new ConcurrentHashMap<>(); private final ConcurrentMap> groupMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> coalesced = new ConcurrentHashMap<>(); /** * Adds a node. @@ -96,4 +106,16 @@ public List getNodesByGroupId(final String groupId) { public List getAllNodes() { return this.groupMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); } + + public CompletableFuture enqueue(AppendEntriesRequest request) { + CompletableFuture fut = new CompletableFuture<>(); + + boolean added = coalesced.computeIfAbsent(new PeerPair(request.serverId(), request.peerId()), k -> new ConcurrentLinkedQueue<>()).add(new Object[]{request, fut}); + + return fut; + } + + public ConcurrentMap> getCoalesced() { + return coalesced; + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java index 8c6355e01cd..719d5171114 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java @@ -60,26 +60,19 @@ public class RaftGroupService { */ private Node node; - /** - * The node manager. - */ - private NodeManager nodeManager; - /** * @param groupId Group Id. * @param serverId Server id. * @param nodeOptions Node options. * @param rpcServer RPC server. - * @param nodeManager Node manager. */ public RaftGroupService(final String groupId, final PeerId serverId, final NodeOptions nodeOptions, - final RpcServer rpcServer, final NodeManager nodeManager) { + final RpcServer rpcServer) { super(); this.groupId = groupId; this.serverId = serverId; this.nodeOptions = nodeOptions; this.rpcServer = rpcServer; - this.nodeManager = nodeManager; } public synchronized Node getRaftNode() { @@ -119,7 +112,7 @@ public synchronized Node start() { throw new IgniteInternalException("Fail to init node, please see the logs to find the reason."); } - this.nodeManager.add(this.node); + this.nodeOptions.getNodeManager().add(this.node); this.started = true; LOG.info("Start the RaftGroupService successfully {}", this.node.getNodeId()); return this.node; @@ -139,7 +132,7 @@ public synchronized void shutdown() { LOG.error("Interrupted while waiting for the node to shutdown"); } - nodeManager.remove(this.node); + this.nodeOptions.getNodeManager().remove(this.node); this.started = false; LOG.info("Stop the RaftGroupService successfully."); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java index 9aae7bc9315..1656be3612a 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java @@ -162,6 +162,12 @@ public static final class RpcRequestsMessageGroup { /** */ public static final short SM_ERROR_RESPONSE = 3014; + + /** */ + public static final short COALESCED_HEARTBEAT_REQUEST = 3015; + + /** */ + public static final short COALESCED_HEARTBEAT_RESPONSE = 3016; } /** diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 962fbfa9067..469d0fc39f8 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -36,7 +36,6 @@ import java.util.stream.Collectors; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.lang.IgniteLogger; -import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.client.Peer; import org.apache.ignite.raft.jraft.Closure; import org.apache.ignite.raft.jraft.FSMCaller; @@ -649,7 +648,7 @@ private void adjustElectionTimeout() { electionRound++; if (electionRound > 1) - LOG.info("Unsuccessful election round number {}", electionRound); + LOG.info("Node {} election failed, round number {}", getNodeId(), electionRound); if (!electionAdjusted) { initialElectionTimeout = options.getElectionTimeoutMs(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java index cc75449b3b3..b9e610efd3b 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java @@ -511,8 +511,7 @@ private Inflight pollInflight() { private void startHeartbeatTimer(final long startMs) { final long dueTime = startMs + this.options.getDynamicHeartBeatTimeoutMs(); try { - this.heartbeatTimer = this.timerManager.schedule(() -> onTimeout(this.id), dueTime - Utils.nowMs(), - TimeUnit.MILLISECONDS); + this.heartbeatTimer = this.timerManager.schedule(() -> onTimeout(this.id), dueTime - Utils.nowMs(), TimeUnit.MILLISECONDS); } catch (final Exception e) { LOG.error("Fail to schedule heartbeat timer", e); @@ -657,7 +656,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r, } private void sendEmptyEntries(final boolean isHeartbeat) { - sendEmptyEntries(isHeartbeat, null); + sendEmptyEntries(isHeartbeat, null, false); } /** @@ -665,10 +664,11 @@ private void sendEmptyEntries(final boolean isHeartbeat) { * * @param isHeartbeat if current entries is heartbeat * @param heartBeatClosure heartbeat callback + * @param coalesce {@code True} to coalesce heartbeats. */ @SuppressWarnings("NonAtomicOperationOnVolatileField") - private void sendEmptyEntries(final boolean isHeartbeat, - final RpcResponseClosure heartBeatClosure) { + private void sendEmptyEntries(final boolean isHeartbeat, final RpcResponseClosure heartBeatClosure, + boolean coalesce) { final AppendEntriesRequestBuilder rb = raftOptions.getRaftMessagesFactory().appendEntriesRequest(); if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) { // id is unlock in installSnapshot @@ -700,8 +700,8 @@ public void run(final Status status) { } }; } - this.heartbeatInFly = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, - this.options.getElectionTimeoutMs() / 2, heartbeatDone); + this.heartbeatInFly = this.rpcService.sendHeartbeat(this.options.getPeerId().getEndpoint(), request, coalesce, + this.options.getElectionTimeoutMs() / 2, heartbeatDone); } else { // No entries and has empty data means a probe request. @@ -813,7 +813,7 @@ public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raf LOG.info("Replicator={}@{} is started", r.id, r.options.getPeerId()); r.catchUpClosure = null; r.lastRpcSendTimestamp = Utils.monotonicMs(); - r.startHeartbeatTimer(Utils.nowMs()); + //r.startHeartbeatTimer(Utils.nowMs()); // id.unlock in sendEmptyEntries r.sendEmptyEntries(false); return r.id; @@ -953,7 +953,7 @@ public void onError(final ThreadId id, final Object data, final int errorCode) { try { for (final Inflight inflight : r.inflights) { if (inflight != r.rpcInFly) { - inflight.rpcFuture.cancel(true); // TODO asch makes sense to cancel scalecube future ? + inflight.rpcFuture.cancel(true); } } if (r.rpcInFly != null) { @@ -1111,7 +1111,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap r.state = State.Probe; notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); if (++r.consecutiveErrorTimes % 10 == 0) { - LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), + LOG.warn("QQQQ Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), r.consecutiveErrorTimes, status); } r.startHeartbeatTimer(startTimeMs); // TODO asch use discovery instead of constant probing IGNITE-14843 @@ -1444,11 +1444,11 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) { r.sendTimeoutNow(false, false); } + r.startHeartbeatTimer(Utils.nowMs()); // Start hearbeating after successful log reconciliation. return true; } - private boolean fillCommonFields(final AppendEntriesRequestBuilder rb, long prevLogIndex, - final boolean isHeartbeat) { + private boolean fillCommonFields(final AppendEntriesRequestBuilder rb, long prevLogIndex, final boolean isHeartbeat) { final long prevLogTerm = this.options.getLogManager().getTerm(prevLogIndex); if (prevLogTerm == 0 && prevLogIndex != 0) { if (!isHeartbeat) { @@ -1623,15 +1623,18 @@ public void run(final Status status) { public static void sendHeartbeat( final ThreadId id, final RpcResponseClosure closure, - ExecutorService executor + ExecutorService executor, boolean coalesce ) { final Replicator r = (Replicator) id.lock(); if (r == null) { Utils.runClosureInThread(executor, closure, new Status(RaftError.EHOSTDOWN, "Peer %s is not connected", id)); return; } +// if (r.getOpts().getPeerId().getPort() == 5005) +// LOG.info("sendHeartbeat {}", id); + //id unlock in send empty entries. - r.sendEmptyEntries(true, closure); + r.sendEmptyEntries(true, closure, coalesce); } private static void sendHeartbeat(final ThreadId id) { @@ -1640,7 +1643,10 @@ private static void sendHeartbeat(final ThreadId id) { return; } // unlock in sendEmptyEntries - r.sendEmptyEntries(true); +// if (r.getOpts().getPeerId().getPort() == 5005) +// LOG.info("sendHeartbeat 2 {}", id); + + r.sendEmptyEntries(true, null, false); } @SuppressWarnings("SameParameterValue") diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReplicatorGroupImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReplicatorGroupImpl.java index 76e71c9b963..37a71efe65a 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReplicatorGroupImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReplicatorGroupImpl.java @@ -94,7 +94,7 @@ public void sendHeartbeat(final PeerId peer, final RpcResponseClosure { */ private StripedDisruptor logManagerDisruptor; + /** + * Node manager. + */ + private NodeManager nodeManager; + /** * Amount of Disruptors that will handle the RAFT server. */ @@ -573,6 +580,14 @@ public void setLogManagerDisruptor(StripedDisruptor requestVote(final Endpoint endpoint, final RpcRequests.RequestVo * * @param endpoint destination address (ip, port) * @param request request data + * @param timeoutMs timeout * @param done callback * @return a future with result */ Future appendEntries(final Endpoint endpoint, final RpcRequests.AppendEntriesRequest request, final int timeoutMs, final RpcResponseClosure done); + /** + * Sends a heartbeat request and handle the response with done. + * + * @param endpoint destination address (ip, port) + * @param request request data + * @param coalesce {@code true} to coalesce heartbeats + * @param timeoutMs timeout + * @param done callback + * @return a future with result + */ + Future sendHeartbeat(final Endpoint endpoint, final RpcRequests.AppendEntriesRequest request, boolean coalesce, + final int timeoutMs, final RpcResponseClosure done); + /** * Sends a install-snapshot request and handle the response with done. * diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java index 6251d8b5235..82a9b8bf7f0 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java @@ -33,7 +33,7 @@ public interface RpcClient extends Lifecycle { * * @param endpoint target address * @return true if there is a connection and the connection is active and writable. - * @deprecated // TODO asch remove IGNITE-14832 + * @deprecated // TODO asch not used, remove IGNITE-14832 */ boolean checkConnection(Endpoint endpoint); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java index 0cf6093bafd..75b755dd33d 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java @@ -226,4 +226,14 @@ public interface ReadIndexResponse extends Message { boolean success(); } + + @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.COALESCED_HEARTBEAT_REQUEST) + public interface CoalescedHeartbeatRequest extends Message { + Collection messages(); + } + + @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.COALESCED_HEARTBEAT_RESPONSE) + public interface CoalescedHeartbeatResponse extends Message { + Collection messages(); + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java index 7681e7979c4..aed352b76b1 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java @@ -50,6 +50,7 @@ import org.apache.ignite.raft.jraft.rpc.impl.cli.TransferLeaderRequestProcessor; import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor; import org.apache.ignite.raft.jraft.rpc.impl.core.GetFileRequestProcessor; +import org.apache.ignite.raft.jraft.rpc.impl.core.HeartbeatRequestProcessor; import org.apache.ignite.raft.jraft.rpc.impl.core.InstallSnapshotRequestProcessor; import org.apache.ignite.raft.jraft.rpc.impl.core.ReadIndexRequestProcessor; import org.apache.ignite.raft.jraft.rpc.impl.core.RequestVoteRequestProcessor; @@ -100,6 +101,7 @@ public IgniteRpcServer( registerProcessor(new PingRequestProcessor(rpcExecutor, raftMessagesFactory)); registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor, raftMessagesFactory)); registerProcessor(new ReadIndexRequestProcessor(rpcExecutor, raftMessagesFactory)); + registerProcessor(new HeartbeatRequestProcessor(rpcExecutor, raftMessagesFactory)); // raft native cli service registerProcessor(new AddPeerRequestProcessor(rpcExecutor, raftMessagesFactory)); registerProcessor(new RemovePeerRequestProcessor(rpcExecutor, raftMessagesFactory)); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java index 6b69c2f8d24..84a57f3d92c 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java @@ -152,13 +152,13 @@ PeerPair pairOf(final String peerId, final String serverId) { /** * A peer pair */ - static class PeerPair { + public static class PeerPair { // peer in local node - final String local; + public final String local; // peer in remote node - final String remote; + public final String remote; - PeerPair(final String local, final String remote) { + public PeerPair(final String local, final String remote) { super(); this.local = local; this.remote = remote; diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java index f474b659544..cf387659e72 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java @@ -21,6 +21,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import org.apache.ignite.raft.jraft.NodeManager; import org.apache.ignite.raft.jraft.Status; import org.apache.ignite.raft.jraft.error.RaftError; import org.apache.ignite.raft.jraft.error.RemotingException; @@ -31,6 +33,7 @@ import org.apache.ignite.raft.jraft.rpc.RaftClientService; import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse; import org.apache.ignite.raft.jraft.rpc.RpcRequests.GetFileRequest; import org.apache.ignite.raft.jraft.rpc.RpcRequests.GetFileResponse; import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotRequest; @@ -91,6 +94,33 @@ public Future appendEntries(final Endpoint endpoint, final AppendEntrie return failedFuture(executor, request, done, endpoint); } + @Override + public Future sendHeartbeat(Endpoint endpoint, AppendEntriesRequest request, boolean coalesce, int timeoutMs, + RpcResponseClosure done) { + if (!coalesce) + return appendEntries(endpoint, request, timeoutMs, done); + + NodeManager nodeManager = this.nodeOptions.getNodeManager(); + + CompletableFuture fut = nodeManager.enqueue(request); + + fut.whenComplete(new BiConsumer() { + @Override + public void accept(Message resp, Throwable throwable) { + if (resp instanceof ErrorResponse) { + ErrorResponse resp0 = (ErrorResponse) resp; + done.run(new Status(resp0.errorCode(), resp0.errorMsg())); + } + else { + done.setResponse((AppendEntriesResponse) resp); + done.run(Status.OK()); + } + } + }); + + return fut; + } + @Override public Future getFile(final Endpoint endpoint, final GetFileRequest request, final int timeoutMs, final RpcResponseClosure done) { diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java new file mode 100644 index 00000000000..258df44b78c --- /dev/null +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.raft.jraft.rpc.impl.core; + +import java.util.ArrayList; +import java.util.concurrent.Executor; +import org.apache.ignite.raft.jraft.Node; +import org.apache.ignite.raft.jraft.RaftMessagesFactory; +import org.apache.ignite.raft.jraft.entity.PeerId; +import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatResponseBuilder; +import org.apache.ignite.raft.jraft.rpc.Message; +import org.apache.ignite.raft.jraft.rpc.RaftServerService; +import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure; +import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatRequest; + +/** + * Heartbeat request procesor. + */ +public class HeartbeatRequestProcessor extends RpcRequestProcessor { + public HeartbeatRequestProcessor(Executor executor, RaftMessagesFactory msgFactory) { + super(executor, msgFactory); + } + + @Override + public Message processRequest(CoalescedHeartbeatRequest request, RpcRequestClosure done) { + CoalescedHeartbeatResponseBuilder builder = msgFactory().coalescedHeartbeatResponse(); + builder.messages(new ArrayList<>()); + + for (AppendEntriesRequest message : request.messages()) { + PeerId peerId = PeerId.parsePeer(message.peerId()); + + // TODO asch error handle + final Node node = done.getRpcCtx().getNodeManager().get(message.groupId(), peerId); + + RaftServerService svc = (RaftServerService) node; + + Message msg = svc.handleAppendEntriesRequest(message, null); + + builder.messages().add(msg); + } + + return builder.build(); + } + + @Override + public String interest() { + return CoalescedHeartbeatRequest.class.getName(); + } +} diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java index 103ac9d4951..2b89f37e67b 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java @@ -595,7 +595,10 @@ public void testSendHeartbeat() { assertNull(r.getHeartbeatInFly()); final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest(true); Mockito.when( - this.rpcService.appendEntries(eq(this.peerId.getEndpoint()), eq(request), + this.rpcService.sendHeartbeat( + eq(this.peerId.getEndpoint()), + eq(request), + Mockito.anyBoolean(), eq(this.opts.getElectionTimeoutMs() / 2), Mockito.any())).thenAnswer(new Answer() { @Override public Future answer(InvocationOnMock invocation) throws Throwable { return new CompletableFuture<>(); @@ -606,7 +609,7 @@ public void testSendHeartbeat() { public void run(final Status status) { assertTrue(status.isOk()); } - }, node.getOptions().getCommonExecutor()); + }, node.getOptions().getCommonExecutor(), false); assertNotNull(r.getHeartbeatInFly()); diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java index 83b6a2b8b8d..c92de8f1b11 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java @@ -62,7 +62,6 @@ import org.apache.ignite.raft.jraft.util.Endpoint; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.apache.ignite.raft.jraft.util.Utils; -import org.apache.ignite.utils.ClusterServiceTestUtils; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.TestInfo; @@ -252,17 +251,16 @@ public boolean start(Endpoint listenAddr, boolean emptyPeers, int snapshotInterv .map(JRaftUtils::addressFromEndpoint) .collect(toList()); - NodeManager nodeManager = new NodeManager(); - ClusterService clusterService = clusterService(testInfo, listenAddr.getPort(), new StaticNodeFinder(addressList)); var rpcClient = new IgniteRpcClient(clusterService); nodeOptions.setRpcClient(rpcClient); + nodeOptions.setNodeManager(new NodeManager()); ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions); - var rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor); + var rpcServer = new TestIgniteRpcServer(clusterService, nodeOptions, requestExecutor); clusterService.start(); @@ -270,7 +268,7 @@ public boolean start(Endpoint listenAddr, boolean emptyPeers, int snapshotInterv optsClo.accept(nodeOptions); RaftGroupService server = new RaftGroupService(this.name, new PeerId(listenAddr, 0, priority), - nodeOptions, rpcServer, nodeManager) { + nodeOptions, rpcServer) { @Override public synchronized void shutdown() { // This stop order is consistent with JRaftServerImpl rpcServer.shutdown(); diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java index 4199cec336d..646980f5804 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java @@ -70,10 +70,11 @@ public IgniteRpcTest(TestInfo testInfo) { ); NodeOptions nodeOptions = new NodeOptions(); + nodeOptions.setNodeManager(new NodeManager()); requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions); - var server = new TestIgniteRpcServer(service, new NodeManager(), nodeOptions, requestExecutor) { + var server = new TestIgniteRpcServer(service, nodeOptions, requestExecutor) { @Override public void shutdown() { super.shutdown(); diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java index 07a1fb514eb..3dcec3ef563 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java @@ -29,15 +29,14 @@ public class TestIgniteRpcServer extends IgniteRpcServer { /** * @param clusterService Cluster service. - * @param nodeManager Node manager. * @param nodeOptions Node options. * @param requestExecutor Requests executor. */ - public TestIgniteRpcServer(ClusterService clusterService, NodeManager nodeManager, NodeOptions nodeOptions, + public TestIgniteRpcServer(ClusterService clusterService, NodeOptions nodeOptions, ExecutorService requestExecutor) { super( clusterService, - nodeManager, + nodeOptions.getNodeManager(), nodeOptions.getRaftMessagesFactory(), requestExecutor ); From 568601cd255e66640b5523f77f19950356be6b3e Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Thu, 23 Jun 2022 14:26:20 +0300 Subject: [PATCH 2/2] IGNITE-16183 wip --- .../raft/server/ItJraftCounterServerTest.java | 33 ++++++++----------- .../ignite/raft/jraft/core/Replicator.java | 30 ++++++++++------- .../ignite/raft/jraft/util/ThreadId.java | 2 +- 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java index 427fab80297..9287fad7438 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java @@ -750,7 +750,15 @@ public void testTimerThreadsCount2() throws Exception { assertTrue(waitForCondition(() -> getLeader(grp) != null, 30_000)); } - // Allow the parallel start. + final String grp0 = "testGrp0"; + + // Move committed index for a first group. + RaftGroupService service = startClient(grp0); + service.refreshLeader().get(); + service.run(new IncrementAndGetCommand(1)).get(); + + CountDownLatch l = new CountDownLatch(4); + for (int i = 0; i < grps; i++) { final String grp = "testGrp" + i; @@ -758,8 +766,6 @@ public void testTimerThreadsCount2() throws Exception { ReplicatorGroup replicatorGroup = IgniteTestUtils.getFieldValue(leader, "replicatorGroup"); List peers = leader.getCurrentConf().getPeers(); - CountDownLatch l = new CountDownLatch(2); - for (PeerId peer : peers) { if (peer.equals(leader.getServerId())) continue; @@ -777,14 +783,7 @@ public void run(Status status) { } } - RaftGroupService service = startClient("testGrp0"); - service.refreshLeader().get(); - - // TODO iterate over all leaders per server - NodeImpl leader = getLeader("testGrp0"); - - service.run(new IncrementAndGetCommand(1)).get(); - + NodeImpl leader = getLeader(grp0); ConcurrentMap> coalesced = leader.getOptions().getNodeManager().getCoalesced(); assertEquals(grps, coalesced.size()); @@ -797,7 +796,8 @@ public void run(Status status) { public Queue apply(PeerPair peerPair, Queue queue) { if (!queue.isEmpty()) { CoalescedHeartbeatRequestBuilder builder = leader.getOptions().getRaftMessagesFactory().coalescedHeartbeatRequest(); - builder.messages(new ArrayList<>()); + ArrayList list = new ArrayList<>(); + builder.messages(list); Object[] req; @@ -822,7 +822,7 @@ public void complete(Object result, Throwable err) { // TODO asch complete futures in parallel int i = 0; for (Message message : resp.messages()) { - futs.get(i++).complete(message); + futs.get(i++).complete(message); // Future completion will trigger callbacks. } } }, 5_000); @@ -834,12 +834,7 @@ public void complete(Object result, Throwable err) { }); } - Thread.sleep(2000); - - System.out.println(); - - - //assertTrue(l.await(5_000, TimeUnit.MILLISECONDS)); + assertTrue(l.await(5_000, TimeUnit.MILLISECONDS)); } /** diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java index b9e610efd3b..6093fc826e4 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java @@ -511,7 +511,8 @@ private Inflight pollInflight() { private void startHeartbeatTimer(final long startMs) { final long dueTime = startMs + this.options.getDynamicHeartBeatTimeoutMs(); try { - this.heartbeatTimer = this.timerManager.schedule(() -> onTimeout(this.id), dueTime - Utils.nowMs(), TimeUnit.MILLISECONDS); + this.heartbeatTimer = this.timerManager.schedule(() -> onTimeout(this.id), dueTime - Utils.nowMs(), + TimeUnit.MILLISECONDS); } catch (final Exception e) { LOG.error("Fail to schedule heartbeat timer", e); @@ -953,7 +954,7 @@ public void onError(final ThreadId id, final Object data, final int errorCode) { try { for (final Inflight inflight : r.inflights) { if (inflight != r.rpcInFly) { - inflight.rpcFuture.cancel(true); + inflight.rpcFuture.cancel(true); // TODO asch makes sense to cancel scalecube future ? } } if (r.rpcInFly != null) { @@ -1075,8 +1076,13 @@ private void releaseReader() { } } - static void onHeartbeatReturned(final ThreadId id, final Status status, final AppendEntriesRequest request, - final AppendEntriesResponse response, final long rpcSendTime) { + public static void onHeartbeatReturned(final ThreadId id, final Status status, final AppendEntriesRequest request, + final AppendEntriesResponse response, final long rpcSendTime) { + onHeartbeatReturned(id, status, request, response, rpcSendTime, true); + } + + public static void onHeartbeatReturned(final ThreadId id, final Status status, final AppendEntriesRequest request, + final AppendEntriesResponse response, final long rpcSendTime, boolean reschedule) { if (id == null) { // replicator already was destroyed. return; @@ -1096,11 +1102,11 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap .append(':') // .append(r.options.getServerId()) // .append(" received HeartbeatResponse from ") // - .append(r.options.getPeerId()) // - .append(" prevLogIndex=") // - .append(request.prevLogIndex()) // - .append(" prevLogTerm=") // - .append(request.prevLogTerm()); + .append(r.options.getPeerId()); // +// .append(" prevLogIndex=") // +// .append(request.prevLogIndex()) // +// .append(" prevLogTerm=") // +// .append(request.prevLogTerm()); } if (!status.isOk()) { if (isLogDebugEnabled) { @@ -1153,7 +1159,8 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap if (rpcSendTime > r.lastRpcSendTimestamp) { r.lastRpcSendTimestamp = rpcSendTime; } - r.startHeartbeatTimer(startTimeMs); + if (reschedule) + r.startHeartbeatTimer(startTimeMs); } finally { if (doUnlock) { @@ -1444,7 +1451,8 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) { r.sendTimeoutNow(false, false); } - r.startHeartbeatTimer(Utils.nowMs()); // Start hearbeating after successful log reconciliation. + if (entriesSize == 0) // Start hearbeating after successful log reconciliation (after probe request) + r.startHeartbeatTimer(Utils.nowMs()); return true; } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java index 89fef9c30e6..9dd92bfe8da 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java @@ -32,7 +32,7 @@ public class ThreadId { private final Object data; private final NonReentrantLock lock = new NonReentrantLock(); - private final List pendingErrors = new ArrayList<>(); + private final List pendingErrors = new ArrayList<>(); // TODO asch move to hash to avoid duplicate errors private final OnError onError; private volatile boolean destroyed;