Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add initial power level event to batch of bulk persisted events when creating a new room. #14228

Merged
merged 16 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14228.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add initial power level event to batch of bulk persisted events when creating a new room.
4 changes: 3 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,9 @@ async def on_invite_request(

context = EventContext.for_outlier(self._storage_controllers)

await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context)
await self._bulk_push_rule_evaluator.action_for_event_by_user(
[(event, context)]
)
try:
await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2172,7 +2172,7 @@ async def _run_push_actions_and_persist_event(
)
else:
await self._bulk_push_rule_evaluator.action_for_event_by_user(
event, context
[(event, context)]
)

try:
Expand Down
16 changes: 4 additions & 12 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,18 +1432,10 @@ async def _persist_events(
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""

for event, context in events_and_context:
# Skip push notification actions for historical messages
# because we don't want to notify people about old history back in time.
# The historical messages also do not have the proper `context.current_state_ids`
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if not event.internal_metadata.is_historical():
with opentracing.start_active_span("calculate_push_actions"):
await self._bulk_push_rule_evaluator.action_for_event_by_user(
event, context
)
with opentracing.start_active_span("calculate_push_actions"):
clokep marked this conversation as resolved.
Show resolved Hide resolved
await self._bulk_push_rule_evaluator.action_for_event_by_user(
events_and_context
)

try:
# If we're a worker we need to hit out to the master.
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,7 @@ async def send(
depth += 1
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id

events_to_send = []
# We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
Expand All @@ -1165,7 +1166,7 @@ async def send(
EventTypes.PowerLevels, pl_content, False
)
current_state_group = power_context._state_group
await send(power_event, power_context, creator)
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
events_to_send.append((power_event, power_context))
else:
power_level_content: JsonDict = {
"users": {creator_id: 100},
Expand Down Expand Up @@ -1214,9 +1215,8 @@ async def send(
False,
)
current_state_group = pl_context._state_group
await send(pl_event, pl_context, creator)
events_to_send.append((pl_event, pl_context))

events_to_send = []
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
room_alias_event, room_alias_context = await create_event(
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
Expand Down
262 changes: 145 additions & 117 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ async def _get_rules_for_event(
return rules_by_user

async def _get_power_levels_and_sender_level(
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
self, event: EventBase, context: EventContext
self,
event: EventBase,
context: EventContext,
event_id_to_event: Mapping[str, EventBase],
) -> Tuple[dict, Optional[int]]:
# There are no power levels and sender levels possible to get from outlier
if event.internal_metadata.is_outlier():
Expand All @@ -177,15 +180,26 @@ async def _get_power_levels_and_sender_level(
)
pl_event_id = prev_state_ids.get(POWER_KEY)

# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)}
# check that the power level event is not in the batch before checking the DB
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
pl_event = event_id_to_event.get(pl_event_id)
if pl_event:
auth_events = {POWER_KEY: pl_event}
else:
auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)}
else:
auth_events_ids = self._event_auth_handler.compute_auth_events(
event, prev_state_ids, for_verification=False
)
auth_events_dict = await self.store.get_events(auth_events_ids)
# check to see that there aren't any needed auth events in the batch as it
# hasn't been persisted yet
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
for auth_event_id in auth_events_ids:
auth_event = event_id_to_event.get(auth_event_id)
if auth_event:
auth_events_dict[auth_event_id] = auth_event
auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()}

sender_level = get_user_power_level(event.sender, auth_events)
Expand All @@ -196,125 +210,139 @@ async def _get_power_levels_and_sender_level(

@measure_func("action_for_event_by_user")
async def action_for_event_by_user(
self, event: EventBase, context: EventContext
self, events_and_context: List[Tuple[EventBase, EventContext]]
) -> None:
"""Given an event and context, evaluate the push rules, check if the message
should increment the unread count, and insert the results into the
event_push_actions_staging table.
"""Given a list of events and their associated contexts, evaluate the push rules
for each event, check if the message should increment the unread count, and
insert the results into the event_push_actions_staging table.
"""
if not event.internal_metadata.is_notifiable():
# Push rules for events that aren't notifiable can't be processed by this
return

# Disable counting as unread unless the experimental configuration is
# enabled, as it can cause additional (unwanted) rows to be added to the
# event_push_actions table.
count_as_unread = False
if self.hs.config.experimental.msc2654_enabled:
count_as_unread = _should_count_as_unread(event, context)

rules_by_user = await self._get_rules_for_event(event)
actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}

room_member_count = await self.store.get_number_joined_users_in_room(
event.room_id
)
for event, context in events_and_context:
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
if not event.internal_metadata.is_notifiable():
# Push rules for events that aren't notifiable can't be processed by this
return
# Skip push notification actions for historical messages
# because we don't want to notify people about old history back in time.
# The historical messages also do not have the proper `context.current_state_ids`
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if event.internal_metadata.is_historical():
return
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

# Disable counting as unread unless the experimental configuration is
# enabled, as it can cause additional (unwanted) rows to be added to the
# event_push_actions table.
count_as_unread = False
if self.hs.config.experimental.msc2654_enabled:
count_as_unread = _should_count_as_unread(event, context)

rules_by_user = await self._get_rules_for_event(event)
actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}

room_member_count = await self.store.get_number_joined_users_in_room(
event.room_id
)

(
power_levels,
sender_power_level,
) = await self._get_power_levels_and_sender_level(event, context)

# Find the event's thread ID.
relation = relation_from_event(event)
# If the event does not have a relation, then it cannot have a thread ID.
thread_id = MAIN_TIMELINE
if relation:
# Recursively attempt to find the thread this event relates to.
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id
else:
# Since the event has not yet been persisted we check whether
# the parent is part of a thread.
thread_id = await self.store.get_thread_id(relation.parent_id)

# It's possible that old room versions have non-integer power levels (floats or
# strings). Workaround this by explicitly converting to int.
notification_levels = power_levels.get("notifications", {})
if not event.room_version.msc3667_int_only_power_levels:
for user_id, level in notification_levels.items():
notification_levels[user_id] = int(level)

evaluator = PushRuleEvaluator(
_flatten_dict(event),
room_member_count,
sender_power_level,
notification_levels,
)
# For batched events the power level events may not have been persisted yet,
# so we pass in the batched events. Thus if the event cannot be found in the
# database we can check in the batch.
event_id_to_event = {e.event_id: e for e, _ in events_and_context}
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
(
power_levels,
sender_power_level,
) = await self._get_power_levels_and_sender_level(
event, context, event_id_to_event
)

users = rules_by_user.keys()
profiles = await self.store.get_subset_users_in_room_with_profiles(
event.room_id, users
)
# Find the event's thread ID.
relation = relation_from_event(event)
# If the event does not have a relation, then it cannot have a thread ID.
thread_id = MAIN_TIMELINE
if relation:
# Recursively attempt to find the thread this event relates to.
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id
else:
# Since the event has not yet been persisted we check whether
# the parent is part of a thread.
thread_id = await self.store.get_thread_id(relation.parent_id)

# It's possible that old room versions have non-integer power levels (floats or
# strings). Workaround this by explicitly converting to int.
notification_levels = power_levels.get("notifications", {})
if not event.room_version.msc3667_int_only_power_levels:
for user_id, level in notification_levels.items():
notification_levels[user_id] = int(level)

evaluator = PushRuleEvaluator(
_flatten_dict(event),
room_member_count,
sender_power_level,
notification_levels,
)

for uid, rules in rules_by_user.items():
if event.sender == uid:
continue

display_name = None
profile = profiles.get(uid)
if profile:
display_name = profile.display_name

if not display_name:
# Handle the case where we are pushing a membership event to
# that user, as they might not be already joined.
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
if not isinstance(display_name, str):
display_name = None

if count_as_unread:
# Add an element for the current user if the event needs to be marked as
# unread, so that add_push_actions_to_staging iterates over it.
# If the event shouldn't be marked as unread but should notify the
# current user, it'll be added to the dict later.
actions_by_user[uid] = []

actions = evaluator.run(rules, uid, display_name)
if "notify" in actions:
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions

# If there aren't any actions then we can skip the rest of the
# processing.
if not actions_by_user:
return

# This is a check for the case where user joins a room without being
# allowed to see history, and then the server receives a delayed event
# from before the user joined, which they should not be pushed for
#
# We do this *after* calculating the push actions as a) its unlikely
# that we'll filter anyone out and b) for large rooms its likely that
# most users will have push disabled and so the set of users to check is
# much smaller.
uids_with_visibility = await filter_event_for_clients_with_state(
self.store, actions_by_user.keys(), event, context
)
users = rules_by_user.keys()
profiles = await self.store.get_subset_users_in_room_with_profiles(
event.room_id, users
)

for user_id in set(actions_by_user).difference(uids_with_visibility):
actions_by_user.pop(user_id, None)

# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)
for uid, rules in rules_by_user.items():
if event.sender == uid:
continue

display_name = None
profile = profiles.get(uid)
if profile:
display_name = profile.display_name

if not display_name:
# Handle the case where we are pushing a membership event to
# that user, as they might not be already joined.
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
if not isinstance(display_name, str):
display_name = None

if count_as_unread:
# Add an element for the current user if the event needs to be marked as
# unread, so that add_push_actions_to_staging iterates over it.
# If the event shouldn't be marked as unread but should notify the
# current user, it'll be added to the dict later.
actions_by_user[uid] = []

actions = evaluator.run(rules, uid, display_name)
if "notify" in actions:
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions

# If there aren't any actions then we can skip the rest of the
# processing.
if not actions_by_user:
return

# This is a check for the case where user joins a room without being
# allowed to see history, and then the server receives a delayed event
# from before the user joined, which they should not be pushed for
#
# We do this *after* calculating the push actions as a) its unlikely
# that we'll filter anyone out and b) for large rooms its likely that
# most users will have push disabled and so the set of users to check is
# much smaller.
uids_with_visibility = await filter_event_for_clients_with_state(
self.store, actions_by_user.keys(), event, context
)

for user_id in set(actions_by_user).difference(uids_with_visibility):
actions_by_user.pop(user_id, None)

# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)


MemberMap = Dict[str, Optional[EventIdMembership]]
Expand Down
2 changes: 1 addition & 1 deletion tests/push/test_bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ def test_action_for_event_by_user_handles_noninteger_power_levels(self) -> None:

bulk_evaluator = BulkPushRuleEvaluator(self.hs)
# should not raise
self.get_success(bulk_evaluator.action_for_event_by_user(event, context))
self.get_success(bulk_evaluator.action_for_event_by_user([(event, context)]))
2 changes: 1 addition & 1 deletion tests/replication/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def make_worker_hs(
config=worker_hs.config.server.listeners[0],
resource=resource,
server_version_string="1",
max_request_body_size=4096,
max_request_body_size=8192,
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
reactor=self.reactor,
)

Expand Down
Loading