diff --git a/src/qibolab/instruments/qm/__init__.py b/src/qibolab/instruments/qm/__init__.py index e053aa970..668987278 100644 --- a/src/qibolab/instruments/qm/__init__.py +++ b/src/qibolab/instruments/qm/__init__.py @@ -1,2 +1,2 @@ from .controller import QMController -from .devices import Octave, OPXplus +from .devices import FEM, OPX1000, Octave, OPXplus diff --git a/src/qibolab/instruments/qm/config.py b/src/qibolab/instruments/qm/config.py index 41c1f3e4a..578efc24a 100644 --- a/src/qibolab/instruments/qm/config.py +++ b/src/qibolab/instruments/qm/config.py @@ -6,7 +6,7 @@ from qibolab.pulses import PulseType, Rectangular -from .ports import OPXIQ, OctaveInput, OctaveOutput, OPXOutput +from .ports import OPXIQ, FEMInput, FEMOutput, OctaveInput, OctaveOutput, OPXOutput SAMPLING_RATE = 1 """Sampling rate of Quantum Machines OPX in GSps.""" @@ -48,28 +48,48 @@ def register_port(self, port): self.register_port(port.q) else: is_octave = isinstance(port, (OctaveOutput, OctaveInput)) + is_fem = isinstance(port, (FEMOutput, FEMInput)) controllers = self.octaves if is_octave else self.controllers if port.device not in controllers: if is_octave: controllers[port.device] = {} + elif is_fem: + controllers[port.device] = {"type": "opx1000", "fems": {}} else: controllers[port.device] = { "analog_inputs": DEFAULT_INPUTS, "digital_outputs": {}, } - device = controllers[port.device] + if is_fem: + fems = controllers[port.device]["fems"] + if port.fem_number not in fems: + fems[port.fem_number] = { + "type": port.fem_type, + "analog_inputs": DEFAULT_INPUTS, + "digital_outputs": {}, + } + device = fems[port.fem_number] + else: + device = controllers[port.device] + if port.key in device: device[port.key].update(port.config) else: device[port.key] = port.config if is_octave: - con = port.opx_port.i.device - number = port.opx_port.i.number - device["connectivity"] = con self.register_port(port.opx_port) - self.controllers[con]["digital_outputs"][number] = {} + subport = port.opx_port.i + con = subport.device + number = subport.number + if isinstance(subport, (FEMOutput, FEMInput)): + fem = subport.fem_number + device["connectivity"] = (con, fem) + self.controllers[con]["fems"][fem]["digital_outputs"][number] = {} + else: + device["connectivity"] = con + self.controllers[con]["digital_outputs"][number] = {} @staticmethod def iq_imbalance(g, phi): @@ -275,10 +295,6 @@ def register_pulse(self, qubit, qmpulse): "waveforms": {"I": serial_i, "Q": serial_q}, "digital_marker": "ON", } - # register drive pulse in elements - self.elements[qmpulse.element]["operations"][ - qmpulse.operation - ] = qmpulse.operation elif pulse.type is PulseType.FLUX: serial = self.register_waveform(pulse) @@ -289,10 +305,6 @@ def register_pulse(self, qubit, qmpulse): "single": serial, }, } - # register flux pulse in elements - self.elements[qmpulse.element]["operations"][ - qmpulse.operation - ] = qmpulse.operation elif pulse.type is PulseType.READOUT: serial_i = self.register_waveform(pulse, "i") @@ -312,14 +324,14 @@ def register_pulse(self, qubit, qmpulse): }, "digital_marker": "ON", } - # register readout pulse in elements - self.elements[qmpulse.element]["operations"][ - qmpulse.operation - ] = qmpulse.operation else: raise_error(TypeError, f"Unknown pulse type {pulse.type.name}.") + self.elements[qmpulse.element]["operations"][ + qmpulse.operation + ] = qmpulse.operation + def register_waveform(self, pulse, mode="i"): """Registers waveforms in QM config. diff --git a/src/qibolab/instruments/qm/controller.py b/src/qibolab/instruments/qm/controller.py index 08d3f23f6..e9017bb4b 100644 --- a/src/qibolab/instruments/qm/controller.py +++ b/src/qibolab/instruments/qm/controller.py @@ -189,6 +189,9 @@ def __post_init__(self): # convert simulation duration from ns to clock cycles self.simulation_duration //= 4 + def __str__(self): + return self.name + def ports(self, name, output=True): """Provides instrument ports to the user. @@ -255,7 +258,6 @@ def disconnect(self): self._reset_temporary_calibration() if self.manager is not None: self.manager.close_all_quantum_machines() - self.manager.close() self.is_connected = False def calibrate_mixers(self, qubits): @@ -373,6 +375,10 @@ def sweep(self, qubits, couplers, sequence, options, *sweepers): # play pulses using QUA with qua.program() as experiment: n = declare(int) + for qubit in qubits.values(): + if qubit.flux: + qua.set_dc_offset(qubit.flux.name, "single", qubit.sweetspot) + acquisitions = declare_acquisitions(ro_pulses, qubits, options) with for_(n, 0, n < options.nshots, n + 1): sweep( diff --git a/src/qibolab/instruments/qm/devices.py b/src/qibolab/instruments/qm/devices.py index 6ec0674d8..e8b3a547b 100644 --- a/src/qibolab/instruments/qm/devices.py +++ b/src/qibolab/instruments/qm/devices.py @@ -1,12 +1,14 @@ from collections import defaultdict from dataclasses import dataclass, field from itertools import chain -from typing import Dict +from typing import Dict, Literal, Union from qibolab.instruments.abstract import Instrument from .ports import ( OPXIQ, + FEMInput, + FEMOutput, OctaveInput, OctaveOutput, OPXInput, @@ -43,6 +45,9 @@ class QMDevice(Instrument): inputs: Dict[int, QMInput] = field(init=False) """Dictionary containing the instrument's input ports.""" + def __str__(self): + return self.name + def ports(self, number, output=True): """Provides instrument's ports to the user. @@ -94,13 +99,60 @@ def __post_init__(self): self.inputs = PortsDefaultdict(lambda n: OPXInput(self.name, n)) +@dataclass +class FEM: + """Device handling OPX1000 FEMs.""" + + name: int + type: Literal["LF", "MF"] = "LF" + + +@dataclass +class OPX1000(QMDevice): + """Device handling OPX1000 controllers.""" + + fems: Dict[int, FEM] = field(default_factory=dict) + + def __post_init__(self): + def kwargs(fem): + return {"fem_number": fem, "fem_type": self.fems[fem].type} + + self.outputs = PortsDefaultdict( + lambda pair: FEMOutput(self.name, pair[1], **kwargs(pair[0])) + ) + self.inputs = PortsDefaultdict( + lambda pair: FEMInput(self.name, pair[1], **kwargs(pair[0])) + ) + + def ports(self, fem_number: int, number: int, output: bool = True): + ports_ = self.outputs if output else self.inputs + return ports_[(fem_number, number)] + + def connectivity(self, fem_number: int) -> tuple["OPX1000", int]: + return (self, fem_number) + + def setup(self, **kwargs): + for name, settings in kwargs.items(): + fem, port = name.split("/") + fem = int(fem) + number = int(port[1:]) + if port[0] == "o": + self.outputs[(fem, number)].setup(**settings) + elif port[0] == "i": + self.inputs[(fem, number)].setup(**settings) + else: + raise ValueError( + f"Invalid port name {name} in instrument settings for {self.name}." + ) + + @dataclass class Octave(QMDevice): """Device handling Octaves.""" port: int """Network port of the Octave in the cluster configuration.""" - connectivity: OPXplus + connectivity: Union[OPXplus, tuple[OPX1000, int]] """OPXplus that acts as the waveform generator for the Octave.""" def __post_init__(self): @@ -115,7 +167,12 @@ def ports(self, number, output=True): """ port = super().ports(number, output) if port.opx_port is None: - iport = self.connectivity.ports(2 * number - 1, output) - qport = self.connectivity.ports(2 * number, output) + if isinstance(self.connectivity, OPXplus): + iport = self.connectivity.ports(2 * number - 1, output) + qport = self.connectivity.ports(2 * number, output) + else: + opx, fem_number = self.connectivity + iport = opx.ports(fem_number, 2 * number - 1, output) + qport = opx.ports(fem_number, 2 * number, output) port.opx_port = OPXIQ(iport, qport) return port diff --git a/src/qibolab/instruments/qm/ports.py b/src/qibolab/instruments/qm/ports.py index 1d6ce2d44..5a593746d 100644 --- a/src/qibolab/instruments/qm/ports.py +++ b/src/qibolab/instruments/qm/ports.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field, fields -from typing import ClassVar, Dict, Optional, Union +from typing import ClassVar, Dict, Literal, Optional, Union DIGITAL_DELAY = 57 DIGITAL_BUFFER = 18 @@ -131,6 +131,37 @@ class OPXIQ: """Port implementing the Q-component of the signal.""" +@dataclass +class FEMOutput(OPXOutput): + fem_number: int = 0 + fem_type: Literal["LF", "MF"] = "LF" + output_mode: Literal["direct", "amplified"] = field( + default="direct", metadata={"config": "output_mode", "settings": True} + ) + + @property + def name(self): + return f"{self.fem_number}/o{self.number}" + + @property + def pair(self): + return (self.device, self.fem_number, self.number) + + +@dataclass +class FEMInput(OPXInput): + fem_number: int = 0 + fem_type: Literal["LF", "MF"] = "LF" + + @property + def name(self): + return f"{self.fem_number}/i{self.number}" + + @property + def pair(self): + return (self.device, self.fem_number, self.number) + + @dataclass class OctaveOutput(QMOutput): key: ClassVar[str] = "RF_outputs" @@ -149,7 +180,7 @@ class OctaveOutput(QMOutput): Can be external or internal. """ - output_mode: str = field(default="triggered", metadata={"config": "output_mode"}) + output_mode: str = field(default="always_on", metadata={"config": "output_mode"}) """Can be: "always_on" / "always_off"/ "triggered" / "triggered_reversed".""" digital_delay: int = DIGITAL_DELAY """Delay for digital output channel.""" @@ -165,11 +196,14 @@ def digital_inputs(self): Digital markers are used to switch LOs on in triggered mode. """ - opx = self.opx_port.i.device - number = self.opx_port.i.number + opx_port = self.opx_port.i + if isinstance(opx_port, (FEMOutput, FEMInput)): + port = (opx_port.device, opx_port.fem_number, opx_port.number) + else: + port = (opx_port.device, opx_port.number) return { "output_switch": { - "port": (opx, number), + "port": port, "delay": self.digital_delay, "buffer": self.digital_buffer, } diff --git a/src/qibolab/instruments/qm/sweepers.py b/src/qibolab/instruments/qm/sweepers.py index 3110e1f3d..081f68264 100644 --- a/src/qibolab/instruments/qm/sweepers.py +++ b/src/qibolab/instruments/qm/sweepers.py @@ -141,16 +141,18 @@ def _sweep_bias(sweepers, qubits, qmsequence, relaxation_time): for qubit in sweeper.qubits: b0 = qubit.flux.offset max_offset = qubit.flux.max_offset + if max_offset is None: + max_offset = 0.5 max_value = maximum_sweep_value(sweeper.values, b0) check_max_offset(max_value, max_offset) offset0.append(declare(fixed, value=b0)) b = declare(fixed) with for_(*from_array(b, sweeper.values)): for qubit, b0 in zip(sweeper.qubits, offset0): - with qua.if_((b + b0) >= 0.49): - qua.set_dc_offset(f"flux{qubit.name}", "single", 0.49) - with qua.elif_((b + b0) <= -0.49): - qua.set_dc_offset(f"flux{qubit.name}", "single", -0.49) + with qua.if_((b + b0) >= max_offset): + qua.set_dc_offset(f"flux{qubit.name}", "single", max_offset) + with qua.elif_((b + b0) <= -max_offset): + qua.set_dc_offset(f"flux{qubit.name}", "single", -max_offset) with qua.else_(): qua.set_dc_offset(f"flux{qubit.name}", "single", (b + b0))