Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsasha committed Jun 28, 2023
1 parent 8397ba1 commit 09e2d1f
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 66 deletions.
7 changes: 7 additions & 0 deletions irrd/utils/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
AuthPermission,
AuthUser,
AuthWebAuthn,
ChangeLog,
RPSLDatabaseObject,
)
from irrd.webui.auth.users import password_handler
Expand Down Expand Up @@ -84,3 +85,9 @@ class Meta:
name = factory.Sequence(lambda n: "API token %s" % n)
enabled_webapi = True
enabled_email = True


class ChangeLogFactory(factory.alchemy.SQLAlchemyModelFactory):
class Meta:
model = ChangeLog
sqlalchemy_session_persistence = "commit"
66 changes: 65 additions & 1 deletion irrd/webui/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
from starlette_wtf import csrf_protect, csrf_token

from irrd.conf import get_setting
from irrd.storage.models import AuthoritativeChangeOrigin, AuthUser, RPSLDatabaseObject
from irrd.storage.models import (
AuthMntner,
AuthoritativeChangeOrigin,
AuthPermission,
AuthUser,
ChangeLog,
RPSLDatabaseObject,
)
from irrd.storage.orm_provider import ORMSessionProvider, session_provider_manager
from irrd.storage.queries import RPSLDatabaseQuery
from irrd.updates.handler import ChangeSubmissionHandler
Expand Down Expand Up @@ -172,3 +179,60 @@ def save():
},
)
return Response(status_code=405) # pragma: no cover


@session_provider_manager
@authentication_required
async def change_log_mntner(request: Request, session_provider: ORMSessionProvider) -> Response:
query = session_provider.session.query(AuthMntner).join(AuthPermission)
query = query.filter(
AuthMntner.pk == request.path_params["mntner"],
AuthPermission.user_id == str(request.auth.user.pk),
AuthPermission.user_management == True, # noqa
)
mntner = await session_provider.run(query.one)
if not mntner or not mntner.migration_complete:
return Response(status_code=404)

query = (
session_provider.session.query(ChangeLog)
.filter(
(ChangeLog.auth_through_mntner_id == str(mntner.pk))
| (
(ChangeLog.auth_through_rpsl_mntner_pk == mntner.rpsl_mntner_pk)
& (ChangeLog.rpsl_target_source == mntner.rpsl_mntner_source)
)
)
.order_by(ChangeLog.timestamp.desc())
)
change_logs = await session_provider.run(query.all)

return template_context_render(
"change_log_mntner.html", request, {"mntner": mntner, "change_logs": change_logs}
)


@session_provider_manager
@authentication_required
async def change_log_entry(request: Request, session_provider: ORMSessionProvider) -> Response:
mntners = list(request.auth.user.mntners)
if not mntners:
return Response(status_code=404)

query = session_provider.session.query(ChangeLog)
query = query.filter(
ChangeLog.pk == request.path_params["entry"],
AuthPermission.user_id == str(request.auth.user.pk),
AuthPermission.user_management == True, # noqa
).filter(
(ChangeLog.auth_through_mntner_id.in_([str(mntner.pk) for mntner in mntners]))
| (
ChangeLog.auth_through_rpsl_mntner_pk.in_([mntner.rpsl_mntner_pk for mntner in mntners])
& ChangeLog.rpsl_target_source.in_([mntner.rpsl_mntner_source for mntner in mntners])
)
)
entry = await session_provider.run(query.one)
if not entry:
return Response(status_code=404)

return template_context_render("change_log_entry.html", request, {"entry": entry})
64 changes: 0 additions & 64 deletions irrd/webui/endpoints_changelog.py

This file was deleted.

3 changes: 2 additions & 1 deletion irrd/webui/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from irrd.webui.auth.routes import AUTH_ROUTES
from irrd.webui.endpoints import (
change_log_entry,
change_log_mntner,
index,
maintained_objects,
rpsl_detail,
rpsl_update,
user_permissions,
)
from irrd.webui.endpoints_changelog import change_log_entry, change_log_mntner
from irrd.webui.endpoints_mntners import (
api_token_add,
api_token_delete,
Expand Down
118 changes: 118 additions & 0 deletions irrd/webui/tests/test_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from datetime import datetime, timezone
from unittest.mock import create_autospec

Expand All @@ -7,6 +8,8 @@
from irrd.utils.rpsl_samples import SAMPLE_MNTNER
from irrd.webui import datetime_format

from ...updates.parser_state import UpdateRequestType
from ...utils.factories import AuthApiTokenFactory, ChangeLogFactory
from .conftest import WebRequestTest, create_permission


Expand Down Expand Up @@ -252,3 +255,118 @@ def test_valid_mntner_not_logged_in(self, test_client, irrd_db_session_with_user
assert response.status_code == 200
assert "TEST-MNT" in response.text
assert "DUMMYVALUE" in response.text.upper()


class TestChangeLogMntner(WebRequestTest):
url_template = "/ui/change-log/{uuid}/"

def pre_login(self, session_provider, user, user_management=True):
self.permission = create_permission(session_provider, user, user_management=user_management)
self.url = self.url_template.format(uuid=self.permission.mntner.pk)

def test_render(self, test_client, irrd_db_session_with_user):
session_provider, user = irrd_db_session_with_user
self.pre_login(session_provider, user)
self._login_if_needed(test_client, user)

ChangeLogFactory(
auth_through_mntner_id=str(self.permission.mntner.pk),
auth_change_descr="auth change descr",
)
api_token = AuthApiTokenFactory()
ChangeLogFactory(
auth_through_rpsl_mntner_pk=str(self.permission.mntner.rpsl_mntner_pk),
rpsl_target_pk="TARGET-PK",
rpsl_target_object_class="person",
rpsl_target_source=self.permission.mntner.rpsl_mntner_source,
auth_by_api_key_id_fixed=str(api_token.pk),
from_ip="127.0.0.1",
rpsl_target_request_type=UpdateRequestType.MODIFY,
)

response = test_client.get(self.url)
assert response.status_code == 200
assert self.permission.mntner.rpsl_mntner_pk in response.text
assert "auth change descr" in response.text
assert str(api_token.pk) in response.text
assert "127.0.0.1" in response.text
assert "modify of person TARGET-PK" in response.text

def test_no_entries(self, test_client, irrd_db_session_with_user):
session_provider, user = irrd_db_session_with_user
self.pre_login(session_provider, user)
self._login_if_needed(test_client, user)

response = test_client.get(self.url)
assert response.status_code == 200
assert self.permission.mntner.rpsl_mntner_pk in response.text

def test_no_permissions(self, test_client, irrd_db_session_with_user):
session_provider, user = irrd_db_session_with_user
self.pre_login(session_provider, user)
self._login_if_needed(test_client, user)
session_provider.session.delete(self.permission)
session_provider.session.commit()

response = test_client.get(self.url)
assert response.status_code == 404


class TestChangeLogEntry(WebRequestTest):
url_template = "/ui/change-log/entry/{uuid}/"

def pre_login(self, session_provider, user, user_management=True):
self.permission = create_permission(session_provider, user, user_management=user_management)
self.change_log = ChangeLogFactory(
auth_through_mntner_id=str(self.permission.mntner.pk),
auth_change_descr="auth change descr",
)
self.url = self.url_template.format(uuid=self.change_log.pk)

def test_render(self, test_client, irrd_db_session_with_user):
session_provider, user = irrd_db_session_with_user
self.pre_login(session_provider, user)
self._login_if_needed(test_client, user)

response = test_client.get(self.url)
assert response.status_code == 200
assert "auth change descr" in response.text

def test_render_rpsl_change(self, test_client, irrd_db_session_with_user):
session_provider, user = irrd_db_session_with_user
self.pre_login(session_provider, user)

api_token = AuthApiTokenFactory()
change_log = ChangeLogFactory(
auth_through_rpsl_mntner_pk=str(self.permission.mntner.rpsl_mntner_pk),
rpsl_target_pk="TARGET-PK",
rpsl_target_object_class="person",
rpsl_target_source=self.permission.mntner.rpsl_mntner_source,
auth_by_api_key_id_fixed=str(api_token.pk),
from_ip="127.0.0.1",
rpsl_target_request_type=UpdateRequestType.MODIFY,
)
self.url = self.url_template.format(uuid=change_log.pk)
self._login_if_needed(test_client, user)

response = test_client.get(self.url)
assert response.status_code == 200
assert str(api_token.pk) in response.text
assert "127.0.0.1" in response.text
assert "modify of person TARGET-PK" in response.text

def test_no_permissions(self, test_client, irrd_db_session_with_user):
session_provider, user = irrd_db_session_with_user
self.pre_login(session_provider, user)
self._login_if_needed(test_client, user)
session_provider.session.delete(self.permission)
session_provider.session.commit()

response = test_client.get(self.url)
assert response.status_code == 404

def test_object_not_exists(self, test_client, irrd_db_session_with_user):
session_provider, user = irrd_db_session_with_user
self._login_if_needed(test_client, user)
response = test_client.get(self.url_template.format(uuid=uuid.uuid4()))
assert response.status_code == 404

0 comments on commit 09e2d1f

Please sign in to comment.