Skip to content

Commit

Permalink
Implement LONK-WS and test
Browse files Browse the repository at this point in the history
  • Loading branch information
jennydaman committed Sep 10, 2024
1 parent 66a7704 commit 4c12c3b
Show file tree
Hide file tree
Showing 4 changed files with 383 additions and 16 deletions.
54 changes: 48 additions & 6 deletions chris_backend/pacsfiles/consumers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import rest_framework.permissions
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.conf import settings
from rest_framework import permissions

from pacsfiles.lonk import (
LonkClient,
validate_subscription,
LonkWsSubscription,
Lonk,
)
from pacsfiles.permissions import IsChrisOrIsPACSUserReadOnly


Expand All @@ -11,19 +18,54 @@ class PACSFileProgress(AsyncJsonWebsocketConsumer):
A WebSockets endpoint which relays progress messages from NATS sent by *oxidicom* to a client.
"""

permission_classes = (permissions.IsAuthenticated, IsChrisOrIsPACSUserReadOnly,)
permission_classes = (
permissions.IsAuthenticated,
IsChrisOrIsPACSUserReadOnly,
)

async def connect(self):
if not await self._has_permission():
await self.close()
else:
await self.accept()
return await self.close()
self.client: LonkClient = await LonkClient.connect(
settings.NATS_ADDRESS
)
await self.accept()

async def receive_json(self, content, **kwargs):
...
if validate_subscription(content):
await self._subscribe(
content['pacs_name'], content['SeriesInstanceUID']
)
return
await self.close(code=400, reason='Invalid subscription')

async def _subscribe(self, pacs_name: str, series_instance_uid: str):
"""
Subscribe to progress notifications about the reception of a DICOM series.
"""
try:
await self.client.subscribe(
pacs_name, series_instance_uid, lambda msg: self.send_json(msg)
)
response = Lonk(
pacs_name=pacs_name,
SeriesInstanceUID=series_instance_uid,
message=LonkWsSubscription(subscription='subscribed'),
)
await self.send_json(response)
except Exception as e:
response = Lonk(
pacs_name=pacs_name,
SeriesInstanceUID=series_instance_uid,
message=LonkWsSubscription(subscription='error'),
)
await self.send_json(response)
await self.close(code=500)
raise e

async def disconnect(self, code):
...
await super().disconnect(code)
await self.client.close()

@database_sync_to_async
def _has_permission(self) -> bool:
Expand Down
209 changes: 209 additions & 0 deletions chris_backend/pacsfiles/lonk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
"""
Implementation of the "Light Oxidicom NotifiKations Encoding"
See https://chrisproject.org/docs/oxidicom/lonk
"""
import asyncio
import enum
from sys import byteorder
from typing import (
Self,
Callable,
TypedDict,
Literal,
TypeGuard,
Any,
Awaitable,
)

import nats
from nats import NATS
from nats.aio.subscription import Subscription
from nats.aio.msg import Msg


class SubscriptionRequest(TypedDict):
"""
A request to subscribe to LONK notifications about a DICOM series.
"""

pacs_name: str
SeriesInstanceUID: str
action: Literal['subscribe']


def validate_subscription(data: Any) -> TypeGuard[SubscriptionRequest]:
if not isinstance(data, dict):
return False
return (
data.get('action', None) == 'subscribe'
and isinstance(data.get('SeriesInstanceUID', None), str)
and isinstance(data.get('pacs_name', None), str)
)


class LonkProgress(TypedDict):
"""
LONK "done" message.
https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding
"""

ndicom: int


class LonkError(TypedDict):
"""
LONK "error" message.
https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding
"""

error: str


class LonkDone(TypedDict):
"""
LONK "done" message.
https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding
"""

done: bool


class LonkWsSubscription(TypedDict):
"""
LONK-WS "subscribed" message.
https://chrisproject.org/docs/oxidicom/lonk-ws#lonk-ws-subscription
"""

subscription: Literal['subscribed', 'error']


LonkMessageData = LonkProgress | LonkError | LonkDone | LonkWsSubscription
"""
Lonk message data.
https://chrisproject.org/docs/oxidicom/lonk-ws#messages
"""


class Lonk(TypedDict):
"""
Serialized LONK message about a DICOM series.
https://chrisproject.org/docs/oxidicom#lonk-message-encoding
"""

SeriesInstanceUID: str
pacs_name: str
message: LonkMessageData


class LonkClient:
"""
"Light Oxidicom NotifiKations Encoding" client:
A client for the messages sent by *oxidicom* over NATS.
https://chrisproject.org/docs/oxidicom/lonk
"""

def __init__(self, nc: NATS):
self._nc = nc
self._subscriptions: list[Subscription] = []

@classmethod
async def connect(cls, servers: str | list[str]) -> Self:
return cls(await nats.connect(servers))

async def subscribe(
self,
pacs_name: str,
series_instance_uid: str,
cb: Callable[[Lonk], Awaitable[None]],
):
subject = subject_of(pacs_name, series_instance_uid)
cb = _curry_message2json(pacs_name, series_instance_uid, cb)
subscription = await self._nc.subscribe(subject, cb=cb)
self._subscriptions.append(subscription)
return subscription

async def close(self):
await asyncio.gather(*(s.unsubscribe() for s in self._subscriptions))
await self._nc.close()


def subject_of(pacs_name: str, series_instance_uid: str) -> str:
"""
Get the NATS subject for a series.
Equivalent to https://github.com/FNNDSC/oxidicom/blob/33838f22a5431a349b3b83a313035b8e22d16bb1/src/lonk.rs#L36-L48
"""
return f'oxidicom.{_sanitize_topic_part(pacs_name)}.{_sanitize_topic_part(series_instance_uid)}'


def _sanitize_topic_part(s: str) -> str:
return (
s.replace('\0', '')
.replace(' ', '_')
.replace('.', '_')
.replace('*', '_')
.replace('>', '_')
)


def _message2json(
pacs_name: str, series_instance_uid: str, message: Msg
) -> Lonk:
return Lonk(
pacs_name=pacs_name,
SeriesInstanceUID=series_instance_uid,
message=_serialize_to_lonkws(message.data),
)


def _curry_message2json(
pacs_name: str,
series_instance_uid: str,
cb: Callable[[Lonk], Awaitable[None]],
):
async def nats_callback(message: Msg):
lonk = _message2json(pacs_name, series_instance_uid, message)
await cb(lonk)

return nats_callback


@enum.unique
class LonkMagicByte(enum.IntEnum):
"""
LONK message first magic byte.
"""

DONE = 0x00
PROGRESS = 0x01
ERROR = 0x02


def _serialize_to_lonkws(payload: bytes) -> LonkMessageData:
"""
Translate LONK binary encoding to LONK-WS JSON.
"""
if len(payload) == 0:
raise ValueError('Empty message')
data = payload[1:]

match payload[0]:
case LonkMagicByte.DONE.value:
return LonkDone(done=True)
case LonkMagicByte.PROGRESS.value:
ndicom = int.from_bytes(data, 'little', signed=False)
return LonkProgress(ndicom=ndicom)
case LonkMagicByte.ERROR.value:
error = data.decode(encoding='utf-8')
return LonkError(error=error)
case _:
hexstr = ' '.join(hex(b) for b in payload)
raise ValueError(f'Unrecognized message: {hexstr}')
47 changes: 47 additions & 0 deletions chris_backend/pacsfiles/tests/mocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Self

import nats
from nats import NATS

from pacsfiles import lonk
from pacsfiles.lonk import LonkMagicByte


class Mockidicom:
"""
A mock *oxidicom* which sends LONK messages to NATS.
Somewhat similar to https://github.com/FNNDSC/oxidicom/blob/e6bb83d1ea2fbaf5bb4af7dbf518a4b1a2957f2d/src/lonk.rs
"""

def __init__(self, nc: NATS):
self._nc = nc

@classmethod
async def connect(cls, servers: str | list[str]) -> Self:
nc = await nats.connect(servers)
return cls(nc)

async def send_progress(
self, pacs_name: str, SeriesInstanceUID: str, ndicom: int
):
subject = lonk.subject_of(pacs_name, SeriesInstanceUID)
u32 = ndicom.to_bytes(length=4, byteorder='little', signed=False)
data = LonkMagicByte.PROGRESS.value.to_bytes() + u32
await self._nc.publish(subject, data)

async def send_done(self, pacs_name: str, SeriesInstanceUID: str):
subject = lonk.subject_of(pacs_name, SeriesInstanceUID)
await self._nc.publish(subject, LonkMagicByte.DONE.value.to_bytes())

async def send_error(
self, pacs_name: str, SeriesInstanceUID: str, error: str
):
subject = lonk.subject_of(pacs_name, SeriesInstanceUID)
data = LonkMagicByte.ERROR.value.to_bytes() + error.encode(
encoding='utf-8'
)
await self._nc.publish(subject, data)

async def close(self):
self._nc.close()
Loading

0 comments on commit 4c12c3b

Please sign in to comment.