Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Immediately retry any requests that have backed off when a server com…
Browse files Browse the repository at this point in the history
…es back online. (#12500)

Otherwise it can take up to a minute for any in-flight `/send` requests to be retried.
  • Loading branch information
erikjohnston authored May 10, 2022
1 parent ade3008 commit 8dd3e0e
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 6 deletions.
1 change: 1 addition & 0 deletions changelog.d/12500.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Immediately retry any requests that have backed off when a server comes back online.
15 changes: 13 additions & 2 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
from synapse.util.metrics import Measure

if TYPE_CHECKING:
Expand Down Expand Up @@ -353,6 +353,13 @@ def schedule(x):

self._cooperator = Cooperator(scheduler=schedule)

self._sleeper = AwakenableSleeper(self.reactor)

def wake_destination(self, destination: str) -> None:
"""Called when the remote server may have come back online."""

self._sleeper.wake(destination)

async def _send_request_with_optional_trailing_slash(
self,
request: MatrixFederationRequest,
Expand Down Expand Up @@ -474,6 +481,8 @@ async def _send_request(
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
notifier=self.hs.get_notifier(),
replication_client=self.hs.get_replication_command_handler(),
)

method_bytes = request.method.encode("ascii")
Expand Down Expand Up @@ -664,7 +673,9 @@ async def _send_request(
delay,
)

await self.clock.sleep(delay)
# Sleep for the calculated delay, or wake up immediately
# if we get notified that the server is back up.
await self._sleeper.sleep(request.destination, delay * 1000)
retries_left -= 1
else:
raise
Expand Down
8 changes: 5 additions & 3 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,7 @@ def __init__(self, hs: "HomeServer"):
# Called when there are new things to stream over replication
self.replication_callbacks: List[Callable[[], None]] = []

# Called when remote servers have come back online after having been
# down.
self.remote_server_up_callbacks: List[Callable[[str], None]] = []
self._federation_client = hs.get_federation_http_client()

self._third_party_rules = hs.get_third_party_event_rules()

Expand Down Expand Up @@ -731,3 +729,7 @@ def notify_remote_server_up(self, server: str) -> None:
# circular dependencies.
if self.federation_sender:
self.federation_sender.wake_destination(server)

# Tell the federation client about the fact the server is back up, so
# that any in flight requests can be immediately retried.
self._federation_client.wake_destination(server)
57 changes: 57 additions & 0 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,3 +778,60 @@ def handle_cancel(new_deferred: "defer.Deferred[T]") -> None:
new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
deferred.chainDeferred(new_deferred)
return new_deferred


class AwakenableSleeper:
"""Allows explicitly waking up deferreds related to an entity that are
currently sleeping.
"""

def __init__(self, reactor: IReactorTime) -> None:
self._streams: Dict[str, Set[defer.Deferred[None]]] = {}
self._reactor = reactor

def wake(self, name: str) -> None:
"""Wake everything related to `name` that is currently sleeping."""
stream_set = self._streams.pop(name, set())
for deferred in stream_set:
try:
with PreserveLoggingContext():
deferred.callback(None)
except Exception:
pass

async def sleep(self, name: str, delay_ms: int) -> None:
"""Sleep for the given number of milliseconds, or return if the given
`name` is explicitly woken up.
"""

# Create a deferred that gets called in N seconds
sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
call = self._reactor.callLater(delay_ms / 1000, sleep_deferred.callback, None)

# Create a deferred that will get called if `wake` is called with
# the same `name`.
stream_set = self._streams.setdefault(name, set())
notify_deferred: "defer.Deferred[None]" = defer.Deferred()
stream_set.add(notify_deferred)

try:
# Wait for either the delay or for `wake` to be called.
await make_deferred_yieldable(
defer.DeferredList(
[sleep_deferred, notify_deferred],
fireOnOneCallback=True,
fireOnOneErrback=True,
consumeErrors=True,
)
)
finally:
# Clean up the state
curr_stream_set = self._streams.get(name)
if curr_stream_set is not None:
curr_stream_set.discard(notify_deferred)
if len(curr_stream_set) == 0:
self._streams.pop(name)

# Cancel the sleep if we were woken up
if call.active():
call.cancel()
24 changes: 23 additions & 1 deletion synapse/util/retryutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
import logging
import random
from types import TracebackType
from typing import Any, Optional, Type
from typing import TYPE_CHECKING, Any, Optional, Type

import synapse.logging.context
from synapse.api.errors import CodeMessageException
from synapse.storage import DataStore
from synapse.util import Clock

if TYPE_CHECKING:
from synapse.notifier import Notifier
from synapse.replication.tcp.handler import ReplicationCommandHandler

logger = logging.getLogger(__name__)

# the initial backoff, after the first transaction fails
Expand Down Expand Up @@ -131,6 +135,8 @@ def __init__(
retry_interval: int,
backoff_on_404: bool = False,
backoff_on_failure: bool = True,
notifier: Optional["Notifier"] = None,
replication_client: Optional["ReplicationCommandHandler"] = None,
):
"""Marks the destination as "down" if an exception is thrown in the
context, except for CodeMessageException with code < 500.
Expand Down Expand Up @@ -160,6 +166,9 @@ def __init__(
self.backoff_on_404 = backoff_on_404
self.backoff_on_failure = backoff_on_failure

self.notifier = notifier
self.replication_client = replication_client

def __enter__(self) -> None:
pass

Expand Down Expand Up @@ -239,6 +248,19 @@ async def store_retry_timings() -> None:
retry_last_ts,
self.retry_interval,
)

if self.notifier:
# Inform the relevant places that the remote server is back up.
self.notifier.notify_remote_server_up(self.destination)

if self.replication_client:
# If we're on a worker we try and inform master about this. The
# replication client doesn't hook into the notifier to avoid
# infinite loops where we send a `REMOTE_SERVER_UP` command to
# master, which then echoes it back to us which in turn pokes
# the notifier.
self.replication_client.send_remote_server_up(self.destination)

except Exception:
logger.exception("Failed to store destination_retry_timings")

Expand Down
80 changes: 80 additions & 0 deletions tests/util/test_async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
make_deferred_yieldable,
)
from synapse.util.async_helpers import (
AwakenableSleeper,
ObservableDeferred,
concurrently_execute,
delay_cancellation,
stop_cancellation,
timeout_deferred,
)

from tests.server import get_clock
from tests.unittest import TestCase


Expand Down Expand Up @@ -496,3 +498,81 @@ async def outer():
# logging context.
blocking_d.callback(None)
self.successResultOf(d)


class AwakenableSleeperTests(TestCase):
"Tests AwakenableSleeper"

def test_sleep(self):
reactor, _ = get_clock()
sleeper = AwakenableSleeper(reactor)

d = defer.ensureDeferred(sleeper.sleep("name", 1000))

reactor.pump([0.0])
self.assertFalse(d.called)

reactor.advance(0.5)
self.assertFalse(d.called)

reactor.advance(0.6)
self.assertTrue(d.called)

def test_explicit_wake(self):
reactor, _ = get_clock()
sleeper = AwakenableSleeper(reactor)

d = defer.ensureDeferred(sleeper.sleep("name", 1000))

reactor.pump([0.0])
self.assertFalse(d.called)

reactor.advance(0.5)
self.assertFalse(d.called)

sleeper.wake("name")
self.assertTrue(d.called)

reactor.advance(0.6)

def test_multiple_sleepers_timeout(self):
reactor, _ = get_clock()
sleeper = AwakenableSleeper(reactor)

d1 = defer.ensureDeferred(sleeper.sleep("name", 1000))

reactor.advance(0.6)
self.assertFalse(d1.called)

# Add another sleeper
d2 = defer.ensureDeferred(sleeper.sleep("name", 1000))

# Only the first sleep should time out now.
reactor.advance(0.6)
self.assertTrue(d1.called)
self.assertFalse(d2.called)

reactor.advance(0.6)
self.assertTrue(d2.called)

def test_multiple_sleepers_wake(self):
reactor, _ = get_clock()
sleeper = AwakenableSleeper(reactor)

d1 = defer.ensureDeferred(sleeper.sleep("name", 1000))

reactor.advance(0.5)
self.assertFalse(d1.called)

# Add another sleeper
d2 = defer.ensureDeferred(sleeper.sleep("name", 1000))

# Neither should fire yet
reactor.advance(0.3)
self.assertFalse(d1.called)
self.assertFalse(d2.called)

# Explicitly waking both up works
sleeper.wake("name")
self.assertTrue(d1.called)
self.assertTrue(d2.called)

0 comments on commit 8dd3e0e

Please sign in to comment.