Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rewrite prune_old_outbound_device_pokes for efficiency #7159

Merged
merged 5 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7159.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix excessive CPU usage by `prune_old_outbound_device_pokes` job.
25 changes: 2 additions & 23 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.handlers._base import BaseHandler
from synapse.logging.context import (
make_deferred_yieldable,
nested_logging_context,
Expand All @@ -69,10 +70,9 @@
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server

from ._base import BaseHandler

logger = logging.getLogger(__name__)


Expand All @@ -93,27 +93,6 @@ class _NewEventInfo:
auth_events = attr.ib(type=Optional[StateMap[EventBase]], default=None)


def shortstr(iterable, maxitems=5):
"""If iterable has maxitems or fewer, return the stringification of a list
containing those items.

Otherwise, return the stringification of a a list with the first maxitems items,
followed by "...".

Args:
iterable (Iterable): iterable to truncate
maxitems (int): number of items to return before truncating

Returns:
unicode
"""

items = list(itertools.islice(iterable, maxitems + 1))
if len(items) <= maxitems:
return str(items)
return "[" + ", ".join(repr(r) for r in items[:maxitems]) + ", ...]"


class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
Expand Down
79 changes: 58 additions & 21 deletions synapse/storage/data_stores/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
cachedList,
)
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1092,18 +1093,47 @@ def _add_device_outbound_poke_to_stream_txn(
],
)

def _prune_old_outbound_device_pokes(self):
def _prune_old_outbound_device_pokes(self, prune_age=24 * 60 * 60 * 1000):
"""Delete old entries out of the device_lists_outbound_pokes to ensure
that we don't fill up due to dead servers. We keep one entry per
(destination, user_id) tuple to ensure that the prev_ids remain correct
if the server does come back.
that we don't fill up due to dead servers.

Normally, we try to send device updates as a delta since a previous known point:
this is done by setting the prev_id in the m.device_list_update EDU. However,
for that to work, we have to have a complete record of each change to
each device, which can add up to quite a lot of data.

An alternative mechanism is that, if the remote server sees that it has missed
an entry in the stream_id sequence for a given user, it will request a full
list of that user's devices. Hence, we can reduce the amount of data we have to
store (and transmit in some future transaction), by clearing almost everything
for a given destination out of the database, and having the remote server
resync.

All we need to do is make sure we keep at least one row for each
(user, destination) pair, to remind us to send a m.device_list_update EDU for
that user when the destination comes back. It doesn't matter which device
we keep.
"""
yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000
yesterday = self._clock.time_msec() - prune_age

def _prune_txn(txn):
# look for (user, destination) pairs which have an update older than
# the cutoff.
#
# For each pair, we also need to know the most recent stream_id, and
# an arbitrary device_id at that stream_id.
select_sql = """
SELECT destination, user_id, max(stream_id) as stream_id
FROM device_lists_outbound_pokes
SELECT
dlop1.destination,
dlop1.user_id,
MAX(dlop1.stream_id) AS stream_id,
(SELECT MIN(dlop2.device_id) AS device_id FROM
device_lists_outbound_pokes dlop2
WHERE dlop2.destination = dlop1.destination AND
dlop2.user_id=dlop1.user_id AND
dlop2.stream_id=MAX(dlop1.stream_id)
)
FROM device_lists_outbound_pokes dlop1
GROUP BY destination, user_id
HAVING min(ts) < ? AND count(*) > 1
"""
Expand All @@ -1114,24 +1144,31 @@ def _prune_txn(txn):
if not rows:
return

delete_sql = """
DELETE FROM device_lists_outbound_pokes
WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
"""

txn.executemany(
delete_sql, ((yesterday, row[0], row[1], row[2]) for row in rows)
logger.info(
"Pruning old outbound device list updates for %i users/destinations: %s",
len(rows),
shortstr((row[0], row[1]) for row in rows),
)

# Since we've deleted unsent deltas, we need to remove the entry
# of last successful sent so that the prev_ids are correctly set.
sql = """
DELETE FROM device_lists_outbound_last_success
WHERE destination = ? AND user_id = ?
# we want to keep the update with the highest stream_id for each user.
#
# there might be more than one update (with different device_ids) with the
# same stream_id, so we also delete all but one rows with the max stream id.
delete_sql = """
DELETE FROM device_lists_outbound_pokes
WHERE destination = ? AND user_id = ? AND (
stream_id < ? OR
(stream_id = ? AND device_id != ?)
)
"""
txn.executemany(sql, ((row[0], row[1]) for row in rows))
count = 0
for (destination, user_id, stream_id, device_id) in rows:
txn.execute(
delete_sql, (destination, user_id, stream_id, stream_id, device_id)
)
count += txn.rowcount

logger.info("Pruned %d device list outbound pokes", txn.rowcount)
logger.info("Pruned %d device list outbound pokes", count)

return run_as_background_process(
"prune_old_outbound_device_pokes",
Expand Down
21 changes: 20 additions & 1 deletion synapse/util/stringutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import random
import re
import string
from collections import Iterable

import six
from six import PY2, PY3
Expand Down Expand Up @@ -126,3 +127,21 @@ def assert_valid_client_secret(client_secret):
raise SynapseError(
400, "Invalid client_secret parameter", errcode=Codes.INVALID_PARAM
)


def shortstr(iterable: Iterable, maxitems: int = 5) -> str:
"""If iterable has maxitems or fewer, return the stringification of a list
containing those items.

Otherwise, return the stringification of a a list with the first maxitems items,
followed by "...".

Args:
iterable: iterable to truncate
maxitems: number of items to return before truncating
"""

items = list(itertools.islice(iterable, maxitems + 1))
if len(items) <= maxitems:
return str(items)
return "[" + ", ".join(repr(r) for r in items[:maxitems]) + ", ...]"
37 changes: 37 additions & 0 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,43 @@ def test_unreachable_server(self):
devices = {edu["content"]["device_id"] for edu in self.edus}
self.assertEqual({"D1", "D2", "D3"}, devices)

def test_prune_outbound_device_pokes(self):
"""If a destination is unreachable, and the updates are pruned, we should get
a single update"""
mock_send_txn = self.hs.get_federation_transport_client().send_transaction
mock_send_txn.side_effect = lambda t, cb: defer.fail("fail")

# create devices
u1 = self.register_user("user", "pass")
self.login("user", "pass", device_id="D1")
self.login("user", "pass", device_id="D2")
self.login("user", "pass", device_id="D3")

# delete them again
self.get_success(
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
)

self.assertGreaterEqual(mock_send_txn.call_count, 4)

# run the prune job
self.reactor.advance(10)
self.get_success(
self.hs.get_datastore()._prune_old_outbound_device_pokes(prune_age=1)
)

# recover the server
mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2")
self.pump()

# there should be a single update for this user.
self.assertEqual(len(self.edus), 1)
edu = self.edus.pop(0)
self.assertEqual(edu["edu_type"], "m.device_list_update")
c = edu["content"]
self.assertEqual(c["prev_id"], [])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get a test where we successfully send an update before we prune please? Now that we're not deleting from device_lists_outbound_last_success its not clear what the prev_id will be in that case.

Separately, the spec doesn't actually say that the remote should resync when prev_id is empty, though Synapse does. We should either a) fix Synapse to point to a dropped update or b) update the spec. I guess I'm happy with not trying to fix it in this PR, but if so we should at least file an issue or something

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, thanks for making me think about this properly. Removing the deletion from device_lists_outbound_last_success was the wrong thing.

Fixing it isn't entirely trivial: have raised #7173 to track


def check_device_update_edu(
self,
edu: JsonDict,
Expand Down