Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add admin API to reset connection timeouts for remote server #11639

Merged
merged 19 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11593.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix an error to get federation status of a destination server even if no error has occurred.
1 change: 1 addition & 0 deletions changelog.d/11639.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add admin API to reset connection timeouts for remote server.
29 changes: 28 additions & 1 deletion docs/usage/administration/admin_api/federation.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ The following fields are returned in the JSON response body:
- `next_token`: string representing a positive integer - Indication for pagination. See above.
- `total` - integer - Total number of destinations.

# Destination Details API
## Destination Details API

This API gets the retry timing info for a specific remote server.

Expand All @@ -108,7 +108,34 @@ A response body like the following is returned:
}
```

**Parameters**

The following parameters should be set in the URL:

- `destination` - Name of the remote server.

**Response**

The response fields are the same like in the `destinations` array in
[List of destinations](#list-of-destinations) response.

## Reset connection timeout

This API resets the retry timing for a specific remote server and tries to connect
dklimpel marked this conversation as resolved.
Show resolved Hide resolved
the remote server again. It does not wait for the next `retry_interval`.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
The connection must have previously run into an error and `retry_last_ts`
([Destination Details API](#destination-details-api)) must not be equal to `0`.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

The API is:

```
POST /_synapse/admin/v1/federation/destinations/<destination>/reset_connection

{}
```

**Parameters**

The following parameters should be set in the URL:

- `destination` - Name of the remote server.
4 changes: 2 additions & 2 deletions synapse/federation/transport/server/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ async def authenticate_request(
# alive
retry_timings = await self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings.retry_last_ts:
run_in_background(self._reset_retry_timings, origin)
run_in_background(self.reset_retry_timings, origin)

return origin

async def _reset_retry_timings(self, origin: str) -> None:
async def reset_retry_timings(self, origin: str) -> None:
try:
logger.info("Marking origin %r as up", origin)
await self.store.set_destination_retry_timings(origin, None, 0, 0)
Expand Down
6 changes: 4 additions & 2 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
EventReportsRestServlet,
)
from synapse.rest.admin.federation import (
DestinationsRestServlet,
DestinationResetConnectionRestServlet,
DestinationRestServlet,
ListDestinationsRestServlet,
)
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
Expand Down Expand Up @@ -265,7 +266,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ListRegistrationTokensRestServlet(hs).register(http_server)
NewRegistrationTokenRestServlet(hs).register(http_server)
RegistrationTokenRestServlet(hs).register(http_server)
DestinationsRestServlet(hs).register(http_server)
DestinationResetConnectionRestServlet(hs).register(http_server)
DestinationRestServlet(hs).register(http_server)
ListDestinationsRestServlet(hs).register(http_server)

# Some servlets only get registered for the main process.
Expand Down
65 changes: 59 additions & 6 deletions synapse/rest/admin/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import TYPE_CHECKING, Tuple

from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.federation.transport.server._base import Authenticator
dklimpel marked this conversation as resolved.
Show resolved Hide resolved
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
Expand Down Expand Up @@ -90,7 +91,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
return HTTPStatus.OK, response


class DestinationsRestServlet(RestServlet):
class DestinationRestServlet(RestServlet):
"""Get details of a destination.
This needs user to have administrator access in Synapse.

Expand All @@ -111,12 +112,26 @@ async def on_GET(
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)

if not await self._store.is_destination(destination):
raise NotFoundError("Unknown destination")

destination_retry_timings = await self._store.get_destination_retry_timings(
destination
)

if not destination_retry_timings:
raise NotFoundError("Unknown destination")
retry_timing_respone: JsonDict = {}
if destination_retry_timings:
retry_timing_respone = {
"failure_ts": destination_retry_timings.failure_ts,
"retry_last_ts": destination_retry_timings.retry_last_ts,
"retry_interval": destination_retry_timings.retry_interval,
}
else:
retry_timing_respone = {
"failure_ts": None,
"retry_last_ts": 0,
"retry_interval": 0,
}

last_successful_stream_ordering = (
await self._store.get_destination_last_successful_stream_ordering(
Expand All @@ -126,10 +141,48 @@ async def on_GET(

response = {
"destination": destination,
"failure_ts": destination_retry_timings.failure_ts,
"retry_last_ts": destination_retry_timings.retry_last_ts,
"retry_interval": destination_retry_timings.retry_interval,
"last_successful_stream_ordering": last_successful_stream_ordering,
**retry_timing_respone,
}

return HTTPStatus.OK, response


class DestinationResetConnectionRestServlet(RestServlet):
"""Reset destinations' connection timeouts and wake it up.
This needs user to have administrator access in Synapse.

POST /_synapse/admin/v1/federation/destinations/<destination>/reset_connection
{}

returns:
200 OK otherwise an error.
"""

PATTERNS = admin_patterns(
"/federation/destinations/(?P<destination>[^/]+)/reset_connection$"
)

def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastore()
self._authenticator = Authenticator(hs)

async def on_POST(
self, request: SynapseRequest, destination: str
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

if not await self._store.is_destination(destination):
raise NotFoundError("Unknown destination")

retry_timings = await self._store.get_destination_retry_timings(destination)
if not (retry_timings and retry_timings.retry_last_ts):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"The retry timing does not need to be reset for this destination.",
)

await self._authenticator.reset_retry_timings(destination)

return HTTPStatus.OK, {}
10 changes: 10 additions & 0 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,13 @@ def get_destinations_paginate_txn(
return await self.db_pool.runInteraction(
"get_destinations_paginate_txn", get_destinations_paginate_txn
)

async def is_destination(self, destination: str) -> Optional[bool]:
"""Function to check if a destination is a destination of this server."""
return await self.db_pool.simple_select_one_onecol(
table="destinations",
keyvalues={"destination": destination},
retcol="1",
allow_none=True,
desc="is_destination",
)
130 changes: 108 additions & 22 deletions tests/rest/admin/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:

@parameterized.expand(
[
("/_synapse/admin/v1/federation/destinations",),
("/_synapse/admin/v1/federation/destinations/dummy",),
("GET", "/_synapse/admin/v1/federation/destinations"),
("GET", "/_synapse/admin/v1/federation/destinations/dummy"),
(
"POST",
"/_synapse/admin/v1/federation/destinations/dummy/reset_connection",
),
]
)
def test_requester_is_no_admin(self, url: str) -> None:
def test_requester_is_no_admin(self, method: str, url: str) -> None:
"""
If the user is not a server admin, an error 403 is returned.
"""
Expand All @@ -56,7 +60,7 @@ def test_requester_is_no_admin(self, url: str) -> None:
other_user_tok = self.login("user", "pass")

channel = self.make_request(
"GET",
method,
url,
content={},
access_token=other_user_tok,
Expand Down Expand Up @@ -120,6 +124,16 @@ def test_invalid_parameter(self) -> None:
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

# invalid destination
channel = self.make_request(
"POST",
self.url + "/dummy/reset_connection",
access_token=self.admin_user_tok,
)

self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

def test_limit(self) -> None:
"""
Testing list of destinations with limit
Expand Down Expand Up @@ -314,15 +328,12 @@ def _order_test(
retry_interval,
last_successful_stream_ordering,
) in dest:
self.get_success(
self.store.set_destination_retry_timings(
destination, failure_ts, retry_last_ts, retry_interval
)
)
self.get_success(
self.store.set_destination_last_successful_stream_ordering(
destination, last_successful_stream_ordering
)
self._create_destination(
destination,
failure_ts,
retry_last_ts,
retry_interval,
last_successful_stream_ordering,
)

# order by default (destination)
Expand Down Expand Up @@ -413,11 +424,9 @@ def _search_test(
_search_test(None, "foo")
_search_test(None, "bar")

def test_get_single_destination(self) -> None:
"""
Get one specific destinations.
"""
self._create_destinations(5)
def test_get_single_destination_with_retry_timings(self) -> None:
"""Get one specific destination which has retry timings."""
self._create_destinations(1)

channel = self.make_request(
"GET",
Expand All @@ -432,6 +441,86 @@ def test_get_single_destination(self) -> None:
# convert channel.json_body into a List
self._check_fields([channel.json_body])

def test_get_single_destination_no_retry_timings(self) -> None:
"""Get one specific destination which has no retry timings."""
self._create_destination("sub0.example.com")

channel = self.make_request(
"GET",
self.url + "/sub0.example.com",
access_token=self.admin_user_tok,
)

self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("sub0.example.com", channel.json_body["destination"])
self.assertEqual(0, channel.json_body["retry_last_ts"])
self.assertEqual(0, channel.json_body["retry_interval"])
self.assertIsNone(channel.json_body["failure_ts"])
self.assertIsNone(channel.json_body["last_successful_stream_ordering"])

def test_destination_reset_connection(self) -> None:
"""Reset timeouts and wake up destination."""
self._create_destination("sub0.example.com", 100, 100, 100)

channel = self.make_request(
"POST",
self.url + "/sub0.example.com/reset_connection",
access_token=self.admin_user_tok,
)

self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

retry_timings = self.get_success(
self.store.get_destination_retry_timings("sub0.example.com")
)
self.assertIsNone(retry_timings)

def test_destination_reset_connection_not_required(self) -> None:
"""Try to reset timeouts of a destination with no timeouts and get an error."""
self._create_destination("sub0.example.com", None, 0, 0)

channel = self.make_request(
"POST",
self.url + "/sub0.example.com/reset_connection",
access_token=self.admin_user_tok,
)

self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(
"The retry timing does not need to be reset for this destination.",
channel.json_body["error"],
)

def _create_destination(
self,
destination: str,
failure_ts: Optional[int] = None,
retry_last_ts: int = 0,
retry_interval: int = 0,
last_successful_stream_ordering: Optional[int] = None,
) -> None:
"""Create one specific destination

Args:
destination: the destination we have successfully sent to
failure_ts: when the server started failing (ms since epoch)
retry_last_ts: time of last retry attempt in unix epoch ms
retry_interval: how long until next retry in ms
last_successful_stream_ordering: the stream_ordering of the most
recent successfully-sent PDU
"""
self.get_success(
self.store.set_destination_retry_timings(
destination, failure_ts, retry_last_ts, retry_interval
)
)
if last_successful_stream_ordering is not None:
self.get_success(
self.store.set_destination_last_successful_stream_ordering(
destination, last_successful_stream_ordering
)
)

def _create_destinations(self, number_destinations: int) -> None:
"""Create a number of destinations

Expand All @@ -440,10 +529,7 @@ def _create_destinations(self, number_destinations: int) -> None:
"""
for i in range(0, number_destinations):
dest = f"sub{i}.example.com"
self.get_success(self.store.set_destination_retry_timings(dest, 50, 50, 50))
self.get_success(
self.store.set_destination_last_successful_stream_ordering(dest, 100)
)
self._create_destination(dest, 50, 50, 50, 100)

def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that the expected destination attributes are present in content
Expand Down