diff --git a/src/niscope/system_tests/test_system_niscope.py b/src/niscope/system_tests/test_system_niscope.py index b5c6d2e57..9ed3b86ec 100644 --- a/src/niscope/system_tests/test_system_niscope.py +++ b/src/niscope/system_tests/test_system_niscope.py @@ -19,7 +19,11 @@ instruments = ['FakeDevice1', 'FakeDevice2'] # TODO(sbethur): Use `get_channel_names` when #1402 is fixed -test_channels = 'FakeDevice2/0,FakeDevice1/1' +test_channels_1 = 'FakeDevice2/0,FakeDevice1/1' +test_channels_1_expanded = test_channels_1 +# TODO(jfitzger): Use the commented values, once #1770 is fixed +test_channels_2 = 'FakeDevice2/0' # 'FakeDevice2/0:1' +test_channels_2_expanded = 'FakeDevice2/0' # 'FakeDevice2/0,FakeDevice2/1' # There are system tests below that need either a PXI-5124 or a PXI-5142 instead of the PXIe-5164 we use everywhere else @@ -37,6 +41,33 @@ daqmx_sim_5142_lock = fasteners.InterProcessLock(daqmx_sim_5142_lock_file) +def check_fetched_data( + data, # either waveforms or measurement_stats + test_channels_expanded, + test_record_length, + test_num_records_to_fetch, + test_starting_record_number=0 +): + test_num_channels = len(test_channels_expanded.split(',')) + + # Ordering: rec 0: ch 0, rec 0: ch 1, rec 1: ch 0, rec 1: ch 1, etc. + expected_channels = test_channels_expanded.split(',') * test_num_records_to_fetch + expected_records = [] + for i in range(test_starting_record_number, test_starting_record_number + test_num_records_to_fetch): + expected_records += [i] * test_num_channels + + assert len(data) == test_num_channels * test_num_records_to_fetch + for i in range(len(data)): + if isinstance(data[i], niscope.WaveformInfo): + assert len(data[i].samples) == test_record_length + elif isinstance(data[i], niscope.MeasurementStats): + assert data[i].result == 0.0 + else: + raise TypeError(f"data is unsupported type {type(data[i])}") + assert data[i].channel == expected_channels[i] + assert data[i].record == expected_records[i] + + class SystemTests: @pytest.fixture(scope='function') def single_instrument_session(self, session_creation_kwargs): @@ -72,22 +103,32 @@ def test_vi_string_attribute(self, multi_instrument_session): assert trigger_source == multi_instrument_session.acq_arm_source # Basic usability tests - def test_read(self, multi_instrument_session): + @pytest.mark.parametrize( + "test_channels,test_channels_expanded", + [ + (test_channels_1, test_channels_1_expanded), + (test_channels_2, test_channels_2_expanded), + ], + ) + def test_read(self, multi_instrument_session, test_channels, test_channels_expanded): test_voltage = 1.0 test_record_length = 2000 - test_num_channels = 2 test_num_records = 3 multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC) multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True) waveforms = multi_instrument_session.channels[test_channels].read(num_samples=test_record_length, num_records=test_num_records) - assert len(waveforms) == test_num_channels * test_num_records - for i in range(len(waveforms)): - assert len(waveforms[i].samples) == test_record_length + check_fetched_data(waveforms, test_channels_expanded, test_record_length, test_num_records) - def test_fetch(self, multi_instrument_session): + @pytest.mark.parametrize( + "test_channels,test_channels_expanded", + [ + (test_channels_1, test_channels_1_expanded), + (test_channels_2, test_channels_2_expanded), + ], + ) + def test_fetch(self, multi_instrument_session, test_channels, test_channels_expanded): test_voltage = 1.0 test_record_length = 2000 - test_num_channels = 2 test_starting_record_number = 2 test_num_records_to_acquire = 5 test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number @@ -98,13 +139,13 @@ def test_fetch(self, multi_instrument_session): num_samples=test_record_length, record_number=test_starting_record_number, num_records=test_num_records_to_fetch) - assert len(waveforms) == test_num_channels * test_num_records_to_fetch - expected_channels = test_channels.split(',') * test_num_records_to_fetch - expected_records = [2, 2, 3, 3, 4, 4] - for i in range(len(waveforms)): - assert len(waveforms[i].samples) == test_record_length - assert waveforms[i].channel == expected_channels[i] - assert waveforms[i].record == expected_records[i] + check_fetched_data( + waveforms, + test_channels_expanded, + test_record_length, + test_num_records_to_fetch, + test_starting_record_number, + ) def test_fetch_defaults(self, multi_instrument_session): test_voltage = 1.0 @@ -113,7 +154,7 @@ def test_fetch_defaults(self, multi_instrument_session): multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC) multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, 1, True) with multi_instrument_session.initiate(): - waveforms = multi_instrument_session.channels[test_channels].fetch() + waveforms = multi_instrument_session.channels[test_channels_1].fetch() assert len(waveforms) == test_num_channels for i in range(len(waveforms)): assert len(waveforms[i].samples) == test_record_length @@ -123,10 +164,22 @@ def measurement_wfm_length(self, request): MeasWfmLength = collections.namedtuple('MeasurementWaveformLength', ['passed_in', 'expected']) return MeasWfmLength(passed_in=request.param[0], expected=request.param[1]) - def test_fetch_array_measurement(self, multi_instrument_session, measurement_wfm_length): + @pytest.mark.parametrize( + "test_channels,test_channels_expanded", + [ + (test_channels_1, test_channels_1_expanded), + (test_channels_2, test_channels_2_expanded), + ], + ) + def test_fetch_array_measurement( + self, + multi_instrument_session, + measurement_wfm_length, + test_channels, + test_channels_expanded, + ): test_voltage = 1.0 test_record_length = 1000 - test_num_channels = 2 test_meas_wfm_length = measurement_wfm_length.passed_in test_array_meas_function = niscope.ArrayMeasurement.ARRAY_GAIN test_starting_record_number = 2 @@ -146,13 +199,13 @@ def test_fetch_array_measurement(self, multi_instrument_session, measurement_wfm meas_num_samples=2000, timeout=hightime.timedelta(seconds=4)) - assert len(waveforms) == test_num_channels * test_num_records_to_fetch - expected_channels = test_channels.split(',') * test_num_records_to_fetch - expected_records = [2, 2, 3, 3, 4, 4] - for i in range(len(waveforms)): - assert len(waveforms[i].samples) == measurement_wfm_length.expected - assert waveforms[i].channel == expected_channels[i] - assert waveforms[i].record == expected_records[i] + check_fetched_data( + waveforms, + test_channels_expanded, + measurement_wfm_length.expected, + test_num_records_to_fetch, + test_starting_record_number, + ) def test_fetch_array_measurement_defaults(self, multi_instrument_session): test_voltage = 1.0 @@ -165,18 +218,28 @@ def test_fetch_array_measurement_defaults(self, multi_instrument_session): multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True) with multi_instrument_session.initiate(): - waveforms = multi_instrument_session.channels[test_channels].fetch_array_measurement( + waveforms = multi_instrument_session.channels[test_channels_1].fetch_array_measurement( array_meas_function=test_array_meas_function) assert len(waveforms) == test_num_channels * test_num_records for i in range(len(waveforms)): assert len(waveforms[i].samples) == test_record_length - def test_fetch_measurement_stats(self, multi_instrument_session): + @pytest.mark.parametrize( + "test_channels,test_channels_expanded", + [ + (test_channels_1, test_channels_1_expanded), + (test_channels_2, test_channels_2_expanded), + ], + ) + def test_fetch_measurement_stats( + self, + multi_instrument_session, + test_channels, + test_channels_expanded, + ): test_voltage = 1.0 test_record_length = 1000 - test_num_channels = 2 - test_num_records = 3 test_starting_record_number = 2 test_num_records_to_acquire = 5 test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number @@ -191,13 +254,13 @@ def test_fetch_measurement_stats(self, multi_instrument_session): num_records=test_num_records_to_fetch, timeout=hightime.timedelta(seconds=4)) - assert len(measurement_stats) == test_num_channels * test_num_records - expected_channels = test_channels.split(',') * test_num_records_to_fetch - expected_records = [2, 2, 3, 3, 4, 4] - for i in range(len(measurement_stats)): - assert measurement_stats[i].result == 0.0 - assert measurement_stats[i].channel == expected_channels[i] - assert measurement_stats[i].record == expected_records[i] + check_fetched_data( + measurement_stats, + test_channels_expanded, + None, # pass None for test_record_length, because we shouldn't need it + test_num_records_to_fetch, + test_starting_record_number + ) def test_fetch_measurement_stats_defaults(self, multi_instrument_session): test_voltage = 1.0 @@ -207,7 +270,7 @@ def test_fetch_measurement_stats_defaults(self, multi_instrument_session): multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC) multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True) with multi_instrument_session.initiate(): - measurement_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT) + measurement_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT) assert len(measurement_stats) == test_num_channels * test_num_records for stat in measurement_stats: @@ -220,10 +283,10 @@ def test_clear_waveform_measurement_stats(self, multi_instrument_session): multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC) multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True) with multi_instrument_session.initiate(): - uncleared_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY) - uncleared_stats_2 = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY) - multi_instrument_session.channels[test_channels].clear_waveform_measurement_stats(niscope.enums.ClearableMeasurement.FREQUENCY) - cleared_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY) + uncleared_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY) + uncleared_stats_2 = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY) + multi_instrument_session.channels[test_channels_1].clear_waveform_measurement_stats(niscope.enums.ClearableMeasurement.FREQUENCY) + cleared_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY) # The principle here is using consistent behavior (i.e. if stats are fetched twice on a single record/channel measurement in a row, it will always be the same) # to demonstrate that clearing the stats does in fact cause a measurable change. @@ -244,9 +307,9 @@ def test_waveform_processing(self, multi_instrument_session): multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True) with multi_instrument_session.initiate(): multi_instrument_session.add_waveform_processing(niscope.enums.ArrayMeasurement.DERIVATIVE) - processed_waveforms = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS) + processed_waveforms = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS) multi_instrument_session.clear_waveform_processing() - unprocessed_waveforms = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS) + unprocessed_waveforms = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS) assert len(processed_waveforms) == test_num_channels * test_num_records assert len(unprocessed_waveforms) == test_num_channels * test_num_records @@ -264,7 +327,7 @@ def test_measurement_stats_str(self, multi_instrument_session): multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC) multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True) with multi_instrument_session.initiate(): - measurement_stat = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT) + measurement_stat = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT) assert isinstance(measurement_stat[0].__str__(), str) assert isinstance(measurement_stat[0].__repr__(), str) @@ -457,18 +520,29 @@ def session_creation_kwargs(self): # not supported by grpc due to numpy usage @pytest.mark.parametrize( - "fetch_waveform_type,type_min_value", + "fetch_waveform_type,type_min_value,test_channels,test_channels_expanded", [ - (numpy.int8, numpy.iinfo(numpy.int8).min), - (numpy.int16, numpy.iinfo(numpy.int16).min), - (numpy.int32, numpy.iinfo(numpy.int32).min), - (numpy.float64, numpy.finfo(numpy.float64).min), + (numpy.int8, numpy.iinfo(numpy.int8).min, test_channels_1, test_channels_1_expanded), + (numpy.int16, numpy.iinfo(numpy.int16).min, test_channels_1, test_channels_1_expanded), + (numpy.int32, numpy.iinfo(numpy.int32).min, test_channels_1, test_channels_1_expanded), + (numpy.float64, numpy.finfo(numpy.float64).min, test_channels_1, test_channels_1_expanded), + (numpy.int8, numpy.iinfo(numpy.int8).min, test_channels_2, test_channels_2_expanded), + (numpy.int16, numpy.iinfo(numpy.int16).min, test_channels_2, test_channels_2_expanded), + (numpy.int32, numpy.iinfo(numpy.int32).min, test_channels_2, test_channels_2_expanded), + (numpy.float64, numpy.finfo(numpy.float64).min, test_channels_2, test_channels_2_expanded), ], ) - def test_fetch_into(self, multi_instrument_session, fetch_waveform_type, type_min_value): + def test_fetch_into( + self, + multi_instrument_session, + fetch_waveform_type, + type_min_value, + test_channels, + test_channels_expanded, + ): test_voltage = 1.0 test_record_length = 2000 - test_num_channels = 2 + test_num_channels = len(test_channels_expanded.split(',')) test_starting_record_number = 2 test_num_records_to_acquire = 5 test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number @@ -486,17 +560,18 @@ def test_fetch_into(self, multi_instrument_session, fetch_waveform_type, type_mi for index, sample in enumerate(waveform): assert sample != init_waveform[index] - assert len(waveforms) == test_num_channels * test_num_records_to_fetch - expected_channels = test_channels.split(',') * test_num_records_to_fetch - expected_records = [2, 2, 3, 3, 4, 4] + check_fetched_data( + waveforms, + test_channels_expanded, + test_record_length, + test_num_records_to_fetch, + test_starting_record_number, + ) for i in range(len(waveforms)): record_wfm = waveforms[i].samples - assert len(record_wfm) == test_record_length for j in range(len(record_wfm)): assert record_wfm[j] == waveform[i * test_record_length + j] - assert waveforms[i].channel == expected_channels[i] - assert waveforms[i].record == expected_records[i] def test_configure_ref_levels(self, single_instrument_session): single_instrument_session._configure_ref_levels()