Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3193 from matrix-org/erikj/pagination_refactor
Browse files Browse the repository at this point in the history
Refactor /context to reuse pagination storage functions
  • Loading branch information
erikjohnston authored May 9, 2018
2 parents d5377eb + 23ec51c commit 1aeb5e2
Showing 1 changed file with 99 additions and 110 deletions.
209 changes: 99 additions & 110 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.engines import PostgresEngine

import abc
import logging
Expand Down Expand Up @@ -595,88 +595,28 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim
retcols=["stream_ordering", "topological_ordering"],
)

token = RoomStreamToken(
results["topological_ordering"],
# Paginating backwards includes the event at the token, but paginating
# forward doesn't.
before_token = RoomStreamToken(
results["topological_ordering"] - 1,
results["stream_ordering"],
)

if isinstance(self.database_engine, Sqlite3Engine):
# SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
# So we give pass it to SQLite3 as the UNION ALL of the two queries.

query_before = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering < ?"
" UNION ALL"
" SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
)
before_args = (
room_id, token.topological,
room_id, token.topological, token.stream,
before_limit,
)

query_after = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering > ?"
" UNION ALL"
" SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
)
after_args = (
room_id, token.topological,
room_id, token.topological, token.stream,
after_limit,
)
else:
query_before = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND %s"
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
) % (upper_bound(token, self.database_engine, inclusive=False),)

before_args = (room_id, before_limit)

query_after = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND %s"
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
) % (lower_bound(token, self.database_engine, inclusive=False),)

after_args = (room_id, after_limit)

txn.execute(query_before, before_args)
after_token = RoomStreamToken(
results["topological_ordering"],
results["stream_ordering"],
)

rows = self.cursor_to_dict(txn)
rows, start_token = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
)
events_before = [r["event_id"] for r in rows]

if rows:
start_token = str(RoomStreamToken(
rows[0]["topological_ordering"],
rows[0]["stream_ordering"] - 1,
))
else:
start_token = str(RoomStreamToken(
token.topological,
token.stream - 1,
))

txn.execute(query_after, after_args)

rows = self.cursor_to_dict(txn)
rows, end_token = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
)
events_after = [r["event_id"] for r in rows]

if rows:
end_token = str(RoomStreamToken(
rows[-1]["topological_ordering"],
rows[-1]["stream_ordering"],
))
else:
end_token = str(token)

return {
"before": {
"event_ids": events_before,
Expand Down Expand Up @@ -738,38 +678,49 @@ def update_federation_out_pos(self, typ, stream_id):
def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id)

def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
direction='b', limit=-1, event_filter=None):
"""Returns list of events before or after a given token.
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()

def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()
Args:
txn
room_id (str)
from_token (RoomStreamToken): The token used to stream from
to_token (RoomStreamToken|None): A token which if given limits the
results to only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
event_filter (Filter|None): If provided filters the events to
those that match the filter.
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "toplogical_ordering" and "stream_ordering".
"""
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == 'b':
order = "DESC"
bounds = upper_bound(
RoomStreamToken.parse(from_key), self.database_engine
from_token, self.database_engine
)
if to_key:
if to_token:
bounds = "%s AND %s" % (bounds, lower_bound(
RoomStreamToken.parse(to_key), self.database_engine
to_token, self.database_engine
))
else:
order = "ASC"
bounds = lower_bound(
RoomStreamToken.parse(from_key), self.database_engine
from_token, self.database_engine
)
if to_key:
if to_token:
bounds = "%s AND %s" % (bounds, upper_bound(
RoomStreamToken.parse(to_key), self.database_engine
to_token, self.database_engine
))

filter_clause, filter_args = filter_to_clause(event_filter)
Expand All @@ -785,7 +736,8 @@ def paginate_room_events(self, room_id, from_key, to_key=None,
limit_str = ""

sql = (
"SELECT * FROM events"
"SELECT event_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
Expand All @@ -795,29 +747,58 @@ def paginate_room_events(self, room_id, from_key, to_key=None,
"limit": limit_str
}

def f(txn):
txn.execute(sql, args)
txn.execute(sql, args)

rows = self.cursor_to_dict(txn)
rows = self.cursor_to_dict(txn)

if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = RoomStreamToken(topo, toke)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token

return rows, next_token,
return rows, str(next_token),

rows, token = yield self.runInteraction("paginate_room_events", f)
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
"""Returns list of events before or after a given token.
Args:
room_id (str)
from_key (str): The token used to stream from
to_key (str|None): A token which if given limits the results to
only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
event_filter (Filter|None): If provided filters the events to
those that match the filter.
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "toplogical_ordering" and "stream_orderign".
"""

from_key = RoomStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)

rows, token = yield self.runInteraction(
"paginate_room_events", self._paginate_room_events_txn,
room_id, from_key, to_key, direction, limit, event_filter,
)

events = yield self._get_events(
[r["event_id"] for r in rows],
Expand All @@ -827,3 +808,11 @@ def f(txn):
self._set_before_and_after(events, rows)

defer.returnValue((events, token))


class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()

def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()

0 comments on commit 1aeb5e2

Please sign in to comment.