Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix not sending events over federation when using sharded event persisters #8536

Merged
merged 4 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8536.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix not sending events over federation when using sharded event writers.
4 changes: 0 additions & 4 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,10 +790,6 @@ async def process_replication_rows(self, stream_name, token, rows):
send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token)

# We also need to poke the federation sender when new events happen
elif stream_name == "events":
self.federation_sender.notify_new_events(token)

# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
await self._on_new_receipts(rows)
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def _clear_queue_before_pos(self, position_to_delete):
for key in keys[:i]:
del self.edus[key]

def notify_new_events(self, current_id):
def notify_new_events(self, max_token):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
Expand Down
9 changes: 7 additions & 2 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
from synapse.types import ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -154,10 +154,15 @@ def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
self._per_destination_queues[destination] = queue
return queue

def notify_new_events(self, current_id: int) -> None:
def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
current_id = max_token.stream

self._last_poked_id = max(current_id, self._last_poked_id)

if self._is_processing:
Expand Down
11 changes: 7 additions & 4 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
Expand All @@ -47,15 +48,17 @@ def __init__(self, hs):
self.current_max = 0
self.is_processing = False

async def notify_interested_services(self, current_id):
async def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event.

Pushing is done asynchronously, so this method won't block for any
prolonged length of time.

Args:
current_id(int): The current maximum ID.
"""
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
current_id = max_token.stream

services = self.store.get_app_services()
if not services or not self.notify_appservices:
return
Expand Down
6 changes: 3 additions & 3 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,19 +319,19 @@ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
)

if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_token.stream)
self.federation_sender.notify_new_events(max_room_stream_token)

async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
await self.appservice_handler.notify_interested_services(
max_room_stream_token.stream
max_room_stream_token
)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
await self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception:
logger.exception("Error pusher pool of event")

Expand Down
8 changes: 7 additions & 1 deletion synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from twisted.internet.error import AlreadyCalled, AlreadyCancelled

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -91,7 +92,12 @@ def on_stop(self):
pass
self.timed_call = None

def on_new_notifications(self, max_stream_ordering):
def on_new_notifications(self, max_token: RoomStreamToken):
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_ordering = max_token.stream

if self.max_stream_ordering:
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering
Expand Down
8 changes: 7 additions & 1 deletion synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.types import RoomStreamToken

from . import push_rule_evaluator, push_tools

Expand Down Expand Up @@ -114,7 +115,12 @@ def on_started(self, should_check_for_notifs):
if should_check_for_notifs:
self._start_processing()

def on_new_notifications(self, max_stream_ordering):
def on_new_notifications(self, max_token: RoomStreamToken):
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_ordering = max_token.stream

self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0
)
Expand Down
10 changes: 8 additions & 2 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute

if TYPE_CHECKING:
Expand Down Expand Up @@ -186,11 +187,16 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])

async def on_new_notifications(self, max_stream_id: int):
async def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return

# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_id = max_token.stream

if max_stream_id < self._last_room_stream_id_seen:
# Nothing to do
return
Expand All @@ -214,7 +220,7 @@ async def on_new_notifications(self, max_stream_id: int):

if u in self.pushers:
for p in self.pushers[u].values():
p.on_new_notifications(max_stream_id)
p.on_new_notifications(max_token)

except Exception:
logger.exception("Exception in pusher on_new_notifications")
Expand Down
13 changes: 10 additions & 3 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from twisted.internet import defer

from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.types import RoomStreamToken

from tests.test_utils import make_awaitable
from tests.utils import MockClock
Expand Down Expand Up @@ -61,7 +62,9 @@ def test_notify_interested_services(self):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
yield defer.ensureDeferred(self.handler.notify_interested_services(0))
yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)
self.mock_scheduler.submit_event_for_as.assert_called_once_with(
interested_service, event
)
Expand All @@ -80,7 +83,9 @@ def test_query_user_exists_unknown_user(self):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
yield defer.ensureDeferred(self.handler.notify_interested_services(0))
yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)
self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)

@defer.inlineCallbacks
Expand All @@ -97,7 +102,9 @@ def test_query_user_exists_known_user(self):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
yield defer.ensureDeferred(self.handler.notify_interested_services(0))
yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)
self.assertFalse(
self.mock_as_api.query_user.called,
"query_user called when it shouldn't have been.",
Expand Down