diff --git a/cluster2/src/main/java/io/scalecube/cluster2/MemberCodec.java b/cluster2/src/main/java/io/scalecube/cluster2/MemberCodec.java index bed0b02f..542cd781 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/MemberCodec.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/MemberCodec.java @@ -46,6 +46,7 @@ private MutableDirectBuffer encode(Consumer consumer) { public Member member(Consumer consumer) { consumer.accept(unsafeBuffer); + memberDecoder.wrapAndApplyHeader(unsafeBuffer, 0, headerDecoder); final UUID id = uuid(memberDecoder.id()); diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipCodec.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipCodec.java index 4dee1fd9..ca669341 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipCodec.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipCodec.java @@ -1,20 +1,55 @@ package io.scalecube.cluster2.gossip; +import static io.scalecube.cluster2.UUIDCodec.uuid; + import io.scalecube.cluster2.AbstractCodec; +import io.scalecube.cluster2.UUIDCodec; +import io.scalecube.cluster2.sbe.GossipDecoder; +import io.scalecube.cluster2.sbe.GossipEncoder; +import java.util.UUID; import java.util.function.Consumer; +import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.UnsafeBuffer; public class GossipCodec extends AbstractCodec { + private final GossipEncoder gossipEncoder = new GossipEncoder(); + private final GossipDecoder gossipDecoder = new GossipDecoder(); + public GossipCodec() {} + // Encode + + public MutableDirectBuffer encode(Gossip gossip) { + encodedLength = 0; + + gossipEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder); + UUIDCodec.encode(gossip.gossiperId(), gossipEncoder.gossiperId()); + gossipEncoder.sequenceId(gossip.sequenceId()); + gossipEncoder.putMessage(gossip.message(), 0, gossip.message().length); + + encodedLength = headerEncoder.encodedLength() + gossipEncoder.encodedLength(); + return encodedBuffer; + } + // Decode public Gossip gossip(Consumer consumer) { consumer.accept(unsafeBuffer); - // TODO + gossipDecoder.wrapAndApplyHeader(unsafeBuffer, 0, headerDecoder); + + final UUID gossiperId = uuid(gossipDecoder.gossiperId()); + if (gossiperId == null) { + return null; + } + + final long sequenceId = gossipDecoder.sequenceId(); + + final int messageLength = gossipDecoder.messageLength(); + final byte[] message = new byte[messageLength]; + gossipDecoder.getMessage(message, 0, messageLength); - return new Gossip(null, null, 1); + return new Gossip(gossiperId, sequenceId, message); } } diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java index 87141a52..19057d4e 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java @@ -9,7 +9,6 @@ import io.scalecube.cluster2.MemberCodec; import io.scalecube.cluster2.sbe.GossipMessageDecoder; import io.scalecube.cluster2.sbe.GossipRequestDecoder; -import io.scalecube.cluster2.sbe.GossipRequestDecoder.GossipsDecoder; import io.scalecube.cluster2.sbe.MembershipEventDecoder; import io.scalecube.cluster2.sbe.MembershipEventType; import io.scalecube.cluster2.sbe.MessageHeaderDecoder; @@ -36,7 +35,8 @@ public class GossipProtocol extends AbstractAgent { private final GossipMessageDecoder gossipMessageDecoder = new GossipMessageDecoder(); private final GossipRequestDecoder gossipRequestDecoder = new GossipRequestDecoder(); private final MembershipEventDecoder membershipEventDecoder = new MembershipEventDecoder(); - private final GossipCodec codec = new GossipCodec(); + private final GossipRequestCodec gossipRequestCodec = new GossipRequestCodec(); + private final GossipCodec gossipCodec = new GossipCodec(); private final MemberCodec memberCodec = new MemberCodec(); private final UnsafeBuffer unsafeBuffer = new UnsafeBuffer(); private final String roleName; @@ -46,6 +46,7 @@ public class GossipProtocol extends AbstractAgent { private final Map gossips = new Object2ObjectHashMap<>(); private final List remoteMembers = new ArrayList<>(); private final List gossipMembers = new ArrayList<>(); + private final List gossipsToSend = new ArrayList<>(); private final List gossipsToRemove = new ArrayList<>(); public GossipProtocol( @@ -86,7 +87,7 @@ protected void onTick() { nextGossipMembers(); for (int i = 0, n = gossipMembers.size(); i < n; i++) { - spreadGossipsTo(period, gossipMembers.get(i)); + spreadGossips(period, gossipMembers.get(i)); } // Sweep gossips @@ -142,6 +143,33 @@ private void checkGossipSegmentation() { } } + private void spreadGossips(long period, Member member) { + nextGossipsToSend(period, member); + + final String address = member.address(); + final UUID from = localMember.id(); + + for (int i = 0, n = gossipsToSend.size(); i < n; i++) { + final Gossip gossip = gossipsToSend.get(i); + transport.send( + address, gossipRequestCodec.encode(from, gossip), 0, gossipRequestCodec.encodedLength()); + } + } + + private void nextGossipsToSend(long period, Member member) { + gossipsToSend.clear(); + + final int periodsToSpread = + ClusterMath.gossipPeriodsToSpread(config.gossipRepeatMult(), remoteMembers.size() + 1); + + for (final GossipState gossipState : gossips.values()) { + if (gossipState.infectionPeriod() + periodsToSpread >= period + && !gossipState.isInfected(member.id())) { + gossipsToSend.add(gossipState.gossip()); + } + } + } + @Override public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length) { headerDecoder.wrap(buffer, index); @@ -180,20 +208,20 @@ private void onGossipRequest(GossipRequestDecoder decoder) { final long period = currentPeriod; final UUID from = uuid(decoder.from()); - for (GossipsDecoder gossipsDecoder : decoder.gossips()) { - final Gossip gossip = codec.gossip(gossipsDecoder::wrapGossip); - GossipState gossipState = gossips.get(gossip.gossipId()); - if (ensureSequence(gossip.gossiperId()).add(gossip.sequenceId())) { - if (gossipState == null) { // new gossip - gossipState = new GossipState(gossip, period); - gossips.put(gossip.gossipId(), gossipState); - emitGossipMessage(gossip.message()); - } - } - if (gossipState != null) { - gossipState.addToInfected(from); + final Gossip gossip = gossipCodec.gossip(decoder::wrapGossip); + GossipState gossipState = gossips.get(gossip.gossipId()); + + if (ensureSequence(gossip.gossiperId()).add(gossip.sequenceId())) { + if (gossipState == null) { // new gossip + gossipState = new GossipState(gossip, period); + gossips.put(gossip.gossipId(), gossipState); + emitGossipMessage(gossip.message()); } } + + if (gossipState != null) { + gossipState.addToInfected(from); + } } private void emitGossipMessage(byte[] message) { diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipRequestCodec.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipRequestCodec.java new file mode 100644 index 00000000..bce28d49 --- /dev/null +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipRequestCodec.java @@ -0,0 +1,26 @@ +package io.scalecube.cluster2.gossip; + +import io.scalecube.cluster2.AbstractCodec; +import io.scalecube.cluster2.UUIDCodec; +import io.scalecube.cluster2.sbe.GossipRequestEncoder; +import java.util.UUID; +import org.agrona.MutableDirectBuffer; + +public class GossipRequestCodec extends AbstractCodec { + + private final GossipRequestEncoder gossipRequestEncoder = new GossipRequestEncoder(); + private final GossipCodec gossipCodec = new GossipCodec(); + + public GossipRequestCodec() {} + + public MutableDirectBuffer encode(UUID from, Gossip gossip) { + encodedLength = 0; + + gossipRequestEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder); + UUIDCodec.encode(from, gossipRequestEncoder.from()); + gossipRequestEncoder.putGossip(gossipCodec.encode(gossip), 0, gossipCodec.encodedLength()); + + encodedLength = headerEncoder.encodedLength() + gossipRequestEncoder.encodedLength(); + return encodedBuffer; + } +} diff --git a/cluster2/src/main/resources/scalecube-cluster-codecs.xml b/cluster2/src/main/resources/scalecube-cluster-codecs.xml index 62fd6c47..2f47944d 100644 --- a/cluster2/src/main/resources/scalecube-cluster-codecs.xml +++ b/cluster2/src/main/resources/scalecube-cluster-codecs.xml @@ -103,9 +103,7 @@ - - - +