diff --git a/src/qibolab/instruments/zhinst.py b/src/qibolab/instruments/zhinst.py deleted file mode 100644 index ef7e5e22e..000000000 --- a/src/qibolab/instruments/zhinst.py +++ /dev/null @@ -1,1434 +0,0 @@ -"""Instrument for using the Zurich Instruments (Zhinst) devices.""" - -import copy -import os -from collections import defaultdict -from dataclasses import dataclass, replace -from typing import Dict, List, Tuple, Union - -import laboneq._token -import laboneq.simple as lo -import numpy as np -from laboneq.contrib.example_helpers.plotting.plot_helpers import plot_simulation -from laboneq.dsl.experiment.pulse_library import ( - sampled_pulse_complex, - sampled_pulse_real, -) -from qibo.config import log - -from qibolab import AcquisitionType, AveragingMode, ExecutionParameters -from qibolab.couplers import Coupler -from qibolab.pulses import CouplerFluxPulse, FluxPulse, PulseSequence, PulseType -from qibolab.qubits import Qubit -from qibolab.sweeper import Parameter, Sweeper -from qibolab.unrolling import Bounds - -from .abstract import Controller -from .port import Port - -# this env var just needs to be set -os.environ["LABONEQ_TOKEN"] = "not required" -laboneq._token.is_valid_token = lambda _token: True # pylint: disable=W0212 - -SAMPLING_RATE = 2 -NANO_TO_SECONDS = 1e-9 -COMPILER_SETTINGS = { - "SHFSG_MIN_PLAYWAVE_HINT": 32, - "SHFSG_MIN_PLAYZERO_HINT": 32, - "HDAWG_MIN_PLAYWAVE_HINT": 64, - "HDAWG_MIN_PLAYZERO_HINT": 64, -} -"""Translating to Zurich ExecutionParameters.""" -ACQUISITION_TYPE = { - AcquisitionType.INTEGRATION: lo.AcquisitionType.INTEGRATION, - AcquisitionType.RAW: lo.AcquisitionType.RAW, - AcquisitionType.DISCRIMINATION: lo.AcquisitionType.DISCRIMINATION, -} - -AVERAGING_MODE = { - AveragingMode.CYCLIC: lo.AveragingMode.CYCLIC, - AveragingMode.SINGLESHOT: lo.AveragingMode.SINGLE_SHOT, -} - -SWEEPER_SET = {"amplitude", "frequency", "duration", "relative_phase"} -SWEEPER_BIAS = {"bias"} -SWEEPER_START = {"start"} - - -def select_pulse(pulse, pulse_type): - """Pulse translation.""" - - if "IIR" not in str(pulse.shape): - if str(pulse.shape) == "Rectangular()": - can_compress = pulse.type is not PulseType.READOUT - return lo.pulse_library.const( - uid=(f"{pulse_type}_{pulse.qubit}_"), - length=round(pulse.duration * NANO_TO_SECONDS, 9), - amplitude=pulse.amplitude, - can_compress=can_compress, - ) - if "Gaussian" in str(pulse.shape): - sigma = pulse.shape.rel_sigma - return lo.pulse_library.gaussian( - uid=(f"{pulse_type}_{pulse.qubit}_"), - length=round(pulse.duration * NANO_TO_SECONDS, 9), - amplitude=pulse.amplitude, - sigma=2 / sigma, - zero_boundaries=False, - ) - - if "GaussianSquare" in str(pulse.shape): - sigma = pulse.shape.rel_sigma - width = pulse.shape.width - can_compress = pulse.type is not PulseType.READOUT - return lo.pulse_library.gaussian_square( - uid=(f"{pulse_type}_{pulse.qubit}_"), - length=round(pulse.duration * NANO_TO_SECONDS, 9), - width=round(pulse.duration * NANO_TO_SECONDS, 9) * width, - amplitude=pulse.amplitude, - can_compress=can_compress, - sigma=2 / sigma, - zero_boundaries=False, - ) - - if "Drag" in str(pulse.shape): - sigma = pulse.shape.rel_sigma - beta = pulse.shape.beta - return lo.pulse_library.drag( - uid=(f"{pulse_type}_{pulse.qubit}_"), - length=round(pulse.duration * NANO_TO_SECONDS, 9), - amplitude=pulse.amplitude, - sigma=2 / sigma, - beta=beta, - zero_boundaries=False, - ) - - if np.all(pulse.envelope_waveform_q(SAMPLING_RATE).data == 0): - return sampled_pulse_real( - uid=(f"{pulse_type}_{pulse.qubit}_"), - samples=pulse.envelope_waveform_i(SAMPLING_RATE).data, - can_compress=True, - ) - else: - # Test this when we have pulses that use it - return sampled_pulse_complex( - uid=(f"{pulse_type}_{pulse.qubit}_"), - samples=pulse.envelope_waveform_i(SAMPLING_RATE).data - + (1j * pulse.envelope_waveform_q(SAMPLING_RATE).data), - can_compress=True, - ) - - # Implement Slepian shaped flux pulse https://arxiv.org/pdf/0909.5368.pdf - - # """ - # Typically, the sampler function should discard ``length`` and ``amplitude``, and - # instead assume that the pulse extends from -1 to 1, and that it has unit - # amplitude. LabOne Q will automatically rescale the sampler's output to the correct - # amplitude and length. - - # They don't even do that on their notebooks - # and just use lenght and amplitude but we have to check - - -@dataclass -class ZhPort(Port): - name: Tuple[str, str] - offset: float = 0.0 - power_range: int = 0 - - -class ZhPulse: - """Zurich pulse from qibolab pulse translation.""" - - def __init__(self, pulse): - """Zurich pulse from qibolab pulse.""" - self.pulse = pulse - """Qibolab pulse.""" - self.signal = f"{pulse.type.name.lower()}{pulse.qubit}" - """Line associated with the pulse.""" - self.zhpulse = select_pulse(pulse, pulse.type.name.lower()) - """Zurich pulse.""" - - -class ZhSweeper: - """Zurich sweeper from qibolab sweeper for pulse parameters Amplitude, - Duration, Frequency (and maybe Phase)""" - - def __init__(self, pulse, sweeper, qubit): - self.sweeper = sweeper - """Qibolab sweeper.""" - - self.pulse = pulse - """Qibolab pulse associated to the sweeper.""" - self.signal = f"{pulse.type.name.lower()}{pulse.qubit}" - """Line associated with the pulse.""" - self.zhpulse = ZhPulse(pulse).zhpulse - """Zurich pulse associated to the sweeper.""" - - self.zhsweeper = self.select_sweeper(pulse.type, sweeper, qubit) - """Zurich sweeper.""" - - self.zhsweepers = [self.select_sweeper(pulse.type, sweeper, qubit)] - """Zurich sweepers, Need something better to store multiple sweeps on - the same pulse. - - Not properly implemented as it was only used on Rabi amplitude - vs lenght and it was an unused routine. - """ - - @staticmethod # pylint: disable=R0903 - def select_sweeper(ptype, sweeper, qubit): - """Sweeper translation.""" - - if sweeper.parameter is Parameter.amplitude: - return lo.SweepParameter( - uid=sweeper.parameter.name, - values=copy.copy(sweeper.values), - ) - if sweeper.parameter is Parameter.duration: - return lo.SweepParameter( - uid=sweeper.parameter.name, - values=sweeper.values * NANO_TO_SECONDS, - ) - if sweeper.parameter is Parameter.relative_phase: - return lo.SweepParameter( - uid=sweeper.parameter.name, - values=sweeper.values, - ) - if sweeper.parameter is Parameter.frequency: - if ptype is PulseType.READOUT: - intermediate_frequency = ( - qubit.readout_frequency - qubit.readout.local_oscillator.frequency - ) - elif ptype is PulseType.DRIVE: - intermediate_frequency = ( - qubit.drive_frequency - qubit.drive.local_oscillator.frequency - ) - return lo.LinearSweepParameter( - uid=sweeper.parameter.name, - start=sweeper.values[0] + intermediate_frequency, - stop=sweeper.values[-1] + intermediate_frequency, - count=len(sweeper.values), - ) - - def add_sweeper(self, sweeper, qubit): - """Add sweeper to list of sweepers.""" - self.zhsweepers.append(self.select_sweeper(self.pulse.type, sweeper, qubit)) - - -class ZhSweeperLine: - """Zurich sweeper from qibolab sweeper for non pulse parameters Bias, Delay - (, power_range, local_oscillator frequency, offset ???) - - For now Parameter.bias sweepers are implemented as - Parameter.Amplitude on a flux pulse. We may want to keep this class - separate for future Near Time sweeps - """ - - def __init__(self, sweeper, qubit=None, sequence=None, pulse=None): - self.sweeper = sweeper - """Qibolab sweeper.""" - - # Do something with the pulse coming here - if sweeper.parameter is Parameter.bias: - if isinstance(qubit, Qubit): - pulse = FluxPulse( - start=0, - duration=sequence.duration + sequence.start, - amplitude=1, - shape="Rectangular", - channel=qubit.flux.name, - qubit=qubit.name, - ) - self.signal = f"flux{qubit.name}" - if isinstance(qubit, Coupler): - pulse = CouplerFluxPulse( - start=0, - duration=sequence.duration + sequence.start, - amplitude=1, - shape="Rectangular", - channel=qubit.flux.name, - qubit=qubit.name, - ) - self.signal = f"couplerflux{qubit.name}" - - self.pulse = pulse - - self.zhpulse = lo.pulse_library.const( - uid=(f"{pulse.type.name.lower()}_{pulse.qubit}_"), - length=round(pulse.duration * NANO_TO_SECONDS, 9), - amplitude=pulse.amplitude, - ) - - elif sweeper.parameter is Parameter.start: - if pulse: - self.pulse = pulse - self.signal = f"flux{qubit}" - - self.zhpulse = ZhPulse(pulse).zhpulse - - # Need something better to store multiple sweeps on the same pulse - self.zhsweeper = self.select_sweeper(sweeper) - - @staticmethod # pylint: disable=R0903 - def select_sweeper(sweeper): - """Sweeper translation.""" - if sweeper.parameter is Parameter.bias: - return lo.SweepParameter( - uid=sweeper.parameter.name, - values=sweeper.values, - ) - if sweeper.parameter is Parameter.start: - return lo.SweepParameter( - uid=sweeper.parameter.name, - values=sweeper.values * NANO_TO_SECONDS, - ) - - -class Zurich(Controller): - """Zurich driver main class.""" - - PortType = ZhPort - - def __init__( - self, name, device_setup, use_emulation=False, time_of_flight=0.0, smearing=0.0 - ): - self.name = name - "Setup name (str)" - - self.emulation = use_emulation - "Enable emulation mode (bool)" - self.is_connected = False - "Is the device connected ? (bool)" - - self.signal_map = {} - "Signals to lines mapping" - self.calibration = lo.Calibration() - "Zurich calibration object)" - - self.device_setup = device_setup - self.session = None - self.device = None - "Zurich device parameters for connection" - - self.time_of_flight = time_of_flight - self.smearing = smearing - self.chip = "iqm5q" - "Parameters read from the runcard not part of ExecutionParameters" - - self.exp = None - self.experiment = None - self.exp_options = ExecutionParameters() - self.exp_calib = lo.Calibration() - self.results = None - "Zurich experiment definitions" - - self.bounds = Bounds( - waveforms=int(4e4), - readout=250, - instructions=int(1e6), - ) - - self.acquisition_type = None - "To store if the AcquisitionType.SPECTROSCOPY needs to be enabled by parsing the sequence" - - self.sequence = defaultdict(list) - "Zurich pulse sequence" - self.sequence_qibo = None - # Remove if able - self.sub_sequences = {} - "Sub sequences between each measurement" - - self.sweepers = [] - self.nt_sweeps = None - "Storing sweepers" - # Improve the storing of multiple sweeps - self._ports = {} - self.settings = None - - @property - def sampling_rate(self): - return SAMPLING_RATE - - def connect(self): - if self.is_connected is False: - # To fully remove logging #configure_logging=False - # I strongly advise to set it to 20 to have time estimates of the experiment duration! - self.session = lo.Session(self.device_setup, log_level=20) - self.device = self.session.connect(do_emulation=self.emulation) - self.is_connected = True - - def disconnect(self): - if self.is_connected: - self.device = self.session.disconnect() - self.is_connected = False - - def calibration_step(self, qubits, couplers, options): - """Zurich general pre experiment calibration definitions. - - Change to get frequencies from sequence - """ - - for coupler in couplers.values(): - self.register_couplerflux_line(coupler) - - for qubit in qubits.values(): - if qubit.flux is not None: - self.register_flux_line(qubit) - if len(self.sequence[f"drive{qubit.name}"]) != 0: - self.register_drive_line( - qubit=qubit, - intermediate_frequency=qubit.drive_frequency - - qubit.drive.local_oscillator.frequency, - ) - if len(self.sequence[f"readout{qubit.name}"]) != 0: - self.register_readout_line( - qubit=qubit, - intermediate_frequency=qubit.readout_frequency - - qubit.readout.local_oscillator.frequency, - options=options, - ) - if options.fast_reset is not False: - if len(self.sequence[f"drive{qubit.name}"]) == 0: - self.register_drive_line( - qubit=qubit, - intermediate_frequency=qubit.drive_frequency - - qubit.drive.local_oscillator.frequency, - ) - self.device_setup.set_calibration(self.calibration) - - def register_readout_line(self, qubit, intermediate_frequency, options): - """Registers qubit measure and acquire lines to calibration and signal - map. - - Note - ---- - To allow debugging with and oscilloscope, just set the following:: - - self.calibration[f"/logical_signal_groups/q{q}/measure_line"] = lo.SignalCalibration( - ..., - local_oscillator=lo.Oscillator( - ... - frequency=0.0, - ), - ..., - port_mode=lo.PortMode.LF, - ..., - ) - """ - - q = qubit.name # pylint: disable=C0103 - self.signal_map[f"measure{q}"] = self.device_setup.logical_signal_groups[ - f"q{q}" - ].logical_signals["measure_line"] - self.calibration[f"/logical_signal_groups/q{q}/measure_line"] = ( - lo.SignalCalibration( - oscillator=lo.Oscillator( - frequency=intermediate_frequency, - modulation_type=lo.ModulationType.SOFTWARE, - ), - local_oscillator=lo.Oscillator( - uid="lo_shfqa_m" + str(q), - frequency=int(qubit.readout.local_oscillator.frequency), - ), - range=qubit.readout.power_range, - port_delay=None, - delay_signal=0, - ) - ) - - self.signal_map[f"acquire{q}"] = self.device_setup.logical_signal_groups[ - f"q{q}" - ].logical_signals["acquire_line"] - - oscillator = lo.Oscillator( - frequency=intermediate_frequency, - modulation_type=lo.ModulationType.SOFTWARE, - ) - threshold = None - - if options.acquisition_type == AcquisitionType.DISCRIMINATION: - if qubit.kernel is not None: - # Kernels don't work with the software modulation on the acquire signal - oscillator = None - else: - # To keep compatibility with angle and threshold discrimination (Remove when possible) - threshold = qubit.threshold - - self.calibration[f"/logical_signal_groups/q{q}/acquire_line"] = ( - lo.SignalCalibration( - oscillator=oscillator, - range=qubit.feedback.power_range, - port_delay=self.time_of_flight * NANO_TO_SECONDS, - threshold=threshold, - ) - ) - - def register_drive_line(self, qubit, intermediate_frequency): - """Registers qubit drive line to calibration and signal map.""" - q = qubit.name # pylint: disable=C0103 - self.signal_map[f"drive{q}"] = self.device_setup.logical_signal_groups[ - f"q{q}" - ].logical_signals["drive_line"] - self.calibration[f"/logical_signal_groups/q{q}/drive_line"] = ( - lo.SignalCalibration( - oscillator=lo.Oscillator( - frequency=intermediate_frequency, - modulation_type=lo.ModulationType.HARDWARE, - ), - local_oscillator=lo.Oscillator( - uid="lo_shfqc" + str(q), - frequency=int(qubit.drive.local_oscillator.frequency), - ), - range=qubit.drive.power_range, - port_delay=None, - delay_signal=0, - ) - ) - - def register_flux_line(self, qubit): - """Registers qubit flux line to calibration and signal map.""" - q = qubit.name # pylint: disable=C0103 - self.signal_map[f"flux{q}"] = self.device_setup.logical_signal_groups[ - f"q{q}" - ].logical_signals["flux_line"] - self.calibration[f"/logical_signal_groups/q{q}/flux_line"] = ( - lo.SignalCalibration( - range=qubit.flux.power_range, - port_delay=None, - delay_signal=0, - voltage_offset=qubit.flux.offset, - ) - ) - - def register_couplerflux_line(self, coupler): - """Registers qubit flux line to calibration and signal map.""" - c = coupler.name # pylint: disable=C0103 - self.signal_map[f"couplerflux{c}"] = self.device_setup.logical_signal_groups[ - f"qc{c}" - ].logical_signals["flux_line"] - self.calibration[f"/logical_signal_groups/qc{c}/flux_line"] = ( - lo.SignalCalibration( - range=coupler.flux.power_range, - port_delay=None, - delay_signal=0, - voltage_offset=coupler.flux.offset, - ) - ) - - def run_exp(self): - """ - Compilation settings, compilation step, execution step and data retrival - - Save a experiment Python object: - self.experiment.save("saved_exp") - - Save a experiment compiled experiment (): - self.exp.save("saved_exp") # saving compiled experiment - """ - self.exp = self.session.compile( - self.experiment, compiler_settings=COMPILER_SETTINGS - ) - # self.exp.save_compiled_experiment("saved_exp") - self.results = self.session.run(self.exp) - - @staticmethod - def frequency_from_pulses(qubits, sequence): - """Gets the frequencies from the pulses to the qubits.""" - # Implement Dual drive frequency experiments, we don't have any for now - for pulse in sequence: - qubit = qubits[pulse.qubit] - if pulse.type is PulseType.READOUT: - qubit.readout_frequency = pulse.frequency - if pulse.type is PulseType.DRIVE: - qubit.drive_frequency = pulse.frequency - - def create_sub_sequence( - self, - line_name: str, - quantum_elements: Union[Dict[str, Qubit], Dict[str, Coupler]], - ): - """Create a list of sequences for each measurement. - - Args: - line_name (str): Name of the line from which extract the sequence. - quantum_elements (dict[str, Qubit]|dict[str, Coupler]): qubits or couplers for the platform. - """ - for quantum_element in quantum_elements.values(): - q = quantum_element.name # pylint: disable=C0103 - measurements = self.sequence[f"readout{q}"] - pulses = self.sequence[f"{line_name}{q}"] - pulse_sequences = [[] for _ in measurements] - pulse_sequences.append([]) - measurement_index = 0 - for pulse in pulses: - if measurement_index < len(measurements): - if pulse.pulse.finish > measurements[measurement_index].pulse.start: - measurement_index += 1 - pulse_sequences[measurement_index].append(pulse) - self.sub_sequences[f"{line_name}{q}"] = pulse_sequences - - def create_sub_sequences( - self, qubits: Dict[str, Qubit], couplers: Dict[str, Coupler] - ): - """Create subsequences for different lines (drive, flux, coupler flux). - - Args: - qubits (dict[str, Qubit]): qubits for the platform. - couplers (dict[str, Coupler]): couplers for the platform. - """ - self.sub_sequences = {} - self.create_sub_sequence("drive", qubits) - self.create_sub_sequence("flux", qubits) - self.create_sub_sequence("couplerflux", couplers) - - def experiment_flow( - self, - qubits: Dict[str, Qubit], - couplers: Dict[str, Coupler], - sequence: PulseSequence, - options: ExecutionParameters, - ): - """Create the experiment object for the devices, following the steps - separated one on each method: - - Translation, Calibration, Experiment Definition. - - Args: - qubits (dict[str, Qubit]): qubits for the platform. - couplers (dict[str, Coupler]): couplers for the platform. - sequence (PulseSequence): sequence of pulses to be played in the experiment. - """ - self.sequence_zh(sequence, qubits, couplers) - self.create_sub_sequences(qubits, couplers) - self.calibration_step(qubits, couplers, options) - self.create_exp(qubits, couplers, options) - - # pylint: disable=W0221 - def play(self, qubits, couplers, sequence, options): - """Play pulse sequence.""" - self.signal_map = {} - - self.frequency_from_pulses(qubits, sequence) - - self.experiment_flow(qubits, couplers, sequence, options) - - self.run_exp() - - # Get the results back - results = {} - for qubit in qubits.values(): - q = qubit.name # pylint: disable=C0103 - if len(self.sequence[f"readout{q}"]) != 0: - for i, ropulse in enumerate(self.sequence[f"readout{q}"]): - data = np.array(self.results.get_data(f"sequence{q}_{i}")) - if options.acquisition_type is AcquisitionType.DISCRIMINATION: - data = ( - np.ones(data.shape) - data.real - ) # Probability inversion patch - serial = ropulse.pulse.serial - qubit = ropulse.pulse.qubit - results[serial] = results[qubit] = options.results_type(data) - - # html containing the pulse sequence schedule - # lo.show_pulse_sheet("pulses", self.exp) - return results - - def sequence_zh(self, sequence, qubits, couplers): - """Qibo sequence to Zurich sequence.""" - # Define and assign the sequence - zhsequence = defaultdict(list) - self.sequence_qibo = sequence - - # Fill the sequences with pulses according to their lines in temporal order - for pulse in sequence: - zhsequence[f"{pulse.type.name.lower()}{pulse.qubit}"].append(ZhPulse(pulse)) - - # Mess that gets the sweeper and substitutes the pulse it sweeps in the right place - - def nt_loop(sweeper): - if not self.nt_sweeps: - self.nt_sweeps = [sweeper] - else: - self.nt_sweeps.append(sweeper) - self.sweepers.remove(sweeper) - - for sweeper in self.sweepers.copy(): - if sweeper.parameter.name in SWEEPER_SET: - for pulse in sweeper.pulses: - aux_list = zhsequence[f"{pulse.type.name.lower()}{pulse.qubit}"] - if ( - sweeper.parameter is Parameter.frequency - and pulse.type is PulseType.READOUT - ): - self.acquisition_type = lo.AcquisitionType.SPECTROSCOPY - if ( - sweeper.parameter is Parameter.amplitude - and pulse.type is PulseType.READOUT - ): - self.acquisition_type = lo.AcquisitionType.SPECTROSCOPY - nt_loop(sweeper) - for element in aux_list: - if pulse == element.pulse: - if isinstance(aux_list[aux_list.index(element)], ZhPulse): - if isinstance(pulse, CouplerFluxPulse): - aux_list[aux_list.index(element)] = ZhSweeper( - pulse, sweeper, couplers[pulse.qubit] - ) - else: - aux_list[aux_list.index(element)] = ZhSweeper( - pulse, sweeper, qubits[pulse.qubit] - ) - elif isinstance( - aux_list[aux_list.index(element)], ZhSweeper - ): - if isinstance(pulse, CouplerFluxPulse): - aux_list[aux_list.index(element)].add_sweeper( - sweeper, couplers[pulse.qubit] - ) - else: - aux_list[aux_list.index(element)].add_sweeper( - sweeper, qubits[pulse.qubit] - ) - - if sweeper.parameter.name in SWEEPER_BIAS: - nt_loop(sweeper) - - # This may not place the Zhsweeper when the start occurs among different sections or lines - if sweeper.parameter.name in SWEEPER_START: - pulse = sweeper.pulses[0] - aux_list = zhsequence[f"{pulse.type.name.lower()}{pulse.qubit}"] - for element in aux_list: - if pulse == element.pulse: - if isinstance(aux_list[aux_list.index(element)], ZhPulse): - aux_list.insert( - aux_list.index(element), - ZhSweeperLine(sweeper, pulse.qubit, sequence, pulse), - ) - break - - self.sequence = zhsequence - - def create_exp(self, qubits, couplers, options): - """Zurich experiment initialization using their Experiment class.""" - - # Setting experiment signal lines - signals = [] - for coupler in couplers.values(): - signals.append(lo.ExperimentSignal(f"couplerflux{coupler.name}")) - - for qubit in qubits.values(): - q = qubit.name # pylint: disable=C0103 - if len(self.sequence[f"drive{q}"]) != 0: - signals.append(lo.ExperimentSignal(f"drive{q}")) - if qubit.flux is not None: - signals.append(lo.ExperimentSignal(f"flux{q}")) - if len(self.sequence[f"readout{q}"]) != 0: - signals.append(lo.ExperimentSignal(f"measure{q}")) - signals.append(lo.ExperimentSignal(f"acquire{q}")) - if options.fast_reset is not False: - if len(self.sequence[f"drive{q}"]) == 0: - signals.append(lo.ExperimentSignal(f"drive{q}")) - - exp = lo.Experiment( - uid="Sequence", - signals=signals, - ) - - if self.acquisition_type: - acquisition_type = self.acquisition_type - self.acquisition_type = None - else: - acquisition_type = ACQUISITION_TYPE[options.acquisition_type] - averaging_mode = AVERAGING_MODE[options.averaging_mode] - exp_options = replace( - options, acquisition_type=acquisition_type, averaging_mode=averaging_mode - ) - - exp_calib = lo.Calibration() - # Near Time recursion loop or directly to Real Time recursion loop - if self.nt_sweeps is not None: - self.sweep_recursion_nt(qubits, couplers, exp_options, exp, exp_calib) - else: - self.define_exp(qubits, couplers, exp_options, exp, exp_calib) - - def define_exp(self, qubits, couplers, exp_options, exp, exp_calib): - """Real time definition.""" - with exp.acquire_loop_rt( - uid="shots", - count=exp_options.nshots, - acquisition_type=exp_options.acquisition_type, - averaging_mode=exp_options.averaging_mode, - ): - # Recursion loop for sweepers or just play a sequence - if len(self.sweepers) > 0: - self.sweep_recursion(qubits, couplers, exp, exp_calib, exp_options) - else: - self.select_exp(exp, qubits, couplers, exp_options) - exp.set_calibration(exp_calib) - exp.set_signal_map(self.signal_map) - self.experiment = exp - - def select_exp(self, exp, qubits, couplers, exp_options): - """Build Zurich Experiment selecting the relevant sections.""" - if "coupler" in str(self.sequence): - self.couplerflux(exp, couplers) - if "drive" in str(self.sequence): - if "flux" in str(self.sequence): - self.flux(exp, qubits) - self.drive(exp, qubits) - else: - self.drive(exp, qubits) - elif "flux" in str(self.sequence): - self.flux(exp, qubits) - self.measure_relax( - exp, - qubits, - couplers, - exp_options.relaxation_time, - exp_options.acquisition_type, - ) - if exp_options.fast_reset is not False: - self.fast_reset(exp, qubits, exp_options.fast_reset) - - @staticmethod - def play_sweep_select_single(exp, qubit, pulse, section, parameters, partial_sweep): - """Play Zurich pulse when a single sweeper is involved.""" - if any("amplitude" in param for param in parameters): - pulse.zhpulse.amplitude *= max(pulse.zhsweeper.values) - pulse.zhsweeper.values /= max(pulse.zhsweeper.values) - exp.play( - signal=f"{section}{qubit.name}", - pulse=pulse.zhpulse, - amplitude=pulse.zhsweeper, - phase=pulse.pulse.relative_phase, - ) - elif any("duration" in param for param in parameters): - exp.play( - signal=f"{section}{qubit.name}", - pulse=pulse.zhpulse, - length=pulse.zhsweeper, - phase=pulse.pulse.relative_phase, - ) - elif any("relative_phase" in param for param in parameters): - exp.play( - signal=f"{section}{qubit.name}", - pulse=pulse.zhpulse, - phase=pulse.zhsweeper, # I believe this is the global phase sweep - # increment_oscillator_phase=pulse.zhsweeper, # I believe this is the relative phase sweep - ) - elif "frequency" in partial_sweep.uid or partial_sweep.uid == "start": - exp.play( - signal=f"{section}{qubit.name}", - pulse=pulse.zhpulse, - phase=pulse.pulse.relative_phase, - ) - - # Hardcoded for the flux pulse for 2q gates - @staticmethod - def play_sweep_select_dual(exp, qubit, pulse, section, parameters): - """Play Zurich pulse when two sweepers are involved on the same - pulse.""" - if "amplitude" in parameters and "duration" in parameters: - for sweeper in pulse.zhsweepers: - if sweeper.uid == "amplitude": - sweeper_amp_index = pulse.zhsweepers.index(sweeper) - sweeper.values = sweeper.values.copy() - pulse.zhpulse.amplitude *= max(abs(sweeper.values)) - sweeper.values /= max(abs(sweeper.values)) - else: - sweeper_dur_index = pulse.zhsweepers.index(sweeper) - - exp.play( - signal=f"{section}{qubit.name}", - pulse=pulse.zhpulse, - amplitude=pulse.zhsweepers[sweeper_amp_index], - length=pulse.zhsweepers[sweeper_dur_index], - ) - - def play_sweep(self, exp, qubit, pulse, section): - """Takes care of playing the sweepers and involved pulses for different - options.""" - - if isinstance(pulse, ZhSweeperLine): - if pulse.zhsweeper.uid == "bias": - exp.play( - signal=f"{section}{qubit.name}", - pulse=pulse.zhpulse, - amplitude=pulse.zhsweeper, - ) - else: - parameters = [] - for partial_sweep in pulse.zhsweepers: - parameters.append(partial_sweep.uid) - # Recheck partial sweeps - if len(parameters) == 2: - self.play_sweep_select_dual(exp, qubit, pulse, section, parameters) - else: - self.play_sweep_select_single( - exp, qubit, pulse, section, parameters, partial_sweep - ) - - def couplerflux(self, exp: lo.Experiment, couplers: Dict[str, Coupler]): - """Coupler flux for bias sweep or pulses. - - Args: - exp (lo.Experiment): laboneq experiment on which register sequences. - couplers (dict[str, Coupler]): coupler on which pulses are played. - """ - for coupler in couplers.values(): - c = coupler.name # pylint: disable=C0103 - time = 0 - previous_section = None - for i, sequence in enumerate(self.sub_sequences[f"couplerflux{c}"]): - section_uid = f"sequence_couplerflux{c}_{i}" - with exp.section(uid=section_uid, play_after=previous_section): - for j, pulse in enumerate(sequence): - pulse.zhpulse.uid += f"{i}_{j}" - exp.delay( - signal=f"couplerflux{c}", - time=round(pulse.pulse.start * NANO_TO_SECONDS, 9) - time, - ) - time = round(pulse.pulse.duration * NANO_TO_SECONDS, 9) + round( - pulse.pulse.start * NANO_TO_SECONDS, 9 - ) - if isinstance(pulse, ZhSweeperLine): - self.play_sweep(exp, coupler, pulse, section="couplerflux") - elif isinstance(pulse, ZhSweeper): - self.play_sweep(exp, coupler, pulse, section="couplerflux") - elif isinstance(pulse, ZhPulse): - exp.play(signal=f"couplerflux{c}", pulse=pulse.zhpulse) - previous_section = section_uid - - def flux(self, exp: lo.Experiment, qubits: Dict[str, Qubit]): - """Qubit flux for bias sweep or pulses. - - Args: - exp (lo.Experiment): laboneq experiment on which register sequences. - qubits (dict[str, Qubit]): qubits on which pulses are played. - """ - for qubit in qubits.values(): - q = qubit.name # pylint: disable=C0103 - time = 0 - previous_section = None - for i, sequence in enumerate(self.sub_sequences[f"flux{q}"]): - section_uid = f"sequence_flux{q}_{i}" - with exp.section(uid=section_uid, play_after=previous_section): - for j, pulse in enumerate(sequence): - if not isinstance(pulse, ZhSweeperLine): - pulse.zhpulse.uid += f"{i}_{j}" - exp.delay( - signal=f"flux{q}", - time=round(pulse.pulse.start * NANO_TO_SECONDS, 9) - - time, - ) - time = round( - pulse.pulse.duration * NANO_TO_SECONDS, 9 - ) + round(pulse.pulse.start * NANO_TO_SECONDS, 9) - if isinstance(pulse, ZhSweeperLine): - self.play_sweep(exp, qubit, pulse, section="flux") - elif isinstance(pulse, ZhSweeper): - self.play_sweep(exp, qubit, pulse, section="flux") - elif isinstance(pulse, ZhPulse): - exp.play(signal=f"flux{q}", pulse=pulse.zhpulse) - previous_section = section_uid - - def drive(self, exp: lo.Experiment, qubits: Dict[str, Qubit]): - """Qubit driving pulses. - - Args: - exp (lo.Experiment): laboneq experiment on which register sequences. - qubits (dict[str, Qubit]): qubits on which pulses are played. - """ - for qubit in qubits.values(): - q = qubit.name # pylint: disable=C0103 - time = 0 - previous_section = None - for i, sequence in enumerate(self.sub_sequences[f"drive{q}"]): - section_uid = f"sequence_drive{q}_{i}" - with exp.section(uid=section_uid, play_after=previous_section): - for j, pulse in enumerate(sequence): - if not isinstance(pulse, ZhSweeperLine): - exp.delay( - signal=f"drive{q}", - time=round(pulse.pulse.start * NANO_TO_SECONDS, 9) - - time, - ) - time = round( - pulse.pulse.duration * NANO_TO_SECONDS, 9 - ) + round(pulse.pulse.start * NANO_TO_SECONDS, 9) - pulse.zhpulse.uid += f"{i}_{j}" - if isinstance(pulse, ZhSweeper): - self.play_sweep(exp, qubit, pulse, section="drive") - elif isinstance(pulse, ZhPulse): - exp.play( - signal=f"drive{q}", - pulse=pulse.zhpulse, - phase=pulse.pulse.relative_phase, - ) - elif isinstance(pulse, ZhSweeperLine): - exp.delay(signal=f"drive{q}", time=pulse.zhsweeper) - - if len(self.sequence[f"readout{q}"]) > 0 and isinstance( - self.sequence[f"readout{q}"][0], ZhSweeperLine - ): - exp.delay( - signal=f"drive{q}", - time=self.sequence[f"readout{q}"][0].zhsweeper, - ) - self.sequence[f"readout{q}"].remove( - self.sequence[f"readout{q}"][0] - ) - - previous_section = section_uid - - def find_subsequence_finish( - self, - measurement_number: int, - line: str, - quantum_elements: Union[Dict[str, Qubit], Dict[str, Coupler]], - ) -> Tuple[int, str]: - """Find the finishing time and qubit for a given sequence. - - Args: - measurement_number (int): number of the measure pulse. - line (str): line from which measure the finishing time. - e.g.: "drive", "flux", "couplerflux" - quantum_elements (dict[str, Qubit]|dict[str, Coupler]): qubits or couplers from - which measure the finishing time. - - Returns: - time_finish (int): Finish time of the last pulse of the subsequence - before the measurement. - sequence_finish (str): Name of the last subsequence before measurement. If - there are no sequences after the previous measurement, use "None". - """ - time_finish = 0 - sequence_finish = "None" - for quantum_element in quantum_elements: - if ( - len(self.sub_sequences[f"{line}{quantum_element}"]) - <= measurement_number - ): - continue - for pulse in self.sub_sequences[f"{line}{quantum_element}"][ - measurement_number - ]: - if pulse.pulse.finish > time_finish: - time_finish = pulse.pulse.finish - sequence_finish = f"{line}{quantum_element}" - return time_finish, sequence_finish - - # For pulsed spectroscopy, set integration_length and either measure_pulse or measure_pulse_length. - # For CW spectroscopy, set only integration_length and do not specify the measure signal. - # For all other measurements, set either length or pulse for both the measure pulse and integration kernel. - def measure_relax(self, exp, qubits, couplers, relaxation_time, acquisition_type): - """Qubit readout pulse, data acquisition and qubit relaxation.""" - readout_schedule = defaultdict(list) - qubit_readout_schedule = defaultdict(list) - iq_angle_readout_schedule = defaultdict(list) - for qubit in qubits.values(): - q = qubit.name # pylint: disable=C0103 - iq_angle = qubit.iq_angle - if len(self.sequence[f"readout{q}"]) != 0: - for i, pulse in enumerate(self.sequence[f"readout{q}"]): - readout_schedule[i].append(pulse) - qubit_readout_schedule[i].append(q) - iq_angle_readout_schedule[i].append(iq_angle) - - weights = {} - for i, (pulses, qubits_readout, iq_angles) in enumerate( - zip( - readout_schedule.values(), - qubit_readout_schedule.values(), - iq_angle_readout_schedule.values(), - ) - ): - qd_finish = self.find_subsequence_finish(i, "drive", qubits_readout) - qf_finish = self.find_subsequence_finish(i, "flux", qubits_readout) - cf_finish = self.find_subsequence_finish(i, "couplerflux", couplers) - finish_times = np.array( - [ - qd_finish, - qf_finish, - cf_finish, - ], - dtype=[("finish", "i4"), ("line", "U15")], - ) - latest_sequence = finish_times[finish_times["finish"].argmax()] - if latest_sequence["line"] == "None": - play_after = None - else: - play_after = f"sequence_{latest_sequence['line']}_{i}" - # Section on the outside loop allows for multiplex - with exp.section(uid=f"sequence_measure_{i}", play_after=play_after): - for pulse, q, iq_angle in zip(pulses, qubits_readout, iq_angles): - pulse.zhpulse.uid += str(i) - - exp.delay( - signal=f"acquire{q}", - time=self.smearing * NANO_TO_SECONDS, - ) - - if ( - qubits[q].kernel is not None - and acquisition_type == lo.AcquisitionType.DISCRIMINATION - ): - kernel = qubits[q].kernel - weight = lo.pulse_library.sampled_pulse_complex( - uid="weight" + str(q), - samples=kernel * np.exp(1j * iq_angle), - ) - - else: - if i == 0: - if acquisition_type == lo.AcquisitionType.DISCRIMINATION: - weight = lo.pulse_library.sampled_pulse_complex( - samples=np.ones( - [ - int( - pulse.pulse.duration * 2 - - 3 * self.smearing * NANO_TO_SECONDS - ) - ] - ) - * np.exp(1j * iq_angle), - uid="weights" + str(q), - ) - weights[q] = weight - else: - # TODO: Patch for multiple readouts: Remove different uids - weight = lo.pulse_library.const( - uid="weight" + str(q), - length=round( - pulse.pulse.duration * NANO_TO_SECONDS, 9 - ) - - 1.5 * self.smearing * NANO_TO_SECONDS, - amplitude=1, - ) - - weights[q] = weight - elif i != 0: - weight = weights[q] - - measure_pulse_parameters = {"phase": 0} - - if i == len(self.sequence[f"readout{q}"]) - 1: - reset_delay = relaxation_time * NANO_TO_SECONDS - else: - # Here time of flight or not ? - reset_delay = 0 # self.time_of_flight * NANO_TO_SECONDS - - exp.measure( - acquire_signal=f"acquire{q}", - handle=f"sequence{q}_{i}", - integration_kernel=weight, - integration_kernel_parameters=None, - integration_length=None, - measure_signal=f"measure{q}", - measure_pulse=pulse.zhpulse, - measure_pulse_length=round( - pulse.pulse.duration * NANO_TO_SECONDS, 9 - ), - measure_pulse_parameters=measure_pulse_parameters, - measure_pulse_amplitude=None, - acquire_delay=self.time_of_flight * NANO_TO_SECONDS, - reset_delay=reset_delay, - ) - - def fast_reset(self, exp, qubits, fast_reset): - """ - Conditional fast reset after readout - small delay for signal processing - This is a very naive approach that can be improved by repeating this step until - we reach non fast reset fidelity - https://quantum-computing.ibm.com/lab/docs/iql/manage/systems/reset/backend_reset - """ - log.warning("Im fast resetting") - for qubit_name in self.sequence_qibo.qubits: - qubit = qubits[qubit_name] - q = qubit.name # pylint: disable=C0103 - with exp.section(uid=f"fast_reset{q}", play_after=f"sequence_measure"): - with exp.match_local(handle=f"sequence{q}"): - with exp.case(state=0): - pass - with exp.case(state=1): - pulse = ZhPulse(qubit.native_gates.RX.pulse(0, 0)) - exp.play(signal=f"drive{q}", pulse=pulse.zhpulse) - - @staticmethod - def rearrange_sweepers(sweepers: List[Sweeper]) -> Tuple[np.ndarray, List[Sweeper]]: - """Rearranges sweepers from qibocal based on device hardware - limitations. - - Frequency sweepers must be applied before (on the outer loop) bias or amplitude sweepers. - - Args: - sweepers (list[Sweeper]): list of sweepers used in the experiment. - - Returns: - rearranging_axes (np.ndarray): array of shape (2,) and dtype=int containing - the indexes of the sweepers to be swapped. Defaults to np.array([0, 0]) - if no swap is needed. - sweepers (list[Sweeper]): updated list of sweepers used in the experiment. If - sweepers must be swapped, the list is updated accordingly. - """ - rearranging_axes = np.zeros(2, dtype=int) - if len(sweepers) == 2: - if sweepers[1].parameter is Parameter.frequency: - if not sweepers[0].pulses is None: - if (sweepers[0].parameter is Parameter.bias) or ( - not sweepers[0].parameter is Parameter.amplitude - and sweepers[0].pulses[0].type is not PulseType.READOUT - ): - rearranging_axes[:] = [1, 0] - sweepers = sweepers[::-1] - log.warning("Sweepers were reordered") - return rearranging_axes, sweepers - - def sweep(self, qubits, couplers, sequence: PulseSequence, options, *sweepers): - """Play pulse and sweepers sequence.""" - - self.signal_map = {} - self.nt_sweeps = None - sweepers = list(sweepers) - rearranging_axes, sweepers = self.rearrange_sweepers(sweepers) - self.sweepers = sweepers - - self.frequency_from_pulses(qubits, sequence) - - self.experiment_flow(qubits, couplers, sequence, options) - self.run_exp() - - # Get the results back - results = {} - for qubit in qubits.values(): - q = qubit.name # pylint: disable=C0103 - if len(self.sequence[f"readout{q}"]) != 0: - for i, ropulse in enumerate(self.sequence[f"readout{q}"]): - exp_res = self.results.get_data(f"sequence{q}_{i}") - # if using singleshot, the first axis contains shots, - # i.e.: (nshots, sweeper_1, sweeper_2) - # if using integration: (sweeper_1, sweeper_2) - if options.averaging_mode is AveragingMode.SINGLESHOT: - rearranging_axes += 1 - # Reorder dimensions - data = np.moveaxis( - exp_res, rearranging_axes[0], rearranging_axes[1] - ) - if options.acquisition_type is AcquisitionType.DISCRIMINATION: - data = ( - np.ones(data.shape) - data.real - ) # Probability inversion patch - - serial = ropulse.pulse.serial - qubit = ropulse.pulse.qubit - results[serial] = results[qubit] = options.results_type(data) - - # html containing the pulse sequence schedule - # lo.show_pulse_sheet("pulses", self.exp) - return results - - def sweep_recursion(self, qubits, couplers, exp, exp_calib, exp_options): - """Sweepers recursion for multiple nested Real Time sweepers.""" - - sweeper = self.sweepers[0] - - i = len(self.sweepers) - 1 - self.sweepers.remove(sweeper) - parameter = None - - if sweeper.parameter is Parameter.frequency: - for pulse in sweeper.pulses: - line = "drive" if pulse.type is PulseType.DRIVE else "measure" - zhsweeper = ZhSweeper( - pulse, sweeper, qubits[sweeper.pulses[0].qubit] - ).zhsweeper - zhsweeper.uid = "frequency" # Changing the name from "frequency" breaks it f"frequency_{i} - exp_calib[f"{line}{pulse.qubit}"] = lo.SignalCalibration( - oscillator=lo.Oscillator( - frequency=zhsweeper, - modulation_type=lo.ModulationType.HARDWARE, - ) - ) - if sweeper.parameter is Parameter.amplitude: - for pulse in sweeper.pulses: - pulse = pulse.copy() - pulse.amplitude *= max(abs(sweeper.values)) - - # Proper copy(sweeper) here if we want to keep the sweepers - # sweeper_aux = copy.copy(sweeper) - aux_max = max(abs(sweeper.values)) - - sweeper.values /= aux_max - parameter = ZhSweeper( - pulse, sweeper, qubits[sweeper.pulses[0].qubit] - ).zhsweeper - sweeper.values *= aux_max - - if sweeper.parameter is Parameter.bias: - if sweeper.qubits: - for qubit in sweeper.qubits: - parameter = ZhSweeperLine( - sweeper, qubit, self.sequence_qibo - ).zhsweeper - if sweeper.couplers: - for qubit in sweeper.couplers: - parameter = ZhSweeperLine( - sweeper, qubit, self.sequence_qibo - ).zhsweeper - - elif sweeper.parameter is Parameter.start: - parameter = ZhSweeperLine(sweeper).zhsweeper - - elif parameter is None: - parameter = ZhSweeper( - sweeper.pulses[0], sweeper, qubits[sweeper.pulses[0].qubit] - ).zhsweeper - - with exp.sweep( - uid=f"sweep_{sweeper.parameter.name.lower()}_{i}", # This uid trouble double freq ??? - parameter=parameter, - reset_oscillator_phase=True, # Should we reset this phase ??? - ): - if len(self.sweepers) > 0: - self.sweep_recursion(qubits, couplers, exp, exp_calib, exp_options) - else: - self.select_exp(exp, qubits, couplers, exp_options) - - def find_instrument_address( - self, quantum_element: Union[Qubit, Coupler], parameter: str - ) -> str: - """Find path of the instrument connected to a specified line and - qubit/coupler. - - Args: - quantum_element (Qubit | Coupler): qubits or couplers on which perform the near time sweep. - parameter (str): parameter on which perform the near time sweep. - """ - line_names = { - "bias": "flux", - "amplitude": "drive", - } - line_name = line_names[parameter] - channel_uid = ( - self.device_setup.logical_signal_groups[f"q{quantum_element.name}"] - .logical_signals[f"{line_name}_line"] - .physical_channel.uid - ) - channel_name = channel_uid.split("/")[0] - instruments = self.device_setup.instruments - for instrument in instruments: - if instrument.uid == channel_name: - return instrument.address - raise RuntimeError( - f"Could not find instrument for {quantum_element} {line_name}" - ) - - def sweep_recursion_nt( - self, - qubits: Dict[str, Qubit], - couplers: Dict[str, Coupler], - options: ExecutionParameters, - exp: lo.Experiment, - exp_calib: lo.Calibration, - ): - """Sweepers recursion for Near Time sweepers. Faster than regular - software sweepers as they are executed on the actual device by - (software ? or slower hardware ones) - - You want to avoid them so for now they are implement for a - specific sweep. - """ - - log.info("nt Loop") - - sweeper = self.nt_sweeps[0] - - i = len(self.nt_sweeps) - 1 - self.nt_sweeps.remove(sweeper) - - parameter = None - - if sweeper.parameter is Parameter.bias: - if sweeper.qubits: - for qubit in sweeper.qubits: - zhsweeper = ZhSweeperLine( - sweeper, qubit, self.sequence_qibo - ).zhsweeper - zhsweeper.uid = "bias" - path = self.find_instrument_address(qubit, "bias") - - parameter = copy.deepcopy(zhsweeper) - parameter.values += qubit.flux.offset - device_path = f"{path}/sigouts/0/offset" - - elif sweeper.parameter is Parameter.amplitude: - for pulse in sweeper.pulses: - pulse = pulse.copy() - pulse.amplitude *= max(abs(sweeper.values)) - - # Proper copy(sweeper) here - # sweeper_aux = copy.copy(sweeper) - aux_max = max(abs(sweeper.values)) - - sweeper.values /= aux_max - zhsweeper = ZhSweeper( - pulse, sweeper, qubits[sweeper.pulses[0].qubit] - ).zhsweeper - sweeper.values *= aux_max - - zhsweeper.uid = "amplitude" - path = self.find_instrument_address( - qubits[sweeper.pulses[0].qubit], "amplitude" - ) - parameter = zhsweeper - device_path = ( - f"/{path}/qachannels/*/oscs/0/gain" # Hardcoded SHFQA device - ) - - elif parameter is None: # can it be accessed? - parameter = ZhSweeper( - sweeper.pulses[0], sweeper, qubits[sweeper.pulses[0].qubit] - ).zhsweeper - device_path = f"/{path}/qachannels/*/oscs/0/gain" # Hardcoded SHFQA device - - with exp.sweep( - uid=f"sweep_{sweeper.parameter.name.lower()}_{i}", - parameter=parameter, - ): - exp.set_node( - path=device_path, - value=parameter, - ) - - if len(self.nt_sweeps) > 0: - self.sweep_recursion_nt(qubits, couplers, options, exp, exp_calib) - else: - self.define_exp(qubits, couplers, options, exp, exp_calib) - - def play_sim(self, qubits, sequence, options, sim_time): - """Play pulse sequence.""" - - self.experiment_flow(qubits, sequence, options) # missing couplers? - self.run_sim(sim_time) - - def run_sim(self, sim_time): - """Run the simulation. - - Args: - sim_time (float): Time[s] to simulate starting from 0 - """ - # create a session - self.sim_session = lo.Session(self.device_setup) - # connect to session - self.sim_device = self.sim_session.connect(do_emulation=True) - self.exp = self.sim_session.compile( - self.experiment, compiler_settings=COMPILER_SETTINGS - ) - - # Plot simulated output signals with helper function - plot_simulation( - self.exp, - start_time=0, - length=sim_time, - plot_width=10, - plot_height=3, - ) diff --git a/src/qibolab/instruments/zhinst/__init__.py b/src/qibolab/instruments/zhinst/__init__.py new file mode 100644 index 000000000..6eff487e6 --- /dev/null +++ b/src/qibolab/instruments/zhinst/__init__.py @@ -0,0 +1,4 @@ +from .executor import Zurich +from .pulse import ZhPulse +from .sweep import ProcessedSweeps, classify_sweepers +from .util import acquire_channel_name, measure_channel_name diff --git a/src/qibolab/instruments/zhinst/executor.py b/src/qibolab/instruments/zhinst/executor.py new file mode 100644 index 000000000..ff73fde62 --- /dev/null +++ b/src/qibolab/instruments/zhinst/executor.py @@ -0,0 +1,739 @@ +"""Executing pulse sequences on a Zurich Instruments devices.""" + +import re +from collections import defaultdict +from dataclasses import dataclass, replace +from typing import Any, Optional + +import laboneq.simple as lo +import numpy as np +from qibo.config import log + +from qibolab import AcquisitionType, AveragingMode, ExecutionParameters +from qibolab.couplers import Coupler +from qibolab.instruments.abstract import Controller +from qibolab.instruments.port import Port +from qibolab.pulses import PulseSequence, PulseType +from qibolab.qubits import Qubit +from qibolab.sweeper import Parameter, Sweeper +from qibolab.unrolling import Bounds + +from .pulse import ZhPulse +from .sweep import ProcessedSweeps, classify_sweepers +from .util import ( + NANO_TO_SECONDS, + SAMPLING_RATE, + acquire_channel_name, + measure_channel_name, +) + +COMPILER_SETTINGS = { + "SHFSG_MIN_PLAYWAVE_HINT": 32, + "SHFSG_MIN_PLAYZERO_HINT": 32, + "HDAWG_MIN_PLAYWAVE_HINT": 64, + "HDAWG_MIN_PLAYZERO_HINT": 64, +} +"""Translating to Zurich ExecutionParameters.""" +ACQUISITION_TYPE = { + AcquisitionType.INTEGRATION: lo.AcquisitionType.INTEGRATION, + AcquisitionType.RAW: lo.AcquisitionType.RAW, + AcquisitionType.DISCRIMINATION: lo.AcquisitionType.DISCRIMINATION, +} + +AVERAGING_MODE = { + AveragingMode.CYCLIC: lo.AveragingMode.CYCLIC, + AveragingMode.SINGLESHOT: lo.AveragingMode.SINGLE_SHOT, +} + + +@dataclass +class ZhPort(Port): + name: tuple[str, str] + offset: float = 0.0 + power_range: int = 0 + + +@dataclass +class SubSequence: + """A subsequence is a slice (in time) of a sequence that contains at most + one measurement per qubit. + + When the driver is asked to execute a sequence, it will first split + it into sub-sequences. This is needed so that we can create a + separate laboneq section for each measurement (multiple measurements + per section are not allowed). When splitting a sequence, it is + assumed that 1. a measurement operation can be parallel (in time) to + another measurement operation (i.e. measuring multiple qubits + simultaneously), but other channels (e.g. drive) do not contain any + pulses parallel to measurements, 2. ith measurement on some channel + is in the same subsequence as the ith measurement (if any) on + another measurement channel, 3. all measurements in one subsequence + happen at the same time. + """ + + measurements: list[tuple[str, ZhPulse]] + control_sequence: dict[str, list[ZhPulse]] + + +class Zurich(Controller): + """Driver for a collection of ZI instruments that are automatically + synchronized via ZSync protocol.""" + + PortType = ZhPort + + def __init__(self, name, device_setup, time_of_flight=0.0, smearing=0.0): + super().__init__(name, None) + + self.signal_map = {} + "Signals to lines mapping" + self.calibration = lo.Calibration() + "Zurich calibration object)" + + self.device_setup = device_setup + self.session = None + "Zurich device parameters for connection" + + self.time_of_flight = time_of_flight + self.smearing = smearing + "Parameters read from the runcard not part of ExecutionParameters" + + self.experiment = None + self.results = None + "Zurich experiment definitions" + + self.bounds = Bounds( + waveforms=int(4e4), + readout=250, + instructions=int(1e6), + ) + + self.acquisition_type = None + "To store if the AcquisitionType.SPECTROSCOPY needs to be enabled by parsing the sequence" + + self.sequence = defaultdict(list) + "Zurich pulse sequence" + self.sub_sequences: list[SubSequence] = [] + "Sub sequences between each measurement" + + self.processed_sweeps: Optional[ProcessedSweeps] = None + self.nt_sweeps: list[Sweeper] = [] + self.rt_sweeps: list[Sweeper] = [] + + @property + def sampling_rate(self): + return SAMPLING_RATE + + def connect(self): + if self.is_connected is False: + # To fully remove logging #configure_logging=False + # I strongly advise to set it to 20 to have time estimates of the experiment duration! + self.session = lo.Session(self.device_setup, log_level=20) + _ = self.session.connect() + self.is_connected = True + + def disconnect(self): + if self.is_connected: + _ = self.session.disconnect() + self.is_connected = False + + def calibration_step(self, qubits, couplers, options): + """Zurich general pre experiment calibration definitions. + + Change to get frequencies from sequence + """ + + for coupler in couplers.values(): + self.register_couplerflux_line(coupler) + + for qubit in qubits.values(): + if qubit.flux is not None: + self.register_flux_line(qubit) + if len(self.sequence[qubit.drive.name]) != 0: + self.register_drive_line( + qubit=qubit, + intermediate_frequency=qubit.drive_frequency + - qubit.drive.local_oscillator.frequency, + ) + if len(self.sequence[measure_channel_name(qubit)]) != 0: + self.register_readout_line( + qubit=qubit, + intermediate_frequency=qubit.readout_frequency + - qubit.readout.local_oscillator.frequency, + options=options, + ) + self.device_setup.set_calibration(self.calibration) + + def register_readout_line(self, qubit, intermediate_frequency, options): + """Registers qubit measure and acquire lines to calibration and signal + map. + + Note + ---- + To allow debugging with and oscilloscope, just set the following:: + + self.calibration[f"/logical_signal_groups/q{q}/measure_line"] = lo.SignalCalibration( + ..., + local_oscillator=lo.Oscillator( + ... + frequency=0.0, + ), + ..., + port_mode=lo.PortMode.LF, + ..., + ) + """ + + q = qubit.name # pylint: disable=C0103 + self.signal_map[measure_channel_name(qubit)] = ( + self.device_setup.logical_signal_groups[f"q{q}"].logical_signals[ + "measure_line" + ] + ) + self.calibration[f"/logical_signal_groups/q{q}/measure_line"] = ( + lo.SignalCalibration( + oscillator=lo.Oscillator( + frequency=intermediate_frequency, + modulation_type=lo.ModulationType.SOFTWARE, + ), + local_oscillator=lo.Oscillator( + frequency=int(qubit.readout.local_oscillator.frequency), + ), + range=qubit.readout.power_range, + port_delay=None, + delay_signal=0, + ) + ) + + self.signal_map[acquire_channel_name(qubit)] = ( + self.device_setup.logical_signal_groups[f"q{q}"].logical_signals[ + "acquire_line" + ] + ) + + oscillator = lo.Oscillator( + frequency=intermediate_frequency, + modulation_type=lo.ModulationType.SOFTWARE, + ) + threshold = None + + if options.acquisition_type == AcquisitionType.DISCRIMINATION: + if qubit.kernel is not None: + # Kernels don't work with the software modulation on the acquire signal + oscillator = None + else: + # To keep compatibility with angle and threshold discrimination (Remove when possible) + threshold = qubit.threshold + + self.calibration[f"/logical_signal_groups/q{q}/acquire_line"] = ( + lo.SignalCalibration( + oscillator=oscillator, + range=qubit.feedback.power_range, + port_delay=self.time_of_flight * NANO_TO_SECONDS, + threshold=threshold, + ) + ) + + def register_drive_line(self, qubit, intermediate_frequency): + """Registers qubit drive line to calibration and signal map.""" + q = qubit.name # pylint: disable=C0103 + self.signal_map[qubit.drive.name] = self.device_setup.logical_signal_groups[ + f"q{q}" + ].logical_signals["drive_line"] + self.calibration[f"/logical_signal_groups/q{q}/drive_line"] = ( + lo.SignalCalibration( + oscillator=lo.Oscillator( + frequency=intermediate_frequency, + modulation_type=lo.ModulationType.HARDWARE, + ), + local_oscillator=lo.Oscillator( + frequency=int(qubit.drive.local_oscillator.frequency), + ), + range=qubit.drive.power_range, + port_delay=None, + delay_signal=0, + ) + ) + + def register_flux_line(self, qubit): + """Registers qubit flux line to calibration and signal map.""" + q = qubit.name # pylint: disable=C0103 + self.signal_map[qubit.flux.name] = self.device_setup.logical_signal_groups[ + f"q{q}" + ].logical_signals["flux_line"] + self.calibration[f"/logical_signal_groups/q{q}/flux_line"] = ( + lo.SignalCalibration( + range=qubit.flux.power_range, + port_delay=None, + delay_signal=0, + voltage_offset=qubit.flux.offset, + ) + ) + + def register_couplerflux_line(self, coupler): + """Registers qubit flux line to calibration and signal map.""" + c = coupler.name # pylint: disable=C0103 + self.signal_map[coupler.flux.name] = self.device_setup.logical_signal_groups[ + f"qc{c}" + ].logical_signals["flux_line"] + self.calibration[f"/logical_signal_groups/qc{c}/flux_line"] = ( + lo.SignalCalibration( + range=coupler.flux.power_range, + port_delay=None, + delay_signal=0, + voltage_offset=coupler.flux.offset, + ) + ) + + def run_exp(self): + """ + Compilation settings, compilation step, execution step and data retrival + - Save a experiment Python object: + self.experiment.save("saved_exp") + - Save a experiment compiled experiment (): + self.exp.save("saved_exp") # saving compiled experiment + """ + compiled_experiment = self.session.compile( + self.experiment, compiler_settings=COMPILER_SETTINGS + ) + self.results = self.session.run(compiled_experiment) + + @staticmethod + def frequency_from_pulses(qubits, sequence): + """Gets the frequencies from the pulses to the qubits.""" + for pulse in sequence: + qubit = qubits[pulse.qubit] + if pulse.type is PulseType.READOUT: + qubit.readout_frequency = pulse.frequency + if pulse.type is PulseType.DRIVE: + qubit.drive_frequency = pulse.frequency + + def create_sub_sequences(self, qubits: list[Qubit]) -> list[SubSequence]: + """Create subsequences based on locations of measurements.""" + measure_channels = {measure_channel_name(qb) for qb in qubits} + other_channels = set(self.sequence.keys()) - measure_channels + + measurement_groups = defaultdict(list) + for ch in measure_channels: + for i, pulse in enumerate(self.sequence[ch]): + measurement_groups[i].append((ch, pulse)) + + measurement_starts = {} + for i, group in measurement_groups.items(): + starts = np.array([meas.pulse.start for _, meas in group]) + measurement_starts[i] = max(starts) + + # split all non-measurement channels according to the locations of the measurements + sub_sequences = defaultdict(lambda: defaultdict(list)) + for ch in other_channels: + measurement_index = 0 + for pulse in self.sequence[ch]: + if pulse.pulse.finish > measurement_starts[measurement_index]: + measurement_index += 1 + sub_sequences[measurement_index][ch].append(pulse) + if len(sub_sequences) > len(measurement_groups): + log.warning("There are control pulses after the last measurement start.") + + return [ + SubSequence(measurement_groups[i], sub_sequences[i]) + for i in range(len(measurement_groups)) + ] + + def experiment_flow( + self, + qubits: dict[str, Qubit], + couplers: dict[str, Coupler], + sequence: PulseSequence, + options: ExecutionParameters, + ): + """Create the experiment object for the devices, following the steps + separated one on each method: + + Translation, Calibration, Experiment Definition. + + Args: + qubits (dict[str, Qubit]): qubits for the platform. + couplers (dict[str, Coupler]): couplers for the platform. + sequence (PulseSequence): sequence of pulses to be played in the experiment. + """ + self.sequence = self.sequence_zh(sequence, qubits) + self.sub_sequences = self.create_sub_sequences(list(qubits.values())) + self.calibration_step(qubits, couplers, options) + self.create_exp(qubits, options) + + # pylint: disable=W0221 + def play(self, qubits, couplers, sequence, options): + """Play pulse sequence.""" + return self.sweep(qubits, couplers, sequence, options) + + def sequence_zh( + self, sequence: PulseSequence, qubits: dict[str, Qubit] + ) -> dict[str, list[ZhPulse]]: + """Convert Qibo sequence to a sequence where all pulses are replaced + with ZhPulse instances. + + The resulting object is a dictionary mapping from channel name + to corresponding sequence of ZhPulse instances + """ + # Define and assign the sequence + zhsequence = defaultdict(list) + + # Fill the sequences with pulses according to their lines in temporal order + for pulse in sequence: + if pulse.type == PulseType.READOUT: + ch = measure_channel_name(qubits[pulse.qubit]) + else: + ch = pulse.channel + zhsequence[ch].append(ZhPulse(pulse)) + + if self.processed_sweeps: + for ch, zhpulses in zhsequence.items(): + for zhpulse in zhpulses: + for param, sweep in self.processed_sweeps.sweeps_for_pulse( + zhpulse.pulse + ): + zhpulse.add_sweeper(param, sweep) + + return zhsequence + + def create_exp(self, qubits, options): + """Zurich experiment initialization using their Experiment class.""" + if self.acquisition_type: + acquisition_type = self.acquisition_type + else: + acquisition_type = ACQUISITION_TYPE[options.acquisition_type] + averaging_mode = AVERAGING_MODE[options.averaging_mode] + exp_options = replace( + options, acquisition_type=acquisition_type, averaging_mode=averaging_mode + ) + + signals = [lo.ExperimentSignal(name) for name in self.signal_map.keys()] + exp = lo.Experiment( + uid="Sequence", + signals=signals, + ) + + contexts = self._contexts(exp, exp_options) + self._populate_exp(qubits, exp, exp_options, contexts) + self.set_calibration_for_rt_sweep(exp) + exp.set_signal_map(self.signal_map) + self.experiment = exp + + def _contexts( + self, exp: lo.Experiment, exp_options: ExecutionParameters + ) -> list[tuple[Optional[Sweeper], Any]]: + """To construct a laboneq experiment, we need to first define a certain + sequence of nested contexts. + + This method returns the corresponding sequence of context + managers. + """ + sweep_contexts = [] + for i, sweeper in enumerate(self.nt_sweeps): + ctx = exp.sweep( + uid=f"nt_sweep_{sweeper.parameter.name.lower()}_{i}", + parameter=[ + sweep_param + for sweep_param in self.processed_sweeps.sweeps_for_sweeper(sweeper) + ], + ) + sweep_contexts.append((sweeper, ctx)) + + shots_ctx = exp.acquire_loop_rt( + uid="shots", + count=exp_options.nshots, + acquisition_type=exp_options.acquisition_type, + averaging_mode=exp_options.averaging_mode, + ) + sweep_contexts.append((None, shots_ctx)) + + for i, sweeper in enumerate(self.rt_sweeps): + ctx = exp.sweep( + uid=f"rt_sweep_{sweeper.parameter.name.lower()}_{i}", + parameter=[ + sweep_param + for sweep_param in self.processed_sweeps.sweeps_for_sweeper(sweeper) + ], + reset_oscillator_phase=True, + ) + sweep_contexts.append((sweeper, ctx)) + + return sweep_contexts + + def _populate_exp( + self, + qubits: dict[str, Qubit], + exp: lo.Experiment, + exp_options: ExecutionParameters, + contexts, + ): + """Recursively activate the nested contexts, then define the main + experiment body inside the innermost context.""" + if len(contexts) == 0: + self.select_exp(exp, qubits, exp_options) + return + + sweeper, ctx = contexts[0] + with ctx: + if sweeper in self.nt_sweeps: + self.set_instrument_nodes_for_nt_sweep(exp, sweeper) + self._populate_exp(qubits, exp, exp_options, contexts[1:]) + + def set_calibration_for_rt_sweep(self, exp: lo.Experiment) -> None: + """Set laboneq calibration of parameters that are to be swept in real- + time.""" + if self.processed_sweeps: + calib = lo.Calibration() + for ch in ( + set(self.sequence.keys()) | self.processed_sweeps.channels_with_sweeps() + ): + for param, sweep_param in self.processed_sweeps.sweeps_for_channel(ch): + if param is Parameter.frequency: + calib[ch] = lo.SignalCalibration( + oscillator=lo.Oscillator( + frequency=sweep_param, + modulation_type=lo.ModulationType.HARDWARE, + ) + ) + exp.set_calibration(calib) + + def set_instrument_nodes_for_nt_sweep( + self, exp: lo.Experiment, sweeper: Sweeper + ) -> None: + """In some cases there is no straightforward way to sweep a parameter. + + In these cases we achieve sweeping by directly manipulating the + instrument nodes + """ + for ch, param, sweep_param in self.processed_sweeps.channel_sweeps_for_sweeper( + sweeper + ): + channel_node_path = self.get_channel_node_path(ch) + if param is Parameter.bias: + offset_node_path = f"{channel_node_path}/offset" + exp.set_node(path=offset_node_path, value=sweep_param) + + # This is supposed to happen only for measurement, but we do not validate it here. + if param is Parameter.amplitude: + a, b = re.match(r"(.*)/(\d)/.*", channel_node_path).groups() + gain_node_path = f"{a}/{b}/oscs/{b}/gain" + exp.set_node(path=gain_node_path, value=sweep_param) + + def get_channel_node_path(self, channel_name: str) -> str: + """Return the path of the instrument node corresponding to the given + channel.""" + logical_signal = self.signal_map[channel_name] + for instrument in self.device_setup.instruments: + for conn in instrument.connections: + if conn.remote_path == logical_signal.path: + return f"{instrument.address}/{conn.local_port}" + raise RuntimeError( + f"Could not find instrument node corresponding to channel {channel_name}" + ) + + def select_exp(self, exp, qubits, exp_options): + """Build Zurich Experiment selecting the relevant sections.""" + weights = {} + previous_section = None + for i, seq in enumerate(self.sub_sequences): + section_uid = f"control_{i}" + with exp.section(uid=section_uid, play_after=previous_section): + for ch, pulses in seq.control_sequence.items(): + time = 0 + for pulse in pulses: + if pulse.delay_sweeper: + exp.delay(signal=ch, time=pulse.delay_sweeper) + exp.delay( + signal=ch, + time=round(pulse.pulse.start * NANO_TO_SECONDS, 9) - time, + ) + time = round(pulse.pulse.duration * NANO_TO_SECONDS, 9) + round( + pulse.pulse.start * NANO_TO_SECONDS, 9 + ) + if pulse.zhsweepers: + self.play_sweep(exp, ch, pulse) + else: + exp.play( + signal=ch, + pulse=pulse.zhpulse, + phase=pulse.pulse.relative_phase, + ) + previous_section = section_uid + + if any(m.delay_sweeper is not None for _, m in seq.measurements): + section_uid = f"measurement_delay_{i}" + with exp.section(uid=section_uid, play_after=previous_section): + for ch, m in seq.measurements: + if m.delay_sweeper: + exp.delay(signal=ch, time=m.delay_sweeper) + previous_section = section_uid + + section_uid = f"measure_{i}" + with exp.section(uid=section_uid, play_after=previous_section): + for ch, pulse in seq.measurements: + qubit = qubits[pulse.pulse.qubit] + q = qubit.name + + exp.delay( + signal=acquire_channel_name(qubit), + time=self.smearing * NANO_TO_SECONDS, + ) + + if ( + qubit.kernel is not None + and exp_options.acquisition_type + == lo.AcquisitionType.DISCRIMINATION + ): + weight = lo.pulse_library.sampled_pulse_complex( + samples=qubit.kernel * np.exp(1j * qubit.iq_angle), + ) + + else: + if i == 0: + if ( + exp_options.acquisition_type + == lo.AcquisitionType.DISCRIMINATION + ): + weight = lo.pulse_library.sampled_pulse_complex( + samples=np.ones( + [ + int( + pulse.pulse.duration * 2 + - 3 * self.smearing * NANO_TO_SECONDS + ) + ] + ) + * np.exp(1j * qubit.iq_angle), + ) + weights[q] = weight + else: + weight = lo.pulse_library.const( + length=round( + pulse.pulse.duration * NANO_TO_SECONDS, 9 + ) + - 1.5 * self.smearing * NANO_TO_SECONDS, + amplitude=1, + ) + + weights[q] = weight + elif i != 0: + weight = weights[q] + + measure_pulse_parameters = {"phase": 0} + + if i == len(self.sequence[measure_channel_name(qubit)]) - 1: + reset_delay = exp_options.relaxation_time * NANO_TO_SECONDS + else: + reset_delay = 0 + + exp.measure( + acquire_signal=acquire_channel_name(qubit), + handle=f"sequence{q}_{i}", + integration_kernel=weight, + integration_kernel_parameters=None, + integration_length=None, + measure_signal=measure_channel_name(qubit), + measure_pulse=pulse.zhpulse, + measure_pulse_length=round( + pulse.pulse.duration * NANO_TO_SECONDS, 9 + ), + measure_pulse_parameters=measure_pulse_parameters, + measure_pulse_amplitude=None, + acquire_delay=self.time_of_flight * NANO_TO_SECONDS, + reset_delay=reset_delay, + ) + previous_section = section_uid + + @staticmethod + def play_sweep(exp, channel_name, pulse): + """Play Zurich pulse when a single sweeper is involved.""" + play_parameters = {} + for p, zhs in pulse.zhsweepers: + if p is Parameter.amplitude: + max_value = max(np.abs(zhs.values)) + pulse.zhpulse.amplitude *= max_value + zhs.values /= max_value + play_parameters["amplitude"] = zhs + if p is Parameter.duration: + play_parameters["length"] = zhs + if p is Parameter.relative_phase: + play_parameters["phase"] = zhs + if "phase" not in play_parameters: + play_parameters["phase"] = pulse.pulse.relative_phase + + exp.play(signal=channel_name, pulse=pulse.zhpulse, **play_parameters) + + @staticmethod + def rearrange_rt_sweepers( + sweepers: list[Sweeper], + ) -> tuple[Optional[tuple[int, int]], list[Sweeper]]: + """Rearranges list of real-time sweepers based on hardware limitations. + + The only known limitation currently is that frequency sweepers must be applied before (on the outer loop) other + (e.g. amplitude) sweepers. Consequently, the only thing done here is to swap the frequency sweeper with the + first sweeper in the list. + + Args: + sweepers: Sweepers to rearrange. + + Returns: + swapped_axis_pair: tuple containing indices of the two swapped axes, or None if nothing to rearrange. + sweepers: rearranged (or original, if nothing to rearrange) list of sweepers. + """ + freq_sweeper = next( + iter(s for s in sweepers if s.parameter is Parameter.frequency), None + ) + if freq_sweeper: + sweepers_copy = sweepers.copy() + freq_sweeper_idx = sweepers_copy.index(freq_sweeper) + sweepers_copy[freq_sweeper_idx] = sweepers_copy[0] + sweepers_copy[0] = freq_sweeper + log.warning("Sweepers were reordered") + return (0, freq_sweeper_idx), sweepers_copy + return None, sweepers + + def sweep(self, qubits, couplers, sequence: PulseSequence, options, *sweepers): + """Play pulse and sweepers sequence.""" + + self.signal_map = {} + self.processed_sweeps = ProcessedSweeps(sweepers, qubits) + self.nt_sweeps, self.rt_sweeps = classify_sweepers(sweepers) + swapped_axis_pair, self.rt_sweeps = self.rearrange_rt_sweepers(self.rt_sweeps) + if swapped_axis_pair: + # 1. axes corresponding to NT sweeps appear before axes corresponding to RT sweeps + # 2. in singleshot mode, the first axis contains shots, i.e.: (nshots, sweeper_1, sweeper_2) + axis_offset = len(self.nt_sweeps) + int( + options.averaging_mode is AveragingMode.SINGLESHOT + ) + swapped_axis_pair = tuple(ax + axis_offset for ax in swapped_axis_pair) + + self.frequency_from_pulses(qubits, sequence) + + self.acquisition_type = None + for sweeper in sweepers: + if sweeper.parameter in {Parameter.frequency, Parameter.amplitude}: + for pulse in sweeper.pulses: + if pulse.type is PulseType.READOUT: + self.acquisition_type = lo.AcquisitionType.SPECTROSCOPY + + self.experiment_flow(qubits, couplers, sequence, options) + self.run_exp() + + # Get the results back + results = {} + for qubit in qubits.values(): + q = qubit.name # pylint: disable=C0103 + for i, ropulse in enumerate(self.sequence[measure_channel_name(qubit)]): + data = self.results.get_data(f"sequence{q}_{i}") + + if swapped_axis_pair: + data = np.moveaxis(data, swapped_axis_pair[0], swapped_axis_pair[1]) + if options.acquisition_type is AcquisitionType.DISCRIMINATION: + data = ( + np.ones(data.shape) - data.real + ) # Probability inversion patch + + serial = ropulse.pulse.serial + qubit = ropulse.pulse.qubit + results[serial] = results[qubit] = options.results_type(data) + + return results diff --git a/src/qibolab/instruments/zhinst/pulse.py b/src/qibolab/instruments/zhinst/pulse.py new file mode 100644 index 000000000..44c223ea6 --- /dev/null +++ b/src/qibolab/instruments/zhinst/pulse.py @@ -0,0 +1,106 @@ +"""Wrapper for qibolab and laboneq pulses and sweeps.""" + +from typing import Optional + +import laboneq.simple as lo +import numpy as np +from laboneq.dsl.experiment.pulse_library import ( + sampled_pulse_complex, + sampled_pulse_real, +) + +from qibolab.pulses import Drag, Gaussian, GaussianSquare, Pulse, PulseType, Rectangular +from qibolab.sweeper import Parameter + +from .util import NANO_TO_SECONDS, SAMPLING_RATE + + +def select_pulse(pulse: Pulse): + """Return laboneq pulse object corresponding to the given qibolab pulse.""" + if isinstance(pulse.shape, Rectangular): + can_compress = pulse.type is not PulseType.READOUT + return lo.pulse_library.const( + length=round(pulse.duration * NANO_TO_SECONDS, 9), + amplitude=pulse.amplitude, + can_compress=can_compress, + ) + if isinstance(pulse.shape, Gaussian): + sigma = pulse.shape.rel_sigma + return lo.pulse_library.gaussian( + length=round(pulse.duration * NANO_TO_SECONDS, 9), + amplitude=pulse.amplitude, + sigma=2 / sigma, + zero_boundaries=False, + ) + + if isinstance(pulse.shape, GaussianSquare): + sigma = pulse.shape.rel_sigma + width = pulse.shape.width + can_compress = pulse.type is not PulseType.READOUT + return lo.pulse_library.gaussian_square( + length=round(pulse.duration * NANO_TO_SECONDS, 9), + width=round(pulse.duration * NANO_TO_SECONDS, 9) * width, + amplitude=pulse.amplitude, + can_compress=can_compress, + sigma=2 / sigma, + zero_boundaries=False, + ) + + if isinstance(pulse.shape, Drag): + sigma = pulse.shape.rel_sigma + beta = pulse.shape.beta + return lo.pulse_library.drag( + length=round(pulse.duration * NANO_TO_SECONDS, 9), + amplitude=pulse.amplitude, + sigma=2 / sigma, + beta=beta, + zero_boundaries=False, + ) + + if np.all(pulse.envelope_waveform_q(SAMPLING_RATE).data == 0): + return sampled_pulse_real( + samples=pulse.envelope_waveform_i(SAMPLING_RATE).data, + can_compress=True, + ) + else: + return sampled_pulse_complex( + samples=pulse.envelope_waveform_i(SAMPLING_RATE).data + + (1j * pulse.envelope_waveform_q(SAMPLING_RATE).data), + can_compress=True, + ) + + +class ZhPulse: + """Wrapper data type that holds a qibolab pulse, the corresponding laboneq + pulse object, and any sweeps associated with this pulse.""" + + def __init__(self, pulse): + self.pulse: Pulse = pulse + """Qibolab pulse.""" + self.zhpulse = select_pulse(pulse) + """Laboneq pulse.""" + self.zhsweepers: list[tuple[Parameter, lo.SweepParameter]] = [] + """Parameters to be swept, along with their laboneq sweep parameter + definitions.""" + self.delay_sweeper: Optional[lo.SweepParameter] = None + """Laboneq sweep parameter if the delay of the pulse should be + swept.""" + + # pylint: disable=R0903 + def add_sweeper(self, param: Parameter, sweeper: lo.SweepParameter): + """Add sweeper to list of sweepers associated with this pulse.""" + if param in { + Parameter.amplitude, + Parameter.frequency, + Parameter.duration, + Parameter.relative_phase, + }: + self.zhsweepers.append((param, sweeper)) + elif param is Parameter.start: + if self.delay_sweeper: + raise ValueError( + "Cannot have multiple delay sweepers for a single pulse" + ) + self.delay_sweeper = sweeper + else: + raise ValueError(f"Sweeping {param} is not supported") diff --git a/src/qibolab/instruments/zhinst/sweep.py b/src/qibolab/instruments/zhinst/sweep.py new file mode 100644 index 000000000..d2371c79e --- /dev/null +++ b/src/qibolab/instruments/zhinst/sweep.py @@ -0,0 +1,164 @@ +"""Pre-execution processing of sweeps.""" + +from collections.abc import Iterable +from copy import copy + +import laboneq.simple as lo +import numpy as np + +from qibolab.pulses import Pulse, PulseType +from qibolab.qubits import Qubit +from qibolab.sweeper import Parameter, Sweeper + +from .util import NANO_TO_SECONDS, measure_channel_name + + +def classify_sweepers( + sweepers: Iterable[Sweeper], +) -> tuple[list[Sweeper], list[Sweeper]]: + """Divide sweepers into two lists: 1. sweeps that can be done in the laboneq near-time sweep loop, 2. sweeps that + can be done in real-time (i.e. on hardware)""" + nt_sweepers, rt_sweepers = [], [] + for sweeper in sweepers: + if sweeper.parameter is Parameter.bias or ( + sweeper.parameter is Parameter.amplitude + and sweeper.pulses[0].type is PulseType.READOUT + ): + nt_sweepers.append(sweeper) + else: + rt_sweepers.append(sweeper) + return nt_sweepers, rt_sweepers + + +class ProcessedSweeps: + """Data type that centralizes and allows extracting information about given + sweeps. + + In laboneq, sweeps are represented with the help of SweepParameter + instances. When adding pulses to a laboneq experiment, some + properties can be set to be an instance of SweepParameter instead of + a fixed numeric value. In case of channel property sweeps, either + the relevant calibration property or the instrument node directly + can be set ot a SweepParameter instance. Parts of the laboneq + experiment that define the sweep loops refer to SweepParameter + instances as well. These should be linkable to instances that are + either set to a pulse property, a channel calibration or instrument + node. To achieve this, we use the exact same SweepParameter instance + in both places. This class takes care of creating these + SweepParameter instances and giving access to them in a consistent + way (i.e. whenever they need to be the same instance they will be + the same instance). When constructing sweep loops you may ask from + this class to provide all the SweepParameter instances related to a + given qibolab Sweeper (parallel sweeps). Later, when adding pulses + or setting channel properties, you may ask from this class to + provide all SweepParameter instances related to a given pulse or + channel, and you will get parameters that are linkable to the ones + in the sweep loop definition + """ + + def __init__(self, sweepers: Iterable[Sweeper], qubits: dict[str, Qubit]): + pulse_sweeps = [] + channel_sweeps = [] + parallel_sweeps = [] + for sweeper in sweepers: + for pulse in sweeper.pulses or []: + if sweeper.parameter in (Parameter.duration, Parameter.start): + sweep_param = lo.SweepParameter( + values=sweeper.values * NANO_TO_SECONDS + ) + pulse_sweeps.append((pulse, sweeper.parameter, sweep_param)) + elif sweeper.parameter is Parameter.frequency: + ptype, qubit = pulse.type, qubits[pulse.qubit] + if ptype is PulseType.READOUT: + ch = measure_channel_name(qubit) + intermediate_frequency = ( + qubit.readout_frequency + - qubit.readout.local_oscillator.frequency + ) + elif ptype is PulseType.DRIVE: + ch = qubit.drive.name + intermediate_frequency = ( + qubit.drive_frequency + - qubit.drive.local_oscillator.frequency + ) + else: + raise ValueError( + f"Cannot sweep frequency of pulse of type {ptype}, because it does not have associated frequency" + ) + sweep_param = lo.SweepParameter( + values=sweeper.values + intermediate_frequency + ) + channel_sweeps.append((ch, sweeper.parameter, sweep_param)) + elif ( + pulse.type is PulseType.READOUT + and sweeper.parameter is Parameter.amplitude + ): + max_value = max(np.abs(sweeper.values)) + sweep_param = lo.SweepParameter(values=sweeper.values / max_value) + # FIXME: this implicitly relies on the fact that pulse is the same python object as appears in the + # sequence that is being executed, hence the mutation is propagated. This is bad programming and + # should be fixed once things become simpler + pulse.amplitude *= max_value + + channel_sweeps.append( + ( + measure_channel_name(qubits[pulse.qubit]), + sweeper.parameter, + sweep_param, + ) + ) + else: + sweep_param = lo.SweepParameter(values=copy(sweeper.values)) + pulse_sweeps.append((pulse, sweeper.parameter, sweep_param)) + parallel_sweeps.append((sweeper, sweep_param)) + + for qubit in sweeper.qubits or []: + if sweeper.parameter is not Parameter.bias: + raise ValueError( + f"Sweeping {sweeper.parameter.name} for {qubit} is not supported" + ) + sweep_param = lo.SweepParameter( + values=sweeper.values + qubit.flux.offset + ) + channel_sweeps.append((qubit.flux.name, sweeper.parameter, sweep_param)) + parallel_sweeps.append((sweeper, sweep_param)) + + for coupler in sweeper.couplers or []: + if sweeper.parameter is not Parameter.bias: + raise ValueError( + f"Sweeping {sweeper.parameter.name} for {coupler} is not supported" + ) + sweep_param = lo.SweepParameter( + values=sweeper.values + coupler.flux.offset + ) + channel_sweeps.append( + (coupler.flux.name, sweeper.parameter, sweep_param) + ) + parallel_sweeps.append((sweeper, sweep_param)) + + self._pulse_sweeps = pulse_sweeps + self._channel_sweeps = channel_sweeps + self._parallel_sweeps = parallel_sweeps + + def sweeps_for_pulse( + self, pulse: Pulse + ) -> list[tuple[Parameter, lo.SweepParameter]]: + return [item[1:] for item in self._pulse_sweeps if item[0] == pulse] + + def sweeps_for_channel(self, ch: str) -> list[tuple[Parameter, lo.SweepParameter]]: + return [item[1:] for item in self._channel_sweeps if item[0] == ch] + + def sweeps_for_sweeper(self, sweeper: Sweeper) -> list[lo.SweepParameter]: + return [item[1] for item in self._parallel_sweeps if item[0] == sweeper] + + def channel_sweeps_for_sweeper( + self, sweeper: Sweeper + ) -> list[tuple[str, Parameter, lo.SweepParameter]]: + return [ + item + for item in self._channel_sweeps + if item[2] in self.sweeps_for_sweeper(sweeper) + ] + + def channels_with_sweeps(self) -> set[str]: + return {ch for ch, _, _ in self._channel_sweeps} diff --git a/src/qibolab/instruments/zhinst/util.py b/src/qibolab/instruments/zhinst/util.py new file mode 100644 index 000000000..8ca884306 --- /dev/null +++ b/src/qibolab/instruments/zhinst/util.py @@ -0,0 +1,24 @@ +"""Utility methods.""" + +from qibolab.qubits import Qubit + +SAMPLING_RATE = 2 +NANO_TO_SECONDS = 1e-9 + + +def measure_channel_name(qubit: Qubit) -> str: + """Construct and return a name for qubit's measure channel. + + FIXME: We cannot use channel name directly, because currently channels are named after wires, and due to multiplexed readout + multiple qubits have the same channel name for their readout. Should be fixed once channels are refactored. + """ + return f"{qubit.readout.name}_{qubit.name}" + + +def acquire_channel_name(qubit: Qubit) -> str: + """Construct and return a name for qubit's acquire channel. + + FIXME: We cannot use acquire channel name, because qibolab does not have a concept of acquire channel. This function shall be removed + once all channel refactoring is done. + """ + return f"acquire{qubit.name}" diff --git a/tests/dummy_qrc/zurich/platform.py b/tests/dummy_qrc/zurich/platform.py index e74c11672..3014bbc22 100644 --- a/tests/dummy_qrc/zurich/platform.py +++ b/tests/dummy_qrc/zurich/platform.py @@ -86,7 +86,6 @@ def create(path: pathlib.Path = FOLDER): controller = Zurich( "EL_ZURO", device_setup=device_setup, - use_emulation=False, time_of_flight=75, smearing=50, ) diff --git a/tests/test_instruments_zhinst.py b/tests/test_instruments_zhinst.py index c94fe8135..bbea85ecb 100644 --- a/tests/test_instruments_zhinst.py +++ b/tests/test_instruments_zhinst.py @@ -1,11 +1,20 @@ import math -import re +from collections import defaultdict +import laboneq.dsl.experiment.pulse as laboneq_pulse +import laboneq.simple as lo import numpy as np import pytest from qibolab import AcquisitionType, AveragingMode, ExecutionParameters, create_platform -from qibolab.instruments.zhinst import ZhPulse, ZhSweeperLine, Zurich +from qibolab.instruments.zhinst import ( + ProcessedSweeps, + ZhPulse, + Zurich, + acquire_channel_name, + classify_sweepers, + measure_channel_name, +) from qibolab.pulses import ( IIR, SNZ, @@ -15,31 +24,25 @@ Gaussian, Pulse, PulseSequence, + PulseType, ReadoutPulse, Rectangular, ) -from qibolab.sweeper import Parameter, Sweeper, SweeperType +from qibolab.sweeper import Parameter, Sweeper from qibolab.unrolling import batch from .conftest import get_instrument @pytest.mark.parametrize( - "shape", ["Rectangular", "Gaussian", "GaussianSquare", "Drag", "SNZ", "IIR"] -) -def test_zhpulse(shape): - if shape == "Rectangular": - pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch0", qubit=0) - if shape == "Gaussian": - pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Gaussian(5), "ch0", qubit=0) - if shape == "GaussianSquare": - pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Gaussian(5), "ch0", qubit=0) - if shape == "Drag": - pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Drag(5, 0.4), "ch0", qubit=0) - if shape == "SNZ": - pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, SNZ(10, 0.01), "ch0", qubit=0) - if shape == "IIR": - pulse = Pulse( + "pulse", + [ + Pulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch0", qubit=0), + Pulse(0, 40, 0.05, int(3e9), 0.0, Gaussian(5), "ch0", qubit=0), + Pulse(0, 40, 0.05, int(3e9), 0.0, Gaussian(5), "ch0", qubit=0), + Pulse(0, 40, 0.05, int(3e9), 0.0, Drag(5, 0.4), "ch0", qubit=0), + Pulse(0, 40, 0.05, int(3e9), 0.0, SNZ(10, 0.01), "ch0", qubit=0), + Pulse( 0, 40, 0.05, @@ -48,39 +51,198 @@ def test_zhpulse(shape): IIR([10, 1], [0.4, 1], target=Gaussian(5)), "ch0", qubit=0, - ) + ), + ], +) +def test_zhpulse_pulse_conversion(pulse): + shape = pulse.shape + zhpulse = ZhPulse(pulse).zhpulse + assert isinstance(zhpulse, laboneq_pulse.Pulse) + if isinstance(shape, (SNZ, IIR)): + assert len(zhpulse.samples) == 80 + else: + assert zhpulse.length == 40e-9 + +def test_zhpulse_add_sweeper(): + pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Gaussian(5), "ch", qubit=0) zhpulse = ZhPulse(pulse) - assert zhpulse.pulse.serial == pulse.serial - if shape == "SNZ" or shape == "IIR": - assert len(zhpulse.zhpulse.samples) == 80 - else: - assert zhpulse.zhpulse.length == 40e-9 + assert zhpulse.zhsweepers == [] + assert zhpulse.delay_sweeper is None + zhpulse.add_sweeper( + Parameter.duration, lo.SweepParameter(values=np.array([1, 2, 3])) + ) + assert len(zhpulse.zhsweepers) == 1 + assert zhpulse.delay_sweeper is None -@pytest.mark.parametrize("parameter", [Parameter.bias, Parameter.start]) -def test_select_sweeper(dummy_qrc, parameter): - swept_points = 5 + zhpulse.add_sweeper( + Parameter.start, lo.SweepParameter(values=np.array([4, 5, 6, 7])) + ) + assert len(zhpulse.zhsweepers) == 1 + assert zhpulse.delay_sweeper is not None + + zhpulse.add_sweeper( + Parameter.amplitude, lo.SweepParameter(values=np.array([3, 2, 1, 0])) + ) + assert len(zhpulse.zhsweepers) == 2 + assert zhpulse.delay_sweeper is not None + + +def test_measure_channel_name(dummy_qrc): platform = create_platform("zurich") - qubits = {0: platform.qubits[0]} - sequence = PulseSequence() - ro_pulses = {} - qd_pulses = {} - for qubit in qubits.values(): - q = qubit.name - qd_pulses[q] = platform.create_RX_pulse(q, start=0) - sequence.add(qd_pulses[q]) - ro_pulses[q] = platform.create_qubit_readout_pulse(q, start=qd_pulses[q].finish) - sequence.add(ro_pulses[q]) + qubits = platform.qubits.values() + meas_ch_names = {measure_channel_name(q) for q in qubits} + assert len(qubits) > 0 + assert len(meas_ch_names) == len(qubits) + + +def test_acquire_channel_name(dummy_qrc): + platform = create_platform("zurich") + qubits = platform.qubits.values() + acq_ch_names = {acquire_channel_name(q) for q in qubits} + assert len(qubits) > 0 + assert len(acq_ch_names) == len(qubits) + + +def test_classify_sweepers(dummy_qrc): + platform = create_platform("zurich") + qubit_id, qubit = 0, platform.qubits[0] + pulse_1 = Pulse(0, 40, 0.05, int(3e9), 0.0, Gaussian(5), "ch0", qubit=qubit_id) + pulse_2 = Pulse( + 0, + 40, + 0.05, + int(3e9), + 0.0, + Rectangular(), + "ch7", + PulseType.READOUT, + qubit=qubit_id, + ) + amplitude_sweeper = Sweeper(Parameter.amplitude, np.array([1, 2, 3]), [pulse_1]) + readout_amplitude_sweeper = Sweeper( + Parameter.amplitude, np.array([1, 2, 3, 4, 5]), [pulse_2] + ) + freq_sweeper = Sweeper(Parameter.frequency, np.array([4, 5, 6, 7]), [pulse_1]) + bias_sweeper = Sweeper(Parameter.bias, np.array([3, 2, 1]), qubits=[qubit]) + nt_sweeps, rt_sweeps = classify_sweepers( + [amplitude_sweeper, readout_amplitude_sweeper, bias_sweeper, freq_sweeper] + ) - parameter_range = np.random.randint(swept_points, size=swept_points) - if parameter is Parameter.start: - sweeper = Sweeper(parameter, parameter_range, pulses=[qd_pulses[q]]) - if parameter is Parameter.bias: - sweeper = Sweeper(parameter, parameter_range, qubits=q) + assert amplitude_sweeper in rt_sweeps + assert freq_sweeper in rt_sweeps + assert bias_sweeper in nt_sweeps + assert readout_amplitude_sweeper in nt_sweeps - ZhSweeper = ZhSweeperLine(sweeper, qubit, sequence) - assert ZhSweeper.sweeper == sweeper + +def test_processed_sweeps_pulse_properties(dummy_qrc): + platform = create_platform("zurich") + qubit_id_1, qubit_1 = 0, platform.qubits[0] + qubit_id_2, qubit_2 = 3, platform.qubits[3] + pulse_1 = Pulse( + 0, 40, 0.05, int(3e9), 0.0, Gaussian(5), qubit_1.drive.name, qubit=qubit_id_1 + ) + pulse_2 = Pulse( + 0, 40, 0.05, int(3e9), 0.0, Gaussian(5), qubit_2.drive.name, qubit=qubit_id_2 + ) + sweeper_amplitude = Sweeper( + Parameter.amplitude, np.array([1, 2, 3]), [pulse_1, pulse_2] + ) + sweeper_duration = Sweeper(Parameter.duration, np.array([1, 2, 3, 4]), [pulse_2]) + processed_sweeps = ProcessedSweeps( + [sweeper_duration, sweeper_amplitude], qubits=platform.qubits + ) + + assert len(processed_sweeps.sweeps_for_pulse(pulse_1)) == 1 + assert processed_sweeps.sweeps_for_pulse(pulse_1)[0][0] == Parameter.amplitude + assert isinstance( + processed_sweeps.sweeps_for_pulse(pulse_1)[0][1], lo.SweepParameter + ) + assert len(processed_sweeps.sweeps_for_pulse(pulse_2)) == 2 + + assert len(processed_sweeps.sweeps_for_sweeper(sweeper_amplitude)) == 2 + parallel_sweep_ids = { + s.uid for s in processed_sweeps.sweeps_for_sweeper(sweeper_amplitude) + } + assert len(parallel_sweep_ids) == 2 + assert processed_sweeps.sweeps_for_pulse(pulse_1)[0][1].uid in parallel_sweep_ids + assert any( + s.uid in parallel_sweep_ids + for _, s in processed_sweeps.sweeps_for_pulse(pulse_2) + ) + + assert len(processed_sweeps.sweeps_for_sweeper(sweeper_duration)) == 1 + pulse_2_sweep_ids = {s.uid for _, s in processed_sweeps.sweeps_for_pulse(pulse_2)} + assert len(pulse_2_sweep_ids) == 2 + assert ( + processed_sweeps.sweeps_for_sweeper(sweeper_duration)[0].uid + in pulse_2_sweep_ids + ) + + assert processed_sweeps.channels_with_sweeps() == set() + + +def test_processed_sweeps_frequency(dummy_qrc): + platform = create_platform("zurich") + qubit_id, qubit = 1, platform.qubits[1] + pulse = Pulse( + 0, 40, 0.05, int(3e9), 0.0, Gaussian(5), qubit.drive.name, qubit=qubit_id + ) + freq_sweeper = Sweeper(Parameter.frequency, np.array([1, 2, 3]), [pulse]) + processed_sweeps = ProcessedSweeps([freq_sweeper], platform.qubits) + + # Frequency sweepers should result into channel property sweeps + assert len(processed_sweeps.sweeps_for_pulse(pulse)) == 0 + assert processed_sweeps.channels_with_sweeps() == {qubit.drive.name} + assert len(processed_sweeps.sweeps_for_channel(qubit.drive.name)) == 1 + + with pytest.raises(ValueError): + flux_pulse = Pulse( + 0, + 40, + 0.05, + int(3e9), + 0.0, + Gaussian(5), + qubit.flux.name, + PulseType.FLUX, + qubit=qubit_id, + ) + freq_sweeper = Sweeper( + Parameter.frequency, np.array([1, 3, 5, 7]), [flux_pulse] + ) + ProcessedSweeps([freq_sweeper], platform.qubits) + + +def test_processed_sweeps_readout_amplitude(dummy_qrc): + platform = create_platform("zurich") + qubit_id, qubit = 0, platform.qubits[0] + readout_ch = measure_channel_name(qubit) + pulse_readout = Pulse( + 0, + 40, + 0.05, + int(3e9), + 0.0, + Rectangular(), + readout_ch, + PulseType.READOUT, + qubit_id, + ) + readout_amplitude_sweeper = Sweeper( + Parameter.amplitude, np.array([1, 2, 3, 4]), [pulse_readout] + ) + processed_sweeps = ProcessedSweeps( + [readout_amplitude_sweeper], qubits=platform.qubits + ) + + # Readout amplitude should result into channel property (gain) sweep + assert len(processed_sweeps.sweeps_for_pulse(pulse_readout)) == 0 + assert processed_sweeps.channels_with_sweeps() == { + readout_ch, + } + assert len(processed_sweeps.sweeps_for_channel(readout_ch)) == 1 def test_zhinst_setup(dummy_qrc): @@ -90,110 +252,84 @@ def test_zhinst_setup(dummy_qrc): def test_zhsequence(dummy_qrc): - qd_pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch0", qubit=0) - ro_pulse = ReadoutPulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch1", qubit=0) - sequence = PulseSequence() - sequence.add(qd_pulse) - sequence.add(ro_pulse) IQM5q = create_platform("zurich") controller = IQM5q.instruments["EL_ZURO"] - controller.sequence_zh(sequence, IQM5q.qubits, IQM5q.couplers) - zhsequence = controller.sequence - - with pytest.raises(AttributeError): - controller.sequence_zh("sequence", IQM5q.qubits, IQM5q.couplers) - zhsequence = controller.sequence - - assert len(zhsequence) == 2 - assert len(zhsequence["readout0"]) == 1 - - -def test_zhsequence_couplers(dummy_qrc): - qd_pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch0", qubit=0) - ro_pulse = ReadoutPulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch1", qubit=0) - qc_pulse = CouplerFluxPulse(0, 40, 0.05, Rectangular(), "ch_c0", qubit=3) + drive_channel, readout_channel = IQM5q.qubits[0].drive.name, measure_channel_name( + IQM5q.qubits[0] + ) + qd_pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), drive_channel, qubit=0) + ro_pulse = ReadoutPulse( + 0, 40, 0.05, int(3e9), 0.0, Rectangular(), readout_channel, qubit=0 + ) sequence = PulseSequence() sequence.add(qd_pulse) + sequence.add(qd_pulse) sequence.add(ro_pulse) - sequence.add(qc_pulse) - IQM5q = create_platform("zurich") - controller = IQM5q.instruments["EL_ZURO"] - controller.sequence_zh(sequence, IQM5q.qubits, IQM5q.couplers) - zhsequence = controller.sequence + zhsequence = controller.sequence_zh(sequence, IQM5q.qubits) - with pytest.raises(AttributeError): - controller.sequence_zh("sequence", IQM5q.qubits, IQM5q.couplers) - zhsequence = controller.sequence + assert len(zhsequence) == 2 + assert len(zhsequence[drive_channel]) == 2 + assert len(zhsequence[readout_channel]) == 1 - assert len(zhsequence) == 3 - assert len(zhsequence["readout0"]) == 1 - assert len(zhsequence["couplerflux3"]) == 1 + with pytest.raises(AttributeError): + controller.sequence_zh("sequence", IQM5q.qubits) -def test_zhsequence_couplers_sweeper(dummy_qrc): - ro_pulse = ReadoutPulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch1", qubit=0) - sequence = PulseSequence() - sequence.add(ro_pulse) +def test_zhsequence_couplers(dummy_qrc): IQM5q = create_platform("zurich") controller = IQM5q.instruments["EL_ZURO"] - delta_bias_range = np.arange(-1, 1, 0.5) - - sweeper = Sweeper( - Parameter.amplitude, - delta_bias_range, - pulses=[ - CouplerFluxPulse( - start=0, - duration=sequence.duration + sequence.start, - amplitude=1, - shape="Rectangular", - qubit=IQM5q.couplers[0].name, - ) - ], - type=SweeperType.ABSOLUTE, + drive_channel, readout_channel = IQM5q.qubits[0].drive.name, measure_channel_name( + IQM5q.qubits[0] ) + couplerflux_channel = IQM5q.couplers[0].flux.name + qd_pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), drive_channel, qubit=0) + ro_pulse = ReadoutPulse( + 0, 40, 0.05, int(3e9), 0.0, Rectangular(), readout_channel, qubit=0 + ) + qc_pulse = CouplerFluxPulse( + 0, 40, 0.05, Rectangular(), couplerflux_channel, qubit=3 + ) + sequence = PulseSequence() + sequence.add(qd_pulse) + sequence.add(ro_pulse) + sequence.add(qc_pulse) - controller.sweepers = [sweeper] - controller.sequence_zh(sequence, IQM5q.qubits, IQM5q.couplers) - zhsequence = controller.sequence - - with pytest.raises(AttributeError): - controller.sequence_zh("sequence", IQM5q.qubits, IQM5q.couplers) - zhsequence = controller.sequence + zhsequence = controller.sequence_zh(sequence, IQM5q.qubits) - assert len(zhsequence) == 2 - assert len(zhsequence["readout0"]) == 1 - assert len(zhsequence["couplerflux0"]) == 0 # is it correct? + assert len(zhsequence) == 3 + assert len(zhsequence[couplerflux_channel]) == 1 def test_zhsequence_multiple_ro(dummy_qrc): + platform = create_platform("zurich") + readout_channel = measure_channel_name(platform.qubits[0]) sequence = PulseSequence() qd_pulse = Pulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch0", qubit=0) sequence.add(qd_pulse) - ro_pulse = ReadoutPulse(0, 40, 0.05, int(3e9), 0.0, Rectangular(), "ch1", qubit=0) + ro_pulse = ReadoutPulse( + 0, 40, 0.05, int(3e9), 0.0, Rectangular(), readout_channel, qubit=0 + ) sequence.add(ro_pulse) - ro_pulse = ReadoutPulse(0, 5000, 0.05, int(3e9), 0.0, Rectangular(), "ch1", qubit=0) + ro_pulse = ReadoutPulse( + 0, 5000, 0.05, int(3e9), 0.0, Rectangular(), readout_channel, qubit=0 + ) sequence.add(ro_pulse) platform = create_platform("zurich") controller = platform.instruments["EL_ZURO"] - controller.sequence_zh(sequence, platform.qubits, platform.couplers) - zhsequence = controller.sequence - - with pytest.raises(AttributeError): - controller.sequence_zh("sequence", platform.qubits, platform.couplers) - zhsequence = controller.sequence + zhsequence = controller.sequence_zh(sequence, platform.qubits) assert len(zhsequence) == 2 - assert len(zhsequence["readout0"]) == 2 + assert len(zhsequence[readout_channel]) == 2 def test_zhinst_register_readout_line(dummy_qrc): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] + qubit = platform.qubits[0] options = ExecutionParameters( relaxation_time=300e-6, @@ -201,12 +337,10 @@ def test_zhinst_register_readout_line(dummy_qrc): averaging_mode=AveragingMode.CYCLIC, ) - IQM5q.register_readout_line( - platform.qubits[0], intermediate_frequency=int(1e6), options=options - ) + IQM5q.register_readout_line(qubit, intermediate_frequency=int(1e6), options=options) - assert "measure0" in IQM5q.signal_map - assert "acquire0" in IQM5q.signal_map + assert measure_channel_name(qubit) in IQM5q.signal_map + assert acquire_channel_name(qubit) in IQM5q.signal_map assert ( "/logical_signal_groups/q0/measure_line" in IQM5q.calibration.calibration_items ) @@ -215,22 +349,24 @@ def test_zhinst_register_readout_line(dummy_qrc): def test_zhinst_register_drive_line(dummy_qrc): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] - IQM5q.register_drive_line(platform.qubits[0], intermediate_frequency=int(1e6)) + qubit = platform.qubits[0] + IQM5q.register_drive_line(qubit, intermediate_frequency=int(1e6)) - assert "drive0" in IQM5q.signal_map + assert qubit.drive.name in IQM5q.signal_map assert "/logical_signal_groups/q0/drive_line" in IQM5q.calibration.calibration_items def test_zhinst_register_flux_line(dummy_qrc): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] - IQM5q.register_flux_line(platform.qubits[0]) + qubit = platform.qubits[0] + IQM5q.register_flux_line(qubit) - assert "flux0" in IQM5q.signal_map + assert qubit.flux.name in IQM5q.signal_map assert "/logical_signal_groups/q0/flux_line" in IQM5q.calibration.calibration_items -def test_experiment_execute_pulse_sequence(dummy_qrc): +def test_experiment_flow(dummy_qrc): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] @@ -263,12 +399,12 @@ def test_experiment_execute_pulse_sequence(dummy_qrc): IQM5q.experiment_flow(qubits, couplers, sequence, options) - assert "flux0" in IQM5q.experiment.signals - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals + assert qubits[0].flux.name in IQM5q.experiment.signals + assert measure_channel_name(qubits[0]) in IQM5q.experiment.signals + assert acquire_channel_name(qubits[0]) in IQM5q.experiment.signals -def test_experiment_execute_pulse_sequence_coupler(dummy_qrc): +def test_experiment_flow_coupler(dummy_qrc): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] @@ -315,89 +451,62 @@ def test_experiment_execute_pulse_sequence_coupler(dummy_qrc): IQM5q.experiment_flow(qubits, couplers, sequence, options) - assert "flux0" in IQM5q.experiment.signals - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals - - -def test_experiment_fast_reset_readout(dummy_qrc): - platform = create_platform("zurich") - IQM5q = platform.instruments["EL_ZURO"] - - sequence = PulseSequence() - qubits = {0: platform.qubits[0]} - couplers = {} - platform.qubits = qubits - - ro_pulses = {} - fr_pulses = {} - for qubit in qubits: - fr_pulses[qubit] = platform.create_RX_pulse(qubit, start=0) - ro_pulses[qubit] = platform.create_qubit_readout_pulse(qubit, start=0) - sequence.add(ro_pulses[qubit]) - - options = ExecutionParameters( - relaxation_time=300e-6, - fast_reset=fr_pulses, - acquisition_type=AcquisitionType.INTEGRATION, - averaging_mode=AveragingMode.CYCLIC, - ) - - IQM5q.experiment_flow(qubits, couplers, sequence, options) - - assert "drive0" in IQM5q.experiment.signals - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals + assert qubits[0].flux.name in IQM5q.experiment.signals + assert measure_channel_name(qubits[0]) in IQM5q.experiment.signals + assert acquire_channel_name(qubits[0]) in IQM5q.experiment.signals -@pytest.mark.parametrize("fast_reset", [True, False]) -def test_experiment_execute_pulse_sequence(dummy_qrc, fast_reset): +def test_sweep_and_play_sim(dummy_qrc): + """Test end-to-end experiment run using ZI emulated connection.""" platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] sequence = PulseSequence() - qubits = {0: platform.qubits[0]} + qubits = {0: platform.qubits[0], 2: platform.qubits[2]} platform.qubits = qubits + couplers = {} ro_pulses = {} - qd_pulses = {} qf_pulses = {} - fr_pulses = {} - for qubit in qubits: - if fast_reset: - fr_pulses[qubit] = platform.create_RX_pulse(qubit, start=0) - qd_pulses[qubit] = platform.create_RX_pulse(qubit, start=0) - sequence.add(qd_pulses[qubit]) - ro_pulses[qubit] = platform.create_qubit_readout_pulse( - qubit, start=qd_pulses[qubit].finish - ) - sequence.add(ro_pulses[qubit]) - qf_pulses[qubit] = FluxPulse( + for qubit in qubits.values(): + q = qubit.name + qf_pulses[q] = FluxPulse( start=0, - duration=ro_pulses[qubit].start, + duration=500, amplitude=1, shape=Rectangular(), - channel=platform.qubits[qubit].flux.name, - qubit=qubit, + channel=platform.qubits[q].flux.name, + qubit=q, ) - sequence.add(qf_pulses[qubit]) - - if fast_reset: - fast_reset = fr_pulses + sequence.add(qf_pulses[q]) + ro_pulses[q] = platform.create_qubit_readout_pulse(q, start=qf_pulses[q].finish) + sequence.add(ro_pulses[q]) options = ExecutionParameters( relaxation_time=300e-6, - fast_reset=fast_reset, acquisition_type=AcquisitionType.INTEGRATION, averaging_mode=AveragingMode.CYCLIC, + nshots=12, ) - IQM5q.experiment_flow(qubits, sequence, options) + # check play + IQM5q.session = lo.Session(IQM5q.device_setup) + IQM5q.session.connect(do_emulation=True) + res = IQM5q.play(qubits, couplers, sequence, options) + assert res is not None + assert all(qubit in res for qubit in qubits) - assert "drive0" in IQM5q.experiment.signals - assert "flux0" in IQM5q.experiment.signals - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals + # check sweep with empty list of sweeps + res = IQM5q.sweep(qubits, couplers, sequence, options) + assert res is not None + assert all(qubit in res for qubit in qubits) + + # check sweep with sweeps + sweep_1 = Sweeper(Parameter.start, np.array([1, 2, 3, 4]), list(qf_pulses.values())) + sweep_2 = Sweeper(Parameter.bias, np.array([1, 2, 3]), qubits=[qubits[0]]) + res = IQM5q.sweep(qubits, couplers, sequence, options, sweep_1, sweep_2) + assert res is not None + assert all(qubit in res for qubit in qubits) @pytest.mark.parametrize("parameter1", [Parameter.start, Parameter.duration]) @@ -405,7 +514,6 @@ def test_experiment_sweep_single(dummy_qrc, parameter1): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] - sequence = PulseSequence() qubits = {0: platform.qubits[0]} couplers = {} @@ -436,13 +544,11 @@ def test_experiment_sweep_single(dummy_qrc, parameter1): averaging_mode=AveragingMode.CYCLIC, ) - IQM5q.sweepers = sweepers - IQM5q.experiment_flow(qubits, couplers, sequence, options) - assert "drive0" in IQM5q.experiment.signals - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals + assert qubits[0].drive.name in IQM5q.experiment.signals + assert measure_channel_name(qubits[0]) in IQM5q.experiment.signals + assert acquire_channel_name(qubits[0]) in IQM5q.experiment.signals @pytest.mark.parametrize("parameter1", [Parameter.start, Parameter.duration]) @@ -450,7 +556,6 @@ def test_experiment_sweep_single_coupler(dummy_qrc, parameter1): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] - sequence = PulseSequence() qubits = {0: platform.qubits[0], 2: platform.qubits[2]} couplers = {0: platform.couplers[0]} @@ -494,14 +599,12 @@ def test_experiment_sweep_single_coupler(dummy_qrc, parameter1): averaging_mode=AveragingMode.CYCLIC, ) - IQM5q.sweepers = sweepers - IQM5q.experiment_flow(qubits, couplers, sequence, options) - assert "couplerflux0" in IQM5q.experiment.signals - assert "drive0" in IQM5q.experiment.signals - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals + assert couplers[0].flux.name in IQM5q.experiment.signals + assert qubits[0].drive.name in IQM5q.experiment.signals + assert measure_channel_name(qubits[0]) in IQM5q.experiment.signals + assert acquire_channel_name(qubits[0]) in IQM5q.experiment.signals SweeperParameter = { @@ -519,7 +622,6 @@ def test_experiment_sweep_2d_general(dummy_qrc, parameter1, parameter2): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] - sequence = PulseSequence() qubits = {0: platform.qubits[0]} couplers = {} @@ -566,21 +668,17 @@ def test_experiment_sweep_2d_general(dummy_qrc, parameter1, parameter2): averaging_mode=AveragingMode.CYCLIC, ) - IQM5q.sweepers = sweepers - rearranging_axes, sweepers = IQM5q.rearrange_sweepers(sweepers) - IQM5q.sweepers = sweepers # to be changed IQM5q.experiment_flow(qubits, couplers, sequence, options) - assert "drive0" in IQM5q.experiment.signals - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals + assert qubits[0].drive.name in IQM5q.experiment.signals + assert measure_channel_name(qubits[0]) in IQM5q.experiment.signals + assert acquire_channel_name(qubits[0]) in IQM5q.experiment.signals def test_experiment_sweep_2d_specific(dummy_qrc): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] - sequence = PulseSequence() qubits = {0: platform.qubits[0]} couplers = {} @@ -621,15 +719,11 @@ def test_experiment_sweep_2d_specific(dummy_qrc): averaging_mode=AveragingMode.CYCLIC, ) - IQM5q.sweepers = sweepers - rearranging_axes, sweepers = IQM5q.rearrange_sweepers(sweepers) - IQM5q.sweepers = sweepers # to be changed IQM5q.experiment_flow(qubits, couplers, sequence, options) - assert "drive0" in IQM5q.experiment.signals - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals - assert rearranging_axes != [[], []] + assert qubits[0].drive.name in IQM5q.experiment.signals + assert measure_channel_name(qubits[0]) in IQM5q.experiment.signals + assert acquire_channel_name(qubits[0]) in IQM5q.experiment.signals @pytest.mark.parametrize( @@ -639,7 +733,6 @@ def test_experiment_sweep_punchouts(dummy_qrc, parameter): platform = create_platform("zurich") IQM5q = platform.instruments["EL_ZURO"] - sequence = PulseSequence() qubits = {0: platform.qubits[0]} couplers = {} @@ -687,41 +780,10 @@ def test_experiment_sweep_punchouts(dummy_qrc, parameter): averaging_mode=AveragingMode.CYCLIC, ) - IQM5q.sweepers = sweepers - rearranging_axes, sweepers = IQM5q.rearrange_sweepers(sweepers) - IQM5q.sweepers = sweepers # to be changed IQM5q.experiment_flow(qubits, couplers, sequence, options) - assert "measure0" in IQM5q.experiment.signals - assert "acquire0" in IQM5q.experiment.signals - - -# TODO: Fix this -def test_sim(dummy_qrc): - platform = create_platform("zurich") - IQM5q = platform.instruments["EL_ZURO"] - sequence = PulseSequence() - qubits = {0: platform.qubits[0]} - platform.qubits = qubits - ro_pulses = {} - qd_pulses = {} - qf_pulses = {} - for qubit in qubits: - qd_pulses[qubit] = platform.create_RX_pulse(qubit, start=0) - sequence.add(qd_pulses[qubit]) - ro_pulses[qubit] = platform.create_qubit_readout_pulse( - qubit, start=qd_pulses[qubit].finish - ) - sequence.add(ro_pulses[qubit]) - qf_pulses[qubit] = FluxPulse( - start=0, - duration=500, - amplitude=1, - shape=Rectangular(), - channel=platform.qubits[qubit].flux.name, - qubit=qubit, - ) - sequence.add(qf_pulses[qubit]) + assert measure_channel_name(qubits[0]) in IQM5q.experiment.signals + assert acquire_channel_name(qubits[0]) in IQM5q.experiment.signals def test_batching(dummy_qrc): @@ -756,7 +818,7 @@ def test_connections(instrument): @pytest.mark.qpu -def test_experiment_execute_pulse_sequence(connected_platform, instrument): +def test_experiment_execute_pulse_sequence_qpu(connected_platform, instrument): platform = connected_platform sequence = PulseSequence() qubits = {0: platform.qubits[0], "c0": platform.qubits["c0"]} @@ -795,9 +857,8 @@ def test_experiment_execute_pulse_sequence(connected_platform, instrument): @pytest.mark.qpu -def test_experiment_sweep_2d_specific(connected_platform, instrument): +def test_experiment_sweep_2d_specific_qpu(connected_platform, instrument): platform = connected_platform - sequence = PulseSequence() qubits = {0: platform.qubits[0]} swept_points = 5 @@ -849,25 +910,22 @@ def test_experiment_sweep_2d_specific(connected_platform, instrument): def get_previous_subsequence_finish(instrument, name): """Look recursively for sub_section finish times.""" - signal_name = re.sub("sequence_", "", name) - signal_name = re.sub(r"_\d+$", "", signal_name) - signal_name = re.sub(r"flux", "bias", signal_name) - finish = 0 - for section in instrument.experiment.sections[0].children: - if section.uid == name: - for pulse in section.children: - if pulse.signal == signal_name: - try: - finish += pulse.time - except AttributeError: - # not a laboneq Delay class object, skipping - pass - try: - finish += pulse.pulse.length - except AttributeError: - # not a laboneq PlayPulse class object, skipping - pass - return finish + section = next( + iter(ch for ch in instrument.experiment.sections[0].children if ch.uid == name) + ) + finish = defaultdict(int) + for pulse in section.children: + try: + finish[pulse.signal] += pulse.time + except AttributeError: + # not a laboneq Delay class object, skipping + pass + try: + finish[pulse.signal] += pulse.pulse.length + except AttributeError: + # not a laboneq PlayPulse class object, skipping + pass + return max(finish.values()) def test_experiment_measurement_sequence(dummy_qrc): @@ -902,11 +960,11 @@ def test_experiment_measurement_sequence(dummy_qrc): IQM5q.experiment_flow(qubits, couplers, sequence, options) measure_start = 0 for section in IQM5q.experiment.sections[0].children: - if section.uid == "sequence_measure_0": + if section.uid == "measure_0": measure_start += get_previous_subsequence_finish(IQM5q, section.play_after) for pulse in section.children: try: - if pulse.signal == "measure0": + if pulse.signal == measure_channel_name(qubits[0]): measure_start += pulse.time except AttributeError: # not a laboneq delay class object, skipping