Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfixes #13

Merged
merged 7 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion madvr/consts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Constants for madvr module."""

REFRESH_TIME = 60
REFRESH_TIME = 20
PING_DELAY = 30
COMMAND_TIMEOUT = 3
PING_INTERVAL = 5
Expand Down
128 changes: 68 additions & 60 deletions madvr/madvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,21 @@ async def task_handle_queue(self) -> None:
"""Handle command queue."""
while True:
await self.connection_event.wait()
while (
not self.command_queue.empty() and not self.stop_commands_flag.is_set()
):
while not self.command_queue.empty() and not self.stop_commands_flag.is_set():
command = await self.command_queue.get()
self.logger.debug("sending queue command %s", command)
try:
await self.send_command(command)
except NotImplementedError as err:
self.logger.warning("Command not implemented: %s", err)
except (ConnectionError, ConnectionResetError):
self.logger.warning("Envy was turned off manually")
# update state that its off
await self._handle_power_off()
except (ConnectionError, ConnectionResetError, BrokenPipeError):
self.logger.warning("Task Queue: Envy seems to be disconnected")
except AttributeError:
self.logger.warning("Issue sending command from queue")
except RetryExceededError:
self.logger.warning("Retry exceeded for command %s", command)
except OSError as err:
self.logger.error("Unexpected error when sending command: %s", err)
finally:
self.command_queue.task_done()

if self.stop_commands_flag.is_set():
self.clear_queue()
Expand Down Expand Up @@ -197,9 +191,7 @@ async def task_read_notifications(self) -> None:
# try to connect otherwise it will mark the device as offline
await self._reconnect()
except ConnectionError as e:
self.logger.error(
"Connection error when reading notifications: %s", e
)
self.logger.error("Connection error when reading notifications: %s", e)
continue

await asyncio.sleep(TASK_CPU_DELAY)
Expand All @@ -213,15 +205,7 @@ async def send_heartbeat(self, once: bool = False) -> None:
"""

async def perform_heartbeat() -> None:
if not self.connected:
self.logger.warning("Connection not established")
raise HeartBeatError("Connection not established")

async with self.lock:
if self.writer:
self.writer.write(self.HEARTBEAT)
await self.writer.drain()
self.logger.debug("Heartbeat complete")
await self._write_with_timeout(self.HEARTBEAT)

async def handle_heartbeat_error(
err: TimeoutError | OSError | HeartBeatError,
Expand Down Expand Up @@ -270,9 +254,7 @@ async def task_ping_until_alive(self) -> None:
try:
await self.open_connection()
except ConnectionError as err:
self.logger.error(
"Error opening connection after connectivity check: %s", err
)
self.logger.error("Error opening connection after connectivity check: %s", err)
else:
self.logger.debug(
"Device is not connectable, retrying in %s seconds",
Expand All @@ -285,13 +267,18 @@ async def task_ping_until_alive(self) -> None:
await asyncio.sleep(self.ping_interval)

async def task_refresh_info(self) -> None:
"""Task to refresh some device info every minute"""
"""Task to refresh some device info every 20s"""
while True:
# wait until the connection is established
await self.connection_event.wait()
cmds = [
["GetMacAddress"],
["GetTemperatures"],
# get signal info in case a change was missed and its sitting in limbo
["GetIncomingSignalInfo"],
["GetOutgoingSignalInfo"],
["GetAspectRatio"],
["GetMaskingRatio"],
]
for cmd in cmds:
await self.add_command_to_queue(cmd)
Expand All @@ -317,6 +304,32 @@ def stop(self) -> None:
self.stop_heartbeat.set()
self.stop_commands_flag.set()

async def _write_with_timeout(self, data: bytes) -> None:
"""Write data to the socket with a timeout."""
if not self.connected:
self.logger.error("Connection not established. Reconnecting")
await self._reconnect()

if not self.writer:
self.logger.error("Writer is not initialized. Reconnecting")
await self._reconnect()

async def write_and_drain() -> None:
if not self.writer:
raise ConnectionError("Writer is not initialized")
self.writer.write(data)
await self.writer.drain()

try:
async with self.lock:
await asyncio.wait_for(write_and_drain(), timeout=self.connect_timeout)
except TimeoutError:
self.logger.error("Write operation timed out after %s seconds", self.connect_timeout)
await self._reconnect()
except (ConnectionResetError, OSError) as err:
self.logger.error("Error writing to socket: %s", err)
await self._reconnect()

async def _reconnect(self) -> None:
"""
Initiate a persistent connection to the device.
Expand All @@ -325,10 +338,10 @@ async def _reconnect(self) -> None:
"""
# it will not try to connect until ping is successful
if await self.is_device_connectable():
self.logger.info("Device is online")
self.logger.debug("Device is online")

try:
self.logger.info("Connecting to Envy: %s:%s", self.host, self.port)
self.logger.debug("Connecting to Envy: %s:%s", self.host, self.port)

# Command client
self.reader, self.writer = await asyncio.wait_for(
Expand All @@ -343,14 +356,14 @@ async def _reconnect(self) -> None:
await self._set_connected(True)
self.stop_heartbeat.clear()
# send a heartbeat now
self.logger.debug("Sending heartbeat")
await self.send_heartbeat(once=True)

self.logger.info("Connection established")
self.stop_commands_flag.clear()

except (TimeoutError, HeartBeatError, OSError) as err:
self.logger.error(
"Heartbeat failed. Connection not established %s", err
)
self.logger.error("Heartbeat failed. Connection not established %s", err)
await self._set_connected(False)
raise ConnectionError("Heartbeat failed") from err
else:
Expand All @@ -359,15 +372,22 @@ async def _reconnect(self) -> None:
await self._handle_power_off()

async def is_device_connectable(self) -> bool:
"""Check if the device is connectable without ping."""
try:
async with asyncio.timeout(SMALL_DELAY):
_, writer = await asyncio.open_connection(self.host, self.port)
writer.close()
await writer.wait_closed()
return True
except (asyncio.TimeoutError, ConnectionRefusedError, OSError):
return False
"""Check if the device is connectable without ping. The device is only connectable when on."""
retry_count = 0
# loop because upgrading firmware can take a few seconds and will kill the connection
while retry_count < 10:
try:
async with asyncio.timeout(SMALL_DELAY):
_, writer = await asyncio.open_connection(self.host, self.port)
writer.close()
await writer.wait_closed()
return True
except (TimeoutError, ConnectionRefusedError, OSError):
await asyncio.sleep(SMALL_DELAY)
retry_count += 1
continue
self.logger.debug("Device is not connectable")
return False

async def _clear_attr(self) -> None:
"""
Expand Down Expand Up @@ -400,7 +420,7 @@ async def open_connection(self) -> None:
self.logger.debug("Connection opened")
except (AckError, ConnectionError) as err:
self.logger.error("Error opening connection: %s", err)
raise ConnectionError("Error opening connection") from err
raise

# once connected, try to refresh data once in the case the device was turned connected to while on already
cmds = [
Expand Down Expand Up @@ -441,9 +461,7 @@ async def _construct_command(self, raw_command: list[str]) -> tuple[bytes, str]:
bytes: the value to send in bytes
str: the 'msg' field in the Enum used to filter notifications
"""
self.logger.debug(
"raw_command: %s -- raw_command length: %s", raw_command, len(raw_command)
)
self.logger.debug("raw_command: %s -- raw_command length: %s", raw_command, len(raw_command))
skip_val = False
# HA seems to always send commands as a list even if you set them as a str

Expand Down Expand Up @@ -500,9 +518,7 @@ async def _construct_command(self, raw_command: list[str]) -> tuple[bytes, str]:
cmd = command_base + Footer.footer.value

except KeyError as exc:
raise NotImplementedError(
"Incorrect parameter given for command"
) from exc
raise NotImplementedError("Incorrect parameter given for command") from exc
else:
cmd = command_name + Footer.footer.value

Expand All @@ -529,15 +545,8 @@ async def send_command(self, command: list) -> None:

self.logger.debug("Using values: %s %s", cmd, enum_type)

if not self.connected:
self.logger.error("Connection not established")
raise ConnectionError("Device not connected")

try:
async with self.lock:
if self.writer:
self.writer.write(cmd)
await self.writer.drain()
await self._write_with_timeout(cmd)
except (ConnectionResetError, TimeoutError, OSError) as err:
self.logger.error("Error writing command to socket: %s", err)
raise ConnectionError("Failed to send command") from err
Expand Down Expand Up @@ -574,8 +583,6 @@ async def power_on(self, mac: str = "") -> None:
"""
Power on the device
"""
# start processing commands
self.stop_commands_flag.clear()

# use the detected mac or one that is supplied at init or function call
mac_to_use = self.mac_address or self.mac or mac
Expand All @@ -585,9 +592,7 @@ async def power_on(self, mac: str = "") -> None:
send_magic_packet(mac_to_use, logger=self.logger)
else:
# without wol, you cant power on the device
self.logger.warning(
"No mac provided, no action to take. Implement your own WOL automation"
)
self.logger.warning("No mac provided, no action to take. Implement your own WOL automation")

async def power_off(self, standby: bool = False) -> None:
"""
Expand All @@ -599,6 +604,9 @@ async def power_off(self, standby: bool = False) -> None:
# set the flag to delay the ping task to avoid race conditions
self.powered_off_recently = True
if self.connected:
await self.send_command(["Standby"] if standby else ["PowerOff"])
try:
await self.send_command(["Standby"] if standby else ["PowerOff"])
except ConnectionError as err:
self.logger.error("Error sending power off command: %s", err)

await self.close_connection() #
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pre-commit
mypy
ruff
pydantic
pre-commit
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="py_madvr2",
version="1.6.29",
version="1.6.32",
author="iloveicedgreentea2",
description="A package to control MadVR Envy over IP",
long_description=long_description,
Expand Down
53 changes: 53 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# type: ignore
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch

import pytest

from madvr.madvr import Madvr


@pytest.fixture
def mock_madvr():
with patch("madvr.madvr.asyncio.open_connection", new_callable=AsyncMock), patch(
"madvr.madvr.Madvr.connected", new_callable=PropertyMock, return_value=True
):
madvr = Madvr("192.168.1.100")
# ignore mypy
#
madvr.writer = AsyncMock()
madvr.reader = AsyncMock()
madvr._set_connected = AsyncMock()
madvr._clear_attr = AsyncMock()
madvr.is_device_connectable = AsyncMock()
madvr.close_connection = AsyncMock()
madvr._construct_command = AsyncMock()
madvr._write_with_timeout = AsyncMock()
madvr.stop = MagicMock()
madvr.stop_commands_flag = MagicMock()
madvr.stop_heartbeat = MagicMock()
madvr.add_command_to_queue = AsyncMock()
madvr._reconnect = AsyncMock()
madvr._write_with_timeout = AsyncMock()

# Mock the background tasks to prevent warnings
madvr.task_handle_queue = AsyncMock()
madvr.task_read_notifications = AsyncMock()
# madvr.send_heartbeat = AsyncMock()
madvr.task_ping_until_alive = AsyncMock()
madvr.task_refresh_info = AsyncMock()
yield madvr


@pytest.fixture
def mock_send_magic_packet():
with patch("madvr.madvr.send_magic_packet") as mock:
yield mock


@pytest.fixture
def mock_wait_for():
async def mock_wait_for_func(coro, timeout):
return await coro

with patch("asyncio.wait_for", mock_wait_for_func):
yield
Loading