Skip to content

Commit

Permalink
Base manager can now be used for clean_duplicate_history
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-traeger committed Jan 26, 2023
1 parent 309609e commit 44f687e
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 3 deletions.
8 changes: 8 additions & 0 deletions docs/utils.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ from the duplicate check
$ python manage.py clean_duplicate_history --auto --excluded_fields field1 field2
You can use Django's base manager to perform the cleanup over all records,
including those that would otherwise be filtered or modified by a
custom manager, by using the ``--base-manager`` flag.

.. code-block:: bash
$ python manage.py clean_duplicate_history --auto --base-manager
clean_old_history
-----------------------

Expand Down
17 changes: 14 additions & 3 deletions simple_history/management/commands/clean_duplicate_history.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from django.db import transaction
from django.utils import timezone

from ... import models, utils
from ...exceptions import NotHistoricalModelError
from ... import utils
from . import populate_history


Expand Down Expand Up @@ -36,10 +35,19 @@ def add_arguments(self, parser):
nargs="+",
help="List of fields to be excluded from the diff_against check",
)
parser.add_argument(
"--base-manager",
action="store_true",
default=False,
help="Use Django's base manager to handle all records stored in the database,"
" including those that would otherwise be filtered or modified by a"
" custom manager.",
)

def handle(self, *args, **options):
self.verbosity = options["verbosity"]
self.excluded_fields = options.get("excluded_fields")
self.base_manager = options.get("base_manager")

to_process = set()
model_strings = options.get("models", []) or args
Expand Down Expand Up @@ -72,7 +80,10 @@ def _process(self, to_process, date_back=None, dry_run=True):
continue

# Break apart the query so we can add additional filtering
model_query = model.objects.all()
if self.base_manager:
model_query = model._base_manager.all()
else:
model_query = model._default_manager.all()

# If we're provided a stop date take the initial hit of getting the
# filtered records to iterate over
Expand Down
16 changes: 16 additions & 0 deletions simple_history/tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ class PollWithAlternativeManager(models.Model):
history = HistoricalRecords()


class CustomPollManager(models.Manager):
def get_queryset(self):
return super(CustomPollManager, self).get_queryset().exclude(hidden=True)


class PollWithCustomManager(models.Model):
some_objects = CustomPollManager()
all_objects = models.Manager()

question = models.CharField(max_length=200)
pub_date = models.DateTimeField("date published")
hidden = models.BooleanField(default=False)

history = HistoricalRecords()


class IPAddressHistoricalModel(models.Model):
ip_address = models.GenericIPAddressField()

Expand Down
50 changes: 50 additions & 0 deletions simple_history/tests/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
CustomManagerNameModel,
Place,
Poll,
PollWithCustomManager,
PollWithExcludeFields,
Restaurant,
)
Expand Down Expand Up @@ -283,6 +284,55 @@ def test_auto_cleanup(self):
)
self.assertEqual(Poll.history.all().count(), 2)

def _prepare_cleanup_manager(self):
one = PollWithCustomManager._default_manager.create(
question="This is hidden in default manager",
pub_date=datetime.now(),
hidden=True,
)
one.save()

two = PollWithCustomManager._default_manager.create(
question="This is visible in default manager", pub_date=datetime.now()
)
two.save()

self.assertEqual(PollWithCustomManager.history.count(), 4)

def test_auto_cleanup_defaultmanager(self):
self._prepare_cleanup_manager()

out = StringIO()
management.call_command(
self.command_name, auto=True, stdout=out, stderr=StringIO()
)
self.assertEqual(
out.getvalue(),
"Removed 1 historical records for "
"<class 'simple_history.tests.models.PollWithCustomManager'>\n",
)
self.assertEqual(PollWithCustomManager.history.count(), 3)

def test_auto_cleanup_basemanage(self):
self._prepare_cleanup_manager()

out = StringIO()
management.call_command(
self.command_name,
auto=True,
base_manager=True,
stdout=out,
stderr=StringIO(),
)
self.assertEqual(
out.getvalue(),
"Removed 1 historical records for "
"<class 'simple_history.tests.models.PollWithCustomManager'>\n"
"Removed 1 historical records for "
"<class 'simple_history.tests.models.PollWithCustomManager'>\n",
)
self.assertEqual(PollWithCustomManager.history.count(), 2)

def test_auto_cleanup_verbose(self):
p = Poll.objects.create(
question="Will this be deleted?", pub_date=datetime.now()
Expand Down

0 comments on commit 44f687e

Please sign in to comment.