Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Comments from review
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Mar 18, 2020
1 parent 65a941d commit 6e6476e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
3 changes: 3 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,9 @@ def process_replication_rows(self, stream_name, token, rows):

# ... as well as device updates and messages
elif stream_name == DeviceListsStream.NAME:
# The entities are either user IDs (starting with '@') whose devices
# have changed, or remote servers that we need to tell about
# changes.
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
for host in hosts:
self.federation_sender.send_device_messages(host)
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def process_replication_rows(self, stream_name, token, rows):

def _invalidate_caches_for_devices(self, token, rows):
for row in rows:
# The entities are either user IDs (starting with '@') whose devices
# have changed, or remote servers that we need to tell about
# changes.
if row.entity.startswith("@"):
self._device_list_stream_cache.entity_has_changed(row.entity, token)
self.get_cached_devices_for_user.invalidate((row.entity,))
Expand Down
27 changes: 19 additions & 8 deletions synapse/storage/data_stores/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Tuple

from six import iteritems

Expand All @@ -31,7 +32,7 @@
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import Database
from synapse.storage.database import Database, LoggingTransaction
from synapse.types import Collection, get_verify_key_from_cross_signing_key
from synapse.util.caches.descriptors import (
Cache,
Expand Down Expand Up @@ -574,10 +575,12 @@ def get_users_whose_signatures_changed(self, user_id, from_key):
else:
return set()

def get_all_device_list_changes_for_remotes(self, from_key, to_key):
"""Return a list of `(stream_id, user_id, destination)` which is the
combined list of changes to devices, and which destinations need to be
poked. `destination` may be None if no destinations need to be poked.
async def get_all_device_list_changes_for_remotes(
self, from_key: int, to_key: int
) -> List[Tuple[int, str]]:
"""Return a list of `(stream_id, entity)` which is the combined list of
changes to devices and which destinations need to be poked. Entity is
either a user ID (starting with '@') or a remote destination.
"""

# This query Does The Right Thing where it'll correctly apply the
Expand All @@ -591,7 +594,7 @@ def get_all_device_list_changes_for_remotes(self, from_key, to_key):
WHERE ? < stream_id AND stream_id <= ?
"""

return self.db.execute(
return await self.db.execute(
"get_all_device_list_changes_for_remotes", None, sql, from_key, to_key
)

Expand Down Expand Up @@ -1018,19 +1021,27 @@ def add_device_change_to_streams(self, user_id, device_ids, hosts):

return stream_ids[-1]

def _add_device_change_to_stream_txn(self, txn, user_id, device_ids, stream_ids):
def _add_device_change_to_stream_txn(
self,
txn: LoggingTransaction,
user_id: str,
device_ids: Collection[str],
stream_ids: List[str],
):
txn.call_after(
self._device_list_stream_cache.entity_has_changed, user_id, stream_ids[-1],
)

min_stream_id = stream_ids[0]

# Delete older entries in the table, as we really only care about
# when the latest change happened.
txn.executemany(
"""
DELETE FROM device_lists_stream
WHERE user_id = ? AND device_id = ? AND stream_id < ?
""",
[(user_id, device_id, stream_ids[0]) for device_id in device_ids],
[(user_id, device_id, min_stream_id) for device_id in device_ids],
)

self.db.simple_insert_many_txn(
Expand Down

0 comments on commit 6e6476e

Please sign in to comment.