From 21f135ba763a583ecf9ba2714b5151f6b14b61fd Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 16:26:08 +0000 Subject: [PATCH 01/33] Very first cut of calculating actions for events as they come in. Doesn't store them yet. Not very efficient. --- synapse/handlers/_base.py | 8 ++++++ synapse/handlers/federation.py | 15 +++++++++- synapse/push/action_generator.py | 47 ++++++++++++++++++++++++++++++++ synapse/storage/registration.py | 12 ++++++++ 4 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 synapse/push/action_generator.py diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 5fd20285d297..a8e8c4f5afdc 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -19,9 +19,12 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID, RoomAlias +from synapse.push.action_generator import ActionGenerator from synapse.util.logcontext import PreserveLoggingContext +from synapse.events.utils import serialize_event + import logging @@ -264,6 +267,11 @@ def handle_new_client_event(self, event, context, extra_destinations=[], event, context=context ) + action_generator = ActionGenerator(self.store) + yield action_generator.handle_event(serialize_event( + event, self.clock.time_msec() + )) + destinations = set(extra_destinations) for k, s in context.current_state.items(): try: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2855f2d7c3a6..18289eb529c9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,10 +32,12 @@ ) from synapse.types import UserID -from synapse.events.utils import prune_event +from synapse.events.utils import prune_event, serialize_event from synapse.util.retryutils import NotRetryingDestination +from synapse.push.action_generator import ActionGenerator + from twisted.internet import defer import itertools @@ -1113,6 +1115,11 @@ def _handle_new_event(self, origin, event, state=None, backfilled=False, current_state=current_state, ) + action_generator = ActionGenerator(self.store) + yield action_generator.handle_event(serialize_event( + event, self.clock.time_msec()) + ) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks @@ -1139,6 +1146,12 @@ def _handle_new_events(self, origin, event_infos, backfilled=False, is_new_state=(not outliers and not backfilled), ) + for ev_info in event_infos: + action_generator = ActionGenerator(self.store) + yield action_generator.handle_event(serialize_event( + ev_info["event"], self.clock.time_msec()) + ) + @defer.inlineCallbacks def _persist_auth_tree(self, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py new file mode 100644 index 000000000000..508eeaed957c --- /dev/null +++ b/synapse/push/action_generator.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import push_rule_evaluator + +import logging + + +logger = logging.getLogger(__name__) + + +class ActionGenerator: + def __init__(self, store): + self.store = store + # really we want to get all user ids and all profile tags too, + # since we want the actions for each profile tag for every user and + # also actions for a client with no profile tag for each user. + # Currently the event stream doesn't support profile tags on an + # event stream, so we just run the rules for a client with no profile + # tag (ie. we just need all the users). + + @defer.inlineCallbacks + def handle_event(self, event): + users = yield self.store.get_users_in_room(event['room_id']) + logger.error("users in room: %r", users) + + for uid in users: + evaluator = yield push_rule_evaluator.\ + evaluator_for_user_name_and_profile_tag( + uid, None, event['room_id'], self.store + ) + actions = yield evaluator.actions_for_event(event) + logger.info("actions for user %s: %s", uid, actions) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 2e5eddd2594b..f230faa25eba 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -291,6 +291,18 @@ def get_user_id_by_threepid(self, medium, address): defer.returnValue(ret['user_id']) defer.returnValue(None) + @defer.inlineCallbacks + def get_all_user_ids(self): + """Returns all user ids registered on this homeserver""" + return self.runInteraction( + "get_all_user_ids", + self._get_all_user_ids_txn + ) + + def _get_all_user_ids_txn(self, txn): + txn.execute("SELECT name from users") + return [r[0] for r in txn.fetchall()] + @defer.inlineCallbacks def count_all_users(self): """Counts all users registered on the homeserver.""" From a84a6933274c0da64c41ac494b027ab5010a4801 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 17:18:46 +0000 Subject: [PATCH 02/33] Having consulted The Erikle, this should go at the end of on_receive_pdu, otherwise it will be triggered whenever we backfill too. --- synapse/handlers/federation.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 18289eb529c9..6a4269b50085 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -244,6 +244,12 @@ def log_failure(f): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) + if not backfilled and not event.internal_metadata.is_outlier(): + action_generator = ActionGenerator(self.store) + yield action_generator.handle_event(serialize_event( + event, self.clock.time_msec()) + ) + @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( @@ -1115,11 +1121,6 @@ def _handle_new_event(self, origin, event, state=None, backfilled=False, current_state=current_state, ) - action_generator = ActionGenerator(self.store) - yield action_generator.handle_event(serialize_event( - event, self.clock.time_msec()) - ) - defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks @@ -1146,12 +1147,6 @@ def _handle_new_events(self, origin, event_infos, backfilled=False, is_new_state=(not outliers and not backfilled), ) - for ev_info in event_infos: - action_generator = ActionGenerator(self.store) - yield action_generator.handle_event(serialize_event( - ev_info["event"], self.clock.time_msec()) - ) - @defer.inlineCallbacks def _persist_auth_tree(self, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the From aa667ee396c473f497b084655d47b2a9520a538a Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 17:51:15 +0000 Subject: [PATCH 03/33] Save event actions to the db --- synapse/push/action_generator.py | 6 ++- synapse/storage/__init__.py | 2 + synapse/storage/event_actions.py | 41 +++++++++++++++++++ .../storage/schema/delta/27/event_actions.sql | 25 +++++++++++ 4 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/event_actions.py create mode 100644 synapse/storage/schema/delta/27/event_actions.sql diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 508eeaed957c..870c68a0cafb 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,7 +19,6 @@ import logging - logger = logging.getLogger(__name__) @@ -42,6 +41,9 @@ def handle_event(self, event): evaluator = yield push_rule_evaluator.\ evaluator_for_user_name_and_profile_tag( uid, None, event['room_id'], self.store - ) + ) actions = yield evaluator.actions_for_event(event) logger.info("actions for user %s: %s", uid, actions) + self.store.set_actions_for_event( + event['event_id'], uid, None, actions + ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c46b653f1155..a112dd237f4a 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,6 +33,7 @@ from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore +from .event_actions import EventActionsStore from .state import StateStore from .signatures import SignatureStore @@ -75,6 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, + EventActionsStore ): def __init__(self, hs): diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py new file mode 100644 index 000000000000..593b1714c718 --- /dev/null +++ b/synapse/storage/event_actions.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import logging +import simplejson as json + +logger = logging.getLogger(__name__) + + +class EventActionsStore(SQLBaseStore): + @defer.inlineCallbacks + def set_actions_for_event(self, event_id, user_id, profile_tag, actions): + actionsJson = json.dumps(actions) + + ret = yield self.runInteraction( + "_set_actions_for_event", + self._simple_upsert_txn, + EventActionsTable.table_name, + {'event_id': event_id, 'user_id': user_id, 'profile_tag': profile_tag}, + {'actions': actionsJson} + ) + defer.returnValue(ret) + + +class EventActionsTable(object): + table_name = "event_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql new file mode 100644 index 000000000000..1246823a0090 --- /dev/null +++ b/synapse/storage/schema/delta/27/event_actions.sql @@ -0,0 +1,25 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_actions( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (event_id, user_id, profile_tag) +); + + +CREATE INDEX event_actions_event_id_user_id_profile_tag on event_actions(event_id, user_id, profile_tag); From 5e909c73d76cd56e3eac91635a99405182dcac3c Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 18:40:28 +0000 Subject: [PATCH 04/33] Store nothing instead of ['dont_notify'] for events with no notification required: much as it would be nice to be able to tell between the event not having been processed and there being no notification for it, this isn't worth filling up the table with ['dont_notify'] I think. Consequently treat the empty actions array as dont_notify and filter dont_notify out of the result. --- synapse/push/__init__.py | 18 +++--------------- synapse/push/action_generator.py | 8 ++++---- synapse/push/push_rule_evaluator.py | 9 +++++++-- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index e7c964bcd2ed..d5928c1d291a 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -157,21 +157,7 @@ def get_and_dispatch(self): actions = yield rule_evaluator.actions_for_event(single_event) tweaks = rule_evaluator.tweaks_for_actions(actions) - if len(actions) == 0: - logger.warn("Empty actions! Using default action.") - actions = Pusher.DEFAULT_ACTIONS - - if 'notify' not in actions and 'dont_notify' not in actions: - logger.warn("Neither notify nor dont_notify in actions: adding default") - actions.extend(Pusher.DEFAULT_ACTIONS) - - if 'dont_notify' in actions: - logger.debug( - "%s for %s: dont_notify", - single_event['event_id'], self.user_name - ) - processed = True - else: + if 'notify' in actions: rejected = yield self.dispatch_push(single_event, tweaks) self.has_unread = True if isinstance(rejected, list) or isinstance(rejected, tuple): @@ -192,6 +178,8 @@ def get_and_dispatch(self): yield self.hs.get_pusherpool().remove_pusher( self.app_id, pk, self.user_name ) + else: + processed = True if not self.alive: return diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 870c68a0cafb..a72a7d703ca1 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -35,7 +35,6 @@ def __init__(self, store): @defer.inlineCallbacks def handle_event(self, event): users = yield self.store.get_users_in_room(event['room_id']) - logger.error("users in room: %r", users) for uid in users: evaluator = yield push_rule_evaluator.\ @@ -44,6 +43,7 @@ def handle_event(self, event): ) actions = yield evaluator.actions_for_event(event) logger.info("actions for user %s: %s", uid, actions) - self.store.set_actions_for_event( - event['event_id'], uid, None, actions - ) + if len(actions): + self.store.set_actions_for_event( + event['event_id'], uid, None, actions + ) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 92c7fd048fd8..420476fd0bdb 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -43,7 +43,7 @@ def evaluator_for_user_name_and_profile_tag(user_name, profile_tag, room_id, sto class PushRuleEvaluator: - DEFAULT_ACTIONS = ['dont_notify'] + DEFAULT_ACTIONS = [] INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") def __init__(self, user_name, profile_tag, raw_rules, enabled_map, room_id, @@ -85,7 +85,7 @@ def actions_for_event(self, ev): """ if ev['user_id'] == self.user_name: # let's assume you probably know about messages you sent yourself - defer.returnValue(['dont_notify']) + defer.returnValue([]) room_id = ev['room_id'] @@ -131,6 +131,11 @@ def actions_for_event(self, ev): "%s matches for user %s, event %s", r['rule_id'], self.user_name, ev['event_id'] ) + + # filter out dont_notify as we treat an empty actions list + # as dont_notify, and this doesn't take up a row in our database + actions = [x for x in actions if x != 'dont_notify'] + defer.returnValue(actions) logger.info( From 42ad49f5b75c2c645c4060026c21c5572f5b1063 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Dec 2015 18:42:09 +0000 Subject: [PATCH 05/33] still very WIP, but now sends unread_notifications_count in the room object on sync (only actually corrrect in a full sync: hardcoded to 0 in incremental syncs). --- synapse/handlers/sync.py | 26 +++++++++ synapse/push/action_generator.py | 2 +- synapse/rest/client/v2_alpha/sync.py | 1 + synapse/storage/event_actions.py | 53 ++++++++++++++++++- .../storage/schema/delta/27/event_actions.sql | 5 +- 5 files changed, 82 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 24c2b2fad68b..6d193a10c4da 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -52,6 +52,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "state", # dict[(str, str), FrozenEvent] "ephemeral", "account_data", + "unread_notification_count", ])): __slots__ = [] @@ -64,6 +65,7 @@ def __nonzero__(self): or self.state or self.ephemeral or self.account_data + or self.unread_notification_count > 0 ) @@ -161,6 +163,18 @@ def current_sync_for_user(self, sync_config, since_token=None, else: return self.incremental_sync_with_gap(sync_config, since_token) + def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room): + if room_id not in ephemeral_by_room: + return None + for e in ephemeral_by_room[room_id]: + if e['type'] != 'm.receipt': + continue + for receipt_event_id,val in e['content'].items(): + if 'm.read' in val: + if user_id in val['m.read']: + return receipt_event_id + return None + @defer.inlineCallbacks def full_state_sync(self, sync_config, timeline_since_token): """Get a sync for a client which is starting without any state. @@ -265,6 +279,16 @@ def full_state_sync_for_joined_room(self, room_id, sync_config, room_id, sync_config, now_token, since_token=timeline_since_token ) + last_unread_event_id = self.last_read_event_id_for_room_and_user( + room_id, sync_config.user.to_string(), ephemeral_by_room + ) + + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_actions_by_room( + room_id, last_unread_event_id + ) + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( @@ -275,6 +299,7 @@ def full_state_sync_for_joined_room(self, room_id, sync_config, account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=len(notifs) )) def account_data_for_user(self, account_data): @@ -509,6 +534,7 @@ def incremental_sync_with_gap(self, sync_config, since_token): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=0 ) logger.debug("Result for room %s: %r", room_id, room_sync) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index a72a7d703ca1..1c7cd31666cb 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -45,5 +45,5 @@ def handle_event(self, event): logger.info("actions for user %s: %s", uid, actions) if len(actions): self.store.set_actions_for_event( - event['event_id'], uid, None, actions + event, uid, None, actions ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index f0a637a6da33..4ca10732c1db 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -304,6 +304,7 @@ def serialize(event): }, "state": {"events": serialized_state}, "account_data": {"events": account_data}, + "unread_notification_count": room.unread_notification_count } if joined: diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 593b1714c718..40ac8e2d2751 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -24,18 +24,67 @@ class EventActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event(self, event_id, user_id, profile_tag, actions): + def set_actions_for_event(self, event, user_id, profile_tag, actions): actionsJson = json.dumps(actions) ret = yield self.runInteraction( "_set_actions_for_event", self._simple_upsert_txn, EventActionsTable.table_name, - {'event_id': event_id, 'user_id': user_id, 'profile_tag': profile_tag}, + { + 'room_id': event['room_id'], + 'event_id': event['event_id'], + 'user_id': user_id, + 'profile_tag': profile_tag + }, {'actions': actionsJson} ) defer.returnValue(ret) + @defer.inlineCallbacks + def get_unread_event_actions_by_room(self, room_id, last_read_event_id): + #events = yield self._get_events( + # [last_read_event_id], + # check_redacted=False + #) + + def _get_unread_event_actions_by_room(txn): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return [] + + stream_ordering = results[0][0] + topological_ordering = results[0][1] + + sql = ( + "SELECT ea.actions" + " FROM event_actions ea, events e" + " WHERE ea.room_id = e.room_id" + " AND ea.event_id = e.event_id" + " AND ea.room_id = ?" + " AND (" + " e.topological_ordering > ?" + " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" + ")" + ) + txn.execute(sql, + (room_id, topological_ordering, topological_ordering, stream_ordering) + ) + return txn.fetchall() + + ret = yield self.runInteraction( + "get_unread_event_actions_by_room", + _get_unread_event_actions_by_room + ) + defer.returnValue(ret) class EventActionsTable(object): table_name = "event_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql index 1246823a0090..bbdaee990ef7 100644 --- a/synapse/storage/schema/delta/27/event_actions.sql +++ b/synapse/storage/schema/delta/27/event_actions.sql @@ -14,12 +14,13 @@ */ CREATE TABLE IF NOT EXISTS event_actions( + room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, profile_tag VARCHAR(32), actions TEXT NOT NULL, - CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (event_id, user_id, profile_tag) + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) ); -CREATE INDEX event_actions_event_id_user_id_profile_tag on event_actions(event_id, user_id, profile_tag); +CREATE INDEX event_actions_room_id_event_id_user_id_profile_tag on event_actions(room_id, event_id, user_id, profile_tag); From 413d0d6a2404c579b1fa39ece9a698f9df8349db Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 18 Dec 2015 17:47:00 +0000 Subject: [PATCH 06/33] Make unread notification count sending work: put the correct count in incremental syncs too, where necessary, and fix silly bugs like only select the event actions for that user... --- synapse/handlers/sync.py | 48 ++++++++++++++++++++++++-------- synapse/storage/event_actions.py | 21 ++++++++------ 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6d193a10c4da..44420a063a8c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -65,7 +65,8 @@ def __nonzero__(self): or self.state or self.ephemeral or self.account_data - or self.unread_notification_count > 0 + # nb the notification count does not, er, count: if there's nothing + # else in the result, we don't need to send it. ) @@ -279,15 +280,12 @@ def full_state_sync_for_joined_room(self, room_id, sync_config, room_id, sync_config, now_token, since_token=timeline_since_token ) - last_unread_event_id = self.last_read_event_id_for_room_and_user( - room_id, sync_config.user.to_string(), ephemeral_by_room + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config, ephemeral_by_room ) - - notifs = [] - if last_unread_event_id: - notifs = yield self.store.get_unread_event_actions_by_room( - room_id, last_unread_event_id - ) + notif_count = None + if notifs is not None: + notif_count = len(notifs) current_state = yield self.get_state_at(room_id, now_token) @@ -299,7 +297,7 @@ def full_state_sync_for_joined_room(self, room_id, sync_config, account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), - unread_notification_count=len(notifs) + unread_notification_count=notif_count )) def account_data_for_user(self, account_data): @@ -441,6 +439,10 @@ def incremental_sync_with_gap(self, sync_config, since_token): ) now_token = now_token.copy_and_replace("presence_key", presence_key) + _, all_ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token + ) + now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token, since_token ) @@ -514,6 +516,13 @@ def incremental_sync_with_gap(self, sync_config, since_token): else: prev_batch = now_token + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config, all_ephemeral_by_room + ) + notif_count = None + if notifs is not None: + notif_count = len(notifs) + just_joined = yield self.check_joined_room(sync_config, state) if just_joined: logger.debug("User has just joined %s: needs full state", @@ -534,7 +543,7 @@ def incremental_sync_with_gap(self, sync_config, since_token): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), - unread_notification_count=0 + unread_notification_count=notif_count ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -805,3 +814,20 @@ def check_joined_room(self, sync_config, state_delta): if join_event.content["membership"] == Membership.JOIN: return True return False + + @defer.inlineCallbacks + def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room): + last_unread_event_id = self.last_read_event_id_for_room_and_user( + room_id, sync_config.user.to_string(), ephemeral_by_room + ) + + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_actions_by_room_for_user( + room_id, sync_config.user.to_string(), last_unread_event_id + ) + else: + # There is no new information in this period, so your notification + # count is whatever it was last time. + defer.returnValue(None) + defer.returnValue(notifs) \ No newline at end of file diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 40ac8e2d2751..f7fe78e554dc 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -42,12 +42,9 @@ def set_actions_for_event(self, event, user_id, profile_tag, actions): defer.returnValue(ret) @defer.inlineCallbacks - def get_unread_event_actions_by_room(self, room_id, last_read_event_id): - #events = yield self._get_events( - # [last_read_event_id], - # check_redacted=False - #) - + def get_unread_event_actions_by_room_for_user( + self, room_id, user_id, last_read_event_id + ): def _get_unread_event_actions_by_room(txn): sql = ( "SELECT stream_ordering, topological_ordering" @@ -65,10 +62,11 @@ def _get_unread_event_actions_by_room(txn): topological_ordering = results[0][1] sql = ( - "SELECT ea.actions" + "SELECT ea.event_id, ea.actions" " FROM event_actions ea, events e" " WHERE ea.room_id = e.room_id" " AND ea.event_id = e.event_id" + " AND ea.user_id = ?" " AND ea.room_id = ?" " AND (" " e.topological_ordering > ?" @@ -76,9 +74,14 @@ def _get_unread_event_actions_by_room(txn): ")" ) txn.execute(sql, - (room_id, topological_ordering, topological_ordering, stream_ordering) + ( + user_id, room_id, + topological_ordering, topological_ordering, stream_ordering + ) ) - return txn.fetchall() + return [ + { "event_id": row[0], "actions": row[1] } for row in txn.fetchall() + ] ret = yield self.runInteraction( "get_unread_event_actions_by_room", From b131fb1fe26da50ce30656cadbb24e72bd7ecdf9 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 18 Dec 2015 18:04:45 +0000 Subject: [PATCH 07/33] add list of things I want to fix with this branch --- problems_with_this_branch | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 problems_with_this_branch diff --git a/problems_with_this_branch b/problems_with_this_branch new file mode 100644 index 000000000000..9c5df5521289 --- /dev/null +++ b/problems_with_this_branch @@ -0,0 +1,5 @@ + * processing push by bluntly running the rules for each user + * processing push twice because the pushers still run the rules themselves rather than pulling out of the event_actions table + * consequently converting events back & forth between objects & dicts when processing + * fetching all the current ephemeral events for each room on an incremental sync to get the user's read receipt + * doing 2 more db queries per room to get the unread notif count From 091c545c4fb38f662b61cb46779a813f70971e4f Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 21 Dec 2015 10:14:57 +0000 Subject: [PATCH 08/33] pep8 --- synapse/handlers/sync.py | 6 +++--- synapse/storage/event_actions.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 44420a063a8c..20b2a2595a95 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -170,7 +170,7 @@ def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_ro for e in ephemeral_by_room[room_id]: if e['type'] != 'm.receipt': continue - for receipt_event_id,val in e['content'].items(): + for receipt_event_id, val in e['content'].items(): if 'm.read' in val: if user_id in val['m.read']: return receipt_event_id @@ -281,7 +281,7 @@ def full_state_sync_for_joined_room(self, room_id, sync_config, ) notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, ephemeral_by_room + room_id, sync_config, ephemeral_by_room ) notif_count = None if notifs is not None: @@ -830,4 +830,4 @@ def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room): # There is no new information in this period, so your notification # count is whatever it was last time. defer.returnValue(None) - defer.returnValue(notifs) \ No newline at end of file + defer.returnValue(notifs) diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index f7fe78e554dc..fbd0a4227949 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -73,14 +73,13 @@ def _get_unread_event_actions_by_room(txn): " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" ")" ) - txn.execute(sql, - ( - user_id, room_id, - topological_ordering, topological_ordering, stream_ordering - ) + txn.execute(sql, ( + user_id, room_id, + topological_ordering, topological_ordering, stream_ordering + ) ) return [ - { "event_id": row[0], "actions": row[1] } for row in txn.fetchall() + {"event_id": row[0], "actions": row[1]} for row in txn.fetchall() ] ret = yield self.runInteraction( @@ -89,5 +88,6 @@ def _get_unread_event_actions_by_room(txn): ) defer.returnValue(ret) + class EventActionsTable(object): table_name = "event_actions" From f73f154ec2c8ffdc49270d3ccaf3053f915800f3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 21 Dec 2015 15:28:54 +0000 Subject: [PATCH 09/33] Only run pushers for users on this hs! --- synapse/handlers/_base.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/push/action_generator.py | 8 +++++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index a8e8c4f5afdc..24c4c6269861 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -267,7 +267,7 @@ def handle_new_client_event(self, event, context, extra_destinations=[], event, context=context ) - action_generator = ActionGenerator(self.store) + action_generator = ActionGenerator(self.hs, self.store) yield action_generator.handle_event(serialize_event( event, self.clock.time_msec() )) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6a4269b50085..6525bde43075 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -245,7 +245,7 @@ def log_failure(f): yield user_joined_room(self.distributor, user, event.room_id) if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.store) + action_generator = ActionGenerator(self.hs, self.store) yield action_generator.handle_event(serialize_event( event, self.clock.time_msec()) ) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 1c7cd31666cb..6e107ca7925d 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.types import UserID + import push_rule_evaluator import logging @@ -23,7 +25,8 @@ class ActionGenerator: - def __init__(self, store): + def __init__(self, hs, store): + self.hs = hs self.store = store # really we want to get all user ids and all profile tags too, # since we want the actions for each profile tag for every user and @@ -37,6 +40,9 @@ def handle_event(self, event): users = yield self.store.get_users_in_room(event['room_id']) for uid in users: + if not self.hs.is_mine(UserID.from_string(uid)): + continue + evaluator = yield push_rule_evaluator.\ evaluator_for_user_name_and_profile_tag( uid, None, event['room_id'], self.store From 65c451cb3878fb41f28a2adecd638894e18f5343 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 15:19:34 +0000 Subject: [PATCH 10/33] Add bulk push rule evaluator which actually still evaluates rules one by one, but does far fewer db queries to fetch the rules --- synapse/push/action_generator.py | 28 +++---- synapse/push/bulk_push_rule_evaluator.py | 99 ++++++++++++++++++++++++ synapse/push/push_rule_evaluator.py | 13 ++-- synapse/storage/push_rule.py | 41 ++++++++++ 4 files changed, 159 insertions(+), 22 deletions(-) create mode 100644 synapse/push/bulk_push_rule_evaluator.py diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 6e107ca7925d..2ad5f82da23b 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,9 +15,7 @@ from twisted.internet import defer -from synapse.types import UserID - -import push_rule_evaluator +import bulk_push_rule_evaluator import logging @@ -39,17 +37,13 @@ def __init__(self, hs, store): def handle_event(self, event): users = yield self.store.get_users_in_room(event['room_id']) - for uid in users: - if not self.hs.is_mine(UserID.from_string(uid)): - continue - - evaluator = yield push_rule_evaluator.\ - evaluator_for_user_name_and_profile_tag( - uid, None, event['room_id'], self.store - ) - actions = yield evaluator.actions_for_event(event) - logger.info("actions for user %s: %s", uid, actions) - if len(actions): - self.store.set_actions_for_event( - event, uid, None, actions - ) + bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( + event['room_id'], self.hs, self.store + ) + + actions_by_user = bulk_evaluator.action_for_event_by_user(event) + + for uid,actions in actions_by_user.items(): + self.store.set_actions_for_event( + event, uid, None, actions + ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py new file mode 100644 index 000000000000..f531d2edc468 --- /dev/null +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import simplejson as json + +from twisted.internet import defer + +from synapse.types import UserID + +import baserules +from push_rule_evaluator import PushRuleEvaluator + +logger = logging.getLogger(__name__) + + +def decode_rule_json(rule): + rule['conditions'] = json.loads(rule['conditions']) + rule['actions'] = json.loads(rule['actions']) + return rule + + +@defer.inlineCallbacks +def evaluator_for_room_id(room_id, hs, store): + users = yield store.get_users_in_room(room_id) + rules_by_user = yield store.bulk_get_push_rules(users) + rules_by_user = { + uid: baserules.list_with_base_rules( + [decode_rule_json(rule_list) for rule_list in rules_by_user[uid]] + if uid in rules_by_user else [], + UserID.from_string(uid) + ) + for uid in users + } + member_events = yield store.get_current_state( + room_id=room_id, + event_type='m.room.member', + ) + display_names = {} + for ev in member_events: + if ev.content.get("displayname"): + display_names[ev.state_key] = ev.content.get("displayname") + + defer.returnValue(BulkPushRuleEvaluator( + room_id, rules_by_user, display_names, users + )) + + +class BulkPushRuleEvaluator: + def __init__(self, room_id, rules_by_user, display_names, users_in_room): + self.room_id = room_id + self.rules_by_user = rules_by_user + self.display_names = display_names + self.users_in_room = users_in_room + + def action_for_event_by_user(self, event): + actions_by_user = {} + + for uid, rules in self.rules_by_user.items(): + display_name = None + if uid in self.display_names: + display_name = self.display_names[uid] + + for rule in rules: + if 'enabled' in rule and not rule['enabled']: + continue + + # XXX: profile tags + if BulkPushRuleEvaluator.event_matches_rule( + event, rule, + display_name, len(self.users_in_room), None + ): + actions = [x for x in rule['actions'] if x != 'dont_notify'] + if len(actions) > 0: + actions_by_user[uid] = actions + break + return actions_by_user + + @staticmethod + def event_matches_rule(event, rule, + display_name, room_member_count, profile_tag): + matches = True + for cond in rule['conditions']: + matches &= PushRuleEvaluator._event_fulfills_condition( + event, cond, display_name, room_member_count, profile_tag + ) + return matches \ No newline at end of file diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 420476fd0bdb..40c7622ec41d 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -113,7 +113,8 @@ def actions_for_event(self, ev): for c in conditions: matches &= self._event_fulfills_condition( ev, c, display_name=my_display_name, - room_member_count=room_member_count + room_member_count=room_member_count, + profile_tag=self.profile_tag ) logger.debug( "Rule %s %s", @@ -156,16 +157,18 @@ def _glob_to_regexp(glob): re.sub(r'\\\-', '-', x.group(2)))), r) return r - def _event_fulfills_condition(self, ev, condition, display_name, room_member_count): + @staticmethod + def _event_fulfills_condition(ev, condition, + display_name, room_member_count, profile_tag): if condition['kind'] == 'event_match': if 'pattern' not in condition: logger.warn("event_match condition with no pattern") return False # XXX: optimisation: cache our pattern regexps if condition['key'] == 'content.body': - r = r'\b%s\b' % self._glob_to_regexp(condition['pattern']) + r = r'\b%s\b' % PushRuleEvaluator._glob_to_regexp(condition['pattern']) else: - r = r'^%s$' % self._glob_to_regexp(condition['pattern']) + r = r'^%s$' % PushRuleEvaluator._glob_to_regexp(condition['pattern']) val = _value_for_dotted_key(condition['key'], ev) if val is None: return False @@ -174,7 +177,7 @@ def _event_fulfills_condition(self, ev, condition, display_name, room_member_cou elif condition['kind'] == 'device': if 'profile_tag' not in condition: return True - return condition['profile_tag'] == self.profile_tag + return condition['profile_tag'] == profile_tag elif condition['kind'] == 'contains_display_name': # This is special because display names can be different diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 5305b7e1227a..9dec4aa68543 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -55,6 +55,47 @@ def get_push_rules_enabled_for_user(self, user_name): r['rule_id']: False if r['enabled'] == 0 else True for r in results }) + @defer.inlineCallbacks + def bulk_get_push_rules(self, user_ids): + batch_size = 100 + + def f(txn, user_ids_to_fetch): + sql = ( + "SELECT " + + ",".join(map(lambda x: "pr."+x, PushRuleTable.fields)) + + " FROM " + PushRuleTable.table_name + " pr " + + " LEFT JOIN " + PushRuleEnableTable.table_name + " pre " + + " ON pr.user_name = pre.user_name and pr.rule_id = pre.rule_id " + + " WHERE pr.user_name " + + " IN (" + ",".join(["?" for _ in user_ids_to_fetch]) + ")" + " AND (pre.enabled is null or pre.enabled = 1)" + " ORDER BY pr.user_name, pr.priority_class DESC, pr.priority DESC" + ) + txn.execute(sql, user_ids_to_fetch) + return txn.fetchall() + + results = {} + + batch_start = 0 + while batch_start < len(user_ids): + batch_end = max(len(user_ids), batch_size) + batch_user_ids = user_ids[batch_start:batch_end] + batch_start = batch_end + + rows = yield self.runInteraction( + "bulk_get_push_rules", f, batch_user_ids + ) + + for r in rows: + rawdict = { + PushRuleTable.fields[i]: r[i] for i in range(len(r)) + } + + if rawdict['user_name'] not in results: + results[rawdict['user_name']] = [] + results[rawdict['user_name']].append(rawdict) + defer.returnValue(results) + @defer.inlineCallbacks def add_push_rule(self, before, after, **kwargs): vals = kwargs From 77f06856b612e2905b5b69f6f2de75ddd348adfd Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 15:22:26 +0000 Subject: [PATCH 11/33] clarify problems --- problems_with_this_branch | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/problems_with_this_branch b/problems_with_this_branch index 9c5df5521289..baed6f5c6be9 100644 --- a/problems_with_this_branch +++ b/problems_with_this_branch @@ -2,4 +2,4 @@ * processing push twice because the pushers still run the rules themselves rather than pulling out of the event_actions table * consequently converting events back & forth between objects & dicts when processing * fetching all the current ephemeral events for each room on an incremental sync to get the user's read receipt - * doing 2 more db queries per room to get the unread notif count + * doing 2 more db queries per room at sync time to get the unread notif count From 4c8f6a7e427cc0e22ff1a19c3f1d9da0f9438f18 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:04:31 +0000 Subject: [PATCH 12/33] Insert push actions in a single db query rather than one per user/profile_tag --- synapse/push/action_generator.py | 10 ++++++---- synapse/storage/event_actions.py | 31 ++++++++++++++++++------------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 2ad5f82da23b..148b1bda8e06 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -43,7 +43,9 @@ def handle_event(self, event): actions_by_user = bulk_evaluator.action_for_event_by_user(event) - for uid,actions in actions_by_user.items(): - self.store.set_actions_for_event( - event, uid, None, actions - ) + yield self.store.set_actions_for_event_and_users( + event, + [ + (uid, None, actions) for uid, actions in actions_by_user.items() + ] + ) diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index fbd0a4227949..3efa445c18d8 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -24,22 +24,27 @@ class EventActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event(self, event, user_id, profile_tag, actions): - actionsJson = json.dumps(actions) - - ret = yield self.runInteraction( - "_set_actions_for_event", - self._simple_upsert_txn, - EventActionsTable.table_name, - { + def set_actions_for_event_and_users(self, event, tuples): + """ + :param event: the event set actions for + :param tuples: list of tuples of (user_id, profile_tag, actions) + """ + values = [] + for uid, profile_tag, actions in tuples: + values.append({ 'room_id': event['room_id'], 'event_id': event['event_id'], - 'user_id': user_id, - 'profile_tag': profile_tag - }, - {'actions': actionsJson} + 'user_id': uid, + 'profile_tag': profile_tag, + 'actions': json.dumps(actions) + }) + + yield self.runInteraction( + "set_actions_for_event_and_users", + self._simple_insert_many_txn, + EventActionsTable.table_name, + values ) - defer.returnValue(ret) @defer.inlineCallbacks def get_unread_event_actions_by_room_for_user( From 3fbb0317453f00f4d84ebc52145d8afb6490909f Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:13:47 +0000 Subject: [PATCH 13/33] Remove the list of problems (moved to jira issues) --- problems_with_this_branch | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 problems_with_this_branch diff --git a/problems_with_this_branch b/problems_with_this_branch deleted file mode 100644 index baed6f5c6be9..000000000000 --- a/problems_with_this_branch +++ /dev/null @@ -1,5 +0,0 @@ - * processing push by bluntly running the rules for each user - * processing push twice because the pushers still run the rules themselves rather than pulling out of the event_actions table - * consequently converting events back & forth between objects & dicts when processing - * fetching all the current ephemeral events for each room on an incremental sync to get the user's read receipt - * doing 2 more db queries per room at sync time to get the unread notif count From 5645d9747b17e9d119cc7badd7c2abe3c157a1a6 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:19:22 +0000 Subject: [PATCH 14/33] Add some comments to areas that could be optimised. --- synapse/handlers/sync.py | 3 +++ synapse/push/__init__.py | 4 +++- synapse/push/bulk_push_rule_evaluator.py | 8 ++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4cbb43a31ba0..fa5e954e010d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -447,6 +447,9 @@ def incremental_sync_with_gap(self, sync_config, since_token): ) now_token = now_token.copy_and_replace("presence_key", presence_key) + # We now fetch all ephemeral events for this room in order to get + # this users current read receipt. This could almost certainly be + # optimised. _, all_ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token ) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index d5928c1d291a..635dedd5236b 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -26,7 +26,9 @@ logger = logging.getLogger(__name__) - +# Pushers could now be moved to pull out of the event_actions table instead +# of listening on the event stream: this would avoid them having to run the +# rules again. class Pusher(object): INITIAL_BACKOFF = 1000 MAX_BACKOFF = 60 * 60 * 1000 diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f531d2edc468..1c0fa72b2537 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -59,6 +59,14 @@ def evaluator_for_room_id(room_id, hs, store): class BulkPushRuleEvaluator: + """ + Runs push rules for all users in a room. + This is faster than running PushRuleEvaluator for each user because it + fetches all the rules for all the users in one (batched) db query + rarher than doing multiple queries per-user. It currently uses + the same logic to run the actual rules, but could be optimised further + (see https://matrix.org/jira/browse/SYN-562) + """ def __init__(self, room_id, rules_by_user, display_names, users_in_room): self.room_id = room_id self.rules_by_user = rules_by_user From 9b4cd0cd0f31803657018bf0ac9178787d796912 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:25:09 +0000 Subject: [PATCH 15/33] pep8 & unused variable --- synapse/push/__init__.py | 1 + synapse/push/action_generator.py | 2 -- synapse/push/bulk_push_rule_evaluator.py | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 635dedd5236b..250f22a16806 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -26,6 +26,7 @@ logger = logging.getLogger(__name__) + # Pushers could now be moved to pull out of the event_actions table instead # of listening on the event stream: this would avoid them having to run the # rules again. diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 148b1bda8e06..00f518f6097e 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -35,8 +35,6 @@ def __init__(self, hs, store): @defer.inlineCallbacks def handle_event(self, event): - users = yield self.store.get_users_in_room(event['room_id']) - bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event['room_id'], self.hs, self.store ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1c0fa72b2537..c489bfc8d4e7 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -54,7 +54,7 @@ def evaluator_for_room_id(room_id, hs, store): display_names[ev.state_key] = ev.content.get("displayname") defer.returnValue(BulkPushRuleEvaluator( - room_id, rules_by_user, display_names, users + room_id, rules_by_user, display_names, users )) @@ -104,4 +104,4 @@ def event_matches_rule(event, rule, matches &= PushRuleEvaluator._event_fulfills_condition( event, cond, display_name, room_member_count, profile_tag ) - return matches \ No newline at end of file + return matches From d79e90f078c83314de3dc469770750dd2585e255 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:56:56 +0000 Subject: [PATCH 16/33] Add mocks to make tests work again --- tests/handlers/test_federation.py | 7 +++++++ tests/handlers/test_room.py | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index d392c2301553..a4758c03db38 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -49,6 +49,10 @@ def setUp(self): "get_destination_retry_timings", "set_destination_retry_timings", "have_events", + "get_users_in_room", + "bulk_get_push_rules", + "get_current_state", + "set_actions_for_event_and_users", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -85,6 +89,9 @@ def test_msg(self): self.datastore.persist_event.return_value = defer.succeed((1,1)) self.datastore.get_room.return_value = defer.succeed(True) + self.datastore.get_users_in_room.return_value = ["@a:b"] + self.datastore.bulk_get_push_rules.return_value = {} + self.datastore.get_current_state.return_value = {} self.auth.check_host_in_room.return_value = defer.succeed(True) retry_timings_res = { diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 2a7553f982dc..ba20b3194567 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -43,6 +43,10 @@ def setUp(self): "store_room", "get_latest_events_in_room", "add_event_hashes", + "get_users_in_room", + "bulk_get_push_rules", + "get_current_state", + "set_actions_for_event_and_users", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -90,6 +94,8 @@ def setUp(self): self.datastore.persist_event.return_value = (1,1) self.datastore.add_event_hashes.return_value = [] + self.datastore.get_users_in_room.return_value = ["@bob:red"] + self.datastore.bulk_get_push_rules.return_value = {} @defer.inlineCallbacks def test_invite(self): @@ -109,6 +115,7 @@ def test_invite(self): self.datastore.get_latest_events_in_room.return_value = ( defer.succeed([]) ) + self.datastore.get_current_state.return_value = {} def annotate(_): ctx = Mock() @@ -190,6 +197,7 @@ def test_simple_join(self): self.datastore.get_latest_events_in_room.return_value = ( defer.succeed([]) ) + self.datastore.get_current_state.return_value = {} def annotate(_): ctx = Mock() @@ -265,6 +273,7 @@ def test_simple_leave(self): self.datastore.get_latest_events_in_room.return_value = ( defer.succeed([]) ) + self.datastore.get_current_state.return_value = {} def annotate(_): ctx = Mock() From d2a92c6bdeff51136a930ccddda6e734a2a0dc25 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 18:25:04 +0000 Subject: [PATCH 17/33] Fix merge fail with anon access stuff --- synapse/handlers/sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b1bfdce85b1d..f63c073a20a8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -520,11 +520,11 @@ def incremental_sync_with_gap(self, sync_config, since_token): # this users current read receipt. This could almost certainly be # optimised. _, all_ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token + sync_config, now_token, room_ids ) now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token + sync_config, now_token, room_ids, since_token ) rm_handler = self.hs.get_handlers().room_member_handler From 3051c9d002a467643d1ab32bc36974d2e3f84c12 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 13:39:29 +0000 Subject: [PATCH 18/33] Address minor PR issues --- synapse/handlers/_base.py | 4 ++-- synapse/handlers/federation.py | 4 ++-- synapse/push/action_generator.py | 7 +++---- synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/storage/event_actions.py | 2 +- synapse/storage/push_rule.py | 6 +++--- synapse/storage/registration.py | 12 ------------ 7 files changed, 12 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 24c4c6269861..938eb29de7df 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -267,8 +267,8 @@ def handle_new_client_event(self, event, context, extra_destinations=[], event, context=context ) - action_generator = ActionGenerator(self.hs, self.store) - yield action_generator.handle_event(serialize_event( + action_generator = ActionGenerator(self.store) + yield action_generator.handle_push_actions_for_event(serialize_event( event, self.clock.time_msec() )) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0b1221deb5b8..764709b4243d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -245,8 +245,8 @@ def log_failure(f): yield user_joined_room(self.distributor, user, event.room_id) if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs, self.store) - yield action_generator.handle_event(serialize_event( + action_generator = ActionGenerator(self.store) + yield action_generator.handle_push_actions_for_event(serialize_event( event, self.clock.time_msec()) ) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 00f518f6097e..4ab5d9e1b8c2 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -23,8 +23,7 @@ class ActionGenerator: - def __init__(self, hs, store): - self.hs = hs + def __init__(self, store): self.store = store # really we want to get all user ids and all profile tags too, # since we want the actions for each profile tag for every user and @@ -34,9 +33,9 @@ def __init__(self, hs, store): # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_event(self, event): + def handle_push_actions_for_event(self, event): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( - event['room_id'], self.hs, self.store + event['room_id'], self.store ) actions_by_user = bulk_evaluator.action_for_event_by_user(event) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index c489bfc8d4e7..1c4e54ba44db 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -33,7 +33,7 @@ def decode_rule_json(rule): @defer.inlineCallbacks -def evaluator_for_room_id(room_id, hs, store): +def evaluator_for_room_id(room_id, store): users = yield store.get_users_in_room(room_id) rules_by_user = yield store.bulk_get_push_rules(users) rules_by_user = { diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 3efa445c18d8..fa9cbe71eea5 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd +# Copyright 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 9dec4aa68543..7c5123d6444c 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -62,12 +62,12 @@ def bulk_get_push_rules(self, user_ids): def f(txn, user_ids_to_fetch): sql = ( "SELECT " + - ",".join(map(lambda x: "pr."+x, PushRuleTable.fields)) + + ",".join("pr."+x for x in PushRuleTable.fields) + " FROM " + PushRuleTable.table_name + " pr " + " LEFT JOIN " + PushRuleEnableTable.table_name + " pre " + " ON pr.user_name = pre.user_name and pr.rule_id = pre.rule_id " + " WHERE pr.user_name " + - " IN (" + ",".join(["?" for _ in user_ids_to_fetch]) + ")" + " IN (" + ",".join("?" for _ in user_ids_to_fetch) + ")" " AND (pre.enabled is null or pre.enabled = 1)" " ORDER BY pr.user_name, pr.priority_class DESC, pr.priority DESC" ) @@ -78,7 +78,7 @@ def f(txn, user_ids_to_fetch): batch_start = 0 while batch_start < len(user_ids): - batch_end = max(len(user_ids), batch_size) + batch_end = min(len(user_ids), batch_size) batch_user_ids = user_ids[batch_start:batch_end] batch_start = batch_end diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 4676f225b967..09a05b08ef60 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -291,18 +291,6 @@ def get_user_id_by_threepid(self, medium, address): defer.returnValue(ret['user_id']) defer.returnValue(None) - @defer.inlineCallbacks - def get_all_user_ids(self): - """Returns all user ids registered on this homeserver""" - return self.runInteraction( - "get_all_user_ids", - self._get_all_user_ids_txn - ) - - def _get_all_user_ids_txn(self, txn): - txn.execute("SELECT name from users") - return [r[0] for r in txn.fetchall()] - @defer.inlineCallbacks def count_all_users(self): """Counts all users registered on the homeserver.""" From c914d67cda9682331639b78190db367974e4fb8b Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 14:05:37 +0000 Subject: [PATCH 19/33] Rename event-actions to event_push_actions as per PR request --- synapse/handlers/sync.py | 2 +- synapse/push/__init__.py | 2 +- synapse/push/action_generator.py | 2 +- synapse/storage/__init__.py | 4 ++-- ...event_actions.py => event_push_actions.py} | 20 +++++++++---------- ...ent_actions.sql => event_push_actions.sql} | 4 ++-- 6 files changed, 17 insertions(+), 17 deletions(-) rename synapse/storage/{event_actions.py => event_push_actions.py} (84%) rename synapse/storage/schema/delta/27/{event_actions.sql => event_push_actions.sql} (82%) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f63c073a20a8..64556c5eb8d1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -896,7 +896,7 @@ def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room): notifs = [] if last_unread_event_id: - notifs = yield self.store.get_unread_event_actions_by_room_for_user( + notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), last_unread_event_id ) else: diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 250f22a16806..3ab6da06255f 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) -# Pushers could now be moved to pull out of the event_actions table instead +# Pushers could now be moved to pull out of the event_push_actions table instead # of listening on the event stream: this would avoid them having to run the # rules again. class Pusher(object): diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 4ab5d9e1b8c2..5526324a6dc0 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,7 +40,7 @@ def handle_push_actions_for_event(self, event): actions_by_user = bulk_evaluator.action_for_event_by_user(event) - yield self.store.set_actions_for_event_and_users( + yield self.store.set_push_actions_for_event_and_users( event, [ (uid, None, actions) for uid, actions in actions_by_user.items() diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a112dd237f4a..43e05f144ac8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,7 +33,7 @@ from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore -from .event_actions import EventActionsStore +from .event_push_actions import EventPushActionsStore from .state import StateStore from .signatures import SignatureStore @@ -76,7 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventActionsStore + EventPushActionsStore ): def __init__(self, hs): diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_push_actions.py similarity index 84% rename from synapse/storage/event_actions.py rename to synapse/storage/event_push_actions.py index fa9cbe71eea5..016c0adf8a4d 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,9 +22,9 @@ logger = logging.getLogger(__name__) -class EventActionsStore(SQLBaseStore): +class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event_and_users(self, event, tuples): + def set_push_actions_for_event_and_users(self, event, tuples): """ :param event: the event set actions for :param tuples: list of tuples of (user_id, profile_tag, actions) @@ -42,15 +42,15 @@ def set_actions_for_event_and_users(self, event, tuples): yield self.runInteraction( "set_actions_for_event_and_users", self._simple_insert_many_txn, - EventActionsTable.table_name, + EventPushActionsTable.table_name, values ) @defer.inlineCallbacks - def get_unread_event_actions_by_room_for_user( + def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): - def _get_unread_event_actions_by_room(txn): + def _get_unread_event_push_actions_by_room(txn): sql = ( "SELECT stream_ordering, topological_ordering" " FROM events" @@ -68,7 +68,7 @@ def _get_unread_event_actions_by_room(txn): sql = ( "SELECT ea.event_id, ea.actions" - " FROM event_actions ea, events e" + " FROM event_push_actions ea, events e" " WHERE ea.room_id = e.room_id" " AND ea.event_id = e.event_id" " AND ea.user_id = ?" @@ -88,11 +88,11 @@ def _get_unread_event_actions_by_room(txn): ] ret = yield self.runInteraction( - "get_unread_event_actions_by_room", - _get_unread_event_actions_by_room + "get_unread_event_push_actions_by_room", + _get_unread_event_push_actions_by_room ) defer.returnValue(ret) -class EventActionsTable(object): - table_name = "event_actions" +class EventPushActionsTable(object): + table_name = "event_push_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_push_actions.sql similarity index 82% rename from synapse/storage/schema/delta/27/event_actions.sql rename to synapse/storage/schema/delta/27/event_push_actions.sql index bbdaee990ef7..bdf6ae3f2495 100644 --- a/synapse/storage/schema/delta/27/event_actions.sql +++ b/synapse/storage/schema/delta/27/event_push_actions.sql @@ -13,7 +13,7 @@ * limitations under the License. */ -CREATE TABLE IF NOT EXISTS event_actions( +CREATE TABLE IF NOT EXISTS event_push_actions( room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -23,4 +23,4 @@ CREATE TABLE IF NOT EXISTS event_actions( ); -CREATE INDEX event_actions_room_id_event_id_user_id_profile_tag on event_actions(room_id, event_id, user_id, profile_tag); +CREATE INDEX event_push_actions_room_id_event_id_user_id_profile_tag on event_push_actions(room_id, event_id, user_id, profile_tag); From 92a1e74b202757b0f4b577ccbd3e31d8dd4d6460 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 14:17:35 +0000 Subject: [PATCH 20/33] fix tests --- tests/handlers/test_federation.py | 2 +- tests/handlers/test_room.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index a4758c03db38..6acc4ebadc8b 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -52,7 +52,7 @@ def setUp(self): "get_users_in_room", "bulk_get_push_rules", "get_current_state", - "set_actions_for_event_and_users", + "set_push_actions_for_event_and_users", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index ba20b3194567..ff2b5971243e 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -46,7 +46,7 @@ def setUp(self): "get_users_in_room", "bulk_get_push_rules", "get_current_state", - "set_actions_for_event_and_users", + "set_push_actions_for_event_and_users", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), From f1b67730fa085755a7ab459b0239608bd3585a67 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 14:50:36 +0000 Subject: [PATCH 21/33] Add unread_notif_count in incremental_sync_with_gap --- synapse/handlers/sync.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 64556c5eb8d1..93c48d167a0b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -749,6 +749,13 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config, if just_joined: state = yield self.get_state_at(room_id, now_token) + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config, ephemeral_by_room + ) + notif_count = None + if notifs is not None: + notif_count = len(notifs) + room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, @@ -757,6 +764,7 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config, account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=notif_count, ) logging.debug("Room sync: %r", room_sync) From d74c6ace247c31524dacf795108300e53bdbff55 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 15:32:00 +0000 Subject: [PATCH 22/33] comma --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 93c48d167a0b..2ec42ee5033d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -346,7 +346,7 @@ def full_state_sync_for_joined_room(self, room_id, sync_config, account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), - unread_notification_count=notif_count + unread_notification_count=notif_count, )) def account_data_for_user(self, account_data): From c77e7e60fc5d29e8a57bb82dd5aa8e72ae570d84 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 15:49:06 +0000 Subject: [PATCH 23/33] Only joined rooms have unread_notif_count --- synapse/rest/client/v2_alpha/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index cd3aef9e07fa..095f96e218dd 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -311,12 +311,12 @@ def serialize(event): }, "state": {"events": serialized_state}, "account_data": {"events": account_data}, - "unread_notification_count": room.unread_notification_count } if joined: ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) result["ephemeral"] = {"events": ephemeral_events} + result["unread_notification_count"] = room.unread_notification_count return result From 4eb7b950c829dd8463df8ccd1095772452293a15 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 18:11:17 +0000 Subject: [PATCH 24/33] = not == in sql --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 016c0adf8a4d..3075d02257c7 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -75,7 +75,7 @@ def _get_unread_event_push_actions_by_room(txn): " AND ea.room_id = ?" " AND (" " e.topological_ordering > ?" - " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" + " OR (e.topological_ordering = ? AND e.stream_ordering > ?)" ")" ) txn.execute(sql, ( From 85ca8cb90c706ad40034a05282d751887d9bf05f Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 5 Jan 2016 13:32:39 +0000 Subject: [PATCH 25/33] comment typo --- synapse/push/bulk_push_rule_evaluator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1c4e54ba44db..c00acfd87eaa 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -63,7 +63,7 @@ class BulkPushRuleEvaluator: Runs push rules for all users in a room. This is faster than running PushRuleEvaluator for each user because it fetches all the rules for all the users in one (batched) db query - rarher than doing multiple queries per-user. It currently uses + rather than doing multiple queries per-user. It currently uses the same logic to run the actual rules, but could be optimised further (see https://matrix.org/jira/browse/SYN-562) """ From c79f221192044203b9d32cfbd416a7fefeb34cd5 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 11:38:09 +0000 Subject: [PATCH 26/33] Add is_guest flag to users db to track whether a user is a guest user or not. Use this so we can run _filter_events_for_client when calculating event_push_actions. --- synapse/handlers/_base.py | 8 ++-- synapse/handlers/federation.py | 6 +-- synapse/handlers/register.py | 4 +- synapse/push/action_generator.py | 6 +-- synapse/push/bulk_push_rule_evaluator.py | 27 ++++++++++--- synapse/rest/client/v2_alpha/register.py | 5 ++- synapse/storage/event_push_actions.py | 4 +- synapse/storage/registration.py | 40 ++++++++++++++----- .../delta/{27 => 28}/event_push_actions.sql | 0 9 files changed, 69 insertions(+), 31 deletions(-) rename synapse/storage/schema/delta/{27 => 28}/event_push_actions.sql (100%) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 3115a5065d85..66e35de6e461 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -23,8 +23,6 @@ from synapse.util.logcontext import PreserveLoggingContext -from synapse.events.utils import serialize_event - import logging @@ -256,9 +254,9 @@ def handle_new_client_event(self, event, context, extra_destinations=[], ) action_generator = ActionGenerator(self.store) - yield action_generator.handle_push_actions_for_event(serialize_event( - event, self.clock.time_msec() - )) + yield action_generator.handle_push_actions_for_event( + event, self + ) destinations = set(extra_destinations) for k, s in context.current_state.items(): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 764709b4243d..075b9e21c34a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,7 +32,7 @@ ) from synapse.types import UserID -from synapse.events.utils import prune_event, serialize_event +from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination @@ -246,8 +246,8 @@ def log_failure(f): if not backfilled and not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.store) - yield action_generator.handle_push_actions_for_event(serialize_event( - event, self.clock.time_msec()) + yield action_generator.handle_push_actions_for_event( + event, self ) @defer.inlineCallbacks diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 6f111ff63ee6..1799a668c6ca 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -84,7 +84,8 @@ def register( localpart=None, password=None, generate_token=True, - guest_access_token=None + guest_access_token=None, + make_guest=False ): """Registers a new client on the server. @@ -118,6 +119,7 @@ def register( token=token, password_hash=password_hash, was_guest=guest_access_token is not None, + make_guest=make_guest ) yield registered_user(self.distributor, user) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 5526324a6dc0..bcd40798f9db 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -33,12 +33,12 @@ def __init__(self, store): # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event): + def handle_push_actions_for_event(self, event, handler): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( - event['room_id'], self.store + event.room_id, self.store ) - actions_by_user = bulk_evaluator.action_for_event_by_user(event) + actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler) yield self.store.set_push_actions_for_event_and_users( event, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index c00acfd87eaa..63d65b446588 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -23,6 +23,8 @@ import baserules from push_rule_evaluator import PushRuleEvaluator +from synapse.events.utils import serialize_event + logger = logging.getLogger(__name__) @@ -54,7 +56,7 @@ def evaluator_for_room_id(room_id, store): display_names[ev.state_key] = ev.content.get("displayname") defer.returnValue(BulkPushRuleEvaluator( - room_id, rules_by_user, display_names, users + room_id, rules_by_user, display_names, users, store )) @@ -67,13 +69,15 @@ class BulkPushRuleEvaluator: the same logic to run the actual rules, but could be optimised further (see https://matrix.org/jira/browse/SYN-562) """ - def __init__(self, room_id, rules_by_user, display_names, users_in_room): + def __init__(self, room_id, rules_by_user, display_names, users_in_room, store): self.room_id = room_id self.rules_by_user = rules_by_user self.display_names = display_names self.users_in_room = users_in_room + self.store = store - def action_for_event_by_user(self, event): + @defer.inlineCallbacks + def action_for_event_by_user(self, event, handler): actions_by_user = {} for uid, rules in self.rules_by_user.items(): @@ -81,6 +85,13 @@ def action_for_event_by_user(self, event): if uid in self.display_names: display_name = self.display_names[uid] + is_guest = yield self.store.is_guest(UserID.from_string(uid)) + filtered = yield handler._filter_events_for_client( + uid, [event], is_guest=is_guest + ) + if len(filtered) == 0: + continue + for rule in rules: if 'enabled' in rule and not rule['enabled']: continue @@ -94,14 +105,20 @@ def action_for_event_by_user(self, event): if len(actions) > 0: actions_by_user[uid] = actions break - return actions_by_user + defer.returnValue(actions_by_user) @staticmethod def event_matches_rule(event, rule, display_name, room_member_count, profile_tag): matches = True + + # passing the clock all the way into here is extremely awkward and push + # rules do not care about any of the relative timestamps, so we just + # pass 0 for the current time. + client_event = serialize_event(event, 0) + for cond in rule['conditions']: matches &= PushRuleEvaluator._event_fulfills_condition( - event, cond, display_name, room_member_count, profile_tag + client_event, cond, display_name, room_member_count, profile_tag ) return matches diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 25389ceded6f..c4d025b465df 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -259,7 +259,10 @@ def onEmailTokenRequest(self, request): def _do_guest_registration(self): if not self.hs.config.allow_guest_access: defer.returnValue((403, "Guest access is disabled")) - user_id, _ = yield self.registration_handler.register(generate_token=False) + user_id, _ = yield self.registration_handler.register( + generate_token=False, + make_guest=True + ) access_token = self.auth_handler.generate_access_token(user_id, ["guest = true"]) defer.returnValue((200, { "user_id": user_id, diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3075d02257c7..0634af6b62b5 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -32,8 +32,8 @@ def set_push_actions_for_event_and_users(self, event, tuples): values = [] for uid, profile_tag, actions in tuples: values.append({ - 'room_id': event['room_id'], - 'event_id': event['event_id'], + 'room_id': event.room_id, + 'event_id': event.event_id, 'user_id': uid, 'profile_tag': profile_tag, 'actions': json.dumps(actions) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index f0fa0bd33ce0..c79066f7740a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -18,7 +18,7 @@ from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks class RegistrationStore(SQLBaseStore): @@ -73,7 +73,8 @@ def add_refresh_token_to_user(self, user_id, token): ) @defer.inlineCallbacks - def register(self, user_id, token, password_hash, was_guest=False): + def register(self, user_id, token, password_hash, + was_guest=False, make_guest=False): """Attempts to register an account. Args: @@ -82,15 +83,18 @@ def register(self, user_id, token, password_hash, was_guest=False): password_hash (str): Optional. The password hash for this user. was_guest (bool): Optional. Whether this is a guest account being upgraded to a non-guest account. + make_guest (boolean): True if the the new user should be guest, + false to add a regular user account. Raises: StoreError if the user_id could not be registered. """ yield self.runInteraction( "register", - self._register, user_id, token, password_hash, was_guest + self._register, user_id, token, password_hash, was_guest, make_guest ) + self.is_guest.invalidate((user_id,)) - def _register(self, txn, user_id, token, password_hash, was_guest): + def _register(self, txn, user_id, token, password_hash, was_guest, make_guest): now = int(self.clock.time()) next_id = self._access_tokens_id_gen.get_next_txn(txn) @@ -100,12 +104,14 @@ def _register(self, txn, user_id, token, password_hash, was_guest): txn.execute("UPDATE users SET" " password_hash = ?," " upgrade_ts = ?" + " is_guest = ?" " WHERE name = ?", - [password_hash, now, user_id]) + [password_hash, now, make_guest, user_id]) else: - txn.execute("INSERT INTO users(name, password_hash, creation_ts) " - "VALUES (?,?,?)", - [user_id, password_hash, now]) + txn.execute("INSERT INTO users " + "(name, password_hash, creation_ts, is_guest) " + "VALUES (?,?,?,?)", + [user_id, password_hash, now, make_guest]) except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE @@ -126,7 +132,7 @@ def get_user_by_id(self, user_id): keyvalues={ "name": user_id, }, - retcols=["name", "password_hash"], + retcols=["name", "password_hash", "is_guest"], allow_none=True, ) @@ -136,7 +142,7 @@ def get_users_by_id_case_insensitive(self, user_id): """ def f(txn): sql = ( - "SELECT name, password_hash FROM users" + "SELECT name, password_hash, is_guest FROM users" " WHERE lower(name) = lower(?)" ) txn.execute(sql, (user_id,)) @@ -249,9 +255,21 @@ def is_server_admin(self, user): defer.returnValue(res if res else False) + @cachedInlineCallbacks() + def is_guest(self, user): + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user.to_string()}, + retcol="is_guest", + allow_none=True, + desc="is_guest", + ) + + defer.returnValue(res if res else False) + def _query_for_auth(self, txn, token): sql = ( - "SELECT users.name, access_tokens.id as token_id" + "SELECT users.name, users.is_guest, access_tokens.id as token_id" " FROM users" " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" diff --git a/synapse/storage/schema/delta/27/event_push_actions.sql b/synapse/storage/schema/delta/28/event_push_actions.sql similarity index 100% rename from synapse/storage/schema/delta/27/event_push_actions.sql rename to synapse/storage/schema/delta/28/event_push_actions.sql From ae1262a241aa816b9e0f19e628afcc83229af64f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 11:58:20 +0000 Subject: [PATCH 27/33] Add schema change file for is_guest flag --- .../schema/delta/28/users_is_guest.sql | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 synapse/storage/schema/delta/28/users_is_guest.sql diff --git a/synapse/storage/schema/delta/28/users_is_guest.sql b/synapse/storage/schema/delta/28/users_is_guest.sql new file mode 100644 index 000000000000..80792e85deb0 --- /dev/null +++ b/synapse/storage/schema/delta/28/users_is_guest.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE users ADD is_guest admin SMALLINT DEFAULT 0 NOT NULL; +/* + * NB: any guest users created between 27 and 28 will be incorrectly + * marked as not guests: we don't bother to fill these in correctly + * because guest access is not really complete in 27 anyway so it's + * very unlikley there will be any guest users created. + */ From 992928304f3c95f87a3297799965159d295432ea Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 11:58:46 +0000 Subject: [PATCH 28/33] Delete notifications for redacted events --- synapse/push/action_generator.py | 7 +++++++ synapse/storage/event_push_actions.py | 12 ++++++++++++ 2 files changed, 19 insertions(+) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index bcd40798f9db..4cf94f6c614f 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,6 +19,8 @@ import logging +from synapse.api.constants import EventTypes + logger = logging.getLogger(__name__) @@ -34,6 +36,11 @@ def __init__(self, store): @defer.inlineCallbacks def handle_push_actions_for_event(self, event, handler): + if event.type == EventTypes.Redaction and event.redacts is not None: + yield self.store.remove_push_actions_for_event_id( + event.room_id, event.redacts + ) + bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event.room_id, self.store ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 0634af6b62b5..5b44431ab9e9 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -93,6 +93,18 @@ def _get_unread_event_push_actions_by_room(txn): ) defer.returnValue(ret) + @defer.inlineCallbacks + def remove_push_actions_for_event_id(self, room_id, event_id): + def f(txn): + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) + ) + yield self.runInteraction( + "remove_push_actions_for_event_id", + f + ) + class EventPushActionsTable(object): table_name = "event_push_actions" From 0e48f7f2458f08341131b3b90c78b7034fe02d14 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 16:46:41 +0000 Subject: [PATCH 29/33] fix tests --- tests/handlers/test_federation.py | 4 ++++ tests/handlers/test_room.py | 5 +++++ tests/storage/test_registration.py | 2 +- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 6acc4ebadc8b..029c09411515 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -53,6 +53,8 @@ def setUp(self): "bulk_get_push_rules", "get_current_state", "set_push_actions_for_event_and_users", + "is_guest", + "get_state_for_events", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -73,6 +75,8 @@ def setUp(self): self.handlers.federation_handler = FederationHandler(self.hs) + self.datastore.get_state_for_events.return_value = {"$a:b": {}} + @defer.inlineCallbacks def test_msg(self): pdu = FrozenEvent({ diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index ff2b5971243e..b1c8e61522aa 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -47,6 +47,8 @@ def setUp(self): "bulk_get_push_rules", "get_current_state", "set_push_actions_for_event_and_users", + "get_state_for_events", + "is_guest", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -116,6 +118,7 @@ def test_invite(self): defer.succeed([]) ) self.datastore.get_current_state.return_value = {} + self.datastore.get_state_for_events = lambda event_ids,types: {x: {} for x in event_ids} def annotate(_): ctx = Mock() @@ -198,6 +201,7 @@ def test_simple_join(self): defer.succeed([]) ) self.datastore.get_current_state.return_value = {} + self.datastore.get_state_for_events = lambda event_ids,types: {x: {} for x in event_ids} def annotate(_): ctx = Mock() @@ -274,6 +278,7 @@ def test_simple_leave(self): defer.succeed([]) ) self.datastore.get_current_state.return_value = {} + self.datastore.get_state_for_events = lambda event_ids,types: {x: {} for x in event_ids} def annotate(_): ctx = Mock() diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index 0cce6c37dfe3..4760131f9c29 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -45,7 +45,7 @@ def test_register(self): self.assertEquals( # TODO(paul): Surely this field should be 'user_id', not 'name' # Additionally surely it shouldn't come in a 1-element list - {"name": self.user_id, "password_hash": self.pwhash}, + {"name": self.user_id, "password_hash": self.pwhash, "is_guest": 0}, (yield self.store.get_user_by_id(self.user_id)) ) From b6a585348ae8a07dc8105242e182435a240e6b8f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 17:16:02 +0000 Subject: [PATCH 30/33] Adding is_guest here won't work because it just constructs a dict of uid -> password hash --- synapse/storage/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index c79066f7740a..a52b67013bc1 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -142,7 +142,7 @@ def get_users_by_id_case_insensitive(self, user_id): """ def f(txn): sql = ( - "SELECT name, password_hash, is_guest FROM users" + "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)" ) txn.execute(sql, (user_id,)) From 09dc9854cd8f28b2e5ce90207cfc822d226ec2ad Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 17:44:10 +0000 Subject: [PATCH 31/33] comma style --- synapse/handlers/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 1799a668c6ca..ba26d13d4971 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -119,7 +119,7 @@ def register( token=token, password_hash=password_hash, was_guest=guest_access_token is not None, - make_guest=make_guest + make_guest=make_guest, ) yield registered_user(self.distributor, user) From 823b679232ed030ba9a2f609d11074c6b3111be2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Jan 2016 10:02:47 +0000 Subject: [PATCH 32/33] more commas --- synapse/push/bulk_push_rule_evaluator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 63d65b446588..ce244fa95930 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -42,7 +42,7 @@ def evaluator_for_room_id(room_id, store): uid: baserules.list_with_base_rules( [decode_rule_json(rule_list) for rule_list in rules_by_user[uid]] if uid in rules_by_user else [], - UserID.from_string(uid) + UserID.from_string(uid), ) for uid in users } From daadcf36c0bad93220d4fa422dce0c5740f63e3d Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Jan 2016 10:15:35 +0000 Subject: [PATCH 33/33] This comma is actually important --- synapse/storage/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a52b67013bc1..ece71f2ee8c4 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -103,7 +103,7 @@ def _register(self, txn, user_id, token, password_hash, was_guest, make_guest): if was_guest: txn.execute("UPDATE users SET" " password_hash = ?," - " upgrade_ts = ?" + " upgrade_ts = ?," " is_guest = ?" " WHERE name = ?", [password_hash, now, make_guest, user_id])