Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

feat: add user record cleanup script #677

Merged
merged 1 commit into from
Oct 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 112 additions & 10 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.table import Table, Item
from boto.dynamodb2.types import NUMBER
from typing import Iterable, List # flake8: noqa

from autopush.exceptions import AutopushException
from autopush.utils import (
Expand All @@ -62,7 +63,13 @@

def get_month(delta=0):
"""Basic helper function to get a datetime.date object iterations months
ahead/behind of now."""
ahead/behind of now.

:type delta: int

:rtype: datetime.datetime

"""
new = last = datetime.date.today()
# Move until we hit a new month, this avoids having to manually
# check year changes as we push forward or backward since the Python
Expand Down Expand Up @@ -155,8 +162,9 @@ def create_router_table(tablename="router", read_throughput=5,
global_indexes=[
GlobalKeysOnlyIndex(
'AccessIndex',
parts=[HashKey('last_connect',
data_type=NUMBER)],
parts=[
HashKey('last_connect',
data_type=NUMBER)],
throughput=dict(read=5, write=5))],
)

Expand Down Expand Up @@ -265,7 +273,13 @@ def wrapper(self, *args, **kwargs):


def has_connected_this_month(item):
"""Whether or not a router item has connected this month"""
"""Whether or not a router item has connected this month

:type item: dict

:rtype: bool

"""
last_connect = item.get("last_connect")
if not last_connect:
return False
Expand All @@ -276,17 +290,47 @@ def has_connected_this_month(item):


def generate_last_connect():
"""Generate a last_connect"""
"""Generate a last_connect

This intentionally generates a limited set of keys for each month in a
known sequence. For each month, there's 24 hours * 10 random numbers for
a total of 240 keys per month depending on when the user migrates forward.

:type date: datetime.datetime

:rtype: int

"""
today = datetime.datetime.today()
val = "".join([
str(today.year),
str(today.month).zfill(2),
str(today.hour).zfill(2),
str(random.randint(0, 10)).zfill(4),
])
str(today.year),
str(today.month).zfill(2),
str(today.hour).zfill(2),
str(random.randint(0, 10)).zfill(4),
])
return int(val)


def generate_last_connect_values(date):
"""Generator of last_connect values for a given date

Creates an iterator that yields all the valid values for ``last_connect``
for a given year/month.

:type date: datetime.datetime

:rtype: Iterable[int]

"""
year = str(date.year)
month = str(date.month).zfill(2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this partially fixes #653?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, its how they index key is constructed.

for hour in range(0, 24):
for rand_int in range(0, 11):
val = "".join([year, month, str(hour).zfill(2),
str(rand_int).zfill(4)])
yield int(val)


class Storage(object):
"""Create a Storage table abstraction on top of a DynamoDB Table object"""
def __init__(self, table, metrics):
Expand Down Expand Up @@ -606,6 +650,64 @@ def drop_user(self, uaid):
return self.table.delete_item(uaid=huaid,
expected={"uaid__eq": huaid})

def delete_uaids(self, uaids):
"""Issue a batch delete call for the given uaids

:type uaids: List[str]

"""
with self.table.batch_write() as batch:
for uaid in uaids:
batch.delete_item(uaid=uaid)

def drop_old_users(self, months_ago=2):
"""Drops user records that have no recent connection

Utilizes the last_connect index to locate users that haven't
connected in the given time-frame.

The caller must iterate through this generator to trigger batch
delete calls. Caller should wait as appropriate to avoid exceeding
table limits.

Each iteration will result in a batch delete for the currently
iterated batch. This implies a set of writes equal in size to the
``25 * record-size`` minimum.

.. warning::

Calling list() on this generator will likely exceed provisioned
write through-put as the batch-delete calls will be made as
quickly as possible.

:param months_ago: how many months ago since the last connect
:type months_ago: int

:returns: Iterable of how many deletes were run
:rtype: Iterable[int]

"""
prior_date = get_month(-months_ago)

batched = []
for hash_key in generate_last_connect_values(prior_date):
result_set = self.table.query_2(
last_connect__eq=hash_key,
index="AccessIndex",
)
for result in result_set:
batched.append(result["uaid"])

if len(batched) == 25:
self.delete_uaids(batched)
batched = []
yield 25

# Delete any leftovers
if batched:
self.delete_uaids(batched)
yield len(batched)

@track_provisioned
def update_message_month(self, uaid, month):
"""Update the route tables current_message_month
Expand Down
1 change: 1 addition & 0 deletions autopush/scripts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#
39 changes: 39 additions & 0 deletions autopush/scripts/drop_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import time

import click

from autopush.db import (
get_router_table,
Router,
)
from autopush.metrics import SinkMetrics


@click.command()
@click.option('--router_table_name', help="Name of the router table.")
@click.option('--months-ago', default=2, help="Months ago to remove.")
@click.option('--batch_size', default=25,
help="Deletes to run before pausing.")
@click.option('--pause_time', default=1,
help="Seconds to pause between batches.")
def drop_users(router_table_name, months_ago, batch_size, pause_time):
router_table = get_router_table(router_table_name)
router = Router(router_table, SinkMetrics())

click.echo("Deleting users with a last_connect %s months ago."
% months_ago)

count = 0
for deletes in router.drop_old_users(months_ago):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't batch_size be an argument to drop_old_users? it would work more as advertised and simplify this loop too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mainly wanted a way to rope in how frequently the pause happens, but wanted to still maximize the actual delete batch commands to the max supported by AWS. So that the approximate speed is controlled, a little slop is ok. It's entirely likely that there will be no pause at all specified since latency is probably going to reduce the max speed anyways.

click.echo("")
count += deletes
if count >= batch_size:
click.echo("Deleted %s user records, pausing for %s seconds."
% pause_time)
time.sleep(pause_time)
count = 0
click.echo("Finished old user purge.")


if __name__ == '__main__': # pragma: nocover
drop_users()
23 changes: 22 additions & 1 deletion autopush/tests/test_db.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
import uuid

from autopush.exceptions import AutopushException
from autopush.websocket import ms_time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW: moved this to utils.py since it's used by more than just websocket

from boto.dynamodb2.exceptions import (
ConditionalCheckFailedException,
ProvisionedThroughputExceededException,
Expand All @@ -23,7 +23,9 @@
Storage,
Message,
Router,
generate_last_connect,
)
from autopush.exceptions import AutopushException
from autopush.metrics import SinkMetrics
from autopush.utils import WebPushNotification

Expand Down Expand Up @@ -361,6 +363,25 @@ def setUp(self):
def tearDown(self):
self.real_table.connection = self.real_connection

def _create_minimal_record(self):
data = {
"uaid": str(uuid.uuid4()),
"router_type": "webupsh",
"last_connect": generate_last_connect(),
"connected_at": ms_time(),
}
return data

def test_drop_old_users(self):
# First create a bunch of users
r = get_router_table()
router = Router(r, SinkMetrics())
for _ in range(0, 53):
router.register_user(self._create_minimal_record())

results = router.drop_old_users(months_ago=0)
eq_(list(results), [25, 25, 3])

def test_custom_tablename(self):
db = DynamoDBConnection()
db_name = "router_%s" % uuid.uuid4()
Expand Down
1 change: 1 addition & 0 deletions base-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ boto3==1.4.0
botocore==1.4.50
cffi==1.7.0
characteristic==14.3.0
click==6.6
contextlib2==0.5.4
cryptography==1.5
cyclone==1.1
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
autoendpoint = autopush.main:endpoint_main
autokey = autokey:main
endpoint_diagnostic = autopush.diagnostic_cli:run_endpoint_diagnostic_cli
drop_users = autopush.scripts.drop_user:drop_users
[nose.plugins]
object-tracker = autopush.noseplugin:ObjectTracker
""",
Expand Down