Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Ignore depth when updating read-receipts
Browse files Browse the repository at this point in the history
Order read receipts by stream ordering instead of depth
  • Loading branch information
richvdh committed Jun 1, 2018
1 parent c2c3092 commit 857e6fd
Showing 1 changed file with 37 additions and 30 deletions.
67 changes: 37 additions & 30 deletions synapse/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.api.errors import NotFoundError
from ._base import SQLBaseStore
from .util.id_generators import StreamIdGenerator
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
Expand Down Expand Up @@ -332,6 +332,41 @@ def get_max_receipt_stream_id(self):

def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id):
res = self._simple_select_one_txn(
txn,
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
allow_none=True
)

if not res:
raise NotFoundError(
"Cannot set read receipt on unknown event %s" % (
event_id,
),
)

stream_ordering = int(res["stream_ordering"])

# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
"SELECT stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)
txn.execute(sql, (room_id, receipt_type, user_id))

for so, eid in txn:
if int(so) >= stream_ordering:
logger.debug(
"Ignoring new receipt for %s in favour of existing "
"one for later event %s",
event_id, eid,
)
return False

txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
Expand All @@ -355,34 +390,6 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
(user_id, room_id, receipt_type)
)

res = self._simple_select_one_txn(
txn,
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
allow_none=True
)

topological_ordering = int(res["topological_ordering"]) if res else None
stream_ordering = int(res["stream_ordering"]) if res else None

# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)

txn.execute(sql, (room_id, receipt_type, user_id))

if topological_ordering:
for to, so, _ in txn:
if int(to) > topological_ordering:
return False
elif int(to) == topological_ordering and int(so) >= stream_ordering:
return False

self._simple_delete_txn(
txn,
table="receipts_linearized",
Expand All @@ -406,7 +413,7 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
}
)

if receipt_type == "m.read" and topological_ordering:
if receipt_type == "m.read":
self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
Expand Down

0 comments on commit 857e6fd

Please sign in to comment.