Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Dedupe KeyedEdu and Devices federation repl traffic #2116

Merged
merged 3 commits into from
Apr 12, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,12 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
keys = self.keyed_edu_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}

for (pos, (destination, edu_key)) in keyed_edus:
for ((destination, edu_key), pos) in keyed_edus.iteritems():
rows.append((pos, KeyedEduRow(
key=edu_key,
edu=self.keyed_edu[(destination, edu_key)],
Expand All @@ -279,7 +282,7 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
keys = self.edus.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
edus = set((k, self.edus[k]) for k in keys[i:j])
edus = ((k, self.edus[k]) for k in keys[i:j])

for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
Expand All @@ -288,7 +291,7 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
keys = self.failures.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
failures = set((k, self.failures[k]) for k in keys[i:j])
failures = ((k, self.failures[k]) for k in keys[i:j])

for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
Expand All @@ -300,9 +303,9 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
keys = self.device_messages.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
device_messages = {self.device_messages[k]: k for k in keys[i:j]}

for (pos, destination) in device_messages:
for (destination, pos) in device_messages.iteritems():
rows.append((pos, DeviceRow(
destination=destination,
)))
Expand Down Expand Up @@ -380,6 +383,10 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu
))):
"""Streams EDUs that have an associated key that is ued to clobber. For example,
typing EDUs clobber based on room_id.
"""

TypeId = "k"

@staticmethod
Expand All @@ -404,6 +411,8 @@ def add_to_buffer(self, buff):
class EduRow(BaseFederationRow, namedtuple("EduRow", (
"edu", # Edu
))):
"""Streams EDUs that don't have keys. See KeyedEduRow
"""
TypeId = "e"

@staticmethod
Expand All @@ -421,6 +430,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
"destination", # str
"failure",
))):
"""Streams failures to a remote server. Failures are issued when there was
something wrong with a transaction the remote sent us, e.g. it included
an event that was invalid.
"""

TypeId = "f"

@staticmethod
Expand All @@ -443,6 +457,10 @@ def add_to_buffer(self, buff):
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
"destination", # str
))):
"""Streams the fact that either a) there is pending to device messages for
users on the remote, or b) a local users device has changed and needs to
be sent to the remote.
"""
TypeId = "d"

@staticmethod
Expand Down