-
Notifications
You must be signed in to change notification settings - Fork 1
/
security_manager.py
126 lines (98 loc) · 3.62 KB
/
security_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from typing import Optional, Union
from contextvars import ContextVar
from security.enforcer import PolicyEnforcer
from security.permissions import AuthzedAction, Permission
from resources import Resource
from security.role_manager import RoleManager
def require_permissions(actions: Optional[list[AuthzedAction]] = [AuthzedAction.ALL]):
    """
    A decorator to define the actions that are executed from within the current class method and that must be
    protected against unauthorized access.

    The first positional parameter of the protected method must be `self`, and `self` must be an instance of
    :class:`Resource` (or of one of its subclasses).

    :param actions: the actions the current user must be allowed to perform on the resource. The default list
        is created once and shared by every use of the decorator, so it must never be mutated.
    :return: a decorator wrapping the method with the permission check.
    :raises NotImplementedError: at call time, if the first positional argument is missing or is not a
        :class:`Resource`.

    Any exception raised by the security manager's ``assert_permissions`` propagates to the caller and the
    protected method is not invoked. When no security manager is configured, the method is invoked unchecked.
    """
    # Local import so that this function does not depend on a new module-level import.
    from functools import wraps

    def require_permissions_decorator(func):
        @wraps(func)  # keep the protected method's name and docstring
        def permission_checker(*args, **kwargs):
            print(f"permission_checker for {args}, {kwargs}")
            # A missing positional argument is reported like a wrong type instead of a bare IndexError.
            resource = args[0] if args else None
            # isinstance accepts Resource itself and every subclass, and rejects everything else.
            if not isinstance(resource, Resource):
                raise NotImplementedError(
                    f"First argument must be a Resource not {type(resource)}"
                )
            sm = _get_security_manager()
            if sm is None:
                # Nothing to enforce: run the protected method rather than dropping the call.
                return func(*args, **kwargs)
            # Raises if the current user is not allowed; in that case func is never reached.
            sm.assert_permissions(
                resource=resource,
                actions=actions,
            )
            print(
                f"User {sm.current_user} can invoke {actions} on {resource.get_name()}:{resource.get_type()} "
            )
            return func(*args, **kwargs)

        return permission_checker

    return require_permissions_decorator
class SecurityManager:
    """
    The security manager holds references to the security components (role manager, policy enforcer) and the
    configured permissions.
    It is accessed and defined using the global functions :func:`_get_security_manager` and
    :func:`_set_security_manager`
    """

    def __init__(
        self,
        role_manager: RoleManager,
        policy_enforcer: PolicyEnforcer,
        permissions: Optional[list[Permission]] = None,
    ):
        """
        :param role_manager: the role manager handed to the policy enforcer on every check.
        :param policy_enforcer: the component whose ``enforce_policy`` decides each check.
        :param permissions: the configured permissions; when omitted, a new empty list is used.
        """
        self._role_manager: RoleManager = role_manager
        self._policy_enforcer: PolicyEnforcer = policy_enforcer
        # A fresh list per instance: a `[]` default in the signature would be one single list shared by
        # every manager created without explicit permissions.
        self._permissions: list[Permission] = (
            [] if permissions is None else permissions
        )
        # Each manager instance owns its own context variable; it reads as None until a user is set.
        self._current_user: ContextVar[Optional[str]] = ContextVar(
            "current_user", default=None
        )

    def set_current_user(self, user: str) -> None:
        """Set the user on whose behalf permissions are checked in the current context."""
        self._current_user.set(user)

    @property
    def role_manager(self) -> RoleManager:
        """The role manager given at construction."""
        return self._role_manager

    @property
    def policy_enforcer(self) -> PolicyEnforcer:
        """The policy enforcer given at construction."""
        return self._policy_enforcer

    @property
    def current_user(self) -> Optional[str]:
        """The user set in the current context, or ``None`` if no user has been set."""
        return self._current_user.get()

    @property
    def permissions(self) -> list[Permission]:
        """The configured permissions (the list itself, not a copy)."""
        return self._permissions

    def assert_permissions(
        self,
        resource: Resource,
        actions: Union[AuthzedAction, list[AuthzedAction]],
    ):
        """
        Check that the current user may perform `actions` on `resource`.

        :param resource: the resource being accessed.
        :param actions: a single action or a list of actions; a single action is wrapped in a list before
            being passed to the policy enforcer.
        :raises PermissionError: if the policy enforcer rejects the request; the message is the explanation
            returned by the enforcer.
        """
        _actions = actions
        if isinstance(actions, AuthzedAction):
            _actions = [actions]
        result, explain = self._policy_enforcer.enforce_policy(
            role_manager=self._role_manager,
            permissions=self._permissions,
            user=self.current_user,
            actions=_actions,
            resource=resource,
        )
        if not result:
            raise PermissionError(explain)
class DefaultSecurityManager(SecurityManager):
    """
    A security manager built without role manager, policy enforcer or permissions, whose permission check
    always succeeds.
    """

    def __init__(self):
        # No security component is configured for this manager.
        super().__init__(
            role_manager=None,
            policy_enforcer=None,
            permissions=[],
        )

    def assert_permissions(
        self,
        resource: Resource,
        actions: Union[AuthzedAction, list[AuthzedAction]],
    ):
        """Accept every request: return ``True`` without consulting any policy and without raising."""
        return True
# Module-wide security manager; stays None until one is assigned.
_sm: Optional[SecurityManager] = None
def _get_security_manager():
    """Return the module-wide security manager stored in ``_sm`` (``None`` when none is set)."""
    # Reading a module global needs no `global` declaration.
    return _sm
def _set_security_manager(sc: SecurityManager):
    """Store `sc` in the module-wide ``_sm`` variable, replacing whatever was there before."""
    global _sm  # rebinding the module-level name, not creating a local
    _sm = sc