diff --git a/.github/workflows/code.yml b/.github/workflows/code.yml index 9675f58f4..a19d1dd91 100644 --- a/.github/workflows/code.yml +++ b/.github/workflows/code.yml @@ -16,7 +16,7 @@ jobs: - uses: actions/checkout@v2 - name: Lint - run: pip install --user .[dev] && tox -e pre-commit,mypy + run: pip install --user .[dev] && tox -e pre-commit build: if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name != github.repository @@ -69,7 +69,7 @@ jobs: source ${GITHUB_WORKSPACE}/scripts/epics_docker.sh - name: Test with pytest - run: pytest -k "${TEST_CL} or /v2/" + run: pytest -k "${TEST_CL}" - name: Upload coverage to Codecov uses: codecov/codecov-action@v2 diff --git a/docs/_templates/custom-class-template.rst b/docs/_templates/custom-class-template.rst index e909602bb..f73eda50e 100644 --- a/docs/_templates/custom-class-template.rst +++ b/docs/_templates/custom-class-template.rst @@ -1,8 +1,3 @@ -.. note:: - - Ophyd.v2 is included on a provisional basis until the v2.0 release and - may change API on minor release numbers before then - {{ fullname | escape | underline}} .. currentmodule:: {{ module }} diff --git a/docs/_templates/custom-module-template.rst b/docs/_templates/custom-module-template.rst index e82810011..91083ca74 100644 --- a/docs/_templates/custom-module-template.rst +++ b/docs/_templates/custom-module-template.rst @@ -1,8 +1,3 @@ -.. note:: - - Ophyd.v2 is included on a provisional basis until the v2.0 release and - may change API on minor release numbers before then - {{ fullname | escape | underline}} .. automodule:: {{ fullname }} diff --git a/ophyd/__init__.py b/ophyd/__init__.py index 8f66ff3e2..a3b412cac 100644 --- a/ophyd/__init__.py +++ b/ophyd/__init__.py @@ -32,7 +32,7 @@ def set_cl(control_layer=None, *, pv_telemetry=False): # TODO replace this with fancier meta-programming # TODO handle control_layer being a module/nampspace directly if control_layer == "pyepics": - # If using pyepics and ophyd.v2 (p4p and aioca), need to use the same + # If using pyepics and ophyd-async (p4p and aioca), need to use the same # libCom and libCa as provided by epicscorelibs # https://github.com/BCDA-APS/apstools/issues/836 try: diff --git a/ophyd/v2/__init__.py b/ophyd/v2/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/ophyd/v2/_aioca.py b/ophyd/v2/_aioca.py deleted file mode 100644 index 7e97770e0..000000000 --- a/ophyd/v2/_aioca.py +++ /dev/null @@ -1,236 +0,0 @@ -import sys -from asyncio import CancelledError -from dataclasses import dataclass -from enum import Enum -from typing import Any, Dict, Optional, Sequence, Type, Union - -from aioca import ( - FORMAT_CTRL, - FORMAT_RAW, - FORMAT_TIME, - Subscription, - caget, - camonitor, - caput, -) -from aioca.types import AugmentedValue, Dbr, Format -from bluesky.protocols import Descriptor, Dtype, Reading -from epicscorelibs.ca import dbr - -from .core import ( - NotConnected, - ReadingValueCallback, - SignalBackend, - T, - get_dtype, - get_unique, - wait_for_connection, -) - -dbr_to_dtype: Dict[Dbr, Dtype] = { - dbr.DBR_STRING: "string", - dbr.DBR_SHORT: "integer", - dbr.DBR_FLOAT: "number", - dbr.DBR_CHAR: "string", - dbr.DBR_LONG: "integer", - dbr.DBR_DOUBLE: "number", -} - - -@dataclass -class CaConverter: - read_dbr: Optional[Dbr] - write_dbr: Optional[Dbr] - - def write_value(self, value) -> Any: - return value - - def value(self, value: AugmentedValue): - return value - - def reading(self, value: AugmentedValue): - return dict( - value=self.value(value), - timestamp=value.timestamp, - alarm_severity=-1 if value.severity > 2 else value.severity, - ) - - def descriptor(self, source: str, value: AugmentedValue) -> Descriptor: - return dict(source=source, dtype=dbr_to_dtype[value.datatype], shape=[]) - - -class CaArrayConverter(CaConverter): - def descriptor(self, source: str, value: AugmentedValue) -> Descriptor: - return dict(source=source, dtype="array", shape=[len(value)]) - - -@dataclass -class CaEnumConverter(CaConverter): - enum_class: Type[Enum] - - def write_value(self, value: Union[Enum, str]): - if isinstance(value, Enum): - return value.value - else: - return value - - def value(self, value: AugmentedValue): - return self.enum_class(value) - - def descriptor(self, source: str, value: AugmentedValue) -> Descriptor: - choices = [e.value for e in self.enum_class] - return dict(source=source, dtype="string", shape=[], choices=choices) # type: ignore - - -class DisconnectedCaConverter(CaConverter): - def __getattribute__(self, __name: str) -> Any: - raise NotImplementedError("No PV has been set as connect() has not been called") - - -def make_converter( - datatype: Optional[Type], values: Dict[str, AugmentedValue] -) -> CaConverter: - pv = list(values)[0] - pv_dbr = get_unique({k: v.datatype for k, v in values.items()}, "datatypes") - is_array = bool([v for v in values.values() if v.element_count > 1]) - if is_array and datatype is str and pv_dbr == dbr.DBR_CHAR: - # Override waveform of chars to be treated as string - return CaConverter(dbr.DBR_CHAR_STR, dbr.DBR_CHAR_STR) - elif is_array and pv_dbr == dbr.DBR_STRING: - # Waveform of strings, check we wanted this - if datatype and datatype != Sequence[str]: - raise TypeError(f"{pv} has type [str] not {datatype.__name__}") - return CaArrayConverter(pv_dbr, None) - elif is_array: - pv_dtype = get_unique({k: v.dtype for k, v in values.items()}, "dtypes") - # This is an array - if datatype: - # Check we wanted an array of this type - dtype = get_dtype(datatype) - if not dtype: - raise TypeError(f"{pv} has type [{pv_dtype}] not {datatype.__name__}") - if dtype != pv_dtype: - raise TypeError(f"{pv} has type [{pv_dtype}] not [{dtype}]") - return CaArrayConverter(pv_dbr, None) - elif pv_dbr == dbr.DBR_ENUM and datatype is bool: - # Database can't do bools, so are often representated as enums, CA can do int tho - pv_choices_len = get_unique( - {k: len(v.enums) for k, v in values.items()}, "number of choices" - ) - if pv_choices_len != 2: - raise TypeError(f"{pv} has {pv_choices_len} choices, can't map to bool") - return CaConverter(dbr.DBR_SHORT, dbr.DBR_SHORT) - elif pv_dbr == dbr.DBR_ENUM: - # This is an Enum - pv_choices = get_unique( - {k: tuple(v.enums) for k, v in values.items()}, "choices" - ) - if datatype: - if not issubclass(datatype, Enum): - raise TypeError(f"{pv} has type Enum not {datatype.__name__}") - choices = tuple(v.value for v in datatype) - if set(choices) != set(pv_choices): - raise TypeError(f"{pv} has choices {pv_choices} not {choices}") - enum_class = datatype - else: - enum_class = Enum("GeneratedChoices", {x: x for x in pv_choices}, type=str) # type: ignore - return CaEnumConverter(dbr.DBR_STRING, None, enum_class) - else: - value = list(values.values())[0] - # Done the dbr check, so enough to check one of the values - if datatype and not isinstance(value, datatype): - raise TypeError( - f"{pv} has type {type(value).__name__.replace('ca_', '')} not {datatype.__name__}" - ) - return CaConverter(pv_dbr, None) - - -_tried_pyepics = False - - -def _use_pyepics_context_if_imported(): - global _tried_pyepics - if not _tried_pyepics: - ca = sys.modules.get("epics.ca", None) - if ca: - ca.use_initial_context() - _tried_pyepics = True - - -class CaSignalBackend(SignalBackend[T]): - def __init__(self, datatype: Optional[Type[T]], read_pv: str, write_pv: str): - self.datatype = datatype - self.read_pv = read_pv - self.write_pv = write_pv - self.initial_values: Dict[str, AugmentedValue] = {} - self.converter: CaConverter = DisconnectedCaConverter(None, None) - self.source = f"ca://{self.read_pv}" - self.subscription: Optional[Subscription] = None - - async def _store_initial_value(self, pv): - try: - self.initial_values[pv] = await caget(pv, format=FORMAT_CTRL, timeout=None) - except CancelledError: - raise NotConnected(self.source) - - async def connect(self): - _use_pyepics_context_if_imported() - if self.read_pv != self.write_pv: - # Different, need to connect both - await wait_for_connection( - read_pv=self._store_initial_value(self.read_pv), - write_pv=self._store_initial_value(self.write_pv), - ) - else: - # The same, so only need to connect one - await self._store_initial_value(self.read_pv) - self.converter = make_converter(self.datatype, self.initial_values) - - async def put(self, value: Optional[T], wait=True, timeout=None): - if value is None: - write_value = self.initial_values[self.write_pv] - else: - write_value = self.converter.write_value(value) - await caput( - self.write_pv, - write_value, - datatype=self.converter.write_dbr, - wait=wait, - timeout=timeout, - ) - - async def _caget(self, format: Format) -> AugmentedValue: - return await caget( - self.read_pv, - datatype=self.converter.read_dbr, - format=format, - timeout=None, - ) - - async def get_descriptor(self) -> Descriptor: - value = await self._caget(FORMAT_CTRL) - return self.converter.descriptor(self.source, value) - - async def get_reading(self) -> Reading: - value = await self._caget(FORMAT_TIME) - return self.converter.reading(value) - - async def get_value(self) -> T: - value = await self._caget(FORMAT_RAW) - return self.converter.value(value) - - def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None: - if callback: - assert ( - not self.subscription - ), "Cannot set a callback when one is already set" - self.subscription = camonitor( - self.read_pv, - lambda v: callback(self.converter.reading(v), self.converter.value(v)), - datatype=self.converter.read_dbr, - format=FORMAT_TIME, - ) - else: - if self.subscription: - self.subscription.close() - self.subscription = None diff --git a/ophyd/v2/_p4p.py b/ophyd/v2/_p4p.py deleted file mode 100644 index c449e377a..000000000 --- a/ophyd/v2/_p4p.py +++ /dev/null @@ -1,241 +0,0 @@ -import asyncio -import atexit -from asyncio import CancelledError -from dataclasses import dataclass -from enum import Enum -from typing import Any, Dict, Optional, Sequence, Type, Union - -from bluesky.protocols import Descriptor, Dtype, Reading -from p4p.client.asyncio import Context, Subscription - -from .core import ( - NotConnected, - ReadingValueCallback, - SignalBackend, - T, - get_dtype, - get_unique, - wait_for_connection, -) - -# https://mdavidsaver.github.io/p4p/values.html -specifier_to_dtype: Dict[str, Dtype] = { - "?": "integer", # bool - "b": "integer", # int8 - "B": "integer", # uint8 - "h": "integer", # int16 - "H": "integer", # uint16 - "i": "integer", # int32 - "I": "integer", # uint32 - "l": "integer", # int64 - "L": "integer", # uint64 - "f": "number", # float32 - "d": "number", # float64 - "s": "string", -} - - -class PvaConverter: - def write_value(self, value): - return value - - def value(self, value): - return value["value"] - - def reading(self, value): - ts = value["timeStamp"] - sv = value["alarm"]["severity"] - return dict( - value=self.value(value), - timestamp=ts["secondsPastEpoch"] + ts["nanoseconds"] * 1e-9, - alarm_severity=-1 if sv > 2 else sv, - ) - - def descriptor(self, source: str, value) -> Descriptor: - dtype = specifier_to_dtype[value.type().aspy("value")] - return dict(source=source, dtype=dtype, shape=[]) - - -class PvaArrayConverter(PvaConverter): - def descriptor(self, source: str, value) -> Descriptor: - return dict(source=source, dtype="array", shape=[len(value["value"])]) - - -@dataclass -class PvaEnumConverter(PvaConverter): - enum_class: Type[Enum] - - def write_value(self, value: Union[Enum, str]): - if isinstance(value, Enum): - return value.value - else: - return value - - def value(self, value): - return list(self.enum_class)[value["value"]["index"]] - - def descriptor(self, source: str, value) -> Descriptor: - choices = [e.value for e in self.enum_class] - return dict(source=source, dtype="string", shape=[], choices=choices) # type: ignore - - -class PvaEnumBoolConverter(PvaConverter): - def value(self, value): - return value["value"]["index"] - - def descriptor(self, source: str, value) -> Descriptor: - return dict(source=source, dtype="integer", shape=[]) - - -class PvaTableConverter(PvaConverter): - def value(self, value): - return value["value"].todict() - - def descriptor(self, source: str, value) -> Descriptor: - # This is wrong, but defer until we know how to actually describe a table - return dict(source=source, dtype="object", shape=[]) # type: ignore - - -class DisconnectedPvaConverter(PvaConverter): - def __getattribute__(self, __name: str) -> Any: - raise NotImplementedError("No PV has been set as connect() has not been called") - - -def make_converter(datatype: Optional[Type], values: Dict[str, Any]) -> PvaConverter: - pv = list(values)[0] - typeid = get_unique({k: v.getID() for k, v in values.items()}, "typeids") - typ = get_unique({k: type(v["value"]) for k, v in values.items()}, "value types") - if "NTScalarArray" in typeid and typ == list: - # Waveform of strings, check we wanted this - if datatype and datatype != Sequence[str]: - raise TypeError(f"{pv} has type [str] not {datatype.__name__}") - return PvaArrayConverter() - elif "NTScalarArray" in typeid: - pv_dtype = get_unique( - {k: v["value"].dtype for k, v in values.items()}, "dtypes" - ) - # This is an array - if datatype: - # Check we wanted an array of this type - dtype = get_dtype(datatype) - if not dtype: - raise TypeError(f"{pv} has type [{pv_dtype}] not {datatype.__name__}") - if dtype != pv_dtype: - raise TypeError(f"{pv} has type [{pv_dtype}] not [{dtype}]") - return PvaArrayConverter() - elif "NTEnum" in typeid and datatype is bool: - # Wanted a bool, but database represents as an enum - pv_choices_len = get_unique( - {k: len(v["value"]["choices"]) for k, v in values.items()}, - "number of choices", - ) - if pv_choices_len != 2: - raise TypeError(f"{pv} has {pv_choices_len} choices, can't map to bool") - return PvaEnumBoolConverter() - elif "NTEnum" in typeid: - # This is an Enum - pv_choices = get_unique( - {k: tuple(v["value"]["choices"]) for k, v in values.items()}, "choices" - ) - if datatype: - if not issubclass(datatype, Enum): - raise TypeError(f"{pv} has type Enum not {datatype.__name__}") - choices = tuple(v.value for v in datatype) - if set(choices) != set(pv_choices): - raise TypeError(f"{pv} has choices {pv_choices} not {choices}") - enum_class = datatype - else: - enum_class = Enum("GeneratedChoices", {x: x for x in pv_choices}, type=str) # type: ignore - return PvaEnumConverter(enum_class) - elif "NTScalar" in typeid: - if datatype and not issubclass(typ, datatype): - raise TypeError(f"{pv} has type {typ.__name__} not {datatype.__name__}") - return PvaConverter() - elif "NTTable" in typeid: - return PvaTableConverter() - else: - raise TypeError(f"{pv}: Unsupported typeid {typeid}") - - -class PvaSignalBackend(SignalBackend[T]): - _ctxt: Optional[Context] = None - - def __init__(self, datatype: Optional[Type[T]], read_pv: str, write_pv: str): - self.datatype = datatype - self.read_pv = read_pv - self.write_pv = write_pv - self.initial_values: Dict[str, Any] = {} - self.converter: PvaConverter = DisconnectedPvaConverter() - self.source = f"pva://{self.read_pv}" - self.subscription: Optional[Subscription] = None - - @property - def ctxt(self) -> Context: - if PvaSignalBackend._ctxt is None: - PvaSignalBackend._ctxt = Context("pva", nt=False) - - @atexit.register - def _del_ctxt(): - # If we don't do this we get messages like this on close: - # Error in sys.excepthook: - # Original exception was: - PvaSignalBackend._ctxt = None - - return PvaSignalBackend._ctxt - - async def _store_initial_value(self, pv): - try: - self.initial_values[pv] = await self.ctxt.get(pv) - except CancelledError: - raise NotConnected(self.source) - - async def connect(self): - if self.read_pv != self.write_pv: - # Different, need to connect both - await wait_for_connection( - read_pv=self._store_initial_value(self.read_pv), - write_pv=self._store_initial_value(self.write_pv), - ) - else: - # The same, so only need to connect one - await self._store_initial_value(self.read_pv) - self.converter = make_converter(self.datatype, self.initial_values) - - async def put(self, value: Optional[T], wait=True, timeout=None): - if value is None: - write_value = self.initial_values[self.write_pv] - else: - write_value = self.converter.write_value(value) - coro = self.ctxt.put(self.write_pv, dict(value=write_value), wait=wait) - await asyncio.wait_for(coro, timeout) - - async def get_descriptor(self) -> Descriptor: - value = await self.ctxt.get(self.read_pv) - return self.converter.descriptor(self.source, value) - - async def get_reading(self) -> Reading: - value = await self.ctxt.get( - self.read_pv, request="field(value,alarm,timestamp)" - ) - return self.converter.reading(value) - - async def get_value(self) -> T: - value = await self.ctxt.get(self.read_pv, "field(value)") - return self.converter.value(value) - - def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None: - if callback: - assert ( - not self.subscription - ), "Cannot set a callback when one is already set" - - async def async_callback(v): - callback(self.converter.reading(v), self.converter.value(v)) - - self.subscription = self.ctxt.monitor( - self.read_pv, async_callback, request="field(value,alarm,timestamp)" - ) - else: - if self.subscription: - self.subscription.close() - self.subscription = None diff --git a/ophyd/v2/core.py b/ophyd/v2/core.py deleted file mode 100644 index 559c41d75..000000000 --- a/ophyd/v2/core.py +++ /dev/null @@ -1,1024 +0,0 @@ -"""Core Ophyd.v2 functionality like Device and Signal""" -from __future__ import annotations - -import asyncio -import functools -import inspect -import logging -import re -import sys -import time -from abc import abstractmethod -from collections import abc -from contextlib import suppress -from dataclasses import dataclass -from enum import Enum -from typing import ( - Any, - AsyncGenerator, - Awaitable, - Callable, - Coroutine, - Dict, - Generator, - Generic, - Iterable, - List, - Optional, - Sequence, - Set, - Tuple, - Type, - TypeVar, - Union, - cast, - get_origin, -) - -import numpy as np -from bluesky.protocols import ( - Configurable, - Descriptor, - Dtype, - HasName, - Movable, - Readable, - Reading, - Stageable, - Status, - Subscribable, -) -from bluesky.run_engine import call_in_bluesky_event_loop - -T = TypeVar("T") -Callback = Callable[[T], None] - - -class AsyncStatus(Status): - "Convert asyncio awaitable to bluesky Status interface" - - def __init__( - self, - awaitable: Awaitable, - watchers: Optional[List[Callable]] = None, - ): - if isinstance(awaitable, asyncio.Task): - self.task = awaitable - else: - self.task = asyncio.create_task(awaitable) # type: ignore - self.task.add_done_callback(self._run_callbacks) - self._callbacks = cast(List[Callback[Status]], []) - self._watchers = watchers - - def __await__(self): - return self.task.__await__() - - def add_callback(self, callback: Callback[Status]): - if self.done: - callback(self) - else: - self._callbacks.append(callback) - - def _run_callbacks(self, task: asyncio.Task): - if not task.cancelled(): - for callback in self._callbacks: - callback(self) - - # TODO: remove ignore and bump min version when bluesky v1.12.0 is released - def exception(self, timeout: Optional[float] = 0.0) -> Optional[BaseException]: # type: ignore - if timeout != 0.0: - raise Exception( - "cannot honour any timeout other than 0 in an asynchronous function" - ) - - if self.task.done(): - try: - return self.task.exception() - except asyncio.CancelledError as e: - return e - return None - - @property - def done(self) -> bool: - return self.task.done() - - @property - def success(self) -> bool: - return ( - self.task.done() and not self.task.cancelled() and not self.task.exception() - ) - - def watch(self, watcher: Callable): - """Add watcher to the list of interested parties. - - Arguments as per Bluesky :external+bluesky:meth:`watch` protocol. - """ - if self._watchers is not None: - self._watchers.append(watcher) - - @classmethod - def wrap(cls, f: Callable[[T], Coroutine]) -> Callable[[T], AsyncStatus]: - @functools.wraps(f) - def wrap_f(self) -> AsyncStatus: - return AsyncStatus(f(self)) - - return wrap_f - - def __repr__(self) -> str: - if self.done: - if self.exception() is not None: - status = "errored" - else: - status = "done" - else: - status = "pending" - return f"<{type(self).__name__} {status}>" - - __str__ = __repr__ - - -class Device(HasName): - """Common base class for all Ophyd.v2 Devices. - - By default, names and connects all Device children. - """ - - _name: str = "" - #: The parent Device if it exists - parent: Optional[Device] = None - - def __init__(self, name: str = "") -> None: - self.set_name(name) - - @property - def name(self) -> str: - """Return the name of the Device""" - return self._name - - def set_name(self, name: str): - """Set ``self.name=name`` and each ``self.child.name=name+"-child"``. - - Parameters - ---------- - name: - New name to set - """ - self._name = name - name_children(self, name) - - async def connect(self, sim: bool = False): - """Connect self and all child Devices. - - Parameters - ---------- - sim: - If True then connect in simulation mode. - """ - await connect_children(self, sim) - - -class NotConnected(Exception): - """Exception to be raised if a `Device.connect` is cancelled""" - - def __init__(self, *lines: str): - self.lines = list(lines) - - def __str__(self) -> str: - return "\n".join(self.lines) - - -async def wait_for_connection(**coros: Awaitable[None]): - """Call many underlying signals, accumulating `NotConnected` exceptions - - Raises - ------ - `NotConnected` if cancelled - """ - ts = {k: asyncio.create_task(c) for (k, c) in coros.items()} # type: ignore - try: - done, pending = await asyncio.wait(ts.values()) - except asyncio.CancelledError: - for t in ts.values(): - t.cancel() - lines: List[str] = [] - for k, t in ts.items(): - try: - await t - except NotConnected as e: - if len(e.lines) == 1: - lines.append(f"{k}: {e.lines[0]}") - else: - lines.append(f"{k}:") - lines += [f" {line}" for line in e.lines] - raise NotConnected(*lines) - else: - # Wait for everything to foreground the exceptions - for f in list(done) + list(pending): - await f - - -async def connect_children(device: Device, sim: bool): - """Call ``child.connect(sim)`` on all child devices in parallel. - - Typically used to implement `Device.connect` like this:: - - async def connect(self, sim=False): - await connect_children(self, sim) - """ - - coros = { - name: child_device.connect(sim) - for name, child_device in get_device_children(device) - } - if coros: - await wait_for_connection(**coros) - - -def name_children(device: Device, name: str): - """Call ``child.set_name(child_name)`` on all child devices in series.""" - for attr_name, child in get_device_children(device): - child_name = f"{name}-{attr_name.rstrip('_')}" if name else "" - child.set_name(child_name) - child.parent = device - - -def get_device_children(device: Device) -> Generator[Tuple[str, Device], None, None]: - for attr_name, attr in device.__dict__.items(): - if attr_name != "parent" and isinstance(attr, Device): - yield attr_name, attr - - -class DeviceCollector: - """Collector of top level Device instances to be used as a context manager - - Parameters - ---------- - set_name: - If True, call ``device.set_name(variable_name)`` on all collected - Devices - connect: - If True, call ``device.connect(sim)`` in parallel on all - collected Devices - sim: - If True, connect Signals in simulation mode - timeout: - How long to wait for connect before logging an exception - - Notes - ----- - Example usage:: - - [async] with DeviceCollector(): - t1x = motor.Motor("BLxxI-MO-TABLE-01:X") - t1y = motor.Motor("pva://BLxxI-MO-TABLE-01:Y") - # Names and connects devices here - assert t1x.comm.velocity.source - assert t1x.name == "t1x" - - """ - - def __init__( - self, - set_name=True, - connect=True, - sim=False, - timeout: float = 10.0, - ): - self._set_name = set_name - self._connect = connect - self._sim = sim - self._timeout = timeout - self._names_on_enter: Set[str] = set() - self._objects_on_exit: Dict[str, Any] = {} - - def _caller_locals(self): - """Walk up until we find a stack frame that doesn't have us as self""" - try: - raise ValueError - except ValueError: - _, _, tb = sys.exc_info() - assert tb, "Can't get traceback, this shouldn't happen" - caller_frame = tb.tb_frame - while caller_frame.f_locals.get("self", None) is self: - caller_frame = caller_frame.f_back - return caller_frame.f_locals - - def __enter__(self) -> DeviceCollector: - # Stash the names that were defined before we were called - self._names_on_enter = set(self._caller_locals()) - return self - - async def __aenter__(self) -> DeviceCollector: - return self.__enter__() - - async def _on_exit(self) -> None: - # Name and kick off connect for devices - tasks: Dict[asyncio.Task, str] = {} - for name, obj in self._objects_on_exit.items(): - if name not in self._names_on_enter and isinstance(obj, Device): - if self._set_name and not obj.name: - obj.set_name(name) - if self._connect: - task = asyncio.create_task(obj.connect(self._sim)) - tasks[task] = name - # Wait for all the signals to have finished - if tasks: - await self._wait_for_tasks(tasks) - - async def _wait_for_tasks(self, tasks: Dict[asyncio.Task, str]): - done, pending = await asyncio.wait(tasks, timeout=self._timeout) - if pending: - msg = f"{len(pending)} Devices did not connect:" - for t in pending: - t.cancel() - with suppress(Exception): - await t - e = t.exception() - msg += f"\n {tasks[t]}: {type(e).__name__}" - lines = str(e).splitlines() - if len(lines) <= 1: - msg += f": {e}" - else: - msg += "".join(f"\n {line}" for line in lines) - logging.error(msg) - raised = [t for t in done if t.exception()] - if raised: - logging.error(f"{len(raised)} Devices raised an error:") - for t in raised: - logging.exception(f" {tasks[t]}:", exc_info=t.exception()) - if pending or raised: - raise NotConnected("Not all Devices connected") - - async def __aexit__(self, type, value, traceback): - self._objects_on_exit = self._caller_locals() - await self._on_exit() - - def __exit__(self, type_, value, traceback): - self._objects_on_exit = self._caller_locals() - return call_in_bluesky_event_loop(self._on_exit()) - - -#: A function that will be called with the Reading and value when the -#: monitor updates -ReadingValueCallback = Callable[[Reading, T], None] - - -class SignalBackend(Generic[T]): - """A read/write/monitor backend for a Signals""" - - #: Datatype of the signal value - datatype: Optional[Type[T]] = None - - #: Like ca://PV_PREFIX:SIGNAL - source: str = "" - - @abstractmethod - async def connect(self): - """Connect to underlying hardware""" - - @abstractmethod - async def put(self, value: Optional[T], wait=True, timeout=None): - """Put a value to the PV, if wait then wait for completion for up to timeout""" - - @abstractmethod - async def get_descriptor(self) -> Descriptor: - """Metadata like source, dtype, shape, precision, units""" - - @abstractmethod - async def get_reading(self) -> Reading: - """The current value, timestamp and severity""" - - @abstractmethod - async def get_value(self) -> T: - """The current value""" - - @abstractmethod - def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None: - """Observe changes to the current value, timestamp and severity""" - - -_sim_backends: Dict[Signal, SimSignalBackend] = {} - - -primitive_dtypes: Dict[type, Dtype] = { - str: "string", - int: "integer", - float: "number", - bool: "boolean", -} - - -class SimConverter(Generic[T]): - def value(self, value: T) -> T: - return value - - def write_value(self, value: T) -> T: - return value - - def reading(self, value: T, timestamp: float, severity: int) -> Reading: - return Reading( - value=value, - timestamp=timestamp, - alarm_severity=-1 if severity > 2 else severity, - ) - - def descriptor(self, source: str, value) -> Descriptor: - assert ( - type(value) in primitive_dtypes - ), f"invalid converter for value of type {type(value)}" - dtype = primitive_dtypes[type(value)] - return dict(source=source, dtype=dtype, shape=[]) - - def make_initial_value(self, datatype: Optional[Type[T]]) -> T: - if datatype is None: - return cast(T, None) - - return datatype() - - -class SimArrayConverter(SimConverter): - def descriptor(self, source: str, value) -> Descriptor: - return dict(source=source, dtype="array", shape=[len(value)]) - - def make_initial_value(self, datatype: Optional[Type[T]]) -> T: - if datatype is None: - return cast(T, None) - - if get_origin(datatype) == abc.Sequence: - return cast(T, []) - - return cast(T, datatype(shape=0)) # type: ignore - - -@dataclass -class SimEnumConverter(SimConverter): - enum_class: Type[Enum] - - def write_value(self, value: Union[Enum, str]) -> Enum: - if isinstance(value, Enum): - return value - else: - return self.enum_class(value) - - def descriptor(self, source: str, value) -> Descriptor: - choices = [e.value for e in self.enum_class] - return dict(source=source, dtype="string", shape=[], choices=choices) # type: ignore - - def make_initial_value(self, datatype: Optional[Type[T]]) -> T: - if datatype is None: - return cast(T, None) - - return cast(T, list(datatype.__members__.values())[0]) # type: ignore - - -class DisconnectedSimConverter(SimConverter): - def __getattribute__(self, __name: str) -> Any: - raise NotImplementedError("No PV has been set as connect() has not been called") - - -def make_converter(datatype): - is_array = get_dtype(datatype) is not None - is_sequence = get_origin(datatype) == abc.Sequence - is_enum = issubclass(datatype, Enum) if inspect.isclass(datatype) else False - - if is_array or is_sequence: - return SimArrayConverter() - if is_enum: - return SimEnumConverter(datatype) - - return SimConverter() - - -class SimSignalBackend(SignalBackend[T]): - """An simulated backend to a Signal, created with ``Signal.connect(sim=True)``""" - - _value: T - _initial_value: T - _timestamp: float - _severity: int - - def __init__(self, datatype: Optional[Type[T]], source: str) -> None: - pv = re.split(r"://", source)[-1] - self.source = f"sim://{pv}" - self.datatype = datatype - self.pv = source - self.converter: SimConverter = DisconnectedSimConverter() - self.put_proceeds = asyncio.Event() - self.put_proceeds.set() - self.callback: Optional[ReadingValueCallback[T]] = None - - async def connect(self) -> None: - self.converter = make_converter(self.datatype) - self._initial_value = self.converter.make_initial_value(self.datatype) - self._severity = 0 - - await self.put(None) - - async def put(self, value: Optional[T], wait=True, timeout=None): - write_value = ( - self.converter.write_value(value) - if value is not None - else self._initial_value - ) - self._set_value(write_value) - - if wait: - await asyncio.wait_for(self.put_proceeds.wait(), timeout) - - def _set_value(self, value: T): - """Method to bypass asynchronous logic, designed to only be used in tests.""" - self._value = value - self._timestamp = time.monotonic() - reading: Reading = self.converter.reading( - self._value, self._timestamp, self._severity - ) - - if self.callback: - self.callback(reading, self._value) - - async def get_descriptor(self) -> Descriptor: - return self.converter.descriptor(self.source, self._value) - - async def get_reading(self) -> Reading: - return self.converter.reading(self._value, self._timestamp, self._severity) - - async def get_value(self) -> T: - return self.converter.value(self._value) - - def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None: - if callback: - assert not self.callback, "Cannot set a callback when one is already set" - reading: Reading = self.converter.reading( - self._value, self._timestamp, self._severity - ) - callback(reading, self._value) - self.callback = callback - - -def set_sim_value(signal: Signal[T], value: T): - """Set the value of a signal that is in sim mode.""" - _sim_backends[signal]._set_value(value) - - -def set_sim_put_proceeds(signal: Signal[T], proceeds: bool): - """Allow or block a put with wait=True from proceeding""" - event = _sim_backends[signal].put_proceeds - if proceeds: - event.set() - else: - event.clear() - - -def set_sim_callback(signal: Signal[T], callback: ReadingValueCallback[T]) -> None: - """Monitor the value of a signal that is in sim mode""" - return _sim_backends[signal].set_callback(callback) - - -def _fail(self, other, *args, **kwargs): - if isinstance(other, Signal): - raise TypeError( - "Can't compare two Signals, did you mean await signal.get_value() instead?" - ) - else: - return NotImplemented - - -# Types -# - bool -# - int -# - float -# - str -# - Enum[str] -# - npt.NDArray[np.bool_ | np.uint[8,16,32,64] | np.int[8,16,32,64] | np.float[32,64] -# - Sequence[str | Enum] -# - Table (TypedDict of Sequence or NDArray above), exploded in reading - -DEFAULT_TIMEOUT = 10.0 - - -class Signal(Device, Generic[T]): - """A Device with the concept of a value, with R, RW, W and X flavours""" - - def __init__( - self, backend: SignalBackend[T], timeout: Optional[float] = DEFAULT_TIMEOUT - ) -> None: - self._name = "" - self._timeout = timeout - self._init_backend = self._backend = backend - - @property - def name(self) -> str: - return self._name - - def set_name(self, name: str = ""): - self._name = name - - async def connect(self, sim=False): - if sim: - self._backend = SimSignalBackend( - datatype=self._init_backend.datatype, source=self._init_backend.source - ) - _sim_backends[self] = self._backend - else: - self._backend = self._init_backend - _sim_backends.pop(self, None) - await self._backend.connect() - - @property - def source(self) -> str: - """Like ca://PV_PREFIX:SIGNAL, or "" if not set""" - return self._backend.source - - __lt__ = __le__ = __eq__ = __ge__ = __gt__ = __ne__ = _fail - - def __hash__(self): - # Restore the default implementation so we can use in a set or dict - return hash(id(self)) - - -class _SignalCache(Generic[T]): - def __init__(self, backend: SignalBackend[T], signal: Signal): - self._signal = signal - self._staged = False - self._listeners: Dict[Callback, bool] = {} - self._valid = asyncio.Event() - self._reading: Optional[Reading] = None - self._value: Optional[T] = None - - self.backend = backend - backend.set_callback(self._callback) - - def close(self): - self.backend.set_callback(None) - - async def get_reading(self) -> Reading: - await self._valid.wait() - assert self._reading is not None, "Monitor not working" - return self._reading - - async def get_value(self) -> T: - await self._valid.wait() - assert self._value is not None, "Monitor not working" - return self._value - - def _callback(self, reading: Reading, value: T): - self._reading = reading - self._value = value - self._valid.set() - for function, want_value in self._listeners.items(): - self._notify(function, want_value) - - def _notify(self, function: Callback, want_value: bool): - if want_value: - function(self._value) - else: - function({self._signal.name: self._reading}) - - def subscribe(self, function: Callback, want_value: bool) -> None: - self._listeners[function] = want_value - if self._valid.is_set(): - self._notify(function, want_value) - - def unsubscribe(self, function: Callback) -> bool: - self._listeners.pop(function) - return self._staged or bool(self._listeners) - - def set_staged(self, staged: bool): - self._staged = staged - return self._staged or bool(self._listeners) - - -def _add_timeout(func): - @functools.wraps(func) - async def wrapper(self: Signal, *args, **kwargs): - return await asyncio.wait_for(func(self, *args, **kwargs), self._timeout) - - return wrapper - - -class SignalR(Signal[T], Readable, Stageable, Subscribable): - """Signal that can be read from and monitored""" - - _cache: Optional[_SignalCache] = None - - def _backend_or_cache( - self, cached: Optional[bool] - ) -> Union[_SignalCache, SignalBackend]: - # If cached is None then calculate it based on whether we already have a cache - if cached is None: - cached = self._cache is not None - if cached: - assert self._cache, f"{self.source} not being monitored" - return self._cache - else: - return self._backend - - def _get_cache(self) -> _SignalCache: - if not self._cache: - self._cache = _SignalCache(self._backend, self) - return self._cache - - def _del_cache(self, needed: bool): - if self._cache and not needed: - self._cache.close() - self._cache = None - - @_add_timeout - async def read(self, cached: Optional[bool] = None) -> Dict[str, Reading]: - """Return a single item dict with the reading in it""" - return {self.name: await self._backend_or_cache(cached).get_reading()} - - @_add_timeout - async def describe(self) -> Dict[str, Descriptor]: - """Return a single item dict with the descriptor in it""" - return {self.name: await self._backend.get_descriptor()} - - @_add_timeout - async def get_value(self, cached: Optional[bool] = None) -> T: - """The current value""" - return await self._backend_or_cache(cached).get_value() - - def subscribe_value(self, function: Callback[T]): - """Subscribe to updates in value of a device""" - self._get_cache().subscribe(function, want_value=True) - - def subscribe(self, function: Callback[Dict[str, Reading]]) -> None: - """Subscribe to updates in the reading""" - self._get_cache().subscribe(function, want_value=False) - - def clear_sub(self, function: Callback) -> None: - """Remove a subscription.""" - self._del_cache(self._get_cache().unsubscribe(function)) - - @AsyncStatus.wrap - async def stage(self) -> None: - """Start caching this signal""" - self._get_cache().set_staged(True) - - @AsyncStatus.wrap - async def unstage(self) -> None: - """Stop caching this signal""" - self._del_cache(self._get_cache().set_staged(False)) - - -class SignalW(Signal[T], Movable): - """Signal that can be set""" - - def set(self, value: T, wait=True, timeout=None) -> AsyncStatus: - """Set the value and return a status saying when it's done""" - coro = self._backend.put(value, wait=wait, timeout=timeout or self._timeout) - return AsyncStatus(coro) - - -class SignalRW(SignalR[T], SignalW[T]): - """Signal that can be both read and set""" - - -class SignalX(Signal): - """Signal that puts the default value""" - - async def execute(self, wait=True, timeout=None): - """Execute the action and return a status saying when it's done""" - await self._backend.put(None, wait=wait, timeout=timeout or self._timeout) - - -async def observe_value(signal: SignalR[T]) -> AsyncGenerator[T, None]: - """Subscribe to the value of a signal so it can be iterated from. - - Parameters - ---------- - signal: - Call subscribe_value on this at the start, and clear_sub on it at the - end - - Notes - ----- - Example usage:: - - async for value in observe_value(sig): - do_something_with(value) - """ - q: asyncio.Queue[T] = asyncio.Queue() - signal.subscribe_value(q.put_nowait) - try: - while True: - yield await q.get() - finally: - signal.clear_sub(q.put_nowait) - - -class _ValueChecker(Generic[T]): - def __init__(self, matcher: Callable[[T], bool], matcher_name: str): - self._last_value: Optional[T] - self._matcher = matcher - self._matcher_name = matcher_name - - async def _wait_for_value(self, signal: SignalR[T]): - async for value in observe_value(signal): - self._last_value = value - if self._matcher(value): - return - - async def wait_for_value(self, signal: SignalR[T], timeout: float): - try: - await asyncio.wait_for(self._wait_for_value(signal), timeout) - except asyncio.TimeoutError as e: - raise TimeoutError( - f"{signal.name} didn't match {self._matcher_name} in {timeout}s, " - f"last value {self._last_value!r}" - ) from e - - -async def wait_for_value( - signal: SignalR[T], match: Union[T, Callable[[T], bool]], timeout: float -): - """Wait for a signal to have a matching value. - - Parameters - ---------- - signal: - Call subscribe_value on this at the start, and clear_sub on it at the - end - match: - If a callable, it should return True if the value matches. If not - callable then value will be checked for equality with match. - timeout: - How long to wait for the value to match - - Notes - ----- - Example usage:: - - wait_for_value(device.acquiring, 1, timeout=1) - - Or:: - - wait_for_value(device.num_captured, lambda v: v > 45, timeout=1) - """ - if callable(match): - checker = _ValueChecker(match, match.__name__) - else: - checker = _ValueChecker(lambda v: v == match, repr(match)) - await checker.wait_for_value(signal, timeout) - - -async def set_and_wait_for_value( - signal: SignalRW[T], - value: T, - timeout: float = DEFAULT_TIMEOUT, - status_timeout: Optional[float] = None, -) -> AsyncStatus: - """Set a signal and monitor it until it has that value. - - Useful for busy record, or other Signals with pattern: - - - Set Signal with wait=True and stash the Status - - Read the same Signal to check the operation has started - - Return the Status so calling code can wait for operation to complete - - Parameters - ---------- - signal: - The signal to set and monitor - value: - The value to set it to - timeout: - How long to wait for the signal to have the value - status_timeout: - How long the returned Status will wait for the set to complete - - Notes - ----- - Example usage:: - - set_and_wait_for_value(device.acquire, 1) - """ - status = signal.set(value, timeout=status_timeout) - await wait_for_value(signal, value, timeout=timeout) - return status - - -async def merge_gathered_dicts( - coros: Iterable[Awaitable[Dict[str, T]]] -) -> Dict[str, T]: - """Merge dictionaries produced by a sequence of coroutines. - - Can be used for merging ``read()`` or ``describe``. For instance:: - - combined_read = await merge_gathered_dicts(s.read() for s in signals) - """ - ret: Dict[str, T] = {} - for result in await asyncio.gather(*coros): - ret.update(result) - return ret - - -class StandardReadable(Device, Readable, Configurable, Stageable): - """Device that owns its children and provides useful default behavior. - - - When its name is set it renames child Devices - - Signals can be registered for read() and read_configuration() - - These signals will be subscribed for read() between stage() and unstage() - """ - - _read_signals: Tuple[SignalR, ...] = () - _configuration_signals: Tuple[SignalR, ...] = () - _read_uncached_signals: Tuple[SignalR, ...] = () - - def set_readable_signals( - self, - read: Sequence[SignalR] = (), - config: Sequence[SignalR] = (), - read_uncached: Sequence[SignalR] = (), - ): - """ - Parameters - ---------- - read: - Signals to make up `read()` - conf: - Signals to make up `read_configuration()` - read_uncached: - Signals to make up `read()` that won't be cached - """ - self._read_signals = tuple(read) - self._configuration_signals = tuple(config) - self._read_uncached_signals = tuple(read_uncached) - - @AsyncStatus.wrap - async def stage(self) -> None: - for sig in self._read_signals + self._configuration_signals: - await sig.stage().task - - @AsyncStatus.wrap - async def unstage(self) -> None: - for sig in self._read_signals + self._configuration_signals: - await sig.unstage().task - - async def describe_configuration(self) -> Dict[str, Descriptor]: - return await merge_gathered_dicts( - [sig.describe() for sig in self._configuration_signals] - ) - - async def read_configuration(self) -> Dict[str, Reading]: - return await merge_gathered_dicts( - [sig.read() for sig in self._configuration_signals] - ) - - async def describe(self) -> Dict[str, Descriptor]: - return await merge_gathered_dicts( - [sig.describe() for sig in self._read_signals + self._read_uncached_signals] - ) - - async def read(self) -> Dict[str, Reading]: - return await merge_gathered_dicts( - [sig.read() for sig in self._read_signals] - + [sig.read(cached=False) for sig in self._read_uncached_signals] - ) - - -VT = TypeVar("VT", bound=Device) - - -class DeviceVector(Dict[int, VT], Device): - def set_name(self, parent_name: str): - self._name = parent_name - for name, device in self.items(): - device.set_name(f"{parent_name}-{name}") - device.parent = self - - async def connect(self, sim: bool = False): - coros = {str(k): d.connect(sim) for k, d in self.items()} - await wait_for_connection(**coros) - - -def get_unique(values: Dict[str, T], types: str) -> T: - """If all values are the same, return that value, otherwise return TypeError - - >>> get_unique({"a": 1, "b": 1}, "integers") - 1 - >>> get_unique({"a": 1, "b": 2}, "integers") - Traceback (most recent call last): - ... - TypeError: Differing integers: a has 1, b has 2 - """ - set_values = set(values.values()) - if len(set_values) != 1: - diffs = ", ".join(f"{k} has {v}" for k, v in values.items()) - raise TypeError(f"Differing {types}: {diffs}") - return set_values.pop() - - -def get_dtype(typ: Type) -> Optional[np.dtype]: - """Get the runtime dtype from a numpy ndarray type annotation - - >>> import numpy.typing as npt - >>> import numpy as np - >>> get_dtype(npt.NDArray[np.int8]) - dtype('int8') - """ - if getattr(typ, "__origin__", None) == np.ndarray: - # datatype = numpy.ndarray[typing.Any, numpy.dtype[numpy.float64]] - # so extract numpy.float64 from it - return np.dtype(typ.__args__[1].__args__[0]) # type: ignore - return None diff --git a/ophyd/v2/epics.py b/ophyd/v2/epics.py deleted file mode 100644 index f2839012e..000000000 --- a/ophyd/v2/epics.py +++ /dev/null @@ -1,116 +0,0 @@ -"""EPICS Signals over CA or PVA""" - -from __future__ import annotations - -from enum import Enum -from typing import Optional, Tuple, Type - -from .core import SignalBackend, SignalR, SignalRW, SignalW, SignalX, T, get_unique - -try: - from ._aioca import CaSignalBackend -except ImportError as ca_error: - - class CaSignalBackend: # type: ignore - def __init__(*args, ca_error=ca_error, **kwargs): - raise NotImplementedError("CA support not available") from ca_error - - -try: - from ._p4p import PvaSignalBackend -except ImportError as pva_error: - - class PvaSignalBackend: # type: ignore - def __init__(*args, pva_error=pva_error, **kwargs): - raise NotImplementedError("PVA support not available") from pva_error - - -class EpicsTransport(Enum): - """The sorts of transport EPICS support""" - - #: Use Channel Access (using aioca library) - ca = CaSignalBackend - #: Use PVAccess (using p4p library) - pva = PvaSignalBackend - - -_default_epics_transport = EpicsTransport.ca - - -def _transport_pv(pv: str) -> Tuple[EpicsTransport, str]: - split = pv.split("://", 1) - if len(split) > 1: - # We got something like pva://mydevice, so use specified comms mode - transport_str, pv = split - transport = EpicsTransport[transport_str] - else: - # No comms mode specified, use the default - transport = _default_epics_transport - return transport, pv - - -def _make_backend( - datatype: Optional[Type[T]], read_pv: str, write_pv: str -) -> SignalBackend[T]: - r_transport, r_pv = _transport_pv(read_pv) - w_transport, w_pv = _transport_pv(write_pv) - transport = get_unique({read_pv: r_transport, write_pv: w_transport}, "transports") - return transport.value(datatype, r_pv, w_pv) - - -def epics_signal_rw( - datatype: Type[T], read_pv: str, write_pv: Optional[str] = None -) -> SignalRW[T]: - """Create a `SignalRW` backed by 1 or 2 EPICS PVs - - Parameters - ---------- - datatype: - Check that the PV is of this type - read_pv: - The PV to read and monitor - write_pv: - If given, use this PV to write to, otherwise use read_pv - """ - backend = _make_backend(datatype, read_pv, write_pv or read_pv) - return SignalRW(backend) - - -def epics_signal_r(datatype: Type[T], read_pv: str) -> SignalR[T]: - """Create a `SignalR` backed by 1 EPICS PV - - Parameters - ---------- - datatype: - Check that the PV is of this type - read_pv: - The PV to read and monitor - """ - backend = _make_backend(datatype, read_pv, read_pv) - return SignalR(backend) - - -def epics_signal_w(datatype: Type[T], write_pv: str) -> SignalW[T]: - """Create a `SignalW` backed by 1 EPICS PVs - - Parameters - ---------- - datatype: - Check that the PV is of this type - write_pv: - The PV to write to - """ - backend = _make_backend(datatype, write_pv, write_pv) - return SignalW(backend) - - -def epics_signal_x(write_pv: str) -> SignalX: - """Create a `SignalX` backed by 1 EPICS PVs - - Parameters - ---------- - write_pv: - The PV to write its initial value to on execute - """ - backend: SignalBackend = _make_backend(None, write_pv, write_pv) - return SignalX(backend) diff --git a/ophyd/v2/epicsdemo/__init__.py b/ophyd/v2/epicsdemo/__init__.py deleted file mode 100644 index e97a963d1..000000000 --- a/ophyd/v2/epicsdemo/__init__.py +++ /dev/null @@ -1,149 +0,0 @@ -"""Demo EPICS Devices for the tutorial""" - -import asyncio -import time -from enum import Enum -from typing import Callable, List, Optional - -import numpy as np -from bluesky.protocols import Movable, Stoppable - -from ophyd.v2.core import AsyncStatus, Device, StandardReadable, observe_value -from ophyd.v2.epics import epics_signal_r, epics_signal_rw, epics_signal_x - - -class EnergyMode(Enum): - """Energy mode for `Sensor`""" - - #: Low energy mode - low = "Low Energy" - #: High energy mode - high = "High Energy" - - -class Sensor(StandardReadable): - """A demo sensor that produces a scalar value based on X and Y Movers""" - - def __init__(self, prefix: str, name="") -> None: - # Define some signals - self.value = epics_signal_r(float, prefix + "Value") - self.mode = epics_signal_rw(EnergyMode, prefix + "Mode") - # Set name and signals for read() and read_configuration() - self.set_readable_signals( - read=[self.value], - config=[self.mode], - ) - super().__init__(name=name) - - -class Mover(StandardReadable, Movable, Stoppable): - """A demo movable that moves based on velocity""" - - def __init__(self, prefix: str, name="") -> None: - # Define some signals - self.setpoint = epics_signal_rw(float, prefix + "Setpoint") - self.readback = epics_signal_r(float, prefix + "Readback") - self.velocity = epics_signal_rw(float, prefix + "Velocity") - self.units = epics_signal_r(str, prefix + "Readback.EGU") - self.precision = epics_signal_r(int, prefix + "Readback.PREC") - # Signals that collide with standard methods should have a trailing underscore - self.stop_ = epics_signal_x(prefix + "Stop.PROC") - # Whether set() should complete successfully or not - self._set_success = True - # Set name and signals for read() and read_configuration() - self.set_readable_signals( - read=[self.readback], - config=[self.velocity, self.units], - ) - super().__init__(name=name) - - def set_name(self, name: str): - super().set_name(name) - # Readback should be named the same as its parent in read() - self.readback.set_name(name) - - async def _move(self, new_position: float, watchers: List[Callable] = []): - self._set_success = True - # time.monotonic won't go backwards in case of NTP corrections - start = time.monotonic() - old_position, units, precision = await asyncio.gather( - self.setpoint.get_value(), - self.units.get_value(), - self.precision.get_value(), - ) - # Wait for the value to set, but don't wait for put completion callback - await self.setpoint.set(new_position, wait=False) - async for current_position in observe_value(self.readback): - for watcher in watchers: - watcher( - name=self.name, - current=current_position, - initial=old_position, - target=new_position, - unit=units, - precision=precision, - time_elapsed=time.monotonic() - start, - ) - if np.isclose(current_position, new_position): - break - if not self._set_success: - raise RuntimeError("Motor was stopped") - - def move(self, new_position: float, timeout: Optional[float] = None): - """Commandline only synchronous move of a Motor""" - from bluesky.run_engine import call_in_bluesky_event_loop, in_bluesky_event_loop - - if in_bluesky_event_loop(): - raise RuntimeError("Will deadlock run engine if run in a plan") - call_in_bluesky_event_loop(self._move(new_position), timeout) # type: ignore - - # TODO: this fails if we call from the cli, but works if we "ipython await" it - def set(self, new_position: float, timeout: Optional[float] = None) -> AsyncStatus: - watchers: List[Callable] = [] - coro = asyncio.wait_for(self._move(new_position, watchers), timeout=timeout) - return AsyncStatus(coro, watchers) - - async def stop(self, success=True): - self._set_success = success - await self.stop_.execute() - - -class SampleStage(Device): - """A demo sample stage with X and Y movables""" - - def __init__(self, prefix: str, name="") -> None: - # Define some child Devices - self.x = Mover(prefix + "X:") - self.y = Mover(prefix + "Y:") - # Set name of device and child devices - super().__init__(name=name) - - -def start_ioc_subprocess() -> str: - """Start an IOC subprocess with EPICS database for sample stage and sensor - with the same pv prefix - """ - import atexit - import random - import string - import subprocess - import sys - from pathlib import Path - - pv_prefix = "".join(random.choice(string.ascii_uppercase) for _ in range(12)) + ":" - here = Path(__file__).absolute().parent - args = [sys.executable, "-m", "epicscorelibs.ioc"] - args += ["-m", f"P={pv_prefix}"] - args += ["-d", str(here / "sensor.db")] - for suff in "XY": - args += ["-m", f"P={pv_prefix}{suff}:"] - args += ["-d", str(here / "mover.db")] - process = subprocess.Popen( - args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - ) - atexit.register(process.communicate, "exit") - return pv_prefix diff --git a/ophyd/v2/epicsdemo/mover.db b/ophyd/v2/epicsdemo/mover.db deleted file mode 100644 index 4707cbba8..000000000 --- a/ophyd/v2/epicsdemo/mover.db +++ /dev/null @@ -1,40 +0,0 @@ -record(ao, "$(P)Setpoint") { - field(DESC, "Target value to move to") - field(PREC, "$(PREC=3)") - field(PINI, "YES") - field(EGU, "$(EGU=mm)") -} - -record(ao, "$(P)Velocity") { - field(DESC, "Velocity to move at") - field(PREC, "$(PREC=3)") - field(PINI, "YES") - field(EGU, "$(EGU=mm)/s") - field(VAL, "$(VELO=100)") -} - -record(calc, "$(P)VelocityDiv") { - field(DESC, "Velocity per .1s") - field(INPA, "$(P)Velocity CP") - field(CALC, "A/10") - field(PINI, "YES") - field(EGU, "$(EGU=mm)/s") -} - -record(calc, "$(P)Readback") { - field(DESC, "Target value to move to") - field(INPA, "$(P)Setpoint") - field(INPB, "$(P)Readback") - field(INPC, "$(P)VelocityDiv") - field(CALC, "ABS(A-B)B?B+C:B-C") - field(SCAN, ".1 second") - field(EGU, "$(EGU=mm)") - field(PREC, "$(PREC=3)") -} - -record(calcout, "$(P)Stop") { - field(DESC, "Process this to stop") - field(INPA, "$(P)Readback") - field(CALC, "A") - field(OUT, "$(P)Setpoint PP") -} diff --git a/ophyd/v2/epicsdemo/sensor.db b/ophyd/v2/epicsdemo/sensor.db deleted file mode 100644 index 9912bb3ca..000000000 --- a/ophyd/v2/epicsdemo/sensor.db +++ /dev/null @@ -1,20 +0,0 @@ -record(mbbo, "$(P)Mode") { - field(DESC, "Energy sensitivity of the image") - field(DTYP, "Raw Soft Channel") - field(PINI, "YES") - field(ZRVL, "10") - field(ZRST, "Low Energy") - field(ONVL, "100") - field(ONST, "High Energy") -} - -record(calc, "$(P)Value") { - field(DESC, "Sensor value simulated from X and Y") - field(INPA, "$(P)X:Readback CP") - field(INPB, "$(P)Y:Readback CP") - field(INPC, "$(P)Mode.RVAL CP") - field(CALC, "SIN(A)**10+COS(C+B*A)*COS(A)") - field(EGU, "$(EGU=cts/s)") - field(PREC, "$(PREC=3)") -} - diff --git a/ophyd/v2/tests/test_core.py b/ophyd/v2/tests/test_core.py deleted file mode 100644 index aa2566151..000000000 --- a/ophyd/v2/tests/test_core.py +++ /dev/null @@ -1,480 +0,0 @@ -import asyncio -import re -import time -import traceback -from enum import Enum -from typing import Any, Callable, Sequence, Tuple, Type -from unittest.mock import Mock - -import bluesky.plan_stubs as bps -import numpy as np -import numpy.typing as npt -import pytest -from bluesky import FailedStatus, RunEngine -from bluesky.protocols import Movable, Reading, Status - -from ophyd.v2.core import ( - AsyncStatus, - Device, - DeviceCollector, - DeviceVector, - Signal, - SignalBackend, - SignalRW, - SimSignalBackend, - T, - get_device_children, - set_and_wait_for_value, - set_sim_put_proceeds, - set_sim_value, - wait_for_connection, - wait_for_value, -) - - -class MySignal(Signal): - @property - def source(self) -> str: - return "me" - - async def connect(self, sim=False): - pass - - -def test_signals_equality_raises(): - sim_backend = SimSignalBackend(str, "test") - - s1 = MySignal(sim_backend) - s2 = MySignal(sim_backend) - with pytest.raises( - TypeError, - match=re.escape( - "Can't compare two Signals, did you mean await signal.get_value() instead?" - ), - ): - s1 == s2 - with pytest.raises( - TypeError, - match=re.escape("'>' not supported between instances of 'MySignal' and 'int'"), - ): - s1 > 4 - - -class MyEnum(str, Enum): - a = "Aaa" - b = "Bbb" - c = "Ccc" - - -def integer_d(value): - return dict(dtype="integer", shape=[]) - - -def number_d(value): - return dict(dtype="number", shape=[]) - - -def string_d(value): - return dict(dtype="string", shape=[]) - - -def enum_d(value): - return dict(dtype="string", shape=[], choices=["Aaa", "Bbb", "Ccc"]) - - -def waveform_d(value): - return dict(dtype="array", shape=[len(value)]) - - -class MonitorQueue: - def __init__(self, backend: SignalBackend): - self.backend = backend - self.updates: asyncio.Queue[Tuple[Reading, Any]] = asyncio.Queue() - backend.set_callback(self.add_reading_value) - - def add_reading_value(self, reading: Reading, value): - self.updates.put_nowait((reading, value)) - - async def assert_updates(self, expected_value): - expected_reading = { - "value": expected_value, - "timestamp": pytest.approx(time.monotonic(), rel=0.1), - "alarm_severity": 0, - } - reading, value = await self.updates.get() - - backend_value = await self.backend.get_value() - backend_reading = await self.backend.get_reading() - - assert value == expected_value == backend_value - assert reading == expected_reading == backend_reading - - def close(self): - self.backend.set_callback(None) - - -@pytest.mark.parametrize( - "datatype, initial_value, put_value, descriptor", - [ - (int, 0, 43, integer_d), - (float, 0.0, 43.5, number_d), - (str, "", "goodbye", string_d), - (MyEnum, MyEnum.a, MyEnum.c, enum_d), - (npt.NDArray[np.int8], [], [-8, 3, 44], waveform_d), - (npt.NDArray[np.uint8], [], [218], waveform_d), - (npt.NDArray[np.int16], [], [-855], waveform_d), - (npt.NDArray[np.uint16], [], [5666], waveform_d), - (npt.NDArray[np.int32], [], [-2], waveform_d), - (npt.NDArray[np.uint32], [], [1022233], waveform_d), - (npt.NDArray[np.int64], [], [-3], waveform_d), - (npt.NDArray[np.uint64], [], [995444], waveform_d), - (npt.NDArray[np.float32], [], [1.0], waveform_d), - (npt.NDArray[np.float64], [], [0.2], waveform_d), - (Sequence[str], [], ["nine", "ten"], waveform_d), - # Can't do long strings until https://github.com/epics-base/pva2pva/issues/17 - # (str, "longstr", ls1, ls2, string_d), - # (str, "longstr2.VAL$", ls1, ls2, string_d), - ], -) -async def test_backend_get_put_monitor( - datatype: Type[T], - initial_value: T, - put_value: T, - descriptor: Callable[[Any], dict], -): - backend = SimSignalBackend(datatype, "") - - await backend.connect() - q = MonitorQueue(backend) - try: - # Check descriptor - assert ( - dict(source="sim://", **descriptor(initial_value)) - == await backend.get_descriptor() - ) - # Check initial value - await q.assert_updates( - pytest.approx(initial_value) if initial_value != "" else initial_value - ) - # Put to new value and check that - await backend.put(put_value) - await q.assert_updates(pytest.approx(put_value)) - finally: - q.close() - - -async def test_sim_backend_if_disconnected(): - sim_backend = SimSignalBackend(npt.NDArray[np.float64], "SOME-IOC:PV") - with pytest.raises(NotImplementedError): - await sim_backend.get_value() - - -async def test_sim_backend_with_numpy_typing(): - sim_backend = SimSignalBackend(npt.NDArray[np.float64], "SOME-IOC:PV") - await sim_backend.connect() - - array = await sim_backend.get_value() - assert array.shape == (0,) - - -async def test_async_status_success(): - st = AsyncStatus(asyncio.sleep(0.1)) - assert isinstance(st, Status) - assert not st.done - assert not st.success - await st - assert st.done - assert st.success - - -class DummyBaseDevice(Device): - def __init__(self) -> None: - self.connected = False - - async def connect(self, sim=False): - self.connected = True - - -class DummyDeviceGroup(Device): - def __init__(self, name: str) -> None: - self.child1 = DummyBaseDevice() - self.child2 = DummyBaseDevice() - self.dict_with_children: DeviceVector[DummyBaseDevice] = DeviceVector( - {123: DummyBaseDevice()} - ) - self.set_name(name) - - -def test_get_device_children(): - parent = DummyDeviceGroup("parent") - - names = ["child1", "child2", "dict_with_children"] - for idx, (name, child) in enumerate(get_device_children(parent)): - assert name == names[idx] - assert ( - type(child) == DummyBaseDevice - if name.startswith("child") - else type(child) == DeviceVector - ) - - -async def test_children_of_device_have_set_names_and_get_connected(): - parent = DummyDeviceGroup("parent") - - assert parent.name == "parent" - assert parent.child1.name == "parent-child1" - assert parent.child2.name == "parent-child2" - assert parent.dict_with_children.name == "parent-dict_with_children" - assert parent.dict_with_children[123].name == "parent-dict_with_children-123" - - await parent.connect() - - assert parent.child1.connected - assert parent.dict_with_children[123].connected - - -async def test_device_with_device_collector(): - async with DeviceCollector(sim=True): - parent = DummyDeviceGroup("parent") - - assert parent.name == "parent" - assert parent.child1.name == "parent-child1" - assert parent.child2.name == "parent-child2" - assert parent.dict_with_children.name == "parent-dict_with_children" - assert parent.dict_with_children[123].name == "parent-dict_with_children-123" - assert parent.child1.connected - assert parent.dict_with_children[123].connected - - -async def normal_coroutine(time: float): - await asyncio.sleep(time) - - -async def failing_coroutine(time: float): - await normal_coroutine(time) - raise ValueError() - - -async def test_async_status_propagates_exception(): - status = AsyncStatus(failing_coroutine(0.1)) - assert status.exception() is None - - with pytest.raises(ValueError): - await status - - assert type(status.exception()) == ValueError - - -async def test_async_status_propagates_cancelled_error(): - status = AsyncStatus(normal_coroutine(0.1)) - assert status.exception() is None - - status.task.exception = Mock(side_effect=asyncio.CancelledError("")) - await status - - assert type(status.exception()) == asyncio.CancelledError - - -async def test_async_status_has_no_exception_if_coroutine_successful(): - status = AsyncStatus(normal_coroutine(0.1)) - assert status.exception() is None - - await status - - assert status.exception() is None - - -async def test_async_status_success_if_cancelled(): - status = AsyncStatus(normal_coroutine(0.1)) - assert status.exception() is None - status.task.cancel() - with pytest.raises(asyncio.CancelledError): - await status - assert status.success is False - assert isinstance(status.exception(), asyncio.CancelledError) - - -async def test_async_status_wrap(): - wrapped_coroutine = AsyncStatus.wrap(normal_coroutine) - status = wrapped_coroutine(0.1) - - await status - assert status.success is True - - -async def test_async_status_initialised_with_a_task(): - normal_task = asyncio.Task(normal_coroutine(0.1)) - status = AsyncStatus(normal_task) - - await status - assert status.success is True - - -async def test_async_status_str_for_normal_coroutine(): - normal_task = asyncio.Task(normal_coroutine(0.01)) - status = AsyncStatus(normal_task) - - assert str(status) == "" - await status - - assert str(status) == "" - - -async def test_async_status_str_for_failing_coroutine(): - failing_task = asyncio.Task(failing_coroutine(0.01)) - status = AsyncStatus(failing_task) - - assert str(status) == "" - with pytest.raises(ValueError): - await status - - assert str(status) == "" - - -async def test_wait_for_connection(): - class DummyDeviceWithSleep(DummyBaseDevice): - def __init__(self, name) -> None: - self.set_name(name) - - async def connect(self, sim=False): - await asyncio.sleep(0.01) - self.connected = True - - device1, device2 = DummyDeviceWithSleep("device1"), DummyDeviceWithSleep("device2") - - normal_coros = {"device1": device1.connect(), "device2": device2.connect()} - - await wait_for_connection(**normal_coros) - - assert device1.connected - assert device2.connected - - -async def test_wait_for_connection_propagates_error(): - failing_coros = {"test": normal_coroutine(0.01), "failing": failing_coroutine(0.01)} - - with pytest.raises(ValueError) as e: - await wait_for_connection(**failing_coros) - assert traceback.extract_tb(e.__traceback__)[-1].name == "failing_coroutine" - - -class FailingMovable(Movable, Device): - def _fail(self): - raise ValueError("This doesn't work") - - async def _set(self, value): - if value: - self._fail() - - def set(self, value) -> AsyncStatus: - return AsyncStatus(self._set(value)) - - -async def test_status_propogates_traceback_under_RE() -> None: - expected_call_stack = ["_set", "_fail"] - RE = RunEngine() - d = FailingMovable() - with pytest.raises(FailedStatus) as ctx: - RE(bps.mv(d, 3)) - # We get "The above exception was the direct cause of the following exception:", - # so extract that first exception traceback and check - assert ctx.value.__cause__ - assert expected_call_stack == [ - x.name for x in traceback.extract_tb(ctx.value.__cause__.__traceback__) - ] - # Check we get the same from the status.exception - status: AsyncStatus = ctx.value.args[0] - exception = status.exception() - assert exception - assert expected_call_stack == [ - x.name for x in traceback.extract_tb(exception.__traceback__) - ] - - -async def test_set_sim_put_proceeds(): - sim_signal = Signal(SimSignalBackend(str, "test")) - await sim_signal.connect(sim=True) - - assert sim_signal._backend.put_proceeds.is_set() is True - - set_sim_put_proceeds(sim_signal, False) - assert sim_signal._backend.put_proceeds.is_set() is False - set_sim_put_proceeds(sim_signal, True) - assert sim_signal._backend.put_proceeds.is_set() is True - - -async def test_sim_backend_descriptor_fails_for_invalid_class(): - class myClass: - def __init__(self) -> None: - pass - - sim_signal = Signal(SimSignalBackend(myClass, "test")) - await sim_signal.connect(sim=True) - - with pytest.raises(AssertionError): - await sim_signal._backend.get_descriptor() - - -async def time_taken_by(coro) -> float: - start = time.monotonic() - await coro - return time.monotonic() - start - - -async def test_wait_for_value_with_value(): - sim_signal = SignalRW(SimSignalBackend(str, "test")) - sim_signal.set_name("sim_signal") - await sim_signal.connect(sim=True) - set_sim_value(sim_signal, "blah") - - with pytest.raises( - TimeoutError, - match="sim_signal didn't match 'something' in 0.1s, last value 'blah'", - ): - await wait_for_value(sim_signal, "something", timeout=0.1) - assert await time_taken_by(wait_for_value(sim_signal, "blah", timeout=2)) < 0.1 - t = asyncio.create_task( - time_taken_by(wait_for_value(sim_signal, "something else", timeout=2)) - ) - await asyncio.sleep(0.2) - assert not t.done() - set_sim_value(sim_signal, "something else") - assert 0.2 < await t < 1.0 - - -async def test_wait_for_value_with_funcion(): - sim_signal = SignalRW(SimSignalBackend(float, "test")) - sim_signal.set_name("sim_signal") - await sim_signal.connect(sim=True) - set_sim_value(sim_signal, 45.8) - - def less_than_42(v): - return v < 42 - - with pytest.raises( - TimeoutError, - match="sim_signal didn't match less_than_42 in 0.1s, last value 45.8", - ): - await wait_for_value(sim_signal, less_than_42, timeout=0.1) - t = asyncio.create_task( - time_taken_by(wait_for_value(sim_signal, less_than_42, timeout=2)) - ) - await asyncio.sleep(0.2) - assert not t.done() - set_sim_value(sim_signal, 41) - assert 0.2 < await t < 1.0 - assert ( - await time_taken_by(wait_for_value(sim_signal, less_than_42, timeout=2)) < 0.1 - ) - - -async def test_set_and_wait_for_value(): - sim_signal = SignalRW(SimSignalBackend(int, "test")) - sim_signal.set_name("sim_signal") - await sim_signal.connect(sim=True) - set_sim_value(sim_signal, 0) - set_sim_put_proceeds(sim_signal, False) - st = await set_and_wait_for_value(sim_signal, 1) - assert not st.done - set_sim_put_proceeds(sim_signal, True) - assert await time_taken_by(st) < 0.1 diff --git a/ophyd/v2/tests/test_epics.py b/ophyd/v2/tests/test_epics.py deleted file mode 100644 index eda40d09c..000000000 --- a/ophyd/v2/tests/test_epics.py +++ /dev/null @@ -1,325 +0,0 @@ -import asyncio -import random -import re -import string -import subprocess -import sys -import time -from dataclasses import dataclass -from enum import Enum -from pathlib import Path -from typing import Any, Callable, Literal, Optional, Sequence, Tuple, Type, TypedDict - -import numpy as np -import numpy.typing as npt -import pytest -from aioca import purge_channel_caches -from bluesky.protocols import Reading - -from ophyd.v2.core import NotConnected, SignalBackend, T, get_dtype -from ophyd.v2.epics import EpicsTransport, _make_backend - -RECORDS = str(Path(__file__).parent / "test_records.db") -PV_PREFIX = "".join(random.choice(string.ascii_lowercase) for _ in range(12)) - - -@dataclass -class IOC: - process: subprocess.Popen - protocol: Literal["ca", "pva"] - - async def make_backend( - self, typ: Optional[Type], suff: str, connect=True - ) -> SignalBackend: - # Calculate the pv - pv = f"{PV_PREFIX}:{self.protocol}:{suff}" - # Make and connect the backend - cls = EpicsTransport[self.protocol].value - backend = cls(typ, pv, pv) - if connect: - await asyncio.wait_for(backend.connect(), 10) - return backend - - -# Use a module level fixture per protocol so it's fast to run tests. This means -# we need to add a record for every PV that we will modify in tests to stop -# tests interfering with each other -@pytest.fixture(scope="module", params=["pva", "ca"]) -def ioc(request): - protocol = request.param - process = subprocess.Popen( - [ - sys.executable, - "-m", - "epicscorelibs.ioc", - "-m", - f"P={PV_PREFIX}:{protocol}:", - "-d", - RECORDS, - ], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - ) - yield IOC(process, protocol) - # close backend caches before the event loop - purge_channel_caches() - try: - print(process.communicate("exit")[0]) - except ValueError: - # Someone else already called communicate - pass - - -class MonitorQueue: - def __init__(self, backend: SignalBackend): - self.backend = backend - self.subscription = backend.set_callback(self.add_reading_value) - self.updates: asyncio.Queue[Tuple[Reading, Any]] = asyncio.Queue() - - def add_reading_value(self, reading: Reading, value): - self.updates.put_nowait((reading, value)) - - async def assert_updates(self, expected_value): - expected_reading = { - "value": expected_value, - "timestamp": pytest.approx(time.time(), rel=0.1), - "alarm_severity": 0, - } - reading, value = await self.updates.get() - assert value == expected_value == await self.backend.get_value() - assert reading == expected_reading == await self.backend.get_reading() - - def close(self): - self.backend.set_callback(None) - - -async def assert_monitor_then_put( - ioc: IOC, - suffix: str, - descriptor: dict, - initial_value: T, - put_value: T, - datatype: Optional[Type[T]] = None, -): - backend = await ioc.make_backend(datatype, suffix) - # Make a monitor queue that will monitor for updates - q = MonitorQueue(backend) - try: - # Check descriptor - source = f"{ioc.protocol}://{PV_PREFIX}:{ioc.protocol}:{suffix}" - assert dict(source=source, **descriptor) == await backend.get_descriptor() - # Check initial value - await q.assert_updates(pytest.approx(initial_value)) - # Put to new value and check that - await backend.put(put_value) - await q.assert_updates(pytest.approx(put_value)) - finally: - q.close() - - -class MyEnum(str, Enum): - a = "Aaa" - b = "Bbb" - c = "Ccc" - - -def integer_d(value): - return dict(dtype="integer", shape=[]) - - -def number_d(value): - return dict(dtype="number", shape=[]) - - -def string_d(value): - return dict(dtype="string", shape=[]) - - -def enum_d(value): - return dict(dtype="string", shape=[], choices=["Aaa", "Bbb", "Ccc"]) - - -def waveform_d(value): - return dict(dtype="array", shape=[len(value)]) - - -ls1 = "a string that is just longer than forty characters" -ls2 = "another string that is just longer than forty characters" - -ca_dtype_mapping = { - np.int8: np.uint8, - np.uint16: np.int32, - np.uint32: np.float64, - np.int64: np.float64, - np.uint64: np.float64, -} - - -@pytest.mark.parametrize( - "datatype, suffix, initial_value, put_value, descriptor", - [ - (int, "int", 42, 43, integer_d), - (float, "float", 3.141, 43.5, number_d), - (str, "str", "hello", "goodbye", string_d), - (MyEnum, "enum", MyEnum.b, MyEnum.c, enum_d), - (npt.NDArray[np.int8], "int8a", [-128, 127], [-8, 3, 44], waveform_d), - (npt.NDArray[np.uint8], "uint8a", [0, 255], [218], waveform_d), - (npt.NDArray[np.int16], "int16a", [-32768, 32767], [-855], waveform_d), - (npt.NDArray[np.uint16], "uint16a", [0, 65535], [5666], waveform_d), - (npt.NDArray[np.int32], "int32a", [-2147483648, 2147483647], [-2], waveform_d), - (npt.NDArray[np.uint32], "uint32a", [0, 4294967295], [1022233], waveform_d), - (npt.NDArray[np.int64], "int64a", [-2147483649, 2147483648], [-3], waveform_d), - (npt.NDArray[np.uint64], "uint64a", [0, 4294967297], [995444], waveform_d), - (npt.NDArray[np.float32], "float32a", [0.000002, -123.123], [1.0], waveform_d), - (npt.NDArray[np.float64], "float64a", [0.1, -12345678.123], [0.2], waveform_d), - (Sequence[str], "stra", ["five", "six", "seven"], ["nine", "ten"], waveform_d), - # Can't do long strings until https://github.com/epics-base/pva2pva/issues/17 - # (str, "longstr", ls1, ls2, string_d), - # (str, "longstr2.VAL$", ls1, ls2, string_d), - ], -) -async def test_backend_get_put_monitor( - ioc: IOC, - datatype: Type[T], - suffix: str, - initial_value: T, - put_value: T, - descriptor: Callable[[Any], dict], -): - # ca can't support all the types - dtype = get_dtype(datatype) - if ioc.protocol == "ca" and dtype and dtype.type in ca_dtype_mapping: - if dtype == np.int8: - # CA maps uint8 onto int8 rather than upcasting, so we need to change initial - # array - initial_value, put_value = [ # type: ignore - np.array(x).astype(np.uint8) for x in (initial_value, put_value) - ] - datatype = npt.NDArray[ca_dtype_mapping[dtype.type]] # type: ignore - # With the given datatype, check we have the correct initial value and putting works - await assert_monitor_then_put( - ioc, suffix, descriptor(initial_value), initial_value, put_value, datatype - ) - # With datatype guessed from CA/PVA, check we can set it back to the initial value - await assert_monitor_then_put( - ioc, suffix, descriptor(put_value), put_value, initial_value, datatype=None - ) - - -async def test_bool_conversion_of_enum(ioc: IOC) -> None: - await assert_monitor_then_put( - ioc, - suffix="bool", - descriptor=integer_d(True), - initial_value=True, - put_value=False, - datatype=bool, - ) - - -class BadEnum(Enum): - a = "Aaa" - b = "B" - c = "Ccc" - - -@pytest.mark.parametrize( - "typ, suff, error", - [ - (BadEnum, "enum", "has choices ('Aaa', 'Bbb', 'Ccc') not ('Aaa', 'B', 'Ccc')"), - (int, "str", "has type str not int"), - (str, "float", "has type float not str"), - (str, "stra", "has type [str] not str"), - (int, "uint8a", "has type [uint8] not int"), - (float, "enum", "has type Enum not float"), - (npt.NDArray[np.int32], "float64a", "has type [float64] not [int32]"), - ], -) -async def test_backend_wrong_type_errors(ioc: IOC, typ, suff, error): - with pytest.raises( - TypeError, match=re.escape(f"{PV_PREFIX}:{ioc.protocol}:{suff} {error}") - ): - await ioc.make_backend(typ, suff) - - -async def test_backend_put_enum_string(ioc: IOC) -> None: - backend = await ioc.make_backend(MyEnum, "enum2") - # Don't do this in production code, but allow on CLI - await backend.put("Ccc") # type: ignore - assert MyEnum.c == await backend.get_value() - - -def approx_table(table): - return {k: pytest.approx(v) for k, v in table.items()} - - -class MyTable(TypedDict): - bool: npt.NDArray[np.bool_] - int: npt.NDArray[np.int32] - float: npt.NDArray[np.float64] - str: Sequence[str] - enum: Sequence[MyEnum] - - -async def test_pva_table(ioc: IOC) -> None: - if ioc.protocol == "ca": - # CA can't do tables - return - initial = MyTable( - bool=np.array([False, False, True, True], np.bool_), - int=np.array([1, 8, -9, 32], np.int32), - float=np.array([1.8, 8.2, -6, 32.9887], np.float64), - str=["Hello", "World", "Foo", "Bar"], - enum=[MyEnum.a, MyEnum.b, MyEnum.a, MyEnum.c], - ) - put = MyTable( - bool=np.array([True, False], np.bool_), - int=np.array([-5, 32], np.int32), - float=np.array([8.5, -6.97], np.float64), - str=["Hello", "Bat"], - enum=[MyEnum.c, MyEnum.b], - ) - # TODO: what should this be for a variable length table? - descriptor = dict(dtype="object", shape=[]) - # Make and connect the backend - for t, i, p in [(MyTable, initial, put), (None, put, initial)]: - backend = await ioc.make_backend(t, "table") - # Make a monitor queue that will monitor for updates - q = MonitorQueue(backend) - try: - # Check descriptor - dict(source=backend.source, **descriptor) == await backend.get_descriptor() - # Check initial value - await q.assert_updates(approx_table(i)) - # Put to new value and check that - await backend.put(p) - await q.assert_updates(approx_table(p)) - finally: - q.close() - - -async def test_non_existant_errors(ioc: IOC): - backend = await ioc.make_backend(str, "non-existant", connect=False) - # Can't use asyncio.wait_for on python3.8 because of - # https://github.com/python/cpython/issues/84787 - done, pending = await asyncio.wait( - [asyncio.create_task(backend.connect())], timeout=0.1 - ) - assert len(done) == 0 - assert len(pending) == 1 - t = pending.pop() - t.cancel() - with pytest.raises(NotConnected, match=backend.source): - await t - - -def test_make_backend_fails_for_different_transports(): - read_pv = "test" - write_pv = "pva://test" - - with pytest.raises(TypeError) as err: - _make_backend(str, read_pv, write_pv) - assert err.args[0] == f"Differing transports: {read_pv} has EpicsTransport.ca," - +" {write_pv} has EpicsTransport.pva" diff --git a/ophyd/v2/tests/test_epicsdemo.py b/ophyd/v2/tests/test_epicsdemo.py deleted file mode 100644 index 1038f1a1b..000000000 --- a/ophyd/v2/tests/test_epicsdemo.py +++ /dev/null @@ -1,191 +0,0 @@ -import asyncio -from typing import Dict -from unittest.mock import Mock, call, patch - -import pytest -from bluesky.protocols import Reading -from bluesky.run_engine import RunEngine - -from ophyd.v2 import epicsdemo -from ophyd.v2.core import DeviceCollector, NotConnected, set_sim_callback, set_sim_value - -# Long enough for multiple asyncio event loop cycles to run so -# all the tasks have a chance to run -A_WHILE = 0.001 - - -@pytest.fixture -async def sim_mover(): - async with DeviceCollector(sim=True): - sim_mover = epicsdemo.Mover("BLxxI-MO-TABLE-01:X:") - # Signals connected here - - assert sim_mover.name == "sim_mover" - set_sim_value(sim_mover.units, "mm") - set_sim_value(sim_mover.precision, 3) - set_sim_value(sim_mover.velocity, 1) - yield sim_mover - - -@pytest.fixture -async def sim_sensor(): - async with DeviceCollector(sim=True): - sim_sensor = epicsdemo.Sensor("SIM:SENSOR:") - # Signals connected here - - assert sim_sensor.name == "sim_sensor" - yield sim_sensor - - -async def test_mover_moving_well(sim_mover: epicsdemo.Mover) -> None: - s = sim_mover.set(0.55) - watcher = Mock() - s.watch(watcher) - done = Mock() - s.add_callback(done) - await asyncio.sleep(A_WHILE) - assert watcher.call_count == 1 - assert watcher.call_args == call( - name="sim_mover", - current=0.0, - initial=0.0, - target=0.55, - unit="mm", - precision=3, - time_elapsed=pytest.approx(0.0, abs=0.05), - ) - watcher.reset_mock() - assert 0.55 == await sim_mover.setpoint.get_value() - assert not s.done - done.assert_not_called() - await asyncio.sleep(0.1) - set_sim_value(sim_mover.readback, 0.1) - await asyncio.sleep(A_WHILE) - assert watcher.call_count == 1 - assert watcher.call_args == call( - name="sim_mover", - current=0.1, - initial=0.0, - target=0.55, - unit="mm", - precision=3, - time_elapsed=pytest.approx(0.1, abs=0.05), - ) - set_sim_value(sim_mover.readback, 0.5499999) - await asyncio.sleep(A_WHILE) - assert s.done - assert s.success - done.assert_called_once_with(s) - done2 = Mock() - s.add_callback(done2) - done2.assert_called_once_with(s) - - -async def test_mover_stopped(sim_mover: epicsdemo.Mover): - callbacks = [] - set_sim_callback(sim_mover.stop_, lambda r, v: callbacks.append(v)) - - assert callbacks == [None] - await sim_mover.stop() - assert callbacks == [None, None] - - -async def test_read_mover(sim_mover: epicsdemo.Mover): - await sim_mover.stage() - assert (await sim_mover.read())["sim_mover"]["value"] == 0.0 - assert (await sim_mover.describe())["sim_mover"][ - "source" - ] == "sim://BLxxI-MO-TABLE-01:X:Readback" - assert (await sim_mover.read_configuration())["sim_mover-velocity"]["value"] == 1 - assert (await sim_mover.describe_configuration())["sim_mover-units"]["shape"] == [] - set_sim_value(sim_mover.readback, 0.5) - assert (await sim_mover.read())["sim_mover"]["value"] == 0.5 - await sim_mover.unstage() - # Check we can still read and describe when not staged - set_sim_value(sim_mover.readback, 0.1) - assert (await sim_mover.read())["sim_mover"]["value"] == 0.1 - assert await sim_mover.describe() - - -async def test_set_velocity(sim_mover: epicsdemo.Mover) -> None: - v = sim_mover.velocity - assert (await v.describe())["sim_mover-velocity"][ - "source" - ] == "sim://BLxxI-MO-TABLE-01:X:Velocity" - q: asyncio.Queue[Dict[str, Reading]] = asyncio.Queue() - v.subscribe(q.put_nowait) - assert (await q.get())["sim_mover-velocity"]["value"] == 1.0 - await v.set(2.0) - assert (await q.get())["sim_mover-velocity"]["value"] == 2.0 - v.clear_sub(q.put_nowait) - await v.set(3.0) - assert (await v.read())["sim_mover-velocity"]["value"] == 3.0 - assert q.empty() - - -async def test_mover_disconncted(): - with pytest.raises(NotConnected, match="Not all Devices connected"): - async with DeviceCollector(timeout=0.1): - m = epicsdemo.Mover("ca://PRE:", name="mover") - assert m.name == "mover" - - -async def test_sensor_disconncted(): - with patch("ophyd.v2.core.logging") as mock_logging: - with pytest.raises(NotConnected, match="Not all Devices connected"): - async with DeviceCollector(timeout=0.1): - s = epicsdemo.Sensor("ca://PRE:", name="sensor") - mock_logging.error.assert_called_once_with( - """\ -1 Devices did not connect: - s: NotConnected - value: ca://PRE:Value - mode: ca://PRE:Mode""" - ) - assert s.name == "sensor" - - -async def test_read_sensor(sim_sensor: epicsdemo.Sensor): - sim_sensor.stage() - assert (await sim_sensor.read())["sim_sensor-value"]["value"] == 0 - assert (await sim_sensor.describe())["sim_sensor-value"][ - "source" - ] == "sim://SIM:SENSOR:Value" - assert (await sim_sensor.read_configuration())["sim_sensor-mode"][ - "value" - ] == epicsdemo.EnergyMode.low - desc = (await sim_sensor.describe_configuration())["sim_sensor-mode"] - assert desc["dtype"] == "string" - assert desc["choices"] == ["Low Energy", "High Energy"] # type: ignore - set_sim_value(sim_sensor.mode, epicsdemo.EnergyMode.high) - assert (await sim_sensor.read_configuration())["sim_sensor-mode"][ - "value" - ] == epicsdemo.EnergyMode.high - await sim_sensor.unstage() - - -async def test_assembly_renaming() -> None: - thing = epicsdemo.SampleStage("PRE") - await thing.connect(sim=True) - assert thing.x.name == "" - assert thing.x.velocity.name == "" - assert thing.x.stop_.name == "" - await thing.x.velocity.set(456) - assert await thing.x.velocity.get_value() == 456 - thing.set_name("foo") - assert thing.x.name == "foo-x" - assert thing.x.velocity.name == "foo-x-velocity" - assert thing.x.stop_.name == "foo-x-stop" - - -def test_mover_in_re(sim_mover: epicsdemo.Mover) -> None: - RE = RunEngine() - sim_mover.move(0) - - def my_plan(): - sim_mover.move(0) - return - yield - - with pytest.raises(RuntimeError, match="Will deadlock run engine if run in a plan"): - RE(my_plan()) diff --git a/ophyd/v2/tests/test_records.db b/ophyd/v2/tests/test_records.db deleted file mode 100644 index 307ad5cd3..000000000 --- a/ophyd/v2/tests/test_records.db +++ /dev/null @@ -1,244 +0,0 @@ -record(bo, "$(P)bool") { - field(ZNAM, "No") - field(ONAM, "Yes") - field(VAL, "1") - field(PINI, "YES") -} - -record(longout, "$(P)int") { - field(HOPR, "100") - field(HIHI, "98") - field(HIGH, "96") - field(DRVH, "90") - field(DRVL, "10") - field(LOW, "5") - field(LOLO, "2") - field(LOPR, "0") - field(VAL, "42") - field(PINI, "YES") -} - -record(ao, "$(P)float") { - field(PREC, "1") - field(EGU, "mm") - field(VAL, "3.141") - field(PINI, "YES") -} - -record(stringout, "$(P)str") { - field(VAL, "hello") - field(PINI, "YES") -} - -record(mbbo, "$(P)enum") { - field(ZRST, "Aaa") - field(ZRVL, "5") - field(ONST, "Bbb") - field(ONVL, "6") - field(TWST, "Ccc") - field(TWVL, "7") - field(VAL, "1") - field(PINI, "YES") -} - -record(mbbo, "$(P)enum2") { - field(ZRST, "Aaa") - field(ONST, "Bbb") - field(TWST, "Ccc") - field(VAL, "1") - field(PINI, "YES") -} - -record(waveform, "$(P)int8a") { - field(NELM, "3") - field(FTVL, "CHAR") - field(INP, {const:[-128, 127]}) - field(PINI, "YES") -} - -record(waveform, "$(P)uint8a") { - field(NELM, "3") - field(FTVL, "UCHAR") - field(INP, {const:[0, 255]}) - field(PINI, "YES") -} - -record(waveform, "$(P)int16a") { - field(NELM, "3") - field(FTVL, "SHORT") - field(INP, {const:[-32768, 32767]}) - field(PINI, "YES") -} - -record(waveform, "$(P)uint16a") { - field(NELM, "3") - field(FTVL, "USHORT") - field(INP, {const:[0, 65535]}) - field(PINI, "YES") -} - -record(waveform, "$(P)int32a") { - field(NELM, "3") - field(FTVL, "LONG") - field(INP, {const:[-2147483648, 2147483647]}) - field(PINI, "YES") -} - -record(waveform, "$(P)uint32a") { - field(NELM, "3") - field(FTVL, "ULONG") - field(INP, {const:[0, 4294967295]}) - field(PINI, "YES") -} - -record(waveform, "$(P)int64a") { - field(NELM, "3") - field(FTVL, "INT64") - # Can't do 64-bit int with JSON numbers in a const link... - field(INP, {const:[-2147483649, 2147483648]}) - field(PINI, "YES") -} - -record(waveform, "$(P)uint64a") { - field(NELM, "3") - field(FTVL, "UINT64") - field(INP, {const:[0, 4294967297]}) - field(PINI, "YES") -} - -record(waveform, "$(P)float32a") { - field(NELM, "3") - field(FTVL, "FLOAT") - field(INP, {const:[0.000002, -123.123]}) - field(PINI, "YES") -} - -record(waveform, "$(P)float64a") { - field(NELM, "3") - field(FTVL, "DOUBLE") - field(INP, {const:[0.1, -12345678.123]}) - field(PINI, "YES") -} - -record(waveform, "$(P)stra") { - field(NELM, "3") - field(FTVL, "STRING") - field(INP, {const:["five", "six", "seven"]}) - field(PINI, "YES") -} - -record(waveform, "$(P)longstr") { - field(NELM, "80") - field(FTVL, "CHAR") - field(INP, {const:"a string that is just longer than forty characters"}) - field(PINI, "YES") -} - -record(lsi, "$(P)longstr2") { - field(SIZV, "80") - field(INP, {const:"a string that is just longer than forty characters"}) - field(PINI, "YES") -} - -record(waveform, "$(P)table:labels") { - field(FTVL, "STRING") - field(NELM, "5") - field(INP, {const:["Bool", "Int", "Float", "Str", "Enum"]}) - field(PINI, "YES") - info(Q:group, { - "$(P)table": { - "+id": "epics:nt/NTTable:1.0", - "labels": { - "+type": "plain", - "+channel": "VAL" - } - } - }) -} - -record(waveform, "$(P)table:bool") -{ - field(FTVL, "UCHAR") - field(NELM, "4096") - field(INP, {const:[false, false, true, true]}) - field(PINI, "YES") - info(Q:group, { - "$(P)table": { - "value.bool": { - "+type": "plain", - "+channel": "VAL", - "+putorder": 1 - } - } - }) -} - -record(waveform, "$(P)table:int") -{ - field(FTVL, "LONG") - field(NELM, "4096") - field(INP, {const:[1, 8, -9, 32]}) - field(PINI, "YES") - info(Q:group, { - "$(P)table": { - "value.int": { - "+type": "plain", - "+channel": "VAL", - "+putorder": 2 - } - } - }) -} - -record(waveform, "$(P)table:float") -{ - field(FTVL, "DOUBLE") - field(NELM, "4096") - field(INP, {const:[1.8, 8.2, -6, 32.9887]}) - field(PINI, "YES") - info(Q:group, { - "$(P)table": { - "value.float": { - "+type": "plain", - "+channel": "VAL", - "+putorder": 3 - } - } - }) -} - -record(waveform, "$(P)table:str") -{ - field(FTVL, "STRING") - field(NELM, "4096") - field(INP, {const:["Hello", "World", "Foo", "Bar"]}) - field(PINI, "YES") - info(Q:group, { - "$(P)table": { - "value.str": { - "+type": "plain", - "+channel": "VAL", - "+putorder": 4 - } - } - }) -} - -record(waveform, "$(P)table:enum") -{ - field(FTVL, "STRING") - field(NELM, "4096") - field(INP, {const:["Aaa", "Bbb", "Aaa", "Ccc"]}) - field(PINI, "YES") - info(Q:group, { - "$(P)table": { - "value.enum": { - "+type": "plain", - "+channel": "VAL", - "+putorder": 5, - "+trigger": "*", - }, - "": {"+type": "meta", "+channel": "VAL"} - } - }) -} \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index bf05ccb40..2732098c4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,15 +26,8 @@ install_requires = pint [options.extras_require] -ca = - aioca>=1.6 -pva = - numpy<2.0 # See https://github.com/mdavidsaver/p4p/issues/145 - p4p; python_version < '3.12' # For development tests/docs dev = - %(ca)s - %(pva)s attrs>=19.3.0 black==22.3.0 bluesky>=1.11.0 @@ -81,11 +74,6 @@ ophyd = databroker.handlers = NPY_SEQ = ophyd.sim:NumpySeqHandler -[mypy] -# Ignore missing stubs for modules we use -ignore_missing_imports = True -plugins = numpy.typing.mypy_plugin - [isort] profile=black @@ -125,10 +113,6 @@ skipsdist = True allowlist_externals = pytest commands = pytest {posargs} -[testenv:mypy] -allowlist_externals = mypy -commands = mypy ophyd/v2 {posargs} - [testenv:pre-commit] allowlist_externals = pre-commit commands = pre-commit run --all-files {posargs}