Skip to content

Commit

Permalink
Test niscope fetching more thoroughly and consistently, parameterize …
Browse files Browse the repository at this point in the history
…channels tested (#1956)

* test fetching more thoroughly and consistently, parameterize channels tested

* tweak tests to pass

* remove duplicate assertion

* Fix flake8 issues in tests
  • Loading branch information
ni-jfitzger authored May 2, 2023
1 parent ca585de commit 195ac33
Showing 1 changed file with 132 additions and 57 deletions.
189 changes: 132 additions & 57 deletions src/niscope/system_tests/test_system_niscope.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

instruments = ['FakeDevice1', 'FakeDevice2']
# TODO(sbethur): Use `get_channel_names` when #1402 is fixed
test_channels = 'FakeDevice2/0,FakeDevice1/1'
test_channels_1 = 'FakeDevice2/0,FakeDevice1/1'
test_channels_1_expanded = test_channels_1
# TODO(jfitzger): Use the commented values, once #1770 is fixed
test_channels_2 = 'FakeDevice2/0' # 'FakeDevice2/0:1'
test_channels_2_expanded = 'FakeDevice2/0' # 'FakeDevice2/0,FakeDevice2/1'


# There are system tests below that need either a PXI-5124 or a PXI-5142 instead of the PXIe-5164 we use everywhere else
Expand All @@ -37,6 +41,33 @@
daqmx_sim_5142_lock = fasteners.InterProcessLock(daqmx_sim_5142_lock_file)


def check_fetched_data(
data, # either waveforms or measurement_stats
test_channels_expanded,
test_record_length,
test_num_records_to_fetch,
test_starting_record_number=0
):
test_num_channels = len(test_channels_expanded.split(','))

# Ordering: rec 0: ch 0, rec 0: ch 1, rec 1: ch 0, rec 1: ch 1, etc.
expected_channels = test_channels_expanded.split(',') * test_num_records_to_fetch
expected_records = []
for i in range(test_starting_record_number, test_starting_record_number + test_num_records_to_fetch):
expected_records += [i] * test_num_channels

assert len(data) == test_num_channels * test_num_records_to_fetch
for i in range(len(data)):
if isinstance(data[i], niscope.WaveformInfo):
assert len(data[i].samples) == test_record_length
elif isinstance(data[i], niscope.MeasurementStats):
assert data[i].result == 0.0
else:
raise TypeError(f"data is unsupported type {type(data[i])}")
assert data[i].channel == expected_channels[i]
assert data[i].record == expected_records[i]


class SystemTests:
@pytest.fixture(scope='function')
def single_instrument_session(self, session_creation_kwargs):
Expand Down Expand Up @@ -72,22 +103,32 @@ def test_vi_string_attribute(self, multi_instrument_session):
assert trigger_source == multi_instrument_session.acq_arm_source

# Basic usability tests
def test_read(self, multi_instrument_session):
@pytest.mark.parametrize(
"test_channels,test_channels_expanded",
[
(test_channels_1, test_channels_1_expanded),
(test_channels_2, test_channels_2_expanded),
],
)
def test_read(self, multi_instrument_session, test_channels, test_channels_expanded):
test_voltage = 1.0
test_record_length = 2000
test_num_channels = 2
test_num_records = 3
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
waveforms = multi_instrument_session.channels[test_channels].read(num_samples=test_record_length, num_records=test_num_records)
assert len(waveforms) == test_num_channels * test_num_records
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length
check_fetched_data(waveforms, test_channels_expanded, test_record_length, test_num_records)

def test_fetch(self, multi_instrument_session):
@pytest.mark.parametrize(
"test_channels,test_channels_expanded",
[
(test_channels_1, test_channels_1_expanded),
(test_channels_2, test_channels_2_expanded),
],
)
def test_fetch(self, multi_instrument_session, test_channels, test_channels_expanded):
test_voltage = 1.0
test_record_length = 2000
test_num_channels = 2
test_starting_record_number = 2
test_num_records_to_acquire = 5
test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number
Expand All @@ -98,13 +139,13 @@ def test_fetch(self, multi_instrument_session):
num_samples=test_record_length,
record_number=test_starting_record_number,
num_records=test_num_records_to_fetch)
assert len(waveforms) == test_num_channels * test_num_records_to_fetch
expected_channels = test_channels.split(',') * test_num_records_to_fetch
expected_records = [2, 2, 3, 3, 4, 4]
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length
assert waveforms[i].channel == expected_channels[i]
assert waveforms[i].record == expected_records[i]
check_fetched_data(
waveforms,
test_channels_expanded,
test_record_length,
test_num_records_to_fetch,
test_starting_record_number,
)

def test_fetch_defaults(self, multi_instrument_session):
test_voltage = 1.0
Expand All @@ -113,7 +154,7 @@ def test_fetch_defaults(self, multi_instrument_session):
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, 1, True)
with multi_instrument_session.initiate():
waveforms = multi_instrument_session.channels[test_channels].fetch()
waveforms = multi_instrument_session.channels[test_channels_1].fetch()
assert len(waveforms) == test_num_channels
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length
Expand All @@ -123,10 +164,22 @@ def measurement_wfm_length(self, request):
MeasWfmLength = collections.namedtuple('MeasurementWaveformLength', ['passed_in', 'expected'])
return MeasWfmLength(passed_in=request.param[0], expected=request.param[1])

def test_fetch_array_measurement(self, multi_instrument_session, measurement_wfm_length):
@pytest.mark.parametrize(
"test_channels,test_channels_expanded",
[
(test_channels_1, test_channels_1_expanded),
(test_channels_2, test_channels_2_expanded),
],
)
def test_fetch_array_measurement(
self,
multi_instrument_session,
measurement_wfm_length,
test_channels,
test_channels_expanded,
):
test_voltage = 1.0
test_record_length = 1000
test_num_channels = 2
test_meas_wfm_length = measurement_wfm_length.passed_in
test_array_meas_function = niscope.ArrayMeasurement.ARRAY_GAIN
test_starting_record_number = 2
Expand All @@ -146,13 +199,13 @@ def test_fetch_array_measurement(self, multi_instrument_session, measurement_wfm
meas_num_samples=2000,
timeout=hightime.timedelta(seconds=4))

assert len(waveforms) == test_num_channels * test_num_records_to_fetch
expected_channels = test_channels.split(',') * test_num_records_to_fetch
expected_records = [2, 2, 3, 3, 4, 4]
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == measurement_wfm_length.expected
assert waveforms[i].channel == expected_channels[i]
assert waveforms[i].record == expected_records[i]
check_fetched_data(
waveforms,
test_channels_expanded,
measurement_wfm_length.expected,
test_num_records_to_fetch,
test_starting_record_number,
)

def test_fetch_array_measurement_defaults(self, multi_instrument_session):
test_voltage = 1.0
Expand All @@ -165,18 +218,28 @@ def test_fetch_array_measurement_defaults(self, multi_instrument_session):
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)

with multi_instrument_session.initiate():
waveforms = multi_instrument_session.channels[test_channels].fetch_array_measurement(
waveforms = multi_instrument_session.channels[test_channels_1].fetch_array_measurement(
array_meas_function=test_array_meas_function)

assert len(waveforms) == test_num_channels * test_num_records
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length

def test_fetch_measurement_stats(self, multi_instrument_session):
@pytest.mark.parametrize(
"test_channels,test_channels_expanded",
[
(test_channels_1, test_channels_1_expanded),
(test_channels_2, test_channels_2_expanded),
],
)
def test_fetch_measurement_stats(
self,
multi_instrument_session,
test_channels,
test_channels_expanded,
):
test_voltage = 1.0
test_record_length = 1000
test_num_channels = 2
test_num_records = 3
test_starting_record_number = 2
test_num_records_to_acquire = 5
test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number
Expand All @@ -191,13 +254,13 @@ def test_fetch_measurement_stats(self, multi_instrument_session):
num_records=test_num_records_to_fetch,
timeout=hightime.timedelta(seconds=4))

assert len(measurement_stats) == test_num_channels * test_num_records
expected_channels = test_channels.split(',') * test_num_records_to_fetch
expected_records = [2, 2, 3, 3, 4, 4]
for i in range(len(measurement_stats)):
assert measurement_stats[i].result == 0.0
assert measurement_stats[i].channel == expected_channels[i]
assert measurement_stats[i].record == expected_records[i]
check_fetched_data(
measurement_stats,
test_channels_expanded,
None, # pass None for test_record_length, because we shouldn't need it
test_num_records_to_fetch,
test_starting_record_number
)

def test_fetch_measurement_stats_defaults(self, multi_instrument_session):
test_voltage = 1.0
Expand All @@ -207,7 +270,7 @@ def test_fetch_measurement_stats_defaults(self, multi_instrument_session):
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
with multi_instrument_session.initiate():
measurement_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT)
measurement_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT)

assert len(measurement_stats) == test_num_channels * test_num_records
for stat in measurement_stats:
Expand All @@ -220,10 +283,10 @@ def test_clear_waveform_measurement_stats(self, multi_instrument_session):
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
with multi_instrument_session.initiate():
uncleared_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
uncleared_stats_2 = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
multi_instrument_session.channels[test_channels].clear_waveform_measurement_stats(niscope.enums.ClearableMeasurement.FREQUENCY)
cleared_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
uncleared_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
uncleared_stats_2 = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
multi_instrument_session.channels[test_channels_1].clear_waveform_measurement_stats(niscope.enums.ClearableMeasurement.FREQUENCY)
cleared_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)

# The principle here is using consistent behavior (i.e. if stats are fetched twice on a single record/channel measurement in a row, it will always be the same)
# to demonstrate that clearing the stats does in fact cause a measurable change.
Expand All @@ -244,9 +307,9 @@ def test_waveform_processing(self, multi_instrument_session):
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
with multi_instrument_session.initiate():
multi_instrument_session.add_waveform_processing(niscope.enums.ArrayMeasurement.DERIVATIVE)
processed_waveforms = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS)
processed_waveforms = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS)
multi_instrument_session.clear_waveform_processing()
unprocessed_waveforms = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS)
unprocessed_waveforms = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS)

assert len(processed_waveforms) == test_num_channels * test_num_records
assert len(unprocessed_waveforms) == test_num_channels * test_num_records
Expand All @@ -264,7 +327,7 @@ def test_measurement_stats_str(self, multi_instrument_session):
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
with multi_instrument_session.initiate():
measurement_stat = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT)
measurement_stat = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT)

assert isinstance(measurement_stat[0].__str__(), str)
assert isinstance(measurement_stat[0].__repr__(), str)
Expand Down Expand Up @@ -457,18 +520,29 @@ def session_creation_kwargs(self):

# not supported by grpc due to numpy usage
@pytest.mark.parametrize(
"fetch_waveform_type,type_min_value",
"fetch_waveform_type,type_min_value,test_channels,test_channels_expanded",
[
(numpy.int8, numpy.iinfo(numpy.int8).min),
(numpy.int16, numpy.iinfo(numpy.int16).min),
(numpy.int32, numpy.iinfo(numpy.int32).min),
(numpy.float64, numpy.finfo(numpy.float64).min),
(numpy.int8, numpy.iinfo(numpy.int8).min, test_channels_1, test_channels_1_expanded),
(numpy.int16, numpy.iinfo(numpy.int16).min, test_channels_1, test_channels_1_expanded),
(numpy.int32, numpy.iinfo(numpy.int32).min, test_channels_1, test_channels_1_expanded),
(numpy.float64, numpy.finfo(numpy.float64).min, test_channels_1, test_channels_1_expanded),
(numpy.int8, numpy.iinfo(numpy.int8).min, test_channels_2, test_channels_2_expanded),
(numpy.int16, numpy.iinfo(numpy.int16).min, test_channels_2, test_channels_2_expanded),
(numpy.int32, numpy.iinfo(numpy.int32).min, test_channels_2, test_channels_2_expanded),
(numpy.float64, numpy.finfo(numpy.float64).min, test_channels_2, test_channels_2_expanded),
],
)
def test_fetch_into(self, multi_instrument_session, fetch_waveform_type, type_min_value):
def test_fetch_into(
self,
multi_instrument_session,
fetch_waveform_type,
type_min_value,
test_channels,
test_channels_expanded,
):
test_voltage = 1.0
test_record_length = 2000
test_num_channels = 2
test_num_channels = len(test_channels_expanded.split(','))
test_starting_record_number = 2
test_num_records_to_acquire = 5
test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number
Expand All @@ -486,17 +560,18 @@ def test_fetch_into(self, multi_instrument_session, fetch_waveform_type, type_mi

for index, sample in enumerate(waveform):
assert sample != init_waveform[index]
assert len(waveforms) == test_num_channels * test_num_records_to_fetch

expected_channels = test_channels.split(',') * test_num_records_to_fetch
expected_records = [2, 2, 3, 3, 4, 4]
check_fetched_data(
waveforms,
test_channels_expanded,
test_record_length,
test_num_records_to_fetch,
test_starting_record_number,
)
for i in range(len(waveforms)):
record_wfm = waveforms[i].samples
assert len(record_wfm) == test_record_length
for j in range(len(record_wfm)):
assert record_wfm[j] == waveform[i * test_record_length + j]
assert waveforms[i].channel == expected_channels[i]
assert waveforms[i].record == expected_records[i]

def test_configure_ref_levels(self, single_instrument_session):
single_instrument_session._configure_ref_levels()
Expand Down

0 comments on commit 195ac33

Please sign in to comment.