better error reporting #645

Merged · 2 commits · Jun 12, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -45,6 +45,7 @@ prof/
.idea/
.~c9*
.vscode/
pyrightconfig.json

# Python
*.py[cod]
29 changes: 21 additions & 8 deletions controller/src/mantarray_desktop_app/sub_processes/mc_comm.py
@@ -91,6 +91,7 @@
from ..exceptions import FirmwareUpdateCommandFailedError
from ..exceptions import FirmwareUpdateTimeoutError
from ..exceptions import IncorrectInstrumentConnectedError
from ..exceptions import InstrumentBadDataError
from ..exceptions import InstrumentDataStreamingAlreadyStartedError
from ..exceptions import InstrumentDataStreamingAlreadyStoppedError
from ..exceptions import InstrumentFirmwareError
@@ -333,11 +334,13 @@ def _teardown_after_loop(self) -> None:
board = self._board_connections[board_idx]
if board is not None:
# log any data in cache, flush and log remaining serial data
cache_data = list(self._serial_packet_cache)
buffer_data = list(board.read_all())
put_log_message_into_queue(
logging.INFO,
f"Duration (seconds) since events: {self._get_dur_since_events()}. "
f"Remaining serial data in cache: {list(self._serial_packet_cache)}, "
f"in buffer: {list(board.read_all())}",
f"Remaining serial data in cache ({len(cache_data)} bytes): {cache_data}, "
f"in buffer ({len(buffer_data)} bytes): {buffer_data}",
self._board_queues[board_idx][1],
self.get_logging_level(),
)
@@ -1063,7 +1066,14 @@ def _handle_data_stream(self) -> None:
)
self._timepoints_of_prev_actions["packet_sort"] = perf_counter()
# sort packets into packet type groups: magnetometer data, stim status, other
sorted_packet_dict = sort_serial_packets(bytearray(self._serial_packet_cache))
try:
sorted_packet_dict = sort_serial_packets(bytearray(self._serial_packet_cache))
except InstrumentBadDataError: # pragma: no cover
self._update_performance_metrics(new_performance_tracking_values)
self._handle_performance_logging(force=True)
sleep(0.02) # sleep to ensure that this message is processed by process monitor
raise

new_performance_tracking_values["sorting_duration"] = _get_dur_of_data_sort_secs(
self._timepoints_of_prev_actions["packet_sort"] # type: ignore
)
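
The new except-branch above flushes one last round of performance metrics and pauses briefly before re-raising, so the process monitor has time to pull that final message off the queue before the subprocess tears down. A minimal standalone sketch of that pattern, using a plain in-process queue and hypothetical names (BadDataError, flush_metrics, run_iteration) rather than the project's actual API:

import time
from queue import Queue  # stand-in for the inter-process queue mc_comm writes log messages to


class BadDataError(Exception):
    """Hypothetical stand-in for InstrumentBadDataError."""


def flush_metrics(log_queue: Queue) -> None:
    # push one last metrics payload so it is not lost when the worker dies
    log_queue.put({"communication_type": "performance_metrics", "note": "flushed before error"})


def run_iteration(log_queue: Queue) -> None:
    try:
        raise BadDataError("corrupt serial packet")  # simulate sort_serial_packets failing
    except BadDataError:
        flush_metrics(log_queue)
        time.sleep(0.02)  # give the consumer a chance to drain the queue before teardown
        raise


log_q: Queue = Queue()
try:
    run_iteration(log_q)
except BadDataError:
    print("error propagated; queued metrics:", log_q.get_nowait())
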
@@ -1403,12 +1413,15 @@ def _update_performance_metrics(self, new_performance_tracking_values: Dict[str,
for metric_name, metric_value in new_performance_tracking_values.items():
self._performance_tracking_values[metric_name].append(metric_value)

def _handle_performance_logging(self) -> None:
if logging.DEBUG >= self._logging_level: # pragma: no cover
def _handle_performance_logging(self, force: bool = False) -> None:
if logging.DEBUG >= self._logging_level or force: # pragma: no cover
performance_metrics: Dict[str, Any] = {"communication_type": "performance_metrics"}
for metric_name, metric_values in self._performance_tracking_values.items():
performance_metrics[metric_name] = None
if len(metric_values) > 2:
performance_metrics[metric_name] = create_metrics_stats(metric_values)
performance_metrics[metric_name] = {"n": len(metric_values)}
try:
performance_metrics[metric_name] |= create_metrics_stats(metric_values)
except Exception:
performance_metrics[metric_name] |= {"error": "creating stats"}

self._send_performance_metrics(performance_metrics)
self._reset_performance_tracking_values()
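
The _handle_performance_logging hunk above now records the sample count under "n" and falls back to an "error" entry if stat creation fails, using the in-place dict union operator (|=, Python 3.9+). A small self-contained sketch of that pattern; compute_stats here is a hypothetical stand-in for create_metrics_stats, and the real code only computes stats when more than two samples were collected:

from statistics import mean, stdev
from typing import Any, Dict, List


def compute_stats(values: List[float]) -> Dict[str, float]:
    # hypothetical stand-in for create_metrics_stats
    return {"min": min(values), "max": max(values), "mean": mean(values), "stdev": stdev(values)}


def summarize_metric(values: List[float]) -> Dict[str, Any]:
    summary: Dict[str, Any] = {"n": len(values)}  # always report how many samples were collected
    try:
        summary |= compute_stats(values)  # dict union-assignment, Python 3.9+
    except Exception:
        summary |= {"error": "creating stats"}  # keep the count even when stats cannot be computed
    return summary


print(summarize_metric([1.0, 2.0, 3.0]))  # {'n': 3, 'min': 1.0, 'max': 3.0, 'mean': 2.0, 'stdev': 1.0}
print(summarize_metric([5.0]))            # stdev needs two points, so this yields {'n': 1, 'error': 'creating stats'}

The updated assertion at the bottom of test_data_streaming.py checks the same shape by merging {"n": ...} with create_metrics_stats(...) via the non-mutating | operator.
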
77 changes: 19 additions & 58 deletions controller/tests/mc_comm/test_data_streaming.py
@@ -130,9 +130,7 @@ def test_McCommunicationProcess__raises_error_when_set_sampling_period_command_r


def test_McCommunicationProcess__processes_start_managed_acquisition_command__and_raises_error_when_already_streaming(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
patch_print,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, patch_print
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
from_main_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][0]
@@ -174,8 +172,7 @@ def test_McCommunicationProcess__processes_stop_data_streaming_command__when_dat

# put simulator in data streaming mode
put_object_into_queue_and_raise_error_if_eventually_still_empty(
{"command": "set_data_streaming_status", "data_streaming_status": True},
testing_queue,
{"command": "set_data_streaming_status", "data_streaming_status": True}, testing_queue
)

expected_response = {"communication_type": "acquisition_manager", "command": "stop_managed_acquisition"}
@@ -194,9 +191,7 @@ def test_McCommunicationProcess__processes_stop_data_streaming_command__and_rais


def test_McCommunicationProcess__processes_stop_data_streaming_command__and_raises_error_when_not_streaming(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
patch_print,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, patch_print
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
from_main_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][0]
@@ -216,9 +211,7 @@ def test_McCommunicationProcess__processes_stop_data_streaming_command__and_rais


def test_McCommunicationProcess__reads_all_bytes_from_instrument__and_does_not_sort_packets_if_not_at_least_one_full_packet_is_present(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, mocker
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
to_fw_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][2]
@@ -263,9 +256,7 @@ def test_McCommunicationProcess__reads_all_bytes_from_instrument__and_does_not_s


def test_McCommunicationProcess__tries_to_read_one_more_time_if_first_read_fails(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, mocker
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
to_main_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][1]
@@ -307,9 +298,7 @@ def test_McCommunicationProcess__tries_to_read_one_more_time_if_first_read_fails


def test_McCommunicationProcess__processes_non_stream_packet_immediately(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, mocker
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
to_main_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][1]
@@ -350,9 +339,7 @@ def test_McCommunicationProcess__processes_non_stream_packet_immediately(


def test_McCommunicationProcess__correctly_indicates_which_packet_is_the_first_of_the_stream(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, mocker
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
to_fw_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][2]
@@ -389,9 +376,7 @@ def test_McCommunicationProcess__correctly_indicates_which_packet_is_the_first_o


def test_McCommunicationProcess__does_not_parse_data_packets_before_one_second_of_data_is_present__all_channels_enabled(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, mocker
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
to_fw_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][2]
@@ -422,9 +407,7 @@ def test_McCommunicationProcess__does_not_parse_data_packets_before_one_second_o


def test_McCommunicationProcess__handles_read_of_only_data_packets__and_sends_data_to_file_writer_correctly__when_at_least_one_second_of_data_with_all_channels_enabled_is_present(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, mocker
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
to_fw_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][2]
@@ -465,7 +448,7 @@ def test_McCommunicationProcess__handles_read_of_only_data_packets__and_sends_da
channel_dict = {
"time_offsets": np.zeros(
(SERIAL_COMM_NUM_SENSORS_PER_WELL, expected_num_packets), dtype=np.uint16
),
)
}
for channel_id in range(SERIAL_COMM_NUM_DATA_CHANNELS):
channel_dict[channel_id] = expected_data * np.uint16(well_idx + 1)
@@ -498,9 +481,7 @@ def test_McCommunicationProcess__handles_read_of_only_data_packets__and_sends_da


def test_McCommunicationProcess__handles_read_of_only_data_packets__and_sends_data_to_file_writer_correctly__when_one_second_of_data_is_present(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, mocker
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
to_fw_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][2]
@@ -580,9 +561,7 @@ def test_McCommunicationProcess__handles_read_of_only_data_packets__and_sends_da


def test_McCommunicationProcess__handles_one_second_read_with_two_interrupting_packets_correctly(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon, mocker
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
_, to_main_queue, to_fw_queue = four_board_mc_comm_process_no_handshake["board_queues"][0]
@@ -668,14 +647,10 @@ def test_McCommunicationProcess__handles_one_second_read_with_two_interrupting_p
actual_time_offsets = actual_fw_item[key]["time_offsets"]
actual_data = actual_fw_item[key][expected_sensor_axis_id]
np.testing.assert_array_equal(
actual_time_offsets,
expected_item["time_offsets"],
err_msg=f"Failure at '{key}' key",
actual_time_offsets, expected_item["time_offsets"], err_msg=f"Failure at '{key}' key"
)
np.testing.assert_array_equal(
actual_data,
expected_item[expected_sensor_axis_id],
err_msg=f"Failure at at '{key}' key",
actual_data, expected_item[expected_sensor_axis_id], err_msg=f"Failure at at '{key}' key"
)


@@ -690,12 +665,7 @@ def test_McCommunicationProcess__handles_incomplete_read_of_packet_immediately_f
# mocking so no barcode messages are sent from mc_comm to main
mocker.patch.object(simulator, "_handle_barcode", autospec=True)
# mocking to ensure no data packets are sent
mocker.patch.object(
mc_simulator,
"_get_us_since_last_data_packet",
autospec=True,
return_value=0,
)
mocker.patch.object(mc_simulator, "_get_us_since_last_data_packet", autospec=True, return_value=0)

set_connection_and_register_simulator(
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon
@@ -793,10 +763,7 @@ def test_McCommunicationProcess__updates_performance_metrics_after_parsing_data(
expected_secs_between_reading = [randint(1, 50) for _ in range(test_num_iterations - 1)]
expected_secs_between_reading = list(range(test_num_iterations - 1))
mocked_since_last_read = mocker.patch.object(
mc_comm,
"_get_secs_since_last_data_read",
autospec=True,
side_effect=expected_secs_between_reading,
mc_comm, "_get_secs_since_last_data_read", autospec=True, side_effect=expected_secs_between_reading
)
expected_read_durs = [randint(1, 50) for _ in range(test_num_iterations)]
mocked_data_read_dur = mocker.patch.object(
@@ -826,14 +793,8 @@ def test_McCommunicationProcess__updates_performance_metrics_after_parsing_data(
side_effect=[
{
"num_packets_sorted": num_packets,
"magnetometer_stream_info": {
"raw_bytes": bytearray(0),
"num_packets": num_packets,
},
"stim_stream_info": {
"raw_bytes": bytearray(0),
"num_packets": 0,
},
"magnetometer_stream_info": {"raw_bytes": bytearray(0), "num_packets": num_packets},
"stim_stream_info": {"raw_bytes": bytearray(0), "num_packets": 0},
"other_packet_info": [],
"unread_bytes": bytearray(0),
}
@@ -891,7 +852,7 @@ def test_McCommunicationProcess__updates_performance_metrics_after_parsing_data(
("mag_data_parsing_duration", expected_parse_durs),
("num_mag_packets_parsed", expected_num_packets_parsed),
):
assert actual[name] == create_metrics_stats(mc_measurements), name
assert actual[name] == {"n": len(mc_measurements)} | create_metrics_stats(mc_measurements), name

# values created in parent class
assert "idle_iteration_time_ns" not in actual