
Commit 4c6a31c: Calculate the forward extremeties once

erikjohnston committed Jan 20, 2017 (1 parent: 09eb08f)
Showing 2 changed files with 92 additions and 126 deletions.
synapse/storage/event_federation.py (6 additions, 70 deletions)

@@ -235,80 +235,21 @@ def _handle_mult_prev_events(self, txn, events):
            ],
        )

-        self._update_extremeties(txn, events)
+        self._update_backward_extremeties(txn, events)

-    def _update_extremeties(self, txn, events):
-        """Updates the event_*_extremities tables based on the new/updated
+    def _update_backward_extremeties(self, txn, events):
+        """Updates the event_backward_extremities tables based on the new/updated
         events being persisted.
         This is called for new events *and* for events that were outliers, but
-        are are now being persisted as non-outliers.
+        are now being persisted as non-outliers.
+        Forward extremities are handled when we first start persisting the events.
         """
         events_by_room = {}
         for ev in events:
             events_by_room.setdefault(ev.room_id, []).append(ev)

-        for room_id, room_events in events_by_room.items():
-            prevs = [
-                e_id for ev in room_events for e_id, _ in ev.prev_events
-                if not ev.internal_metadata.is_outlier()
-            ]
-            if prevs:
-                txn.execute(
-                    "DELETE FROM event_forward_extremities"
-                    " WHERE room_id = ?"
-                    " AND event_id in (%s)" % (
-                        ",".join(["?"] * len(prevs)),
-                    ),
-                    [room_id] + prevs,
-                )
-
-        query = (
-            "INSERT INTO event_forward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
-            " )"
-        )
-
-        txn.executemany(
-            query,
-            [
-                (ev.event_id, ev.room_id, ev.event_id) for ev in events
-                if not ev.internal_metadata.is_outlier()
-            ]
-        )
-
-        # We now insert into stream_ordering_to_exterm a mapping from room_id,
-        # new stream_ordering to new forward extremeties in the room.
-        # This allows us to later efficiently look up the forward extremeties
-        # for a room before a given stream_ordering
-        max_stream_ord = max(
-            ev.internal_metadata.stream_ordering for ev in events
-        )
-        new_extrem = {}
-        for room_id in events_by_room:
-            event_ids = self._simple_select_onecol_txn(
-                txn,
-                table="event_forward_extremities",
-                keyvalues={"room_id": room_id},
-                retcol="event_id",
-            )
-            new_extrem[room_id] = event_ids
-
-        self._simple_insert_many_txn(
-            txn,
-            table="stream_ordering_to_exterm",
-            values=[
-                {
-                    "room_id": room_id,
-                    "event_id": event_id,
-                    "stream_ordering": max_stream_ord,
-                }
-                for room_id, extrem_evs in new_extrem.items()
-                for event_id in extrem_evs
-            ]
-        )

        query = (
            "INSERT INTO event_backward_extremities (event_id, room_id)"
            " SELECT ?, ? WHERE NOT EXISTS ("
@@ -339,11 +280,6 @@ def _update_extremeties(self, txn, events):
            ]
        )

-        for room_id in events_by_room:
-            txn.call_after(
-                self.get_latest_event_ids_in_room.invalidate, (room_id,)
-            )

    def get_forward_extremeties_for_room(self, room_id, stream_ordering):
        # We want to make the cache more effective, so we clamp to the last
        # change before the given ordering.
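
The stream_ordering_to_exterm mapping that get_forward_extremeties_for_room
reads is the one this commit moves out of event_federation.py and into
events.py (see the insert in _persist_events_txn below). As a hedged sketch
of the lookup it enables, the clamped query might look like the following;
the actual SQL is not shown in this diff, so treat the statement as an
assumption:

    # Hypothetical lookup: the forward extremities recorded at the latest
    # snapshot at or before the given stream ordering for this room.
    sql = (
        "SELECT event_id FROM stream_ordering_to_exterm"
        " WHERE room_id = ? AND stream_ordering = ("
        "     SELECT MAX(stream_ordering) FROM stream_ordering_to_exterm"
        "     WHERE room_id = ? AND stream_ordering <= ?"
        " )"
    )
    txn.execute(sql, (room_id, room_id, stream_ordering))
    event_ids = [row[0] for row in txn]
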
synapse/storage/events.py (86 additions, 56 deletions)

@@ -279,6 +279,7 @@ def _persist_events(self, events_and_contexts, backfilled=False,
            # We can't easily parallelize these since different chunks
            # might contain the same event. :(

+            new_forward_extremeties = {}
            current_state_for_room = {}
            if not backfilled:
                # Work out the new "current state" for each room.
@@ -296,20 +297,16 @@ def _persist_events(self, events_and_contexts, backfilled=False,
                    latest_event_ids = yield self.get_latest_event_ids_in_room(
                        room_id
                    )
-                    new_latest_event_ids = set(latest_event_ids)
-                    for event, ctx in ev_ctx_rm:
-                        if event.internal_metadata.is_outlier():
-                            continue
-
-                        new_latest_event_ids.difference_update(
-                            e_id for e_id, _ in event.prev_events
-                        )
-                        new_latest_event_ids.add(event.event_id)
+                    new_latest_event_ids = yield self._calculate_new_extremeties(
+                        room_id, [ev for ev, _ in ev_ctx_rm]
+                    )

                    if new_latest_event_ids == set(latest_event_ids):
                        # No change in extremities, so no change in state
                        continue

+                    new_forward_extremeties[room_id] = new_latest_event_ids

                    # Now we need to work out the different state sets for
                    # each state extremities
                    state_sets = []
@@ -358,9 +355,45 @@ def _persist_events(self, events_and_contexts, backfilled=False,
                backfilled=backfilled,
                delete_existing=delete_existing,
                current_state_for_room=current_state_for_room,
+                new_forward_extremeties=new_forward_extremeties,
            )
            persist_event_counter.inc_by(len(chunk))

+    @defer.inlineCallbacks
+    def _calculate_new_extremeties(self, room_id, events):
+        latest_event_ids = yield self.get_latest_event_ids_in_room(
+            room_id
+        )
+        new_latest_event_ids = set(latest_event_ids)
+        new_latest_event_ids.update(
+            event.event_id for event in events
+            if not event.internal_metadata.is_outlier()
+        )
+        new_latest_event_ids.difference_update(
+            e_id
+            for event in events
+            for e_id, _ in event.prev_events
+            if not event.internal_metadata.is_outlier()
+        )
+
+        rows = yield self._simple_select_many_batch(
+            table="event_edges",
+            column="prev_event_id",
+            iterable=list(new_latest_event_ids),
+            retcols=["prev_event_id"],
+            keyvalues={
+                "room_id": room_id,
+                "is_state": False,
+            },
+            desc="_calculate_new_extremeties",
+        )
+
+        new_latest_event_ids.difference_update(
+            row["prev_event_id"] for row in rows
+        )
+
+        defer.returnValue(new_latest_event_ids)
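
The helper added above is set arithmetic plus one event_edges query: start
from the room's current extremities, add the new non-outlier events, drop
everything their prev_events reference, then drop any survivor that an
already-persisted event points at. A minimal sketch of just the set logic,
assuming hypothetical event ids $A and $B and no database:

    # Current forward extremities of the room (hypothetical ids).
    latest_event_ids = {"$A"}

    # One new non-outlier event $B whose prev_events reference $A.
    new_latest = set(latest_event_ids)
    new_latest.add("$B")                   # every new event is a candidate
    new_latest.difference_update({"$A"})   # $A is now referenced, so drop it

    assert new_latest == {"$B"}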

    @defer.inlineCallbacks
    def get_event(self, event_id, check_redacted=True,
                  get_prev_content=False, allow_rejected=False,
@@ -417,53 +450,10 @@ def get_events(self, event_ids, check_redacted=True,

        defer.returnValue({e.event_id: e for e in events})

-    @log_function
-    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
-                           delete_existing=False):
-        # We purposefully do this first since if we include a `current_state`
-        # key, we *want* to update the `current_state_events` table
-        if current_state:
-            txn.call_after(self._get_current_state_for_key.invalidate_all)
-            txn.call_after(self.get_rooms_for_user.invalidate_all)
-            txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
-
-            # Add an entry to the current_state_resets table to record the point
-            # where we clobbered the current state
-            stream_order = event.internal_metadata.stream_ordering
-            self._simple_insert_txn(
-                txn,
-                table="current_state_resets",
-                values={"event_stream_ordering": stream_order}
-            )
-
-            self._simple_delete_txn(
-                txn,
-                table="current_state_events",
-                keyvalues={"room_id": event.room_id},
-            )
-
-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    }
-                )
-
-        return self._persist_events_txn(
-            txn,
-            [(event, context)],
-            backfilled=backfilled,
-            delete_existing=delete_existing,
-        )

    @log_function
    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
-                            delete_existing=False, current_state_for_room={}):
+                            delete_existing=False, current_state_for_room={},
+                            new_forward_extremeties={}):
"""Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table,
@@ -473,18 +463,18 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
        If delete_existing is True then existing events will be purged from the
        database before insertion. This is useful when retrying due to IntegrityError.
        """
+        max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
        for room_id, current_state in current_state_for_room.iteritems():
            txn.call_after(self._get_current_state_for_key.invalidate_all)
            txn.call_after(self.get_rooms_for_user.invalidate_all)
            txn.call_after(self.get_users_in_room.invalidate, (room_id,))

            # Add an entry to the current_state_resets table to record the point
            # where we clobbered the current state
-            stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
            self._simple_insert_txn(
                txn,
                table="current_state_resets",
-                values={"event_stream_ordering": stream_order}
+                values={"event_stream_ordering": max_stream_order}
            )

            self._simple_delete_txn(
@@ -507,6 +497,46 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
                ],
            )

+        for room_id, new_extrem in new_forward_extremeties.items():
+            self._simple_delete_txn(
+                txn,
+                table="event_forward_extremities",
+                keyvalues={"room_id": room_id},
+            )
+            txn.call_after(
+                self.get_latest_event_ids_in_room.invalidate, (room_id,)
+            )
+
+        self._simple_insert_many_txn(
+            txn,
+            table="event_forward_extremities",
+            values=[
+                {
+                    "event_id": ev_id,
+                    "room_id": room_id,
+                }
+                for room_id, new_extrem in new_forward_extremeties.items()
+                for ev_id in new_extrem
+            ],
+        )
+        # We now insert into stream_ordering_to_exterm a mapping from room_id,
+        # new stream_ordering to new forward extremeties in the room.
+        # This allows us to later efficiently look up the forward extremeties
+        # for a room before a given stream_ordering
+        self._simple_insert_many_txn(
+            txn,
+            table="stream_ordering_to_exterm",
+            values=[
+                {
+                    "room_id": room_id,
+                    "event_id": event_id,
+                    "stream_ordering": max_stream_order,
+                }
+                for room_id, new_extrem in new_forward_extremeties.items()
+                for event_id in new_extrem
+            ]
+        )

        # Ensure that we don't have the same event twice.
        # Pick the earliest non-outlier if there is one, else the earliest one.
        new_events_and_contexts = OrderedDict()
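
With _persist_event_txn removed, single-event callers go through
_persist_events_txn with a one-element list. A hedged sketch of what such a
call site might look like; the call sites themselves are outside this diff,
so the exact arguments are an assumption:

    # Hypothetical single-event persist after this commit: wrap the event
    # in a one-element list and reuse the batch code path.
    yield self.runInteraction(
        "persist_event",
        self._persist_events_txn,
        [(event, context)],
        backfilled=False,
        delete_existing=False,
    )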