Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignite 16183 #5161

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3776,6 +3776,7 @@ private NodeOptions createNodeOptions(int nodeIdx) {
log.start();

options.setServiceFactory(new DefaultJRaftServiceFactory(log));
options.setNodeManager(new NodeManager());

return options;
}
Expand Down Expand Up @@ -3864,8 +3865,6 @@ private RaftGroupService createService(String groupId, PeerId peerId, NodeOption
.map(JRaftUtils::addressFromEndpoint)
.collect(toList());

var nodeManager = new NodeManager();

ClusterService clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
peerId.getEndpoint().getPort(),
Expand All @@ -3876,13 +3875,13 @@ private RaftGroupService createService(String groupId, PeerId peerId, NodeOption

executors.add(requestExecutor);

IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeOptions, requestExecutor);

nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));

clusterService.start();

var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager) {
var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer) {
@Override public synchronized void shutdown() {
rpcServer.shutdown();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -53,6 +58,7 @@
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.thread.NamedThreadFactory;
Expand All @@ -66,14 +72,27 @@
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.ReplicatorGroup;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.ElectionPriority;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatRequestBuilder;
import org.apache.ignite.raft.jraft.rpc.InvokeCallback;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatResponse;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor.PeerPair;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -694,7 +713,7 @@ public void testTimerThreadsCount() {
for (int i = 0; i < groupsCnt; i++) {
String grp = "counter" + i;

assertTrue(waitForCondition(() -> hasLeader(grp), 30_000));
assertTrue(waitForCondition(() -> getLeader(grp) != null, 30_000));
}

Set<Thread> threads = Thread.getAllStackTraces().keySet();
Expand All @@ -707,6 +726,117 @@ public void testTimerThreadsCount() {
"All timer threads: " + timerThreads.toString());
}

/** Tests if a starting a new group in shared pools mode doesn't increases timer threads count. */
@Test
public void testTimerThreadsCount2() throws Exception {
JraftServerImpl srv0 = startServer(0, x -> {}, opts -> {});
JraftServerImpl srv1 = startServer(1, x -> {}, opts -> opts.setElectionPriority(ElectionPriority.NotElected));
JraftServerImpl srv2 = startServer(2, x -> {}, opts -> opts.setElectionPriority(ElectionPriority.NotElected));

waitForTopology(srv0.clusterService(), 3, 5_000);

final int grps = 2;

for (int i = 0; i < grps; i++) {
String grp = "testGrp" + i;
srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
srv2.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
}

// Allow the parallel start.
for (int i = 0; i < grps; i++) {
final String grp = "testGrp" + i;
assertTrue(waitForCondition(() -> getLeader(grp) != null, 30_000));
}

final String grp0 = "testGrp0";

// Move committed index for a first group.
RaftGroupService service = startClient(grp0);
service.refreshLeader().get();
service.run(new IncrementAndGetCommand(1)).get();

CountDownLatch l = new CountDownLatch(4);

for (int i = 0; i < grps; i++) {
final String grp = "testGrp" + i;

NodeImpl leader = getLeader(grp);
ReplicatorGroup replicatorGroup = IgniteTestUtils.getFieldValue(leader, "replicatorGroup");
List<PeerId> peers = leader.getCurrentConf().getPeers();

for (PeerId peer : peers) {
if (peer.equals(leader.getServerId()))
continue;

replicatorGroup.sendHeartbeat(peer, new RpcResponseClosureAdapter<AppendEntriesResponse>() {
@Override
public void run(Status status) {
AppendEntriesResponse response = getResponse();

assertNotNull(response);

l.countDown();
}
});
}
}

NodeImpl leader = getLeader(grp0);
ConcurrentMap<PeerPair, Queue<Object[]>> coalesced = leader.getOptions().getNodeManager().getCoalesced();

assertEquals(grps, coalesced.size());

IgniteRpcClient sender = new IgniteRpcClient(srv0.clusterService());

for (PeerPair peerPair : coalesced.keySet()) {
coalesced.compute(peerPair, new BiFunction<PeerPair, Queue<Object[]>, Queue<Object[]>>() {
@Override
public Queue<Object[]> apply(PeerPair peerPair, Queue<Object[]> queue) {
if (!queue.isEmpty()) {
CoalescedHeartbeatRequestBuilder builder = leader.getOptions().getRaftMessagesFactory().coalescedHeartbeatRequest();
ArrayList<AppendEntriesRequest> list = new ArrayList<>();
builder.messages(list);

Object[] req;

List<CompletableFuture<Message>> futs = new ArrayList<>();

while((req = queue.poll()) != null) {
builder.messages().add((AppendEntriesRequest) req[0]);

futs.add((CompletableFuture<Message>) req[1]);
}

// TODO asch store parsed.
PeerId peerId = PeerId.parsePeer(peerPair.remote);

sender.invokeAsync(peerId.getEndpoint(), builder.build(), null, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
CoalescedHeartbeatResponse resp = (CoalescedHeartbeatResponse) result;

assert resp.messages().size() == futs.size();

// TODO asch complete futures in parallel
int i = 0;
for (Message message : resp.messages()) {
futs.get(i++).complete(message); // Future completion will trigger callbacks.
}
}
}, 5_000);
}

// TODO asch return null
return queue;
}
});
}

assertTrue(l.await(5_000, TimeUnit.MILLISECONDS));
}

/**
* Returns {@code true} if thread is related to timers.
*
Expand All @@ -724,16 +854,12 @@ private boolean isTimer(Thread thread) {
* Returns {@code true} if a raft group has elected a leader for a some term.
*
* @param grpId Group id.
* @return {@code True} if a leader is elected.
* @return The leader or null.
*/
private boolean hasLeader(String grpId) {
return servers.stream().anyMatch(s -> {
NodeImpl node = (NodeImpl) s.raftGroupService(grpId).getRaftNode();

StateMachineAdapter fsm = (StateMachineAdapter) node.getOptions().getFsm();

return node.isLeader() && fsm.getLeaderTerm() == node.getCurrentTerm();
});
private @Nullable NodeImpl getLeader(String grpId) {
return servers.stream().map(s -> (NodeImpl) s.raftGroupService(grpId).getRaftNode()).filter(node -> {
return node.isLeader();
}).findFirst().orElse(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ public class JraftServerImpl implements RaftServer {
/** Started groups. */
private ConcurrentMap<String, RaftGroupService> groups = new ConcurrentHashMap<>();

/** Node manager. */
private final NodeManager nodeManager;

/** Options. */
private final NodeOptions opts;

Expand All @@ -123,7 +120,6 @@ public JraftServerImpl(ClusterService service, Path dataPath) {
public JraftServerImpl(ClusterService service, Path dataPath, NodeOptions opts) {
this.service = service;
this.dataPath = dataPath;
this.nodeManager = new NodeManager();
this.logStorageFactory = new DefaultLogStorageFactory(dataPath.resolve("log"));
this.opts = opts;

Expand All @@ -132,6 +128,7 @@ public JraftServerImpl(ClusterService service, Path dataPath, NodeOptions opts)
this.opts.setRpcDefaultTimeout(this.opts.getElectionTimeoutMs() / 2);
this.opts.setSharedPools(true);
this.opts.setServiceFactory(new DefaultJRaftServiceFactory(logStorageFactory));
this.opts.setNodeManager(new NodeManager());

if (opts.getServerName() == null) {
this.opts.setServerName(service.localConfiguration().getName());
Expand Down Expand Up @@ -192,7 +189,7 @@ public void start() {

rpcServer = new IgniteRpcServer(
service,
nodeManager,
opts.getNodeManager(),
opts.getRaftMessagesFactory(),
requestExecutor
);
Expand Down Expand Up @@ -360,7 +357,7 @@ public synchronized boolean startRaftGroup(String groupId, @NotNull RaftGroupEve

var peerId = new PeerId(addr.host(), addr.port(), 0, ElectionPriority.DISABLED);

var server = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager);
var server = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer);

server.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,23 @@
*/
package org.apache.ignite.raft.jraft;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse;
import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor.PeerPair;
import org.apache.ignite.raft.jraft.util.OnlyForTest;

/**
Expand All @@ -32,6 +41,7 @@
public class NodeManager {
private final ConcurrentMap<NodeId, Node> nodeMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, List<Node>> groupMap = new ConcurrentHashMap<>();
private final ConcurrentMap<PeerPair, Queue<Object[]>> coalesced = new ConcurrentHashMap<>();

/**
* Adds a node.
Expand Down Expand Up @@ -96,4 +106,16 @@ public List<Node> getNodesByGroupId(final String groupId) {
public List<Node> getAllNodes() {
return this.groupMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
}

public CompletableFuture<Message> enqueue(AppendEntriesRequest request) {
CompletableFuture<Message> fut = new CompletableFuture<>();

boolean added = coalesced.computeIfAbsent(new PeerPair(request.serverId(), request.peerId()), k -> new ConcurrentLinkedQueue<>()).add(new Object[]{request, fut});

return fut;
}

public ConcurrentMap<PeerPair, Queue<Object[]>> getCoalesced() {
return coalesced;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,26 +60,19 @@ public class RaftGroupService {
*/
private Node node;

/**
* The node manager.
*/
private NodeManager nodeManager;

/**
* @param groupId Group Id.
* @param serverId Server id.
* @param nodeOptions Node options.
* @param rpcServer RPC server.
* @param nodeManager Node manager.
*/
public RaftGroupService(final String groupId, final PeerId serverId, final NodeOptions nodeOptions,
final RpcServer rpcServer, final NodeManager nodeManager) {
final RpcServer rpcServer) {
super();
this.groupId = groupId;
this.serverId = serverId;
this.nodeOptions = nodeOptions;
this.rpcServer = rpcServer;
this.nodeManager = nodeManager;
}

public synchronized Node getRaftNode() {
Expand Down Expand Up @@ -119,7 +112,7 @@ public synchronized Node start() {
throw new IgniteInternalException("Fail to init node, please see the logs to find the reason.");
}

this.nodeManager.add(this.node);
this.nodeOptions.getNodeManager().add(this.node);
this.started = true;
LOG.info("Start the RaftGroupService successfully {}", this.node.getNodeId());
return this.node;
Expand All @@ -139,7 +132,7 @@ public synchronized void shutdown() {
LOG.error("Interrupted while waiting for the node to shutdown");
}

nodeManager.remove(this.node);
this.nodeOptions.getNodeManager().remove(this.node);
this.started = false;
LOG.info("Stop the RaftGroupService successfully.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ public static final class RpcRequestsMessageGroup {

/** */
public static final short SM_ERROR_RESPONSE = 3014;

/** */
public static final short COALESCED_HEARTBEAT_REQUEST = 3015;

/** */
public static final short COALESCED_HEARTBEAT_RESPONSE = 3016;
}

/**
Expand Down
Loading