Skip to content

Commit

Permalink
Added hop depth to CommunityDestination
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Jun 13, 2017
1 parent 5b0b9b4 commit cf3e303
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 26 deletions.
23 changes: 22 additions & 1 deletion community.py
Original file line number Diff line number Diff line change
Expand Up @@ -2430,7 +2430,28 @@ def on_signature_response(self, messages):
old_body = old_submsg.packet[:len(old_submsg.packet) - sum([member.signature_length for member in old_submsg.authentication.members])]
new_body = new_submsg.packet[:len(new_submsg.packet) - sum([member.signature_length for member in new_submsg.authentication.members])]

result = cache.response_func(old_submsg, new_submsg, old_body != new_body, *cache.response_args)
changed = old_body != new_body

# A CommunityDestination is allowed to have one unsigned changed field: the hop count.
# This hop count has the restriction that it must be 1 less in the new message than
# in the old message.
if changed and isinstance(message.payload.message.meta.destination, CommunityDestination):
new_body_len = len(new_body)
# Create a list of differing indices
diffs = [i for i in xrange(len(old_body)) if (i < new_body_len) and (old_body[i] != new_body[i])]
# These indices may not exist if new_body and old_body are not of the same size
if diffs:
start_diff = min(diffs)
end_diff = max(diffs)
# We can have exactly a 1 byte difference (start == end)
if start_diff == end_diff:
import struct
i_old = struct.unpack_from("!b", old_body, start_diff)[0]
i_new = struct.unpack_from("!b", new_body, start_diff)[0]
# If this one byte is note 1 less than the new packet, it has changed.
changed = (i_new != (i_old - 1))

result = cache.response_func(old_submsg, new_submsg, changed, *cache.response_args)
assert isinstance(result, bool), "RESPONSE_FUNC must return a boolean value! True to accept the proposed message, False to reject %s %s" % (type(cache), str(cache.response_func))
if result:
# add our own signatures and we can handle the message
Expand Down
34 changes: 27 additions & 7 deletions conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,14 @@ def __init__(self, candidate, meta, offset, data, verify, allow_empty_signature)
self.payload = None

class EncodeFunctions(object):
__slots__ = ["byte", "authentication", "resolution", "distribution", "payload"]
__slots__ = ["byte", "authentication", "resolution", "distribution", "destination", "payload"]

def __init__(self, byte, authentication, resolution, distribution, payload):
def __init__(self, byte, authentication, resolution, distribution, destination, payload):
self.byte = byte
self.authentication = authentication
self.resolution = resolution
self.distribution = distribution
self.destination = destination
self.payload = payload

class DecodeFunctions(object):
Expand Down Expand Up @@ -235,9 +236,12 @@ def define_meta_message(self, byte, meta, encode_payload_func, decode_payload_fu

FullSyncDistribution: self._encode_full_sync_distribution,
LastSyncDistribution: self._encode_last_sync_distribution,
DirectDistribution: self._encode_direct_distribution}
DirectDistribution: self._encode_direct_distribution,

CandidateDestination: self._encode_candidate_destination,
CommunityDestination: self._encode_community_destination}

self._encode_message_map[meta.name] = self.EncodeFunctions(byte, mapping[type(meta.authentication)], mapping[type(meta.resolution)], mapping[type(meta.distribution)], encode_payload_func)
self._encode_message_map[meta.name] = self.EncodeFunctions(byte, mapping[type(meta.authentication)], mapping[type(meta.resolution)], mapping[type(meta.distribution)], mapping[type(meta.destination)], encode_payload_func)

mapping = {MemberAuthentication: self._decode_member_authentication,
DoubleMemberAuthentication: self._decode_double_member_authentication,
Expand All @@ -251,8 +255,8 @@ def define_meta_message(self, byte, meta, encode_payload_func, decode_payload_fu
FullSyncDistribution: self._decode_full_sync_distribution,
LastSyncDistribution: self._decode_last_sync_distribution,

CandidateDestination: self._decode_empty_destination,
CommunityDestination: self._decode_empty_destination}
CandidateDestination: self._decode_candidate_destination,
CommunityDestination: self._decode_community_destination}

self._decode_message_map[byte] = self.DecodeFunctions(meta, mapping[type(meta.authentication)], mapping[type(meta.resolution)], mapping[type(meta.distribution)], mapping[type(meta.destination)], decode_payload_func)

Expand Down Expand Up @@ -967,6 +971,12 @@ def _encode_dynamic_resolution(self, container, message):
container.append(chr(index))
# both the public and the linear resolution do not require any storage

def _encode_candidate_destination(self, container, message):
pass

def _encode_community_destination(self, container, message):
container.append(pack("!b", message.destination.depth))

def can_encode_message(self, message):
"""
Returns True when MESSAGE can be encoded using this conversion.
Expand All @@ -989,6 +999,9 @@ def encode_message(self, message, sign=True):
# resolution
encode_functions.resolution(container, message)

# destination
encode_functions.destination(container, message)

# distribution
encode_functions.distribution(container, message)

Expand Down Expand Up @@ -1157,9 +1170,16 @@ def _decode_double_member_authentication(self, placeholder):
placeholder.authentication = DoubleMemberAuthentication.Implementation(placeholder.meta.authentication, members,
signatures=signatures)

def _decode_empty_destination(self, placeholder):
def _decode_candidate_destination(self, placeholder):
placeholder.destination = placeholder.meta.destination.Implementation(placeholder.meta.destination)

def _decode_community_destination(self, placeholder):
depth, = unpack_from("!b", placeholder.data, placeholder.offset)
placeholder.offset += 1
new_depth = depth - 1 if depth > 0 else depth
placeholder.destination = placeholder.meta.destination.Implementation(placeholder.meta.destination,
depth=new_depth)

def can_decode_message(self, data):
"""
Returns True when DATA can be decoded using this conversion.
Expand Down
21 changes: 19 additions & 2 deletions destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CommunityDestination(Destination):
"""
class Implementation(Destination.Implementation):

def __init__(self, meta, *candidates):
def __init__(self, meta, *candidates, **kwargs):
"""
Construct a CandidateDestination.Implementation object.
Expand All @@ -71,29 +71,46 @@ def __init__(self, meta, *candidates):
super(CommunityDestination.Implementation, self).__init__(meta)
self._candidates = candidates

if 'depth' in kwargs:
self._depth = kwargs['depth']
else:
self._depth = meta.depth

@property
def node_count(self):
return self._meta._node_count

@property
def depth(self):
return self._depth

@property
def candidates(self):
return self._candidates

def __init__(self, node_count):
def __init__(self, node_count, depth=-1):
"""
Construct a CommunityDestination object.
NODE_COUNT is an integer giving the number of nodes where, when the message is created, the
message must be sent to. These nodes are selected using the
community.yield_random_candidates(...) method. NODE_COUNT must be zero or higher.
DEPTH is an integer in [0, 127] v -1, this determines the _remaining_ hop count for this message. If
DEPTH is equal to -1, no hop depth will be used.
"""
assert isinstance(node_count, int)
assert node_count >= 0
self._node_count = node_count
self._depth = depth

@property
def node_count(self):
return self._node_count

@property
def depth(self):
return self._depth

def __str__(self):
return "<%s node_count:%d>" % (self.__class__.__name__, self._node_count)
3 changes: 3 additions & 0 deletions dispersy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,9 @@ def _forward(self, messages):
meta = messages[0].meta
if isinstance(meta.destination, (CommunityDestination, CandidateDestination)):
for message in messages:
# Don't forward messages with a 0 TTL
if isinstance(meta.destination, CommunityDestination) and message.destination.depth == 0:
continue
# CandidateDestination.candidates may be empty
candidates = set(message.destination.candidates)
# CommunityDestination.node_count is allowed to be zero
Expand Down
15 changes: 12 additions & 3 deletions tests/debugcommunity/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,23 @@ def initiate_meta_messages(self):
DoubleMemberAuthentication(allow_signature_func=self.allow_double_signed_text),
PublicResolution(),
LastSyncDistribution(synchronization_direction=u"ASC", priority=128, history_size=1),
CommunityDestination(node_count=10),
CommunityDestination(node_count=10, depth=42),
TextPayload(),
self._generic_timeline_check,
self.on_text),
Message(self, u"double-signed-text",
DoubleMemberAuthentication(allow_signature_func=self.allow_double_signed_text),
PublicResolution(),
DirectDistribution(),
CommunityDestination(node_count=10),
CommunityDestination(node_count=10, depth=42),
TextPayload(),
self._generic_timeline_check,
self.on_text),
Message(self, u"double-signed-text-split",
DoubleMemberAuthentication(allow_signature_func=self.allow_double_signed_text, split_payload_func=self.split_double_payload),
PublicResolution(),
DirectDistribution(),
CommunityDestination(node_count=10),
CommunityDestination(node_count=10, depth=42),
TextPayload(),
self._generic_timeline_check,
self.on_text),
Expand All @@ -85,6 +85,15 @@ def initiate_meta_messages(self):
self._generic_timeline_check,
self.on_text,
self.undo_text),
Message(self, u"n-hop-sync-text",
MemberAuthentication(),
PublicResolution(),
FullSyncDistribution(enable_sequence_number=False, synchronization_direction=u"ASC", priority=128),
CommunityDestination(node_count=1, depth=1),
TextPayload(),
self._generic_timeline_check,
self.on_text,
self.undo_text),
Message(self, u"bin-key-text",
MemberAuthentication(encoding="bin"),
PublicResolution(),
Expand Down
27 changes: 14 additions & 13 deletions tests/debugcommunity/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ def __init__(self, community, version="\x01"):
self.define_meta_message(chr(103), community.get_meta_message(u"double-signed-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(104), community.get_meta_message(u"double-signed-text-split"), self._encode_text, self._decode_text)
self.define_meta_message(chr(105), community.get_meta_message(u"full-sync-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(106), community.get_meta_message(u"ASC-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(107), community.get_meta_message(u"DESC-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(108), community.get_meta_message(u"last-1-doublemember-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(109), community.get_meta_message(u"protected-full-sync-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(110), community.get_meta_message(u"dynamic-resolution-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(111), community.get_meta_message(u"sequence-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(112), community.get_meta_message(u"full-sync-global-time-pruning-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(113), community.get_meta_message(u"high-priority-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(114), community.get_meta_message(u"low-priority-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(115), community.get_meta_message(u"medium-priority-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(116), community.get_meta_message(u"RANDOM-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(117), community.get_meta_message(u"batched-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(118), community.get_meta_message(u"bin-key-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(106), community.get_meta_message(u"n-hop-sync-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(107), community.get_meta_message(u"ASC-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(108), community.get_meta_message(u"DESC-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(109), community.get_meta_message(u"last-1-doublemember-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(110), community.get_meta_message(u"protected-full-sync-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(111), community.get_meta_message(u"dynamic-resolution-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(112), community.get_meta_message(u"sequence-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(113), community.get_meta_message(u"full-sync-global-time-pruning-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(114), community.get_meta_message(u"high-priority-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(115), community.get_meta_message(u"low-priority-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(116), community.get_meta_message(u"medium-priority-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(117), community.get_meta_message(u"RANDOM-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(118), community.get_meta_message(u"batched-text"), self._encode_text, self._decode_text)
self.define_meta_message(chr(119), community.get_meta_message(u"bin-key-text"), self._encode_text, self._decode_text)

def _encode_text(self, message):
"""
Expand Down
62 changes: 62 additions & 0 deletions tests/test_neighborhood.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,65 @@ def dispersy_yield_verified_candidates():

# We should never send to more than node_count + targeted_node_count nodes
self.assertEqual(forwarded_node_count, min(total_node_count, meta.destination.node_count + targeted_node_count))

def test_forward_hop_timeout(self):
"""
Test an n-hop-sync timeout.
This test uses a CommunityDestination of fan out 1 and depth 1.
In other words only 1 candidate should receive this message.
"""
other, another = self.clean_community_candidates(2)

# Create the message
meta = self._community.get_meta_message(u"n-hop-sync-text")
message = meta.impl(authentication=(self._mm.my_member,),
distribution=(42,),
payload=("Hello World!",))

# Forward the message
self._mm.community.dispersy._forward([message, ])

# Check if we did not send to ourselves
receive0 = list(self._mm.receive_message(names=[u"n-hop-sync-text", ], timeout=1.0))
self.assertEqual(receive0, [])
# Check if this arrived at a single other node
receive1 = list(other.receive_message(names=[u"n-hop-sync-text", ], timeout=1.0))
receive2 = list(another.receive_message(names=[u"n-hop-sync-text", ], timeout=1.0))
received = receive1 + receive2
_, message = received[0]

self.assertEqual(message.destination.depth, 0)

# Forward the message
self._mm.community.dispersy._forward([message, ])

# Check if this message has indeed not been forwarded
receive0 = list(self._mm.receive_message(names=[u"n-hop-sync-text", ]))
receive1 = list(other.receive_message(names=[u"n-hop-sync-text", ]))
receive2 = list(another.receive_message(names=[u"n-hop-sync-text", ]))

# self._mm has the packet in his database
# But only one other node should receive this message
self.assertEqual(receive0, [])
self.assertEqual(receive1, [])
self.assertEqual(receive2, [])

def clean_community_candidates(self, node_count):
"""
Add certain nodes to the community and disallow walking to other nodes.
:param nodes: the DebugNodes which should be a part of the community
"""
# Disallow community walking
self._community.cancel_pending_task("take step")
self._community.candidates.clear()

# Create the nodes
nodes = self.create_nodes(node_count)
for node in nodes:
self._mm.send_identity(node)
node.process_packets()
self._mm.process_packets()

return nodes

0 comments on commit cf3e303

Please sign in to comment.