Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #705 from matrix-org/dbkr/pushers_use_event_actions
Browse files Browse the repository at this point in the history
Change pushers to use the event_actions table
  • Loading branch information
dbkr committed Apr 11, 2016
2 parents 17515ba + 9bb0417 commit 2547dff
Show file tree
Hide file tree
Showing 19 changed files with 680 additions and 628 deletions.
8 changes: 7 additions & 1 deletion synapse/handlers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from synapse.types import UserID, RoomAlias, Requester
from synapse.push.action_generator import ActionGenerator

from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn

import logging

Expand Down Expand Up @@ -406,6 +406,12 @@ def is_inviter_member_event(e):
event, context=context
)

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
event_stream_id, max_stream_id
)

destinations = set()
for k, s in context.current_state.items():
try:
Expand Down
8 changes: 7 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
Expand Down Expand Up @@ -1094,6 +1094,12 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None):
context=context,
)

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
event_stream_id, max_stream_id
)

defer.returnValue((context, event_stream_id, max_stream_id))

@defer.inlineCallbacks
Expand Down
22 changes: 18 additions & 4 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def _received_remote_receipt(self, origin, content):
def _handle_new_receipts(self, receipts):
"""Takes a list of receipts, stores them and informs the notifier.
"""
min_batch_id = None
max_batch_id = None

for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
Expand All @@ -97,10 +100,21 @@ def _handle_new_receipts(self, receipts):

stream_id, max_persisted_id = res

with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_persisted_id, rooms=[room_id]
)
if min_batch_id is None or stream_id < min_batch_id:
min_batch_id = stream_id
if max_batch_id is None or max_persisted_id > max_batch_id:
max_batch_id = max_persisted_id

affected_room_ids = list(set([r["room_id"] for r in receipts]))

with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
)

defer.returnValue(True)

Expand Down
Loading

0 comments on commit 2547dff

Please sign in to comment.