Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: add user record cleanup script
Browse files Browse the repository at this point in the history
Add's a new drop_user command that scans the AccessIndex for
users that haven't connected in the given month and removes the
route records.

Closes #645
  • Loading branch information
bbangert committed Oct 3, 2016
1 parent fca3589 commit 4063c52
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 12 deletions.
122 changes: 112 additions & 10 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.table import Table, Item
from boto.dynamodb2.types import NUMBER
from typing import Iterable, List # flake8: noqa

from autopush.exceptions import AutopushException
from autopush.utils import (
Expand All @@ -62,7 +63,13 @@

def get_month(delta=0):
"""Basic helper function to get a datetime.date object iterations months
ahead/behind of now."""
ahead/behind of now.
:type delta: int
:rtype: datetime.datetime
"""
new = last = datetime.date.today()
# Move until we hit a new month, this avoids having to manually
# check year changes as we push forward or backward since the Python
Expand Down Expand Up @@ -155,8 +162,9 @@ def create_router_table(tablename="router", read_throughput=5,
global_indexes=[
GlobalKeysOnlyIndex(
'AccessIndex',
parts=[HashKey('last_connect',
data_type=NUMBER)],
parts=[
HashKey('last_connect',
data_type=NUMBER)],
throughput=dict(read=5, write=5))],
)

Expand Down Expand Up @@ -265,7 +273,13 @@ def wrapper(self, *args, **kwargs):


def has_connected_this_month(item):
"""Whether or not a router item has connected this month"""
"""Whether or not a router item has connected this month
:type item: dict
:rtype: bool
"""
last_connect = item.get("last_connect")
if not last_connect:
return False
Expand All @@ -276,17 +290,47 @@ def has_connected_this_month(item):


def generate_last_connect():
"""Generate a last_connect"""
"""Generate a last_connect
This intentionally generates a limited set of keys for each month in a
known sequence. For each month, there's 24 hours * 10 random numbers for
a total of 240 keys per month depending on when the user migrates forward.
:type date: datetime.datetime
:rtype: int
"""
today = datetime.datetime.today()
val = "".join([
str(today.year),
str(today.month).zfill(2),
str(today.hour).zfill(2),
str(random.randint(0, 10)).zfill(4),
])
str(today.year),
str(today.month).zfill(2),
str(today.hour).zfill(2),
str(random.randint(0, 10)).zfill(4),
])
return int(val)


def generate_last_connect_values(date):
"""Generator of last_connect values for a given date
Creates an iterator that yields all the valid values for ``last_connect``
for a given year/month.
:type date: datetime.datetime
:rtype: Iterable[int]
"""
year = str(date.year)
month = str(date.month).zfill(2)
for hour in range(0, 24):
for rand_int in range(0, 11):
val = "".join([year, month, str(hour).zfill(2),
str(rand_int).zfill(4)])
yield int(val)


class Storage(object):
"""Create a Storage table abstraction on top of a DynamoDB Table object"""
def __init__(self, table, metrics):
Expand Down Expand Up @@ -606,6 +650,64 @@ def drop_user(self, uaid):
return self.table.delete_item(uaid=huaid,
expected={"uaid__eq": huaid})

def delete_uaids(self, uaids):
"""Issue a batch delete call for the given uaids
:type uaids: List[str]
"""
with self.table.batch_write() as batch:
for uaid in uaids:
batch.delete_item(uaid=uaid)

def drop_old_users(self, months_ago=2):
"""Drops user records that have no recent connection
Utilizes the last_connect index to locate users that haven't
connected in the given time-frame.
The caller must iterate through this generator to trigger batch
delete calls of the ``batch_size`` provided. Caller should wait as
appropriate to avoid exceeding table limits.
Each iteration will result in a batch delete for the currently
iterated batch. This implies a set of writes equal in size to the
``batch_size * record-size``.
.. warning::
Calling list() on this generator will likely exceed provisioned
write through-put as the batch-delete calls will be made as
quickly as possible.
:param months_ago: how many months ago since the last connect
:type months_ago: int
:returns: Iterable of how many deletes were run
:rtype: Iterable[int]
"""
prior_date = get_month(-months_ago)

batched = []
for hash_key in generate_last_connect_values(prior_date):
result_set = self.table.query_2(
last_connect__eq=hash_key,
index="AccessIndex",
)
for result in result_set:
batched.append(result["uaid"])

if len(batched) == 25:
self.delete_uaids(batched)
batched = []
yield 25

# Delete any leftovers
if batched:
self.delete_uaids(batched)
yield len(batched)

@track_provisioned
def update_message_month(self, uaid, month):
"""Update the route tables current_message_month
Expand Down
1 change: 1 addition & 0 deletions autopush/scripts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#
39 changes: 39 additions & 0 deletions autopush/scripts/drop_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import time

import click

from autopush.db import (
get_router_table,
Router,
)
from autopush.metrics import SinkMetrics


@click.command()
@click.option('--router_table_name', help="Name of the router table.")
@click.option('--months-ago', default=2, help="Months ago to remove.")
@click.option('--batch_size', default=25,
help="Deletes to run before pausing.")
@click.option('--pause_time', default=1,
help="Seconds to pause between batches.")
def drop_users(router_table_name, months_ago, batch_size, pause_time):
router_table = get_router_table(router_table_name)
router = Router(router_table, SinkMetrics())

click.echo("Deleting users with a last_connect %s months ago."
% months_ago)

count = 0
for deletes in router.drop_old_users(months_ago):
click.echo("")
count += deletes
if count > batch_size:
click.echo("Deleted %s user records, pausing for %s seconds."
% pause_time)
time.sleep(pause_time)
count = 0
click.echo("Finished old user purge.")


if __name__ == '__main__': # pragma: nocover
drop_users()
23 changes: 22 additions & 1 deletion autopush/tests/test_db.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
import uuid

from autopush.exceptions import AutopushException
from autopush.websocket import ms_time
from boto.dynamodb2.exceptions import (
ConditionalCheckFailedException,
ProvisionedThroughputExceededException,
Expand All @@ -23,7 +23,9 @@
Storage,
Message,
Router,
generate_last_connect,
)
from autopush.exceptions import AutopushException
from autopush.metrics import SinkMetrics
from autopush.utils import WebPushNotification

Expand Down Expand Up @@ -361,6 +363,25 @@ def setUp(self):
def tearDown(self):
self.real_table.connection = self.real_connection

def _create_minimal_record(self):
data = {
"uaid": str(uuid.uuid4()),
"router_type": "webupsh",
"last_connect": generate_last_connect(),
"connected_at": ms_time(),
}
return data

def test_drop_old_users(self):
# First create a bunch of users
r = get_router_table()
router = Router(r, SinkMetrics())
for _ in range(0, 53):
router.register_user(self._create_minimal_record())

results = router.drop_old_users(months_ago=0)
eq_(list(results), [25, 25, 3])

def test_custom_tablename(self):
db = DynamoDBConnection()
db_name = "router_%s" % uuid.uuid4()
Expand Down
2 changes: 1 addition & 1 deletion autopush/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def periodic_reporter(settings):
settings.metrics.gauge("update.client.readers",
len(reactor.getReaders()))
settings.metrics.gauge("update.client.connections",
len(settings.clients))
len(settings.clients))
settings.metrics.gauge("update.client.ws_connections",
settings.factory.countConnections)

Expand Down
1 change: 1 addition & 0 deletions base-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ boto3==1.4.0
botocore==1.4.50
cffi==1.7.0
characteristic==14.3.0
click==6.6
contextlib2==0.5.4
cryptography==1.5
cyclone==1.1
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
autoendpoint = autopush.main:endpoint_main
autokey = autokey:main
endpoint_diagnostic = autopush.diagnostic_cli:run_endpoint_diagnostic_cli
drop_users = autopush.scripts.drop_user:drop_users
[nose.plugins]
object-tracker = autopush.noseplugin:ObjectTracker
""",
Expand Down

0 comments on commit 4063c52

Please sign in to comment.