Skip to content

Commit

Permalink
Added /prune/ endpoint to removing "old" RPMs from a Repository.
Browse files Browse the repository at this point in the history
closes pulp#2909.
  • Loading branch information
ggainey committed May 14, 2024
1 parent 39b3db0 commit edf4989
Show file tree
Hide file tree
Showing 13 changed files with 606 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES/2909.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added /rpm/prune command to allow "pruning" old Packages from repositories.
1 change: 1 addition & 0 deletions pulp_rpm/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ModulemdObsoleteSerializer,
)
from .package import PackageSerializer, MinimalPackageSerializer # noqa
from .prune import PruneNEVRAsSerializer # noqa
from .repository import ( # noqa
CopySerializer,
RpmDistributionSerializer,
Expand Down
81 changes: 81 additions & 0 deletions pulp_rpm/app/serializers/prune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from gettext import gettext as _

from rest_framework import fields, serializers

from pulp_rpm.app.models import RpmRepository

from pulpcore.plugin.serializers import ValidateFieldsMixin
from pulpcore.plugin.util import get_domain


class PruneNEVRAsSerializer(serializers.Serializer, ValidateFieldsMixin):
"""
Serializer for prune-old-NEVRAs operation.
"""

repo_hrefs = fields.ListField(
required=True,
help_text=_(
"Will prune old RPMs from the specified list of repos. Use ['*'] to specify all repos."
),
child=serializers.CharField(),
)

keep_days = serializers.IntegerField(
help_text=_(
"Prune NEVRAs introduced *prior-to* this many days ago. "
"Default is 14. A value of 0 implies 'keep latest NEVRA only.'"
),
required=False,
min_value=0,
default=14,
)

repo_concurrency = serializers.IntegerField(
help_text=(
"Number of concurrent workers to use to do the pruning. "
"If not set then the default value will be used."
),
allow_null=True,
required=False,
min_value=1,
default=10,
)

dry_run = serializers.BooleanField(
help_text=_(
"Determine what would-be-pruned and log the list of NEVRAs. "
"Intended as a debugging aid."
),
default=False,
required=False,
)

def validate_repo_hrefs(self, value):
"""
Check that repo_hrefs is not an empty list and contains either valid hrefs or "*".
Args:
value (list): The list supplied by the user
Returns:
The list of RpmRepositories after validation
Raises:
ValidationError: If the list is empty or contains invalid hrefs.
"""
if len(value) == 0:
raise serializers.ValidationError("Must not be [].")

# prune-all-repos is "*" - find all RPM repos in this domain
if "*" in value:
if len(value) != 1:
raise serializers.ValidationError("Can't specify specific HREFs when using '*'")
return RpmRepository.objects.filter(pulp_domain=get_domain())

from pulpcore.plugin.viewsets import NamedModelViewSet

# We're pruning a specific list of RPM repositories.
# Validate that they are for RpmRepositories.
hrefs_to_return = []
for href in value:
hrefs_to_return.append(NamedModelViewSet.get_resource(href, RpmRepository))

return hrefs_to_return
1 change: 1 addition & 0 deletions pulp_rpm/app/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from .synchronizing import synchronize # noqa
from .copy import copy_content # noqa
from .comps import upload_comps # noqa
from .prune import prune_nevras # noqa
156 changes: 156 additions & 0 deletions pulp_rpm/app/tasks/prune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from datetime import datetime, timedelta
from logging import getLogger, DEBUG

from django.db.models import F, Subquery
from django.utils import timezone

from pulpcore.plugin.models import ProgressReport
from pulpcore.plugin.constants import TASK_STATES
from pulpcore.plugin.models import (
GroupProgressReport,
RepositoryContent,
TaskGroup,
)
from pulpcore.plugin.tasking import dispatch
from pulp_rpm.app.models.package import Package
from pulp_rpm.app.models.repository import RpmRepository

log = getLogger(__name__)


def prune_repo_nevras(repo_pk, keep_days, dry_run):
"""
This task prunes old NEVRAs from the latest_version of the specified repository.
Args:
repo_pk (UUID): UUID of the RpmRepository to be pruned.
keep_days(int): Keep RepositoryContent created less than this many days ago.
dry_run (boolean): If True, don't actually do the prune, just log to-be-pruned NEVRAs.
"""
repo = RpmRepository.objects.filter(pk=repo_pk).get()
curr_vers = repo.latest_version()
eldest_datetime = datetime.now(tz=timezone.utc) - timedelta(days=keep_days)
log.info(f"PRUNING REPOSITORY {repo.name}.")
log.debug(f">>> TOTAL RPMS: {curr_vers.get_content(Package.objects).count()}")

# We only care about RPM-Names that have more than one EVRA - "singles" are always kept.
rpm_by_name_age = (
curr_vers.get_content(Package.objects.with_age())
.filter(age__gt=1)
.order_by("name", "epoch", "version", "release", "arch")
.values("pk")
)
log.debug(f">>> NAME/AGE COUNT {rpm_by_name_age.count()}")
log.debug(
">>> # NAME/ARCH w/ MULTIPLE EVRs: {}".format(
curr_vers.get_content(Package.objects)
.filter(pk__in=rpm_by_name_age)
.values("name", "arch")
.distinct()
.count()
)
)
log.debug(
">>> # UNIQUE NAME/ARCHS: {}".format(
curr_vers.get_content(Package.objects).values("name", "arch").distinct().count()
)
)

# Find the RepositoryContents associated with the multi-EVR-names from above,
# whose maximum-pulp-created date is LESS THAN eldest_datetime.
#
# Note that we can "assume" the latest-date is an "add" with no "remove", since we're
# limiting ourselves to the list of ids that we know are in the repo's current latest-version!
target_ids_q = (
RepositoryContent.objects.filter(
content__in=Subquery(rpm_by_name_age), repository=repo, version_removed=None
)
.filter(pulp_created__lt=eldest_datetime)
.values("content_id")
)
log.debug(f">>> TARGET IDS: {target_ids_q.count()}.")
to_be_removed = target_ids_q.count()
# Use the progressreport to report back numbers. The prune happens as one
# action.
data = dict(
message=f"Pruning {repo.name}",
code="rpm.nevra.prune.repository",
total=to_be_removed,
state=TASK_STATES.COMPLETED,
done=0,
)

if dry_run:
if log.getEffectiveLevel() == DEBUG: # Don't go through the loop unless debugging
log.debug(">>> Packages to be removed : ")
for p in (
Package.objects.filter(pk__in=target_ids_q)
.order_by("name", "epoch", "version", "release", "arch")
.values("name", "epoch", "version", "release", "arch")
):
log.debug(f'{p["name"]}-{p["epoch"]}:{p["version"]}-{p["release"]}.{p["arch"]}')
else:
with repo.new_version(base_version=None) as new_version:
new_version.remove_content(target_ids_q)
data["done"] = to_be_removed

pb = ProgressReport(**data)
pb.save()

# Report back that this repo has completed.
gpr = TaskGroup.current().group_progress_reports.filter(code="rpm.nevra.prune")
gpr.update(done=F("done") + 1)


def prune_nevras(
repo_pks,
keep_days=14,
repo_concurrency=10,
dry_run=False,
):
"""
This task prunes old NEVRAs from the latest_version of the specified list of repos.
"Old" in this context is defined by the RepositoryContent record that added a NEVRA
to the repository in question.
It will issue one task-per-repository.
Kwargs:
repo_pks (list): A list of repo pks the disk reclaim space is performed on.
keep_days(int): Keep RepositoryContent created less than this many days ago.
repo_concurrency (int): number of repos to prune at a time.
dry_run (boolean): If True, don't actually do the prune, just record to-be-pruned NEVRAs.
"""

repos_to_prune = RpmRepository.objects.filter(pk__in=repo_pks)
task_group = TaskGroup.current()

gpr = GroupProgressReport(
message="Pruning old NEVRAs",
code="rpm.nevra.prune",
total=len(repo_pks),
done=0,
task_group=task_group,
)
gpr.save()

# Dispatch a task-per-repository.
# Lock on the the repository *and* to insure the max-concurrency specified.
# This will keep an "all repositories" prune from locking up all the workers
# until all repositories are completed.
for index, a_repo in enumerate(repos_to_prune):
worker_rsrc = f"rpm-prune-worker-{index % repo_concurrency}"
exclusive_resources = [worker_rsrc, a_repo]

dispatch(
prune_repo_nevras,
exclusive_resources=exclusive_resources,
args=(
a_repo.pk,
keep_days,
dry_run,
),
task_group=task_group,
)
task_group.finish()
3 changes: 2 additions & 1 deletion pulp_rpm/app/urls.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.conf import settings
from django.urls import path

from .viewsets import CopyViewSet, CompsXmlViewSet
from .viewsets import CopyViewSet, CompsXmlViewSet, PruneNEVRAsViewSet

if settings.DOMAIN_ENABLED:
V3_API_ROOT = settings.V3_DOMAIN_API_ROOT_NO_FRONT_SLASH
Expand All @@ -11,4 +11,5 @@
urlpatterns = [
path(f"{V3_API_ROOT}rpm/copy/", CopyViewSet.as_view({"post": "create"})),
path(f"{V3_API_ROOT}rpm/comps/", CompsXmlViewSet.as_view({"post": "create"})),
path(f"{V3_API_ROOT}rpm/prune/", PruneNEVRAsViewSet.as_view({"post": "prune_nevras"})),
]
1 change: 1 addition & 0 deletions pulp_rpm/app/viewsets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .distribution import DistributionTreeViewSet # noqa
from .modulemd import ModulemdViewSet, ModulemdDefaultsViewSet, ModulemdObsoleteViewSet # noqa
from .package import PackageViewSet # noqa
from .prune import PruneNEVRAsViewSet # noqa
from .repository import ( # noqa
RpmRepositoryViewSet,
RpmRepositoryVersionViewSet,
Expand Down
72 changes: 72 additions & 0 deletions pulp_rpm/app/viewsets/prune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from drf_spectacular.utils import extend_schema
from django.conf import settings
from rest_framework.viewsets import ViewSet

from pulpcore.plugin.viewsets import TaskGroupOperationResponse
from pulpcore.plugin.models import TaskGroup
from pulpcore.plugin.serializers import TaskGroupOperationResponseSerializer
from pulp_rpm.app.serializers import PruneNEVRAsSerializer
from pulp_rpm.app.tasks import prune_nevras
from pulpcore.plugin.tasking import dispatch


class PruneNEVRAsViewSet(ViewSet):
"""
Viewset for prune-old-NEVRAs endpoint.
"""

serializer_class = PruneNEVRAsSerializer

DEFAULT_ACCESS_POLICY = {
"statements": [
{
"action": ["prune_nevras"],
"principal": "authenticated",
"effect": "allow",
"condition": [
"has_model_or_domain_or_obj_perms:rpm.prune_rpmrepository",
],
},
],
}

@extend_schema(
description="Trigger an asynchronous old-NEVRA-prune operation.",
responses={202: TaskGroupOperationResponseSerializer},
)
def prune_nevras(self, request):
"""
Triggers an asynchronous old-NEVRA-purge operation.
This returns a task-group that contains a "master" task that dispatches one task
per repo being pruned. This allows repositories to become available for other
processing as soon as their task completes, rather than having to wait for *all*
repositories to be pruned.
"""
serializer = PruneNEVRAsSerializer(data=request.data)
serializer.is_valid(raise_exception=True)

repos = serializer.validated_data.get("repo_hrefs", [])
repos_to_prune_pks = []
for repo in repos:
repos_to_prune_pks.append(repo.pk)

uri = "/api/v3/rpm/prune/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"
exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:rpm:prune"]

task_group = TaskGroup.objects.create(description="Prune old NEVRAs.")

dispatch(
prune_nevras,
exclusive_resources=exclusive_resources,
task_group=task_group,
kwargs={
"repo_pks": repos_to_prune_pks,
"keep_days": serializer.validated_data["keep_days"],
"repo_concurrency": serializer.validated_data["repo_concurrency"],
"dry_run": serializer.validated_data["dry_run"],
},
)
return TaskGroupOperationResponse(task_group, request)
2 changes: 2 additions & 0 deletions pulp_rpm/app/viewsets/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class RpmRepositoryViewSet(RepositoryViewSet, ModifyRepositoryActionMixin, Roles
"rpm.delete_rpmrepository_version",
"rpm.manage_roles_rpmrepository",
"rpm.modify_content_rpmrepository",
"rpm.prune_rpmrepository",
"rpm.repair_rpmrepository",
"rpm.sync_rpmrepository",
"rpm.view_rpmrepository",
Expand Down Expand Up @@ -174,6 +175,7 @@ class RpmRepositoryViewSet(RepositoryViewSet, ModifyRepositoryActionMixin, Roles
"rpm.manage_roles_rpmrepository",
"rpm.manage_roles_ulnremote",
"rpm.modify_content_rpmrepository",
"rpm.prune_rpmrepository",
"rpm.refresh_rpmalternatecontentsource",
"rpm.repair_rpmrepository",
"rpm.sync_rpmrepository",
Expand Down
7 changes: 7 additions & 0 deletions pulp_rpm/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
RemotesRpmApi,
RepositoriesRpmApi,
RepositoriesRpmVersionsApi,
RpmPruneApi,
RpmRepositorySyncURL,
)

Expand All @@ -36,6 +37,12 @@ def rpm_distribution_api(rpm_client):
return DistributionsRpmApi(rpm_client)


@pytest.fixture(scope="session")
def rpm_prune_api(rpm_client):
"""Fixture for RPM Prune API."""
return RpmPruneApi(rpm_client)


@pytest.fixture(scope="session")
def rpm_client(bindings_cfg):
"""Fixture for RPM client."""
Expand Down
Loading

0 comments on commit edf4989

Please sign in to comment.