Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Send unread notification counts #456

Merged
merged 38 commits into from
Jan 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
21f135b
Very first cut of calculating actions for events as they come in. Doe…
dbkr Dec 10, 2015
a84a693
Having consulted The Erikle, this should go at the end of on_receive_…
dbkr Dec 10, 2015
aa667ee
Save event actions to the db
dbkr Dec 10, 2015
5e909c7
Store nothing instead of ['dont_notify'] for events with no notificat…
dbkr Dec 10, 2015
42ad49f
still very WIP, but now sends unread_notifications_count in the room …
dbkr Dec 16, 2015
413d0d6
Make unread notification count sending work: put the correct count in…
dbkr Dec 18, 2015
b131fb1
add list of things I want to fix with this branch
dbkr Dec 18, 2015
091c545
pep8
dbkr Dec 21, 2015
f73f154
Only run pushers for users on this hs!
dbkr Dec 21, 2015
c061b47
Merge remote-tracking branch 'origin/develop' into store_event_actions
dbkr Dec 21, 2015
65c451c
Add bulk push rule evaluator which actually still evaluates rules one…
dbkr Dec 22, 2015
77f0685
clarify problems
dbkr Dec 22, 2015
4c8f6a7
Insert push actions in a single db query rather than one per user/pro…
dbkr Dec 22, 2015
3fbb031
Remove the list of problems (moved to jira issues)
dbkr Dec 22, 2015
5645d97
Add some comments to areas that could be optimised.
dbkr Dec 22, 2015
140a50f
Merge remote-tracking branch 'origin/develop' into store_event_actions
dbkr Dec 22, 2015
9b4cd0c
pep8 & unused variable
dbkr Dec 22, 2015
d79e90f
Add mocks to make tests work again
dbkr Dec 22, 2015
d2a92c6
Fix merge fail with anon access stuff
dbkr Dec 22, 2015
3051c9d
Address minor PR issues
dbkr Jan 4, 2016
928c575
Merge remote-tracking branch 'origin/develop' into store_event_actions
dbkr Jan 4, 2016
c914d67
Rename event-actions to event_push_actions as per PR request
dbkr Jan 4, 2016
92a1e74
fix tests
dbkr Jan 4, 2016
f1b6773
Add unread_notif_count in incremental_sync_with_gap
dbkr Jan 4, 2016
d74c6ac
comma
dbkr Jan 4, 2016
c77e7e6
Only joined rooms have unread_notif_count
dbkr Jan 4, 2016
4eb7b95
= not == in sql
dbkr Jan 4, 2016
85ca8cb
comment typo
dbkr Jan 5, 2016
eb03625
Merge remote-tracking branch 'origin/develop' into store_event_actions
dbkr Jan 5, 2016
c79f221
Add is_guest flag to users db to track whether a user is a guest user…
dbkr Jan 6, 2016
ae1262a
Add schema change file for is_guest flag
dbkr Jan 6, 2016
9929283
Delete notifications for redacted events
dbkr Jan 6, 2016
0e48f7f
fix tests
dbkr Jan 6, 2016
b6a5853
Adding is_guest here won't work because it just constructs a dict of …
dbkr Jan 6, 2016
442fcc0
Merge remote-tracking branch 'origin/develop' into store_event_actions
dbkr Jan 6, 2016
09dc985
comma style
dbkr Jan 6, 2016
823b679
more commas
dbkr Jan 7, 2016
daadcf3
This comma is actually important
dbkr Jan 7, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions synapse/handlers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias
from synapse.push.action_generator import ActionGenerator

from synapse.util.logcontext import PreserveLoggingContext

Expand Down Expand Up @@ -252,6 +253,11 @@ def handle_new_client_event(self, event, context, extra_destinations=[],
event, context=context
)

action_generator = ActionGenerator(self.store)
yield action_generator.handle_push_actions_for_event(
event, self
)

destinations = set(extra_destinations)
for k, s in context.current_state.items():
try:
Expand Down
8 changes: 8 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

from synapse.util.retryutils import NotRetryingDestination

from synapse.push.action_generator import ActionGenerator

from twisted.internet import defer

import itertools
Expand Down Expand Up @@ -242,6 +244,12 @@ def log_failure(f):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)

if not backfilled and not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.store)
yield action_generator.handle_push_actions_for_event(
event, self
)

@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
Expand Down
4 changes: 3 additions & 1 deletion synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def register(
localpart=None,
password=None,
generate_token=True,
guest_access_token=None
guest_access_token=None,
make_guest=False
):
"""Registers a new client on the server.

Expand Down Expand Up @@ -118,6 +119,7 @@ def register(
token=token,
password_hash=password_hash,
was_guest=guest_access_token is not None,
make_guest=make_guest,
)

yield registered_user(self.distributor, user)
Expand Down
63 changes: 63 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"state", # dict[(str, str), FrozenEvent]
"ephemeral",
"account_data",
"unread_notification_count",
])):
__slots__ = []

Expand All @@ -66,6 +67,8 @@ def __nonzero__(self):
or self.state
or self.ephemeral
or self.account_data
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)


Expand Down Expand Up @@ -163,6 +166,18 @@ def current_sync_for_user(self, sync_config, since_token=None,
else:
return self.incremental_sync_with_gap(sync_config, since_token)

def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
if room_id not in ephemeral_by_room:
return None
for e in ephemeral_by_room[room_id]:
if e['type'] != 'm.receipt':
continue
for receipt_event_id, val in e['content'].items():
if 'm.read' in val:
if user_id in val['m.read']:
return receipt_event_id
return None

@defer.inlineCallbacks
def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state.
Expand Down Expand Up @@ -274,6 +289,13 @@ def full_state_sync_for_joined_room(self, room_id, sync_config,
room_id, sync_config, now_token, since_token=timeline_since_token
)

notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)

current_state = yield self.get_state_at(room_id, now_token)

defer.returnValue(JoinedSyncResult(
Expand All @@ -284,6 +306,7 @@ def full_state_sync_for_joined_room(self, room_id, sync_config,
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count,
))

def account_data_for_user(self, account_data):
Expand Down Expand Up @@ -423,6 +446,13 @@ def incremental_sync_with_gap(self, sync_config, since_token):
)
now_token = now_token.copy_and_replace("presence_key", presence_key)

# We now fetch all ephemeral events for this room in order to get
# this users current read receipt. This could almost certainly be
# optimised.
_, all_ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token
)

now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token, since_token
)
Expand Down Expand Up @@ -496,6 +526,13 @@ def incremental_sync_with_gap(self, sync_config, since_token):
else:
prev_batch = now_token

notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, all_ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)

just_joined = yield self.check_joined_room(sync_config, state)
if just_joined:
logger.debug("User has just joined %s: needs full state",
Expand All @@ -516,6 +553,7 @@ def incremental_sync_with_gap(self, sync_config, since_token):
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count
)
logger.debug("Result for room %s: %r", room_id, room_sync)

Expand Down Expand Up @@ -650,6 +688,13 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config,
if just_joined:
state = yield self.get_state_at(room_id, now_token)

notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)

room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
Expand All @@ -658,6 +703,7 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config,
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count,
)

logging.debug("Room sync: %r", room_sync)
Expand Down Expand Up @@ -788,3 +834,20 @@ def check_joined_room(self, sync_config, state_delta):
if join_event.content["membership"] == Membership.JOIN:
return True
return False

@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
last_unread_event_id = self.last_read_event_id_for_room_and_user(
room_id, sync_config.user.to_string(), ephemeral_by_room
)

notifs = []
if last_unread_event_id:
notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
else:
# There is no new information in this period, so your notification
# count is whatever it was last time.
defer.returnValue(None)
defer.returnValue(notifs)
21 changes: 6 additions & 15 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
logger = logging.getLogger(__name__)


# Pushers could now be moved to pull out of the event_push_actions table instead
# of listening on the event stream: this would avoid them having to run the
# rules again.
class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 60 * 60 * 1000
Expand Down Expand Up @@ -157,21 +160,7 @@ def get_and_dispatch(self):
actions = yield rule_evaluator.actions_for_event(single_event)
tweaks = rule_evaluator.tweaks_for_actions(actions)

if len(actions) == 0:
logger.warn("Empty actions! Using default action.")
actions = Pusher.DEFAULT_ACTIONS

if 'notify' not in actions and 'dont_notify' not in actions:
logger.warn("Neither notify nor dont_notify in actions: adding default")
actions.extend(Pusher.DEFAULT_ACTIONS)

if 'dont_notify' in actions:
logger.debug(
"%s for %s: dont_notify",
single_event['event_id'], self.user_name
)
processed = True
else:
if 'notify' in actions:
rejected = yield self.dispatch_push(single_event, tweaks)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
Expand All @@ -192,6 +181,8 @@ def get_and_dispatch(self):
yield self.hs.get_pusherpool().remove_pusher(
self.app_id, pk, self.user_name
)
else:
processed = True

if not self.alive:
return
Expand Down
55 changes: 55 additions & 0 deletions synapse/push/action_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

import bulk_push_rule_evaluator

import logging

from synapse.api.constants import EventTypes

logger = logging.getLogger(__name__)


class ActionGenerator:
def __init__(self, store):
self.store = store
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass the hs and call hs.get_datastore() to get the store.
If you are just using the store then just pass that.

Try to avoid storing the hs on the object if possible to make it clearer what the object uses

# really we want to get all user ids and all profile tags too,
# since we want the actions for each profile tag for every user and
# also actions for a client with no profile tag for each user.
# Currently the event stream doesn't support profile tags on an
# event stream, so we just run the rules for a client with no profile
# tag (ie. we just need all the users).

@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, handler):
if event.type == EventTypes.Redaction and event.redacts is not None:
yield self.store.remove_push_actions_for_event_id(
event.room_id, event.redacts
)

bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
event.room_id, self.store
)

actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler)

yield self.store.set_push_actions_for_event_and_users(
event,
[
(uid, None, actions) for uid, actions in actions_by_user.items()
]
)
Loading