Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Avoid checking the event cache when backfilling events (#14164)
Browse files Browse the repository at this point in the history
  • Loading branch information
anoadragon453 committed Oct 18, 2022
1 parent 828b550 commit dc02d9f
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.d/14164.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
47 changes: 34 additions & 13 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,9 +798,42 @@ async def _process_pulled_events(
],
)

# Check if we already any of these have these events.
# Note: we currently make a lookup in the database directly here rather than
# checking the event cache, due to:
# https://github.com/matrix-org/synapse/issues/13476
existing_events_map = await self._store._get_events_from_db(
[event.event_id for event in events]
)

new_events = []
for event in events:
event_id = event.event_id

# If we've already seen this event ID...
if event_id in existing_events_map:
existing_event = existing_events_map[event_id]

# ...and the event itself was not previously stored as an outlier...
if not existing_event.event.internal_metadata.is_outlier():
# ...then there's no need to persist it. We have it already.
logger.info(
"_process_pulled_event: Ignoring received event %s which we "
"have already seen",
event.event_id,
)
continue

# While we have seen this event before, it was stored as an outlier.
# We'll now persist it as a non-outlier.
logger.info("De-outliering event %s", event_id)

# Continue on with the events that are new to us.
new_events.append(event)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
Expand Down Expand Up @@ -852,18 +885,6 @@ async def _process_pulled_event(

event_id = event.event_id

existing = await self._store.get_event(
event_id, allow_none=True, allow_rejected=True
)
if existing:
if not existing.internal_metadata.is_outlier():
logger.info(
"_process_pulled_event: Ignoring received event %s which we have already seen",
event_id,
)
return
logger.info("De-outliering event %s", event_id)

try:
self._sanity_check_event(event)
except SynapseError as err:
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ async def get_event(
If there is a mismatch, behave as per allow_none.
Returns:
The event, or None if the event was not found.
The event, or None if the event was not found and allow_none is `True`.
"""
if not isinstance(event_id, str):
raise TypeError("Invalid event event_id %r" % (event_id,))
Expand Down
105 changes: 104 additions & 1 deletion tests/handlers/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
LimitExceededError,
NotFoundError,
SynapseError,
)
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.federation.federation_base import event_from_pdu_json
Expand All @@ -28,6 +34,7 @@
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.databases.main.events_worker import EventCacheEntry
from synapse.util import Clock
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -322,6 +329,102 @@ def test_backfill_with_many_backward_extremities(self) -> None:
)
self.get_success(d)

def test_backfill_ignores_known_events(self) -> None:
"""
Tests that events that we already know about are ignored when backfilling.
"""
# Set up users
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")

other_server = "otherserver"
other_user = "@otheruser:" + other_server

# Create a room to backfill events into
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(self.store.get_room_version(room_id))

# Build an event to backfill
event = event_from_pdu_json(
{
"type": EventTypes.Message,
"content": {"body": "hello world", "msgtype": "m.text"},
"room_id": room_id,
"sender": other_user,
"depth": 32,
"prev_events": [],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
room_version,
)

# Ensure the event is not already in the DB
self.get_failure(
self.store.get_event(event.event_id),
NotFoundError,
)

# Backfill the event and check that it has entered the DB.

# We mock out the FederationClient.backfill method, to pretend that a remote
# server has returned our fake event.
federation_client_backfill_mock = Mock(return_value=make_awaitable([event]))
self.hs.get_federation_client().backfill = federation_client_backfill_mock

# We also mock the persist method with a side effect of itself. This allows us
# to track when it has been called while preserving its function.
persist_events_and_notify_mock = Mock(
side_effect=self.hs.get_federation_event_handler().persist_events_and_notify
)
self.hs.get_federation_event_handler().persist_events_and_notify = (
persist_events_and_notify_mock
)

# Small side-tangent. We populate the event cache with the event, even though
# it is not yet in the DB. This is an invalid scenario that can currently occur
# due to not properly invalidating the event cache.
# See https://github.com/matrix-org/synapse/issues/13476.
#
# As a result, backfill should not rely on the event cache to check whether
# we already have an event in the DB.
# TODO: Remove this bit when the event cache is properly invalidated.
cache_entry = EventCacheEntry(
event=event,
redacted_event=None,
)
self.store._get_event_cache.set_local((event.event_id,), cache_entry)

# We now call FederationEventHandler.backfill (a separate method) to trigger
# a backfill request. It should receive the fake event.
self.get_success(
self.hs.get_federation_event_handler().backfill(
other_user,
room_id,
limit=10,
extremities=[],
)
)

# Check that our fake event was persisted.
persist_events_and_notify_mock.assert_called_once()
persist_events_and_notify_mock.reset_mock()

# Now we repeat the backfill, having the homeserver receive the fake event
# again.
self.get_success(
self.hs.get_federation_event_handler().backfill(
other_user,
room_id,
limit=10,
extremities=[],
),
)

# This time, we expect no event persistence to have occurred, as we already
# have this event.
persist_events_and_notify_mock.assert_not_called()

@unittest.override_config(
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
)
Expand Down

0 comments on commit dc02d9f

Please sign in to comment.