This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up processing of federation stream RDATA rows. #7584

Merged
merged 5 commits into from
May 27, 2020
Changes from 2 commits
1 change: 1 addition & 0 deletions changelog.d/7584.misc
@@ -0,0 +1 @@
Speed up processing of federation stream RDATA rows.
19 changes: 17 additions & 2 deletions synapse/app/generic_worker.py
@@ -863,9 +863,24 @@ async def update_token(self, token):
a FEDERATION_ACK back to the master, and stores the token that we have processed
in `federation_stream_position` so that we can restart where we left off.
"""
try:
self.federation_position = token
self.federation_position = token

# We save and send the ACK to master asynchronously, so we don't block
# processing on persistence. We don't need to do this operation for
# every single RDATA we receive, we just need to do it periodically.

if self._fed_position_linearizer.is_queued(None):
# There is already a task queued up to save and send the token, so
# no need to queue up another task.
return

run_as_background_process("_save_and_send_ack", self._save_and_send_ack)

async def _save_and_send_ack(self):
"""Save the current federation position in the database and send an ACK
to master with where we're up to.
"""
try:
# We linearize here to ensure we don't have races updating the token
#
# XXX this appears to be redundant, since the ReplicationCommandHandler
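The coalescing idea in this hunk (record every token, but only queue a save when none is already waiting) can be sketched outside Synapse. This is a minimal illustration using `asyncio` rather than Twisted and the real `Linearizer`; the class `AckCoalescer`, its `saved` list and the `demo` function are hypothetical names, not part of this PR.

```python
import asyncio


class AckCoalescer:
    """Hypothetical stand-in for the worker's token handling; not Synapse code."""

    def __init__(self):
        self.position = None   # latest token seen
        self.saved = []        # stand-in for "persist to DB and send ACK"
        self._lock = asyncio.Lock()
        self._waiting = 0      # save tasks spawned but not yet holding the lock
        self._tasks = set()    # keep references so tasks are not garbage collected

    def update_token(self, token):
        self.position = token
        if self._waiting:
            # A save is already queued; it will read the newest position.
            return
        self._waiting += 1
        task = asyncio.ensure_future(self._save_and_send_ack())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _save_and_send_ack(self):
        async with self._lock:
            self._waiting -= 1
            current = self.position
            await asyncio.sleep(0)  # stand-in for slow persistence
            self.saved.append(current)


async def demo():
    coalescer = AckCoalescer()
    for token in range(1, 6):
        coalescer.update_token(token)
    await asyncio.sleep(0.01)  # let the background save finish
    return coalescer.saved
```

Running `asyncio.run(demo())` records five tokens but performs a single save, which sees the latest token. As in the PR, at most one save runs while at most one more waits behind it.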
2 changes: 2 additions & 0 deletions synapse/replication/tcp/handler.py
@@ -116,6 +116,8 @@ def __init__(self, hs):
# batching works.
self._pending_batches = {} # type: Dict[str, List[Any]]

self._queued_events = {} # type: Dict[str, List[Any]]

# The factory used to create connections.
self._factory = None # type: Optional[ReconnectingClientFactory]

12 changes: 12 additions & 0 deletions synapse/util/async_helpers.py
@@ -225,6 +225,18 @@ def __init__(self, name=None, max_count=1, clock=None):
{}
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]

def is_queued(self, key) -> bool:
"""Checks whether there is a process queued up waiting
"""
entry = self.key_to_defer.get(key)
if not entry:
# No entry so nothing is waiting.
return False

# There are waiting deferreds only in the OrderedDict of deferreds is
Member

Suggested change
# There are waiting deferreds only in the OrderedDict of deferreds is
# There are waiting deferreds only if the OrderedDict of deferreds is

I don't think entry is an OrderedDict, isn't it a Sequence? I'm actually a little confused why this would check entry[1].

Member Author

# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing, and
# the second element is an OrderedDict, where the keys are deferreds for the
# things blocked from executing.
self.key_to_defer = (
{}
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
explains it. The sequence is basically a data structure of "number of things currently in flight" and "deferreds that are waiting".

Member Author

i.e. entry[1] is getting the OrderedDict of deferreds waiting.

Member

Ah, I see. I only read half the comment. 🙄 Should this really be a Tuple[int, Dict[defer.Deferred, int]] I wonder?

Member

We talked about this a bit, and it can't be a tuple because it gets modified. Anyway, the type checking here is funky, but it is essentially a list of two items.
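For readers following the thread, the two-element layout can be sketched as below. This is only an illustration of the structure described in the comments above: the standalone `is_queued` function mirrors the method in the diff, the string `"waiter-1"` is a placeholder for a real `Deferred`, and the variable names `nothing_waiting` and `one_waiting` are made up for the example.

```python
from collections import OrderedDict
from typing import Any, Dict, List


def is_queued(key_to_defer: Dict[Any, List[Any]], key: Any) -> bool:
    """True only if something is waiting behind the current holder(s)."""
    entry = key_to_defer.get(key)
    if not entry:
        # No entry, so nothing is running or waiting for this key.
        return False
    # entry[0] is the in-flight count, entry[1] the OrderedDict of waiters.
    return bool(entry[1])


key_to_defer: Dict[Any, List[Any]] = {}

# One task is running for key None and nothing is waiting behind it.
key_to_defer[None] = [1, OrderedDict()]
nothing_waiting = is_queued(key_to_defer, None)

# A second task arrives and has to wait; the string stands in for a Deferred.
key_to_defer[None][1]["waiter-1"] = 1
one_waiting = is_queued(key_to_defer, None)

# The in-flight count is reassigned in place, which a tuple would not allow.
key_to_defer[None][0] += 1
```

Here `nothing_waiting` is `False` even though a task is in flight, and `one_waiting` is `True` once a waiter is recorded, which is why the check looks at `entry[1]` rather than `entry[0]`.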

# non-empty.
return bool(entry[1])

def queue(self, key):
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not