diff --git a/scaaml/capture/scope/lecroy/lecroy_communication.py b/scaaml/capture/scope/lecroy/lecroy_communication.py index b4eae556..eb97cbc4 100644 --- a/scaaml/capture/scope/lecroy/lecroy_communication.py +++ b/scaaml/capture/scope/lecroy/lecroy_communication.py @@ -11,12 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# +# Based on by code kindly shared by Victor Lomné - NinjaLab """Communication with the LeCroy oscilloscope. Either using pyvisa or socket. """ from abc import ABC, abstractmethod import hashlib import logging +import socket +from struct import pack, unpack from typing import Optional import pyvisa @@ -71,7 +75,9 @@ def query_binary_values(self, message: str, datatype="B", container=bytearray) -> bytearray: - """Query binary data.""" + """Query binary data. Beware that the results from socket version might + contain headers which are stripped by pyvisa. + """ def _check_response_template(self): """ Check if the hash of the waveform template matches the supported @@ -119,7 +125,7 @@ def __init__(self, ip_address: str, timeout: float = 5.0): def connect(self): # For portability and ease of setup we enforce the pure Python backend self._resource_manager = pyvisa.ResourceManager("@py") - assert self._resource_manager is not None + assert self._resource_manager scope_resource = self._resource_manager.open_resource( f"TCPIP::{self._ip_address}::INSTR", @@ -128,7 +134,7 @@ def connect(self): assert isinstance(scope_resource, pyvisa.resources.MessageBasedResource) self._scope = scope_resource - assert self._scope is not None + assert self._scope self._scope.timeout = self._timeout * 1_000 # Convert second to ms self._scope.clear() @@ -137,17 +143,17 @@ def connect(self): @make_custom_exception def close(self) -> None: - assert self._scope is not None + assert self._scope self._scope.before_close() self._scope.close() - assert self._resource_manager is not None + assert self._resource_manager self._resource_manager.close() @make_custom_exception def write(self, message: str) -> None: """Write a message to the oscilloscope. """ - assert self._scope is not None + assert self._scope self._logger.debug("write(message=\"%s\")", message) self._scope.write(message) @@ -156,7 +162,7 @@ def query(self, message: str) -> str: """Query the oscilloscope (write, read, and decode the answer as a string). """ - assert self._scope is not None + assert self._scope self._logger.debug("query(message=\"%s\")", message) return self._scope.query(message).strip() @@ -164,7 +170,7 @@ def query(self, message: str) -> str: def get_waveform(self, channel: LECROY_CHANNEL_NAME_T) -> LecroyWaveform: """Get a LecroyWaveform object representing a single waveform. """ - assert self._scope is not None + assert self._scope return self._scope.query_binary_values( f"{channel}:WAVEFORM?", @@ -178,10 +184,168 @@ def query_binary_values(self, datatype="B", container=bytearray): """Query binary data.""" - assert self._scope is not None + assert self._scope self._logger.debug("query_binary_values(message=\"%s\")", message) return self._scope.query_binary_values( message, datatype=datatype, container=container, ) + + +class LeCroyCommunicationSocket(LeCroyCommunication): + """Use Python socket to communicate using the TCP/IP (VICP). + ("Utilities > Utilities Setup > Remote" and choose TCPIP).""" + + def __init__(self, ip_address: str, timeout: float = 5.0): + super().__init__( + ip_address=ip_address, + timeout=timeout, + ) + # Header format (see section "VICP Headers"): + # operation: byte + # header_version: byte + # sequence_number: byte + # spare: byte = 0 (reserved for future) + # block_length: long = length of the command (block to be sent) + self._lecroy_command_header = ">4BL" + self._socket: Optional[socket.socket] = None + + @make_custom_exception + def connect(self): + assert self._socket is None + + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(self._timeout) + + # establish connection + self._socket.connect((self._ip_address, 1861)) + + # Check if the response template is what LecroyWaveform expects + self._check_response_template() + + @make_custom_exception + def close(self) -> None: + assert self._socket + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + self._socket = None + + @make_custom_exception + def write(self, message: str) -> None: + """Write a message to the oscilloscope. + """ + assert self._socket + self._socket.send(self._format_command(message)) + + @make_custom_exception + def query(self, message: str) -> str: + """Query the oscilloscope (write, read, and decode the answer as a + string). + """ + return self.query_binary_values(message).decode() + + @make_custom_exception + def get_waveform(self, channel: LECROY_CHANNEL_NAME_T) -> LecroyWaveform: + """Get a LecroyWaveform object representing a single waveform. + + Args: + channel (LECROY_CHANNEL_NAME_T): The name of queried channel. + """ + raw_data = self.query_binary_values(f"{channel}:WAVEFORM?") + assert raw_data[:6] == b"ALL,#9" # followed by 9 digits for size + len_raw_data = int(raw_data[6:15]) # length without the header + raw_data = raw_data[15:] + if len_raw_data + 1 == len(raw_data): + raw_data = raw_data[:-1] # last is linefeed + assert len(raw_data) == len_raw_data + return LecroyWaveform(raw_data) + + @make_custom_exception + def query_binary_values(self, + message: str, + datatype="B", + container=None) -> bytes: + """Query binary data. + + Args: + message (str): Query message. + datatype (str): Ignored. + container: A bytearray is always used. + + Returns: a bytes representation of the response. + """ + assert self._socket + + del datatype # ignored + del container # ignored + + self._logger.debug("\"%s\"", message) + + # Send message + self.write(message) + + # Receive and decode answer + return self._get_raw_response() + + def _format_command(self, command: str) -> bytes: + """Method formatting leCroy command. + + Args: + command (str): The command to be formatted for sending over a + socket. + + Returns: bytes representation to be directly sent over a socket. + """ + # Compute header for the current command, header: + # operation = DATA | EOI + command_header = pack(self._lecroy_command_header, 129, 1, 1, 0, + len(command)) + + formatted_command = command_header + command.encode("ascii") + return formatted_command + + def _get_raw_response(self) -> bytes: + """Get raw response from the socket. + + Returns: bytes representation of the response. + """ + assert self._socket + response = bytearray() + + while True: + header = bytearray() + + # Loop until we get a full header (8 bytes) + while len(header) < 8: + header.extend(self._socket.recv(8 - len(header))) + + # Parse formated response + ( + operation, + header_version, # unused + sequence_number, # unused + spare, # unused + total_bytes) = unpack(self._lecroy_command_header, header) + + # Delete unused values + del header_version + del sequence_number + del spare + + # Buffer for the current portion of data + buffer = bytearray() + + # Loop until we get all data + while len(buffer) < total_bytes: + buffer.extend( + self._socket.recv(min(total_bytes - len(buffer), 8_192))) + + # Accumulate final response + response.extend(buffer) + + # Leave the loop when the EOI bit is set + if operation & 1: + break + + return bytes(response)