Avoid checking the event cache when backfilling events #14164
Conversation
Seems reasonable; agreed that getting them in one call to the DB should be more efficient.
synapse/handlers/federation_event.py
```python
if event_id in existing_events_map:
    existing_event = existing_events_map[event_id]

    # ...and the event itself was not previously stored as an outlier...
    if not existing_event.event.internal_metadata.is_outlier():
        # ...then there's no need to persist it. We have it already.
        logger.info(
            "_process_pulled_event: Ignoring received event %s which we "
            "have already seen",
            event.event_id,
        )

    # While we have seen this event before, it was stored as an outlier.
    # We'll now persist it as a non-outlier.
    logger.info("De-outliering event %s", event_id)
```
This control flow is only doing logging! I think you wanted a `continue` in the inner `if` block, right? :-)
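i.e. something like the following minimal sketch, assuming the surrounding for-loop over the pulled events (`events_to_persist` is a placeholder name; the rest mirrors the snippet above):

```python
for event in events_to_persist:  # placeholder for the surrounding loop over pulled events
    event_id = event.event_id

    if event_id in existing_events_map:
        existing_event = existing_events_map[event_id]

        # ...and the event itself was not previously stored as an outlier...
        if not existing_event.event.internal_metadata.is_outlier():
            # ...then there's no need to persist it. We have it already.
            logger.info(
                "_process_pulled_event: Ignoring received event %s which we "
                "have already seen",
                event.event_id,
            )
            # The missing piece: move on to the next pulled event instead of
            # falling through to the de-outliering path below.
            continue

        # While we have seen this event before, it was stored as an outlier.
        # We'll now persist it as a non-outlier.
        logger.info("De-outliering event %s", event_id)
```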
(Maybe this indicates that a test is needed...)
Serves me right for not manually testing again before putting this up!
I'll write up a unit test for this function, but otherwise I find this behaviour a little hard to test...
A Complement test seems like it would fit #14161 (comment) well.
If you're looking for a test that works with `_process_pulled_events`, I wrote one in #13864 (still a bit messy in scratch state):
synapse/tests/handlers/test_federation_event.py
Lines 872 to 1320 in 3f8fef2
```python
def test_process_pulled_events_asdf(self) -> None:
    main_store = self.hs.get_datastores().main
    state_storage_controller = self.hs.get_storage_controllers().state

    def _debug_event_string(event: EventBase) -> str:
        debug_body = event.content.get("body", event.type)
        maybe_state_key = getattr(event, "state_key", None)
        return f"event_id={event.event_id},depth={event.depth},body={debug_body}({maybe_state_key}),prevs={event.prev_event_ids()}"

    known_event_dict: Dict[str, Tuple[EventBase, List[EventBase]]] = {}

    def _add_to_known_event_list(
        event: EventBase, state_events: Optional[List[EventBase]] = None
    ) -> None:
        if state_events is None:
            state_map = self.get_success(
                state_storage_controller.get_state_for_event(event.event_id)
            )
            state_events = list(state_map.values())
        known_event_dict[event.event_id] = (event, state_events)

    async def get_room_state_ids(
        destination: str, room_id: str, event_id: str
    ) -> JsonDict:
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_room_state_ids: Event ({event_id}) not part of our known events list"
            )
        known_event, known_event_state_list = known_event_info
        logger.info(
            "stubbed get_room_state_ids: destination=%s event_id=%s auth_event_ids=%s",
            destination,
            event_id,
            known_event.auth_event_ids(),
        )
        # self.assertEqual(event_id, missing_event.event_id)
        return {
            "pdu_ids": [
                state_event.event_id for state_event in known_event_state_list
            ],
            "auth_chain_ids": known_event.auth_event_ids(),
        }

    async def get_room_state(
        room_version: RoomVersion, destination: str, room_id: str, event_id: str
    ) -> StateRequestResponse:
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_room_state: Event ({event_id}) not part of our known events list"
            )
        known_event, known_event_state_list = known_event_info
        logger.info(
            "stubbed get_room_state: destination=%s event_id=%s auth_event_ids=%s",
            destination,
            event_id,
            known_event.auth_event_ids(),
        )
        auth_event_ids = known_event.auth_event_ids()
        auth_events = []
        for auth_event_id in auth_event_ids:
            known_event_info = known_event_dict.get(event_id)
            if known_event_info is None:
                self.fail(
                    f"stubbed get_room_state: Auth event ({auth_event_id}) is not part of our known events list"
                )
            known_auth_event, _ = known_event_info
            auth_events.append(known_auth_event)
        return StateRequestResponse(
            state=known_event_state_list,
            auth_events=auth_events,
        )

    async def get_event(destination: str, event_id: str, timeout=None):
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_event: Event ({event_id}) not part of our known events list"
            )
        known_event, _ = known_event_info
        return {"pdus": [known_event.get_pdu_json()]}

    self.mock_federation_transport_client.get_room_state_ids.side_effect = (
        get_room_state_ids
    )
    self.mock_federation_transport_client.get_room_state.side_effect = (
        get_room_state
    )
    self.mock_federation_transport_client.get_event.side_effect = get_event

    # create the room
    room_creator = self.appservice.sender
    room_id = self.helper.create_room_as(
        room_creator=self.appservice.sender, tok=self.appservice.token
    )
    room_version = self.get_success(main_store.get_room_version(room_id))

    event_before = self.get_success(
        inject_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.Message,
            content={"body": "eventBefore0", "msgtype": "m.text"},
        )
    )
    _add_to_known_event_list(event_before)

    event_after = self.get_success(
        inject_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.Message,
            content={"body": "eventAfter0", "msgtype": "m.text"},
        )
    )
    _add_to_known_event_list(event_after)

    state_map = self.get_success(
        state_storage_controller.get_state_for_event(event_before.event_id)
    )

    room_create_event = state_map.get((EventTypes.Create, ""))
    pl_event = state_map.get((EventTypes.PowerLevels, ""))
    as_membership_event = state_map.get((EventTypes.Member, room_creator))
    assert room_create_event is not None
    assert pl_event is not None
    assert as_membership_event is not None

    for state_event in state_map.values():
        _add_to_known_event_list(state_event)

    # This should be the successor of the event we want to insert next to
    # (the successor of event_before is event_after).
    inherited_depth = event_after.depth

    historical_base_auth_event_ids = [
        room_create_event.event_id,
        pl_event.event_id,
    ]
    historical_state_events = list(state_map.values())
    historical_state_event_ids = [
        state_event.event_id for state_event in historical_state_events
    ]

    maria_mxid = "@maria:test"
    maria_membership_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=maria_mxid,
            state_key=maria_mxid,
            type=EventTypes.Member,
            content={
                "membership": "join",
            },
            # It all works when I add a prev_event for the floating
            # insertion event but the event no longer floats.
            # It's able to resolve state at the prev_events though.
            prev_event_ids=[event_before.event_id],
            # allow_no_prev_events=True,
            # prev_event_ids=[],
            # auth_event_ids=historical_base_auth_event_ids,
            #
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events. For member events, we do it this way.
            state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(maria_membership_event, historical_state_events)

    historical_state_events.append(maria_membership_event)
    historical_state_event_ids.append(maria_membership_event.event_id)

    batch_id = random_string(8)
    next_batch_id = random_string(8)
    insertion_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_INSERTION,
            content={
                EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            # The difference from the actual room /batch_send is that this is normally
            # floating as well. But seems to work once we connect it to the
            # floating historical state chain.
            prev_event_ids=[maria_membership_event.event_id],
            # allow_no_prev_events=True,
            # prev_event_ids=[],
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            # state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(insertion_event, historical_state_events)

    historical_message_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=maria_mxid,
            type=EventTypes.Message,
            content={"body": "Historical message", "msgtype": "m.text"},
            prev_event_ids=[insertion_event.event_id],
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events
            auth_event_ids=[
                *historical_base_auth_event_ids,
                maria_membership_event.event_id,
            ],
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(historical_message_event, historical_state_events)

    batch_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_BATCH,
            content={
                EventContentFields.MSC2716_BATCH_ID: batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            prev_event_ids=[historical_message_event.event_id],
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(batch_event, historical_state_events)

    base_insertion_event, base_insertion_event_context = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_INSERTION,
            content={
                EventContentFields.MSC2716_NEXT_BATCH_ID: batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            prev_event_ids=[event_before.event_id],
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            # state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(base_insertion_event, historical_state_events)

    # Chronological
    pulled_events: List[EventBase] = [
        # Beginning of room (oldest messages)
        # *list(state_map.values()),
        room_create_event,
        pl_event,
        as_membership_event,
        state_map.get((EventTypes.JoinRules, "")),
        state_map.get((EventTypes.RoomHistoryVisibility, "")),
        event_before,
        # HISTORICAL MESSAGE END
        insertion_event,
        historical_message_event,
        batch_event,
        base_insertion_event,
        # HISTORICAL MESSAGE START
        event_after,
        # Latest in the room (newest messages)
    ]

    # pulled_events: List[EventBase] = [
    #     # Beginning of room (oldest messages)
    #     # *list(state_map.values()),
    #     room_create_event,
    #     pl_event,
    #     as_membership_event,
    #     state_map.get((EventTypes.JoinRules, "")),
    #     state_map.get((EventTypes.RoomHistoryVisibility, "")),
    #     event_before,
    #     # HISTORICAL MESSAGE END
    #     insertion_event,
    #     historical_message_event,
    #     batch_event,
    #     base_insertion_event,
    #     # HISTORICAL MESSAGE START
    #     event_after,
    #     # Latest in the room (newest messages)
    # ]

    # The order that we get after passing reverse chronological events in
    # that mostly passes. Only the insertion event is rejected but the
    # historical messages appear /messages scrollback.
    # pulled_events: List[EventBase] = [
    #     # Beginning of room (oldest messages)
    #     # *list(state_map.values()),
    #     room_create_event,
    #     pl_event,
    #     as_membership_event,
    #     state_map.get((EventTypes.JoinRules, "")),
    #     state_map.get((EventTypes.RoomHistoryVisibility, "")),
    #     event_before,
    #     event_after,
    #     base_insertion_event,
    #     batch_event,
    #     historical_message_event,
    #     insertion_event,
    #     # Latest in the room (newest messages)
    # ]

    import logging

    logger = logging.getLogger(__name__)
    logger.info(
        "pulled_events=%s",
        json.dumps(
            [_debug_event_string(event) for event in pulled_events],
            indent=4,
        ),
    )

    for event, _ in known_event_dict.values():
        if event.internal_metadata.outlier:
            self.fail("Our pristine events should not be marked as an outlier")

    self.get_success(
        self.hs.get_federation_event_handler()._process_pulled_events(
            self.OTHER_SERVER_NAME,
            [
                # Make copies of events since Synapse modifies the
                # internal_metadata in place and we want to keep our
                # pristine copies
                make_event_from_dict(pulled_event.get_pdu_json(), room_version)
                for pulled_event in pulled_events
            ],
            backfilled=True,
        )
    )

    from_token = self.get_success(
        self.hs.get_event_sources().get_current_token_for_pagination(room_id)
    )
    actual_events_in_room_reverse_chronological, _ = self.get_success(
        main_store.paginate_room_events(
            room_id, from_key=from_token.room_key, limit=100, direction="b"
        )
    )

    # We have to reverse the list to make it chronological.
    actual_events_in_room_chronological = list(
        reversed(actual_events_in_room_reverse_chronological)
    )

    expected_event_order = [
        # Beginning of room (oldest messages)
        # *list(state_map.values()),
        room_create_event,
        as_membership_event,
        pl_event,
        state_map.get((EventTypes.JoinRules, "")),
        state_map.get((EventTypes.RoomHistoryVisibility, "")),
        event_before,
        # HISTORICAL MESSAGE END
        insertion_event,
        historical_message_event,
        batch_event,
        base_insertion_event,
        # HISTORICAL MESSAGE START
        event_after,
        # Latest in the room (newest messages)
    ]

    event_id_diff = {event.event_id for event in expected_event_order} - {
        event.event_id for event in actual_events_in_room_chronological
    }
    event_diff_ordered = [
        event for event in expected_event_order if event.event_id in event_id_diff
    ]
    event_id_extra = {
        event.event_id for event in actual_events_in_room_chronological
    } - {event.event_id for event in expected_event_order}
    event_extra_ordered = [
        event
        for event in actual_events_in_room_chronological
        if event.event_id in event_id_extra
    ]
    assertion_message = (
        "Debug info:\nActual events missing from expected list: %s\nActual events contain %d additional events compared to expected: %s\nExpected event order: %s\nActual event order: %s"
        % (
            json.dumps(
                [_debug_event_string(event) for event in event_diff_ordered],
                indent=4,
            ),
            len(event_extra_ordered),
            json.dumps(
                [_debug_event_string(event) for event in event_extra_ordered],
                indent=4,
            ),
            json.dumps(
                [_debug_event_string(event) for event in expected_event_order],
                indent=4,
            ),
            json.dumps(
                [
                    _debug_event_string(event)
                    for event in actual_events_in_room_chronological
                ],
                indent=4,
            ),
        )
    )

    # assert (
    #     actual_events_in_room_chronological == expected_event_order
    # ), assertion_message
    self.assertEqual(
        [event.event_id for event in actual_events_in_room_chronological],
        [event.event_id for event in expected_event_order],
        assertion_message,
    )
```
The problem with a Complement test is that we'd need to call Synapse's Delete Room Admin API, which would only be specific to Synapse. I could write this test, but I wouldn't be able to publish it anywhere permanent, and the steps in it would be no different than manually testing.
Manually testing again with the `continue` added, backfill doesn't drop any events. Thanks for catching that :)
I cracked and settled for writing a unit test for backfill, including a bit that specifically checks that backfill won't rely on the event cache to determine whether to persist an event. The result is a test for backfill plus the specific behaviour this PR removes.
synapse/tests/handlers/test_federation.py
Lines 332 to 426 in 42f1410
```python
def test_backfill_ignores_known_events(self) -> None:
    """
    Tests that events that we already know about are ignored when backfilling.
    """
    # Set up users
    user_id = self.register_user("kermit", "test")
    tok = self.login("kermit", "test")

    other_server = "otherserver"
    other_user = "@otheruser:" + other_server

    # Create a room to backfill events into
    room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
    room_version = self.get_success(self.store.get_room_version(room_id))

    # Build an event to backfill
    event = event_from_pdu_json(
        {
            "type": EventTypes.Message,
            "content": {"body": "hello world", "msgtype": "m.text"},
            "room_id": room_id,
            "sender": other_user,
            "depth": 32,
            "prev_events": [],
            "auth_events": [],
            "origin_server_ts": self.clock.time_msec(),
        },
        room_version,
    )

    # Ensure the event is not already in the DB
    self.get_failure(
        self.store.get_event(event.event_id),
        NotFoundError,
    )

    # Backfill the event and check that it has entered the DB.

    # We mock out the FederationClient.backfill method, to pretend that a remote
    # server has returned our fake event.
    federation_client_backfill_mock = Mock(return_value=make_awaitable([event]))
    self.hs.get_federation_client().backfill = federation_client_backfill_mock

    # We also mock the persist method with a side effect of itself. This allows us
    # to track when it has been called while preserving its function.
    persist_events_and_notify_mock = Mock(
        side_effect=self.hs.get_federation_event_handler().persist_events_and_notify
    )
    self.hs.get_federation_event_handler().persist_events_and_notify = (
        persist_events_and_notify_mock
    )

    # Small side-tangent. We populate the event cache with the event, even though
    # it is not yet in the DB. This is an invalid scenario that can currently occur
    # due to not properly invalidating the event cache.
    # See https://github.com/matrix-org/synapse/issues/13476.
    #
    # As a result, backfill should not rely on the event cache to check whether
    # we already have an event in the DB.
    # TODO: Remove this bit when the event cache is properly invalidated.
    cache_entry = EventCacheEntry(
        event=event,
        redacted_event=None,
    )
    self.store._get_event_cache.set_local((event.event_id,), cache_entry)

    # We now call FederationEventHandler.backfill (a separate method) to trigger
    # a backfill request. It should receive the fake event.
    self.get_success(
        self.hs.get_federation_event_handler().backfill(
            other_user,
            room_id,
            limit=10,
            extremities=[],
        )
    )

    # Check that our fake event was persisted.
    persist_events_and_notify_mock.assert_called_once()
    persist_events_and_notify_mock.reset_mock()

    # Now we repeat the backfill, having the homeserver receive the fake event
    # again.
    self.get_success(
        self.hs.get_federation_event_handler().backfill(
            other_user,
            room_id,
            limit=10,
            extremities=[],
        ),
    )

    # This time, we expect no event persistence to have occurred, as we already
    # have this event.
    persist_events_and_notify_mock.assert_not_called()
```
OK with the change suggested
```python
# Check if we already have any of these events.
# Note: we currently make a lookup in the database directly here rather than
# checking the event cache, due to:
# https://github.com/matrix-org/synapse/issues/13476
```
We could add an additional comment about potentially adding a cached check back when "something like #13916 comes along and correctly invalidates the event cache."
I went ahead and added a comment to the referenced bug instead, which should serve the same purpose and tie things together a bit better on GitHub.
And explicitly ensure that it ignores `get_event_cache` for now.
I successfully tested the following on my homeserver on develop (2c63cdc) + #14161 + #14164 (this PR):
Worked as expected 🎉
Most of the context is in #14161.
This PR addresses another, similar optimisation. In #14161, when we purged and rejoined a room, we found that while the state was now persisted correctly, non-state events were not. These events are not initially received when you rejoin a room, but must be backfilled from the remote homeserver after you join the room.
Backfilling involves another area where we refuse to persist events if they already exist in the event cache:
synapse/synapse/handlers/federation_event.py
Lines 849 to 859 in 7b7478e
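Roughly, that check looks like the following (a paraphrased sketch based on the snippet quoted in the review thread above, not the exact source): the event is looked up via the store, which consults the event cache first, and is only re-persisted if it was previously stored as an outlier.

```python
# Paraphrased sketch of the existing per-event check (not the exact source):
existing = await self._store.get_event(
    event_id, allow_none=True, allow_rejected=True
)
if existing:
    if not existing.internal_metadata.is_outlier():
        logger.info(
            "Ignoring received event %s which we have already seen", event_id
        )
        return
    logger.info("De-outliering event %s", event_id)
```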
It has one more conditional than the `_have_seen_events_dict` case, though: if we found the event in the cache and it was an outlier, we'll still persist it. Of course, most events aren't outliers, including messages, hence why we see messages get lost when rejoining a room after purging it in #14161 (comment).

This PR changes this optimisation so that it skips the event cache entirely and instead checks the database directly. Again, as with #14161, there's a potential performance degradation here, but it prevents breakage until something like #13916 comes along and correctly invalidates the event cache.
The main change here is skipping the event cache during backfill when checking to see if we already have an event before persisting it.
Additionally, there's a potential speedup in this PR as well. Before, we were checking one event ID at a time against the cache (or the DB in the case of a cache miss) in `_process_pulled_event`. This PR moves this checking code outside of `_process_pulled_event` and excludes events we've already seen before sorting them and passing them one-by-one to `_process_pulled_event`. In the case of lots of cache misses, this code should theoretically be faster, since the already-seen check now happens in a single batched database lookup.