Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Miscellaneous cleanups to replication code #7239

Merged
merged 4 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7329.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move catchup of replication streams logic to worker.
49 changes: 24 additions & 25 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""

import abc
import logging
import platform
from typing import Tuple, Type
Expand All @@ -34,34 +34,29 @@
logger = logging.getLogger(__name__)


class Command(object):
class Command(metaclass=abc.ABCMeta):
"""The base command class.

All subclasses must set the NAME variable which equates to the name of the
command on the wire.

A full command line on the wire is constructed from `NAME + " " + to_line()`

The default implementation creates a command of form `<NAME> <data>`
"""

NAME = None # type: str

def __init__(self, data):
self.data = data

@classmethod
@abc.abstractmethod
def from_line(cls, line):
"""Deserialises a line from the wire into this command. `line` does not
include the command.
"""
return cls(line)

def to_line(self):
@abc.abstractmethod
def to_line(self) -> str:
"""Serialises the comamnd for the wire. Does not include the command
prefix.
"""
return self.data

def get_logcontext_id(self):
"""Get a suitable string for the logcontext when processing this command"""
Expand All @@ -70,7 +65,21 @@ def get_logcontext_id(self):
return self.NAME


class ServerCommand(Command):
class _SimpleCommand(Command):
"""An implementation of Command whose argument is just a 'data' string."""

def __init__(self, data):
self.data = data

@classmethod
def from_line(cls, line):
return cls(line)

def to_line(self) -> str:
return self.data


class ServerCommand(_SimpleCommand):
"""Sent by the server on new connection and includes the server_name.

Format::
Expand Down Expand Up @@ -155,22 +164,22 @@ def to_line(self):
return " ".join((self.stream_name, str(self.token)))


class ErrorCommand(Command):
class ErrorCommand(_SimpleCommand):
"""Sent by either side if there was an ERROR. The data is a string describing
the error.
"""

NAME = "ERROR"


class PingCommand(Command):
class PingCommand(_SimpleCommand):
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
"""

NAME = "PING"


class NameCommand(Command):
class NameCommand(_SimpleCommand):
"""Sent by client to inform the server of the client's identity. The data
is the name
"""
Expand Down Expand Up @@ -289,14 +298,6 @@ def to_line(self):
return str(self.token)


class SyncCommand(Command):
"""Used for testing. The client protocol implementation allows waiting
on a SYNC command with a specified data.
"""

NAME = "SYNC"


class RemovePusherCommand(Command):
"""Sent by the client to request the master remove the given pusher.

Expand Down Expand Up @@ -395,7 +396,7 @@ def to_line(self):
)


class RemoteServerUpCommand(Command):
class RemoteServerUpCommand(_SimpleCommand):
"""Sent when a worker has detected that a remote server is no longer
"down" and retry timings should be reset.

Expand All @@ -419,7 +420,6 @@ class RemoteServerUpCommand(Command):
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
UserIpCommand,
Expand All @@ -437,7 +437,6 @@ class RemoteServerUpCommand(Command):
PositionCommand.NAME,
ErrorCommand.NAME,
PingCommand.NAME,
SyncCommand.NAME,
RemoteServerUpCommand.NAME,
)

Expand Down
4 changes: 0 additions & 4 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
RemoteServerUpCommand,
RemovePusherCommand,
ReplicateCommand,
SyncCommand,
UserIpCommand,
UserSyncCommand,
)
Expand Down Expand Up @@ -281,9 +280,6 @@ async def on_POSITION(self, cmd: PositionCommand):

self._streams_connected.add(cmd.stream_name)

async def on_SYNC(self, cmd: SyncCommand):
pass

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
""""Called when get a new REMOTE_SERVER_UP command."""
self._replication_data_handler.on_remote_server_up(cmd.data)
Expand Down
14 changes: 11 additions & 3 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,23 @@ def send_ping(self):
)
self.send_error("ping timeout")

def lineReceived(self, line):
def lineReceived(self, line: bytes):
"""Called when we've received a line
"""
if line.strip() == "":
# Ignore blank lines
return

line = line.decode("utf-8")
cmd_name, rest_of_line = line.split(" ", 1)
linestr = line.decode("utf-8")

# split at the first " ", handling one-word commands
idx = linestr.index(" ")
if idx >= 0:
cmd_name = linestr[:idx]
rest_of_line = linestr[idx + 1 :]
else:
cmd_name = linestr
rest_of_line = ""
Comment on lines +214 to +220
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have used split(maxsplit=1), but doesn't seem to really simplify this code at all.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, split(maxsplit=1) is what we have at https://github.com/matrix-org/synapse/pull/7239/files#diff-88dc76170973754b5122a253db41d339L212.

indeed this impl feels cumbersome, but I'm not aware of a better way :-S


if cmd_name not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd_name)
Expand Down