Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Jun 24, 2024
1 parent 3c5d971 commit 8b17c78
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import java.util.HashSet;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;

public class GossipState {
public class Gossip {

/** Gossip id, local member id. */
private final UUID gossiperId;
Expand All @@ -29,7 +30,7 @@ public class GossipState {
* @param message message.
* @param infectionPeriod infectionPeriod.
*/
public GossipState(UUID gossiperId, long sequenceId, byte[] message, long infectionPeriod) {
public Gossip(UUID gossiperId, long sequenceId, byte[] message, long infectionPeriod) {
this.gossiperId = gossiperId;
this.sequenceId = sequenceId;
this.message = message;
Expand Down Expand Up @@ -63,4 +64,15 @@ public void addToInfected(UUID memberId) {
public boolean isInfected(UUID memberId) {
return infectedSet.contains(memberId);
}

@Override
public String toString() {
return new StringJoiner(", ", Gossip.class.getSimpleName() + "[", "]")
.add("gossiperId=" + gossiperId)
.add("sequenceId=" + sequenceId)
.add("message[" + message.length + "]")
.add("infectionPeriod=" + infectionPeriod)
.add("infectedSet[" + infectedSet.size() + "]")
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ public class GossipCodec extends AbstractCodec {

public GossipCodec() {}

public MutableDirectBuffer encode(GossipState gossipState) {
public MutableDirectBuffer encode(Gossip gossip) {
encodedLength = 0;

gossipEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder);
UUIDCodec.encode(gossipState.gossiperId(), gossipEncoder.gossiperId());
gossipEncoder.sequenceId(gossipState.sequenceId());
gossipEncoder.putMessage(gossipState.message(), 0, gossipState.message().length);
UUIDCodec.encode(gossip.gossiperId(), gossipEncoder.gossiperId());
gossipEncoder.sequenceId(gossip.sequenceId());
gossipEncoder.putMessage(gossip.message(), 0, gossip.message().length);

encodedLength = headerEncoder.encodedLength() + gossipEncoder.encodedLength();
return encodedBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,16 @@ public class GossipProtocol extends AbstractAgent {
private final GossipDecoder gossipDecoder = new GossipDecoder();
private final MembershipEventDecoder membershipEventDecoder = new MembershipEventDecoder();
private final GossipRequestCodec gossipRequestCodec = new GossipRequestCodec();
private final GossipCodec gossipCodec = new GossipCodec();
private final MemberCodec memberCodec = new MemberCodec();
private final UnsafeBuffer unsafeBuffer = new UnsafeBuffer();
private final String roleName;
private long currentPeriod = 0;
private long gossipCounter = 0;
private final Map<UUID, SequenceIdCollector> sequenceIdCollectors = new Object2ObjectHashMap<>();
private final Map<String, GossipState> gossips = new Object2ObjectHashMap<>();
private final Map<String, Gossip> gossips = new Object2ObjectHashMap<>();
private final List<Member> remoteMembers = new ArrayList<>();
private final List<Member> gossipMembers = new ArrayList<>();
private final List<GossipState> gossipsToSend = new ArrayList<>();
private final List<Gossip> gossipsToSend = new ArrayList<>();
private final List<String> gossipsToRemove = new ArrayList<>();

public GossipProtocol(
Expand Down Expand Up @@ -125,9 +124,9 @@ private void nextGossipsToRemove(long period) {
final int periodsToSweep =
ClusterMath.gossipPeriodsToSweep(config.gossipRepeatMult(), remoteMembers.size() + 1);

for (final GossipState gossipState : gossips.values()) {
if (period > gossipState.infectionPeriod() + periodsToSweep) {
gossipsToRemove.add(gossipState.gossipId());
for (final Gossip gossip : gossips.values()) {
if (period > gossip.infectionPeriod() + periodsToSweep) {
gossipsToRemove.add(gossip.gossipId());
}
}
}
Expand All @@ -152,12 +151,9 @@ private void spreadGossips(long period, Member member) {
final UUID from = localMember.id();

for (int i = 0, n = gossipsToSend.size(); i < n; i++) {
final GossipState gossipState = gossipsToSend.get(i);
final Gossip gossip = gossipsToSend.get(i);
transport.send(
address,
gossipRequestCodec.encode(from, gossipState),
0,
gossipRequestCodec.encodedLength());
address, gossipRequestCodec.encode(from, gossip), 0, gossipRequestCodec.encodedLength());
}
}

Expand All @@ -167,10 +163,9 @@ private void nextGossipsToSend(long period, Member member) {
final int periodsToSpread =
ClusterMath.gossipPeriodsToSpread(config.gossipRepeatMult(), remoteMembers.size() + 1);

for (final GossipState gossipState : gossips.values()) {
if (gossipState.infectionPeriod() + periodsToSpread >= period
&& !gossipState.isInfected(member.id())) {
gossipsToSend.add(gossipState);
for (final Gossip gossip : gossips.values()) {
if (gossip.infectionPeriod() + periodsToSpread >= period && !gossip.isInfected(member.id())) {
gossipsToSend.add(gossip);
}
}
}
Expand Down Expand Up @@ -202,10 +197,10 @@ private void onGossipMessage(GossipMessageDecoder decoder) {
final byte[] message = new byte[messageLength];
decoder.getMessage(message, 0, messageLength);

final GossipState gossipState = new GossipState(localMember.id(), sequenceId, message, period);
final Gossip gossip = new Gossip(localMember.id(), sequenceId, message, period);

gossips.put(gossipState.gossipId(), gossipState);
ensureSequence(localMember.id()).add(gossipState.sequenceId());
gossips.put(gossip.gossipId(), gossip);
ensureSequence(localMember.id()).add(gossip.sequenceId());
}

private void onGossipRequest(GossipRequestDecoder decoder) {
Expand All @@ -223,18 +218,18 @@ private void onGossipRequest(GossipRequestDecoder decoder) {
gossipDecoder.getMessage(message, 0, messageLength);

final String gossipId = gossiperId + "-" + sequenceId;
GossipState gossipState = gossips.get(gossipId);
Gossip gossip = gossips.get(gossipId);

if (ensureSequence(gossiperId).add(sequenceId)) {
if (gossipState == null) { // new gossip
gossipState = new GossipState(gossiperId, sequenceId, message, period);
gossips.put(gossipId, gossipState);
if (gossip == null) { // new gossip
gossip = new Gossip(gossiperId, sequenceId, message, period);
gossips.put(gossipId, gossip);
emitGossipMessage(message);
}
}

if (gossipState != null) {
gossipState.addToInfected(from);
if (gossip != null) {
gossip.addToInfected(from);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ public class GossipRequestCodec extends AbstractCodec {

public GossipRequestCodec() {}

public MutableDirectBuffer encode(UUID from, GossipState gossipState) {
public MutableDirectBuffer encode(UUID from, Gossip gossip) {
encodedLength = 0;

gossipRequestEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder);
UUIDCodec.encode(from, gossipRequestEncoder.from());
gossipRequestEncoder.putGossip(gossipCodec.encode(gossipState), 0, gossipCodec.encodedLength());
gossipRequestEncoder.putGossip(gossipCodec.encode(gossip), 0, gossipCodec.encodedLength());

encodedLength = headerEncoder.encodedLength() + gossipRequestEncoder.encodedLength();
return encodedBuffer;
Expand Down

0 comments on commit 8b17c78

Please sign in to comment.