From 85781ac1e4b8ea3b9cc378b6b73905bd71cdff29 Mon Sep 17 00:00:00 2001 From: Adrian Stanea Date: Wed, 28 Aug 2024 12:28:12 +0300 Subject: [PATCH] tests: validate the aout triggering Signed-off-by: Adrian Stanea --- tests/analog_functions.py | 171 +++++++++++++++++++++++++++++++++++++- tests/helpers.py | 34 ++++++++ tests/m2k_analog_test.py | 22 ++++- tests/main.py | 3 +- 4 files changed, 222 insertions(+), 8 deletions(-) diff --git a/tests/analog_functions.py b/tests/analog_functions.py index 6a3e6de6..d4447d84 100644 --- a/tests/analog_functions.py +++ b/tests/analog_functions.py @@ -13,7 +13,7 @@ import random import sys import reset_def_values as reset -from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file +from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file, plot_to_file_multiline from open_context import ctx_timeout, ctx from create_files import results_file, results_dir, csv @@ -1068,6 +1068,8 @@ def write_file(file, test_name, channel, data_string): file.write("\n\nAmplitude test on channel " + str(channel) + ": \n") elif test_name == "buffer_transition_glitch": file.write("\n\nTest buffer transition glitch on channel " + str(channel) + ": \n") + elif test_name == "aout_triggering": + file.write("\n\nTest aout start with trigger event on channel = " + str(channel) + ": \n") for i in range(len(data_string)): file.write(str(data_string[i]) + '\n') @@ -1317,7 +1319,7 @@ def get_experiment_config_for_sample_hold(dac_sr): raise ValueError("Invalid DAC sample rate.") return cfg -def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn): +def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn=None): assert lower_bound < upper_bound, "Invalid bounds" is_CH0_in_range = np.all((lower_bound <= data[0]) & (data[0] <= upper_bound)) is_CH1_in_range = np.all((lower_bound <= data[1]) & (data[1] <= upper_bound)) @@ -1329,7 +1331,6 @@ def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn): return is_CH1_in_range else: raise ValueError(f"Unknown channel: {chn}") - def test_last_sample_hold( ain: libm2k.M2kAnalogIn, aout: libm2k.M2kAnalogOut, @@ -1511,4 +1512,166 @@ def check_for_glitch(data, threshold=0.3): filename=f"last_sample_hold_{chn_str}_{sr_str}_step4.png") aout.stop() - return glitched, is_last_sample_hold_ok, is_idle_ok \ No newline at end of file + return glitched, is_last_sample_hold_ok, is_idle_ok + + +def test_aout_triggering( + ain: libm2k.M2kAnalogIn, + aout: libm2k.M2kAnalogOut, + dig: libm2k.M2kDigital, + trig: libm2k.M2kHardwareTrigger, + ctx: libm2k.M2k, + auto_rearm : bool, isCyclic : bool, status +): + def configure_trigger(trig: libm2k.M2kHardwareTrigger, + dig: libm2k.M2kDigital, + trig_pin, status, delay): + trig.setAnalogDelay(-delay) + trig.setDigitalDelay(-delay) + trig.setDigitalSource(libm2k.SRC_NONE) # DigitalIn conditioned by internal trigger structure + trig.setDigitalCondition(trig_pin, libm2k.RISING_EDGE_DIGITAL) + trig.setAnalogOutTriggerSource(libm2k.TRIGGER_LA) # aout conditioned by the LA trigger + trig.setAnalogOutTriggerStatus(status) + file_name, dir_name, csv_path = get_result_files(gen_reports) + test_name = "aout_triggering" + data_string = [] + + TRIG_PIN = libm2k.DIO_CHANNEL_0 + DELAY = 8_000 + BUFFER_SIZE = 16_000 + OVERSAMPLING = 1 + KB_COUNT = 40 + N_SAMPLES = 1024 + AMPLITUDE = 5 + OFFSET = 0 + TIMEOUT = 10_000 + + ADC_SR = 100_000_000 + DAC_SR = 75_000_000 + SR_IN_DIG = 100_000_000 + SR_OUT_DIG = 100_000_000 + + ctx.reset() + ctx.calibrateADC() + ctx.calibrateDAC() + ctx.setTimeout(TIMEOUT) + + ain.setSampleRate(ADC_SR) + ain.setOversamplingRatio(OVERSAMPLING) + ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True) + ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True) + ain.setRange(libm2k.ANALOG_IN_CHANNEL_1, -10, 10) + ain.setRange(libm2k.ANALOG_IN_CHANNEL_2, -10, 10) + assert ain.getSampleRate() == ADC_SR, "Failed to set the sample rate for AnalogIn" + + aout.setSampleRate(0, DAC_SR) + aout.setSampleRate(1, DAC_SR) + aout.enableChannel(0, True) + aout.enableChannel(1, True) + aout.setOversamplingRatio(0, 1) + aout.setOversamplingRatio(1, 1) + aout.setKernelBuffersCount(0, KB_COUNT) + aout.setKernelBuffersCount(1, KB_COUNT) + assert aout.getSampleRate(1) == DAC_SR, "Failed to set the sample rate for AnalogOut1" + + dig.setDirection(TRIG_PIN, libm2k.DIO_OUTPUT) + dig.setOutputMode(TRIG_PIN, libm2k.DIO_PUSHPULL) + dig.enableChannel(TRIG_PIN, True) + dig.setCyclic(False) + dig.setValueRaw(TRIG_PIN, libm2k.LOW) + dig.setSampleRateIn(SR_IN_DIG) + dig.setSampleRateOut(SR_OUT_DIG) + assert dig.getSampleRateIn() == SR_IN_DIG , "Failed to set the sample rate for DigitalIn" + assert dig.getSampleRateOut() == SR_OUT_DIG , "Failed to set the sample rate for DigitalOut" + + # LA trigger will determine an action for the aout based on the provided status + configure_trigger(trig, dig, TRIG_PIN, status, DELAY) + aout.setCyclic(isCyclic) + aout.setBufferRearmOnTrigger(auto_rearm) + + # Configure Aout Signal + buf = shape_gen(n=N_SAMPLES, amplitude=AMPLITUDE, offset=OFFSET)[Shape.FALLING_RAMP.value] + aout.push([buf, buf]) + + ctx.startMixedSignalAcquisition(BUFFER_SIZE) + + dig.setValueRaw(TRIG_PIN, libm2k.HIGH) # Trigger event -> should start the AOUT + analog_data = np.array(ain.getSamples(BUFFER_SIZE)) + digital_data = np.array(dig.getSamples(BUFFER_SIZE)) + mask = 0x0001 << TRIG_PIN + digital_data_chn = (digital_data & mask) >> TRIG_PIN + + ctx.stopMixedSignalAcquisition() + + # Validate test + peaks_CH0, _ = find_peaks(analog_data[0], prominence=1, height=1, distance = 100) + peaks_CH1, _ = find_peaks(analog_data[1], prominence=1, height=1, distance = 100) + + CH0_left = analog_data[0][:DELAY] + CH0_right = analog_data[0][DELAY:] + peaks_CH0_left, _ = find_peaks(CH0_left, prominence=1, height=1, distance = 100) + peaks_CH0_right, _ = find_peaks(CH0_right, prominence=1, height=1, distance = 100) + CH1_left = analog_data[1][:DELAY] + CH1_right = analog_data[1][DELAY:] + peaks_CH1_left, _ = find_peaks(CH1_left, prominence=1, height=1, distance = 100) + peaks_CH1_right, _ = find_peaks(CH1_right, prominence=1, height=1, distance = 100) + + status_str = "START" if status == libm2k.START else "STOP" + isCyclic_str = "Cyclic" if isCyclic else "Non-Cyclic" + rearm_str = "Ream" if auto_rearm else "No-Rearm" + data_string.append(f"Configuration: status={status_str} \t isCyclic={isCyclic_str} \t auto_rearm={rearm_str}") + data_string.append(f"\tPeaks before trigger: CH0={len(peaks_CH0_left)} CH1={len(peaks_CH1_left)}") + data_string.append(f"\tPeaks after trigger: CH0={len(peaks_CH0_right)} CH1={len(peaks_CH1_right)}") + + result = True + # NOTE: auto_rearm only has effect on START status + # Case 1, 2, 4 + if ((status == libm2k.START) and (not isCyclic) and (not auto_rearm)) or \ + ((status == libm2k.START) and (not isCyclic) and (auto_rearm)) or \ + ((status == libm2k.START) and (isCyclic) and (auto_rearm)): + # Should IDLE before trigger at 0V because the channel was reset + result = are_values_within_range(analog_data[:, :DELAY ], -0.1, 0.1) + # result = result and (len(peaks_CH0_left) == 0) and (len(peaks_CH1_left) == 0) + # Should output exactly 1 period after trigger + result = result and (len(peaks_CH0_right) == 1) and (len(peaks_CH1_right) == 1) + # Case 3 + if (status == libm2k.START) and (isCyclic) and (not auto_rearm): + # Should IDLE before trigger at 0V because the channel was reset + result = are_values_within_range(analog_data[:, :DELAY ], -0.1, 0.1) + # Should output multiple period after trigger + result = result and (len(peaks_CH0_right) > 1) and (len(peaks_CH1_right) > 1) + # Case 5 and 6 + if ((status == libm2k.STOP) and (not isCyclic) and (not auto_rearm)) or \ + ((status == libm2k.STOP) and (not isCyclic) and (auto_rearm)): + # The channels are in the last sample hold state and STOP is not available for non-cyclic buffers due to HDL limitations + # We expect both channels to hold last sample for the entire duration + result = result and are_values_within_range(analog_data, -AMPLITUDE * 1.1, -AMPLITUDE * 0.9) + result = result and (len(peaks_CH0_left) == 0) and (len(peaks_CH1_left) == 0) + result = result and (len(peaks_CH0_right) == 0) and (len(peaks_CH1_right) == 0) + # Case 7 and 8 + if ((status == libm2k.STOP) and (isCyclic) and (not auto_rearm)) or \ + ((status == libm2k.STOP) and (isCyclic) and (auto_rearm)): + # Should be generating cyclic signal before trigger + result = result and (len(peaks_CH0_left) > 1) and (len(peaks_CH1_left) > 1) + # Should stop generating signal after trigger + # TODO: might need aditional delay since the channel takes some time untill it stops from when the trigger event occurs + result = result and are_values_within_range(analog_data[:, -DELAY + 500:], -0.1, 0.1) + + if gen_reports: + write_file(file_name, test_name, "Both Channels", data_string) + filename_str = f"aout_triggering_{status_str}_{isCyclic_str}_{rearm_str}.png" + plot_to_file_multiline( + title="AOUT Triggering", + datasets=[ + (None, digital_data_chn, {"label":"Digital"}), + (None, analog_data[0], {"label" : "Analog CH0"}), + (peaks_CH0, analog_data[0], {"label" : "Peaks CH0", "marker" : "x"}), + (None, analog_data[1],{"label" : "Analog CH1"}), + (peaks_CH1, analog_data[1], {"label" : "Peaks CH1", "marker" : "x"}), + ], + dir_name=dir_name, + filename=filename_str, + ylim=(-6, 6), + ) + aout.stop() + return result \ No newline at end of file diff --git a/tests/helpers.py b/tests/helpers.py index 700a8ec1..930f8652 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -73,6 +73,40 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, x_lim = None, yla plt.close() return +def plot_to_file_multiline( + title, + datasets, + dir_name, + filename, + xlabel="Samples", ylabel="Voltage [V]", + xlim = None, ylim = None, +): + plt.title(title) + plt.xlabel(xlabel) + plt.ylabel(ylabel) + plt.grid(visible=True) + + for data in datasets: + xdata, ydata, fmt = data + if xdata is not None: + if "marker" in fmt: + # Mark scattered points + plt.plot(xdata, ydata[xdata], linestyle="None", **fmt) + else: + plt.plot(xdata, ydata, **fmt) + else: + plt.plot(ydata, **fmt) + + if xlim is not None: + plt.xlim(*xlim) + if ylim is not None: + plt.ylim(*ylim) + plt.legend() + + plt.savefig(f"{dir_name}/{filename}") + plt.close() + return + def get_time_format(samples, sample_rate): x_time = np.linspace(0, samples/sample_rate, samples) diff --git a/tests/m2k_analog_test.py b/tests/m2k_analog_test.py index 04b7a29f..5abfc5fa 100644 --- a/tests/m2k_analog_test.py +++ b/tests/m2k_analog_test.py @@ -1,16 +1,17 @@ +import itertools import sys import unittest import libm2k from shapefile import shape_gen, ref_shape_gen, shape_name -from analog_functions import get_experiment_config_for_sample_hold, test_amplitude, test_last_sample_hold, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \ +from analog_functions import get_experiment_config_for_sample_hold, test_amplitude, test_aout_triggering, test_last_sample_hold, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \ test_voltmeter_functionality, test_kernel_buffers, test_buffer_transition_glitch from analog_functions import noncyclic_buffer_test, set_samplerates_for_shapetest, set_trig_for_cyclicbuffer_test, \ test_calibration from analog_functions import compare_in_out_frequency, test_oversampling_ratio, channels_diff_in_samples, test_timeout, \ cyclic_buffer_test import reset_def_values as reset -from open_context import ctx, ain, aout, trig, create_dir +from open_context import ctx, ain, aout, dig, trig, create_dir from create_files import results_dir, csv, results_file import logger from repeat_test import repeat @@ -277,4 +278,19 @@ def test_last_sample_hold(self): self.assertEqual(has_glitch, False, f'Found glitches on {chn_str} with DAC SR {sr_format}') self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}') self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}') - self.assertEqual(is_idle_ok, True, 'Test idle condition failed') \ No newline at end of file + self.assertEqual(is_idle_ok, True, 'Test idle condition failed') + + @unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33', 'DAC triggering is available starting with firmware v0.33') + def test_aout_triggering(self): + # Test the triggering functionality of the M2kAnalogOut. + # The test looks for patterns before and after the trigger event for 8 different combinations. + autorearm = [False, True] + isCyclic = [False, True] + status = [libm2k.START, libm2k.STOP] + combinations = list(itertools.product(autorearm, isCyclic, status)) + for combination in combinations: + autorearm, isCyclic, status = combination + test_result = test_aout_triggering(ain, aout, dig, trig, ctx, autorearm, isCyclic, status) + status_str = "START" if status == libm2k.START else "STOP" + with self.subTest(msg=f'Test aout start with trigger for: status={status_str}, isCyclic={isCyclic}, autorearm={autorearm} '): + self.assertEqual(test_result, True, msg=f'Specification not met') diff --git a/tests/main.py b/tests/main.py index 2c441078..5b27e50b 100644 --- a/tests/main.py +++ b/tests/main.py @@ -75,7 +75,8 @@ def wait_(): "test_shapes_ch1\n" "test_voltmeter\n" "test_buffer_transition_glitch\n" - "test_last_sample_hold\n") + "test_last_sample_hold\n" + "test_aout_triggering\n") print("\n ===== class B_TriggerTests ===== \n") print(" ===== tests ====== \n") print("test_1_trigger_object\n"