From 50318b99a5a00b5f5949de75b0fe120d9591217c Mon Sep 17 00:00:00 2001 From: Kevin Carrogan Date: Fri, 24 Jan 2025 21:35:24 +0000 Subject: [PATCH 1/3] Add validation for invalid mime type file uploads --- core/file_handler.py | 47 ++++++++++- core/middleware.py | 6 +- exporter/applications/forms/parties.py | 18 ++++- exporter/goods/forms/common.py | 4 + exporter/goods/forms/firearms.py | 7 ++ exporter/goods/forms/goods.py | 4 + exporter/organisation/forms.py | 8 ++ unit_tests/core/test_file_handler.py | 103 +++++++++++++++---------- unit_tests/core/urls.py | 12 +++ unit_tests/helpers.py | 4 +- 10 files changed, 164 insertions(+), 49 deletions(-) create mode 100644 unit_tests/core/urls.py diff --git a/core/file_handler.py b/core/file_handler.py index 6f0f41d6f8..3a5680cdfc 100644 --- a/core/file_handler.py +++ b/core/file_handler.py @@ -3,7 +3,9 @@ import magic from django.conf import settings +from django.core.exceptions import ValidationError from django.core.files.uploadhandler import UploadFileException +from django.core.files.uploadedfile import UploadedFile from django.http import StreamingHttpResponse from django_chunk_upload_handlers.s3 import S3FileUploadHandler @@ -45,6 +47,39 @@ def s3_client(): return S3Wrapper.get_client() +def validate_mime_type(file): + if isinstance(file, UnacceptableMimeTypeFile): + raise ValidationError( + "The file type is not supported. Upload a supported file type", + code="invalid_mime_type", + ) + + +class UnacceptableMimeTypeFile(UploadedFile): + def __init__(self, field_name): + super().__init__(file="unacceptable-mime-type", name="unacceptable-mime-type", size="unacceptable-mime-type") + self.field_name = field_name + + def open(self, mode=None): + raise UnacceptableMimeTypeError() + + def chunks(self, chunk_size=None): + raise UnacceptableMimeTypeError() + + def multiple_chunks(self, chunk_size=None): + raise UnacceptableMimeTypeError() + + def __iter__(self): + raise UnacceptableMimeTypeError() + + def __enter__(self): + raise UnacceptableMimeTypeError() + + @property + def obj(self): + raise UnacceptableMimeTypeError() + + class SafeS3FileUploadHandler(S3FileUploadHandler): """ S3FileUploadHandler with mime-type validation. @@ -62,8 +97,9 @@ def receive_data_chunk(self, raw_data, start): mime = magic.from_buffer(raw_data, mime=True) if mime not in self.ACCEPTED_FILE_UPLOAD_MIME_TYPES: self.abort() - raise UnacceptableMimeTypeError(f"Unsupported file type: {mime}") - super().receive_data_chunk(raw_data, start) + self.failed_mime_type = True + return None + return super().receive_data_chunk(raw_data, start) def file_complete(self, *args, **kwargs): """Override `file_complete` to ensure that all necessary attributes @@ -72,6 +108,9 @@ def file_complete(self, *args, **kwargs): Some frameworks, e.g. formtools' SessionWizardView will fall over if these attributes aren't present. """ + if getattr(self, "failed_mime_type", False): + return UnacceptableMimeTypeFile(self.field_name) + file = super().file_complete(*args, **kwargs) file.charset = self.charset @@ -79,11 +118,11 @@ def file_complete(self, *args, **kwargs): class UploadFailed(UploadFileException): - pass + message = "File upload failed." class UnacceptableMimeTypeError(UploadFailed): - pass + message = "Invalid file type uploaded." def generate_file(result): diff --git a/core/middleware.py b/core/middleware.py index a59848d782..d150f112db 100644 --- a/core/middleware.py +++ b/core/middleware.py @@ -17,7 +17,6 @@ from django.http import HttpResponseForbidden from core.file_handler import UploadFailed -from lite_content.lite_internal_frontend.strings import cases from lite_forms.generators import error_page from json import JSONDecodeError @@ -37,7 +36,10 @@ def process_exception(self, request, exception): if not isinstance(exception, UploadFailed): return None - return error_page(request, cases.Manage.Documents.AttachDocuments.FILE_TOO_LARGE) + return error_page( + request, + exception.message, + ) class SessionTimeoutMiddleware: diff --git a/exporter/applications/forms/parties.py b/exporter/applications/forms/parties.py index 86c7c28748..09cb8512f7 100644 --- a/exporter/applications/forms/parties.py +++ b/exporter/applications/forms/parties.py @@ -8,6 +8,7 @@ from django.urls import reverse_lazy from core.common.forms import BaseForm, FieldsetForm +from core.file_handler import validate_mime_type from core.forms.layouts import ConditionalRadios, ConditionalRadiosQuestion from core.forms.widgets import Autocomplete from exporter.core.constants import CaseTypes, FileUploadFileTypes @@ -433,6 +434,9 @@ class PartyDocumentUploadForm(forms.Form): error_messages={ "required": "Select an end-user document", }, + validators=[ + validate_mime_type, + ], ) product_differences_note = forms.CharField( widget=forms.Textarea(attrs={"rows": "5"}), @@ -492,6 +496,9 @@ class PartyEnglishTranslationDocumentUploadForm(forms.Form): error_messages={ "required": "Select an English translation", }, + validators=[ + validate_mime_type, + ], ) def __init__(self, edit, *args, **kwargs): @@ -520,6 +527,9 @@ class PartyCompanyLetterheadDocumentUploadForm(forms.Form): error_messages={ "required": "Select a document on company letterhead", }, + validators=[ + validate_mime_type, + ], ) def __init__(self, edit, *args, **kwargs): @@ -543,7 +553,13 @@ def get_title(self): class PartyEC3DocumentUploadForm(forms.Form): title = "Upload an EC3 form (optional)" - party_ec3_document = forms.FileField(label="", required=False) + party_ec3_document = forms.FileField( + label="", + required=False, + validators=[ + validate_mime_type, + ], + ) ec3_missing_reason = forms.CharField( widget=forms.Textarea(attrs={"rows": "5"}), label="", diff --git a/exporter/goods/forms/common.py b/exporter/goods/forms/common.py index cf0314dbfc..24fe660181 100644 --- a/exporter/goods/forms/common.py +++ b/exporter/goods/forms/common.py @@ -14,6 +14,7 @@ ConditionalCheckbox, Prefixed, ) +from core.file_handler import validate_mime_type from core.forms.utils import coerce_str_to_bool from core.common.forms import BaseForm, FieldsetForm @@ -363,6 +364,9 @@ class Layout: error_messages={ "required": "Select a document that shows what your product is designed to do", }, + validators=[ + validate_mime_type, + ], ) description = forms.CharField( widget=forms.Textarea(attrs={"rows": "5"}), diff --git a/exporter/goods/forms/firearms.py b/exporter/goods/forms/firearms.py index 589e90d13a..3f98f2f9e5 100644 --- a/exporter/goods/forms/firearms.py +++ b/exporter/goods/forms/firearms.py @@ -9,6 +9,7 @@ FirearmsActSections, SerialChoices, ) +from core.file_handler import validate_mime_type from core.forms.layouts import ( ConditionalCheckbox, ConditionalRadiosQuestion, @@ -222,6 +223,9 @@ class Layout: error_messages={ "required": "Select a registered firearms dealer certificate", }, + validators=[ + validate_mime_type, + ], widget=PotentiallyUnsafeClearableFileInput, ) @@ -330,6 +334,9 @@ class Layout: file = forms.FileField( label="", required=False, + validators=[ + validate_mime_type, + ], widget=PotentiallyUnsafeClearableFileInput( force_required=True, ), diff --git a/exporter/goods/forms/goods.py b/exporter/goods/forms/goods.py index 43362db1de..36f8b9a177 100644 --- a/exporter/goods/forms/goods.py +++ b/exporter/goods/forms/goods.py @@ -10,6 +10,7 @@ from core.builtins.custom_tags import default_na, linkify from core.constants import ComponentAccessoryChoices, ProductCategories +from core.file_handler import validate_mime_type from core.forms.layouts import ConditionalRadiosQuestion, ConditionalRadios, summary_list from core.forms.utils import coerce_str_to_bool @@ -1106,6 +1107,9 @@ class AttachFirearmsDealerCertificateForm(forms.Form): error_messages={ "required": "Select certificate file to upload", }, + validators=[ + validate_mime_type, + ], ) reference_code = forms.CharField( diff --git a/exporter/organisation/forms.py b/exporter/organisation/forms.py index 14b9d2cb45..8976f1dadb 100644 --- a/exporter/organisation/forms.py +++ b/exporter/organisation/forms.py @@ -8,6 +8,8 @@ from crispy_forms_gds.helper import FormHelper from crispy_forms_gds.layout import Submit, Layout, HTML +from core.file_handler import validate_mime_type + from exporter.core.constants import FileUploadFileTypes @@ -28,6 +30,9 @@ class Layout: label="", help_text="The file must be smaller than 50MB", error_messages={"required": "Select certificate file to upload"}, + validators=[ + validate_mime_type, + ], ) reference_code = forms.CharField( label="Certificate number", @@ -63,6 +68,9 @@ class Layout: label=FileUploadFileTypes.UPLOAD_GUIDANCE_TEXT, help_text="The file must be smaller than 50MB", error_messages={"required": "Select certificate file to upload"}, + validators=[ + validate_mime_type, + ], ) reference_code = forms.CharField( label="Certificate number", diff --git a/unit_tests/core/test_file_handler.py b/unit_tests/core/test_file_handler.py index 063aed034e..4efdd7fbf3 100644 --- a/unit_tests/core/test_file_handler.py +++ b/unit_tests/core/test_file_handler.py @@ -1,64 +1,85 @@ +import os import pytest -from unittest import mock -from os import path - -from django.core.files.uploadhandler import UploadFileException +from django import forms +from django.http import HttpResponse +from django.urls import path, reverse +from django.views import View +from django.views.generic import FormView from core.file_handler import ( s3_client, - SafeS3FileUploadHandler, S3Wrapper, - UnacceptableMimeTypeError, + validate_mime_type, ) -TEST_FILES_PATH = path.join(path.dirname(__file__), "test_file_handler_files") +TEST_FILES_PATH = os.path.join(os.path.dirname(__file__), "test_file_handler_files") + + +class FileForm(forms.Form): + upload_file = forms.FileField( + validators=[ + validate_mime_type, + ] + ) + + +class OKView(View): + def get(self, request, *args, **kwargs): + return HttpResponse("OK") + + +class FileUploadFormView(FormView): + form_class = FileForm + def form_invalid(self, form): + response = HttpResponse("Not OK") + response.form = form + return response -@pytest.fixture -def mock_handler(): - with mock.patch("django_chunk_upload_handlers.s3.boto3_client"): - handler = SafeS3FileUploadHandler() - handler.new_file(field_name="test", file_name="test", content_type="test/test", content_length=0) - handler.file = mock.Mock() - handler.client = mock.Mock() - handler.executor = mock.Mock() - handler.bucket_name = "test" - handler.s3_key = "test" - handler.upload_id = "test" - yield handler + def get_success_url(self): + return reverse("ok") -def test_valid_file_upload(mock_handler): +urlpatterns = [ + path("file-upload/", FileUploadFormView.as_view(), name="file-upload"), + path("ok/", OKView.as_view(), name="ok"), +] + + +@pytest.fixture() +def file_upload_handler_settings(settings): + settings.FILE_UPLOAD_HANDLERS = ["core.file_handler.SafeS3FileUploadHandler"] + settings.ACCEPTED_FILE_UPLOAD_MIME_TYPES = "text/plain" + settings.MIDDLEWARE = [] + + +@pytest.mark.urls(__name__) +def test_valid_file(file_upload_handler_settings, client): with open(f"{TEST_FILES_PATH}/valid.txt", "rb") as f: - content = f.read() - mock_handler.abort = mock.Mock() - mock_handler.receive_data_chunk(content, 0) - mock_handler.abort.assert_not_called() - # set start to be anything but 0 - mock_handler.receive_data_chunk(content, 1) - mock_handler.abort.assert_not_called() + response = client.post(reverse("file-upload"), {"upload_file": f}) + assert response.status_code == 302 + assert response.url == reverse("ok") -def test_invalid_file_type_upload(mock_handler): + +@pytest.mark.urls(__name__) +def test_invalid_file_type(file_upload_handler_settings, client): with open(f"{TEST_FILES_PATH}/invalid_type.zip", "rb") as f: - content = f.read() - mock_handler.abort = mock.Mock() - with pytest.raises(UnacceptableMimeTypeError, match="Unsupported file type: application/zip") as e: - mock_handler.receive_data_chunk(content, 0) - assert isinstance(e.value, UploadFileException) - mock_handler.abort.assert_called_once() + response = client.post(reverse("file-upload"), {"upload_file": f}) + + assert not response.form.is_valid() + assert response.form.errors == {"upload_file": ["The file type is not supported. Upload a supported file type"]} -def test_invalid_file_mime_type_upload(mock_handler): +@pytest.mark.urls(__name__) +def test_invalid_mime_type(file_upload_handler_settings, client): with open(f"{TEST_FILES_PATH}/invalid_mime.txt", "rb") as f: - content = f.read() - mock_handler.abort = mock.Mock() - with pytest.raises(UnacceptableMimeTypeError, match="Unsupported file type: application/zip") as e: - mock_handler.receive_data_chunk(content, 0) - assert isinstance(e.value, UploadFileException) - mock_handler.abort.assert_called_once() + response = client.post(reverse("file-upload"), {"upload_file": f}) + + assert not response.form.is_valid() + assert response.form.errors == {"upload_file": ["The file type is not supported. Upload a supported file type"]} def test_s3_wrapper_get_client(settings, mocker): diff --git a/unit_tests/core/urls.py b/unit_tests/core/urls.py new file mode 100644 index 0000000000..686769fa3e --- /dev/null +++ b/unit_tests/core/urls.py @@ -0,0 +1,12 @@ +from django.urls import path + +from .test_file_handler import ValidFileFormView + + +urlpatterns = [ + path( + "valid-file/", + ValidFileFormView.as_view(), + name="valid-file", + ), +] diff --git a/unit_tests/helpers.py b/unit_tests/helpers.py index ee9cd130b1..108053b204 100644 --- a/unit_tests/helpers.py +++ b/unit_tests/helpers.py @@ -8,7 +8,9 @@ from django.urls import clear_url_caches -def reload_urlconf(urlconfs=[settings.ROOT_URLCONF]): +def reload_urlconf(urlconfs=None): + if not urlconfs: + urlconfs = [settings.ROOT_URLCONF] clear_url_caches() for urlconf in urlconfs: if urlconf in sys.modules: From 649e6a6fdc05395baea111435f61dfde4f07e17e Mon Sep 17 00:00:00 2001 From: Gurdeep Atwal Date: Tue, 28 Jan 2025 21:16:43 +0000 Subject: [PATCH 2/3] add tests --- .../core/middleware/test_upload_failed.py | 23 +++++++++++++++++++ unit_tests/core/test_file_handler.py | 11 +++++++++ 2 files changed, 34 insertions(+) create mode 100644 unit_tests/core/middleware/test_upload_failed.py diff --git a/unit_tests/core/middleware/test_upload_failed.py b/unit_tests/core/middleware/test_upload_failed.py new file mode 100644 index 0000000000..fc4ca254c8 --- /dev/null +++ b/unit_tests/core/middleware/test_upload_failed.py @@ -0,0 +1,23 @@ +from core.middleware import UploadFailedMiddleware +from core.file_handler import UploadFailed + + +def test_upload_failed_middleware_general_exception(rf, mocker): + get_response = mocker.MagicMock() + upload_failed_middleware = UploadFailedMiddleware(get_response) + request = rf.get("/") + not_file_upload_exception = Exception() + + assert upload_failed_middleware.process_exception(request, not_file_upload_exception) is None + + +def test_upload_failed_middleware_upload_failed_exception(rf, mocker, beautiful_soup): + get_response = mocker.MagicMock() + upload_failed_middleware = UploadFailedMiddleware(get_response) + request = rf.get("/") + not_file_upload_exception = UploadFailed() + + error_page_reponse = upload_failed_middleware.process_exception(request, not_file_upload_exception) + soup = beautiful_soup(error_page_reponse.content) + + assert "An error occurred" in soup.title.text.strip() diff --git a/unit_tests/core/test_file_handler.py b/unit_tests/core/test_file_handler.py index 4efdd7fbf3..8885b4cfc2 100644 --- a/unit_tests/core/test_file_handler.py +++ b/unit_tests/core/test_file_handler.py @@ -8,6 +8,8 @@ from django.views.generic import FormView from core.file_handler import ( + UnacceptableMimeTypeError, + UnacceptableMimeTypeFile, s3_client, S3Wrapper, validate_mime_type, @@ -82,6 +84,15 @@ def test_invalid_mime_type(file_upload_handler_settings, client): assert response.form.errors == {"upload_file": ["The file type is not supported. Upload a supported file type"]} +@pytest.mark.parametrize("func_name", ["open", "chunks", "multiple_chunks", "__iter__", "__enter__", "obj"]) +def test_unacceptable_mime_type_file_function_raises_errors(func_name): + unacceptable_mime_file = UnacceptableMimeTypeFile("bad_file") + + with pytest.raises(UnacceptableMimeTypeError): + func = getattr(unacceptable_mime_file, func_name) + func() + + def test_s3_wrapper_get_client(settings, mocker): settings.AWS_ACCESS_KEY_ID = "aws-access-key-id" settings.AWS_SECRET_ACCESS_KEY = "aws-secret-access-key" # noqa: S105 From 5e5ad50c29384ef8fac7c84fc9e64657acfac76f Mon Sep 17 00:00:00 2001 From: Marcus Patino Pan Date: Thu, 30 Jan 2025 11:28:53 +0000 Subject: [PATCH 3/3] Reinstate case details panel in advice approval views --- .../advice/templates/advice/form_wizard.html | 7 ++++ caseworker/advice/views/approval.py | 1 + caseworker/templates/core/form-wizard.html | 1 + .../views/test_give_approval_advice_view.py | 41 ++++++++++++++++++- 4 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 caseworker/advice/templates/advice/form_wizard.html diff --git a/caseworker/advice/templates/advice/form_wizard.html b/caseworker/advice/templates/advice/form_wizard.html new file mode 100644 index 0000000000..152c054e75 --- /dev/null +++ b/caseworker/advice/templates/advice/form_wizard.html @@ -0,0 +1,7 @@ +{% extends 'core/form-wizard.html' %} + +{% block right_side_panel %} +
+ {% include "advice/case_detail.html" %} +
+{% endblock %} diff --git a/caseworker/advice/views/approval.py b/caseworker/advice/views/approval.py index 7c10c60934..e9531a282a 100644 --- a/caseworker/advice/views/approval.py +++ b/caseworker/advice/views/approval.py @@ -43,6 +43,7 @@ def get_context_data(self, **kwargs): class BaseApprovalAdviceView(LoginRequiredMixin, CaseContextMixin, BaseSessionWizardView): + template_name = "advice/form_wizard.html" condition_dict = { AdviceSteps.RECOMMEND_APPROVAL: ~C(is_fcdo_team), diff --git a/caseworker/templates/core/form-wizard.html b/caseworker/templates/core/form-wizard.html index 931d4284af..482d13c464 100644 --- a/caseworker/templates/core/form-wizard.html +++ b/caseworker/templates/core/form-wizard.html @@ -37,6 +37,7 @@ {% crispy wizard.form %} {% endif %} + {% block right_side_panel %}{% endblock %} diff --git a/unit_tests/caseworker/advice/views/test_give_approval_advice_view.py b/unit_tests/caseworker/advice/views/test_give_approval_advice_view.py index adc8fcb87b..176f2d8e57 100644 --- a/unit_tests/caseworker/advice/views/test_give_approval_advice_view.py +++ b/unit_tests/caseworker/advice/views/test_give_approval_advice_view.py @@ -31,9 +31,42 @@ def post_to_step(post_to_step_factory, url_approve): return post_to_step_factory(url_approve) -def test_give_approval_advice_get(authorized_client, url): +def test_give_approval_advice_legacy_get(authorized_client, url, beautiful_soup): response = authorized_client.get(url) assert response.status_code == 200 + soup = beautiful_soup(response.content) + header = soup.find("h1", {"class": "govuk-heading-xl"}) + assert header.text == "Recommend an approval" + + summary_header = soup.find("h2", {"class": "govuk-heading-m"}) + assert summary_header.text == "Case details" + + details = soup.find_all("span", {"class": "govuk-details__summary-text"}) + assert {detail.text.strip() for detail in details} == { + "Add a licence condition, instruction to exporter or footnote", + "Products", + "Destinations", + "View serial numbers", + } + + +def test_give_approval_advice_get( + authorized_client, + beautiful_soup, + url_approve, +): + response = authorized_client.get(url_approve) + assert response.status_code == 200 + + soup = beautiful_soup(response.content) + header = soup.find("h1", {"class": "govuk-heading-xl"}) + assert header.text == "Recommend an approval" + + summary_header = soup.find("h2", {"class": "govuk-heading-m"}) + assert summary_header.text == "Case details" + + details = soup.find_all("span", {"class": "govuk-details__summary-text"}) + assert {detail.text.strip() for detail in details} == {"Products", "Destinations", "View serial numbers"} def test_approval_advice_post_valid( @@ -75,6 +108,12 @@ def test_approval_advice_post_valid_add_conditional( header = soup.find("h1") assert header.text == "Add licence conditions (optional)" + summary_header = soup.find("h2", {"class": "govuk-heading-m"}) + assert summary_header.text == "Case details" + + details = soup.find_all("span", {"class": "govuk-details__summary-text"}) + assert {detail.text.strip() for detail in details} == {"Products", "Destinations", "View serial numbers"} + add_LC_response = post_to_step( AdviceSteps.LICENCE_CONDITIONS, {"proviso": "proviso"},