Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #6089 from matrix-org/erikj/cleanup_user_ips
Browse files Browse the repository at this point in the history
  • Loading branch information
anoadragon453 committed Feb 25, 2020
2 parents ba03d64 + 9614d3c commit 41e59e6
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 50 deletions.
1 change: 1 addition & 0 deletions changelog.d/6089.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move last seen info into devices table.
124 changes: 75 additions & 49 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def __init__(self, db_conn, hs):
"user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
)

# Update the last seen info in devices.
self.register_background_update_handler(
"devices_last_seen", self._devices_last_seen_update
)

# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

Expand Down Expand Up @@ -354,6 +359,21 @@ def _update_client_ips_batch_txn(self, txn, to_update):
},
lock=False,
)

# Technically an access token might not be associated with
# a device so we need to check.
if device_id:
self._simple_upsert_txn(
txn,
table="devices",
keyvalues={"user_id": user_id, "device_id": device_id},
values={
"user_agent": user_agent,
"last_seen": last_seen,
"ip": ip,
},
lock=False,
)
except Exception as e:
# Failed to upsert, log and continue
logger.error("Failed to insert client IP %r: %r", entry, e)
Expand All @@ -372,19 +392,14 @@ def get_last_client_ip_by_device(self, user_id, device_id):
keys giving the column names
"""

res = yield self.runInteraction(
"get_last_client_ip_by_device",
self._get_last_client_ip_by_device_txn,
user_id,
device_id,
retcols=(
"user_id",
"access_token",
"ip",
"user_agent",
"device_id",
"last_seen",
),
keyvalues = {"user_id": user_id}
if device_id is not None:
keyvalues["device_id"] = device_id

res = yield self._simple_select_list(
table="devices",
keyvalues=keyvalues,
retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
)

ret = {(d["user_id"], d["device_id"]): d for d in res}
Expand All @@ -403,42 +418,6 @@ def get_last_client_ip_by_device(self, user_id, device_id):
}
return ret

@classmethod
def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
where_clauses = []
bindings = []
if device_id is None:
where_clauses.append("user_id = ?")
bindings.extend((user_id,))
else:
where_clauses.append("(user_id = ? AND device_id = ?)")
bindings.extend((user_id, device_id))

if not where_clauses:
return []

inner_select = (
"SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips "
"WHERE %(where)s "
"GROUP BY user_id, device_id"
) % {"where": " OR ".join(where_clauses)}

sql = (
"SELECT %(retcols)s FROM user_ips "
"JOIN (%(inner_select)s) ips ON"
" user_ips.last_seen = ips.mls AND"
" user_ips.user_id = ips.user_id AND"
" (user_ips.device_id = ips.device_id OR"
" (user_ips.device_id IS NULL AND ips.device_id IS NULL)"
" )"
) % {
"retcols": ",".join("user_ips." + c for c in retcols),
"inner_select": inner_select,
}

txn.execute(sql, bindings)
return cls.cursor_to_dict(txn)

@defer.inlineCallbacks
def get_user_ip_and_agents(self, user):
user_id = user.to_string()
Expand Down Expand Up @@ -470,3 +449,50 @@ def get_user_ip_and_agents(self, user):
}
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
)

@defer.inlineCallbacks
def _devices_last_seen_update(self, progress, batch_size):
"""Background update to insert last seen info into devices table
"""

last_user_id = progress.get("last_user_id", "")
last_device_id = progress.get("last_device_id", "")

def _devices_last_seen_update_txn(txn):
sql = """
SELECT u.last_seen, u.ip, u.user_agent, user_id, device_id FROM devices
INNER JOIN user_ips AS u USING (user_id, device_id)
WHERE user_id > ? OR (user_id = ? AND device_id > ?)
ORDER BY user_id ASC, device_id ASC
LIMIT ?
"""
txn.execute(sql, (last_user_id, last_user_id, last_device_id, batch_size))

rows = txn.fetchall()
if not rows:
return 0

sql = """
UPDATE devices
SET last_seen = ?, ip = ?, user_agent = ?
WHERE user_id = ? AND device_id = ?
"""
txn.execute_batch(sql, rows)

_, _, _, user_id, device_id = rows[-1]
self._background_update_progress_txn(
txn,
"devices_last_seen",
{"last_user_id": user_id, "last_device_id": device_id},
)

return len(rows)

updated = yield self.runInteraction(
"_devices_last_seen_update", _devices_last_seen_update_txn
)

if not updated:
yield self._end_background_update("devices_last_seen")

return updated
24 changes: 24 additions & 0 deletions synapse/storage/schema/delta/56/devices_last_seen.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* Copyright 2019 Matrix.org Foundation CIC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Track last seen information for a device in the devices table, rather
-- than relying on it being in the user_ips table (which we want to be able
-- to purge old entries from)
ALTER TABLE devices ADD COLUMN last_seen BIGINT;
ALTER TABLE devices ADD COLUMN ip TEXT;
ALTER TABLE devices ADD COLUMN user_agent TEXT;

INSERT INTO background_updates (update_name, progress_json) VALUES
('devices_last_seen', '{}');
80 changes: 79 additions & 1 deletion tests/storage/test_client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def test_insert_new_client_ip(self):
{
"user_id": user_id,
"device_id": "device_id",
"access_token": "access_token",
"ip": "ip",
"user_agent": "user_agent",
"last_seen": 12345678000,
Expand Down Expand Up @@ -201,6 +200,85 @@ def test_updating_monthly_active_user_when_space(self):
active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
self.assertTrue(active)

def test_devices_last_seen_bg_update(self):
# First make sure we have completed all updates.
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)

# Insert a user IP
user_id = "@user:id"
self.get_success(
self.store.insert_client_ip(
user_id, "access_token", "ip", "user_agent", "device_id"
)
)

# Force persisting to disk
self.reactor.advance(200)

# But clear the associated entry in devices table
self.get_success(
self.store._simple_update(
table="devices",
keyvalues={"user_id": user_id, "device_id": "device_id"},
updatevalues={"last_seen": None, "ip": None, "user_agent": None},
desc="test_devices_last_seen_bg_update",
)
)

# We should now get nulls when querying
result = self.get_success(
self.store.get_last_client_ip_by_device(user_id, "device_id")
)

r = result[(user_id, "device_id")]
self.assertDictContainsSubset(
{
"user_id": user_id,
"device_id": "device_id",
"ip": None,
"user_agent": None,
"last_seen": None,
},
r,
)

# Register the background update to run again.
self.get_success(
self.store._simple_insert(
table="background_updates",
values={
"update_name": "devices_last_seen",
"progress_json": "{}",
"depends_on": None,
},
)
)

# ... and tell the DataStore that it hasn't finished all updates yet
self.store._all_done = False

# Now let's actually drive the updates to completion
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)

# We should now get the correct result again
result = self.get_success(
self.store.get_last_client_ip_by_device(user_id, "device_id")
)

r = result[(user_id, "device_id")]
self.assertDictContainsSubset(
{
"user_id": user_id,
"device_id": "device_id",
"ip": "ip",
"user_agent": "user_agent",
"last_seen": 0,
},
r,
)


class ClientIpAuthTestCase(unittest.HomeserverTestCase):

Expand Down

0 comments on commit 41e59e6

Please sign in to comment.