Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development #69

Merged
merged 9 commits into from
Oct 4, 2023
1 change: 1 addition & 0 deletions src/unipoll_api/actions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from . import account as AccountActions # noqa: F401
from . import group as GroupActions # noqa: F401
from . import policy as PolicyActions # noqa: F401
from . import poll as PollActions # noqa: F401
from . import authentication as AuthActions # noqa: F401
from . import workspace as WorkspaceActions # noqa: F401
Expand Down
76 changes: 35 additions & 41 deletions src/unipoll_api/actions/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,11 @@

# Get group
async def get_group(group: Group, include_members: bool = False, include_policies: bool = False) -> GroupSchemas.Group:
members = None
policies = None

account = AccountManager.active_user.get()
# Get the permissions(allowed actions) of the current user
permissions = await Permissions.get_all_permissions(group, account)
# Fetch the resources if the user has the required permissions
if include_members:
req_permissions = Permissions.GroupPermissions["get_group_members"] # type: ignore
if Permissions.check_permission(permissions, req_permissions):
members = (await get_group_members(group)).members
if include_policies:
req_permissions = Permissions.GroupPermissions["get_group_policies"] # type: ignore
if Permissions.check_permission(permissions, req_permissions):
policies = (await get_group_policies(group)).policies

members = (await get_group_members(group)).members if include_members else None
policies = (await get_group_policies(group)).policies if include_policies else None
workspace = WorkspaceSchemas.Workspace(**group.workspace.dict(exclude={"members", # type: ignore
"policies",
"groups"}))

# Return the workspace with the fetched resources
return GroupSchemas.Group(id=group.id,
name=group.name,
Expand Down Expand Up @@ -97,10 +82,15 @@ async def delete_group(group: Group):
async def get_group_members(group: Group) -> MemberSchemas.MemberList:
member_list = []
member: Account
for member in group.members: # type: ignore
member_data = member.dict(include={'id', 'first_name', 'last_name', 'email'})
member_scheme = MemberSchemas.Member(**member_data)
member_list.append(member_scheme)

account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(group, account)
req_permissions = Permissions.GroupPermissions["get_group_members"] # type: ignore
if Permissions.check_permission(permissions, req_permissions):
for member in group.members: # type: ignore
member_data = member.dict(include={'id', 'first_name', 'last_name', 'email'})
member_scheme = MemberSchemas.Member(**member_data)
member_list.append(member_scheme)
# Return the list of members
return MemberSchemas.MemberList(members=member_list)

Expand All @@ -114,7 +104,7 @@ async def add_group_members(group: Group, member_data: MemberSchemas.AddMembers)
account_list = await Account.find(In(Account.id, accounts)).to_list()
# Add the accounts to the group member list with default permissions
for account in account_list:
await group.add_member(group.workspace, account, Permissions.GROUP_BASIC_PERMISSIONS)
await group.add_member(account, Permissions.GROUP_BASIC_PERMISSIONS)
await Group.save(group)
# Return the list of members added to the group
return MemberSchemas.MemberList(members=[MemberSchemas.Member(**account.dict()) for account in account_list])
Expand Down Expand Up @@ -146,25 +136,29 @@ async def remove_group_member(group: Group, account_id: ResourceID | None):
async def get_group_policies(group: Group) -> PolicySchemas.PolicyList:
policy_list = []
policy: Policy
for policy in group.policies: # type: ignore
permissions = Permissions.GroupPermissions(policy.permissions).name.split('|') # type: ignore
# Get the policy_holder
if policy.policy_holder_type == 'account':
policy_holder = await Account.get(policy.policy_holder.ref.id)
elif policy.policy_holder_type == 'group':
policy_holder = await Group.get(policy.policy_holder.ref.id)
else:
raise ResourceExceptions.InternalServerError("Invalid policy_holder_type")
if not policy_holder:
# TODO: Replace with custom exception
raise ResourceExceptions.InternalServerError("get_group_policies() => Policy holder not found")
# Convert the policy_holder to a Member schema
policy_holder = MemberSchemas.Member(**policy_holder.dict()) # type: ignore
policy_list.append(PolicySchemas.PolicyShort(id=policy.id,
policy_holder_type=policy.policy_holder_type,
# Exclude unset fields(i.e. "description" for Account)
policy_holder=policy_holder.dict(exclude_unset=True),
permissions=permissions))
account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(group, account)
req_permissions = Permissions.GroupPermissions["get_group_policies"] # type: ignore
if Permissions.check_permission(permissions, req_permissions):
for policy in group.policies: # type: ignore
permissions = Permissions.GroupPermissions(policy.permissions).name.split('|') # type: ignore
# Get the policy_holder
if policy.policy_holder_type == 'account':
policy_holder = await Account.get(policy.policy_holder.ref.id)
elif policy.policy_holder_type == 'group':
policy_holder = await Group.get(policy.policy_holder.ref.id)
else:
raise ResourceExceptions.InternalServerError("Invalid policy_holder_type")
if not policy_holder:
# TODO: Replace with custom exception
raise ResourceExceptions.InternalServerError("get_group_policies() => Policy holder not found")
# Convert the policy_holder to a Member schema
policy_holder = MemberSchemas.Member(**policy_holder.dict()) # type: ignore
policy_list.append(PolicySchemas.PolicyShort(id=policy.id,
policy_holder_type=policy.policy_holder_type,
# Exclude unset fields(i.e. "description" for Account)
policy_holder=policy_holder.dict(exclude_unset=True),
permissions=permissions))
return PolicySchemas.PolicyList(policies=policy_list)


Expand Down
79 changes: 79 additions & 0 deletions src/unipoll_api/actions/policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from unipoll_api import AccountManager
from unipoll_api.documents import Account, Workspace, Group, Policy, Resource
from unipoll_api.schemas import MemberSchemas, PolicySchemas
from unipoll_api.exceptions import PolicyExceptions
from unipoll_api.utils import Permissions


# Get all policies of a workspace
async def get_policies(policy_holder: Account | Group | None = None,
resource: Resource | None = None) -> PolicySchemas.PolicyList:
policy_list = []
policy: Policy

account: Account = AccountManager.active_user.get()
all_policies = []

# Helper function to get policies from a resource
async def get_policies_from_resource(resource: Resource) -> list[Policy]:
req_permissions: Permissions.Permissions | None = None
if resource.resource_type == "workspace":
req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"]
elif resource.resource_type == "group":
req_permissions = Permissions.GroupPermissions["get_group_policies"]
if req_permissions:
permissions = await Permissions.get_all_permissions(resource, account)
if Permissions.check_permission(permissions, req_permissions):
return resource.policies # type: ignore
return []

# Get policies from a specific resource
if resource:
all_policies = await get_policies_from_resource(resource)
# Get policies from all resources
else:
all_workspaces = Workspace.find(fetch_links=True)
all_groups = Group.find(fetch_links=True)
all_resources = await all_workspaces.to_list() + await all_groups.to_list()

for resource in all_resources:
all_policies += await get_policies_from_resource(resource)
# Build policy list
for policy in all_policies:
# Filter by policy_holder if specified
if policy_holder:
if (policy.policy_holder.ref.id != policy_holder.id):
continue
policy_list.append(await get_policy(policy))
# Return policy list
return PolicySchemas.PolicyList(policies=policy_list)


async def get_policy(policy: Policy) -> PolicySchemas.PolicyShort:

# Convert policy_holder link to Member object
ph_type = policy.policy_holder_type
ph_ref = policy.policy_holder.ref.id
policy_holder = await Account.get(ph_ref) if ph_type == "account" else await Group.get(ph_ref)

if not policy_holder:
raise PolicyExceptions.PolicyHolderNotFound(ph_ref)

policy_holder = MemberSchemas.Member(**policy_holder.dict()) # type: ignore
permissions = Permissions.WorkspacePermissions(policy.permissions).name.split('|') # type: ignore
return PolicySchemas.PolicyShort(id=policy.id,
policy_holder_type=policy.policy_holder_type,
policy_holder=policy_holder.dict(exclude_unset=True),
permissions=permissions)

# if not account and account_id:
# raise AccountExceptions.AccountNotFound(account_id)

# # Check if account is a member of the workspace
# if account.id not in [member.id for member in workspace.members]: # type: ignore
# raise WorkspaceExceptions.UserNotMember(workspace, account)

# user_permissions = await Permissions.get_all_permissions(workspace, account)
# return PolicySchemas.PolicyOutput(
# permissions=Permissions.WorkspacePermissions(user_permissions).name.split('|'), # type: ignore
# policy_holder=MemberSchemas.Member(**account.dict()))
29 changes: 27 additions & 2 deletions src/unipoll_api/actions/poll.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,35 @@
from unipoll_api import AccountManager
from unipoll_api.documents import Poll, Policy, Group, Account
from unipoll_api.documents import Poll, Policy, Group, Account, Workspace
from unipoll_api.schemas import PollSchemas, QuestionSchemas, PolicySchemas, MemberSchemas, WorkspaceSchemas
from unipoll_api.utils import Permissions
from unipoll_api.exceptions import ResourceExceptions


async def get_polls(workspace: Workspace | None = None) -> PollSchemas.PollList:
account = AccountManager.active_user.get()
req_permissions = Permissions.WorkspacePermissions["get_polls"] # type: ignore
all_workspaces = [workspace] if workspace else await Workspace.find(fetch_links=True).to_list()

poll_list = []
for workspace in all_workspaces:
permissions = await Permissions.get_all_permissions(workspace, account)
if Permissions.check_permission(permissions, req_permissions):
poll_list += workspace.polls # type: ignore
else:
req_permissions = Permissions.WorkspacePermissions["get_poll"]
for poll in workspace.polls: # type: ignore
if poll.public: # type: ignore
poll_list.append(poll)
else:
permissions = await Permissions.get_all_permissions(poll, account)
if Permissions.check_permission(permissions, req_permissions):
poll_list.append(poll)
# Build poll list and return the result
for poll in poll_list:
poll_list.append(PollSchemas.PollShort(**poll.dict())) # type: ignore
return PollSchemas.PollList(polls=poll_list)


async def get_poll(poll: Poll,
include_questions: bool = False,
include_policies: bool = False) -> PollSchemas.PollResponse:
Expand All @@ -16,7 +41,7 @@ async def get_poll(poll: Poll,
# Fetch the resources if the user has the required permissions
if include_questions:
req_permissions = Permissions.PollPermissions["get_poll_questions"] # type: ignore
if Permissions.check_permission(permissions, req_permissions) or poll.public:
if poll.public or Permissions.check_permission(permissions, req_permissions):
questions = (await get_poll_questions(poll)).questions
if include_policies:
req_permissions = Permissions.PollPermissions["get_poll_policies"] # type: ignore
Expand Down
Loading