Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have Device.write accept a full byte sequence, not just a single byte #79

Merged
merged 6 commits into from
Apr 8, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pyvisa_sim/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,6 @@ def write(self, data: bytes) -> None:
if not isinstance(data, bytes):
raise TypeError("data must be an instance of bytes")

if len(data) != 1:
msg = "data must have a length of 1, not %d"
raise ValueError(msg % len(data))

self._input_buffer.extend(data)

le = len(self._query_eom)
Expand Down
7 changes: 3 additions & 4 deletions pyvisa_sim/sessions/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ def write(self, data: bytes) -> Tuple[int, constants.StatusCode]:
if asrl_end == constants.SerialTermination.last_bit:
last_bit, _ = self.get_attribute(constants.ResourceAttribute.asrl_data_bits)
mask = 1 << (last_bit - 1)
for val in common.iter_bytes(data, mask, send_end):
self.device.write(val)
val = b"".join(common.iter_bytes(data, mask, send_end))
self.device.write(val)
else:
for i in range(len(data)):
self.device.write(data[i : i + 1])
self.device.write(data)

if asrl_end == constants.SerialTermination.termination_char:
if send_end:
Expand Down
3 changes: 1 addition & 2 deletions pyvisa_sim/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ def read(self, count: int) -> Tuple[bytes, constants.StatusCode]:
def write(self, data: bytes) -> Tuple[int, constants.StatusCode]:
send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled)

for i in range(len(data)):
self.device.write(data[i : i + 1])
self.device.write(data)

if send_end:
# EOM4882
Expand Down
17 changes: 17 additions & 0 deletions pyvisa_sim/testsuite/test_all.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
import logging

import pytest
from pyvisa.errors import VisaIOError

Expand Down Expand Up @@ -179,3 +181,18 @@ def test_instrument_for_error_state(resource, resource_manager):

inst.write(":VOLT:IMM:AMPL 0")
assert_instrument_response(inst, ":SYST:ERR?", "1, Command error")


def test_device_write_logging(caplog, resource_manager) -> None:
instr = resource_manager.open_resource(
"USB0::0x1111::0x2222::0x4444::0::INSTR",
read_termination="\n",
write_termination="\n",
)

with caplog.at_level(logging.DEBUG):
instr.write("*IDN?")
instr.read()

assert "input buffer: b'D'" not in caplog.text
assert r"input buffer: b'*IDN?\n'" in caplog.text
40 changes: 40 additions & 0 deletions pyvisa_sim/testsuite/test_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import List, Optional

import pytest

from pyvisa_sim import common


@pytest.mark.parametrize(
"data, mask, send_end, want",
[
(b"1234", None, False, b"1234"),
(b"41234", 3, False, b"40004"),
# Can't figure this one out. Getting ValueError: bytes must in in range(0, 256)
# If I knew more about the purpose of `iter_bytes` then maybe I could
# reconcile it, but I don't have time to investigate right now.
pytest.param(
b"1234",
3,
True,
b"1234",
marks=pytest.mark.xfail(
reason=(
"ValueError bytes must be in range(0, 256). TODO: figure"
" out correct 'want' value."
)
),
),
Comment on lines +13 to +27
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for uncovering a bug !

iter_bytes is meant to clip values to the right number of bits (since serial may use from 5 to 8 bits). When send_end is True the highest data bits should only be set when the message is over. The problem here is that ~ gives us a signed binary number. So basically the function is bugged (and it is annoying since it is duplicated in pyvisa-py).

To make it less error prone it is we could refactor it to take the number of data bits and send_end. That way we could write out the mask directly. Can you make the change or do you prefer me to take care of it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to take a deeper dive to make sure I grok it. I've opened #80 to track, but I don't think that fixing the bug should block this PR.

I'll take a stab at it when I can. If I can't figure it out I'll bounce it back to you 😆

],
)
def test_iter_bytes(
data: bytes, mask: Optional[int], send_end: bool, want: List[bytes]
) -> None:
got = b"".join(common.iter_bytes(data, mask=mask, send_end=send_end))
assert got == want


def test_iter_bytes_with_send_end_requires_mask() -> None:
with pytest.raises(ValueError):
# Need to wrap in list otherwise the iterator is never called.
list(common.iter_bytes(b"", mask=None, send_end=True))
29 changes: 29 additions & 0 deletions pyvisa_sim/testsuite/test_serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
import pyvisa

from pyvisa_sim.sessions import serial

serial.SerialInstrumentSession


def test_serial_write_with_termination_last_bit(resource_manager):
instr = resource_manager.open_resource(
"ASRL4::INSTR",
read_termination="\n",
write_termination="\r\n",
)

# Ensure that we test the `asrl_end` block of serial.SerialInstrumentSession.write
instr.set_visa_attribute(
pyvisa.constants.ResourceAttribute.asrl_end_out,
pyvisa.constants.SerialTermination.last_bit,
)

# There's a bug (maybe?) in common.iter_bytes that we want to avoid for now.
instr.set_visa_attribute(
pyvisa.constants.ResourceAttribute.send_end_enabled,
pyvisa.constants.VI_FALSE,
)

instr.write("*IDN?")
assert instr.read() == "SCPI,MOCK,VERSION_1.0"