Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #8463 from matrix-org/rav/clean_up_event_handling
Browse files Browse the repository at this point in the history
Reduce inconsistencies between codepaths for membership and non-membership events.
  • Loading branch information
richvdh committed Oct 7, 2020
2 parents 4f06373 + 903fcd2 commit 43c6228
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 92 deletions.
1 change: 1 addition & 0 deletions changelog.d/8463.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce inconsistencies between codepaths for membership and non-membership events.
134 changes: 66 additions & 68 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,61 +635,6 @@ async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)

async def send_nonmember_event(
self,
requester: Requester,
event: EventBase,
context: EventContext,
ratelimit: bool = True,
ignore_shadow_ban: bool = False,
) -> int:
"""
Persists and notifies local clients and federation of an event.
Args:
requester: The requester sending the event.
event: The event to send.
context: The context of the event.
ratelimit: Whether to rate limit this send.
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
Return:
The stream_id of the persisted event.
Raises:
ShadowBanError if the requester has been shadow-banned.
"""
if event.type == EventTypes.Member:
raise SynapseError(
500, "Tried to send member event through non-member codepath"
)

if not ignore_shadow_ban and requester.shadow_banned:
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()

user = UserID.from_string(event.sender)

assert self.hs.is_mine(user), "User must be our own: %s" % (user,)

if event.is_state():
prev_event = await self.deduplicate_state_event(event, context)
if prev_event is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
event.event_id,
prev_event.event_id,
)
# we know it was persisted, so must have a stream ordering
assert prev_event.internal_metadata.stream_ordering
return prev_event.internal_metadata.stream_ordering

return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
)

async def deduplicate_state_event(
self, event: EventBase, context: EventContext
) -> Optional[EventBase]:
Expand Down Expand Up @@ -730,7 +675,7 @@ async def create_and_send_nonmember_event(
"""
Creates an event, then sends it.
See self.create_event and self.send_nonmember_event.
See self.create_event and self.handle_new_client_event.
Args:
requester: The requester sending the event.
Expand All @@ -740,9 +685,19 @@ async def create_and_send_nonmember_event(
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
Returns:
The event, and its stream ordering (if state event deduplication happened,
the previous, duplicate event).
Raises:
ShadowBanError if the requester has been shadow-banned.
"""

if event_dict["type"] == EventTypes.Member:
raise SynapseError(
500, "Tried to send member event through non-member codepath"
)

if not ignore_shadow_ban and requester.shadow_banned:
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
Expand All @@ -758,20 +713,27 @@ async def create_and_send_nonmember_event(
requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
)

assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
event.sender,
)

spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, str):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)

stream_id = await self.send_nonmember_event(
requester,
event,
context,
ev = await self.handle_new_client_event(
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)
return event, stream_id

# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
return ev, ev.internal_metadata.stream_ordering

@measure_func("create_new_client_event")
async def create_new_client_event(
Expand Down Expand Up @@ -845,8 +807,11 @@ async def handle_new_client_event(
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
) -> int:
"""Processes a new event. This includes checking auth, persisting it,
ignore_shadow_ban: bool = False,
) -> EventBase:
"""Processes a new event.
This includes deduplicating, checking auth, persisting,
notifying users, sending to remote servers, etc.
If called from a worker will hit out to the master process for final
Expand All @@ -859,10 +824,39 @@ async def handle_new_client_event(
ratelimit
extra_users: Any extra users to notify about event
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
Return:
The stream_id of the persisted event.
If the event was deduplicated, the previous, duplicate, event. Otherwise,
`event`.
Raises:
ShadowBanError if the requester has been shadow-banned.
"""

# we don't apply shadow-banning to membership events here. Invites are blocked
# higher up the stack, and we allow shadow-banned users to send join and leave
# events as normal.
if (
event.type != EventTypes.Member
and not ignore_shadow_ban
and requester.shadow_banned
):
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()

if event.is_state():
prev_event = await self.deduplicate_state_event(event, context)
if prev_event is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
event.event_id,
prev_event.event_id,
)
return prev_event

if event.is_state() and (event.type, event.state_key) == (
EventTypes.Create,
"",
Expand Down Expand Up @@ -917,13 +911,13 @@ async def handle_new_client_event(
)
stream_id = result["stream_id"]
event.internal_metadata.stream_ordering = stream_id
return stream_id
return event

stream_id = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)

return stream_id
return event
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
Expand Down Expand Up @@ -1234,8 +1228,12 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool:

# Since this is a dummy-event it is OK if it is sent by a
# shadow-banned user.
await self.send_nonmember_event(
requester, event, context, ratelimit=False, ignore_shadow_ban=True,
await self.handle_new_client_event(
requester=requester,
event=event,
context=context,
ratelimit=False,
ignore_shadow_ban=True,
)
return True
except ConsentNotGivenError:
Expand Down
5 changes: 3 additions & 2 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ async def _upgrade_room(
ShadowBanError if the requester is shadow-banned.
"""
user_id = requester.user.to_string()
assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,)

# start by allocating a new room id
r = await self.store.get_room(old_room_id)
Expand Down Expand Up @@ -229,8 +230,8 @@ async def _upgrade_room(
)

# now send the tombstone
await self.event_creation_handler.send_nonmember_event(
requester, tombstone_event, tombstone_context
await self.event_creation_handler.handle_new_client_event(
requester=requester, event=tombstone_event, context=tombstone_context,
)

old_room_state = await tombstone_context.get_current_state_ids()
Expand Down
29 changes: 9 additions & 20 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,6 @@ async def _local_membership_update(
require_consent=require_consent,
)

# Check if this event matches the previous membership event for the user.
duplicate = await self.event_creation_handler.deduplicate_state_event(
event, context
)
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
# we know it was persisted, so must have a stream ordering.
assert duplicate.internal_metadata.stream_ordering
return duplicate.event_id, duplicate.internal_metadata.stream_ordering

prev_state_ids = await context.get_prev_state_ids()

prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
Expand All @@ -222,7 +212,7 @@ async def _local_membership_update(
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)

stream_id = await self.event_creation_handler.handle_new_client_event(
result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)

Expand All @@ -232,7 +222,9 @@ async def _local_membership_update(
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)

return event.event_id, stream_id
# we know it was persisted, so should have a stream ordering
assert result_event.internal_metadata.stream_ordering
return result_event.event_id, result_event.internal_metadata.stream_ordering

async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
Expand Down Expand Up @@ -673,12 +665,6 @@ async def send_membership_event(
else:
requester = types.create_requester(target_user)

prev_event = await self.event_creation_handler.deduplicate_state_event(
event, context
)
if prev_event is not None:
return

prev_state_ids = await context.get_prev_state_ids()
if event.membership == Membership.JOIN:
if requester.is_guest:
Expand Down Expand Up @@ -1186,10 +1172,13 @@ async def _locally_reject_invite(

context = await self.state_handler.compute_event_context(event)
context.app_service = requester.app_service
stream_id = await self.event_creation_handler.handle_new_client_event(
result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
return event.event_id, stream_id
# we know it was persisted, so must have a stream ordering
assert result_event.internal_metadata.stream_ordering

return result_event.event_id, result_event.internal_metadata.stream_ordering

async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
Expand Down
2 changes: 1 addition & 1 deletion tests/handlers/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def test_auto_create_auto_join_room_preset_invalid_permissions(self):
)
)
self.get_success(
event_creation_handler.send_nonmember_event(requester, event, context)
event_creation_handler.handle_new_client_event(requester, event, context)
)

# Register a second user, which won't be be in the room (or even have an invite)
Expand Down
4 changes: 3 additions & 1 deletion tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,9 @@ def create_and_send_event(
if soft_failed:
event.internal_metadata.soft_failed = True

self.get_success(event_creator.send_nonmember_event(requester, event, context))
self.get_success(
event_creator.handle_new_client_event(requester, event, context)
)

return event.event_id

Expand Down

0 comments on commit 43c6228

Please sign in to comment.