-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor /context to reuse pagination storage functions #3193
Changes from 4 commits
06c0d0e
274b8c6
3e6d306
696f532
23ec51c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,7 +42,7 @@ | |
from synapse.types import RoomStreamToken | ||
from synapse.util.caches.stream_change_cache import StreamChangeCache | ||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background | ||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine | ||
from synapse.storage.engines import PostgresEngine | ||
|
||
import abc | ||
import logging | ||
|
@@ -595,88 +595,28 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim | |
retcols=["stream_ordering", "topological_ordering"], | ||
) | ||
|
||
token = RoomStreamToken( | ||
results["topological_ordering"], | ||
# Paginating backwards includes the event at the token, but paginating | ||
# forward doesn't. | ||
before_token = RoomStreamToken( | ||
results["topological_ordering"] - 1, | ||
results["stream_ordering"], | ||
) | ||
|
||
if isinstance(self.database_engine, Sqlite3Engine): | ||
# SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)`` | ||
# So we give pass it to SQLite3 as the UNION ALL of the two queries. | ||
|
||
query_before = ( | ||
"SELECT topological_ordering, stream_ordering, event_id FROM events" | ||
" WHERE room_id = ? AND topological_ordering < ?" | ||
" UNION ALL" | ||
" SELECT topological_ordering, stream_ordering, event_id FROM events" | ||
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" | ||
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" | ||
) | ||
before_args = ( | ||
room_id, token.topological, | ||
room_id, token.topological, token.stream, | ||
before_limit, | ||
) | ||
|
||
query_after = ( | ||
"SELECT topological_ordering, stream_ordering, event_id FROM events" | ||
" WHERE room_id = ? AND topological_ordering > ?" | ||
" UNION ALL" | ||
" SELECT topological_ordering, stream_ordering, event_id FROM events" | ||
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" | ||
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" | ||
) | ||
after_args = ( | ||
room_id, token.topological, | ||
room_id, token.topological, token.stream, | ||
after_limit, | ||
) | ||
else: | ||
query_before = ( | ||
"SELECT topological_ordering, stream_ordering, event_id FROM events" | ||
" WHERE room_id = ? AND %s" | ||
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" | ||
) % (upper_bound(token, self.database_engine, inclusive=False),) | ||
|
||
before_args = (room_id, before_limit) | ||
|
||
query_after = ( | ||
"SELECT topological_ordering, stream_ordering, event_id FROM events" | ||
" WHERE room_id = ? AND %s" | ||
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" | ||
) % (lower_bound(token, self.database_engine, inclusive=False),) | ||
|
||
after_args = (room_id, after_limit) | ||
|
||
txn.execute(query_before, before_args) | ||
after_token = RoomStreamToken( | ||
results["topological_ordering"], | ||
results["stream_ordering"], | ||
) | ||
|
||
rows = self.cursor_to_dict(txn) | ||
rows, start_token = self.paginate_room_events_txn( | ||
txn, room_id, before_token, direction='b', limit=before_limit, | ||
) | ||
events_before = [r["event_id"] for r in rows] | ||
|
||
if rows: | ||
start_token = str(RoomStreamToken( | ||
rows[0]["topological_ordering"], | ||
rows[0]["stream_ordering"] - 1, | ||
)) | ||
else: | ||
start_token = str(RoomStreamToken( | ||
token.topological, | ||
token.stream - 1, | ||
)) | ||
|
||
txn.execute(query_after, after_args) | ||
|
||
rows = self.cursor_to_dict(txn) | ||
rows, end_token = self.paginate_room_events_txn( | ||
txn, room_id, after_token, direction='f', limit=after_limit, | ||
) | ||
events_after = [r["event_id"] for r in rows] | ||
|
||
if rows: | ||
end_token = str(RoomStreamToken( | ||
rows[-1]["topological_ordering"], | ||
rows[-1]["stream_ordering"], | ||
)) | ||
else: | ||
end_token = str(token) | ||
|
||
return { | ||
"before": { | ||
"event_ids": events_before, | ||
|
@@ -738,38 +678,49 @@ def update_federation_out_pos(self, typ, stream_id): | |
def has_room_changed_since(self, room_id, stream_id): | ||
return self._events_stream_cache.has_entity_changed(room_id, stream_id) | ||
|
||
def paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any change in functionality here, or is it just:
I think it's the latter but I'm not sure if I'm missing something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, there shouldn't be any functional change here. (I really wish GH made it clearer that stuff is mainly white space change, e.g. 06c0d0e?w=1 makes this a lot clearer imo) |
||
direction='b', limit=-1, event_filter=None): | ||
"""Returns list of events before or after a given token. | ||
|
||
class StreamStore(StreamWorkerStore): | ||
def get_room_max_stream_ordering(self): | ||
return self._stream_id_gen.get_current_token() | ||
|
||
def get_room_min_stream_ordering(self): | ||
return self._backfill_id_gen.get_current_token() | ||
Args: | ||
txn | ||
room_id (str) | ||
from_token (RoomStreamToken): The token used to stream from | ||
to_token (RoomStreamToken|None): A token which if given limits the | ||
results to only those before | ||
direction(char): Either 'b' or 'f' to indicate whether we are | ||
paginating forwards or backwards from `from_key`. | ||
limit (int): The maximum number of events to return. Zero or less | ||
means no limit. | ||
event_filter (Filter|None): If provided filters the events to | ||
those that match the filter. | ||
|
||
@defer.inlineCallbacks | ||
def paginate_room_events(self, room_id, from_key, to_key=None, | ||
direction='b', limit=-1, event_filter=None): | ||
Returns: | ||
tuple[list[dict], str]: Returns the results as a list of dicts and | ||
a token that points to the end of the result set. The dicts haveq | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. haveq |
||
the keys "event_id", "toplogical_ordering" and "stream_orderign". | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. stream_orderign |
||
""" | ||
# Tokens really represent positions between elements, but we use | ||
# the convention of pointing to the event before the gap. Hence | ||
# we have a bit of asymmetry when it comes to equalities. | ||
args = [False, room_id] | ||
if direction == 'b': | ||
order = "DESC" | ||
bounds = upper_bound( | ||
RoomStreamToken.parse(from_key), self.database_engine | ||
from_token, self.database_engine | ||
) | ||
if to_key: | ||
if to_token: | ||
bounds = "%s AND %s" % (bounds, lower_bound( | ||
RoomStreamToken.parse(to_key), self.database_engine | ||
to_token, self.database_engine | ||
)) | ||
else: | ||
order = "ASC" | ||
bounds = lower_bound( | ||
RoomStreamToken.parse(from_key), self.database_engine | ||
from_token, self.database_engine | ||
) | ||
if to_key: | ||
if to_token: | ||
bounds = "%s AND %s" % (bounds, upper_bound( | ||
RoomStreamToken.parse(to_key), self.database_engine | ||
to_token, self.database_engine | ||
)) | ||
|
||
filter_clause, filter_args = filter_to_clause(event_filter) | ||
|
@@ -785,7 +736,8 @@ def paginate_room_events(self, room_id, from_key, to_key=None, | |
limit_str = "" | ||
|
||
sql = ( | ||
"SELECT * FROM events" | ||
"SELECT event_id, topological_ordering, stream_ordering" | ||
" FROM events" | ||
" WHERE outlier = ? AND room_id = ? AND %(bounds)s" | ||
" ORDER BY topological_ordering %(order)s," | ||
" stream_ordering %(order)s %(limit)s" | ||
|
@@ -795,29 +747,58 @@ def paginate_room_events(self, room_id, from_key, to_key=None, | |
"limit": limit_str | ||
} | ||
|
||
def f(txn): | ||
txn.execute(sql, args) | ||
txn.execute(sql, args) | ||
|
||
rows = self.cursor_to_dict(txn) | ||
rows = self.cursor_to_dict(txn) | ||
|
||
if rows: | ||
topo = rows[-1]["topological_ordering"] | ||
toke = rows[-1]["stream_ordering"] | ||
if direction == 'b': | ||
# Tokens are positions between events. | ||
# This token points *after* the last event in the chunk. | ||
# We need it to point to the event before it in the chunk | ||
# when we are going backwards so we subtract one from the | ||
# stream part. | ||
toke -= 1 | ||
next_token = str(RoomStreamToken(topo, toke)) | ||
else: | ||
# TODO (erikj): We should work out what to do here instead. | ||
next_token = to_key if to_key else from_key | ||
if rows: | ||
topo = rows[-1]["topological_ordering"] | ||
toke = rows[-1]["stream_ordering"] | ||
if direction == 'b': | ||
# Tokens are positions between events. | ||
# This token points *after* the last event in the chunk. | ||
# We need it to point to the event before it in the chunk | ||
# when we are going backwards so we subtract one from the | ||
# stream part. | ||
toke -= 1 | ||
next_token = RoomStreamToken(topo, toke) | ||
else: | ||
# TODO (erikj): We should work out what to do here instead. | ||
next_token = to_token if to_token else from_token | ||
|
||
return rows, next_token, | ||
return rows, str(next_token), | ||
|
||
rows, token = yield self.runInteraction("paginate_room_events", f) | ||
@defer.inlineCallbacks | ||
def paginate_room_events(self, room_id, from_key, to_key=None, | ||
direction='b', limit=-1, event_filter=None): | ||
"""Returns list of events before or after a given token. | ||
|
||
Args: | ||
room_id (str) | ||
from_key (str): The token used to stream from | ||
to_key (str|None): A token which if given limits the results to | ||
only those before | ||
direction(char): Either 'b' or 'f' to indicate whether we are | ||
paginating forwards or backwards from `from_key`. | ||
limit (int): The maximum number of events to return. Zero or less | ||
means no limit. | ||
event_filter (Filter|None): If provided filters the events to | ||
those that match the filter. | ||
|
||
Returns: | ||
tuple[list[dict], str]: Returns the results as a list of dicts and | ||
a token that points to the end of the result set. The dicts have | ||
the keys "event_id", "toplogical_ordering" and "stream_orderign". | ||
""" | ||
|
||
from_key = RoomStreamToken.parse(from_key) | ||
if to_key: | ||
to_key = RoomStreamToken.parse(to_key) | ||
|
||
rows, token = yield self.runInteraction( | ||
"paginate_room_events", self.paginate_room_events_txn, | ||
room_id, from_key, to_key, direction, limit, event_filter, | ||
) | ||
|
||
events = yield self._get_events( | ||
[r["event_id"] for r in rows], | ||
|
@@ -827,3 +808,11 @@ def f(txn): | |
self._set_before_and_after(events, rows) | ||
|
||
defer.returnValue((events, token)) | ||
|
||
|
||
class StreamStore(StreamWorkerStore): | ||
def get_room_max_stream_ordering(self): | ||
return self._stream_id_gen.get_current_token() | ||
|
||
def get_room_min_stream_ordering(self): | ||
return self._backfill_id_gen.get_current_token() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_paginate_events_txn
?