From 5266b9af953029094e53b6d76373de18587ad78a Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 6 Oct 2022 11:27:42 -0700 Subject: [PATCH 01/15] change _get_power_levels_and_sender_level to check for events in batch --- synapse/push/bulk_push_rule_evaluator.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index a75386f6a0ec..0dfec60c0ae2 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -165,7 +165,10 @@ async def _get_rules_for_event( return rules_by_user async def _get_power_levels_and_sender_level( - self, event: EventBase, context: EventContext + self, + event: EventBase, + context: EventContext, + event_id_to_event: Mapping[str, EventBase], ) -> Tuple[dict, Optional[int]]: # There are no power levels and sender levels possible to get from outlier if event.internal_metadata.is_outlier(): @@ -177,15 +180,26 @@ async def _get_power_levels_and_sender_level( ) pl_event_id = prev_state_ids.get(POWER_KEY) + # fastpath: if there's a power level event, that's all we need, and + # not having a power level event is an extreme edge case if pl_event_id: - # fastpath: if there's a power level event, that's all we need, and - # not having a power level event is an extreme edge case - auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)} + # check that the power level event is not in the batch before checking the DB + pl_event = event_id_to_event.get(pl_event_id) + if pl_event: + auth_events = {POWER_KEY: pl_event} + else: + auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)} else: auth_events_ids = self._event_auth_handler.compute_auth_events( event, prev_state_ids, for_verification=False ) auth_events_dict = await self.store.get_events(auth_events_ids) + # check to see that there aren't any needed auth events in the batch as it + # hasn't been persisted yet + for 
auth_event_id in auth_events_ids: + auth_event = event_id_to_event.get(auth_event_id) + if auth_event: + auth_events_dict[auth_event_id] = auth_event auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()} sender_level = get_user_power_level(event.sender, auth_events) From 1fae650428c5476f7e179d8f3ec9f46d4e487849 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 6 Oct 2022 11:28:56 -0700 Subject: [PATCH 02/15] change action_for_event_by_user to take a list of events/context and pass batch of events to _get_power_levels_and_sender_level --- synapse/push/bulk_push_rule_evaluator.py | 240 ++++++++++++----------- 1 file changed, 127 insertions(+), 113 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 0dfec60c0ae2..2f4a9280019f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -210,125 +210,139 @@ async def _get_power_levels_and_sender_level( @measure_func("action_for_event_by_user") async def action_for_event_by_user( - self, event: EventBase, context: EventContext + self, events_and_context: List[Tuple[EventBase, EventContext]] ) -> None: - """Given an event and context, evaluate the push rules, check if the message - should increment the unread count, and insert the results into the - event_push_actions_staging table. + """Given a list of events and their associated contexts, evaluate the push rules + for each event, check if the message should increment the unread count, and + insert the results into the event_push_actions_staging table. """ - if not event.internal_metadata.is_notifiable(): - # Push rules for events that aren't notifiable can't be processed by this - return - - # Disable counting as unread unless the experimental configuration is - # enabled, as it can cause additional (unwanted) rows to be added to the - # event_push_actions table. 
- count_as_unread = False - if self.hs.config.experimental.msc2654_enabled: - count_as_unread = _should_count_as_unread(event, context) - - rules_by_user = await self._get_rules_for_event(event) - actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} - - room_member_count = await self.store.get_number_joined_users_in_room( - event.room_id - ) + for event, context in events_and_context: + if not event.internal_metadata.is_notifiable(): + # Push rules for events that aren't notifiable can't be processed by this + return + # Skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). + if event.internal_metadata.is_historical(): + return + + # Disable counting as unread unless the experimental configuration is + # enabled, as it can cause additional (unwanted) rows to be added to the + # event_push_actions table. + count_as_unread = False + if self.hs.config.experimental.msc2654_enabled: + count_as_unread = _should_count_as_unread(event, context) + + rules_by_user = await self._get_rules_for_event(event) + actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} + + room_member_count = await self.store.get_number_joined_users_in_room( + event.room_id + ) - ( - power_levels, - sender_power_level, - ) = await self._get_power_levels_and_sender_level(event, context) - - # Find the event's thread ID. - relation = relation_from_event(event) - # If the event does not have a relation, then it cannot have a thread ID. - thread_id = MAIN_TIMELINE - if relation: - # Recursively attempt to find the thread this event relates to. 
- if relation.rel_type == RelationTypes.THREAD: - thread_id = relation.parent_id - else: - # Since the event has not yet been persisted we check whether - # the parent is part of a thread. - thread_id = await self.store.get_thread_id(relation.parent_id) - - # It's possible that old room versions have non-integer power levels (floats or - # strings). Workaround this by explicitly converting to int. - notification_levels = power_levels.get("notifications", {}) - if not event.room_version.msc3667_int_only_power_levels: - for user_id, level in notification_levels.items(): - notification_levels[user_id] = int(level) - - evaluator = PushRuleEvaluator( - _flatten_dict(event), - room_member_count, - sender_power_level, - notification_levels, - ) + # For batched events the power level events may not have been persisted yet, + # so we pass in the batched events. Thus if the event cannot be found in the + # database we can check in the batch. + event_id_to_event = {e.event_id: e for e, _ in events_and_context} + ( + power_levels, + sender_power_level, + ) = await self._get_power_levels_and_sender_level( + event, context, event_id_to_event + ) - users = rules_by_user.keys() - profiles = await self.store.get_subset_users_in_room_with_profiles( - event.room_id, users - ) + # Find the event's thread ID. + relation = relation_from_event(event) + # If the event does not have a relation, then it cannot have a thread ID. + thread_id = MAIN_TIMELINE + if relation: + # Recursively attempt to find the thread this event relates to. + if relation.rel_type == RelationTypes.THREAD: + thread_id = relation.parent_id + else: + # Since the event has not yet been persisted we check whether + # the parent is part of a thread. + thread_id = await self.store.get_thread_id(relation.parent_id) + + # It's possible that old room versions have non-integer power levels (floats or + # strings). Workaround this by explicitly converting to int. 
+ notification_levels = power_levels.get("notifications", {}) + if not event.room_version.msc3667_int_only_power_levels: + for user_id, level in notification_levels.items(): + notification_levels[user_id] = int(level) + + evaluator = PushRuleEvaluator( + _flatten_dict(event), + room_member_count, + sender_power_level, + notification_levels, + ) - for uid, rules in rules_by_user.items(): - if event.sender == uid: - continue - - display_name = None - profile = profiles.get(uid) - if profile: - display_name = profile.display_name - - if not display_name: - # Handle the case where we are pushing a membership event to - # that user, as they might not be already joined. - if event.type == EventTypes.Member and event.state_key == uid: - display_name = event.content.get("displayname", None) - if not isinstance(display_name, str): - display_name = None - - if count_as_unread: - # Add an element for the current user if the event needs to be marked as - # unread, so that add_push_actions_to_staging iterates over it. - # If the event shouldn't be marked as unread but should notify the - # current user, it'll be added to the dict later. - actions_by_user[uid] = [] - - actions = evaluator.run(rules, uid, display_name) - if "notify" in actions: - # Push rules say we should notify the user of this event - actions_by_user[uid] = actions - - # If there aren't any actions then we can skip the rest of the - # processing. - if not actions_by_user: - return - - # This is a check for the case where user joins a room without being - # allowed to see history, and then the server receives a delayed event - # from before the user joined, which they should not be pushed for - # - # We do this *after* calculating the push actions as a) its unlikely - # that we'll filter anyone out and b) for large rooms its likely that - # most users will have push disabled and so the set of users to check is - # much smaller. 
- uids_with_visibility = await filter_event_for_clients_with_state( - self.store, actions_by_user.keys(), event, context - ) + users = rules_by_user.keys() + profiles = await self.store.get_subset_users_in_room_with_profiles( + event.room_id, users + ) - for user_id in set(actions_by_user).difference(uids_with_visibility): - actions_by_user.pop(user_id, None) - - # Mark in the DB staging area the push actions for users who should be - # notified for this event. (This will then get handled when we persist - # the event) - await self.store.add_push_actions_to_staging( - event.event_id, - actions_by_user, - count_as_unread, - thread_id, - ) + for uid, rules in rules_by_user.items(): + if event.sender == uid: + continue + + display_name = None + profile = profiles.get(uid) + if profile: + display_name = profile.display_name + + if not display_name: + # Handle the case where we are pushing a membership event to + # that user, as they might not be already joined. + if event.type == EventTypes.Member and event.state_key == uid: + display_name = event.content.get("displayname", None) + if not isinstance(display_name, str): + display_name = None + + if count_as_unread: + # Add an element for the current user if the event needs to be marked as + # unread, so that add_push_actions_to_staging iterates over it. + # If the event shouldn't be marked as unread but should notify the + # current user, it'll be added to the dict later. + actions_by_user[uid] = [] + + actions = evaluator.run(rules, uid, display_name) + if "notify" in actions: + # Push rules say we should notify the user of this event + actions_by_user[uid] = actions + + # If there aren't any actions then we can skip the rest of the + # processing. 
+ if not actions_by_user: + return + + # This is a check for the case where user joins a room without being + # allowed to see history, and then the server receives a delayed event + # from before the user joined, which they should not be pushed for + # + # We do this *after* calculating the push actions as a) its unlikely + # that we'll filter anyone out and b) for large rooms its likely that + # most users will have push disabled and so the set of users to check is + # much smaller. + uids_with_visibility = await filter_event_for_clients_with_state( + self.store, actions_by_user.keys(), event, context + ) + + for user_id in set(actions_by_user).difference(uids_with_visibility): + actions_by_user.pop(user_id, None) + + # Mark in the DB staging area the push actions for users who should be + # notified for this event. (This will then get handled when we persist + # the event) + await self.store.add_push_actions_to_staging( + event.event_id, + actions_by_user, + count_as_unread, + thread_id, + ) MemberMap = Dict[str, Optional[EventIdMembership]] From f8b8de9c4e5d983baca5bea0da7303ed3d188fe5 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Thu, 6 Oct 2022 11:30:11 -0700 Subject: [PATCH 03/15] update callsites of action_for_event_by_user --- synapse/handlers/federation.py | 4 +++- synapse/handlers/federation_event.py | 2 +- synapse/handlers/message.py | 16 ++++------------ 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 275a37a5751e..4b4de5098dc0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1017,7 +1017,9 @@ async def on_invite_request( context = EventContext.for_outlier(self._storage_controllers) - await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context) + await self._bulk_push_rule_evaluator.action_for_event_by_user( + [(event, context)] + ) try: await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 06e41b5cc0de..dcb81525a16f 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -2172,7 +2172,7 @@ async def _run_push_actions_and_persist_event( ) else: await self._bulk_push_rule_evaluator.action_for_event_by_user( - event, context + [(event, context)] ) try: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 15b828dd742d..c4dae13240b5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1432,18 +1432,10 @@ async def _persist_events( PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ - - for event, context in events_and_context: - # Skip push notification actions for historical messages - # because we don't want to notify people about old history back in time. 
- # The historical messages also do not have the proper `context.current_state_ids` - # and `state_groups` because they have `prev_events` that aren't persisted yet - # (historical messages persisted in reverse-chronological order). - if not event.internal_metadata.is_historical(): - with opentracing.start_active_span("calculate_push_actions"): - await self._bulk_push_rule_evaluator.action_for_event_by_user( - event, context - ) + with opentracing.start_active_span("calculate_push_actions"): + await self._bulk_push_rule_evaluator.action_for_event_by_user( + events_and_context + ) try: # If we're a worker we need to hit out to the master. From 7c9c0da58881e25bedd88e7c5c817289bad8b3a2 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 6 Oct 2022 11:30:32 -0700 Subject: [PATCH 04/15] add power level event to batched events --- synapse/handlers/room.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 638f54051ada..a61685dfa97f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1157,6 +1157,7 @@ async def send( depth += 1 state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id + events_to_send = [] # We treat the power levels override specially as this needs to be one # of the first events that get sent into a room. 
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) @@ -1165,7 +1166,7 @@ async def send( EventTypes.PowerLevels, pl_content, False ) current_state_group = power_context._state_group - await send(power_event, power_context, creator) + events_to_send.append((power_event, power_context)) else: power_level_content: JsonDict = { "users": {creator_id: 100}, @@ -1214,9 +1215,8 @@ async def send( False, ) current_state_group = pl_context._state_group - await send(pl_event, pl_context, creator) + events_to_send.append((pl_event, pl_context)) - events_to_send = [] if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: room_alias_event, room_alias_context = await create_event( EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True From 4eebbd8cc73953f5f7ce5bee7e762c74b7958b73 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 6 Oct 2022 12:10:06 -0700 Subject: [PATCH 05/15] fix tests to reflect new reality --- tests/push/test_bulk_push_rule_evaluator.py | 2 +- tests/rest/client/test_rooms.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py index 675d7df2ac45..4d8bee3fda8e 100644 --- a/tests/push/test_bulk_push_rule_evaluator.py +++ b/tests/push/test_bulk_push_rule_evaluator.py @@ -71,4 +71,4 @@ def test_action_for_event_by_user_handles_noninteger_power_levels(self) -> None: bulk_evaluator = BulkPushRuleEvaluator(self.hs) # should not raise - self.get_success(bulk_evaluator.action_for_event_by_user(event, context)) + self.get_success(bulk_evaluator.action_for_event_by_user([(event, context)])) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 71b1637be8fa..9511c14431dd 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -711,7 +711,7 @@ def test_post_room_no_keys(self) -> None: self.assertEqual(HTTPStatus.OK, channel.code, channel.result) 
self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(34, channel.resource_usage.db_txn_count) + self.assertEqual(31, channel.resource_usage.db_txn_count) def test_post_room_initial_state(self) -> None: # POST with initial_state config key, expect new room id @@ -724,7 +724,7 @@ def test_post_room_initial_state(self) -> None: self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(37, channel.resource_usage.db_txn_count) + self.assertEqual(33, channel.resource_usage.db_txn_count) def test_post_room_visibility_key(self) -> None: # POST with visibility config key, expect new room id From 52781384c9359959960aeb90893c3913ea4f8b38 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 6 Oct 2022 17:34:34 -0700 Subject: [PATCH 06/15] bump max request body size in replication test cases to accommodate larger body size due to batched events --- tests/replication/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index ce53f808db9a..121f3d8d6517 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -371,7 +371,7 @@ def make_worker_hs( config=worker_hs.config.server.listeners[0], resource=resource, server_version_string="1", - max_request_body_size=4096, + max_request_body_size=8192, reactor=self.reactor, ) From d896cb3be3de26ed4fa755a7449c15fba77c7fb6 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Tue, 18 Oct 2022 11:10:14 -0700 Subject: [PATCH 07/15] newsfragment --- changelog.d/14228.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14228.misc diff --git a/changelog.d/14228.misc b/changelog.d/14228.misc new file mode 100644 index 000000000000..14fe31a8bce2 --- /dev/null +++ b/changelog.d/14228.misc @@ -0,0 +1 @@ +Add initial power level event to batch of bulk persisted events when creating a new room. From c1d60276477e1ac1ae251b7bf2b9c4c3e94e1aad Mon Sep 17 00:00:00 2001 From: Shay Date: Wed, 19 Oct 2022 13:00:29 -0700 Subject: [PATCH 08/15] Apply suggestions from code review Co-authored-by: Patrick Cloke --- synapse/push/bulk_push_rule_evaluator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 2f4a9280019f..f925d3d00d1f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -183,7 +183,7 @@ async def _get_power_levels_and_sender_level( # fastpath: if there's a power level event, that's all we need, and # not having a power level event is an extreme edge case if pl_event_id: - # check that the power level event is not in the batch before checking the DB + # Get the power level event from the batch, or fall back to the database. pl_event = event_id_to_event.get(pl_event_id) if pl_event: auth_events = {POWER_KEY: pl_event} @@ -194,8 +194,8 @@ async def _get_power_levels_and_sender_level( event, prev_state_ids, for_verification=False ) auth_events_dict = await self.store.get_events(auth_events_ids) - # check to see that there aren't any needed auth events in the batch as it - # hasn't been persisted yet + # Some needed auth events might be in the batch, combine them with those + # fetched from the database. 
for auth_event_id in auth_events_ids: auth_event = event_id_to_event.get(auth_event_id) if auth_event: From dc0889d241f72591690678627f02162c4ae37b8e Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 20 Oct 2022 10:23:58 -0700 Subject: [PATCH 09/15] add a function action_for_events_by_user and remove for loop from action_for_event_by_user --- synapse/push/bulk_push_rule_evaluator.py | 266 ++++++++++++----------- 1 file changed, 143 insertions(+), 123 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 2f4a9280019f..5762757348ae 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -170,6 +170,16 @@ async def _get_power_levels_and_sender_level( context: EventContext, event_id_to_event: Mapping[str, EventBase], ) -> Tuple[dict, Optional[int]]: + """ + Given an event and an event context, get the power level event relevant to the event + and the power level of the sender of the event. + Args: + event: event to check + context: context of event to check + event_id_to_event: a mapping of event_id to event for a set of events being + batch persisted. This is needed as the sought-after power level event may + be in this batch rather than the DB + """ # There are no power levels and sender levels possible to get from outlier if event.internal_metadata.is_outlier(): return {}, None @@ -208,141 +218,151 @@ async def _get_power_levels_and_sender_level( return pl_event.content if pl_event else {}, sender_level - @measure_func("action_for_event_by_user") - async def action_for_event_by_user( + @measure_func("action_for_events_by_user") + async def action_for_events_by_user( self, events_and_context: List[Tuple[EventBase, EventContext]] ) -> None: """Given a list of events and their associated contexts, evaluate the push rules for each event, check if the message should increment the unread count, and insert the results into the event_push_actions_staging table. 
""" + # For batched events the power level events may not have been persisted yet, + # so we pass in the batched events. Thus if the event cannot be found in the + # database we can check in the batch. + event_id_to_event = {e.event_id: e for e, _ in events_and_context} for event, context in events_and_context: - if not event.internal_metadata.is_notifiable(): - # Push rules for events that aren't notifiable can't be processed by this - return - # Skip push notification actions for historical messages - # because we don't want to notify people about old history back in time. - # The historical messages also do not have the proper `context.current_state_ids` - # and `state_groups` because they have `prev_events` that aren't persisted yet - # (historical messages persisted in reverse-chronological order). - if event.internal_metadata.is_historical(): - return - - # Disable counting as unread unless the experimental configuration is - # enabled, as it can cause additional (unwanted) rows to be added to the - # event_push_actions table. - count_as_unread = False - if self.hs.config.experimental.msc2654_enabled: - count_as_unread = _should_count_as_unread(event, context) - - rules_by_user = await self._get_rules_for_event(event) - actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} - - room_member_count = await self.store.get_number_joined_users_in_room( - event.room_id - ) + await self._action_for_event_by_user(event, context, event_id_to_event) - # For batched events the power level events may not have been persisted yet, - # so we pass in the batched events. Thus if the event cannot be found in the - # database we can check in the batch. 
- event_id_to_event = {e.event_id: e for e, _ in events_and_context} - ( - power_levels, - sender_power_level, - ) = await self._get_power_levels_and_sender_level( - event, context, event_id_to_event - ) + async def _action_for_event_by_user( + self, + event: EventBase, + context: EventContext, + event_id_to_event: Mapping[str, EventBase], + ) -> None: - # Find the event's thread ID. - relation = relation_from_event(event) - # If the event does not have a relation, then it cannot have a thread ID. - thread_id = MAIN_TIMELINE - if relation: - # Recursively attempt to find the thread this event relates to. - if relation.rel_type == RelationTypes.THREAD: - thread_id = relation.parent_id - else: - # Since the event has not yet been persisted we check whether - # the parent is part of a thread. - thread_id = await self.store.get_thread_id(relation.parent_id) - - # It's possible that old room versions have non-integer power levels (floats or - # strings). Workaround this by explicitly converting to int. - notification_levels = power_levels.get("notifications", {}) - if not event.room_version.msc3667_int_only_power_levels: - for user_id, level in notification_levels.items(): - notification_levels[user_id] = int(level) - - evaluator = PushRuleEvaluator( - _flatten_dict(event), - room_member_count, - sender_power_level, - notification_levels, - ) + if not event.internal_metadata.is_notifiable(): + # Push rules for events that aren't notifiable can't be processed by this + return + + # Skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). 
+ if event.internal_metadata.is_historical(): + return + + # Disable counting as unread unless the experimental configuration is + # enabled, as it can cause additional (unwanted) rows to be added to the + # event_push_actions table. + count_as_unread = False + if self.hs.config.experimental.msc2654_enabled: + count_as_unread = _should_count_as_unread(event, context) + + rules_by_user = await self._get_rules_for_event(event) + actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} + + room_member_count = await self.store.get_number_joined_users_in_room( + event.room_id + ) - users = rules_by_user.keys() - profiles = await self.store.get_subset_users_in_room_with_profiles( - event.room_id, users - ) + ( + power_levels, + sender_power_level, + ) = await self._get_power_levels_and_sender_level( + event, context, event_id_to_event + ) - for uid, rules in rules_by_user.items(): - if event.sender == uid: - continue - - display_name = None - profile = profiles.get(uid) - if profile: - display_name = profile.display_name - - if not display_name: - # Handle the case where we are pushing a membership event to - # that user, as they might not be already joined. - if event.type == EventTypes.Member and event.state_key == uid: - display_name = event.content.get("displayname", None) - if not isinstance(display_name, str): - display_name = None - - if count_as_unread: - # Add an element for the current user if the event needs to be marked as - # unread, so that add_push_actions_to_staging iterates over it. - # If the event shouldn't be marked as unread but should notify the - # current user, it'll be added to the dict later. - actions_by_user[uid] = [] - - actions = evaluator.run(rules, uid, display_name) - if "notify" in actions: - # Push rules say we should notify the user of this event - actions_by_user[uid] = actions - - # If there aren't any actions then we can skip the rest of the - # processing. 
- if not actions_by_user: - return - - # This is a check for the case where user joins a room without being - # allowed to see history, and then the server receives a delayed event - # from before the user joined, which they should not be pushed for - # - # We do this *after* calculating the push actions as a) its unlikely - # that we'll filter anyone out and b) for large rooms its likely that - # most users will have push disabled and so the set of users to check is - # much smaller. - uids_with_visibility = await filter_event_for_clients_with_state( - self.store, actions_by_user.keys(), event, context - ) + # Find the event's thread ID. + relation = relation_from_event(event) + # If the event does not have a relation, then it cannot have a thread ID. + thread_id = MAIN_TIMELINE + if relation: + # Recursively attempt to find the thread this event relates to. + if relation.rel_type == RelationTypes.THREAD: + thread_id = relation.parent_id + else: + # Since the event has not yet been persisted we check whether + # the parent is part of a thread. + thread_id = await self.store.get_thread_id(relation.parent_id) + + # It's possible that old room versions have non-integer power levels (floats or + # strings). Workaround this by explicitly converting to int. + notification_levels = power_levels.get("notifications", {}) + if not event.room_version.msc3667_int_only_power_levels: + for user_id, level in notification_levels.items(): + notification_levels[user_id] = int(level) + + evaluator = PushRuleEvaluator( + _flatten_dict(event), + room_member_count, + sender_power_level, + notification_levels, + ) - for user_id in set(actions_by_user).difference(uids_with_visibility): - actions_by_user.pop(user_id, None) - - # Mark in the DB staging area the push actions for users who should be - # notified for this event. 
(This will then get handled when we persist - # the event) - await self.store.add_push_actions_to_staging( - event.event_id, - actions_by_user, - count_as_unread, - thread_id, - ) + users = rules_by_user.keys() + profiles = await self.store.get_subset_users_in_room_with_profiles( + event.room_id, users + ) + + for uid, rules in rules_by_user.items(): + if event.sender == uid: + continue + + display_name = None + profile = profiles.get(uid) + if profile: + display_name = profile.display_name + + if not display_name: + # Handle the case where we are pushing a membership event to + # that user, as they might not be already joined. + if event.type == EventTypes.Member and event.state_key == uid: + display_name = event.content.get("displayname", None) + if not isinstance(display_name, str): + display_name = None + + if count_as_unread: + # Add an element for the current user if the event needs to be marked as + # unread, so that add_push_actions_to_staging iterates over it. + # If the event shouldn't be marked as unread but should notify the + # current user, it'll be added to the dict later. + actions_by_user[uid] = [] + + actions = evaluator.run(rules, uid, display_name) + if "notify" in actions: + # Push rules say we should notify the user of this event + actions_by_user[uid] = actions + + # If there aren't any actions then we can skip the rest of the + # processing. + if not actions_by_user: + return + + # This is a check for the case where user joins a room without being + # allowed to see history, and then the server receives a delayed event + # from before the user joined, which they should not be pushed for + # + # We do this *after* calculating the push actions as a) its unlikely + # that we'll filter anyone out and b) for large rooms its likely that + # most users will have push disabled and so the set of users to check is + # much smaller. 
+ uids_with_visibility = await filter_event_for_clients_with_state( + self.store, actions_by_user.keys(), event, context + ) + + for user_id in set(actions_by_user).difference(uids_with_visibility): + actions_by_user.pop(user_id, None) + + # Mark in the DB staging area the push actions for users who should be + # notified for this event. (This will then get handled when we persist + # the event) + await self.store.add_push_actions_to_staging( + event.event_id, + actions_by_user, + count_as_unread, + thread_id, + ) MemberMap = Dict[str, Optional[EventIdMembership]] From aaa95027ce4535a48e627ebfd651a1629e474665 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 20 Oct 2022 10:24:20 -0700 Subject: [PATCH 10/15] update callsites --- synapse/handlers/federation.py | 2 +- synapse/handlers/federation_event.py | 2 +- synapse/handlers/message.py | 2 +- tests/push/test_bulk_push_rule_evaluator.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4b4de5098dc0..4fbc79a6cb24 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1017,7 +1017,7 @@ async def on_invite_request( context = EventContext.for_outlier(self._storage_controllers) - await self._bulk_push_rule_evaluator.action_for_event_by_user( + await self._bulk_push_rule_evaluator.action_for_events_by_user( [(event, context)] ) try: diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index dcb81525a16f..7da6316a8237 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -2171,7 +2171,7 @@ async def _run_push_actions_and_persist_event( min_depth, ) else: - await self._bulk_push_rule_evaluator.action_for_event_by_user( + await self._bulk_push_rule_evaluator.action_for_events_by_user( [(event, context)] ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c4dae13240b5..b54ecf30aebe 100644 --- 
a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1433,7 +1433,7 @@ async def _persist_events( a room that has been un-partial stated. """ with opentracing.start_active_span("calculate_push_actions"): - await self._bulk_push_rule_evaluator.action_for_event_by_user( + await self._bulk_push_rule_evaluator.action_for_events_by_user( events_and_context ) diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py index 4d8bee3fda8e..594e7937a8ac 100644 --- a/tests/push/test_bulk_push_rule_evaluator.py +++ b/tests/push/test_bulk_push_rule_evaluator.py @@ -71,4 +71,4 @@ def test_action_for_event_by_user_handles_noninteger_power_levels(self) -> None: bulk_evaluator = BulkPushRuleEvaluator(self.hs) # should not raise - self.get_success(bulk_evaluator.action_for_event_by_user([(event, context)])) + self.get_success(bulk_evaluator.action_for_events_by_user([(event, context)])) From 67176eb369dd6e5ffc9ecbcdb4bde43ca4be8a0b Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Thu, 20 Oct 2022 10:24:34 -0700 Subject: [PATCH 11/15] fix tests to reflect new reality --- tests/rest/client/test_rooms.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 9511c14431dd..71b1637be8fa 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -711,7 +711,7 @@ def test_post_room_no_keys(self) -> None: self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(31, channel.resource_usage.db_txn_count) + self.assertEqual(34, channel.resource_usage.db_txn_count) def test_post_room_initial_state(self) -> None: # POST with initial_state config key, expect new room id @@ -724,7 +724,7 @@ def test_post_room_initial_state(self) -> None: self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(33, channel.resource_usage.db_txn_count) + self.assertEqual(37, channel.resource_usage.db_txn_count) def test_post_room_visibility_key(self) -> None: # POST with visibility config key, expect new room id From ae4911f8c5de4ebc9a82a0ae436c75107226a4e4 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Thu, 20 Oct 2022 11:20:58 -0700 Subject: [PATCH 12/15] consolidate guard clause --- synapse/push/bulk_push_rule_evaluator.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f8a8c82ae4a2..fa9c2420fa02 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -240,16 +240,16 @@ async def _action_for_event_by_user( event_id_to_event: Mapping[str, EventBase], ) -> None: - if not event.internal_metadata.is_notifiable(): - # Push rules for events that aren't notifiable can't be processed by this - return - - # Skip push notification actions for historical messages - # because we don't want to notify people about old history back in time. - # The historical messages also do not have the proper `context.current_state_ids` - # and `state_groups` because they have `prev_events` that aren't persisted yet - # (historical messages persisted in reverse-chronological order). - if event.internal_metadata.is_historical(): + if ( + not event.internal_metadata.is_notifiable() + or event.internal_metadata.is_historical() + ): + # Push rules for events that aren't notifiable can't be processed by this and + # we want to skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). return # Disable counting as unread unless the experimental configuration is From b28d5d56dcc74e21fd4c579e0828d83d7e4c02d1 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Thu, 20 Oct 2022 11:42:19 -0700 Subject: [PATCH 13/15] tracing --- synapse/handlers/message.py | 8 ++++---- synapse/push/bulk_push_rule_evaluator.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b54ecf30aebe..468900a07f22 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1432,10 +1432,10 @@ async def _persist_events( PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ - with opentracing.start_active_span("calculate_push_actions"): - await self._bulk_push_rule_evaluator.action_for_events_by_user( - events_and_context - ) + + await self._bulk_push_rule_evaluator.action_for_events_by_user( + events_and_context + ) try: # If we're a worker we need to hit out to the master. diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index fa9c2420fa02..d7795a9080a2 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -218,7 +218,6 @@ async def _get_power_levels_and_sender_level( return pl_event.content if pl_event else {}, sender_level - @measure_func("action_for_events_by_user") async def action_for_events_by_user( self, events_and_context: List[Tuple[EventBase, EventContext]] ) -> None: @@ -233,6 +232,7 @@ async def action_for_events_by_user( for event, context in events_and_context: await self._action_for_event_by_user(event, context, event_id_to_event) + @measure_func("action_for_event_by_user") async def _action_for_event_by_user( self, event: EventBase, From 0b36b9e9f0cb123aac54fc6b863827d2466382a1 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Thu, 20 Oct 2022 13:04:58 -0700 Subject: [PATCH 14/15] inline function functionality for simplicity --- synapse/handlers/room.py | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a61685dfa97f..a3b87a5d08d7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1102,26 +1102,6 @@ async def create_event( return new_event, new_context - async def send( - event: EventBase, - context: synapse.events.snapshot.EventContext, - creator: Requester, - ) -> int: - nonlocal last_sent_event_id - - ev = await self.event_creation_handler.handle_new_client_event( - requester=creator, - events_and_context=[(event, context)], - ratelimit=False, - ignore_shadow_ban=True, - ) - - last_sent_event_id = ev.event_id - - # we know it was persisted, so must have a stream ordering - assert ev.internal_metadata.stream_ordering - return ev.internal_metadata.stream_ordering - try: config = self._presets_dict[preset_config] except KeyError: @@ -1135,7 +1115,13 @@ async def send( ) logger.debug("Sending %s in new room", EventTypes.Member) - await send(creation_event, creation_context, creator) + ev = await self.event_creation_handler.handle_new_client_event( + requester=creator, + events_and_context=[(creation_event, creation_context)], + ratelimit=False, + ignore_shadow_ban=True, + ) + last_sent_event_id = ev.event_id # Room create event must exist at this point assert last_sent_event_id is not None From 74fe030bfa46fdd062d7a4bd530b4012c526c7dc Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Fri, 21 Oct 2022 09:37:10 -0700 Subject: [PATCH 15/15] fix last_sent_event_id --- synapse/handlers/room.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a3b87a5d08d7..cc1e5c8f9704 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1055,9 +1055,6 @@ async def _send_events_for_new_room( event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""} depth = 1 - # the last event sent/persisted to the db - last_sent_event_id: Optional[str] = None - # the most recently created event prev_event: List[str] = [] # a map of event types, state keys -> event_ids. We collect these mappings this as events are @@ -1123,8 +1120,6 @@ async def create_event( ) last_sent_event_id = ev.event_id - # Room create event must exist at this point - assert last_sent_event_id is not None member_event_id, _ = await self.room_member_handler.update_membership( creator, creator.user,