diff --git a/madvr/madvr.py b/madvr/madvr.py index 6ea54b4..ad65867 100644 --- a/madvr/madvr.py +++ b/madvr/madvr.py @@ -4,9 +4,7 @@ import logging from typing import Final, Union -import re -import time -import socket +import asyncio from madvr.commands import ACKs, Footer, Commands, Enum, Connections from madvr.errors import AckError, RetryExceededError, HeartBeatError @@ -32,131 +30,81 @@ def __init__( self.MADVR_OK: Final = Connections.welcome.value self.HEARTBEAT: Final = Connections.heartbeat.value - # Incoming attrs - self.incoming_res = "" - self.incoming_frame_rate = "" - self.incoming_color_space = "" - self.incoming_bit_depth = "" - self.hdr_flag = False - self.incoming_colorimetry = "" - self.incoming_black_levels = "" - self.incoming_aspect_ratio = "" - self.aspect_ratio: float = 0 - - # Temps - self.temp_gpu: int = 0 - self.temp_hdmi: int = 0 - self.temp_cpu: int = 0 - self.temp_mainboard: int = 0 - - # Outgoing signal - self.outgoing_res = "" - self.outgoing_frame_rate = "" - self.outgoing_color_space = "" - self.outgoing_bit_depth = "" - self.outgoing_colorimetry = "" - self.outgoing_hdr_flag = False - self.outgoing_black_levels = "" + # stores all attributes + self.msg_dict = {} # Sockets - self.client = None - self.notification_client = None + self.reader = None + self.writer = None + self.reader_lock = asyncio.Lock() + self.writer_lock = asyncio.Lock() + self.is_closed = False # Envy does not have an are you on cmd, just assuming its on based on active connection self.is_on = False self.read_limit = 8000 self.command_read_timeout = 3 + self.logger.debug("Running in debug mode") - def _clear_attr(self) -> None: + async def _clear_attr(self) -> None: """ Clear instance attr so HA doesn't report stale values """ # Incoming attrs - self.incoming_res = "" - self.incoming_frame_rate = "" - self.incoming_color_space = "" - self.incoming_bit_depth = "" - self.hdr_flag = False - self.incoming_colorimetry = "" - self.incoming_black_levels = "" - self.incoming_aspect_ratio = "" - self.aspect_ratio = 0 - - # Temps - self.temp_gpu = 0 - self.temp_hdmi = 0 - self.temp_cpu = 0 - self.temp_mainboard = 0 - - # Outgoing signal - self.outgoing_res = "" - self.outgoing_frame_rate = "" - self.outgoing_color_space = "" - self.outgoing_bit_depth = "" - self.outgoing_colorimetry = "" - self.outgoing_hdr_flag = False - self.outgoing_black_levels = "" - - def close_connection(self) -> None: - """close the connection""" - try: - self.client.close() - except AttributeError: - # means its already closed - pass - - try: - self.notification_client.close() - except AttributeError: - # means its already closed - pass + self.msg_dict = {} - self.client = None - self.notification_client = None + async def close_connection(self) -> None: + """close the connection""" + async with self.writer_lock: + try: + self.writer.close() + await self.writer.wait_closed() + except AttributeError: + # means its already closed + pass + self.reader = None self.is_closed = True # Clear attr - self._clear_attr() + await self._clear_attr() - def open_connection(self) -> None: + async def open_connection(self) -> None: """Open a connection""" self.logger.debug("Starting open connection") - try: - self._reconnect() + await self._reconnect() except AckError as err: self.logger.error(err) - def _reconnect(self) -> None: + async def _reconnect(self) -> None: """ Initiate keep-alive connection. This should handle any error and reconnect eventually. Raises AckError """ - - while True: + i = 0 + while i < 10: try: self.logger.info("Connecting to Envy: %s:%s", self.host, self.port) # Command client - self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.client.settimeout(self.command_read_timeout) - self.client.connect((self.host, self.port)) - - # Notifications client - self.notification_client = socket.socket( - socket.AF_INET, socket.SOCK_STREAM - ) - self.notification_client.settimeout(20) - self.notification_client.connect((self.host, self.port)) + async with self.reader_lock: + self.reader, self.writer = await asyncio.wait_for( + asyncio.open_connection(self.host, self.port), + timeout=self.command_read_timeout, + ) # Test heartbeat self.logger.debug("Handshaking") # Make sure first message says WELCOME - msg_envy = self.client.recv(self.read_limit) + async with self.reader_lock: + msg_envy = await asyncio.wait_for( + self.reader.readline(), + timeout=10, + ) # Check if first 7 char match if self.MADVR_OK != msg_envy[:7]: @@ -165,20 +113,12 @@ def _reconnect(self) -> None: f"Notification did not reply with correct greeting: {msg_envy}" ) - # Make sure first message says WELCOME for notifications - msg_envy = self.notification_client.recv(self.read_limit) - - # Check if first 7 char match - if self.MADVR_OK != msg_envy[:7]: - raise AckError( - f"Notification did not reply with correct greeting: {msg_envy}" - ) self.logger.info("Waiting for envy to be available") # envy needs some time to setup new connections - time.sleep(3) + await asyncio.sleep(3) # handshake func - self._send_heartbeat() + await self._send_heartbeat() self.logger.info("Connection established") @@ -191,68 +131,47 @@ def _reconnect(self) -> None: self.logger.warning( "Error sending heartbeat, retrying in %s seconds", 2 ) - time.sleep(2) + await asyncio.sleep(2) + i += 1 continue # includes conn refused # backoff to not spam HA - except socket.timeout: - self.logger.warning( - "Connecting timeout, retrying in %s seconds", 2 - ) - time.sleep(2) + except asyncio.TimeoutError: + self.logger.warning("Connecting timeout, retrying in %s seconds", 2) + await asyncio.sleep(2) + i += 1 continue + + # includes conn refused + # backoff to not spam HA except OSError as err: - self.logger.warning( - "Connecting failed, retrying in %s seconds", 2 - ) + self.logger.warning("Connecting failed, retrying in %s seconds", 2) self.logger.debug(err) - time.sleep(2) + await asyncio.sleep(2) + i += 1 continue - def _send_heartbeat(self) -> None: + async def _send_heartbeat(self) -> None: """ Send a heartbeat to keep connection open - You should wrap this in try with socket.error and socket.timeout exceptions + You should wrap this in try with OSError and asyncio.TimeoutError exceptions Raises HeartBeatError exception """ - i = 0 - - while i < 3: - # confirm can send heartbeat, ready for commands - self.logger.debug("Sending heartbeats") - - self.client.send(self.HEARTBEAT) - - # Recv until we find ok or timeout - if not self._read_until_ok(self.client): - i += 1 - continue - - # send heartbeat with notification and read ack and then regular client - # ensure both work one after the next - self.notification_client.send(self.HEARTBEAT) - - if not self._read_until_ok(self.notification_client): - i += 1 - continue - - # send again on regular client to make sure both clients work - self.client.send(self.HEARTBEAT) - - if not self._read_until_ok(self.client): - i += 1 - continue + # confirm can send heartbeat, ready for commands + self.logger.debug("Sending heartbeats") - self.logger.debug("Handshakes complete") + async with self.writer_lock: + self.writer.write(self.HEARTBEAT) + await self.writer.drain() - return - - raise HeartBeatError("Sending heartbeat fatal error") + self.logger.debug("Handshakes complete") - def _construct_command(self, raw_command: Union[str, list]) -> tuple[bytes, bool, str]: + async def _construct_command( + self, raw_command: Union[str, list] + ) -> tuple[bytes, str]: """ Transform commands into their byte values from the string value @@ -260,16 +179,18 @@ def _construct_command(self, raw_command: Union[str, list]) -> tuple[bytes, bool Return: bytes: the value to send in bytes - bool: if its informational str: the 'msg' field in the Enum used to filter notifications """ - self.logger.debug("raw_command: %s", raw_command) - self.logger.debug("raw_command length: %s", len(raw_command)) + self.logger.debug( + "raw_command: %s -- raw_command length: %s", raw_command, len(raw_command) + ) + skip_val = False # HA seems to always send commands as a list even if you set them as a str # This lets you use single cmds or something with val like KEYPRESS # If len is 1, then try to split, otherwise its just one word + if len(raw_command) == 1: try: # ['key_press, menu'] @@ -285,14 +206,15 @@ def _construct_command(self, raw_command: Union[str, list]) -> tuple[bytes, bool skip_val = True # if there are more than two values, this is incorrect, error elif len(raw_command) > 2: - self.logger.error("More than two command values provided. Envy does not have more than 2 command values e.g KeyPress MENU") + self.logger.error( + "More than two command values provided. Envy does not have more than 2 command values e.g KeyPress MENU" + ) raise NotImplementedError(f"Too many values provided {raw_command}") else: # else a command was provided as a proper list ['keypress', 'menu'] # raw command will be a list of 2 command, value = raw_command - self.logger.debug("using command %s", command) # Check if command is implemented @@ -300,7 +222,7 @@ def _construct_command(self, raw_command: Union[str, list]) -> tuple[bytes, bool raise NotImplementedError(f"Command not implemented: {command}") # construct the command with nested Enums - command_name, val, is_info = Commands[command].value + command_name, val, _ = Commands[command].value # if there is a value to process if not skip_val: @@ -317,34 +239,12 @@ def _construct_command(self, raw_command: Union[str, list]) -> tuple[bytes, bool self.logger.debug("constructed command: %s", cmd) - return cmd, is_info.value, val + return cmd, val - def _read_until_ok(self, client: socket.socket) -> bool: - """ - Read the buffer until we get ok or timeout because a timeout means no more to read - """ - counter = 0 - # exit loop if exceed maximum checks for ok or retries - while True: - try: - data = client.recv(self.read_limit) - self.logger.debug("_read_until_ok: data found: %s", data) - - if ACKs.reply.value not in data: - self.logger.debug("OK not found yet counter: %s", counter) - counter += 1 - continue - - self.logger.debug("OK found counter: %s", counter) - return True - - except (socket.timeout, socket.error): - self.logger.debug("Timeout reading ack with counter: %s", counter) - return False - - def send_command(self, command: Union[str, list]) -> str: + async def send_command(self, command: Union[str, list]) -> str: """ send a given command same as the official madvr ones + To keep this simple, just send the command without reading response command: str - command to send like KeyPress, MENU Raises RetryExceededError @@ -352,19 +252,19 @@ def send_command(self, command: Union[str, list]) -> str: # Verify the command is supported try: - cmd, is_info, enum_type = self._construct_command(command) + cmd, enum_type = await self._construct_command(command) except NotImplementedError: return f"Command not found: {command}" - self.logger.debug("using values: %s %s %s", cmd, is_info, enum_type) + self.logger.debug("using values: %s %s", cmd, enum_type) # reconnect if client is not init or its off - if self.client is None or self.is_on is False: + if self.reader is None or self.is_on is False: # Don't reconnect if poweroff or standby because HA will complain if "PowerOff" in command or "Standby" in command: return "" self.logger.debug("Connection lost - restarting connection") - self._reconnect() + await self._reconnect() # simple retry logic retry_count = 0 @@ -372,14 +272,11 @@ def send_command(self, command: Union[str, list]) -> str: while retry_count < 5: # Send the command try: - self.client.send(cmd) - - if not self._read_until_ok(self.client): - self.logger.debug("OK not found when reading response, retrying") - retry_count += 1 - continue + async with self.writer_lock: + self.writer.write(cmd) + await self.writer.drain() - except socket.timeout: + except asyncio.TimeoutError: self.logger.debug("OK receipt timed out, retrying") retry_count += 1 continue @@ -390,155 +287,79 @@ def send_command(self, command: Union[str, list]) -> str: retry_count += 1 continue - # If its an info, get the rest of the info - try: - if is_info: - # read response - res = self.client.recv(self.read_limit) - self.logger.debug("Response from info: %s", res) - - # TODO one day: whatever is not part of our command, write that to attr - # e.g if we ask signal, and get notification for aspect, detect it and write that - # so polling isn't required - - # process the output - return self._process_info(res, enum_type["msg"].value) - - return "" - - except socket.timeout: - self.logger.debug("Ack receipt timed out, retrying") - retry_count += 1 - continue + return "" # if we got here something went wrong - self.close_connection() - self._reconnect() + await self.close_connection() + await self._reconnect() return "" - def read_notifications(self, wait_forever: bool) -> None: + async def read_notifications(self) -> None: """ - Listen for notifications. Meant to run as a background task - wait_forever: bool -> if true, it will block forever. False useful for testing + Read notifications from the server and update attributes """ - # TODO: should this be a second integration? - # Is there a way for HA to always poll in background?z + # read notifications + try: + async with self.reader_lock: + msg = await asyncio.wait_for( + self.reader.read(self.read_limit), + timeout=self.command_read_timeout, + ) - # Receive data in a loop - i = 0 - while wait_forever or i < 5: - try: - self.notification_client.sendall(self.HEARTBEAT) - data = self.notification_client.recv(self.read_limit) - time.sleep(1) - i += 1 - if data: - # process data which will get added as class attr - self._process_notifications(data) - else: - # just wait forever for data - # send heartbeat keep conn open - self.notification_client.sendall(self.HEARTBEAT) - continue - except socket.timeout: - self.logger.debug("Connection timed out") - self.notification_client.sendall(self.HEARTBEAT) - continue - except socket.error: - self.logger.debug("Connection error") - self.notification_client.sendall(self.HEARTBEAT) - continue + except asyncio.TimeoutError: + self.logger.warning("Reading notifications timed out") + return - def poll_status(self) -> None: - """ - Poll the status for attributes and write them to state - """ - try: - # send heartbeat so it doesnt close our connection - self._send_heartbeat() - # Get incoming signal info - for cmd in [ - "GetIncomingSignalInfo", - "GetAspectRatio", - "GetTemperatures", - "GetOutgoingSignalInfo", - ]: - res = self.send_command(cmd) - self.logger.debug("poll_status resp: %s", res) - self._process_notifications(res) - except (socket.timeout, socket.error, HeartBeatError) as err: - self.logger.error("Error getting update: %s", err) - - def _process_notifications(self, input_data: Union[bytes, str]) -> None: - """ - Process arbitrary stream of notifications and set them as instance attr - """ + except OSError: + self.logger.warning("Reading notifications failed") + return - self.logger.debug("Processing data for %s", input_data) - try: - if isinstance(input_data, bytes): - # This pattern will be able to extract from the byte encoded stream - pattern = r"([A-Z][^\r\n]*)\r\n" - groups = re.findall(pattern, input_data.decode()) - # split the groups, the first element is the key, remove the key from the values - # for each match in groups, add it to dict - # {"key": ["val1", "val2"]} - val_dict: dict = { - group.split()[0]: group.replace(group.split()[0] + " ", "").split() - for group in groups - } - self.logger.debug("groups: %s", groups) - else: - self.logger.debug("input data: %s", input_data) - # This pattern extracts from a regular string - # If we have a str its assumed we are dealing with one output stream - pattern = r"([A-Z][A-Za-z]*)\s(.*)" - match = re.search(pattern, input_data) - val_dict: dict = {match.group(1): match.group(2).split()} - - self.logger.debug("val dict: %s", val_dict) - except AttributeError: + # if the msg is empty, then the connection is closed + if not msg: + # self.logger.warning("Connection closed") + # await self.close_connection() return - # Map values to attr - incoming_signal_info: list = val_dict.get("IncomingSignalInfo", []) - if incoming_signal_info: - self.logger.debug("incoming signal detected: %s", incoming_signal_info) - self.incoming_res = incoming_signal_info[0] - self.incoming_frame_rate = incoming_signal_info[1] - self.incoming_color_space = incoming_signal_info[3] - self.incoming_bit_depth = incoming_signal_info[4] - self.hdr_flag = "HDR" in incoming_signal_info[5] - self.incoming_colorimetry = incoming_signal_info[6] - self.incoming_black_levels = incoming_signal_info[7] - self.incoming_aspect_ratio = incoming_signal_info[8] - - aspect_ratio: list = val_dict.get("AspectRatio", []) - if aspect_ratio: - self.logger.debug("incoming AR detected: %s", aspect_ratio) - self.aspect_ratio = float(aspect_ratio[1]) - - get_temps: list = val_dict.get("Temperatures", []) - if get_temps: - self.logger.debug("incoming Temps detected: %s", get_temps) - self.temp_gpu = int(get_temps[0]) - self.temp_hdmi = int(get_temps[1]) - self.temp_cpu = int(get_temps[2]) - self.temp_mainboard = int(get_temps[3]) - - outgoing_signal_info: list = val_dict.get("OutgoingSignalInfo", []) - if outgoing_signal_info: - self.logger.debug("outgoing signal detected: %s", outgoing_signal_info) - self.outgoing_res = outgoing_signal_info[0] - self.outgoing_frame_rate = outgoing_signal_info[1] - self.outgoing_color_space = outgoing_signal_info[3] - self.outgoing_bit_depth = outgoing_signal_info[4] - self.outgoing_hdr_flag = "HDR" in outgoing_signal_info[5] - self.outgoing_colorimetry = outgoing_signal_info[6] - self.outgoing_black_levels = outgoing_signal_info[7] - - def power_off(self, standby=False) -> str: + # if the msg is not empty, process it + await self._process_notifications(msg.decode("utf-8")) + + async def _process_notifications(self, msg: str) -> None: + """Parse a message and store the attributes and values in a dictionary""" + self.logger.debug("Processing notifications: %s", msg) + notifications = msg.split("\r\n") + # for each /r/n split it by title, then the rest are values + for notification in notifications: + title, *signal_info = notification.split(" ") + # dont process empty values + if not signal_info: + continue + # at least madvr sends attributes in a consistent order + # could use zip here but why? this works and is simple + if "IncomingSignalInfo" in title: + self.msg_dict["incoming_res"] = signal_info[0] + self.msg_dict["incoming_frame_rate"] = signal_info[1] + self.msg_dict["incoming_color_space"] = signal_info[3] + self.msg_dict["incoming_bit_depth"] = signal_info[4] + self.msg_dict["hdr_flag"] = "HDR" in signal_info[5] + self.msg_dict["incoming_colorimetry"] = signal_info[6] + self.msg_dict["incoming_black_levels"] = signal_info[7] + self.msg_dict["incoming_aspect_ratio"] = signal_info[8] + elif "OutgoingSignalInfo" in title: + self.msg_dict["outgoing_res"] = signal_info[0] + self.msg_dict["outgoing_frame_rate"] = signal_info[1] + self.msg_dict["outgoing_color_space"] = signal_info[3] + self.msg_dict["outgoing_bit_depth"] = signal_info[4] + self.msg_dict["outgoing_hdr_flag"] = "HDR" in signal_info[5] + self.msg_dict["outgoing_colorimetry"] = signal_info[6] + self.msg_dict["outgoing_black_levels"] = signal_info[7] + elif "AspectRatio" in title: + self.msg_dict["aspect_res"] = signal_info[0] + self.msg_dict["aspect_dec"] = float(signal_info[1]) + self.msg_dict["aspect_int"] = signal_info[2] + self.msg_dict["aspect_name"] = signal_info[3] + + async def power_off(self, standby=False) -> str: """ turn off madvr it must have a render thread active at the moment once it is off, you can't turn it back on unless standby=True @@ -549,8 +370,8 @@ def power_off(self, standby=False) -> str: # dont do anything if its off besides mark it off # sending command will open connection try: - res = self.send_command("Standby" if standby else "PowerOff") - self.close_connection() + res = await self.send_command("Standby" if standby else "PowerOff") + await self.close_connection() self.is_on = False return res @@ -558,24 +379,6 @@ def power_off(self, standby=False) -> str: except RetryExceededError: return "Retries Exceeded" - def _process_info(self, input_data: bytes, filter_str: str) -> str: - """ - Process info given input and a filter str to only return the thing we want - e.g for IncomingSignalInfo - b"Ok\r\nIncomingSignalInfo 3840x2160 23.976p 2D 422 - 10bit HDR10 2020 TV 16:9\r\nAspect Ratio ETC ETC\r\n" - turns into -> IncomingSignalInfo 3840x2160 23.976p 2D 422 10bit HDR10 2020 TV 16:9 - - This is used by _process_notifications to turn it into a dict, add to instance attr - """ - - lines = input_data.decode().split("\r\n") - for line in lines: - if filter_str in line: - return line - - return "" - def print_commands(self) -> str: """ Print out all supported commands diff --git a/setup.py b/setup.py index 67455a2..8550cfe 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="py_madvr", - version="1.2.3", + version="1.3.11", author="iloveicedgreentea", description="A package to control MadVR Envy over IP", long_description=long_description, diff --git a/tests/testMadVR.py b/tests/testMadVR.py index a90c806..1f58c34 100644 --- a/tests/testMadVR.py +++ b/tests/testMadVR.py @@ -1,90 +1,45 @@ -# pylint: disable=protected-access, missing-function-docstring, missing-class-docstring, invalid-name, missing-module-docstring +# # pylint: disable=protected-access, missing-function-docstring, missing-class-docstring, invalid-name, missing-module-docstring + import unittest import os -from dotenv import load_dotenv from madvr.madvr import Madvr -import time - -# load .env -load_dotenv() - -host = os.getenv("MADVR_HOST") -madvr = Madvr(host=host, connect_timeout=10) +# import time +madvr = Madvr(host="192.168.88.38", port=44077) -class TestLib(unittest.TestCase): - def test_a_process_info(self): - """Verify the process info func works""" - info = madvr._process_info( - b"Ok\r\nIncomingSignalInfo 3840x2160 23.976p 2D 422 10bit HDR10 2020 TV 16:9\r\nAspect Ratio ETC ETC\r\n", - "IncomingSignalInfo", +# write a class to test the madvr class +class TestMadvr(unittest.IsolatedAsyncioTestCase): + async def test_process_info(self): + """Verify the process info func works to assign attrs, with modifications""" + await madvr._process_notifications( + 'Welcome\r\nOk\r\nIncomingSignalInfo 3840x2160 23.976p 2D 422 10bit HDR10 2020 TV 16:9\r\nAspectRatio 129x123 1.78 178 "TV"\r\nOutgoingSignalInfo 3840x2160 23.976p 2D 444 12bit HDR10 2020 TV\r\n' + ) + # pretend like we got a new signal + await madvr._process_notifications( + "IncomingSignalInfo 4096x2160 60p 2D 444 12bit HDR10 2020 TV 16:9\r\n" ) self.assertEqual( - info, "IncomingSignalInfo 3840x2160 23.976p 2D 422 10bit HDR10 2020 TV 16:9" + madvr.msg_dict, + { + "incoming_res": "4096x2160", + "incoming_frame_rate": "60p", + "incoming_color_space": "444", + "incoming_bit_depth": "12bit", + "hdr_flag": True, + "incoming_colorimetry": "2020", + "incoming_black_levels": "TV", + "incoming_aspect_ratio": "16:9", + "aspect_res": "129x123", + "aspect_dec": 1.78, + "aspect_int": "178", + "aspect_name": '"TV"', + "outgoing_res": "3840x2160", + "outgoing_frame_rate": "23.976p", + "outgoing_color_space": "444", + "outgoing_bit_depth": "12bit", + "outgoing_hdr_flag": True, + "outgoing_colorimetry": "2020", + "outgoing_black_levels": "TV", + }, ) - - def test_b_construct_cmd(self): - """test constructing commands""" - try: - cmd, isinfo, _ = madvr._construct_command("KeyPress, MENU") - self.assertEqual(cmd, b"KeyPress MENU\r\n") - self.assertEqual(isinfo, False) - except NotImplementedError: - self.fail("Command exists but got not implemented") - - try: - cmd, isinfo, _ = madvr._construct_command("GetAspectRatio") - self.assertEqual(cmd, b"GetAspectRatio\r\n") - self.assertEqual(isinfo, True) - except NotImplementedError: - self.fail("Command exists but got not implemented") - - # Should fail - with self.assertRaises(NotImplementedError): - _, _, _ = madvr._construct_command("KeyPress, FAKE") - with self.assertRaises(NotImplementedError): - _, _, _ = madvr._construct_command("Fakecmd, Param") - - -class TestFunctional(unittest.TestCase): - """Test suite""" - - madvr.open_connection() - - def test_a_poll(self): - # self.skipTest("") - - self.assertEqual(madvr.incoming_res, "") - madvr.poll_status() - self.assertNotEqual(madvr.incoming_res, "") - self.assertNotEqual(madvr.outgoing_color_space, "") - - - signal = madvr.send_command("GetAspectRatio") - self.assertNotEqual(signal, "Command not found") - - signal = madvr.send_command("KeyPress, UP") - self.assertNotEqual(signal, "Command not found") - - fake_cmd = madvr.send_command("FakeCommand") - self.assertEqual(fake_cmd, "Command not found") - - fake_param = madvr.send_command("KeyPress, FAKEKEY") - self.assertEqual(fake_param, "Command not found") - - # print("running test_d_notifications") - # madvr.read_notifications(True) - - cmd = madvr.send_command("KeyPress, MENU") - self.assertNotEqual(cmd, "Command not found") - - time.sleep(1) - cmd = madvr.send_command("KeyPress, MENU") - self.assertNotEqual(cmd, "Command not found") - - madvr.close_connection() - self.assertEqual(True, madvr.is_closed) - -if __name__ == "__main__": - unittest.main(failfast=True)