Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

MXDGA-3724: Added the MVP for the fast grid scan motion #2

Merged
merged 7 commits into from
Jan 14, 2022
21 changes: 21 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
},
{
"name": "Debug Unit Test",
"type": "python",
"request": "test",
"justMyCode": false,
},
]
}
8 changes: 8 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.formatting.provider": "black"
}
Empty file added src/__init__.py
Empty file.
Empty file added src/artemis/devices/__init__.py
Empty file.
219 changes: 219 additions & 0 deletions src/artemis/devices/fast_grid_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import threading
import time
from typing import List
from ophyd import (
Component,
Device,
EpicsSignal,
EpicsSignalRO,
EpicsSignalWithRBV,
Signal,
)
from ophyd.status import DeviceStatus, StatusBase, SubscriptionStatus

from dataclasses import dataclass
from typing import Any

from src.artemis.devices.motors import (
GridScanLimit,
GridScanLimitBundle,
)

from bluesky.plan_stubs import mv


@dataclass
class GridScanParams:
"""
Holder class for the parameters of a grid scan.
"""

x_steps: int = 1
y_steps: int = 1
x_step_size: float = 0.1
y_step_size: float = 0.1
dwell_time: float = 0.1
x_start: float = 0.1
y1_start: float = 0.1
z1_start: float = 0.1

def is_valid(self, limits: GridScanLimitBundle) -> bool:
"""
Validates scan parameters

:param limits: The motor limits against which to validate
the parameters
:return: True if the scan is valid
"""

return (
# All scan axes are within limits
scan_in_limits(limits.x, self.x_start, self.x_steps, self.x_step_size)
and scan_in_limits(limits.y, self.y1_start, self.y_steps, self.y_step_size)
# Z never exceeds limits
and limits.z.is_within(self.z1_start)
)


def scan_in_limits(
limit: GridScanLimit, start: float, steps: float, step_size: float
) -> bool:
end = start + (steps * step_size)
return limit.is_within(start) and limit.is_within(end)


class GridScanCompleteStatus(DeviceStatus):
"""
A Status for the grid scan completion
A special status object that notifies watchers (progress bars)
based on comparing device.expected_images to device.position_counter.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_ts = time.time()

self.device.position_counter.subscribe(self._notify_watchers)
self.device.status.subscribe(self._running_changed)

self._name = self.device.name
self._target_count = self.device.expected_images.get()

def _notify_watchers(self, value, *args, **kwargs):
if not self._watchers:
return
time_elapsed = time.time() - self.start_ts
try:
fraction = 1 - value / self._target_count
except ZeroDivisionError:
fraction = 0
time_remaining = 0
except Exception as e:
fraction = None
time_remaining = None
self.set_exception(e)
else:
time_remaining = time_elapsed / fraction
for watcher in self._watchers:
watcher(
name=self._name,
current=value,
initial=0,
target=self._target_count,
unit="images",
precision=0,
fraction=fraction,
time_elapsed=time_elapsed,
time_remaining=time_remaining,
)

def _running_changed(self, value=None, old_value=None, **kwargs):
if (old_value == 1) and (value == 0):
# Stopped running
number_of_images = self.device.position_counter.get()
if number_of_images != self._target_count:
self.set_exception(
Exception(
f"Grid scan finished without collecting expected number of images. Expected {self._target_count} got {number_of_images}."
)
)
else:
self.set_finished()
self.clean_up()

def clean_up(self):
self.device.position_counter.clear_sub(self._notify_watchers)
self.device.status.clear_sub(self._running_changed)


class FastGridScan(Device):

x_steps: EpicsSignalWithRBV = Component(EpicsSignalWithRBV, "X_NUM_STEPS")
y_steps: EpicsSignalWithRBV = Component(EpicsSignalWithRBV, "Y_NUM_STEPS")

x_step_size: EpicsSignalWithRBV = Component(EpicsSignalWithRBV, "X_STEP_SIZE")
y_step_size: EpicsSignalWithRBV = Component(EpicsSignalWithRBV, "Y_STEP_SIZE")

dwell_time: EpicsSignalWithRBV = Component(EpicsSignalWithRBV, "DWELL_TIME")

x_start: EpicsSignalWithRBV = Component(EpicsSignalWithRBV, "X_START")
y1_start: EpicsSignalWithRBV = Component(EpicsSignalWithRBV, "Y_START")
z1_start: EpicsSignalWithRBV = Component(EpicsSignalWithRBV, "Z_START")

position_counter: EpicsSignal = Component(
EpicsSignal, "POS_COUNTER", write_pv="POS_COUNTER_WRITE"
)
x_counter: EpicsSignalRO = Component(EpicsSignalRO, "X_COUNTER")
y_counter: EpicsSignalRO = Component(EpicsSignalRO, "Y_COUNTER")
scan_invalid: EpicsSignalRO = Component(EpicsSignalRO, "SCAN_INVALID")

run_cmd: EpicsSignal = Component(EpicsSignal, "RUN.PROC")
stop_cmd: EpicsSignal = Component(EpicsSignal, "STOP.PROC")
status: EpicsSignalRO = Component(EpicsSignalRO, "SCAN_STATUS")

expected_images: Signal = Component(Signal)

# Kickoff timeout in seconds
KICKOFF_TIMEOUT: float = 5.0

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def set_expected_images(*_, **__):
self.expected_images.put(self.x_steps.get() * self.y_steps.get())

self.x_steps.subscribe(set_expected_images)
self.y_steps.subscribe(set_expected_images)

def is_invalid(self) -> bool:
if "GONP" in self.scan_invalid.pvname:
return False
return self.scan_invalid.get()

def kickoff(self) -> StatusBase:
# Check running already here?
st = DeviceStatus(device=self, timeout=self.KICKOFF_TIMEOUT)

def check_valid_and_scan():
try:
self.log.info("Waiting on position counter reset and valid settings")
while self.is_invalid() or not self.position_counter.get() == 0:
time.sleep(0.1)
self.log.debug("Running scan")
running = SubscriptionStatus(self.status, lambda value: value == 1)
run_requested = self.run_cmd.set(1)
(run_requested and running).wait()
st.set_finished()
except Exception as e:
st.set_exception(e)

threading.Thread(target=check_valid_and_scan, daemon=True).start()
return st

def stage(self) -> List[object]:
self.position_counter.put(0)
return super().stage()

def complete(self) -> DeviceStatus:
return GridScanCompleteStatus(self)


def set_fast_grid_scan_params(scan: FastGridScan, params: GridScanParams):
yield from mv(
scan.x_steps,
params.x_steps,
scan.y_steps,
params.y_steps,
scan.x_step_size,
params.x_step_size,
scan.y_step_size,
params.y_step_size,
scan.dwell_time,
params.dwell_time,
scan.x_start,
params.x_start,
scan.y1_start,
params.y1_start,
scan.z1_start,
params.z1_start,
)
59 changes: 59 additions & 0 deletions src/artemis/devices/motors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from dataclasses import dataclass, field

from ophyd import EpicsMotor
from ophyd.device import Component
from ophyd.epics_motor import MotorBundle


@dataclass
class GridScanLimit:
"""
Represents motor limit(s)
"""

motor: EpicsMotor

def is_within(self, position: float) -> bool:
"""Checks position against limits

:param position: The position to check
:return: True if position is within the limits
"""
low = self.motor.low_limit_travel.get()
high = self.motor.high_limit_travel.get()
return low <= position <= high


@dataclass
class GridScanLimitBundle:
"""
Holder for limits reflecting MX grid scan axes
"""

x: GridScanLimit
y: GridScanLimit
z: GridScanLimit


class GridScanMotorBundle(MotorBundle):
"""
Holder for motors reflecting grid scan axes
"""

x: EpicsMotor = Component(EpicsMotor, ":X")
y: EpicsMotor = Component(EpicsMotor, ":Y")
z: EpicsMotor = Component(EpicsMotor, ":Z")

def get_limits(self) -> GridScanLimitBundle:
"""Get the limits for the bundle.

Note that these limits may not yet be valid until wait_for_connection is called on this MotorBundle.

Returns:
GridScanLimitBundle: The limits for the underlying motor.
"""
return GridScanLimitBundle(
GridScanLimit(self.x),
GridScanLimit(self.y),
GridScanLimit(self.z),
)
Empty file.
22 changes: 22 additions & 0 deletions src/artemis/devices/system_tests/test_gridscan_system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import pytest

from src.artemis.devices.fast_grid_scan import (
FastGridScan,
set_fast_grid_scan_params,
GridScanParams,
)
from bluesky.run_engine import RunEngine


@pytest.fixture()
def fast_grid_scan():
fast_grid_scan = FastGridScan(name="fast_grid_scan", prefix="BL03S-MO-SGON-01:FGS:")
yield fast_grid_scan


@pytest.mark.s03
def test_set_program_data_and_kickoff(fast_grid_scan: FastGridScan):
RE = RunEngine()
RE(set_fast_grid_scan_params(fast_grid_scan, GridScanParams(2, 2)))
kickoff_status = fast_grid_scan.kickoff()
kickoff_status.wait()
Empty file.
Loading