Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Move locally_reject_invite into event persistence.
Browse files Browse the repository at this point in the history
This also means adding a replication API to call it from other
instances.
  • Loading branch information
erikjohnston committed May 19, 2020
1 parent fcbaf3f commit 8b02186
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 31 deletions.
34 changes: 28 additions & 6 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.replication.http.membership import (
ReplicationLocallyRejectInviteRestServlet,
)
from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
Expand All @@ -44,11 +47,6 @@ class RoomMemberHandler(object):
__metaclass__ = abc.ABCMeta

def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
Expand All @@ -71,6 +69,17 @@ def __init__(self, hs):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles

self._event_stream_writer_instance = hs.config.worker.writers.events
self._is_on_event_persistence_instance = (
self._event_stream_writer_instance == hs.get_instance_name()
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence
else:
self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
hs
)

# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
# it doesn't store state.
Expand Down Expand Up @@ -121,6 +130,19 @@ async def _remote_reject_invite(
"""
raise NotImplementedError()

async def locally_reject_invite(self, user_id: str, room_id: str):
"""Mark the invite has having been rejected even though we failed to
create a leave event for it.
"""
if self._is_on_event_persistence_instance:
await self.persist_event_storage.locally_reject_invite(user_id, room_id)
else:
await self._locally_reject_client(
instance_name=self._event_stream_writer_instance,
user_id=user_id,
room_id=room_id,
)

@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
Expand Down Expand Up @@ -1000,7 +1022,7 @@ async def _remote_reject_invite(
#
logger.warning("Failed to reject invite: %s", e)

await self.store.locally_reject_invite(target.to_string(), room_id)
await self.locally_reject_invite(target.to_string(), room_id)
return {}

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ def register_servlets(self, hs):
send_event.register_servlets(hs, self)
federation.register_servlets(hs, self)
presence.register_servlets(hs, self)
membership.register_servlets(hs, self)

# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
membership.register_servlets(hs, self)
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
Expand Down
38 changes: 37 additions & 1 deletion synapse/replication/http/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING

from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -106,6 +110,7 @@ def __init__(self, hs):
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.member_handler = hs.get_room_member_handler()

@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
Expand Down Expand Up @@ -149,12 +154,42 @@ async def _handle_request(self, request, room_id, user_id):
#
logger.warning("Failed to reject invite: %s", e)

await self.store.locally_reject_invite(user_id, room_id)
await self.member_handler.locally_reject_invite(user_id, room_id)
ret = {}

return 200, ret


class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint):
"""Rejects the invite for the user and room locally.
Request format:
POST /_synapse/replication/locally_reject_invite/:room_id/:user_id
{}
"""

NAME = "locally_reject_invite"
PATH_ARGS = ("room_id", "user_id")

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.member_handler = hs.get_room_member_handler()

@staticmethod
def _serialize_payload(room_id, user_id):
return {}

async def _handle_request(self, request, room_id, user_id):
logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id)

await self.member_handler.locally_reject_invite(user_id, room_id)

return 200, {}


class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
"""Notifies that a user has joined or left the room
Expand Down Expand Up @@ -208,3 +243,4 @@ def register_servlets(hs, http_server):
ReplicationRemoteJoinRestServlet(hs).register(http_server)
ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
ReplicationLocallyRejectInviteRestServlet(hs).register(http_server)
26 changes: 26 additions & 0 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1590,3 +1590,29 @@ def _update_backward_extremeties(self, txn, events):
if not ev.internal_metadata.is_outlier()
],
)

async def locally_reject_invite(self, user_id: str, room_id: str):
"""Mark the invite has having been rejected even though we failed to
create a leave event for it.
"""

sql = (
"UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
" AND replaced_by is NULL"
)

def f(txn, stream_ordering):
txn.execute(sql, (stream_ordering, True, room_id, user_id))

# We also clear this entry from `local_current_membership`.
# Ideally we'd point to a leave event, but we don't have one, so
# nevermind.
self.db.simple_delete_txn(
txn,
table="local_current_membership",
keyvalues={"room_id": room_id, "user_id": user_id},
)

with self._stream_id_gen.get_next() as stream_ordering:
await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
23 changes: 0 additions & 23 deletions synapse/storage/data_stores/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -1046,29 +1046,6 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs):
super(RoomMemberStore, self).__init__(database, db_conn, hs)

@defer.inlineCallbacks
def locally_reject_invite(self, user_id, room_id):
sql = (
"UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
" AND replaced_by is NULL"
)

def f(txn, stream_ordering):
txn.execute(sql, (stream_ordering, True, room_id, user_id))

# We also clear this entry from `local_current_membership`.
# Ideally we'd point to a leave event, but we don't have one, so
# nevermind.
self.db.simple_delete_txn(
txn,
table="local_current_membership",
keyvalues={"room_id": room_id, "user_id": user_id},
)

with self._stream_id_gen.get_next() as stream_ordering:
yield self.db.runInteraction("locally_reject_invite", f, stream_ordering)

def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""

Expand Down
6 changes: 6 additions & 0 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,3 +786,9 @@ async def _handle_potentially_left_users(self, user_ids: Set[str]):

for user_id in left_users:
await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)

async def locally_reject_invite(self, user_id: str, room_id: str):
"""Mark the invite has having been rejected even though we failed to
create a leave event for it.
"""
await self.persist_events_store.locally_reject_invite(user_id, room_id)

0 comments on commit 8b02186

Please sign in to comment.