diff --git a/Squest/utils/squest_model.py b/Squest/utils/squest_model.py index 04b408240..50b6a846b 100644 --- a/Squest/utils/squest_model.py +++ b/Squest/utils/squest_model.py @@ -163,3 +163,6 @@ def get_content_type(self): def get_absolute_url(self): content_type = self.get_content_type() return reverse(f"{content_type.app_label}:{content_type.model}_details", args=[self.pk]) + + def is_owner(self, user): + return False diff --git a/Squest/utils/squest_rbac.py b/Squest/utils/squest_rbac.py index 8e65fb29f..92581f2e3 100644 --- a/Squest/utils/squest_rbac.py +++ b/Squest/utils/squest_rbac.py @@ -1,6 +1,7 @@ import logging from django.contrib.auth.backends import BaseBackend from django.contrib.auth.mixins import PermissionRequiredMixin +from profiles.models import RequesterPermission from profiles.models.squest_permission import Permission from django.db.models import Q from django.utils.safestring import mark_safe @@ -79,5 +80,17 @@ def has_perm(self, user_obj, perm, obj=None): codename=codename, content_type__app_label=app_label) ).exists() + if permission_granted: + cache.set(key, permission_granted, 60) + return permission_granted + if obj: + try: + if obj.is_owner(user_obj): + requester_permission = RequesterPermission.load() + permission_granted = Permission.objects.filter(requesterpermission=requester_permission, + codename=codename, + content_type__app_label=app_label).exists() + except AttributeError: + logger.debug("is_owner method not found") cache.set(key, permission_granted, 60) return permission_granted diff --git a/profiles/apps.py b/profiles/apps.py index e1844ae20..d229707b0 100644 --- a/profiles/apps.py +++ b/profiles/apps.py @@ -67,6 +67,23 @@ def insert_default_user_permissions(sender, **kwargs): ) ) +def insert_default_requester_permissions(sender, **kwargs): + from profiles.default_rbac.default_requester_permissions import default_requester_permissions + from profiles.models import RequesterPermission + from profiles.models.squest_permission import Permission + requester_permission, created = RequesterPermission.objects.get_or_create(name="RequesterPermission") + if created: + codenames = list( + map(lambda default_permissions: default_permissions.split('.')[1], default_requester_permissions)) + app_labels = list( + map(lambda default_permissions: default_permissions.split('.')[0], default_requester_permissions)) + requester_permission.default_permissions.add( + *Permission.objects.filter( + codename__in=codenames, + content_type__app_label__in=app_labels + ) + ) + class ProfilesConfig(AppConfig): name = 'profiles' @@ -74,3 +91,4 @@ class ProfilesConfig(AppConfig): def ready(self): post_migrate.connect(create_roles, sender=self) post_migrate.connect(insert_default_user_permissions, sender=self) + post_migrate.connect(insert_default_requester_permissions, sender=self) diff --git a/profiles/default_rbac/default_requester_permissions.py b/profiles/default_rbac/default_requester_permissions.py new file mode 100644 index 000000000..694d833c1 --- /dev/null +++ b/profiles/default_rbac/default_requester_permissions.py @@ -0,0 +1,11 @@ +default_requester_permissions = [ + 'service_catalog.view_instance', + 'service_catalog.view_request', + 'service_catalog.view_support', + 'service_catalog.add_support', + 'service_catalog.view_supportmessage', + 'service_catalog.add_supportmessage', + 'service_catalog.view_requestmessage', + 'service_catalog.add_requestmessage', + 'service_catalog.request_on_instance' +] diff --git a/profiles/forms/__init__.py b/profiles/forms/__init__.py index caa089557..c1485c4c4 100644 --- a/profiles/forms/__init__.py +++ b/profiles/forms/__init__.py @@ -4,3 +4,4 @@ from .organization_forms import * from .role_forms import * from .globalpermission_forms import * +from .requesterpermission_forms import * diff --git a/profiles/forms/requesterpermission_forms.py b/profiles/forms/requesterpermission_forms.py new file mode 100644 index 000000000..dcb6f2f66 --- /dev/null +++ b/profiles/forms/requesterpermission_forms.py @@ -0,0 +1,8 @@ +from Squest.utils.squest_model_form import SquestModelForm +from profiles.models import RequesterPermission + + +class RequesterPermissionForm(SquestModelForm): + class Meta: + model = RequesterPermission + fields = ["default_permissions"] diff --git a/profiles/migrations/0020_requesterpermission.py b/profiles/migrations/0020_requesterpermission.py new file mode 100644 index 000000000..608460eb0 --- /dev/null +++ b/profiles/migrations/0020_requesterpermission.py @@ -0,0 +1,25 @@ +# Generated by Django 3.2.13 on 2023-09-26 15:39 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('profiles', '0019_alter_quota_options'), + ] + + operations = [ + migrations.CreateModel( + name='RequesterPermission', + fields=[ + ('abstractscope_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='profiles.abstractscope')), + ('default_permissions', models.ManyToManyField(blank=True, help_text='Permissions assigned to the owner of Squest objects.', limit_choices_to={'content_type__app_label__in': ['service_catalog']}, to='profiles.Permission')), + ], + options={ + 'default_permissions': ('add', 'change', 'delete', 'view', 'list'), + }, + bases=('profiles.abstractscope',), + ), + ] diff --git a/profiles/models/__init__.py b/profiles/models/__init__.py index 1fc57ee6f..d6b3d8bde 100644 --- a/profiles/models/__init__.py +++ b/profiles/models/__init__.py @@ -10,3 +10,4 @@ from profiles.models.globalpermission import GlobalPermission from profiles.models.quota import Quota from profiles.models.squest_permission import Permission +from profiles.models.requesterpermission import RequesterPermission diff --git a/profiles/models/requesterpermission.py b/profiles/models/requesterpermission.py new file mode 100644 index 000000000..073909217 --- /dev/null +++ b/profiles/models/requesterpermission.py @@ -0,0 +1,50 @@ +from django.contrib.auth.models import User +from django.core.cache import cache +from django.db.models import ManyToManyField +from django.urls import reverse + +from profiles.models import AbstractScope +from profiles.models.squest_permission import Permission + + +class RequesterPermission(AbstractScope): + class Meta: + default_permissions = ('add', 'change', 'delete', 'view', 'list') + + default_permissions = ManyToManyField( + Permission, + blank=True, + help_text="Permissions assigned to the owner of Squest objects.", + limit_choices_to={"content_type__app_label__in": ["service_catalog"]} + ) + + def __str__(self): + return self.name + + def save(self, *args, **kwargs): + super(RequesterPermission, self).save(*args, **kwargs) + self.set_cache() + + def set_cache(self): + cache.set(self.__class__.__name__, self) + + def get_absolute_url(self): + return reverse("profiles:requesterpermission_details") + + def delete(self, *args, **kwargs): + pass + + @classmethod + def load(cls): + if cache.get(cls.__name__) is None: + obj, created = RequesterPermission.objects.get_or_create(name="RequesterPermission") + if not created: + obj.set_cache() + return obj + return cache.get(cls.__name__) + + def get_potential_users(self): + return User.objects.all() + + def get_scopes(self): + return AbstractScope.objects.filter(id=self.id) diff --git a/profiles/templatetags/squest_utils.py b/profiles/templatetags/squest_utils.py index b11930eeb..c895d46e4 100644 --- a/profiles/templatetags/squest_utils.py +++ b/profiles/templatetags/squest_utils.py @@ -239,6 +239,15 @@ def generate_sidebar(user): 'active': [ "globalpermission_default_permissions", "globalpermission_edit" ] + }, + { + 'name': 'Requester permissions', + 'view_name': 'profiles:requesterpermission_details', + 'icon': 'fas fa-check', + 'permission_required': 'profiles.list_requesterpermission', + 'active': [ + "requesterpermission_details", "requesterpermission_edit" + ] } ], }, diff --git a/profiles/urls.py b/profiles/urls.py index 10add348d..6e7ab2995 100644 --- a/profiles/urls.py +++ b/profiles/urls.py @@ -45,6 +45,9 @@ # User path('user/', views.UserListView.as_view(), name='user_list'), path('user//', views.UserDetailsView.as_view(), name='user_details'), + # Global Permission + path('requester-permission/', views.RequesterPermissionDetailsView.as_view(), name="requesterpermission_details"), + path('requester-permission/edit/', views.RequesterPermissionEditView.as_view(),name="requesterpermission_edit"), # Global Permission path('global-permission/', views.GlobalPermissionRBACView.as_view(), name="globalpermission_rbac"), diff --git a/profiles/views/__init__.py b/profiles/views/__init__.py index e273296ac..eaecadaa3 100644 --- a/profiles/views/__init__.py +++ b/profiles/views/__init__.py @@ -8,5 +8,6 @@ from profiles.views.user import * from profiles.views.role import * from profiles.views.globalpermission import * +from profiles.views.requesterpermission import * from profiles.views.quota import * from profiles.views.permission import * diff --git a/profiles/views/requesterpermission.py b/profiles/views/requesterpermission.py new file mode 100644 index 000000000..1c7d5e8e5 --- /dev/null +++ b/profiles/views/requesterpermission.py @@ -0,0 +1,49 @@ +from Squest.utils.squest_views import * +from profiles.forms import RequesterPermissionForm +from profiles.models import RequesterPermission +from profiles.tables import PermissionTable + + +class RequesterPermissionEditView(SquestUpdateView): + model = RequesterPermission + form_class = RequesterPermissionForm + + def get_object(self, queryset=None): + return RequesterPermission.load() + + def get_success_url(self): + return reverse_lazy('profiles:requesterpermission_details') + + def dispatch(self, request, *args, **kwargs): + self.kwargs['pk'] = self.get_object().id + kwargs['pk'] = self.kwargs.get('pk') + return super().dispatch(request, *args, **kwargs) + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + context['breadcrumbs'] = [ + {'text': 'Requester permissions', 'url': reverse_lazy('profiles:requesterpermission_details')}, + {'text': f'Edit', 'url': ""}, + ] + return context + + +class RequesterPermissionDetailsView(SquestDetailView): + model = RequesterPermission + + def get_object(self, queryset=None): + return RequesterPermission.load() + + def dispatch(self, request, *args, **kwargs): + self.kwargs['pk'] = self.get_object().id + kwargs['pk'] = self.kwargs.get('pk') + return super().dispatch(request, *args, **kwargs) + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + context['breadcrumbs'] = None + context['title'] = "Requester Permission" + permission_table = PermissionTable(self.object.default_permissions.all()) + permission_table.exclude = ("actions",) + context['default_permissions'] = permission_table + return context diff --git a/service_catalog/models/instance.py b/service_catalog/models/instance.py index 8bf54cad5..794bd2740 100644 --- a/service_catalog/models/instance.py +++ b/service_catalog/models/instance.py @@ -52,6 +52,16 @@ class Meta: def get_q_filter(cls, user, perm): from profiles.models import Team app_label, codename = perm.split(".") + + from profiles.models import RequesterPermission + requesterpermission = RequesterPermission.load() + additional_q = Q() + if requesterpermission.default_permissions.filter( + codename=codename, + content_type__app_label=app_label + ).exists(): + additional_q = Q(requester=user) + return Q( # Quota scope ## Quota scope - Org - User @@ -77,11 +87,14 @@ def get_q_filter(cls, user, perm): org__roles__permissions__codename=codename, org__roles__permissions__content_type__app_label=app_label ) - ) + ) | additional_q def get_scopes(self): return self.quota_scope.get_scopes() + def is_owner(self, user): + return self.requester == user + def __str__(self): return f"{self.name} (#{self.id})" diff --git a/service_catalog/models/message.py b/service_catalog/models/message.py index 64953b5d0..ea55a8270 100644 --- a/service_catalog/models/message.py +++ b/service_catalog/models/message.py @@ -35,6 +35,8 @@ class RequestMessage(Message): def get_scopes(self): return self.request.get_scopes() + def is_owner(self, user): + return self.request.is_owner(user) class SupportMessage(Message): support = ForeignKey(Support, @@ -47,3 +49,6 @@ class SupportMessage(Message): def get_scopes(self): return self.support.get_scopes() + + def is_owner(self, user): + return self.support.is_owner(user) diff --git a/service_catalog/models/request.py b/service_catalog/models/request.py index e16d37a93..ded81f420 100644 --- a/service_catalog/models/request.py +++ b/service_catalog/models/request.py @@ -70,6 +70,9 @@ def get_q_filter(cls, user, perm): instance__in=Instance.get_queryset_for_user(user, perm) ) + def is_owner(self, user): + return self.instance.is_owner(user) or self.user == user + def get_scopes(self): return self.instance.get_scopes() diff --git a/service_catalog/models/support.py b/service_catalog/models/support.py index 0166f934c..6ce11ea74 100644 --- a/service_catalog/models/support.py +++ b/service_catalog/models/support.py @@ -56,5 +56,8 @@ def get_q_filter(cls, user, perm): instance__in=Instance.get_queryset_for_user(user, perm) ) + def is_owner(self, user): + return self.instance.is_owner(user) or self.opened_by == user + def get_scopes(self): return self.instance.get_scopes() diff --git a/templates/profiles/requesterpermission_detail.html b/templates/profiles/requesterpermission_detail.html new file mode 100644 index 000000000..ee9d034d2 --- /dev/null +++ b/templates/profiles/requesterpermission_detail.html @@ -0,0 +1,35 @@ +{% extends 'base.html' %} +{% block title %} + Default permissions +{% endblock %} +{% load render_table from django_tables2 %} +{% load static %} +{% block header_button %} + {% has_perm request.user "profiles.change_requesterpermission" object as can_change_requester_perm %} + {% if can_change_requester_perm %} + + + + {% endif %} +{% endblock %} + +{% block main %} +
+
+
+
+
+
+ List of permissions granted to the requester of an Instance and all related Requests and Supports +
+ {% render_table default_permissions %} +
+
+
+
+
+{% endblock %} +{% block custom_script %} + +{% endblock %} diff --git a/tests/test_profiles/test_model/test_has_perm_for_owner.py b/tests/test_profiles/test_model/test_has_perm_for_owner.py new file mode 100644 index 000000000..e6703dc32 --- /dev/null +++ b/tests/test_profiles/test_model/test_has_perm_for_owner.py @@ -0,0 +1,109 @@ +from django.contrib.auth.models import User + +from profiles.models import Organization, RequesterPermission, Role +from profiles.models.squest_permission import Permission + +from service_catalog.models import Instance, Request, Operation, Service, JobTemplate, TowerServer, Support +from tests.utils import TransactionTestUtils + + +class TestModelHasPermWithIsOwner(TransactionTestUtils): + + def setUp(self): + super(TestModelHasPermWithIsOwner, self).setUp() + self.default_quota_scope = Organization.objects.create(name="Default scope for tests") + + self.user1 = User.objects.create_user('user1', 'user1@hpe.com', "password") + self.user2 = User.objects.create_user('user2', 'user2@hpe.com', "password") + self.user3 = User.objects.create_user('user3', 'user3@hpe.com', "password") + self.superuser = User.objects.create_superuser("superuser") + + self.empty_role = Role.objects.create(name="Empty role") + + self.requester_perm = RequesterPermission.load() + self.requester_perm.default_permissions.set([]) + + survey = { + "name": "test-survey", + "description": "test-survey-description", + "spec": [] + } + + service = Service.objects.create(name="Service #1") + tower = TowerServer.objects.create(name="Tower #1", host="localhost", token="xxx") + + job_template = JobTemplate.objects.create(name="Job Template #1", survey=survey, + tower_id=1, + tower_server=tower, + ) + + self.operation = Operation.objects.create(name="Operation #1", service=service, job_template=job_template) + + def _get_action_perm(self, obj, action): + return f"{obj.__class__._meta.app_label}.{action}_{obj.__class__._meta.model_name}" + + def _get_perm(self, obj, action): + print(obj, action) + return Permission.objects.get(content_type__app_label=f"{obj.__class__._meta.app_label}", + content_type__model=f"{obj.__class__._meta.model_name}", + codename=f"{action}_{obj.__class__._meta.model_name}") + + def _assert_user_can(self, action, user, obj): + print(f"testing that {user} can {action} on {obj}") + self.assertTrue(user.has_perm(self._get_action_perm(obj, action), obj)) + + def _assert_user_cant(self, action, user, obj): + print(f"testing that {user} cannot {action} on {obj}") + self.assertFalse(user.has_perm(self._get_action_perm(obj, action), obj)) + + def __test_has_perm_generic_with_requester_perm_user_permission(self, obj, action): + + permission = self._get_perm(obj, action) + + # Only superuser can see + self._assert_user_cant(action, self.user1, obj) + self._assert_user_can(action, self.superuser, obj) + + # Add {action}_{object} to everyone + self.requester_perm.default_permissions.add(permission) + + # user1 can see it + self._assert_user_can(action, self.user1, obj) + self._assert_user_can(action, self.superuser, obj) + + # Remove view_instance to everyone + self.requester_perm.default_permissions.remove(permission) + + # Only superuser can see + self._assert_user_cant(action, self.user1, obj) + self._assert_user_can(action, self.superuser, obj) + + def test_has_perm_instance_requesterperm_when_instance_owner(self): + for action in ["add", "view", "change", "delete"]: + instance1 = Instance.objects.create(name="Instance #1", quota_scope=self.default_quota_scope, + requester=self.user1) + self.__test_has_perm_generic_with_requester_perm_user_permission(instance1, action) + + def test_has_perm_request_requesterperm_when_request_owner(self): + for action in ["add", "view", "change", "delete"]: + instance1 = Instance.objects.create(name="Instance #1", quota_scope=self.default_quota_scope) + request1 = Request.objects.create(instance=instance1, operation=self.operation, user=self.user1) + self.__test_has_perm_generic_with_requester_perm_user_permission(request1, action) + + def test_has_perm_request_requesterperm_when_instance_owner(self): + for action in ["add", "view", "change", "delete"]: + instance1 = Instance.objects.create(name="Instance #1", quota_scope=self.default_quota_scope, + requester=self.user1) + request1 = Request.objects.create(instance=instance1, operation=self.operation) + self.__test_has_perm_generic_with_requester_perm_user_permission(request1, action) + + def test_has_perm_support_requesterperm_when_support_owner(self): + for action in ["add", "view", "change", "delete"]: + instance1 = Instance.objects.create(name="Instance #1", quota_scope=self.default_quota_scope) + support1 = Support.objects.create(instance=instance1, opened_by=self.user1) + self.__test_has_perm_generic_with_requester_perm_user_permission(support1, action) + def test_has_perm_support_requesterperm_when_instance_owner(self): + for action in ["add", "view", "change", "delete"]: + instance1 = Instance.objects.create(name="Instance #1", quota_scope=self.default_quota_scope,requester=self.user1) + support1 = Support.objects.create(instance=instance1) + self.__test_has_perm_generic_with_requester_perm_user_permission(support1, action)