Add an API for listing threads in a room. (#13394)
Implement the /threads endpoint from MSC3856.

This is currently unstable and behind an experimental configuration
flag.

It includes a background update to backfill data; results from
the /threads endpoint will be partial until that finishes.
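
As a quick orientation, here is a hedged sketch of the request and response shape the new endpoint serves (the path and query parameters come from the servlet in this diff; the response values are illustrative and the pagination-token format is an internal detail):

```python
# GET /_matrix/client/unstable/org.matrix.msc3856/rooms/{roomId}/threads
#     ?limit=5&include=all|participated&from=<next_batch>
#
# Illustrative response shape (values hypothetical):
example_response = {
    # Serialized thread root events, ordered by the recency of their
    # latest threaded reply, with bundled aggregations attached.
    "chunk": [{"event_id": "$thread_root", "type": "m.room.message"}],
    # Opaque pagination token; only present when more threads remain.
    "next_batch": "4_13",
}
```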
clokep committed Oct 13, 2022
1 parent b6baa46 commit 3bbe532
Showing 10 changed files with 522 additions and 6 deletions.
1 change: 1 addition & 0 deletions changelog.d/13394.feature
@@ -0,0 +1 @@
Experimental support for [MSC3856](https://github.com/matrix-org/matrix-spec-proposals/pull/3856): threads list API.
2 changes: 2 additions & 0 deletions synapse/_scripts/synapse_port_db.py
@@ -72,6 +72,7 @@
    RegistrationBackgroundUpdateStore,
    find_max_generated_user_id_localpart,
)
from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
@@ -206,6 +207,7 @@ class Store(
    PusherWorkerStore,
    PresenceBackgroundUpdateStore,
    ReceiptsBackgroundUpdateStore,
    RelationsWorkerStore,
):
    def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
        return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
@@ -101,6 +101,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
        # MSC3848: Introduce errcodes for specific event sending failures
        self.msc3848_enabled: bool = experimental.get("msc3848_enabled", False)

        # MSC3856: Threads list API
        self.msc3856_enabled: bool = experimental.get("msc3856_enabled", False)

        # MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices.
        self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False)

86 changes: 85 additions & 1 deletion synapse/handlers/relations.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import logging
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Tuple

@@ -20,7 +21,7 @@
from synapse.api.errors import SynapseError
from synapse.events import EventBase, relation_from_event
from synapse.logging.opentracing import trace
-from synapse.storage.databases.main.relations import _RelatedEvent
+from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamToken, UserID
from synapse.visibility import filter_events_for_client
@@ -32,6 +33,13 @@
logger = logging.getLogger(__name__)


class ThreadsListInclude(str, enum.Enum):
    """Valid values for the 'include' flag of /threads."""

    all = "all"
    participated = "participated"
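
Because the enum derives from `str`, the raw query-string value converts straight into a member; a small sketch:

```python
# Converting the `include` query parameter into the enum; an
# unrecognised value raises ValueError (the servlet below also rejects
# bad values up front via `allowed_values`).
assert ThreadsListInclude("participated") is ThreadsListInclude.participated
assert ThreadsListInclude.all.value == "all"
```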


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _ThreadAggregation:
    # The latest event in the thread.
@@ -482,3 +490,79 @@ async def get_bundled_aggregations(
            results.setdefault(event_id, BundledAggregations()).replace = edit

        return results

    async def get_threads(
        self,
        requester: Requester,
        room_id: str,
        include: ThreadsListInclude,
        limit: int = 5,
        from_token: Optional[ThreadsNextBatch] = None,
    ) -> JsonDict:
        """Retrieve threads in a room, ordered by the topological ordering of
        their latest reply.

        Args:
            requester: The user requesting the threads.
            room_id: The room the threads belong to.
            include: One of "all" or "participated" to indicate which threads
                should be returned.
            limit: Only fetch the most recent `limit` threads.
            from_token: Fetch rows from the given token, or from the start if None.

        Returns:
            The pagination chunk.
        """

        user_id = requester.user.to_string()

        # TODO Properly handle a user leaving a room.
        (_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
            room_id, requester, allow_departed_users=True
        )

        # Note that ignored users are not passed into get_threads below.
        # Ignored users are handled in filter_events_for_client (and by
        # not passing them in here we should get a better cache hit rate).
        thread_roots, next_batch = await self._main_store.get_threads(
            room_id=room_id, limit=limit, from_token=from_token
        )

        events = await self._main_store.get_events_as_list(thread_roots)

        if include == ThreadsListInclude.participated:
            # Pre-seed thread participation with whether the requester sent the event.
            participated = {event.event_id: event.sender == user_id for event in events}
            # For events the requester did not send, check the database for whether
            # the requester sent a threaded reply.
            participated.update(
                await self._main_store.get_threads_participated(
                    [eid for eid, p in participated.items() if not p],
                    user_id,
                )
            )

            # Limit the returned threads to those the user has participated in.
            events = [event for event in events if participated[event.event_id]]

        events = await filter_events_for_client(
            self._storage_controllers,
            user_id,
            events,
            is_peeking=(member_event_id is None),
        )

        aggregations = await self.get_bundled_aggregations(
            events, requester.user.to_string()
        )

        now = self._clock.time_msec()
        serialized_events = self._event_serializer.serialize_events(
            events, now, bundle_aggregations=aggregations
        )

        return_value: JsonDict = {"chunk": serialized_events}

        if next_batch:
            return_value["next_batch"] = str(next_batch)

        return return_value
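
The pre-seeding step above skips a database lookup for threads whose root the requester sent; a minimal self-contained sketch of that logic (user and event IDs hypothetical):

```python
user_id = "@alice:example.com"
# (event_id, sender) pairs standing in for the fetched thread roots.
events = [("$root1", "@alice:example.com"), ("$root2", "@bob:example.com")]

# Roots the requester sent count as participated without a DB hit.
participated = {eid: sender == user_id for eid, sender in events}

# Only the remaining roots need the get_threads_participated query.
needs_lookup = [eid for eid, p in participated.items() if not p]
assert needs_lookup == ["$root2"]
```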
50 changes: 49 additions & 1 deletion synapse/rest/client/relations.py
@@ -13,12 +13,15 @@
# limitations under the License.

import logging
import re
from typing import TYPE_CHECKING, Optional, Tuple

from synapse.handlers.relations import ThreadsListInclude
from synapse.http.server import HttpServer
-from synapse.http.servlet import RestServlet
+from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.rest.client._base import client_patterns
from synapse.storage.databases.main.relations import ThreadsNextBatch
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict

@@ -78,5 +81,50 @@ async def on_GET(
        return 200, result


class ThreadsServlet(RestServlet):
    PATTERNS = (
        re.compile(
            "^/_matrix/client/unstable/org.matrix.msc3856/rooms/(?P<room_id>[^/]*)/threads"
        ),
    )

    def __init__(self, hs: "HomeServer"):
        super().__init__()
        self.auth = hs.get_auth()
        self.store = hs.get_datastores().main
        self._relations_handler = hs.get_relations_handler()

    async def on_GET(
        self, request: SynapseRequest, room_id: str
    ) -> Tuple[int, JsonDict]:
        requester = await self.auth.get_user_by_req(request)

        limit = parse_integer(request, "limit", default=5)
        from_token_str = parse_string(request, "from")
        include = parse_string(
            request,
            "include",
            default=ThreadsListInclude.all.value,
            allowed_values=[v.value for v in ThreadsListInclude],
        )

        # Parse the pagination token, if one was provided.
        from_token = None
        if from_token_str:
            from_token = ThreadsNextBatch.from_string(from_token_str)

        result = await self._relations_handler.get_threads(
            requester=requester,
            room_id=room_id,
            include=ThreadsListInclude(include),
            limit=limit,
            from_token=from_token,
        )

        return 200, result


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
    RelationPaginationServlet(hs).register(http_server)
    if hs.config.experimental.msc3856_enabled:
        ThreadsServlet(hs).register(http_server)
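
For manual testing, a hedged sketch of exercising the endpoint with the `requests` library (assumes a homeserver on `localhost:8008` with `msc3856_enabled: true` under `experimental_features` in its config; the token and room ID are placeholders):

```python
import requests

ACCESS_TOKEN = "..."  # placeholder: a valid client access token
ROOM_ID = "!room:example.com"  # placeholder: a joined room

resp = requests.get(
    "http://localhost:8008/_matrix/client/unstable/org.matrix.msc3856"
    f"/rooms/{ROOM_ID}/threads",
    params={"limit": 5, "include": "participated"},
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
)
resp.raise_for_status()
body = resp.json()
for event in body["chunk"]:
    print(event["event_id"])
# Pass body["next_batch"] back as the `from` parameter to page onward.
```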
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
@@ -259,6 +259,7 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
self._attempt_to_invalidate_cache("get_threads", (room_id,))

async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
38 changes: 35 additions & 3 deletions synapse/storage/databases/main/events.py
@@ -35,7 +35,7 @@
from prometheus_client import Counter

import synapse.metrics
-from synapse.api.constants import EventContentFields, EventTypes
+from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
@@ -1616,7 +1616,7 @@ def _update_metadata_tables_txn(
        )

        # Remove from relations table.
-        self._handle_redact_relations(txn, event.redacts)
+        self._handle_redact_relations(txn, event.room_id, event.redacts)

        # Update the event_forward_extremities, event_backward_extremities and
        # event_edges tables.
@@ -1866,6 +1866,34 @@ def _handle_event_relations(
            },
        )

        if relation.rel_type == RelationTypes.THREAD:
            # Upsert into the threads table, but only overwrite the value if the
            # new event is of a later topological order OR if the topological
            # ordering is equal, but the stream ordering is later.
            sql = """
            INSERT INTO threads (room_id, thread_id, latest_event_id, topological_ordering, stream_ordering)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT (room_id, thread_id)
            DO UPDATE SET
                latest_event_id = excluded.latest_event_id,
                topological_ordering = excluded.topological_ordering,
                stream_ordering = excluded.stream_ordering
            WHERE
                threads.topological_ordering <= excluded.topological_ordering AND
                threads.stream_ordering < excluded.stream_ordering
            """

            txn.execute(
                sql,
                (
                    event.room_id,
                    relation.parent_id,
                    event.event_id,
                    event.depth,
                    event.internal_metadata.stream_ordering,
                ),
            )
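
To see the conflict clause's behaviour in isolation, a self-contained sketch against an in-memory SQLite database (simplified `threads` table, hypothetical IDs); the row is only replaced when the incoming event is topologically later, or equal in depth but later in stream ordering:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE threads (
        room_id TEXT, thread_id TEXT, latest_event_id TEXT,
        topological_ordering INTEGER, stream_ordering INTEGER,
        PRIMARY KEY (room_id, thread_id))"""
)
upsert = """
    INSERT INTO threads VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (room_id, thread_id) DO UPDATE SET
        latest_event_id = excluded.latest_event_id,
        topological_ordering = excluded.topological_ordering,
        stream_ordering = excluded.stream_ordering
    WHERE threads.topological_ordering <= excluded.topological_ordering
        AND threads.stream_ordering < excluded.stream_ordering
"""
conn.execute(upsert, ("!r:x", "$t", "$e1", 5, 10))
# Topologically earlier event: the WHERE clause rejects the overwrite.
conn.execute(upsert, ("!r:x", "$t", "$e0", 4, 20))
# Equal depth, later stream ordering: the row is replaced.
conn.execute(upsert, ("!r:x", "$t", "$e2", 5, 30))
assert conn.execute("SELECT latest_event_id FROM threads").fetchone() == ("$e2",)
```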

    def _handle_insertion_event(
        self, txn: LoggingTransaction, event: EventBase
    ) -> None:
@@ -1989,13 +2017,14 @@ def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None
        txn.execute(sql, (batch_id,))

    def _handle_redact_relations(
-        self, txn: LoggingTransaction, redacted_event_id: str
+        self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
    ) -> None:
        """Handles receiving a redaction and checking whether the redacted event
        has any relations which must be removed from the database.

        Args:
            txn
            room_id: The room ID of the event that was redacted.
            redacted_event_id: The event that was redacted.
        """

@@ -2024,6 +2053,9 @@ def _handle_redact_relations(
            self.store._invalidate_cache_and_stream(
                txn, self.store.get_thread_participated, (redacted_relates_to,)
            )
            self.store._invalidate_cache_and_stream(
                txn, self.store.get_threads, (room_id,)
            )

        self.db_pool.simple_delete_txn(
            txn, table="event_relations", keyvalues={"event_id": redacted_event_id}