Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implement unread counter (MSC2625) #7673

Merged
merged 30 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2292dc3
Add experimental "dont_push" push action to suppress push for notific…
ara4n Sep 18, 2019
dd8e24f
changelog
ara4n Sep 19, 2019
ec0a7b9
Merge branch 'develop' into babolivier/mark_unread
babolivier Jun 10, 2020
6f6a4bf
Rename dont_push into mark_unread
babolivier Jun 10, 2020
ef345c5
Add a new unread_counter to sync responses
babolivier Jun 10, 2020
c7b99a1
Use a more efficient way of calculating counters
babolivier Jun 10, 2020
476a897
Fix tests
babolivier Jun 10, 2020
aad40e3
Changelog
babolivier Jun 10, 2020
df3323a
Use temporary prefixes as per the MSC
babolivier Jun 10, 2020
243f0ba
Lint
babolivier Jun 10, 2020
9dbd006
Appease mypy
babolivier Jun 10, 2020
ea8f6e6
Actually act on mark_unread
babolivier Jun 11, 2020
ce74a66
Save the count of unread messages to event_push_summary
babolivier Jun 11, 2020
d0f0956
Lint
babolivier Jun 11, 2020
34fd1f7
Fix schema update
babolivier Jun 11, 2020
8032917
Fix SQL
babolivier Jun 11, 2020
cb6d4d0
Log for invalid values of notif
babolivier Jun 11, 2020
3cc7f43
Fix summary rotation
babolivier Jun 12, 2020
2a07c5d
Test that a mark_unread action updates the right counter
babolivier Jun 12, 2020
63d9a00
Remove debug logging
babolivier Jun 12, 2020
6b1fa32
Test that a mark_unread action updates the right counter when using a…
babolivier Jun 12, 2020
7e80c84
Lint
babolivier Jun 12, 2020
cf92fbb
Use attr instead of a dict
babolivier Jun 12, 2020
9549d55
Don't update the schema version
babolivier Jun 12, 2020
1e5a503
Pre-populate the unread_count column
babolivier Jun 12, 2020
e47e5a2
Incorporate review bits
babolivier Jun 12, 2020
e186c66
Lint
babolivier Jun 12, 2020
fed493c
Incorporate review
babolivier Jun 15, 2020
c2b4621
Merge branch 'develop' into babolivier/mark_unread
babolivier Jun 15, 2020
6efb2b0
Merge branch 'develop' into babolivier/mark_unread
babolivier Jun 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7673.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a per-room counter for unread messages in responses to `/sync` requests. Implements [MSC2625](https://github.com/matrix-org/matrix-doc/pull/2625).
3 changes: 3 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1895,6 +1895,9 @@ async def _generate_room_entry(
if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
unread_notifications["org.matrix.msc2625.unread_count"] = notifs[
"unread_count"
]

sync_result_builder.joined.append(room_sync)

Expand Down
7 changes: 5 additions & 2 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,11 @@ def action_for_event_by_user(self, event, context):
)
if matches:
actions = [x for x in rule["actions"] if x != "dont_notify"]
if actions and "notify" in actions:
# Push rules say we should notify the user of this event
if (
"notify" in actions
or "org.matrix.msc2625.mark_unread" in actions
):
# Push rules say we should act on this event.
actions_by_user[uid] = actions
break

Expand Down
5 changes: 4 additions & 1 deletion synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ def get_badge_count(store, user_id):
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if notifs["notify_count"] else 0
# We're populating this badge using the unread_count (instead of the
# notify_count) as this badge is the number of missed messages, not the
# number of missed notifications.
badge += 1 if notifs["unread_count"] else 0
richvdh marked this conversation as resolved.
Show resolved Hide resolved
return badge


Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/client/v1/push_rule.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2014-2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -267,7 +267,7 @@ def _check_actions(actions):
raise InvalidRuleException("No actions found")

for a in actions:
if a in ["notify", "dont_notify", "coalesce"]:
if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]:
babolivier marked this conversation as resolved.
Show resolved Hide resolved
pass
elif isinstance(a, dict) and "set_tweak" in a:
pass
Expand Down
135 changes: 108 additions & 27 deletions synapse/storage/data_stores/main/event_push_actions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2015-2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,9 +14,11 @@
# limitations under the License.

import logging
from typing import Dict, Tuple

from six import iteritems

import attr
from canonicaljson import json

from twisted.internet import defer
Expand All @@ -38,6 +39,18 @@
]


@attr.s
class EventPushSummary(object):
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""Summary of pending event push actions for a given user in a given room."""

user_id = attr.ib()
babolivier marked this conversation as resolved.
Show resolved Hide resolved
room_id = attr.ib()
unread_count = attr.ib()
stream_ordering = attr.ib()
old_user_id = attr.ib()
notif_count = attr.ib()


def _serialize_action(actions, is_highlight):
"""Custom serializer for actions. This allows us to "compress" common actions.

Expand Down Expand Up @@ -124,32 +137,50 @@ def _get_unread_counts_by_receipt_txn(

def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):

# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
# First get number of actions, grouped on whether the action notifies.
sql = (
"SELECT count(*)"
"SELECT count(*), notif"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND stream_ordering > ?"
" GROUP BY notif"
)

txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
notify_count = row[0] if row else 0
rows = txn.fetchall()

# We should get a maximum number of two rows: one for notif = 0, which is the
# number of actions that contribute to the unread_count but not to the
# notify_count, and one for notif = 1, which is the number of actions that
# contribute to both counters. If one or both rows don't appear, then the
# value for the matching counter should be 0.
unread_count = 0
notify_count = 0
for row in rows:
# We always increment unread_count because actions that notify also
# contribute to it.
unread_count += row[0]
if row[1] == 1:
notify_count = row[0]
babolivier marked this conversation as resolved.
Show resolved Hide resolved
elif row[1] != 0:
logger.warning(
"Unexpected value %d for column 'notif' in table"
" 'event_push_actions'",
row[1],
)

txn.execute(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""
SELECT notif_count FROM event_push_summary
SELECT notif_count, unread_count FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
""",
(room_id, user_id, stream_ordering),
)
rows = txn.fetchall()
if rows:
notify_count += rows[0][0]
unread_count += rows[0][1]

# Now get the number of highlights
sql = (
Expand All @@ -166,7 +197,11 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
row = txn.fetchone()
highlight_count = row[0] if row else 0

return {"notify_count": notify_count, "highlight_count": highlight_count}
return {
"unread_count": unread_count,
"notify_count": notify_count,
"highlight_count": highlight_count,
}

@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
Expand Down Expand Up @@ -224,6 +259,7 @@ def get_after_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -252,6 +288,7 @@ def get_no_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -324,6 +361,7 @@ def get_after_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -352,6 +390,7 @@ def get_no_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -401,7 +440,7 @@ def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
def _get_if_maybe_push_in_range_for_user_txn(txn):
sql = """
SELECT 1 FROM event_push_actions
WHERE user_id = ? AND stream_ordering > ?
WHERE user_id = ? AND stream_ordering > ? AND notif = 1
LIMIT 1
"""

Expand Down Expand Up @@ -430,14 +469,15 @@ def add_push_actions_to_staging(self, event_id, user_id_actions):
return

# This is a helper function for generating the necessary tuple that
# can be used to inert into the `event_push_actions_staging` table.
# can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(user_id, actions):
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1
return (
event_id, # event_id column
user_id, # user_id column
_serialize_action(actions, is_highlight), # actions column
1, # notif column
notif, # notif column
is_highlight, # highlight column
)

Expand Down Expand Up @@ -819,24 +859,53 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
coalesce(old.notif_count, 0) + upd.notif_count,
coalesce(old.%s, 0) + upd.cnt,
upd.stream_ordering,
old.user_id
FROM (
SELECT user_id, room_id, count(*) as notif_count,
SELECT user_id, room_id, count(*) as cnt,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
%s
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""

txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering))
rows = txn.fetchall()
# First get the count of unread messages.
txn.execute(
sql % ("unread_count", ""),
(old_rotate_stream_ordering, rotate_to_stream_ordering),
)

logger.info("Rotating notifications, handling %d rows", len(rows))
# We need to merge both lists into a single object because we might not have the
# same amount of rows in each of them. In this case we use a dict indexed on the
# user ID and room ID to make it easier to populate.
summaries = {} # type: Dict[Tuple[str, str], EventPushSummary]
for row in txn:
summaries[(row[0], row[1])] = EventPushSummary(
user_id=row[0],
room_id=row[1],
unread_count=row[2],
stream_ordering=row[3],
old_user_id=row[4],
notif_count=0,
)

# Then get the count of notifications.
txn.execute(
sql % ("notif_count", "AND notif = 1"),
(old_rotate_stream_ordering, rotate_to_stream_ordering),
)

# notif_rows is populated based on a subset of the query used to populate
# unread_rows, so we can be sure that there will be no KeyError here.
for row in txn:
summaries[(row[0], row[1])].notif_count = row[2]

logger.info("Rotating notifications, handling %d rows", len(summaries))

# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
Expand All @@ -846,22 +915,34 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
table="event_push_summary",
values=[
{
"user_id": row[0],
"room_id": row[1],
"notif_count": row[2],
"stream_ordering": row[3],
"user_id": summary.user_id,
"room_id": summary.room_id,
"notif_count": summary.notif_count,
"unread_count": summary.unread_count,
"stream_ordering": summary.stream_ordering,
}
for row in rows
if row[4] is None
for summary in summaries.values()
babolivier marked this conversation as resolved.
Show resolved Hide resolved
if summary.old_user_id is None
],
)

txn.executemany(
"""
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
UPDATE event_push_summary
SET notif_count = ?, unread_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None),
(
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
summary.user_id,
summary.room_id,
)
for summary in summaries.values()
if summary.old_user_id is not None
),
)

txn.execute(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Store the number of unread messages, i.e. messages that triggered either a notify
-- action or a mark_unread one.
ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0;

-- Pre-populate the new column with the count of pending notifications.
-- We expect event_push_summary to be relatively small, so we can do this update
-- synchronously without impacting Synapse's startup time too much.
UPDATE event_push_summary SET unread_count = notif_count;
19 changes: 16 additions & 3 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 0, "notify_count": 0},
{"highlight_count": 0, "notify_count": 0, "unread_count": 0},
)

self.persist(
Expand All @@ -173,7 +173,7 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 0, "notify_count": 1},
{"highlight_count": 0, "notify_count": 1, "unread_count": 1},
)

self.persist(
Expand All @@ -188,7 +188,20 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 1, "notify_count": 2},
{"highlight_count": 1, "notify_count": 2, "unread_count": 2},
)

self.persist(
type="m.room.message",
msgtype="m.text",
body="world",
push_actions=[(USER_ID_2, ["org.matrix.msc2625.mark_unread"])],
)
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 1, "notify_count": 2, "unread_count": 3},
)

def test_get_rooms_for_user_with_stream_ordering(self):
Expand Down
Loading