improve mock_create in test_pipeline.py
dtrai2 committed Jul 2, 2024
1 parent 472448e commit 1819608
Showing 1 changed file with 14 additions and 23 deletions.
tests/unit/framework/test_pipeline.py (37 changes: 14 additions & 23 deletions)
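The commit replaces the per-test helper add_empty_processor_result_to_process_mocks with a class-level mock.patch that uses new_callable: get_mock_create() is invoked when the patch is entered, i.e. once per test method, so every test receives a fresh Factory.create replacement whose created components already return an empty ProcessorResult from process(). That is why the helper call is removed from each test in the diff below. A minimal, runnable sketch of that pattern follows; Factory, ProcessorResult, and the sample test are illustrative stand-ins, not logprep's real classes (the commit itself patches "logprep.factory.Factory.create" directly).

from unittest import mock


class ProcessorResult:  # stand-in for logprep's ProcessorResult (illustrative)
    def __init__(self, name, data=None):
        self.name = name
        self.data = data if data is not None else []


class Factory:  # stand-in for logprep.factory.Factory (illustrative)
    @staticmethod
    def create(config):
        raise NotImplementedError("the real factory is not needed in this sketch")


def get_mock_create():
    """Build a Factory.create replacement whose components already return an
    empty ProcessorResult from process()."""
    mock_create = mock.MagicMock()
    mock_component = mock.MagicMock()
    mock_component.process.return_value = ProcessorResult(name="mock_processor")
    mock_create.return_value = mock_component
    return mock_create


# new_callable is called when the patch starts, i.e. once per decorated test,
# so each test gets a fresh, preconfigured mock_create.
@mock.patch.object(Factory, "create", new_callable=get_mock_create)
def test_created_components_have_a_default_result(mock_create):
    component = Factory.create({"type": "some_processor"})
    assert component.process({"message": "test"}).name == "mock_processor"
    assert mock_create.call_count == 1


test_created_components_have_a_default_result()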
@@ -55,13 +55,20 @@ class ConfigurationForTests:
     lock = Lock()


-def add_empty_processor_result_to_process_mocks(pipeline):
-    for processor in pipeline:
-        processor.process = mock.MagicMock()
-        processor.process.return_value = ProcessorResult(name="mock_processor")
-
-
-@mock.patch("logprep.factory.Factory.create")
+def get_mock_create():
+    """
+    Create a new mock_create magic mock with a default processor result. Is applied for every
+    test.
+    """
+    mock_create = mock.MagicMock()
+    mock_component = mock.MagicMock()
+    mock_component.process = mock.MagicMock()
+    mock_component.process.return_value = ProcessorResult(name="mock_processor")
+    mock_create.return_value = mock_component
+    return mock_create
+
+
+@mock.patch("logprep.factory.Factory.create", new_callable=get_mock_create)
 class TestPipeline(ConfigurationForTests):
     def setup_method(self):
         self._check_failed_stored = None
@@ -74,7 +81,6 @@ def setup_method(self):

     def test_pipeline_property_returns_pipeline(self, mock_create):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         assert len(self.pipeline._pipeline) == 2
         assert mock_create.call_count == 4  # 2 processors, 1 input, 1 output

@@ -146,7 +152,6 @@ def test_empty_documents_are_not_forwarded_to_other_processors(self, _):

     def test_not_empty_documents_are_stored_in_the_output(self, _):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._input.get_next.return_value = ({"message": "test"}, None)
         self.pipeline._store_event = mock.MagicMock()
         self.pipeline.process_pipeline()
@@ -177,7 +182,6 @@ def test_shut_down_calls_shut_down_on_output(self, _):

     def test_all_events_provided_by_input_arrive_at_output(self, _):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._setup = mock.MagicMock()
         input_data = [{"test": "1"}, {"test": "2"}, {"test": "3"}]
         expected_output_data = deepcopy(input_data)
@@ -190,7 +194,6 @@ def test_all_events_provided_by_input_arrive_at_output(self, _):
     @mock.patch("logging.Logger.error")
     def test_critical_output_error_is_logged_and_stored_as_failed(self, mock_error, _):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._input.get_next.return_value = ({"order": 1}, None)
         raised_error = CriticalOutputError(
             self.pipeline._output["dummy"], "An error message", {"failed": "event"}
@@ -203,7 +206,6 @@ def test_critical_output_error_is_logged_and_stored_as_failed(self, mock_error,
     @mock.patch("logging.Logger.warning")
     def test_input_warning_error_is_logged_but_processing_continues(self, mock_warning, _):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)

         def raise_warning_error(_):
             raise InputWarning(self.pipeline._input, "i warn you")
@@ -223,7 +225,6 @@ def raise_warning_error(_):
     @mock.patch("logging.Logger.warning")
     def test_output_warning_error_is_logged_but_processing_continues(self, mock_warning, _):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._input.get_next.return_value = ({"order": 1}, None)
         self.pipeline._output["dummy"].metrics = mock.MagicMock()
         self.pipeline._output["dummy"].metrics.number_of_warnings = 0
@@ -258,7 +259,6 @@ def test_processor_critical_error_is_logged_event_is_stored_in_error_output(
         self, mock_error, _
     ):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         input_event1 = {"message": "first event"}
         input_event2 = {"message": "second event"}
         self.pipeline._input.get_next.return_value = (input_event1, None)
@@ -306,7 +306,6 @@ def test_processor_logs_processing_error_and_warnings_separately(
         self, mock_warning, mock_error, _
     ):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         input_event1 = {"message": "first event"}
         self.pipeline._input.get_next.return_value = (input_event1, None)
         mock_rule = mock.MagicMock()
@@ -369,7 +368,6 @@ def raise_critical(event):
         dummy_output.store = raise_critical
         dummy_output.metrics.number_of_errors = 0
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._output["dummy"] = dummy_output
         self.pipeline._input.get_next.return_value = (input_event, None)
         self.pipeline.process_pipeline()
@@ -389,7 +387,6 @@ def raise_warning(event):
         input_event = {"test": "message"}
         dummy_output.store = raise_warning
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._output["dummy"] = dummy_output
         self.pipeline._input.get_next.return_value = (input_event, None)
         self.pipeline.process_pipeline()
@@ -420,7 +417,6 @@ def test_processor_fatal_output_error_is_logged_pipeline_is_shutdown(
         self, mock_log_error, mock_shut_down, _
     ):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._input.get_next.return_value = ({"some": "event"}, None)
         raised_error = FatalOutputError(mock.MagicMock(), "fatal output error")
         self.pipeline._output["dummy"].store.side_effect = raised_error
@@ -439,7 +435,6 @@ def test_processor_fatal_output_error_in_setup_is_logged(self, mock_log_error, _

     def test_extra_data_tuple_is_passed_to_store_custom(self, _):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._input.get_next.return_value = ({"some": "event"}, None)
         self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0])
         self.pipeline._pipeline[1].process.return_value = ProcessorResult(
@@ -454,7 +449,6 @@ def test_extra_data_tuple_is_passed_to_store_custom(self, _):
     def test_store_custom_calls_all_defined_outputs(self, _):
         self.pipeline._output.update({"dummy1": mock.MagicMock()})
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0])
         self.pipeline._pipeline[1].process.return_value = ProcessorResult(
             name="",
@@ -478,7 +472,6 @@ def test_store_custom_calls_all_defined_outputs(self, _):

     def test_extra_data_list_is_passed_to_store_custom(self, _):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0])
         self.pipeline._pipeline[1].process.return_value = ProcessorResult(
             name="", data=[({"foo": "bar"}, ({"dummy": "target"},))]
@@ -511,7 +504,6 @@ def test_pipeline_does_not_call_batch_finished_callback_if_output_store_does_not
         self, _
     ):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._input.get_next.return_value = ({"some": "event"}, None)
         self.pipeline._input.batch_finished_callback = mock.MagicMock()
         self.pipeline._output["dummy"].store = mock.MagicMock()
@@ -521,7 +513,6 @@ def test_retrieve_and_process_data_calls_store_failed_for_non_critical_error_message(self, _):

     def test_retrieve_and_process_data_calls_store_failed_for_non_critical_error_message(self, _):
         self.pipeline._setup()
-        add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
         self.pipeline._input.get_next.return_value = ({"some": "event"}, "This is non critical")
         self.pipeline.process_pipeline()
         self.pipeline._output["dummy"].store_failed.assert_called_with(

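Tests that need more than the empty default can still override the mocked process result locally, as test_extra_data_tuple_is_passed_to_store_custom does in the diff above by reassigning process.return_value on a pipeline entry. A small, self-contained sketch of that override (the ProcessorResult stand-in and the values are illustrative, mirroring the diff):

from unittest import mock


class ProcessorResult:  # stand-in for logprep's ProcessorResult (illustrative)
    def __init__(self, name, data=None):
        self.name = name
        self.data = data if data is not None else []


# default behaviour set up by get_mock_create(): an empty result for every processor
processor = mock.MagicMock()
processor.process.return_value = ProcessorResult(name="mock_processor")

# a single test overrides the default to emit extra data for store_custom
processor.process.return_value = ProcessorResult(
    name="", data=[({"foo": "bar"}, ({"dummy": "target"},))]
)
result = processor.process({"some": "event"})
assert result.data == [({"foo": "bar"}, ({"dummy": "target"},))]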