diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/Gossip.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/Gossip.java index 516b4ce6..90af588c 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/gossip/Gossip.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/Gossip.java @@ -37,10 +37,6 @@ public Gossip(UUID gossiperId, long sequenceId, byte[] message, long infectionPe this.infectionPeriod = infectionPeriod; } - public String gossipId() { - return gossiperId + "-" + sequenceId; - } - public UUID gossiperId() { return gossiperId; } diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java index 6b6c2402..c94ba1d1 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java @@ -15,12 +15,13 @@ import io.scalecube.cluster2.sbe.MessageHeaderDecoder; import java.time.Duration; import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; import java.util.function.Supplier; import org.agrona.MutableDirectBuffer; +import org.agrona.collections.ArrayListUtil; +import org.agrona.collections.IntArrayList; import org.agrona.collections.Object2ObjectHashMap; import org.agrona.concurrent.EpochClock; import org.agrona.concurrent.UnsafeBuffer; @@ -44,11 +45,11 @@ public class GossipProtocol extends AbstractAgent { private long currentPeriod = 0; private long gossipCounter = 0; private final Map sequenceIdCollectors = new Object2ObjectHashMap<>(); - private final Map gossips = new Object2ObjectHashMap<>(); - private final List remoteMembers = new ArrayList<>(); - private final List gossipMembers = new ArrayList<>(); - private final List gossipsToSend = new ArrayList<>(); - private final List gossipsToRemove = new ArrayList<>(); + private final ArrayList gossips = new ArrayList<>(); + private final ArrayList remoteMembers = new ArrayList<>(); + private final ArrayList gossipMembers = new ArrayList<>(); + private final IntArrayList gossipsToSend = new IntArrayList(); + private final IntArrayList gossipsToRemove = new IntArrayList(); public GossipProtocol( Transport transport, @@ -87,16 +88,19 @@ protected void onTick() { nextGossipMembers(); - for (int i = 0, n = gossipMembers.size(); i < n; i++) { - spreadGossips(period, gossipMembers.get(i)); + for (int n = gossipMembers.size() - 1, i = n; i >= 0; i--) { + final Member member = gossipMembers.get(i); + ArrayListUtil.fastUnorderedRemove(gossipMembers, i); + spreadGossips(period, member); } // Sweep gossips nextGossipsToRemove(period); - for (int i = 0, n = gossipsToRemove.size(); i < n; i++) { - gossips.remove(gossipsToRemove.get(i)); + for (int n = gossipsToRemove.size() - 1, i = n; i >= 0; i--) { + final int index = gossipsToRemove.fastUnorderedRemove(i); + ArrayListUtil.fastUnorderedRemove(gossips, index); } } @@ -124,9 +128,10 @@ private void nextGossipsToRemove(long period) { final int periodsToSweep = ClusterMath.gossipPeriodsToSweep(config.gossipRepeatMult(), remoteMembers.size() + 1); - for (final Gossip gossip : gossips.values()) { + for (int i = 0, n = gossips.size(); i < n; i++) { + final Gossip gossip = gossips.get(i); if (period > gossip.infectionPeriod() + periodsToSweep) { - gossipsToRemove.add(gossip.gossipId()); + gossipsToRemove.addInt(i); } } } @@ -150,8 +155,8 @@ private void spreadGossips(long period, Member member) { final String address = member.address(); final UUID from = localMember.id(); - for (int i = 0, n = gossipsToSend.size(); i < n; i++) { - final Gossip gossip = gossipsToSend.get(i); + for (int n = gossipsToSend.size() - 1, i = n; i >= 0; i--) { + final Gossip gossip = gossips.get(gossipsToSend.fastUnorderedRemove(i)); transport.send( address, gossipRequestCodec.encode(from, gossip), 0, gossipRequestCodec.encodedLength()); } @@ -163,9 +168,10 @@ private void nextGossipsToSend(long period, Member member) { final int periodsToSpread = ClusterMath.gossipPeriodsToSpread(config.gossipRepeatMult(), remoteMembers.size() + 1); - for (final Gossip gossip : gossips.values()) { + for (int i = 0, n = gossips.size(); i < n; i++) { + final Gossip gossip = gossips.get(i); if (gossip.infectionPeriod() + periodsToSpread >= period && !gossip.isInfected(member.id())) { - gossipsToSend.add(gossip); + gossipsToSend.addInt(i); } } } @@ -197,10 +203,8 @@ private void onGossipMessage(GossipMessageDecoder decoder) { final byte[] message = new byte[messageLength]; decoder.getMessage(message, 0, messageLength); - final Gossip gossip = new Gossip(localMember.id(), sequenceId, message, period); - - gossips.put(gossip.gossipId(), gossip); - ensureSequence(localMember.id()).add(gossip.sequenceId()); + gossips.add(new Gossip(localMember.id(), sequenceId, message, period)); + ensureSequence(localMember.id()).add(sequenceId); } private void onGossipRequest(GossipRequestDecoder decoder) { @@ -217,13 +221,12 @@ private void onGossipRequest(GossipRequestDecoder decoder) { final byte[] message = new byte[messageLength]; gossipDecoder.getMessage(message, 0, messageLength); - final String gossipId = gossiperId + "-" + sequenceId; - Gossip gossip = gossips.get(gossipId); + Gossip gossip = getGossip(gossiperId, sequenceId); if (ensureSequence(gossiperId).add(sequenceId)) { if (gossip == null) { // new gossip gossip = new Gossip(gossiperId, sequenceId, message, period); - gossips.put(gossipId, gossip); + gossips.add(gossip); emitGossipMessage(message); } } @@ -233,6 +236,18 @@ private void onGossipRequest(GossipRequestDecoder decoder) { } } + private Gossip getGossip(UUID gossiperId, long sequenceId) { + Gossip result = null; + for (int n = gossips.size(), i = 0; i < n; i++) { + final Gossip gossip = gossips.get(i); + if (gossip.gossiperId() == gossiperId && gossip.sequenceId() == sequenceId) { + result = gossip; + break; + } + } + return result; + } + private void emitGossipMessage(byte[] message) { unsafeBuffer.wrap(message); messageTx.transmit(1, unsafeBuffer, 0, unsafeBuffer.capacity());