Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

bg update to clear out duplicate outbound_device_list_pokes #7193

Merged
merged 5 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7193.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a background database update job to clear out duplicate `device_lists_outbound_pokes`.
16 changes: 5 additions & 11 deletions synapse/storage/data_stores/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import Database
from synapse.storage.database import Database, make_tuple_comparison_clause
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache

Expand Down Expand Up @@ -303,16 +303,10 @@ def _devices_last_seen_update_txn(txn):
# we'll just end up updating the same device row multiple
# times, which is fine.

if self.database_engine.supports_tuple_comparison:
where_clause = "(user_id, device_id) > (?, ?)"
where_args = [last_user_id, last_device_id]
else:
# We explicitly do a `user_id >= ? AND (...)` here to ensure
# that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
# makes it hard for query optimiser to tell that it can use the
# index on user_id
where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
where_args = [last_user_id, last_user_id, last_device_id]
where_clause, where_args = make_tuple_comparison_clause(
self.database_engine,
[("user_id", last_user_id), ("device_id", last_device_id)],
)

sql = """
SELECT
Expand Down
73 changes: 72 additions & 1 deletion synapse/storage/data_stores/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import Database, LoggingTransaction
from synapse.storage.database import (
Database,
LoggingTransaction,
make_tuple_comparison_clause,
)
from synapse.types import Collection, get_verify_key_from_cross_signing_key
from synapse.util.caches.descriptors import (
Cache,
Expand All @@ -49,6 +53,8 @@
"drop_device_list_streams_non_unique_indexes"
)

BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"


class DeviceWorkerStore(SQLBaseStore):
def get_device(self, user_id, device_id):
Expand Down Expand Up @@ -714,6 +720,11 @@ def __init__(self, database: Database, db_conn, hs):
self._drop_device_list_streams_non_unique_indexes,
)

# clear out duplicate device list outbound pokes
self.db.updates.register_background_update_handler(
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
)

@defer.inlineCallbacks
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
def f(conn):
Expand All @@ -728,6 +739,66 @@ def f(conn):
)
return 1

async def _remove_duplicate_outbound_pokes(self, progress, batch_size):
# for some reason, we have accumulated duplicate entries in
# device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
# efficient.
#
# For each duplicate, we delete all the existing rows and put one back.

KEY_COLS = ["stream_id", "destination", "user_id", "device_id"]
last_row = progress.get(
"last_row",
{"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
)

def _txn(txn):
clause, args = make_tuple_comparison_clause(
self.db.engine, [(x, last_row[x]) for x in KEY_COLS]
)
sql = """
SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
FROM device_lists_outbound_pokes
WHERE %s
GROUP BY %s
HAVING count(*) > 1
ORDER BY %s
LIMIT ?
""" % (
clause, # WHERE
",".join(KEY_COLS), # GROUP BY
",".join(KEY_COLS), # ORDER BY
)
txn.execute(sql, args + [batch_size])
rows = self.db.cursor_to_dict(txn)

row = None
for row in rows:
self.db.simple_delete_txn(
txn, "device_lists_outbound_pokes", {x: row[x] for x in KEY_COLS},
)

row["sent"] = False
self.db.simple_insert_txn(
txn, "device_lists_outbound_pokes", row,
)

if row:
self.db.updates._background_update_progress_txn(
txn, BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, {"last_row": row},
)

return len(rows)

rows = await self.db.runInteraction(BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, _txn)

if not rows:
await self.db.updates._end_background_update(
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES
)

return rows


class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* for some reason, we have accumulated duplicate entries in
* device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
* efficient.
*/

INSERT INTO background_updates (ordering, update_name, progress_json)
VALUES (5800, 'remove_dup_outbound_pokes', '{}');
83 changes: 82 additions & 1 deletion synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,17 @@
import logging
import time
from time import monotonic as monotonic_time
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
TypeVar,
)

from six import iteritems, iterkeys, itervalues
from six.moves import intern, range
Expand Down Expand Up @@ -1557,3 +1567,74 @@ def make_in_list_sql_clause(
return "%s = ANY(?)" % (column,), [list(iterable)]
else:
return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)


KV = TypeVar("KV")


def make_tuple_comparison_clause(
database_engine: BaseDatabaseEngine, keys: List[Tuple[str, KV]]
) -> Tuple[str, List[KV]]:
"""Returns a tuple comparison SQL clause

Depending what the SQL engine supports, builds a SQL clause that looks like either
"(a, b) > (?, ?)", or "(a > ?) OR (a == ? AND b > ?)".

Args:
database_engine
keys: A set of (column, value) pairs to be compared.

Returns:
A tuple of SQL query and the args
"""
if database_engine.supports_tuple_comparison:
return (
"(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)),
[k[1] for k in keys],
)

# we want to build a clause
# (a > ?) OR
# (a == ? AND b > ?) OR
# (a == ? AND b == ? AND c > ?)
# ...
# (a == ? AND b == ? AND ... AND z > ?)
#
# or, equivalently:
#
# (a > ? OR (a == ? AND
# (b > ? OR (b == ? AND
# ...
# (y > ? OR (y == ? AND
# z > ?
# ))
# ...
# ))
# ))
#
# which itself is equivalent to (and apparently easier for the query optimiser):
#
# (a >= ? AND (a > ? OR
# (b >= ? AND (b > ? OR
# ...
# (y >= ? AND (y > ? OR
# z > ?
# ))
# ...
# ))
# ))
#
#

clause = ""
args = [] # type: List[KV]
for k, v in keys[:-1]:
clause = clause + "(%s >= ? AND (%s > ? OR " % (k, k)
args.extend([v, v])

(k, v) = keys[-1]
clause += "%s > ?" % (k,)
args.append(v)

clause += "))" * (len(keys) - 1)
return clause, args
52 changes: 52 additions & 0 deletions tests/storage/test_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.storage.database import make_tuple_comparison_clause
from synapse.storage.engines import BaseDatabaseEngine

from tests import unittest


def _stub_db_engine(**kwargs) -> BaseDatabaseEngine:
# returns a DatabaseEngine, circumventing the abc mechanism
# any kwargs are set as attributes on the class before instantiating it
t = type(
"TestBaseDatabaseEngine",
(BaseDatabaseEngine,),
dict(BaseDatabaseEngine.__dict__),
)
# defeat the abc mechanism
t.__abstractmethods__ = set()
for k, v in kwargs.items():
setattr(t, k, v)
return t(None, None)


class TupleComparisonClauseTestCase(unittest.TestCase):
def test_native_tuple_comparison(self):
db_engine = _stub_db_engine(supports_tuple_comparison=True)
clause, args = make_tuple_comparison_clause(db_engine, [("a", 1), ("b", 2)])
self.assertEqual(clause, "(a,b) > (?,?)")
self.assertEqual(args, [1, 2])

def test_emulated_tuple_comparison(self):
db_engine = _stub_db_engine(supports_tuple_comparison=False)
clause, args = make_tuple_comparison_clause(
db_engine, [("a", 1), ("b", 2), ("c", 3)]
)
self.assertEqual(
clause, "(a >= ? AND (a > ? OR (b >= ? AND (b > ? OR c > ?))))"
)
self.assertEqual(args, [1, 1, 2, 2, 3])