Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add fine-grained permission system using scopes #2087

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions backend/capellacollab/core/authentication/basic_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import fastapi
from fastapi import security
from sqlalchemy import orm

from capellacollab.core import database
from capellacollab.users import crud as user_crud
from capellacollab.users import models as users_models
from capellacollab.users.tokens import crud as token_crud
from capellacollab.users.tokens import models as tokens_models

from . import exceptions

Expand All @@ -20,29 +22,31 @@ class HTTPBasicAuth(security.HTTPBasic):
def __init__(self):
super().__init__(auto_error=True)

async def __call__(self, request: fastapi.Request) -> str: # type: ignore
async def validate(
self, db: orm.Session, request: fastapi.Request
) -> tuple[users_models.DatabaseUser, tokens_models.DatabaseUserToken]:
credentials: (
security.HTTPBasicCredentials | None
) = await super().__call__(request)
if not credentials:
raise exceptions.UnauthenticatedError()
with database.SessionLocal() as session:
user = user_crud.get_user_by_name(session, credentials.username)
db_token = (
token_crud.get_token_by_token_and_user(
session, credentials.password, user.id
)
if user
else None

user = user_crud.get_user_by_name(db, credentials.username)
if not user:
logger.info(
"User with username '%s' not found.", credentials.username
)
if not db_token:
logger.info("Token invalid for user %s", credentials.username)
raise exceptions.InvalidPersonalAccessTokenError()
raise exceptions.InvalidPersonalAccessTokenError()
db_token = token_crud.get_token_by_token_and_user(
db, credentials.password, user.id
)

if not db_token:
logger.info("Token invalid for user %s", credentials.username)
raise exceptions.InvalidPersonalAccessTokenError()

if db_token.expiration_date < datetime.date.today():
logger.info("Token expired for user %s", credentials.username)
raise exceptions.PersonalAccessTokenExpired()
return self.get_username(credentials)
if db_token.expiration_date < datetime.date.today():
logger.info("Token expired for user %s", credentials.username)
raise exceptions.PersonalAccessTokenExpired()

def get_username(self, credentials: security.HTTPBasicCredentials) -> str:
return credentials.username
return user, db_token
28 changes: 22 additions & 6 deletions backend/capellacollab/core/authentication/injectables.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from capellacollab.core import database
from capellacollab.core.authentication import api_key_cookie, basic_auth
from capellacollab.permissions import models as users_permissions
from capellacollab.projects import crud as projects_crud
from capellacollab.projects import exceptions as projects_exceptions
from capellacollab.projects import models as projects_models
Expand Down Expand Up @@ -59,28 +60,43 @@ class OpenAPIPersonalAccessToken(OpenAPIFakeBase):
__hash__ = OpenAPIFakeBase.__hash__


async def get_username(
async def validate_authentication_information(
request: fastapi.Request,
db: orm.Session = fastapi.Depends(database.get_db),
_unused1=fastapi.Depends(OpenAPIPersonalAccessToken()),
) -> str:
) -> tuple[users_models.DatabaseUser, users_permissions.GlobalScopes]:
if request.cookies.get("id_token"):
username = await api_key_cookie.JWTAPIKeyCookie()(request)
return username
user = users_crud.get_user_by_name(
db, await api_key_cookie.JWTAPIKeyCookie()(request)
)
assert user
return user, users_models.ROLE_MAPPING[user.role]

authorization = request.headers.get("Authorization")
scheme, _ = security_utils.get_authorization_scheme_param(authorization)

match scheme.lower():
case "basic":
username = await basic_auth.HTTPBasicAuth()(request)
user, token = await basic_auth.HTTPBasicAuth().validate(
db, request
)
case "":
raise exceptions.UnauthenticatedError()
case _:
raise exceptions.UnknownScheme(scheme)

return username
return user, users_permissions.GlobalScopes() # Replace with token scope


async def get_username(
authentication_information: tuple[
users_models.DatabaseUser, users_permissions.GlobalScopes
] = fastapi.Depends(validate_authentication_information),
) -> str:
return authentication_information[0].name


# TODO: Replace all occurrences of RoleVerification with PermissionValidation
class RoleVerification:
def __init__(self, required_role: users_models.Role, verify: bool = True):
self.required_role = required_role
Expand Down
10 changes: 9 additions & 1 deletion backend/capellacollab/core/logging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from starlette.middleware import base

from capellacollab.configuration.app import config
from capellacollab.core import database
from capellacollab.core.authentication import injectables as auth_injectables

LOGGING_LEVEL = config.logging.level
Expand Down Expand Up @@ -80,7 +81,14 @@ async def dispatch(
call_next: base.RequestResponseEndpoint,
):
try:
username = await auth_injectables.get_username(request)
with database.SessionLocal() as session:
(
user,
_,
) = await auth_injectables.validate_authentication_information(
request, session
)
username = user.name
except fastapi.HTTPException:
username = "anonymous"

Expand Down
2 changes: 2 additions & 0 deletions backend/capellacollab/permissions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SPDX-FileCopyrightText: Copyright DB InfraGO AG and contributors
# SPDX-License-Identifier: Apache-2.0
23 changes: 23 additions & 0 deletions backend/capellacollab/permissions/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# SPDX-FileCopyrightText: Copyright DB InfraGO AG and contributors
# SPDX-License-Identifier: Apache-2.0

from fastapi import status

from capellacollab.core import exceptions as core_exceptions

from . import models


class InsufficientPermissionError(core_exceptions.BaseError):
def __init__(
self,
required_permission: str,
required_verbs: set[models.UserTokenVerb],
):
verbs = ", ".join(verb.value for verb in required_verbs)
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
title="Permission denied",
reason=f"Insufficient permissions: '{required_permission}' permission with verbs {verbs} is required for this transaction.",
err_code="INSUFFICIENT_PERMISSION",
)
45 changes: 45 additions & 0 deletions backend/capellacollab/permissions/injectables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# SPDX-FileCopyrightText: Copyright DB InfraGO AG and contributors
# SPDX-License-Identifier: Apache-2.0

import dataclasses

import fastapi
from sqlalchemy import orm

from capellacollab.core import database
from capellacollab.core.authentication import injectables as auth_injectables
from capellacollab.users import models as users_models

from . import exceptions, models


@dataclasses.dataclass(eq=False)
class PermissionValidation:
required_scope: models.GlobalScopes

def __call__(
self,
authentication_information: tuple[
users_models.DatabaseUser, models.GlobalScopes
] = fastapi.Depends(
auth_injectables.validate_authentication_information
),
db: orm.Session = fastapi.Depends(database.get_db),
) -> None:
actual_scope = authentication_information[1].model_dump()

for scope, perms in self.required_scope:
for perm, verbs in perms:
for verb in verbs:
if verb not in actual_scope[scope][perm]:
raise exceptions.InsufficientPermissionError(
perm, verbs
)


async def get_scope(
authentication_information: tuple[
users_models.DatabaseUser, models.GlobalScopes
] = fastapi.Depends(auth_injectables.validate_authentication_information),
) -> models.GlobalScopes:
return authentication_information[1]
165 changes: 165 additions & 0 deletions backend/capellacollab/permissions/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: Copyright DB InfraGO AG and contributors
# SPDX-License-Identifier: Apache-2.0

import enum
import typing as t

import pydantic

from capellacollab.core import pydantic as core_pydantic


class UserTokenVerb(str, enum.Enum):
GET = "GET"
CREATE = "CREATE"
UPDATE = "UPDATE"
DELETE = "DELETE"


class UserScopes(core_pydantic.BaseModel):
sessions: set[
t.Literal[
UserTokenVerb.GET, UserTokenVerb.CREATE, UserTokenVerb.DELETE
]
] = pydantic.Field(
default={},
title="Sessions",
description="Manage sessions of your own user",
)
projects: set[t.Literal[UserTokenVerb.CREATE]] = pydantic.Field(
default={}, title="Projects", description="Create new projects"
)
tokens: set[
t.Literal[
UserTokenVerb.GET, UserTokenVerb.CREATE, UserTokenVerb.DELETE
]
] = pydantic.Field(
default={},
title="Personal Access Tokens",
description="Manage personal access tokens",
)


class ProjectUserScopes(core_pydantic.BaseModel):
root: set[t.Literal[UserTokenVerb.DELETE]] = pydantic.Field(
default={},
title="Project",
description="Add capability to delete the project or update the project metadata (visibility & project type)",
)
pipelines: set[
t.Literal[
UserTokenVerb.GET, UserTokenVerb.CREATE, UserTokenVerb.DELETE
]
] = pydantic.Field(
default={},
title="Pipelines",
description="See pipelines, create new pipelines or delete existing pipelines",
)
pipeline_runs: set[t.Literal[UserTokenVerb.GET, UserTokenVerb.CREATE]] = (
pydantic.Field(
default={},
title="Pipeline Runs",
description="Allow access to see or trigger pipeline runs",
)
)


class AdminScopes(core_pydantic.BaseModel):
users: set[
t.Literal[
UserTokenVerb.GET,
UserTokenVerb.CREATE,
UserTokenVerb.UPDATE,
UserTokenVerb.DELETE,
]
] = pydantic.Field(
default={},
title="Users",
description="Manage all users of the application",
)
projects: set[
t.Literal[
UserTokenVerb.GET,
UserTokenVerb.CREATE,
UserTokenVerb.UPDATE,
UserTokenVerb.DELETE,
]
] = pydantic.Field(
default={},
title="Projects",
description="Grant permission to all sub-resources of ALL projects",
)
tools: set[
t.Literal[
UserTokenVerb.GET,
UserTokenVerb.CREATE,
UserTokenVerb.UPDATE,
UserTokenVerb.DELETE,
]
] = pydantic.Field(
default={},
title="Tools",
description="Manage all tools, including it's versions and natures",
)
announcements: set[
t.Literal[UserTokenVerb.CREATE, UserTokenVerb.DELETE]
] = pydantic.Field(
default={},
title="Announcements",
description="Manage all announcements",
)
monitoring: set[t.Literal[UserTokenVerb.GET]] = pydantic.Field(
default={},
title="Monitoring",
description="Allow access to monitoring dashboards, Prometheus and Grafana",
)
configuration: set[t.Literal[UserTokenVerb.GET, UserTokenVerb.UPDATE]] = (
pydantic.Field(
default={},
title="Global Configuration",
description="See and update the global configuration",
)
)
t4c_servers: set[
t.Literal[
UserTokenVerb.GET,
UserTokenVerb.CREATE,
UserTokenVerb.UPDATE,
UserTokenVerb.DELETE,
]
] = pydantic.Field(
default={},
title="TeamForCapella Servers",
description="Manage Team4Capella servers and license servers",
)
t4c_repositories: set[
t.Literal[
UserTokenVerb.GET,
UserTokenVerb.CREATE,
UserTokenVerb.UPDATE,
UserTokenVerb.DELETE,
]
] = pydantic.Field(
default={},
title="TeamForCapella Repositories",
description="Manage Team4Capella repositories",
)
pv_configuration: set[
t.Literal[UserTokenVerb.UPDATE, UserTokenVerb.DELETE]
] = pydantic.Field(
default={},
title="pure::variants Configuration",
description="pure::variants license configuration",
)
events: set[t.Literal[UserTokenVerb.GET]] = pydantic.Field(
default={}, title="Events", description="See all events"
)


class GlobalScopes(core_pydantic.BaseModel):
user: UserScopes = pydantic.Field(
default=UserScopes(), title="User Scopes"
)
admin: AdminScopes = pydantic.Field(
default=AdminScopes(), title="Administrator Scopes"
)
13 changes: 13 additions & 0 deletions backend/capellacollab/permissions/routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SPDX-FileCopyrightText: Copyright DB InfraGO AG and contributors
# SPDX-License-Identifier: Apache-2.0

import fastapi

from . import models

router = fastapi.APIRouter()


@router.get("")
def get_permissions():
return models.GlobalScopes.model_json_schema()
Loading
Loading