Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add Admin API to Fetch Messages Within a Particular Window #13672

Merged
merged 18 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13672.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add admin APIs to fetch messages within a particular window of time.
145 changes: 145 additions & 0 deletions docs/admin_api/rooms.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,151 @@ A response body like the following is returned:
}
```

# Room Messages API

The Room Messages admin API allows server admins to get all messages
sent to a room in a given timeframe. There are various parameters available
that allow for filtering and ordering the returned list. This API supports pagination.

To use it, you will need to authenticate by providing an `access_token`
for a server admin: see [Admin API](../usage/administration/admin_api).

This endpoint mirrors the [Matrix Spec defined Messages API](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages).

The API is:
```
GET /_synapse/admin/v1/rooms/<room_id>/messages
```

**Parameters**

The following path parameters are required:

* `room_id` - The ID of the room you wish you fetch messages from.

The following query parameters are available:

* `from` (required) - The token to start returning events from. This token can be obtained from a prev_batch
or next_batch token returned by the /sync endpoint, or from an end token returned by a previous request to this endpoint.
* `to` - The token to spot returning events at.
* `limit` - The maximum number of events to return. Defaults to `10`.
* `filter` - A JSON RoomEventFilter to filter returned events with.
* `dir` - The direction to return events from. Either `f` for forwards or `b` for backwards. Setting
this value to `b` will reverse the above sort order. Defaults to `f`.

**Response**

The following fields are possible in the JSON response body:

* `chunk` - A list of room events. The order depends on the dir parameter.
Note that an empty chunk does not necessarily imply that no more events are available. Clients should continue to paginate until no end property is returned.
* `end` - A token corresponding to the end of chunk. This token can be passed back to this endpoint to request further events.
If no further events are available, this property is omitted from the response.
* `start` - A token corresponding to the start of chunk.
* `state` - A list of state events relevant to showing the chunk.

**Example**

For more details on each chunk, read [the Matrix specification](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages).

```json
{
"chunk": [
{
"content": {
"body": "This is an example text message",
"format": "org.matrix.custom.html",
"formatted_body": "<b>This is an example text message</b>",
"msgtype": "m.text"
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653,
"room_id": "!636q39766251:example.com",
"sender": "@example:example.org",
"type": "m.room.message",
"unsigned": {
"age": 1234
}
},
{
"content": {
"name": "The room name"
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653,
"room_id": "!636q39766251:example.com",
"sender": "@example:example.org",
"state_key": "",
"type": "m.room.name",
"unsigned": {
"age": 1234
}
},
{
"content": {
"body": "Gangnam Style",
"info": {
"duration": 2140786,
"h": 320,
"mimetype": "video/mp4",
"size": 1563685,
"thumbnail_info": {
"h": 300,
"mimetype": "image/jpeg",
"size": 46144,
"w": 300
},
"thumbnail_url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe",
"w": 480
},
"msgtype": "m.video",
"url": "mxc://example.org/a526eYUSFFxlgbQYZmo442"
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653,
"room_id": "!636q39766251:example.com",
"sender": "@example:example.org",
"type": "m.room.message",
"unsigned": {
"age": 1234
}
}
],
"end": "t47409-4357353_219380_26003_2265",
"start": "t47429-4392820_219380_26003_2265"
}
```

# Room Timestamp to Event API

The Room Timestamp to Event API endpoint fetches the `event_id` of the closest event to the given
timestamp (`ts` query parameter) in the given direction (`dir` query parameter).

Useful for cases like jump to date so you can start paginating messages from
a given date in the archive.

The API is:
```
GET /_synapse/admin/v1/rooms/<room_id>/timestamp_to_event
```

**Parameters**

The following path parameters are required:

* `room_id` - The ID of the room you wish to check.

The following query parameters are available:

* `ts` - a timestamp in milliseconds where we will find the closest event in
the given direction.
* `dir` - can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp.
dav-is marked this conversation as resolved.
Show resolved Hide resolved

**Response**

* `event_id` - converted from timestamp

# Block Room API
The Block Room admin API allows server admins to block and unblock rooms,
and query to see if a given room is blocked.
Expand Down
46 changes: 30 additions & 16 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from synapse.handlers.room import ShutdownRoomResponse
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamKeyType
Expand Down Expand Up @@ -423,6 +424,7 @@ async def get_messages(
pagin_config: PaginationConfig,
as_client_event: bool = True,
event_filter: Optional[Filter] = None,
use_admin_priviledge: bool = False,
) -> JsonDict:
"""Get messages in a room.

Expand All @@ -432,10 +434,16 @@ async def get_messages(
pagin_config: The pagination config rules to apply, if any.
as_client_event: True to get events in client-server format.
event_filter: Filter to apply to results or None
use_admin_priviledge: if `True`, return all events, regardless
of whether `user` has access to them. To be used **ONLY**
from the admin API.

Returns:
Pagination API results
"""
if use_admin_priviledge:
await assert_user_is_admin(self.auth, requester)

user_id = requester.user.to_string()

if pagin_config.from_token:
Expand All @@ -458,12 +466,14 @@ async def get_messages(
room_token = from_token.room_key

async with self.pagination_lock.read(room_id):
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)

if pagin_config.direction == "b":
# if we're going backwards, we might need to backfill. This
Expand All @@ -475,7 +485,7 @@ async def get_messages(
room_id, room_token.stream
)

if membership == Membership.LEAVE:
if not use_admin_priviledge and membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
Expand Down Expand Up @@ -528,12 +538,13 @@ async def get_messages(
if event_filter:
events = await event_filter.filter(events)

events = await filter_events_for_client(
self._storage_controllers,
user_id,
events,
is_peeking=(member_event_id is None),
)
if not use_admin_priviledge:
events = await filter_events_for_client(
self._storage_controllers,
user_id,
events,
is_peeking=(member_event_id is None),
)

# if after the filter applied there are no more events
# return immediately - but there might be more in next_token batch
Expand Down Expand Up @@ -561,9 +572,12 @@ async def get_messages(
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()

aggregations = await self._relations_handler.get_bundled_aggregations(
events, user_id
)
if not use_admin_priviledge:
aggregations = await self._relations_handler.get_bundled_aggregations(
events, user_id
)
else:
aggregations = None # TODO: an admin might want aggregations
dav-is marked this conversation as resolved.
Show resolved Hide resolved

time_now = self.clock.time_msec()

Expand Down
4 changes: 4 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@
MakeRoomAdminRestServlet,
RoomEventContextServlet,
RoomMembersRestServlet,
RoomMessagesRestServlet,
RoomRestServlet,
RoomRestV2Servlet,
RoomStateRestServlet,
RoomTimestampToEventRestServlet,
)
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
Expand Down Expand Up @@ -271,6 +273,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
DestinationResetConnectionRestServlet(hs).register(http_server)
DestinationRestServlet(hs).register(http_server)
ListDestinationsRestServlet(hs).register(http_server)
RoomMessagesRestServlet(hs).register(http_server)
RoomTimestampToEventRestServlet(hs).register(http_server)

# Some servlets only get registered for the main process.
if hs.config.worker.worker_app is None:
Expand Down
104 changes: 104 additions & 0 deletions synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, UserID, create_requester
from synapse.util import json_decoder

Expand Down Expand Up @@ -858,3 +859,106 @@ async def on_PUT(
await self._store.unblock_room(room_id)

return HTTPStatus.OK, {"block": block}


class RoomMessagesRestServlet(RestServlet):
"""
Get messages list of a room.
"""

PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/messages$")

def __init__(self, hs: "HomeServer"):
self._hs = hs
self._clock = hs.get_clock()
self._pagination_handler = hs.get_pagination_handler()
self._auth = hs.get_auth()
self._store = hs.get_datastores().main

async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)

pagination_config = await PaginationConfig.from_request(
self._store, request, default_limit=10
)
# Twisted will have processed the args by now.
assert request.args is not None
as_client_event = b"raw" not in request.args
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
event_filter: Optional[Filter] = Filter(
self._hs, json_decoder.decode(filter_json)
)
if (
event_filter
and event_filter.filter_json.get("event_format", "client")
== "federation"
):
as_client_event = False
else:
event_filter = None

msgs = await self._pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
as_client_event=as_client_event,
event_filter=event_filter,
use_admin_priviledge=True,
)

return HTTPStatus.OK, msgs


class RoomTimestampToEventRestServlet(RestServlet):
"""
API endpoint to fetch the `event_id` of the closest event to the given
timestamp (`ts` query parameter) in the given direction (`dir` query
parameter).

Useful for cases like jump to date so you can start paginating messages from
a given date in the archive.

`ts` is a timestamp in milliseconds where we will find the closest event in
the given direction.

`dir` can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp.

GET /_synapse/admin/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
{
"event_id": ...
}
"""

PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")

def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
self._timestamp_lookup_handler = hs.get_timestamp_lookup_handler()

async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)

timestamp = parse_integer(request, "ts", required=True)
direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])

(
event_id,
origin_server_ts,
) = await self._timestamp_lookup_handler.get_event_for_timestamp(
requester, room_id, timestamp, direction
)

return HTTPStatus.OK, {
"event_id": event_id,
"origin_server_ts": origin_server_ts,
}
Loading