Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve code documentation for the typing stream over replication. #12211

Merged
merged 4 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12211.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve code documentation for the typing stream over replication.
5 changes: 3 additions & 2 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ def process_replication_rows(
"""Should be called whenever we receive updates for typing stream."""

if self._latest_room_serial > token:
# The master has gone backwards. To prevent inconsistent data, just
# clear everything.
# The typing worker has gone backwards (e.g. it may have restarted).
# To prevent inconsistent data, just clear everything.
logger.info("Typing handler stream went backwards; resetting")
self._reset()

# Set the latest serial token to whatever the server gave us.
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ def send_remote_server_up(self, server: str) -> None:
self.send_command(RemoteServerUpCommand(server))

def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
"""Called when a new update is available to stream to clients.
"""Called when a new update is available to stream to Redis subscribers.

We need to check if the client is interested in the stream or not
"""
Expand Down
6 changes: 3 additions & 3 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def buildProtocol(self, addr: IAddress) -> ServerReplicationStreamProtocol:
class ReplicationStreamer:
"""Handles replication connections.

This needs to be poked when new replication data may be available. When new
data is available it will propagate to all connected clients.
This needs to be poked when new replication data may be available.
When new data is available it will propagate to all Redis subscribers.
"""

def __init__(self, hs: "HomeServer"):
Expand Down Expand Up @@ -109,7 +109,7 @@ def __init__(self, hs: "HomeServer"):

def on_notifier_poke(self) -> None:
"""Checks if there is actually any new data and sends it to the
connections if there are.
Redis subscribers if there are.

This should get called each time new data is available, even if it
is currently being executed, so that nothing gets missed
Expand Down
12 changes: 12 additions & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,19 @@ def __init__(self, hs: "HomeServer"):
class TypingStream(Stream):
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TypingStreamRow:
"""
An entry in the typing stream.
Describes all the users that are 'typing' right now in one room.

When a user stops typing, it will be streamed as a new update with that
user absent; you can think of the `user_ids` list as overwriting the
entire list that was there previously.
"""

# The room that this update is for.
room_id: str

# All the users that are 'typing' right now in the specified room.
user_ids: List[str]

NAME = "typing"
Expand Down