diff --git a/caluma/caluma_form/serializers.py b/caluma/caluma_form/serializers.py index 327a65e1f..d0d9dcd2a 100644 --- a/caluma/caluma_form/serializers.py +++ b/caluma/caluma_form/serializers.py @@ -470,7 +470,7 @@ class SaveAnswerSerializer(serializers.ModelSerializer): def validate(self, data): if "value" not in data: data["value"] = None - validators.AnswerValidator().validate(**data, info=self.context["info"]) + validators.AnswerValidator().validate(**data, user=self.context["request"].user) return super().validate(data) class Meta: diff --git a/caluma/caluma_form/tests/test_validators.py b/caluma/caluma_form/tests/test_validators.py index c6be7de8b..89380a183 100644 --- a/caluma/caluma_form/tests/test_validators.py +++ b/caluma/caluma_form/tests/test_validators.py @@ -19,7 +19,13 @@ ) @pytest.mark.parametrize("question__type", [Question.TYPE_TEXT]) def test_validate_hidden_required_field( - db, required_jexl, hidden_jexl, should_throw, form_question, document_factory, info + db, + required_jexl, + hidden_jexl, + should_throw, + form_question, + document_factory, + admin_user, ): form_question.question.is_required = required_jexl form_question.question.is_hidden = hidden_jexl @@ -29,9 +35,9 @@ def test_validate_hidden_required_field( error_msg = f"Questions {form_question.question.slug} are required but not provided" if should_throw: with pytest.raises(ValidationError, match=error_msg): - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) else: - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) @pytest.mark.parametrize( @@ -39,11 +45,11 @@ def test_validate_hidden_required_field( [(Question.TYPE_FILE, "false"), (Question.TYPE_DATE, "false")], ) def test_validate_special_fields( - db, form_question, question, document_factory, answer_factory, info + db, form_question, question, document_factory, answer_factory, admin_user ): document = document_factory(form=form_question.form) answer_factory(document=document, question=question) - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) @pytest.mark.parametrize( @@ -70,7 +76,7 @@ def test_validate_dynamic_options( valid, document_factory, answer_factory, - info, + admin_user, settings, ): settings.DATA_SOURCE_CLASSES = [ @@ -85,13 +91,13 @@ def test_validate_dynamic_options( answer_factory(value=value, document=document, question=question) if valid: - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) assert DynamicOption.objects.get( document=document, slug=lookup_value, question=question ) else: with pytest.raises(ValidationError): - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) @pytest.mark.parametrize( @@ -106,7 +112,7 @@ def test_validate_dynamic_option_exists( answer_factory, document_factory, dynamic_option_factory, - info, + admin_user, settings, ): settings.DATA_SOURCE_CLASSES = [ @@ -123,7 +129,7 @@ def test_validate_dynamic_option_exists( value = [value] answer_factory(question=question, value=value, document=document) - assert DocumentValidator().validate(dynamic_option.document, info) is None + assert DocumentValidator().validate(dynamic_option.document, admin_user) is None @pytest.mark.parametrize( @@ -152,7 +158,7 @@ def test_validate_table( form_question_factory, document_factory, answer_factory, - info, + admin_user, ): # F: main-form @@ -222,16 +228,16 @@ def test_validate_table( if should_throw and required_jexl_sub.startswith("'fail'"): with pytest.raises(QuestionMissing): - DocumentValidator().validate(main_document, info) + DocumentValidator().validate(main_document, admin_user) elif should_throw: q_slug = sub_question_b.question.slug if required_jexl_sub == "false": q_slug = other_q_2.question.slug error_msg = f"Questions {q_slug} are required but not provided" with pytest.raises(ValidationError, match=error_msg): - DocumentValidator().validate(main_document, info) + DocumentValidator().validate(main_document, admin_user) else: - DocumentValidator().validate(main_document, info) + DocumentValidator().validate(main_document, admin_user) @pytest.mark.parametrize( @@ -248,7 +254,7 @@ def test_validate_table( ], ) def test_validate_data_source( - db, question, valid, info, serializer_to_use, data_source_settings + db, question, valid, serializer_to_use, data_source_settings ): serializer = getattr(serializers, serializer_to_use) @@ -286,7 +292,6 @@ def test_validate_empty_answers( document_factory, answer_factory, expected_value, - info, ): struct = structure.FieldSet(document, document.form) @@ -325,19 +330,19 @@ def test_validate_empty_answers( ], ) def test_validate_invalid_jexl( - db, form_question, document, answer, question, exception_message, info + db, form_question, document, answer, question, exception_message, admin_user ): if exception_message is not None: with pytest.raises(RuntimeError) as exc: - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) assert exc.value.args[0] == exception_message else: - assert DocumentValidator().validate(document, info) is None + assert DocumentValidator().validate(document, admin_user) is None def test_validate_required_integer_0( - db, form_question, answer_factory, document_factory, info + db, form_question, answer_factory, document_factory, admin_user ): form_question.question.is_required = "true" form_question.question.type = Question.TYPE_INTEGER @@ -346,7 +351,7 @@ def test_validate_required_integer_0( document = document_factory(form=form_question.form) answer_factory(document=document, value=0, question=form_question.question) - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) @pytest.mark.parametrize( @@ -373,10 +378,10 @@ def test_validate_required_integer_0( "answer__date,answer__file,question__is_required", [(None, None, "true")] ) def test_validate_required_empty_answers( - db, info, form_question, document, answer, question + db, admin_user, form_question, document, answer, question ): with pytest.raises(ValidationError): - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) @pytest.mark.parametrize("question__is_hidden", ["true", "false"]) @@ -385,7 +390,7 @@ def test_validate_required_empty_answers( [[Question.TYPE_FLOAT, {"min_value": 0, "max_value": 3}]], ) def test_validate_hidden_field( - db, form_question, document_factory, answer_factory, info + db, form_question, document_factory, answer_factory, admin_user ): question = form_question.question document = document_factory(form=form_question.form) @@ -394,10 +399,10 @@ def test_validate_hidden_field( answer_factory(question=question, document=document, value=4) if question.is_hidden == "true": - assert DocumentValidator().validate(document, info) is None + assert DocumentValidator().validate(document, admin_user) is None else: with pytest.raises(ValidationError): - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) @pytest.mark.parametrize("hide_formquestion", [True, False]) @@ -408,7 +413,7 @@ def test_validate_hidden_subform( document_factory, form_question_factory, answer_factory, - info, + admin_user, hide_formquestion, ): # First, build our nested form: @@ -461,10 +466,10 @@ def test_validate_hidden_subform( answer_factory(question=sub_sub_question, document=document, value=4) if hide_formquestion: - assert DocumentValidator().validate(document, info) is None + assert DocumentValidator().validate(document, admin_user) is None else: with pytest.raises(ValidationError) as excinfo: - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) # Verify that the sub_sub_question is not the cause of the exception: # it should not be checked at all because it's parent is always hidden assert excinfo.match(r"Questions \bsub_question\s.* required but not provided.") @@ -483,7 +488,7 @@ def test_dependent_question_is_hidden( form_question_factory, answer_factory, answer_value, - info, + admin_user, ): form = form_factory() q1 = question_factory(is_hidden="false", type=Question.TYPE_TEXT) @@ -508,18 +513,18 @@ def test_dependent_question_is_hidden( if answer_value == "foo": # Answer to q1's value is "foo", so q2 is hidden. This means # that q3 should be hidden and not required as well - assert DocumentValidator().validate(document, info) is None + assert DocumentValidator().validate(document, admin_user) is None else: # a1's value is "bar", so q2 is visible. This means # that q3 should also be visible and required with pytest.raises(ValidationError): - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) @pytest.mark.parametrize("question__type", ["file"]) @pytest.mark.parametrize("question__is_required", ["true"]) @pytest.mark.parametrize("question__is_hidden", ["false"]) -def test_required_file(db, question, form, document, answer, info, form_question): +def test_required_file(db, question, form, document, answer, admin_user, form_question): # verify some assumptions assert document.form == form assert answer.document == document @@ -533,14 +538,14 @@ def test_required_file(db, question, form, document, answer, info, form_question # ensure validation fails with no `file` value with pytest.raises(ValidationError): - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) # put the file back answer.file = the_file answer.save() # then, validation must pass - assert DocumentValidator().validate(document, info) is None + assert DocumentValidator().validate(document, admin_user) is None def test_validate_missing_in_subform( @@ -550,7 +555,7 @@ def test_validate_missing_in_subform( document_factory, form_question_factory, answer_factory, - info, + admin_user, ): # First, build our nested form: # top_form @@ -585,7 +590,7 @@ def test_validate_missing_in_subform( document = document_factory(form=top_form) with pytest.raises(ValidationError) as excinfo: - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) # Verify that the sub_sub_question is not the cause of the exception: # it should not be checked at all because it's parent is always hidden @@ -602,7 +607,7 @@ def test_validate_missing_in_table( answer_document_factory, answer_factory, table_required, - info, + admin_user, ): # First, build our nested form: # top_form @@ -653,7 +658,7 @@ def test_validate_missing_in_table( row_doc.answers.create(question=sub_question1, value="hi") with pytest.raises(ValidationError) as excinfo: - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) # Verify that the sub_sub_question is not the cause of the exception: # it should not be checked at all because it's parent is always hidden assert excinfo.match( @@ -662,7 +667,7 @@ def test_validate_missing_in_table( else: # should not raise - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) @pytest.mark.parametrize( @@ -679,7 +684,7 @@ def test_validate_form_in_table( answer_factory, column_is_hidden, expect_error, - info, + admin_user, ): # First, build our nested form: # top_form @@ -731,7 +736,7 @@ def test_validate_form_in_table( # Should raise an error with pytest.raises(ValidationError) as excinfo: - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) # Verify that the sub_sub_question is not the cause of the exception: # it should not be checked at all because it's parent is always hidden assert excinfo.match( @@ -741,4 +746,4 @@ def test_validate_form_in_table( else: # Should not raise, as the "form" referenced by the # question's jexl is the rowform, which is wrong - DocumentValidator().validate(document, info) + DocumentValidator().validate(document, admin_user) diff --git a/caluma/caluma_form/validators.py b/caluma/caluma_form/validators.py index b659d1ecc..d354b12f7 100644 --- a/caluma/caluma_form/validators.py +++ b/caluma/caluma_form/validators.py @@ -103,20 +103,20 @@ def _validate_question_multiple_choice(self, question, value, **kwargs): ) def _validate_question_dynamic_choice( - self, question, value, document, info, **kwargs + self, question, value, document, user, **kwargs ): if not isinstance(value, str): raise CustomValidationError( f'Invalid value "{value}". Must be of type str.', slugs=[question.slug] ) - self._validate_dynamic_option(question, document, value, info) + self._validate_dynamic_option(question, document, value, user) - def _validate_dynamic_option(self, question, document, option, info): + def _validate_dynamic_option(self, question, document, option, user): data_source = get_data_sources(dic=True)[question.data_source] data_source_object = data_source() valid_label = data_source_object.validate_answer_value( - option, document, question, info + option, document, question, user ) if valid_label is False: raise CustomValidationError( @@ -129,13 +129,13 @@ def _validate_dynamic_option(self, question, document, option, info): slug=option, defaults={ "label": valid_label, - "created_by_user": info.context.user.username, - "created_by_group": info.context.user.group, + "created_by_user": user.username, + "created_by_group": user.group, }, ) def _validate_question_dynamic_multiple_choice( - self, question, value, document, info, **kwargs + self, question, value, document, user, **kwargs ): if not isinstance(value, list): raise CustomValidationError( @@ -148,17 +148,17 @@ def _validate_question_dynamic_multiple_choice( f'Invalid value: "{v}". Must be of type string', slugs=[question.slug], ) - self._validate_dynamic_option(question, document, v, info) + self._validate_dynamic_option(question, document, v, user) - def _validate_question_table(self, question, value, document, info, **kwargs): + def _validate_question_table(self, question, value, document, user, **kwargs): for row_doc in value: - DocumentValidator().validate(row_doc, info=info, **kwargs) + DocumentValidator().validate(row_doc, user=user, **kwargs) def _validate_question_file(self, question, value, **kwargs): pass - def validate(self, *, question, document, info, validation_context=None, **kwargs): + def validate(self, *, question, document, user, validation_context=None, **kwargs): # Check all possible fields for value value = None for i in ["value", "file", "date", "documents"]: @@ -174,7 +174,7 @@ def validate(self, *, question, document, info, validation_context=None, **kwarg question, value, document=document, - info=info, + user=user, validation_context=validation_context, ) @@ -184,7 +184,7 @@ def validate(self, *, question, document, info, validation_context=None, **kwarg class DocumentValidator: - def validate(self, document, info, validation_context=None, **kwargs): + def validate(self, document, user, validation_context=None, **kwargs): if not validation_context: validation_context = self._validation_context(document) @@ -199,7 +199,7 @@ def validate(self, document, info, validation_context=None, **kwargs): question=answer.question, value=answer.value, documents=answer.documents.all(), - info=info, + user=user, validation_context=validation_context, ) @@ -373,13 +373,13 @@ def validate(self, data): self._validate_data_source(data["dataSource"]) -def get_document_validity(document, info): +def get_document_validity(document, user): validator = DocumentValidator() is_valid = True errors = [] try: - validator.validate(document, info) + validator.validate(document, user) except CustomValidationError as exc: is_valid = False detail = str(exc.detail[0]) diff --git a/caluma/caluma_workflow/api.py b/caluma/caluma_workflow/api.py index 1d6e786ef..d85911d2d 100644 --- a/caluma/caluma_workflow/api.py +++ b/caluma/caluma_workflow/api.py @@ -19,14 +19,12 @@ def start_case( Similar to Case.objects.create(), but with the same business logic as used in the `saveCase` mutation. - Usage example: - ```py - start_case( - workflow=Workflow.objects.get(pk="my-wf"), - form=Form.objects.get(pk="my-form")), - user=AnonymousUser() - ) - ``` + >>> start_case( + ... workflow=Workflow.objects.get(pk="my-wf"), + ... form=Form.objects.get(pk="my-form")), + ... user=AnonymousUser() + ... ) + """ data = {"workflow": workflow, "form": form, "parent_work_item": parent_work_item} @@ -39,3 +37,45 @@ def start_case( case = models.Case.objects.create(**validated_data) return domain_logic.StartCaseLogic.post_start(case, user, parent_work_item) + + +def complete_work_item(work_item: models.WorkItem, user: BaseUser) -> models.WorkItem: + """ + Complete a work item (just like `completeWorkItem`). + + >>> complete_work_item( + ... work_item=models.WorkItem.first(), + ... user=AnonymousUser() + ... ) + + ``` + """ + domain_logic.CompleteWorkItemLogic.validate_for_complete(work_item, user) + validated_data = domain_logic.CompleteWorkItemLogic.pre_complete({}, user) + + models.WorkItem.objects.filter(pk=work_item.pk).update(**validated_data) + + domain_logic.CompleteWorkItemLogic.post_complete(work_item, user) + + return work_item + + +def skip_work_item(work_item: models.WorkItem, user: BaseUser) -> models.WorkItem: + """ + Skip a work item (just like `SkipWorkItem`). + + >>> skip_work_item( + ... work_item=models.WorkItem.first(), + ... user=AnonymousUser() + ... ) + + """ + domain_logic.SkipWorkItemLogic.validate_for_skip(work_item) + + validated_data = domain_logic.SkipWorkItemLogic.pre_skip({}, user) + + models.WorkItem.objects.filter(pk=work_item.pk).update(**validated_data) + + domain_logic.SkipWorkItemLogic.post_skip(work_item, user) + + return work_item diff --git a/caluma/caluma_workflow/domain_logic.py b/caluma/caluma_workflow/domain_logic.py index 8b989be9f..3646f3861 100644 --- a/caluma/caluma_workflow/domain_logic.py +++ b/caluma/caluma_workflow/domain_logic.py @@ -1,9 +1,11 @@ from django.core.exceptions import ValidationError +from django.utils import timezone from caluma.caluma_core.events import send_event from ..caluma_form.models import Document -from . import events, models, utils +from . import events, models, utils, validators +from .jexl import FlowJexl class StartCaseLogic: @@ -72,3 +74,117 @@ def post_start(case, user, parent_work_item): ) return case + + +class CompleteWorkItemLogic: + @staticmethod + def _can_continue(work_item, task): + # If a "multiple instance" task has running siblings, the task is not completed + if task.is_multiple_instance: + return not work_item.case.work_items.filter( + task=task, status=models.WorkItem.STATUS_READY + ).exists() + return work_item.case.work_items.filter( + task=task, + status__in=( + models.WorkItem.STATUS_COMPLETED, + models.WorkItem.STATUS_SKIPPED, + ), + ).exists() + + @staticmethod + def validate_for_complete(work_item, user): + validators.WorkItemValidator().validate( + status=work_item.status, + child_case=work_item.child_case, + task=work_item.task, + case=work_item.case, + document=work_item.document, + user=user, + ) + + @staticmethod + def pre_complete(validated_data, user): + validated_data["status"] = models.WorkItem.STATUS_COMPLETED + validated_data["closed_at"] = timezone.now() + validated_data["closed_by_user"] = user.username + validated_data["closed_by_group"] = user.group + + return validated_data + + @staticmethod + def post_complete(work_item, user): + case = work_item.case + + if not CompleteWorkItemLogic._can_continue(work_item, work_item.task): + return work_item + + flow = models.Flow.objects.filter(task_flows__task=work_item.task_id).first() + flow_referenced_tasks = models.Task.objects.filter(task_flows__flow=flow) + + all_complete = all( + CompleteWorkItemLogic._can_continue(work_item, task) + for task in flow_referenced_tasks + ) + + if flow and all_complete: + jexl = FlowJexl() + result = jexl.evaluate(flow.next) + if not isinstance(result, list): + result = [result] + + tasks = models.Task.objects.filter(pk__in=result) + + created_work_items = utils.bulk_create_work_items( + tasks, case, user, work_item + ) + + for created_work_item in created_work_items: # pragma: no cover + send_event( + events.created_work_item, + sender="post_complete_work_item", + work_item=created_work_item, + ) + else: + # no more tasks, mark case as complete + case.status = models.Case.STATUS_COMPLETED + case.closed_at = timezone.now() + case.closed_by_user = user.username + case.closed_by_group = user.group + case.save() + send_event( + events.completed_case, sender="post_complete_work_item", case=case + ) + + send_event( + events.completed_work_item, + sender="post_complete_work_item", + work_item=work_item, + ) + + return work_item + + +class SkipWorkItemLogic: + @staticmethod + def validate_for_skip(work_item): + if work_item.status != models.WorkItem.STATUS_READY: + raise ValidationError("Only READY work items can be skipped") + + @staticmethod + def pre_skip(validated_data, user): + validated_data = CompleteWorkItemLogic.pre_complete(validated_data, user) + + validated_data["status"] = models.WorkItem.STATUS_SKIPPED + + return validated_data + + @staticmethod + def post_skip(work_item, user): + work_item = CompleteWorkItemLogic.post_complete(work_item, user) + + send_event( + events.skipped_work_item, sender="post_skip_work_item", work_item=work_item + ) + + return work_item diff --git a/caluma/caluma_workflow/serializers.py b/caluma/caluma_workflow/serializers.py index 4494414a4..b5b21934c 100644 --- a/caluma/caluma_workflow/serializers.py +++ b/caluma/caluma_workflow/serializers.py @@ -9,7 +9,7 @@ from ..caluma_core import serializers from ..caluma_form.models import Document, Form -from . import domain_logic, events, models, utils, validators +from . import domain_logic, events, models, utils from .jexl import FlowJexl, GroupJexl @@ -256,105 +256,61 @@ def update(self, instance, validated_data): return instance -class CompleteWorkItemSerializer(SendEventSerializerMixin, serializers.ModelSerializer): +class CompleteWorkItemSerializer(serializers.ModelSerializer): id = serializers.GlobalIDField() def validate(self, data): - user = self.context["request"].user - validators.WorkItemValidator().validate( - status=self.instance.status, - child_case=self.instance.child_case, - task=self.instance.task, - case=self.instance.case, - document=self.instance.document, - info=self.context["info"], - ) - data["status"] = models.WorkItem.STATUS_COMPLETED - data["closed_at"] = timezone.now() - data["closed_by_user"] = user.username - data["closed_by_group"] = user.group - return super().validate(data) + try: + domain_logic.CompleteWorkItemLogic.validate_for_complete( + self.instance, self.context["request"].user + ) + except ValidationError as e: + raise exceptions.ValidationError(str(e)) - def _can_continue(self, instance, task): - # If a "multiple instance" task has running siblings, the task is not completed - if task.is_multiple_instance: - return not instance.case.work_items.filter( - task=task, status=models.WorkItem.STATUS_READY - ).exists() - return instance.case.work_items.filter( - task=task, - status__in=( - models.WorkItem.STATUS_COMPLETED, - models.WorkItem.STATUS_SKIPPED, - ), - ).exists() + return super().validate(data) @transaction.atomic - def update(self, instance, validated_data): - - instance = super().update(instance, validated_data) + def update(self, work_item, validated_data): user = self.context["request"].user - case = instance.case - - if not self._can_continue(instance, instance.task): - return instance - flow = models.Flow.objects.filter(task_flows__task=instance.task_id).first() - flow_referenced_tasks = models.Task.objects.filter(task_flows__flow=flow) - - all_complete = all( - self._can_continue(instance, task) for task in flow_referenced_tasks + validated_data = domain_logic.CompleteWorkItemLogic.pre_complete( + validated_data, user ) - if flow and all_complete: - jexl = FlowJexl() - result = jexl.evaluate(flow.next) - if not isinstance(result, list): - result = [result] - - tasks = models.Task.objects.filter(pk__in=result) - - work_items = utils.bulk_create_work_items(tasks, case, user, instance) + work_item = super().update(work_item, validated_data) + work_item = domain_logic.CompleteWorkItemLogic.post_complete(work_item, user) - for work_item in work_items: # pragma: no cover - self.send_event(events.created_work_item, work_item=work_item) - else: - # no more tasks, mark case as complete - case.status = models.Case.STATUS_COMPLETED - case.closed_at = timezone.now() - case.closed_by_user = user.username - case.closed_by_group = user.group - case.save() - self.send_event(events.completed_case, case=case) - - self.send_event(events.completed_work_item, work_item=instance) - - return instance + return work_item class Meta: model = models.WorkItem fields = ("id",) -class SkipWorkItemSerializer(CompleteWorkItemSerializer): +class SkipWorkItemSerializer(serializers.ModelSerializer): + id = serializers.GlobalIDField() + def validate(self, data): - if self.instance.status != models.WorkItem.STATUS_READY: - raise exceptions.ValidationError("Only READY work items can be skipped") + try: + domain_logic.SkipWorkItemLogic.validate_for_skip(self.instance) + except ValidationError as e: + raise exceptions.ValidationError(str(e)) + + return data + def update(self, work_item, validated_data): user = self.context["request"].user - data["status"] = models.WorkItem.STATUS_SKIPPED - data["closed_at"] = timezone.now() - data["closed_by_user"] = user.username - data["closed_by_group"] = user.group - # We skip parent validation, as the work item is now "skipped", - # meaning no other conditions need apply - return data + validated_data = domain_logic.SkipWorkItemLogic.pre_skip(validated_data, user) - def update(self, instance, validated_data): - instance = super().update(instance, validated_data) - self.send_event(events.skipped_work_item, work_item=instance) - return instance + work_item = super().update(work_item, validated_data) + work_item = domain_logic.SkipWorkItemLogic.post_skip(work_item, user) + + return work_item + + class Meta: + model = models.WorkItem + fields = ("id",) class SaveWorkItemSerializer(SendEventSerializerMixin, serializers.ModelSerializer): diff --git a/caluma/caluma_workflow/tests/test_work_item.py b/caluma/caluma_workflow/tests/test_work_item.py index 5136f7d8e..0858b9b61 100644 --- a/caluma/caluma_workflow/tests/test_work_item.py +++ b/caluma/caluma_workflow/tests/test_work_item.py @@ -6,7 +6,7 @@ from ...caluma_core.relay import extract_global_id from ...caluma_form.models import Question from ...caluma_user.models import BaseUser -from .. import models +from .. import api, models def test_query_all_work_items_filter_status(db, work_item_factory, schema_executor): @@ -990,3 +990,31 @@ def test_skip_multiple_instance_task_form_work_item( assert result.data["skipWorkItem"]["workItem"]["case"]["status"] == to_const( models.Case.STATUS_COMPLETED ) + + +@pytest.mark.parametrize("task__type,task__form", [(models.Task.TYPE_SIMPLE, None)]) +@pytest.mark.parametrize( + "work_item__status,case__status", + [(models.WorkItem.STATUS_READY, models.Case.STATUS_COMPLETED)], +) +def test_complete_work_item_last_api(db, work_item, admin_user): + api.complete_work_item(work_item, admin_user) + + work_item.refresh_from_db() + + assert work_item.status == models.WorkItem.STATUS_COMPLETED + assert work_item.case.status == models.Case.STATUS_COMPLETED + + +@pytest.mark.parametrize("task__type,task__form", [(models.Task.TYPE_SIMPLE, None)]) +@pytest.mark.parametrize( + "work_item__status,case__status", + [(models.WorkItem.STATUS_READY, models.Case.STATUS_COMPLETED)], +) +def test_skip_work_item_last_api(db, work_item, admin_user): + api.skip_work_item(work_item, admin_user) + + work_item.refresh_from_db() + + assert work_item.status == models.WorkItem.STATUS_SKIPPED + assert work_item.case.status == models.Case.STATUS_COMPLETED diff --git a/caluma/caluma_workflow/validators.py b/caluma/caluma_workflow/validators.py index f004025e7..4d63ab7ea 100644 --- a/caluma/caluma_workflow/validators.py +++ b/caluma/caluma_workflow/validators.py @@ -1,27 +1,27 @@ -from rest_framework import exceptions +from django.core.exceptions import ValidationError from ..caluma_form.validators import DocumentValidator from . import models class WorkItemValidator: - def _validate_task_simple(self, task, case, document, info): + def _validate_task_simple(self, task, case, document, user): pass - def _validate_task_complete_workflow_form(self, task, case, document, info): - self._validate_task_complete_task_form(task, case, case.document, info) + def _validate_task_complete_workflow_form(self, task, case, document, user): + self._validate_task_complete_task_form(task, case, case.document, user) - def _validate_task_complete_task_form(self, task, case, document, info): - DocumentValidator().validate(document, info) + def _validate_task_complete_task_form(self, task, case, document, user): + DocumentValidator().validate(document, user) - def validate(self, *, status, child_case, case, task, document, info, **kwargs): + def validate(self, *, status, child_case, case, task, document, user, **kwargs): if status != models.WorkItem.STATUS_READY: - raise exceptions.ValidationError("Only ready work items can be completed.") + raise ValidationError("Only ready work items can be completed.") if child_case: if child_case.status == models.Case.STATUS_RUNNING: - raise exceptions.ValidationError( + raise ValidationError( "Work item can only be completed when child case is in a finish state." ) - getattr(self, f"_validate_task_{task.type}")(task, case, document, info) + getattr(self, f"_validate_task_{task.type}")(task, case, document, user)