Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test niscope fetching more thoroughly and consistently, parameterize channels tested #1956

Merged
merged 5 commits into from
May 2, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 132 additions & 57 deletions src/niscope/system_tests/test_system_niscope.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

instruments = ['FakeDevice1', 'FakeDevice2']
# TODO(sbethur): Use `get_channel_names` when #1402 is fixed
test_channels = 'FakeDevice2/0,FakeDevice1/1'
test_channels_1 = 'FakeDevice2/0,FakeDevice1/1'
test_channels_1_expanded = test_channels_1
# TODO(jfitzger): Use the commented values, once #1770 is fixed
test_channels_2 = 'FakeDevice2/0' # 'FakeDevice2/0:1'
test_channels_2_expanded = 'FakeDevice2/0' # 'FakeDevice2/0,FakeDevice2/1'


# There are system tests below that need either a PXI-5124 or a PXI-5142 instead of the PXIe-5164 we use everywhere else
Expand All @@ -37,6 +41,33 @@
daqmx_sim_5142_lock = fasteners.InterProcessLock(daqmx_sim_5142_lock_file)


def check_fetched_data(
data, # either waveforms or measurement_stats
test_channels_expanded,
test_record_length,
test_num_records_to_fetch,
test_starting_record_number=0
):
test_num_channels = len(test_channels_expanded.split(','))

# Ordering: rec 0: ch 0, rec 0: ch 1, rec 1: ch 0, rec 1: ch 1, etc.
expected_channels = test_channels_expanded.split(',') * test_num_records_to_fetch
expected_records = []
for i in range(test_starting_record_number, test_starting_record_number + test_num_records_to_fetch):
expected_records += [i] * test_num_channels

assert len(data) == test_num_channels * test_num_records_to_fetch
for i in range(len(data)):
if isinstance(data[i], niscope.WaveformInfo):
assert len(data[i].samples) == test_record_length
elif isinstance(data[i], niscope.MeasurementStats):
assert data[i].result == 0.0
else:
raise TypeError(f"data is unsupported type {type(data[i])}")
assert data[i].channel == expected_channels[i]
assert data[i].record == expected_records[i]


class SystemTests:
@pytest.fixture(scope='function')
def single_instrument_session(self, session_creation_kwargs):
Expand Down Expand Up @@ -72,22 +103,32 @@ def test_vi_string_attribute(self, multi_instrument_session):
assert trigger_source == multi_instrument_session.acq_arm_source

# Basic usability tests
def test_read(self, multi_instrument_session):
@pytest.mark.parametrize(
"test_channels,test_channels_expanded",
[
(test_channels_1, test_channels_1_expanded),
(test_channels_2, test_channels_2_expanded),
],
)
def test_read(self, multi_instrument_session, test_channels, test_channels_expanded):
test_voltage = 1.0
test_record_length = 2000
test_num_channels = 2
test_num_records = 3
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
waveforms = multi_instrument_session.channels[test_channels].read(num_samples=test_record_length, num_records=test_num_records)
assert len(waveforms) == test_num_channels * test_num_records
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length
check_fetched_data(waveforms, test_channels_expanded, test_record_length, test_num_records)

def test_fetch(self, multi_instrument_session):
@pytest.mark.parametrize(
"test_channels,test_channels_expanded",
[
(test_channels_1, test_channels_1_expanded),
(test_channels_2, test_channels_2_expanded),
],
)
def test_fetch(self, multi_instrument_session, test_channels, test_channels_expanded):
test_voltage = 1.0
test_record_length = 2000
test_num_channels = 2
test_starting_record_number = 2
test_num_records_to_acquire = 5
test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number
Expand All @@ -98,13 +139,13 @@ def test_fetch(self, multi_instrument_session):
num_samples=test_record_length,
record_number=test_starting_record_number,
num_records=test_num_records_to_fetch)
assert len(waveforms) == test_num_channels * test_num_records_to_fetch
expected_channels = test_channels.split(',') * test_num_records_to_fetch
expected_records = [2, 2, 3, 3, 4, 4]
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length
assert waveforms[i].channel == expected_channels[i]
assert waveforms[i].record == expected_records[i]
check_fetched_data(
waveforms,
test_channels_expanded,
test_record_length,
test_num_records_to_fetch,
test_starting_record_number,
)

def test_fetch_defaults(self, multi_instrument_session):
test_voltage = 1.0
Expand All @@ -113,7 +154,7 @@ def test_fetch_defaults(self, multi_instrument_session):
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, 1, True)
with multi_instrument_session.initiate():
waveforms = multi_instrument_session.channels[test_channels].fetch()
waveforms = multi_instrument_session.channels[test_channels_1].fetch()
assert len(waveforms) == test_num_channels
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length
Expand All @@ -123,10 +164,22 @@ def measurement_wfm_length(self, request):
MeasWfmLength = collections.namedtuple('MeasurementWaveformLength', ['passed_in', 'expected'])
return MeasWfmLength(passed_in=request.param[0], expected=request.param[1])

def test_fetch_array_measurement(self, multi_instrument_session, measurement_wfm_length):
@pytest.mark.parametrize(
"test_channels,test_channels_expanded",
[
(test_channels_1, test_channels_1_expanded),
(test_channels_2, test_channels_2_expanded),
],
)
def test_fetch_array_measurement(
self,
multi_instrument_session,
measurement_wfm_length,
test_channels,
test_channels_expanded,
):
test_voltage = 1.0
test_record_length = 1000
test_num_channels = 2
test_meas_wfm_length = measurement_wfm_length.passed_in
test_array_meas_function = niscope.ArrayMeasurement.ARRAY_GAIN
test_starting_record_number = 2
Expand All @@ -146,13 +199,13 @@ def test_fetch_array_measurement(self, multi_instrument_session, measurement_wfm
meas_num_samples=2000,
timeout=hightime.timedelta(seconds=4))

assert len(waveforms) == test_num_channels * test_num_records_to_fetch
expected_channels = test_channels.split(',') * test_num_records_to_fetch
expected_records = [2, 2, 3, 3, 4, 4]
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == measurement_wfm_length.expected
assert waveforms[i].channel == expected_channels[i]
assert waveforms[i].record == expected_records[i]
check_fetched_data(
waveforms,
test_channels_expanded,
measurement_wfm_length.expected,
test_num_records_to_fetch,
test_starting_record_number,
)

def test_fetch_array_measurement_defaults(self, multi_instrument_session):
test_voltage = 1.0
Expand All @@ -165,18 +218,28 @@ def test_fetch_array_measurement_defaults(self, multi_instrument_session):
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)

with multi_instrument_session.initiate():
waveforms = multi_instrument_session.channels[test_channels].fetch_array_measurement(
waveforms = multi_instrument_session.channels[test_channels_1].fetch_array_measurement(
array_meas_function=test_array_meas_function)

assert len(waveforms) == test_num_channels * test_num_records
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length

def test_fetch_measurement_stats(self, multi_instrument_session):
@pytest.mark.parametrize(
"test_channels,test_channels_expanded",
[
(test_channels_1, test_channels_1_expanded),
(test_channels_2, test_channels_2_expanded),
],
)
def test_fetch_measurement_stats(
self,
multi_instrument_session,
test_channels,
test_channels_expanded,
):
test_voltage = 1.0
test_record_length = 1000
test_num_channels = 2
test_num_records = 3
test_starting_record_number = 2
test_num_records_to_acquire = 5
test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number
Expand All @@ -191,13 +254,13 @@ def test_fetch_measurement_stats(self, multi_instrument_session):
num_records=test_num_records_to_fetch,
timeout=hightime.timedelta(seconds=4))

assert len(measurement_stats) == test_num_channels * test_num_records
expected_channels = test_channels.split(',') * test_num_records_to_fetch
expected_records = [2, 2, 3, 3, 4, 4]
for i in range(len(measurement_stats)):
assert measurement_stats[i].result == 0.0
assert measurement_stats[i].channel == expected_channels[i]
assert measurement_stats[i].record == expected_records[i]
check_fetched_data(
measurement_stats,
test_channels_expanded,
None, # pass None for test_record_length, because we shouldn't need it
test_num_records_to_fetch,
test_starting_record_number
)

def test_fetch_measurement_stats_defaults(self, multi_instrument_session):
test_voltage = 1.0
Expand All @@ -207,7 +270,7 @@ def test_fetch_measurement_stats_defaults(self, multi_instrument_session):
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
with multi_instrument_session.initiate():
measurement_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT)
measurement_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT)

assert len(measurement_stats) == test_num_channels * test_num_records
for stat in measurement_stats:
Expand All @@ -220,10 +283,10 @@ def test_clear_waveform_measurement_stats(self, multi_instrument_session):
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
with multi_instrument_session.initiate():
uncleared_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
uncleared_stats_2 = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
multi_instrument_session.channels[test_channels].clear_waveform_measurement_stats(niscope.enums.ClearableMeasurement.FREQUENCY)
cleared_stats = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
uncleared_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
uncleared_stats_2 = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)
multi_instrument_session.channels[test_channels_1].clear_waveform_measurement_stats(niscope.enums.ClearableMeasurement.FREQUENCY)
cleared_stats = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.FREQUENCY)

# The principle here is using consistent behavior (i.e. if stats are fetched twice on a single record/channel measurement in a row, it will always be the same)
# to demonstrate that clearing the stats does in fact cause a measurable change.
Expand All @@ -244,9 +307,9 @@ def test_waveform_processing(self, multi_instrument_session):
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
with multi_instrument_session.initiate():
multi_instrument_session.add_waveform_processing(niscope.enums.ArrayMeasurement.DERIVATIVE)
processed_waveforms = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS)
processed_waveforms = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS)
multi_instrument_session.clear_waveform_processing()
unprocessed_waveforms = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS)
unprocessed_waveforms = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.MID_REF_VOLTS)

assert len(processed_waveforms) == test_num_channels * test_num_records
assert len(unprocessed_waveforms) == test_num_channels * test_num_records
Expand All @@ -264,7 +327,7 @@ def test_measurement_stats_str(self, multi_instrument_session):
multi_instrument_session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
multi_instrument_session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
with multi_instrument_session.initiate():
measurement_stat = multi_instrument_session.channels[test_channels].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT)
measurement_stat = multi_instrument_session.channels[test_channels_1].fetch_measurement_stats(niscope.enums.ScalarMeasurement.NO_MEASUREMENT)

assert isinstance(measurement_stat[0].__str__(), str)
assert isinstance(measurement_stat[0].__repr__(), str)
Expand Down Expand Up @@ -457,18 +520,29 @@ def session_creation_kwargs(self):

# not supported by grpc due to numpy usage
@pytest.mark.parametrize(
"fetch_waveform_type,type_min_value",
"fetch_waveform_type,type_min_value,test_channels,test_channels_expanded",
[
(numpy.int8, numpy.iinfo(numpy.int8).min),
(numpy.int16, numpy.iinfo(numpy.int16).min),
(numpy.int32, numpy.iinfo(numpy.int32).min),
(numpy.float64, numpy.finfo(numpy.float64).min),
(numpy.int8, numpy.iinfo(numpy.int8).min, test_channels_1, test_channels_1_expanded),
ni-jfitzger marked this conversation as resolved.
Show resolved Hide resolved
(numpy.int16, numpy.iinfo(numpy.int16).min, test_channels_1, test_channels_1_expanded),
(numpy.int32, numpy.iinfo(numpy.int32).min, test_channels_1, test_channels_1_expanded),
(numpy.float64, numpy.finfo(numpy.float64).min, test_channels_1, test_channels_1_expanded),
(numpy.int8, numpy.iinfo(numpy.int8).min, test_channels_2, test_channels_2_expanded),
(numpy.int16, numpy.iinfo(numpy.int16).min, test_channels_2, test_channels_2_expanded),
(numpy.int32, numpy.iinfo(numpy.int32).min, test_channels_2, test_channels_2_expanded),
(numpy.float64, numpy.finfo(numpy.float64).min, test_channels_2, test_channels_2_expanded),
],
)
def test_fetch_into(self, multi_instrument_session, fetch_waveform_type, type_min_value):
def test_fetch_into(
self,
multi_instrument_session,
fetch_waveform_type,
type_min_value,
test_channels,
test_channels_expanded,
):
test_voltage = 1.0
test_record_length = 2000
test_num_channels = 2
test_num_channels = len(test_channels_expanded.split(','))
test_starting_record_number = 2
test_num_records_to_acquire = 5
test_num_records_to_fetch = test_num_records_to_acquire - test_starting_record_number
Expand All @@ -486,17 +560,18 @@ def test_fetch_into(self, multi_instrument_session, fetch_waveform_type, type_mi

for index, sample in enumerate(waveform):
assert sample != init_waveform[index]
assert len(waveforms) == test_num_channels * test_num_records_to_fetch

expected_channels = test_channels.split(',') * test_num_records_to_fetch
expected_records = [2, 2, 3, 3, 4, 4]
check_fetched_data(
waveforms,
test_channels_expanded,
test_record_length,
test_num_records_to_fetch,
test_starting_record_number,
)
for i in range(len(waveforms)):
record_wfm = waveforms[i].samples
assert len(record_wfm) == test_record_length
for j in range(len(record_wfm)):
assert record_wfm[j] == waveform[i * test_record_length + j]
assert waveforms[i].channel == expected_channels[i]
assert waveforms[i].record == expected_records[i]

def test_configure_ref_levels(self, single_instrument_session):
single_instrument_session._configure_ref_levels()
Expand Down