class SyncPaginationConfig(collections.namedtuple("SyncPaginationConfig", [
    "order",
    "limit",
    "tags",
])):
    """Initial pagination configuration from initial sync.

    Fields:
        order (str): Must be one of SYNC_PAGINATION_VALID_ORDERS.
        limit (int): Coerced with ``int()``; the coerced value is what gets
            stored on the tuple.
        tags (str): Must be one of SYNC_PAGINATION_VALID_TAGS_OPTIONS.

    Raises:
        SynapseError: (400) if any field fails validation.
    """

    # A namedtuple is immutable and is fully built by __new__, so validation
    # and coercion have to happen here. Doing it in __init__ is too late: the
    # tuple already holds the raw values, and forwarding the arguments to
    # object.__init__ is an error.
    def __new__(cls, order, limit, tags):
        if order not in SYNC_PAGINATION_VALID_ORDERS:
            raise SynapseError(400, "Invalid 'order'")
        if tags not in SYNC_PAGINATION_VALID_TAGS_OPTIONS:
            raise SynapseError(400, "Invalid 'tags'")

        try:
            limit = int(limit)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures mean "bad input"; anything else should
            # propagate rather than be reported as a client error.
            raise SynapseError(400, "Invalid 'limit'")

        return super(SyncPaginationConfig, cls).__new__(cls, order, limit, tags)


SYNC_PAGINATION_TAGS_INCLUDE_ALL = "m.include_all"
SYNC_PAGINATION_TAGS_IGNORE = "m.ignore"
SYNC_PAGINATION_VALID_TAGS_OPTIONS = (
    SYNC_PAGINATION_TAGS_INCLUDE_ALL, SYNC_PAGINATION_TAGS_IGNORE,
)

SYNC_PAGINATION_ORDER_TS = "m.origin_server_ts"
SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
+ "errors", # ErrorSyncResult + "pagination_info", + "unread_notifications", ])): __slots__ = [] @@ -140,8 +198,8 @@ def __init__(self, hs): self.clock = hs.get_clock() self.response_cache = ResponseCache() - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, - full_state=False): + def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, + full_state=False, extras=DEFAULT_SYNC_EXTRAS): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. @@ -153,48 +211,42 @@ def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, since_token, timeout, full_state + sync_config, batch_token, timeout, full_state, extras, ) ) return result @defer.inlineCallbacks - def _wait_for_sync_for_user(self, sync_config, since_token, timeout, - full_state): + def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, + full_state, extras=DEFAULT_SYNC_EXTRAS): context = LoggingContext.current_context() if context: - if since_token is None: + if batch_token is None: context.tag = "initial_sync" elif full_state: context.tag = "full_state_sync" else: context.tag = "incremental_sync" - if timeout == 0 or since_token is None or full_state: + if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. 
- result = yield self.current_sync_for_user( - sync_config, since_token, full_state=full_state, + result = yield self.generate_sync_result( + sync_config, batch_token, full_state=full_state, extras=extras, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, since_token) + return self.generate_sync_result( + sync_config, batch_token, full_state=False, extras=extras, + ) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, - from_token=since_token, + from_token=batch_token.stream_token, ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None, - full_state=False): - """Get the sync for client needed to match what the server has now. - Returns: - A Deferred SyncResult. - """ - return self.generate_sync_result(sync_config, since_token, full_state) - @defer.inlineCallbacks def push_rules_for_user(self, user): user_id = user.to_string() @@ -490,13 +542,15 @@ def unread_notifs_for_room_id(self, room_id, sync_config): defer.returnValue(None) @defer.inlineCallbacks - def generate_sync_result(self, sync_config, since_token=None, full_state=False): + def generate_sync_result(self, sync_config, batch_token=None, full_state=False, + extras=DEFAULT_SYNC_EXTRAS): """Generates a sync result. 
@defer.inlineCallbacks
def _generate_notification_counts(self, sync_result_builder):
    """Populate `sync_result_builder.unread_notifications` with totals
    aggregated over every room in `sync_result_builder.all_joined_rooms`.

    For each room `unread_notifs_for_room_id` is consulted; rooms for which
    it returns None contribute nothing.

    Args:
        sync_result_builder (SyncResultBuilder)
    """
    count_keys = ("notify_count", "highlight_count")

    # Mutable containers, because the nested callback cannot rebind names
    # from the enclosing scope.
    totals = dict.fromkeys(count_keys, 0)
    rooms_seen = {key: set() for key in count_keys}

    @defer.inlineCallbacks
    def tally_room(room_id):
        counts = yield self.unread_notifs_for_room_id(
            room_id, sync_result_builder.sync_config
        )
        if counts is None:
            return

        for key in count_keys:
            totals[key] += counts[key]
        for key in count_keys:
            if counts[key]:
                rooms_seen[key].add(room_id)

    # Look rooms up with at most 10 lookups in flight at once.
    yield concurrently_execute(
        tally_room, sync_result_builder.all_joined_rooms, 10
    )

    sync_result_builder.unread_notifications = {
        "total_notification_count": totals["notify_count"],
        "rooms_notification_count": len(rooms_seen["notify_count"]),
        "total_highlight_count": totals["highlight_count"],
        "rooms_highlight_count": len(rooms_seen["highlight_count"]),
    }
@defer.inlineCallbacks
def _update_room_entries_for_paginated_sync(self, sync_result_builder,
                                            room_entries, extras):
    """Works out which room_entries should be synced to the client, which
    would need to be resynced if they were sent down, etc.

    Mutates room_entries: sets the `always_include`, `would_require_resync`,
    `synced` and `since_token` attributes on individual entries and, when
    pagination is in effect, filters the list in place. Also writes
    `errors`, `pagination_state` and `pagination_info` on the
    `sync_result_builder`.

    Args:
        sync_result_builder (SyncResultBuilder)
        room_entries (list(RoomSyncResultBuilder))
        extras (SyncExtras)
    """
    # NOTE(review): this block was reconstructed from a whitespace-collapsed
    # diff; the nesting of the `extras.peek` branch below is the most
    # plausible reading and should be confirmed against the real file.
    user_id = sync_result_builder.sync_config.user.to_string()
    sync_config = sync_result_builder.sync_config

    # A pagination config supplied with this request wins; otherwise rebuild
    # one from the pagination state carried on the builder.
    if sync_config.pagination_config:
        pagination_config = sync_config.pagination_config
        old_pagination_value = 0
        include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
    elif sync_result_builder.pagination_state:
        pagination_config = SyncPaginationConfig(
            order=sync_result_builder.pagination_state.order,
            limit=sync_result_builder.pagination_state.limit,
            tags=sync_result_builder.pagination_state.tags,
        )
        old_pagination_value = sync_result_builder.pagination_state.value
        include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
    else:
        pagination_config = None
        old_pagination_value = 0
        include_all_tags = False

    if sync_result_builder.pagination_state:
        # Rooms that `_get_rooms_that_need_full_state` reports as not yet
        # sent to the client under the previous pagination state.
        missing_state = yield self._get_rooms_that_need_full_state(
            room_ids=[r.room_id for r in room_entries],
            sync_config=sync_config,
            since_token=sync_result_builder.since_token,
            pagination_state=sync_result_builder.pagination_state,
        )

        all_tags = yield self.store.get_tags_for_user(user_id)

        if sync_result_builder.since_token:
            stream_id = sync_result_builder.since_token.account_data_key
            now_stream_id = sync_result_builder.now_token.account_data_key
            tag_changes = yield self.store.get_room_tags_changed(
                user_id, stream_id, now_stream_id
            )
        else:
            tag_changes = {}

        if missing_state:
            for r in room_entries:
                if r.room_id in missing_state:
                    if include_all_tags:
                        # If we're always including tagged rooms, then only
                        # resync rooms which are newly tagged.
                        change = tag_changes.get(r.room_id)
                        if change == TAG_CHANGE_NEWLY_TAGGED:
                            r.always_include = True
                            r.would_require_resync = True
                            r.synced = True
                            continue
                        elif change == TAG_CHANGE_ALL_REMOVED:
                            # Still sent down, but marked as not synced.
                            r.always_include = True
                            r.synced = False
                            continue
                        elif r.room_id in all_tags:
                            r.always_include = True
                            continue

                    if r.room_id in extras.peek:
                        # A peek with a "since" token resumes from that
                        # token; without one the room is sent in full and
                        # marked as not synced.
                        since = extras.peek[r.room_id].get("since", None)
                        if since:
                            tok = SyncNextBatchToken.from_string(since)
                            r.since_token = tok.stream_token
                        else:
                            r.always_include = True
                            r.would_require_resync = True
                            r.synced = False
                    else:
                        r.would_require_resync = True

    elif pagination_config and include_all_tags:
        all_tags = yield self.store.get_tags_for_user(user_id)

        for r in room_entries:
            if r.room_id in all_tags:
                r.always_include = True

    # Any room the client asked to peek into that has no room entry is
    # reported back as an error rather than silently dropped.
    for room_id in set(extras.peek.keys()) - {r.room_id for r in room_entries}:
        sync_result_builder.errors.append(ErrorSyncResult(
            room_id=room_id,
            errcode=Codes.CANNOT_PEEK,
            error="Cannot peek into requested room",
        ))

    if pagination_config:
        room_ids = [r.room_id for r in room_entries]
        pagination_limit = pagination_config.limit

        # NOTE(review): `extra_limit` is used in arithmetic and slicing
        # without being validated in this function — confirm callers
        # guarantee a non-negative int.
        extra_limit = extras.paginate.get("limit", 0)

        # One more than the window is requested; presumably so rooms beyond
        # the cutoff can be detected — TODO confirm.
        room_map = yield self._get_room_timestamps_at_token(
            room_ids, sync_result_builder.now_token, sync_config,
            pagination_limit + extra_limit + 1,
        )

        limited = False
        if room_map:
            # Most recent timestamp first.
            sorted_list = sorted(
                room_map.items(),
                key=lambda item: -item[1]
            )

            cutoff_list = sorted_list[:pagination_limit + extra_limit]

            # Rooms that fall inside the window only because of the extra
            # `paginate` limit are force-included and fully resynced.
            if cutoff_list[pagination_limit:]:
                new_room_ids = set(r[0] for r in cutoff_list[pagination_limit:])
                for r in room_entries:
                    if r.room_id in new_room_ids:
                        r.always_include = True
                        r.would_require_resync = True

            # NOTE(review): raises IndexError when
            # `pagination_limit + extra_limit` is 0, since `cutoff_list` is
            # then empty while `room_map` is not.
            _, bottom_ts = cutoff_list[-1]
            new_pagination_value = bottom_ts

            # We're limited if there are any rooms that are after cutoff
            # in the list, but still have an origin server ts from after
            # the pagination value from the since token.
            limited = any(
                old_pagination_value < r[1]
                for r in sorted_list[pagination_limit + extra_limit:]
            )

            sync_result_builder.pagination_state = SyncPaginationState(
                order=pagination_config.order, value=new_pagination_value,
                limit=pagination_limit + extra_limit,
                tags=pagination_config.tags,
            )

            to_sync_map = dict(cutoff_list)
        else:
            to_sync_map = {}

        sync_result_builder.pagination_info["limited"] = limited

        # A timestamp came back for every room entry, so the pagination
        # state is dropped; presumably nothing was cut off — TODO confirm.
        if len(room_map) == len(room_entries):
            sync_result_builder.pagination_state = None

        # In-place filter: keep rooms inside the window plus anything that
        # was flagged `always_include` above.
        room_entries[:] = [
            r for r in room_entries
            if r.room_id in to_sync_map or r.always_include
        ]
sync_result_builder, ignored_users): @defer.inlineCallbacks def _generate_room_entry(self, sync_result_builder, ignored_users, - room_builder, ephemeral, tags, account_data, - always_include=False): + room_builder, ephemeral, tags, account_data): """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. @@ -946,19 +1202,23 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, even if empty. """ newly_joined = room_builder.newly_joined + always_include = ( + newly_joined + or sync_result_builder.full_state + or room_builder.always_include + ) full_state = ( - room_builder.full_state - or newly_joined + newly_joined or sync_result_builder.full_state + or room_builder.would_require_resync ) events = room_builder.events # We want to shortcut out as early as possible. - if not (always_include or account_data or ephemeral or full_state): + if not (always_include or account_data or ephemeral): if events == [] and tags is None: return - since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config @@ -993,9 +1253,20 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) - if not (always_include or batch or account_data or ephemeral or full_state): + if not (always_include or batch or account_data or ephemeral): return + # At this point we're guarenteed (?) to send down the room, so if we + # need to resync the entire room do so now. 
@defer.inlineCallbacks
def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
    """For each room, get the last origin_server_ts timestamp the client
    would see (after filtering) at a particular token.

    Only attempts to find the latest `limit` room timestamps.

    Args:
        room_ids (list(str))
        token: Token whose `room_key` bounds the lookup.
        sync_config (SyncConfig): Supplies the filter collection and user.
        limit (int): Maximum number of rooms to return.

    Returns:
        Deferred(dict): room_id -> origin_server_ts. When there are more
        candidate rooms than `limit`, rooms whose latest event is filtered
        out (or which have no event at all) are omitted.
    """
    room_to_entries = {}

    @defer.inlineCallbacks
    def _get_last_ts(room_id):
        entry = yield self.store.get_last_event_id_ts_for_room(
            room_id, token.room_key
        )

        # TODO: Is this ever possible?
        room_to_entries[room_id] = entry if entry else {
            "origin_server_ts": 0,
        }

    yield concurrently_execute(_get_last_ts, room_ids, 10)

    # Few enough rooms that no cutoff applies: return the raw timestamps
    # without loading or filtering the events.
    if len(room_to_entries) <= limit:
        defer.returnValue({
            room_id: entry["origin_server_ts"]
            for room_id, entry in room_to_entries.items()
        })

    # Candidates, most recent first.
    queued_events = sorted(
        room_to_entries.items(),
        key=lambda e: -e[1]["origin_server_ts"]
    )

    to_return = {}

    while len(to_return) < limit and queued_events:
        # Take the next batch OFF the queue. The previous code only sliced
        # the queue and re-appended filtered entries, so as soon as one
        # candidate was filtered out the same head of the queue was fetched
        # forever and the loop never terminated.
        wanted = limit - len(to_return)
        to_fetch = queued_events[:wanted]
        queued_events = queued_events[wanted:]

        event_to_q = {
            e["event_id"]: (room_id, e) for room_id, e in to_fetch
            if "event_id" in e
        }
        if not event_to_q:
            # Only placeholder entries (rooms with no event) in this batch.
            continue

        # Now we fetch each event to check if it's been filtered out
        event_map = yield self.store.get_events(event_to_q.keys())

        recents = sync_config.filter_collection.filter_room_timeline(
            event_map.values()
        )
        recents = yield filter_events_for_client(
            self.store,
            sync_config.user.to_string(),
            recents,
        )

        to_return.update({r.room_id: r.origin_server_ts for r in recents})

        # FIXME: a room whose latest event was filtered out should be
        # re-queued with the timestamp of its previous visible event. Until
        # that lookup exists the room is dropped from the result.

    defer.returnValue(to_return)


@defer.inlineCallbacks
def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
                                    pagination_state):
    """Work out which rooms we haven't sent to the client yet, so would
    require us to send down the full state

    Args:
        room_ids (list(str))
        sync_config (SyncConfig)
        since_token: Token at which room timestamps are evaluated.
        pagination_state: Its `value` is the timestamp cutoff.

    Returns:
        Deferred(frozenset(str)): ids of rooms whose timestamp at
        `since_token` is below `pagination_state.value`.
    """
    start_ts = yield self._get_room_timestamps_at_token(
        room_ids, since_token,
        sync_config=sync_config,
        limit=len(room_ids),
    )

    # The result is an unordered set, so there is no point sorting first.
    missing_list = frozenset(
        room_id for room_id, ts in start_ts.items()
        if ts < pagination_state.value
    )

    defer.returnValue(missing_list)
SyncResultBuilder(object): "Used to help build up a new SyncResult for a user" - def __init__(self, sync_config, full_state, since_token, now_token): + + __slots__ = ( + "sync_config", "full_state", "batch_token", "since_token", "pagination_state", + "now_token", "presence", "account_data", "joined", "invited", "archived", + "pagination_info", "errors", "all_joined_rooms", "unread_notifications", + ) + + def __init__(self, sync_config, full_state, batch_token, now_token, + all_joined_rooms): """ Args: sync_config(SyncConfig) full_state(bool): The full_state flag as specified by user - since_token(StreamToken): The token supplied by user, or None. + batch_token(SyncNextBatchToken): The token supplied by user, or None. now_token(StreamToken): The token to sync up to. + all_joined_rooms(list(str)): List of all joined room ids. """ self.sync_config = sync_config self.full_state = full_state - self.since_token = since_token + self.batch_token = batch_token + self.since_token = batch_token.stream_token if batch_token else None + self.pagination_state = batch_token.pagination_state if batch_token else None self.now_token = now_token + self.all_joined_rooms = all_joined_rooms self.presence = [] self.account_data = [] self.joined = [] self.invited = [] self.archived = [] + self.errors = [] + + self.pagination_info = {} + self.unread_notifications = {} class RoomSyncResultBuilder(object): """Stores information needed to create either a `JoinedSyncResult` or `ArchivedSyncResult`. 
""" - def __init__(self, room_id, rtype, events, newly_joined, full_state, + + __slots__ = ( + "room_id", "rtype", "events", "newly_joined", "since_token", + "upto_token", "always_include", "would_require_resync", "synced", + ) + + def __init__(self, room_id, rtype, events, newly_joined, since_token, upto_token): """ Args: @@ -1118,7 +1496,6 @@ def __init__(self, room_id, rtype, events, newly_joined, full_state, events(list): List of events to include in the room, (more events may be added when generating result). newly_joined(bool): If the user has newly joined the room - full_state(bool): Whether the full state should be sent in result since_token(StreamToken): Earliest point to return events from, or None upto_token(StreamToken): Latest point to return events from. """ @@ -1126,6 +1503,12 @@ def __init__(self, room_id, rtype, events, newly_joined, full_state, self.rtype = rtype self.events = events self.newly_joined = newly_joined - self.full_state = full_state self.since_token = since_token self.upto_token = upto_token + + # Should this room always be included in the sync? + self.always_include = False + # If we send down this room, should we send down the full state? + self.would_require_resync = False + # Should the client consider this room "synced"? 
+ self.synced = True diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index e024cec0a2d7..5b98940292b7 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -36,6 +36,7 @@ "blist": ["blist"], "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], + "cbor2": ["cbor2"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index 735c03c7eb07..351894510c85 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -51,6 +51,9 @@ def __init__(self, db_conn, hs): get_updated_account_data_for_user = ( DataStore.get_updated_account_data_for_user.__func__ ) + get_room_tags_changed = ( + DataStore.get_room_tags_changed.__func__ + ) def get_max_account_data_stream_id(self): return self._account_data_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 877c68508ceb..03231550b751 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -144,6 +144,8 @@ def __init__(self, db_conn, hs): _get_events_around_txn = DataStore._get_events_around_txn.__func__ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ + get_last_event_id_ts_for_room = DataStore.get_last_event_id_ts_for_room.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 43d8e0bf39a7..cfec804b4414 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -16,10 +16,14 @@ from twisted.internet import defer from synapse.http.servlet import ( - RestServlet, parse_string, 
@defer.inlineCallbacks
def on_POST(self, request):
    """Handle a sync request whose parameters are supplied as a JSON body.

    Body keys read: `timeout`, `since`, `extras` (`paginate`, `peek`),
    `set_presence`, `full_state`, `filter_id`, `filter`,
    `pagination_config` (`order`, `limit`, optional `tags`).

    Returns:
        Deferred: whatever `_handle_sync` produces.

    Raises:
        SynapseError: (400) on a malformed or contradictory body.
    """
    requester = yield self.auth.get_user_by_req(
        request, allow_guest=True
    )
    user = requester.user

    body = parse_json_object_from_request(request)

    timeout = body.get("timeout", 0)
    since = body.get("since", None)

    # Everything in the body is client-controlled: reject wrong shapes with
    # a 400 instead of letting AttributeError/KeyError surface as a 500.
    raw_extras = body.get("extras", {})
    if not isinstance(raw_extras, dict):
        raise SynapseError(400, "'extras' must be a JSON object")

    paginate = raw_extras.get("paginate", {})
    peek = raw_extras.get("peek", {})
    if not isinstance(paginate, dict):
        raise SynapseError(400, "'extras.paginate' must be a JSON object")
    if not isinstance(peek, dict) or any(
        not isinstance(entry, dict) for entry in peek.values()
    ):
        raise SynapseError(
            400, "'extras.peek' must map room ids to JSON objects"
        )

    extras = SyncExtras(paginate=paginate, peek=peek)

    if "from" in body:
        # /events used to use 'from', but /sync uses 'since'.
        # Let's be helpful and whine if we see a 'from'.
        raise SynapseError(
            400, "'from' is not a valid parameter. Did you mean 'since'?"
        )

    set_presence = body.get("set_presence", "online")
    if set_presence not in self.ALLOWED_PRESENCE:
        message = "Parameter 'set_presence' must be one of [%s]" % (
            ", ".join(repr(v) for v in self.ALLOWED_PRESENCE)
        )
        raise SynapseError(400, message)

    full_state = body.get("full_state", False)

    filter_id = body.get("filter_id", None)
    filter_dict = body.get("filter", None)
    raw_pagination = body.get("pagination_config", None)

    if filter_dict is not None and filter_id is not None:
        raise SynapseError(
            400,
            "Can only specify one of `filter` and `filter_id` paramters"
        )

    if filter_id:
        filter_collection = yield self.filtering.get_user_filter(
            user.localpart, filter_id
        )
        filter_key = filter_id
    elif filter_dict:
        self.filtering.check_valid_filter(filter_dict)
        filter_collection = FilterCollection(filter_dict)
        filter_key = json.dumps(filter_dict)
    else:
        filter_collection = DEFAULT_FILTER_COLLECTION
        filter_key = None

    if raw_pagination:
        if not isinstance(raw_pagination, dict):
            raise SynapseError(
                400, "'pagination_config' must be a JSON object"
            )
        for required in ("order", "limit"):
            if required not in raw_pagination:
                raise SynapseError(
                    400, "'pagination_config' is missing %r" % (required,)
                )
        pagination_config = SyncPaginationConfig(
            order=raw_pagination["order"],
            limit=raw_pagination["limit"],
            tags=raw_pagination.get("tags", SYNC_PAGINATION_TAGS_IGNORE),
        )
    else:
        pagination_config = None

    # The key identifies "the same request" for response caching, so it has
    # to cover every input that changes the response. Leaving out the
    # pagination config and extras let two requests differing only in those
    # share one cached result.
    request_key = (
        user, timeout, since, filter_key, full_state,
        json.dumps(raw_pagination) if raw_pagination else None,
        json.dumps(raw_extras),
    )

    sync_config = SyncConfig(
        user=user,
        filter_collection=filter_collection,
        is_guest=requester.is_guest,
        request_key=request_key,
        pagination_config=pagination_config,
    )

    if since is not None:
        batch_token = SyncNextBatchToken.from_string(since)
    else:
        batch_token = None

    sync_result = yield self._handle_sync(
        requester=requester,
        sync_config=sync_config,
        batch_token=batch_token,
        set_presence=set_presence,
        full_state=full_state,
        timeout=timeout,
        extras=extras,
    )

    defer.returnValue(sync_result)
default=False) - logger.info( - "/sync: user=%r, timeout=%r, since=%r," - " set_presence=%r, filter_id=%r" % ( - user, timeout, since, set_presence, filter_id - ) - ) - request_key = (user, timeout, since, filter_id, full_state) if filter_id: @@ -136,15 +221,39 @@ def on_GET(self, request): filter_collection=filter, is_guest=requester.is_guest, request_key=request_key, + pagination_config=None, ) if since is not None: - since_token = StreamToken.from_string(since) + batch_token = SyncNextBatchToken.from_string(since) else: - since_token = None + batch_token = None + + sync_result = yield self._handle_sync( + requester=requester, + sync_config=sync_config, + batch_token=batch_token, + set_presence=set_presence, + full_state=full_state, + timeout=timeout, + ) + + defer.returnValue(sync_result) + @defer.inlineCallbacks + def _handle_sync(self, requester, sync_config, batch_token, set_presence, + full_state, timeout, extras=DEFAULT_SYNC_EXTRAS): affect_presence = set_presence != PresenceState.OFFLINE + user = sync_config.user + + logger.info( + "/sync: user=%r, timeout=%r, since=%r," + " set_presence=%r" % ( + user, timeout, batch_token, set_presence + ) + ) + if affect_presence: yield self.presence_handler.set_state(user, {"presence": set_presence}) @@ -153,8 +262,8 @@ def on_GET(self, request): ) with context: sync_result = yield self.sync_handler.wait_for_sync_for_user( - sync_config, since_token=since_token, timeout=timeout, - full_state=full_state + sync_config, batch_token=batch_token, timeout=timeout, + full_state=full_state, extras=extras, ) time_now = self.clock.time_msec() @@ -182,8 +291,15 @@ def on_GET(self, request): "leave": archived, }, "next_batch": sync_result.next_batch.to_string(), + "unread_notifications": sync_result.unread_notifications, } + if sync_result.errors: + response_content["rooms"]["errors"] = self.encode_errors(sync_result.errors) + + if sync_result.pagination_info: + response_content["pagination_info"] = sync_result.pagination_info + 
def encode_errors(self, errors):
    """Encode per-room sync errors for the response body.

    Args:
        errors: Iterable of objects exposing `room_id`, `errcode` and
            `error` attributes.

    Returns:
        dict: room_id -> {"errcode": ..., "error": ...}. If a room id
        appears more than once, the last entry wins.
    """
    encoded = {}
    for err in errors:
        encoded[err.room_id] = {
            "errcode": err.errcode,
            "error": err.error,
        }
    return encoded
+ */ + +CREATE TABLE room_tags_change_revisions( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + stream_id BIGINT NOT NULL, + change TEXT NOT NULL +); + +CREATE INDEX room_tags_change_revisions_rm_idx ON room_tags_change_revisions(user_id, room_id, stream_id); +CREATE INDEX room_tags_change_revisions_idx ON room_tags_change_revisions(user_id, stream_id); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b9ad965fd6fa..434adf9a1779 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -525,6 +525,36 @@ def _set_before_and_after(events, rows, topo_order=True): int(stream), ) + def get_last_event_id_ts_for_room(self, room_id, token): + """Get the latest event_id and origin_server_ts for a room_id before a + given token. + + Args: + room_id (str) + token (str) + + Returns: + Dictionary with ``event_id`` and ``origin_server_ts`` keys. + """ + stream_ordering = RoomStreamToken.parse_stream_token(token).stream + + sql = ( + "SELECT event_id, origin_server_ts FROM events" + " WHERE room_id = ? AND stream_ordering <= ?" 
+ " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT 1" + ) + + def f(txn): + txn.execute(sql, (room_id, stream_ordering)) + rows = self.cursor_to_dict(txn) + if rows: + return rows[0] + else: + return None + + return self.runInteraction("get_last_event_id_ts_for_room", f) + @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): """Retrieve events and pagination tokens around a given event in a diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 5a2c1aa59b6a..8dcbe9bc9022 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -17,12 +17,18 @@ from synapse.util.caches.descriptors import cached from twisted.internet import defer +from collections import Counter + import ujson as json import logging logger = logging.getLogger(__name__) +TAG_CHANGE_NEWLY_TAGGED = "newly_tagged" +TAG_CHANGE_ALL_REMOVED = "all_removed" + + class TagsStore(SQLBaseStore): def get_max_account_data_stream_id(self): """Get the current max stream id for the private user data stream @@ -170,6 +176,45 @@ def get_tags_for_room(self, user_id, room_id): row["tag"]: json.loads(row["content"]) for row in rows }) + def get_room_tags_changed(self, user_id, stream_id, now_id): + """Returns the rooms that have been newly tagged or had all their tags + removed since `stream_id`. + + Collapses multiple changes into one. For example, if a room has gone + from untagged to tagged back to untagged, the room_id won't be returned. + """ + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(stream_id) + ) + + if not changed: + return {} + + def _get_room_tags_changed(txn): + txn.execute( + "SELECT room_id, change FROM room_tags_change_revisions" + " WHERE user_id = ? AND stream_id > ? 
AND stream_id <= ?", + (user_id, stream_id, now_id) + ) + + results = Counter() + + for room_id, change in txn.fetchall(): + if change == TAG_CHANGE_NEWLY_TAGGED: + results[room_id] += 1 + elif change == TAG_CHANGE_ALL_REMOVED: + results[room_id] -= 1 + else: + logger.warn("Unexpected tag change: %r", change) + + return { + room_id: TAG_CHANGE_NEWLY_TAGGED if count > 0 else TAG_CHANGE_ALL_REMOVED + for room_id, count in results.items() + if count + } + + return self.runInteraction("get_room_tags_changed", _get_room_tags_changed) + @defer.inlineCallbacks def add_tag_to_room(self, user_id, room_id, tag, content): """Add a tag to a room for a user. @@ -184,6 +229,12 @@ def add_tag_to_room(self, user_id, room_id, tag, content): content_json = json.dumps(content) def add_tag_txn(txn, next_id): + txn.execute( + "SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?", + (user_id, room_id), + ) + existing_tags, = txn.fetchone() + self._simple_upsert_txn( txn, table="room_tags", @@ -197,6 +248,17 @@ def add_tag_txn(txn, next_id): } ) self._update_revision_txn(txn, user_id, room_id, next_id) + if not existing_tags: + self._simple_insert_txn( + txn, + table="room_tags_change_revisions", + values={ + "user_id": user_id, + "room_id": room_id, + "stream_id": next_id, + "change": TAG_CHANGE_NEWLY_TAGGED, + } + ) with self._account_data_id_gen.get_next() as next_id: yield self.runInteraction("add_tag", add_tag_txn, next_id) @@ -218,6 +280,24 @@ def remove_tag_txn(txn, next_id): " WHERE user_id = ? AND room_id = ? AND tag = ?" ) txn.execute(sql, (user_id, room_id, tag)) + if txn.rowcount > 0: + txn.execute( + "SELECT count(*) FROM room_tags WHERE user_id = ? 
AND room_id = ?", + (user_id, room_id), + ) + existing_tags, = txn.fetchone() + if not existing_tags: + self._simple_insert_txn( + txn, + table="room_tags_change_revisions", + values={ + "user_id": user_id, + "room_id": room_id, + "stream_id": next_id, + "change": TAG_CHANGE_ALL_REMOVED, + } + ) + self._update_revision_txn(txn, user_id, room_id, next_id) with self._account_data_id_gen.get_next() as next_id: diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 4f089bfb9419..49be3c222aa5 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.api.errors import SynapseError -from synapse.types import StreamToken +from synapse.types import StreamToken, SyncNextBatchToken import logging @@ -72,14 +72,18 @@ def get_param(name, default=None): if direction not in ['f', 'b']: raise SynapseError(400, "'dir' parameter is invalid.") - from_tok = get_param("from") + raw_from_tok = get_param("from") to_tok = get_param("to") try: - if from_tok == "END": + from_tok = None + if raw_from_tok == "END": from_tok = None # For backwards compat. 
- elif from_tok: - from_tok = StreamToken.from_string(from_tok) + elif raw_from_tok: + try: + from_tok = SyncNextBatchToken.from_string(raw_from_tok).stream_token + except: + from_tok = StreamToken.from_string(raw_from_tok) except: raise SynapseError(400, "'from' paramater is invalid") diff --git a/synapse/types.py b/synapse/types.py index f639651a7307..6c7bdf0cf9c7 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -17,6 +17,9 @@ from collections import namedtuple +from unpaddedbase64 import encode_base64, decode_base64 +import cbor2 as serializer + Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) @@ -115,8 +118,71 @@ class EventID(DomainSpecificString): SIGIL = "$" +class SyncNextBatchToken( + namedtuple("SyncNextBatchToken", ( + "stream_token", + "pagination_state", + )) +): + @classmethod + def from_string(cls, string): + try: + d = serializer.loads(decode_base64(string)) + pa = d.get("pa", None) + if pa: + pa = SyncPaginationState.from_dict(pa) + return cls( + stream_token=StreamToken.from_arr(d["t"]), + pagination_state=pa, + ) + except: + raise SynapseError(400, "Invalid Token") + + def to_string(self): + return encode_base64(serializer.dumps({ + "t": self.stream_token.to_arr(), + "pa": self.pagination_state.to_dict() if self.pagination_state else None, + })) + + def replace(self, **kwargs): + return self._replace(**kwargs) + + +_ORDER_ENCODE = {"m.origin_server_ts": "o"} +_ORDER_DECODE = {v: k for k, v in _ORDER_ENCODE.items()} +_TAG_ENCODE = {"m.include_all": "i", "m.ignore": "x"} +_TAG_DECODE = {v: k for k, v in _TAG_ENCODE.items()} + + +class SyncPaginationState( + namedtuple("SyncPaginationState", ( + "order", + "value", + "limit", + "tags", + )) +): + @classmethod + def from_dict(cls, d): + try: + return cls(_ORDER_DECODE[d["o"]], d["v"], d["l"], _TAG_DECODE[d["t"]]) + except: + raise SynapseError(400, "Invalid Token") + + def to_dict(self): + return { + "o": _ORDER_ENCODE[self.order], + "v": self.value, + "l": 
self.limit, + "t": _TAG_ENCODE[self.tags], + } + + def replace(self, **kwargs): + return self._replace(**kwargs) + + class StreamToken( - namedtuple("Token", ( + namedtuple("StreamToken", ( "room_key", "presence_key", "typing_key", @@ -141,6 +207,20 @@ def from_string(cls, string): def to_string(self): return self._SEPARATOR.join([str(k) for k in self]) + @classmethod + def from_arr(cls, arr): + try: + keys = arr + while len(keys) < len(cls._fields): + # i.e. old token from before receipt_key + keys.append("0") + return cls(*keys) + except: + raise SynapseError(400, "Invalid Token") + + def to_arr(self): + return self + @property def room_stream_id(self): # TODO(markjh): Awful hack to work around hacks in the presence tests