Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce inconsistencies between codepaths for membership and non-membership events. #8463

Merged
merged 6 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8463.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce inconsistencies between codepaths for membership and non-membership events.
133 changes: 65 additions & 68 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,61 +635,6 @@ async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)

async def send_nonmember_event(
self,
requester: Requester,
event: EventBase,
context: EventContext,
ratelimit: bool = True,
ignore_shadow_ban: bool = False,
) -> int:
"""
Persists and notifies local clients and federation of an event.

Args:
requester: The requester sending the event.
event: The event to send.
context: The context of the event.
ratelimit: Whether to rate limit this send.
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.

Return:
The stream_id of the persisted event.

Raises:
ShadowBanError if the requester has been shadow-banned.
"""
if event.type == EventTypes.Member:
raise SynapseError(
500, "Tried to send member event through non-member codepath"
)

if not ignore_shadow_ban and requester.shadow_banned:
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()

user = UserID.from_string(event.sender)

assert self.hs.is_mine(user), "User must be our own: %s" % (user,)

if event.is_state():
prev_event = await self.deduplicate_state_event(event, context)
if prev_event is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
event.event_id,
prev_event.event_id,
)
# we know it was persisted, so must have a stream ordering
assert prev_event.internal_metadata.stream_ordering
return prev_event.internal_metadata.stream_ordering

return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
)

async def deduplicate_state_event(
self, event: EventBase, context: EventContext
) -> Optional[EventBase]:
Expand Down Expand Up @@ -730,7 +675,7 @@ async def create_and_send_nonmember_event(
"""
Creates an event, then sends it.

See self.create_event and self.send_nonmember_event.
See self.create_event and self.handle_new_client_event.

Args:
requester: The requester sending the event.
Expand All @@ -740,9 +685,19 @@ async def create_and_send_nonmember_event(
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.

Returns:
The event, and its stream ordering (if state event deduplication happened,
the previous, duplicate event).

Raises:
ShadowBanError if the requester has been shadow-banned.
"""

if event_dict["type"] == EventTypes.Member:
raise SynapseError(
500, "Tried to send member event through non-member codepath"
)

if not ignore_shadow_ban and requester.shadow_banned:
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
Expand All @@ -758,20 +713,27 @@ async def create_and_send_nonmember_event(
requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
)

assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
event.sender,
)

spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, str):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)

stream_id = await self.send_nonmember_event(
requester,
event,
context,
ev = await self.handle_new_client_event(
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)
return event, stream_id

# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
return ev, ev.internal_metadata.stream_ordering

@measure_func("create_new_client_event")
async def create_new_client_event(
Expand Down Expand Up @@ -845,8 +807,11 @@ async def handle_new_client_event(
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
) -> int:
"""Processes a new event. This includes checking auth, persisting it,
ignore_shadow_ban: bool = False,
) -> EventBase:
"""Processes a new event.

This includes deduplicating, checking auth, persisting,
notifying users, sending to remote servers, etc.

If called from a worker will hit out to the master process for final
Expand All @@ -859,10 +824,38 @@ async def handle_new_client_event(
ratelimit
extra_users: Any extra users to notify about event

ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.

Return:
The stream_id of the persisted event.
If the event was deduplicated, the previous, duplicate, event. Otherwise,
`event`.

Raises:
ShadowBanError if the requester has been shadow-banned.
"""

# we don't apply shadow-banning to membership events, so that the user
# can come and go as they want.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is really true? We do have code to handle them for invites, but I suppose those are handled before this point is reached.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess... I'll add a couple more words.

if (
event.type != EventTypes.Member
and not ignore_shadow_ban
and requester.shadow_banned
):
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()

if event.is_state():
prev_event = await self.deduplicate_state_event(event, context)
if prev_event is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
event.event_id,
prev_event.event_id,
)
return prev_event

if event.is_state() and (event.type, event.state_key) == (
EventTypes.Create,
"",
Expand Down Expand Up @@ -917,13 +910,13 @@ async def handle_new_client_event(
)
stream_id = result["stream_id"]
event.internal_metadata.stream_ordering = stream_id
return stream_id
return event

stream_id = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)

return stream_id
return event
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
Expand Down Expand Up @@ -1234,8 +1227,12 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool:

# Since this is a dummy-event it is OK if it is sent by a
# shadow-banned user.
await self.send_nonmember_event(
requester, event, context, ratelimit=False, ignore_shadow_ban=True,
await self.handle_new_client_event(
requester=requester,
event=event,
context=context,
ratelimit=False,
ignore_shadow_ban=True,
)
return True
except ConsentNotGivenError:
Expand Down
5 changes: 3 additions & 2 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ async def _upgrade_room(
ShadowBanError if the requester is shadow-banned.
"""
user_id = requester.user.to_string()
assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,)

# start by allocating a new room id
r = await self.store.get_room(old_room_id)
Expand Down Expand Up @@ -229,8 +230,8 @@ async def _upgrade_room(
)

# now send the tombstone
await self.event_creation_handler.send_nonmember_event(
requester, tombstone_event, tombstone_context
await self.event_creation_handler.handle_new_client_event(
requester=requester, event=tombstone_event, context=tombstone_context,
)

old_room_state = await tombstone_context.get_current_state_ids()
Expand Down
29 changes: 9 additions & 20 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,6 @@ async def _local_membership_update(
require_consent=require_consent,
)

# Check if this event matches the previous membership event for the user.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any concern that removing this check from earlier on might cause odd errors (e.g. rate limiting or in send_membership_event it could cause auth errors) to be hit in the case that a duplicate event is being persisted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, the only difference between doing it here and at the start of handle_new_client_event is the "newly joined" code between lines 204 and 223, which by definition won't do anything in the case of a duplicate state event.

In send_membership_event, we'll now be doing auth checks on duplicate events where we previously would have skipped them. It's possible that a duplicate event will fail such checks, but that's a bit of an edge case and I don't really think it matters: fundamentally it suggests the client is trying to do something that the server doesn't want to allow.

duplicate = await self.event_creation_handler.deduplicate_state_event(
event, context
)
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
# we know it was persisted, so must have a stream ordering.
assert duplicate.internal_metadata.stream_ordering
return duplicate.event_id, duplicate.internal_metadata.stream_ordering

prev_state_ids = await context.get_prev_state_ids()

prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
Expand All @@ -222,7 +212,7 @@ async def _local_membership_update(
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)

stream_id = await self.event_creation_handler.handle_new_client_event(
result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)

Expand All @@ -232,7 +222,9 @@ async def _local_membership_update(
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)

return event.event_id, stream_id
# we know it was persisted, so should have a stream ordering
assert result_event.internal_metadata.stream_ordering
return result_event.event_id, result_event.internal_metadata.stream_ordering

async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
Expand Down Expand Up @@ -673,12 +665,6 @@ async def send_membership_event(
else:
requester = types.create_requester(target_user)

prev_event = await self.event_creation_handler.deduplicate_state_event(
event, context
)
if prev_event is not None:
return

prev_state_ids = await context.get_prev_state_ids()
if event.membership == Membership.JOIN:
if requester.is_guest:
Expand Down Expand Up @@ -1186,10 +1172,13 @@ async def _locally_reject_invite(

context = await self.state_handler.compute_event_context(event)
context.app_service = requester.app_service
stream_id = await self.event_creation_handler.handle_new_client_event(
result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
return event.event_id, stream_id
# we know it was persisted, so must have a stream ordering
assert result_event.internal_metadata.stream_ordering

return result_event.event_id, result_event.internal_metadata.stream_ordering

async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
Expand Down
2 changes: 1 addition & 1 deletion tests/handlers/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def test_auto_create_auto_join_room_preset_invalid_permissions(self):
)
)
self.get_success(
event_creation_handler.send_nonmember_event(requester, event, context)
event_creation_handler.handle_new_client_event(requester, event, context)
)

# Register a second user, which won't be be in the room (or even have an invite)
Expand Down
4 changes: 3 additions & 1 deletion tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,9 @@ def create_and_send_event(
if soft_failed:
event.internal_metadata.soft_failed = True

self.get_success(event_creator.send_nonmember_event(requester, event, context))
self.get_success(
event_creator.handle_new_client_event(requester, event, context)
)

return event.event_id

Expand Down