Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor backfilled into specific behavior function arguments pt.1 #11396

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11396.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `backfilled` into specific behavior function arguments (`persist_events_and_notify` and downstream calls).
63 changes: 56 additions & 7 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,24 @@ async def _run_push_actions_and_persist_event(

try:
await self.persist_events_and_notify(
event.room_id, [(event, context)], backfilled=backfilled
event.room_id,
[(event, context)],
# We should not send notifications about backfilled events.
inhibit_push_notifications=backfilled,
# We don't need to calculate the state for backfilled events and
# there is no need to update the forward extrems because we
# already know this event happened in the past if it was
# backfilled.
should_calculate_state_and_forward_extrems=not backfilled,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the should_ prefix seems a bit redundant here.

# Backfilled events get a negative stream ordering so they don't
# come down incremental `/sync`
use_negative_stream_ordering=backfilled,
# Backfilled events do not affect the current local state
inhibit_local_membership_updates=backfilled,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be sensible to have this implied by should_calculate_state_and_forward_extrems ?

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the mental leap where updating the state in the room also encompasses member state events.

My first impression was that it wasn't very obvious but it makes sense ⏩

# Backfilled events have negative stream_ordering and happened
# in the past so we know that we don't need to update the
# stream_ordering tip for the room.
update_room_forward_stream_ordering=not backfilled,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a future PR, I'll extend the behavior arguments further upstream in place of backfilled.

This PR does not replace all backfilled instances. Just persist_events_and_notify and all downstream calls

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't update_room_forward_stream_ordering be implied by use_negative_stream_ordering ?

)
except Exception:
run_in_background(
Expand All @@ -1821,7 +1838,12 @@ async def persist_events_and_notify(
self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
*,
inhibit_push_notifications: bool = False,
should_calculate_state_and_forward_extrems: bool = True,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
update_room_forward_stream_ordering: bool = True,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richvdh How does this look? Is this on the right track?

) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.
Expand All @@ -1831,8 +1853,31 @@ async def persist_events_and_notify(
event_and_contexts: Sequence of events with their associated
context that should be persisted. All events must belong to
the same room.
backfilled: Whether these events are a result of
backfilling or not
inhibit_push_notifications: Whether to stop the notifiers/pushers
from knowing about the event. This should be set as True
for backfilled events because there is no need to send push
notifications for events in the past.
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events because
we don't need to calculate the state for backfilled events and
there is no need to update the forward extrems because we
already know this event happened in the past if it was
backfilled.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.

Returns:
The stream ID after which all events have been persisted.
Expand All @@ -1850,7 +1895,7 @@ async def persist_events_and_notify(
store=self._store,
room_id=room_id,
event_and_contexts=batch,
backfilled=backfilled,
inhibit_push_notifications=inhibit_push_notifications,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this need to pass through all the keyword args?

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch 👍 I think so. Should match _serialize_payload.

It's a loose connection and I was having trouble typing it correctly, #11396 (comment), to have the linter yell at me for this.

)
return result["max_stream_id"]
else:
Expand All @@ -1859,15 +1904,19 @@ async def persist_events_and_notify(
# Note that this returns the events that were persisted, which may not be
# the same as were passed in if some were deduplicated due to transaction IDs.
events, max_stream_token = await self._storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
event_and_contexts,
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
use_negative_stream_ordering=use_negative_stream_ordering,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
)

if self._ephemeral_messages_enabled:
for event in events:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

if not backfilled: # Never notify for backfilled events
if not inhibit_push_notifications: # Never notify for backfilled events
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavioral argument replacement: inhibit_push_notifications

for event in events:
await self._notify_persisted_event(event, max_stream_token)

Expand Down
18 changes: 11 additions & 7 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1565,20 +1565,24 @@ async def persist_and_notify_client_event(
errcode=Codes.INVALID_PARAM,
)

# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True

# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due transaction IDs.
(
event,
event_pos,
max_stream_token,
) = await self.storage.persistence.persist_event(
event, context=context, backfilled=backfilled
event,
context=context,
# Make any historical messages behave like backfilled events
should_calculate_state_and_forward_extrems=not event.internal_metadata.is_historical(),
# We use a negative `stream_ordering`` for historical messages so
# they don't come down an incremental `/sync` and have the proper
# decrementing `stream_ordering` as we import so they sort
# as expected between two depths.
use_negative_stream_ordering=event.internal_metadata.is_historical(),
inhibit_local_membership_updates=event.internal_metadata.is_historical(),
update_room_forward_stream_ordering=not event.internal_metadata.is_historical(),
)

if self._ephemeral_events_enabled:
Expand Down
72 changes: 65 additions & 7 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
"backfilled": false
"inhibit_push_notifications": false
}

200 OK
Expand All @@ -69,14 +69,46 @@ def __init__(self, hs: "HomeServer"):
self.federation_event_handler = hs.get_federation_event_handler()

@staticmethod
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
async def _serialize_payload(
store,
room_id,
event_and_contexts,
inhibit_push_notifications,
should_calculate_state_and_forward_extrems,
use_negative_stream_ordering,
inhibit_local_membership_updates,
update_room_forward_stream_ordering,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we can define the arguments here like the following? I can't get it to work even without the keyword-only marker.

async def _serialize_payload(
    store,
    room_id,
    event_and_contexts,
    *,
    inhibit_push_notifications: bool = False,
    should_calculate_state_and_forward_extrems: bool = True,
    use_negative_stream_ordering: bool = False,
    inhibit_local_membership_updates: bool = False,
    update_room_forward_stream_ordering: bool = True,
):

When I do this, I get the following error:

synapse/replication/http/federation.py:72: error: Signature of "_serialize_payload" incompatible with supertype "ReplicationEndpoint"  [override]

And I can't figure out how to adjust the abstract method to be more flexible,

@abc.abstractmethod
async def _serialize_payload(**kwargs):

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure why, but it's the :bool type annotations that are upsetting it rather than the * or the default values. If I remove the :bools, it's fine.

):
"""
Args:
store
room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
inhibit_push_notifications: Whether to stop the notifiers/pushers
from knowing about the event. This should be set as True
for backfilled events because there is no need to send push
notifications for events in the past.
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events because
we don't need to calculate the state for backfilled events and
there is no need to update the forward extrems because we
already know this event happened in the past if it was
backfilled.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
"""
event_payloads = []
for event, context in event_and_contexts:
Expand All @@ -96,7 +128,11 @@ async def _serialize_payload(store, room_id, event_and_contexts, backfilled):

payload = {
"events": event_payloads,
"backfilled": backfilled,
"inhibit_push_notifications": inhibit_push_notifications,
"should_calculate_state_and_forward_extrems": should_calculate_state_and_forward_extrems,
"use_negative_stream_ordering": use_negative_stream_ordering,
"inhibit_local_membership_updates": inhibit_local_membership_updates,
"update_room_forward_stream_ordering": update_room_forward_stream_ordering,
"room_id": room_id,
}

Expand All @@ -107,7 +143,23 @@ async def _handle_request(self, request):
content = parse_json_object_from_request(request)

room_id = content["room_id"]
backfilled = content["backfilled"]
inhibit_push_notifications = content["inhibit_push_notifications"]
should_calculate_state_and_forward_extrems = content[
"should_calculate_state_and_forward_extrems"
]
use_negative_stream_ordering = content["use_negative_stream_ordering"]
inhibit_local_membership_updates = content[
"inhibit_local_membership_updates"
]
update_room_forward_stream_ordering = content[
"update_room_forward_stream_ordering"
]

assert inhibit_push_notifications is not None
assert should_calculate_state_and_forward_extrems is not None
assert use_negative_stream_ordering is not None
assert inhibit_local_membership_updates is not None
assert update_room_forward_stream_ordering is not None

event_payloads = content["events"]

Expand All @@ -132,7 +184,13 @@ async def _handle_request(self, request):
logger.info("Got %d events from federation", len(event_and_contexts))

max_stream_id = await self.federation_event_handler.persist_events_and_notify(
room_id, event_and_contexts, backfilled
room_id,
event_and_contexts,
inhibit_push_notifications=inhibit_push_notifications,
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
use_negative_stream_ordering=use_negative_stream_ordering,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
)

return 200, {"max_stream_id": max_stream_id}
Expand Down
Loading