Skip to content

Commit

Permalink
Use _write_async_message for sensor updates
Browse files Browse the repository at this point in the history
Clients that don't keep up with sensor updates will now be disconnected,
instead of causing the server to use an ever-increasing amount of RAM.

This required some unintuitive changes to keep mypy happy, due to
python/mypy#17723.
  • Loading branch information
bmerry committed Sep 5, 2024
1 parent a2a268b commit fb4eec4
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
10 changes: 5 additions & 5 deletions src/aiokatcp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from typing import Any, Callable, Iterable, Optional, TypeVar

import decorator
from typing_extensions import Protocol
from typing_extensions import Protocol, Self

from . import core

Expand All @@ -44,7 +44,6 @@
_BLANK_RE = re.compile(rb"^[ \t]*[\r\n]?$")
# typing.Protocol requires a contravariant typevar
_C_contra = TypeVar("_C_contra", bound="Connection", contravariant=True)
_C = TypeVar("_C", bound="Connection")


class ConvertCRProtocol(asyncio.StreamReaderProtocol):
Expand Down Expand Up @@ -127,8 +126,8 @@ async def handle_message(self, conn: _C_contra, msg: core.Message) -> None:

class Connection:
def __init__(
self: _C,
owner: _ConnectionOwner[_C],
self,
owner: _ConnectionOwner[Self],
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
is_server: bool,
Expand Down Expand Up @@ -193,7 +192,8 @@ async def drain(self) -> None:
self.logger.warning("Connection closed while draining: %s", error)
self._close_writer()

async def _run(self: _C) -> None:
# The self: Self is needed due to https://github.com/python/mypy/issues/17723
async def _run(self: Self) -> None:
while True:
# If the output buffer gets too full, pause processing requests
await self.drain()
Expand Down
6 changes: 4 additions & 2 deletions src/aiokatcp/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017, 2019, 2022 National Research Foundation (SARAO)
# Copyright 2017, 2019, 2022, 2024 National Research Foundation (SARAO)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
Expand Down Expand Up @@ -64,6 +64,8 @@
class ClientConnection(connection.Connection):
"""Server's view of the connection from a single client."""

owner: "DeviceServer"

def __init__(
self,
owner: "DeviceServer",
Expand Down Expand Up @@ -99,7 +101,7 @@ def sensor_update(self, s: sensor.Sensor, reading: sensor.Reading) -> None:
msg = core.Message.inform(
"sensor-status", reading.timestamp, 1, s.name, reading.status, reading.value
)
self.write_message(msg)
self.owner._write_async_message(self, msg)


class RequestContext:
Expand Down

0 comments on commit fb4eec4

Please sign in to comment.