-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow disabling object-level roles #475
Changes from all commits
7f40d36
cfa3b98
edfac8a
4d1a4c8
77cd202
0e2025b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,43 +22,59 @@ | |
logger = logging.getLogger('ansible_base.rbac.permission_registry') | ||
|
||
|
||
class ModelPermissionInfo: | ||
"""Container of RBAC registration information for a model in permission_registry""" | ||
|
||
def __init__(self, model, parent_field_name='organization', allow_object_roles=True): | ||
self.model_name = model._meta.model_name | ||
self.app_label = model._meta.app_label | ||
if parent_field_name == self.model_name: | ||
# model can not be its own parent | ||
self.parent_field_name = None | ||
else: | ||
self.parent_field_name = parent_field_name | ||
self.allow_object_roles = allow_object_roles | ||
self.model = model | ||
|
||
|
||
class PermissionRegistry: | ||
def __init__(self): | ||
self._registry = set() # model registry | ||
self._name_to_model = dict() | ||
self._parent_fields = dict() | ||
self._registry = dict() # model registry | ||
self._managed_roles = dict() # code-defined role definitions, managed=True | ||
self.apps_ready = False | ||
self._tracked_relationships = set() | ||
self._trackers = dict() | ||
|
||
def register(self, *args, parent_field_name='organization'): | ||
def register(self, *args, **kwargs): | ||
if self.apps_ready: | ||
raise RuntimeError('Cannot register model to permission_registry after apps are ready') | ||
for cls in args: | ||
if cls not in self._registry: | ||
self._registry.add(cls) | ||
model_name = cls._meta.model_name | ||
if model_name in self._name_to_model: | ||
raise RuntimeError(f'Two models registered with same name {model_name}') | ||
self._name_to_model[model_name] = cls | ||
if model_name != 'organization': | ||
self._parent_fields[model_name] = parent_field_name | ||
for model in args: | ||
if model._meta.model_name not in self._registry: | ||
info = ModelPermissionInfo(model, **kwargs) | ||
self._registry[info.model_name] = info | ||
elif self._registry[model._meta.model_name] is model: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will account for the app_name because we are comparing the whole model right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will yell at you if you register 2 models with the same name, even if they're in different apps. Proxy models are fine, but don't register the original... and also register the proxy. |
||
logger.debug(f'Model {model._meta.model_name} registered to permission registry more than once') | ||
else: | ||
logger.debug(f'Model {cls._meta.model_name} registered to permission registry more than once') | ||
raise RuntimeError(f'Two models registered with same name {model._meta.model_name}') | ||
|
||
def get_info(self, obj: Union[ModelBase, Model]) -> ModelPermissionInfo: | ||
return self._registry[obj._meta.model_name] | ||
|
||
def track_relationship(self, cls, relationship, role_name): | ||
self._tracked_relationships.add((cls, relationship, role_name)) | ||
|
||
def get_parent_model(self, model) -> Optional[type]: | ||
model = self._name_to_model[model._meta.model_name] | ||
parent_field_name = self.get_parent_fd_name(model) | ||
if parent_field_name is None: | ||
info = self._registry[model._meta.model_name] | ||
if info.parent_field_name is None: | ||
return None | ||
return model._meta.get_field(parent_field_name).related_model | ||
return model._meta.get_field(info.parent_field_name).related_model | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to register a model with a parent but not the parent? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, very much so, Even more interestingly, the |
||
|
||
def get_parent_fd_name(self, model) -> Optional[str]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be really nice to rename this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not needed to merge this PR if its too much. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used that method name once, and only in eda-server tests, so it shouldn't be difficult to rename. But it is a cross-repo change. |
||
return self._parent_fields.get(model._meta.model_name) | ||
model_name = model._meta.model_name | ||
if model_name not in self._registry: | ||
return None | ||
info = self._registry[model_name] | ||
return info.parent_field_name | ||
|
||
def get_child_models(self, parent_model, seen=None) -> list[tuple[str, Type[Model]]]: | ||
"""Returns child models and the filter relationship to the parent | ||
|
@@ -72,19 +88,18 @@ def get_child_models(self, parent_model, seen=None) -> list[tuple[str, Type[Mode | |
seen = set() | ||
child_filters = [] | ||
parent_model_name = parent_model._meta.model_name | ||
for model_name, parent_field_name in self._parent_fields.items(): | ||
if parent_field_name is None: | ||
for model_name, info in self._registry.items(): | ||
if info.parent_field_name is None: | ||
continue | ||
child_model = self._name_to_model[model_name] | ||
this_parent_name = child_model._meta.get_field(parent_field_name).related_model._meta.model_name | ||
this_parent_name = info.model._meta.get_field(info.parent_field_name).related_model._meta.model_name | ||
if this_parent_name == parent_model_name: | ||
if model_name in seen: | ||
continue | ||
seen.add(model_name) | ||
|
||
child_filters.append((parent_field_name, child_model)) | ||
for next_parent_filter, grandchild_model in self.get_child_models(child_model, seen=seen): | ||
child_filters.append((f'{next_parent_filter}__{parent_field_name}', grandchild_model)) | ||
child_filters.append((info.parent_field_name, info.model)) | ||
for next_parent_filter, grandchild_model in self.get_child_models(info.model, seen=seen): | ||
child_filters.append((f'{next_parent_filter}__{info.parent_field_name}', grandchild_model)) | ||
return child_filters | ||
|
||
def get_resource_prefix(self, cls: Type[Model]) -> str: | ||
|
@@ -150,8 +165,8 @@ def call_when_apps_ready(self, apps, app_config): | |
self.apps = apps | ||
self.apps_ready = True | ||
|
||
if self.team_model not in self._registry: | ||
self._registry.add(self.team_model) | ||
if self.team_model._meta.model_name not in self._registry: | ||
self.register(self.team_model) | ||
|
||
# Do no specify sender for create_dab_permissions, because that is passed as app_config | ||
# and we want to create permissions for external apps, not the dab_rbac app | ||
|
@@ -169,9 +184,9 @@ def call_when_apps_ready(self, apps, app_config): | |
self.user_model.add_to_class('singleton_permissions', bound_singleton_permissions) | ||
post_delete.connect(triggers.rbac_post_user_delete, sender=self.user_model, dispatch_uid='permission-registry-user-delete') | ||
|
||
for cls in self._registry: | ||
triggers.connect_rbac_signals(cls) | ||
connect_rbac_methods(cls) | ||
for cls in self._registry.values(): | ||
triggers.connect_rbac_signals(cls.model) | ||
connect_rbac_methods(cls.model) | ||
|
||
for cls, relationship, role_name in self._tracked_relationships: | ||
if role_name in self._trackers: | ||
|
@@ -221,12 +236,19 @@ def team_permission(self): | |
return f'member_{self.team_model._meta.model_name}' | ||
|
||
@property | ||
def all_registered_models(self): | ||
return list(self._registry) | ||
def all_registered_models(self) -> list[Type[Model]]: | ||
return [info.model for info in self._registry.values()] | ||
|
||
def is_registered(self, obj: Union[ModelBase, Model]) -> bool: | ||
"""Tells if the given object or class is a type tracked by DAB RBAC""" | ||
return any(obj._meta.model_name == cls._meta.model_name for cls in self._registry) | ||
return bool(obj._meta.model_name in self._registry) | ||
|
||
def object_roles_enabled(self, obj: Optional[Union[ModelBase, Model]]) -> bool: | ||
"""For given model tells if object roles are enabled on that model""" | ||
if not obj: | ||
return True # can create system roles, although weird call pattern | ||
info = self.get_info(obj) | ||
return bool(info.allow_object_roles) | ||
|
||
|
||
permission_registry = PermissionRegistry() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ def permissions_allowed_for_system_role() -> dict[type, list[str]]: | |
return permissions_by_model | ||
|
||
|
||
def permissions_allowed_for_role(cls) -> dict[type, list[str]]: | ||
def permissions_allowed_for_role(cls: Union[Model, Type[Model], None]) -> dict[type, list[str]]: | ||
"Permission codenames valid for a RoleDefinition of given class, organized by permission class" | ||
if cls is None: | ||
return permissions_allowed_for_system_role() | ||
|
@@ -100,6 +100,11 @@ def validate_permissions_for_model(permissions, content_type: Optional[Model], m | |
role_model = content_type.model_class() | ||
permissions_by_model = permissions_allowed_for_role(role_model) | ||
|
||
if not managed: | ||
if not permission_registry.object_roles_enabled(role_model): | ||
print_model = role_model._meta.verbose_name if role_model else 'global roles' | ||
raise ValidationError({'content_type': f'Creating roles for the {print_model} model is disabled'}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the right termonology in the exception? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, the flow of this isn't good. The method |
||
|
||
invalid_codenames = codename_list - combine_values(permissions_by_model) | ||
if invalid_codenames: | ||
print_codenames = ', '.join(f'"{codename}"' for codename in invalid_codenames) | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we warn or info if we hit this continue?