Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #456 from matrix-org/store_event_actions
Browse files Browse the repository at this point in the history
Send unread notification counts
  • Loading branch information
dbkr committed Jan 8, 2016
2 parents 7c816de + daadcf3 commit c232780
Show file tree
Hide file tree
Showing 19 changed files with 541 additions and 36 deletions.
6 changes: 6 additions & 0 deletions synapse/handlers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias
from synapse.push.action_generator import ActionGenerator

from synapse.util.logcontext import PreserveLoggingContext

Expand Down Expand Up @@ -252,6 +253,11 @@ def handle_new_client_event(self, event, context, extra_destinations=[],
event, context=context
)

action_generator = ActionGenerator(self.store)
yield action_generator.handle_push_actions_for_event(
event, self
)

destinations = set(extra_destinations)
for k, s in context.current_state.items():
try:
Expand Down
8 changes: 8 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

from synapse.util.retryutils import NotRetryingDestination

from synapse.push.action_generator import ActionGenerator

from twisted.internet import defer

import itertools
Expand Down Expand Up @@ -242,6 +244,12 @@ def log_failure(f):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)

if not backfilled and not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.store)
yield action_generator.handle_push_actions_for_event(
event, self
)

@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
Expand Down
4 changes: 3 additions & 1 deletion synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def register(
localpart=None,
password=None,
generate_token=True,
guest_access_token=None
guest_access_token=None,
make_guest=False
):
"""Registers a new client on the server.
Expand Down Expand Up @@ -118,6 +119,7 @@ def register(
token=token,
password_hash=password_hash,
was_guest=guest_access_token is not None,
make_guest=make_guest,
)

yield registered_user(self.distributor, user)
Expand Down
63 changes: 63 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"state", # dict[(str, str), FrozenEvent]
"ephemeral",
"account_data",
"unread_notification_count",
])):
__slots__ = []

Expand All @@ -66,6 +67,8 @@ def __nonzero__(self):
or self.state
or self.ephemeral
or self.account_data
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)


Expand Down Expand Up @@ -163,6 +166,18 @@ def current_sync_for_user(self, sync_config, since_token=None,
else:
return self.incremental_sync_with_gap(sync_config, since_token)

def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
if room_id not in ephemeral_by_room:
return None
for e in ephemeral_by_room[room_id]:
if e['type'] != 'm.receipt':
continue
for receipt_event_id, val in e['content'].items():
if 'm.read' in val:
if user_id in val['m.read']:
return receipt_event_id
return None

@defer.inlineCallbacks
def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state.
Expand Down Expand Up @@ -274,6 +289,13 @@ def full_state_sync_for_joined_room(self, room_id, sync_config,
room_id, sync_config, now_token, since_token=timeline_since_token
)

notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)

current_state = yield self.get_state_at(room_id, now_token)

defer.returnValue(JoinedSyncResult(
Expand All @@ -284,6 +306,7 @@ def full_state_sync_for_joined_room(self, room_id, sync_config,
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count,
))

def account_data_for_user(self, account_data):
Expand Down Expand Up @@ -423,6 +446,13 @@ def incremental_sync_with_gap(self, sync_config, since_token):
)
now_token = now_token.copy_and_replace("presence_key", presence_key)

# We now fetch all ephemeral events for this room in order to get
# this users current read receipt. This could almost certainly be
# optimised.
_, all_ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token
)

now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token, since_token
)
Expand Down Expand Up @@ -496,6 +526,13 @@ def incremental_sync_with_gap(self, sync_config, since_token):
else:
prev_batch = now_token

notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, all_ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)

just_joined = yield self.check_joined_room(sync_config, state)
if just_joined:
logger.debug("User has just joined %s: needs full state",
Expand All @@ -516,6 +553,7 @@ def incremental_sync_with_gap(self, sync_config, since_token):
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count
)
logger.debug("Result for room %s: %r", room_id, room_sync)

Expand Down Expand Up @@ -650,6 +688,13 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config,
if just_joined:
state = yield self.get_state_at(room_id, now_token)

notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)

room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
Expand All @@ -658,6 +703,7 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config,
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count,
)

logger.debug("Room sync: %r", room_sync)
Expand Down Expand Up @@ -788,3 +834,20 @@ def check_joined_room(self, sync_config, state_delta):
if join_event.content["membership"] == Membership.JOIN:
return True
return False

@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
last_unread_event_id = self.last_read_event_id_for_room_and_user(
room_id, sync_config.user.to_string(), ephemeral_by_room
)

notifs = []
if last_unread_event_id:
notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
else:
# There is no new information in this period, so your notification
# count is whatever it was last time.
defer.returnValue(None)
defer.returnValue(notifs)
21 changes: 6 additions & 15 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
logger = logging.getLogger(__name__)


# Pushers could now be moved to pull out of the event_push_actions table instead
# of listening on the event stream: this would avoid them having to run the
# rules again.
class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 60 * 60 * 1000
Expand Down Expand Up @@ -157,21 +160,7 @@ def get_and_dispatch(self):
actions = yield rule_evaluator.actions_for_event(single_event)
tweaks = rule_evaluator.tweaks_for_actions(actions)

if len(actions) == 0:
logger.warn("Empty actions! Using default action.")
actions = Pusher.DEFAULT_ACTIONS

if 'notify' not in actions and 'dont_notify' not in actions:
logger.warn("Neither notify nor dont_notify in actions: adding default")
actions.extend(Pusher.DEFAULT_ACTIONS)

if 'dont_notify' in actions:
logger.debug(
"%s for %s: dont_notify",
single_event['event_id'], self.user_name
)
processed = True
else:
if 'notify' in actions:
rejected = yield self.dispatch_push(single_event, tweaks)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
Expand All @@ -192,6 +181,8 @@ def get_and_dispatch(self):
yield self.hs.get_pusherpool().remove_pusher(
self.app_id, pk, self.user_name
)
else:
processed = True

if not self.alive:
return
Expand Down
55 changes: 55 additions & 0 deletions synapse/push/action_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

import bulk_push_rule_evaluator

import logging

from synapse.api.constants import EventTypes

logger = logging.getLogger(__name__)


class ActionGenerator:
def __init__(self, store):
self.store = store
# really we want to get all user ids and all profile tags too,
# since we want the actions for each profile tag for every user and
# also actions for a client with no profile tag for each user.
# Currently the event stream doesn't support profile tags on an
# event stream, so we just run the rules for a client with no profile
# tag (ie. we just need all the users).

@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, handler):
if event.type == EventTypes.Redaction and event.redacts is not None:
yield self.store.remove_push_actions_for_event_id(
event.room_id, event.redacts
)

bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
event.room_id, self.store
)

actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler)

yield self.store.set_push_actions_for_event_and_users(
event,
[
(uid, None, actions) for uid, actions in actions_by_user.items()
]
)
Loading

0 comments on commit c232780

Please sign in to comment.