Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Break cycle
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jul 19, 2023
1 parent 3b2dea2 commit b965651
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 15 deletions.
9 changes: 5 additions & 4 deletions synapse/handlers/worker_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def __init__(self, hs: "HomeServer") -> None:
self._reactor = hs.get_reactor()
self._store = hs.get_datastores().main
self._clock = hs.get_clock()
self._replication_handler = hs.get_replication_command_handler()
self._notifier = hs.get_notifier()
self._instance_name = hs.get_instance_name()

# Map from lock name/key to set of `WaitingLock` that are active for
# that lock.
Expand Down Expand Up @@ -143,10 +143,11 @@ def notify_lock_released(self, lock_name: str, lock_key: str) -> None:
Pokes both the notifier and replication.
"""

self._replication_handler.send_lock_released(lock_name, lock_key)
self._notifier.notify_lock_released(lock_name, lock_key)
self._notifier.notify_lock_released(self._instance_name, lock_name, lock_key)

def _on_lock_released(self, lock_name: str, lock_key: str) -> None:
def _on_lock_released(
self, instance_name: str, lock_name: str, lock_key: str
) -> None:
"""Called when a lock has been released.
Wakes up any locks that might be waiting on this.
Expand Down
12 changes: 8 additions & 4 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def __init__(self, hs: "HomeServer"):
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules

# List of callbacks to be notified when a lock is released
self._lock_released_callback: List[Callable[[str, str], None]] = []
self._lock_released_callback: List[Callable[[str, str, str], None]] = []

self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
Expand Down Expand Up @@ -788,14 +788,18 @@ def notify_remote_server_up(self, server: str) -> None:
# that any in flight requests can be immediately retried.
self._federation_client.wake_destination(server)

def add_lock_released_callback(self, callback: Callable[[str, str], None]) -> None:
def add_lock_released_callback(
self, callback: Callable[[str, str, str], None]
) -> None:
"""Add a function to be called whenever we are notified about a released lock."""
self._lock_released_callback.append(callback)

def notify_lock_released(self, lock_name: str, lock_key: str) -> None:
def notify_lock_released(
self, instance_name: str, lock_name: str, lock_key: str
) -> None:
"""Notify the callbacks that a lock has been released."""
for cb in self._lock_released_callback:
cb(lock_name, lock_key)
cb(instance_name, lock_name, lock_key)


@attr.s(auto_attribs=True)
Expand Down
10 changes: 6 additions & 4 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,27 +427,29 @@ class LockReleasedCommand(Command):
Format::
LOCK_RELEASED ["<lock_name>", "<lock_key>"]
LOCK_RELEASED ["<instance_name>", "<lock_name>", "<lock_key>"]
"""

NAME = "LOCK_RELEASED"

def __init__(
self,
instance_name: str,
lock_name: str,
lock_key: str,
):
self.instance_name = instance_name
self.lock_name = lock_name
self.lock_key = lock_key

@classmethod
def from_line(cls: Type["LockReleasedCommand"], line: str) -> "LockReleasedCommand":
lock_name, lock_key = json_decoder.decode(line)
instance_name, lock_name, lock_key = json_decoder.decode(line)

return cls(lock_name, lock_key)
return cls(instance_name, lock_name, lock_key)

def to_line(self) -> str:
return json_encoder.encode([self.lock_name, self.lock_key])
return json_encoder.encode([self.instance_name, self.lock_name, self.lock_key])


_COMMANDS: Tuple[Type[Command], ...] = (
Expand Down
16 changes: 13 additions & 3 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ def __init__(self, hs: "HomeServer"):
if self._is_master or self._should_insert_client_ips:
self.subscribe_to_channel("USER_IP")

self._notifier.add_lock_released_callback(self.on_lock_released)

def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
Expand Down Expand Up @@ -653,7 +655,12 @@ def on_LOCK_RELEASED(
self, conn: IReplicationConnection, cmd: LockReleasedCommand
) -> None:
"""Called when we get a new LOCK_RELEASED command."""
self._notifier.notify_lock_released(cmd.lock_name, cmd.lock_key)
if cmd.instance_name == self._instance_name:
return

self._notifier.notify_lock_released(
cmd.instance_name, cmd.lock_name, cmd.lock_key
)

def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
Expand Down Expand Up @@ -761,9 +768,12 @@ def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> No
"""
self.send_command(RdataCommand(stream_name, self._instance_name, token, data))

def send_lock_released(self, lock_name: str, lock_key: str) -> None:
def on_lock_released(
self, instance_name: str, lock_name: str, lock_key: str
) -> None:
"""Called when we released a lock and should notify other instances."""
self.send_command(LockReleasedCommand(lock_name, lock_key))
if instance_name == self._instance_name:
self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key))


UpdateToken = TypeVar("UpdateToken")
Expand Down

0 comments on commit b965651

Please sign in to comment.