From 109304082ee44982e7bf0ac7eec03d2cfaa0f6f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damian=20Michalak-Szmaci=C5=84ski?= Date: Mon, 6 Mar 2023 21:05:58 +0100 Subject: [PATCH] Add to flake8 in workflow and fix python files (#25251) --- .flake8 | 1 - .../integration-tests/common/device.py | 1 - .../integration-tests/common/fixtures.py | 6 +-- .../integration-tests/common/fvp_device.py | 3 +- .../integration-tests/common/utils.py | 11 ++--- .../openiotsdk/integration-tests/conftest.py | 2 - .../integration-tests/lock-app/test_app.py | 45 +++++++++--------- .../integration-tests/shell/test_app.py | 46 +++++++++---------- 8 files changed, 53 insertions(+), 62 deletions(-) diff --git a/.flake8 b/.flake8 index 95261687837818..c2a1ce2c574fa0 100644 --- a/.flake8 +++ b/.flake8 @@ -8,7 +8,6 @@ exclude = third_party # temporarily scan only directories with fixed files # TODO: Remove the paths below when all bugs are fixed src/tools/chip-cert/* - src/test_driver/openiotsdk/* src/test_driver/mbed/* src/test_driver/linux-cirque/* build/chip/java/tests/* diff --git a/src/test_driver/openiotsdk/integration-tests/common/device.py b/src/test_driver/openiotsdk/integration-tests/common/device.py index eff76e1c5ad413..51e939e135f3b3 100644 --- a/src/test_driver/openiotsdk/integration-tests/common/device.py +++ b/src/test_driver/openiotsdk/integration-tests/common/device.py @@ -96,7 +96,6 @@ def wait_for_output(self, search: str, timeout: float = 10, assert_timeout: bool if line: lines.append(line) if search in line: - end = time() return lines except queue.Empty: diff --git a/src/test_driver/openiotsdk/integration-tests/common/fixtures.py b/src/test_driver/openiotsdk/integration-tests/common/fixtures.py index 510512ed0c1f01..04d1927ae77f2b 100644 --- a/src/test_driver/openiotsdk/integration-tests/common/fixtures.py +++ b/src/test_driver/openiotsdk/integration-tests/common/fixtures.py @@ -19,13 +19,11 @@ import os import pathlib import shutil -from time import sleep import chip.CertificateAuthority import chip.native import pytest -from chip import ChipDeviceCtrl, exceptions -from chip.ChipStack import * +from chip import exceptions from .fvp_device import FvpDevice from .telnet_connection import TelnetConnection @@ -113,7 +111,7 @@ def controller(vendor_id, fabric_id, node_id): except exceptions.ChipStackException as ex: log.error("Controller initialization failed {}".format(ex)) return None - except: + except Exception: log.error("Controller initialization failed") return None diff --git a/src/test_driver/openiotsdk/integration-tests/common/fvp_device.py b/src/test_driver/openiotsdk/integration-tests/common/fvp_device.py index 4431b868cb3280..9fd1233be2e818 100644 --- a/src/test_driver/openiotsdk/integration-tests/common/fvp_device.py +++ b/src/test_driver/openiotsdk/integration-tests/common/fvp_device.py @@ -16,7 +16,6 @@ # import logging -import os import subprocess import threading from time import sleep @@ -45,7 +44,7 @@ def __init__(self, fvp, fvp_config, binary_file, connection_channel, network_int '-C', 'mps3_board.telnetterminal0.start_port={}'.format(self.connection_channel.get_port()) ] - if network_interface != None: + if network_interface is not None: self.fvp_cmd.extend(['-C', 'mps3_board.hostbridge.interfaceName={}'.format(network_interface), ]) else: self.fvp_cmd.extend(['-C', 'mps3_board.hostbridge.userNetworking=1']) diff --git a/src/test_driver/openiotsdk/integration-tests/common/utils.py b/src/test_driver/openiotsdk/integration-tests/common/utils.py index 69cbd4282cbc19..905876f36057da 100644 --- a/src/test_driver/openiotsdk/integration-tests/common/utils.py +++ b/src/test_driver/openiotsdk/integration-tests/common/utils.py @@ -16,13 +16,10 @@ # import asyncio -import ctypes import logging -import os import random import re import shlex -from time import sleep from chip import discovery, exceptions from chip.clusters import Objects as GeneratedObjects @@ -40,7 +37,7 @@ def get_setup_payload(device): :return: setup payload or None """ ret = device.wait_for_output("SetupQRCode") - if ret == None or len(ret) < 2: + if ret is None or len(ret) < 2: return None qr_code = re.sub( @@ -85,7 +82,7 @@ def connect_device(setupPayload, commissionableDevice, nodeId=None): :param nodeId: device node ID :return: node ID if connection successful or None if failed """ - if nodeId == None: + if nodeId is None: nodeId = random.randint(1, 1000000) pincode = int(setupPayload.attributes['SetUpPINCode']) @@ -191,7 +188,7 @@ def send_zcl_command(devCtrl, line, requestTimeoutMs: int = None): raise exceptions.UnknownCluster(cluster) cmdArgsWithType = allCommands.get(cluster).get(command, None) # When command takes no arguments, (not command) is True - if command == None: + if command is None: raise exceptions.UnknownCommand(cluster, command) args = FormatZCLArguments(cluster, cmdArgsLine, cmdArgsWithType) @@ -232,7 +229,7 @@ def read_zcl_attribute(devCtrl, line): raise exceptions.UnknownCluster(cluster) attrDetails = allAttrs.get(cluster).get(attribute, None) - if attrDetails == None: + if attrDetails is None: raise exceptions.UnknownAttribute(cluster, attribute) res = devCtrl.ZCLReadAttribute(cluster, attribute, int( diff --git a/src/test_driver/openiotsdk/integration-tests/conftest.py b/src/test_driver/openiotsdk/integration-tests/conftest.py index 90116956e55dec..0181911de7e8b2 100644 --- a/src/test_driver/openiotsdk/integration-tests/conftest.py +++ b/src/test_driver/openiotsdk/integration-tests/conftest.py @@ -15,8 +15,6 @@ # limitations under the License. # -import pytest - pytest_plugins = ['common.fixtures'] diff --git a/src/test_driver/openiotsdk/integration-tests/lock-app/test_app.py b/src/test_driver/openiotsdk/integration-tests/lock-app/test_app.py index 2895deebb532ff..b2fc33ef64ffff 100644 --- a/src/test_driver/openiotsdk/integration-tests/lock-app/test_app.py +++ b/src/test_driver/openiotsdk/integration-tests/lock-app/test_app.py @@ -16,11 +16,11 @@ # import logging -from time import sleep +import os import pytest from chip.clusters.Objects import DoorLock -from common.utils import * +from common.utils import connect_device, disconnect_device, discover_device, get_setup_payload, read_zcl_attribute, send_zcl_command log = logging.getLogger(__name__) @@ -36,32 +36,32 @@ def binaryPath(request, rootDir): @pytest.mark.smoketest def test_smoke_test(device): ret = device.wait_for_output("Open IoT SDK lock-app example application start") - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 ret = device.wait_for_output("Open IoT SDK lock-app example application run") - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 @pytest.mark.commissioningtest def test_commissioning(device, controller): - assert controller != None + assert controller is not None devCtrl = controller setupPayload = get_setup_payload(device) - assert setupPayload != None + assert setupPayload is not None commissionable_device = discover_device(devCtrl, setupPayload) - assert commissionable_device != None + assert commissionable_device is not None assert commissionable_device.vendorId == int(setupPayload.attributes['VendorID']) assert commissionable_device.productId == int(setupPayload.attributes['ProductID']) - assert commissionable_device.addresses[0] != None + assert commissionable_device.addresses[0] is not None nodeId = connect_device(setupPayload, commissionable_device) - assert nodeId != None + assert nodeId is not None log.info("Device {} connected".format(commissionable_device.addresses[0])) ret = device.wait_for_output("Commissioning completed successfully", timeout=30) - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 assert disconnect_device(devCtrl, nodeId) @@ -75,20 +75,20 @@ def test_commissioning(device, controller): @pytest.mark.ctrltest def test_lock_ctrl(device, controller): - assert controller != None + assert controller is not None devCtrl = controller setupPayload = get_setup_payload(device) - assert setupPayload != None + assert setupPayload is not None commissionable_device = discover_device(devCtrl, setupPayload) - assert commissionable_device != None + assert commissionable_device is not None nodeId = connect_device(setupPayload, commissionable_device) - assert nodeId != None + assert nodeId is not None ret = device.wait_for_output("Commissioning completed successfully", timeout=30) - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 err, res = send_zcl_command( devCtrl, "DoorLock SetUser {} {} operationType={} userIndex={} userName={} userUniqueId={} " @@ -105,7 +105,7 @@ def test_lock_ctrl(device, controller): ret = device.wait_for_output("Successfully set the user [mEndpointId={},index={},adjustedIndex=0]".format( LOCK_CTRL_TEST_ENDPOINT_ID, LOCK_CTRL_TEST_USER_INDEX)) - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 err, res = send_zcl_command( devCtrl, "DoorLock GetUser {} {} userIndex={}".format(nodeId, LOCK_CTRL_TEST_ENDPOINT_ID, @@ -133,9 +133,12 @@ def test_lock_ctrl(device, controller): assert res.status == DoorLock.Enums.DlStatus.kSuccess ret = device.wait_for_output("Successfully set the credential [mEndpointId={},index={}," - "credentialType={},creator=1,modifier=1]".format(LOCK_CTRL_TEST_ENDPOINT_ID, - LOCK_CTRL_TEST_USER_INDEX, DoorLock.Enums.DlCredentialType.kPin)) - assert ret != None and len(ret) > 0 + "credentialType={},creator=1,modifier=1]".format( + LOCK_CTRL_TEST_ENDPOINT_ID, + LOCK_CTRL_TEST_USER_INDEX, + DoorLock.Enums.DlCredentialType.kPin + )) + assert ret is not None and len(ret) > 0 err, res = send_zcl_command( devCtrl, "DoorLock GetCredentialStatus {} {} credential=struct:DlCredential(credentialType={}," @@ -152,7 +155,7 @@ def test_lock_ctrl(device, controller): assert err == 0 ret = device.wait_for_output("Setting door lock state to \"Locked\" [endpointId={}]".format(LOCK_CTRL_TEST_ENDPOINT_ID)) - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 err, res = read_zcl_attribute( devCtrl, "DoorLock LockState {} {}".format(nodeId, LOCK_CTRL_TEST_ENDPOINT_ID)) @@ -165,7 +168,7 @@ def test_lock_ctrl(device, controller): assert err == 0 ret = device.wait_for_output("Setting door lock state to \"Unlocked\" [endpointId={}]".format(LOCK_CTRL_TEST_ENDPOINT_ID)) - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 err, res = read_zcl_attribute( devCtrl, "DoorLock LockState {} {}".format(nodeId, LOCK_CTRL_TEST_ENDPOINT_ID)) diff --git a/src/test_driver/openiotsdk/integration-tests/shell/test_app.py b/src/test_driver/openiotsdk/integration-tests/shell/test_app.py index 192257bceb0c3d..a10989b7f9379d 100644 --- a/src/test_driver/openiotsdk/integration-tests/shell/test_app.py +++ b/src/test_driver/openiotsdk/integration-tests/shell/test_app.py @@ -14,14 +14,12 @@ # limitations under the License. import logging -import re -from time import sleep +import os import chip.native import pytest from chip import exceptions from chip.setup_payload import SetupPayload -from common.utils import * from packaging import version log = logging.getLogger(__name__) @@ -72,9 +70,9 @@ def parse_boarding_codes_response(response): @pytest.mark.smoketest def test_smoke_test(device): ret = device.wait_for_output("Open IoT SDK shell example application start") - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 ret = device.wait_for_output("Open IoT SDK shell example application run") - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 @pytest.mark.ctrltest @@ -84,64 +82,64 @@ def test_command_check(device): except exceptions.ChipStackException as ex: log.error("CHIP initialization failed {}".format(ex)) assert False - except: + except Exception: log.error("CHIP initialization failed") assert False ret = device.wait_for_output("Open IoT SDK shell example application start") - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 ret = device.wait_for_output("Open IoT SDK shell example application run") - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0 # Help ret = device.send(command="help", expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 shell_commands = get_shell_command(ret[1:-1]) assert set(SHELL_COMMAND_NAME) == set(shell_commands) # Echo ret = device.send(command="echo Hello", expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 assert "Hello" in ret[-2] # Log ret = device.send(command="log Hello", expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 assert "[INF] [TOO] Hello" in ret[-2] # Rand ret = device.send(command="rand", expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 assert ret[-2].rstrip().isdigit() # Base64 hex_string = "1234" ret = device.send(command="base64 encode {}".format( hex_string), expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 base64code = ret[-2] ret = device.send(command="base64 decode {}".format( base64code), expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 assert ret[-2].rstrip() == hex_string # Version ret = device.send(command="version", expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 assert "CHIP" in ret[-2].split()[0] app_version = ret[-2].split()[1] assert isinstance(version.parse(app_version), version.Version) # Config ret = device.send(command="config", expected_output="Done") - assert ret != None and len(ret) > 2 + assert ret is not None and len(ret) > 2 config = parse_config_response(ret[1:-1]) for param_name, value in config.items(): ret = device.send(command="config {}".format( param_name), expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 if "discriminator" in param_name: assert int(ret[-2].split()[0], 16) == value else: @@ -150,23 +148,23 @@ def test_command_check(device): new_value = int(config['discriminator']) + 1 ret = device.send(command="config discriminator {}".format( new_value), expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 assert "Setup discriminator set to: {}".format(new_value) in ret[-2] ret = device.send(command="config discriminator", expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 assert int(ret[-2].split()[0], 16) == new_value # Onboardingcodes ret = device.send(command="onboardingcodes none", expected_output="Done") - assert ret != None and len(ret) > 2 + assert ret is not None and len(ret) > 2 boarding_codes = parse_boarding_codes_response(ret[1:-1]) for param, value in boarding_codes.items(): ret = device.send(command="onboardingcodes none {}".format( param), expected_output="Done") - assert ret != None and len(ret) > 1 + assert ret is not None and len(ret) > 1 assert value == ret[-2].strip() try: @@ -175,7 +173,7 @@ def test_command_check(device): except exceptions.ChipStackError as ex: log.error(ex.msg) assert False - assert device_details != None and len(device_details) != 0 + assert device_details is not None and len(device_details) != 0 try: device_details = dict(SetupPayload().ParseManualPairingCode( @@ -183,8 +181,8 @@ def test_command_check(device): except exceptions.ChipStackError as ex: log.error(ex.msg) assert False - assert device_details != None and len(device_details) != 0 + assert device_details is not None and len(device_details) != 0 # Exit - should be the last check ret = device.send(command="exit", expected_output="Goodbye") - assert ret != None and len(ret) > 0 + assert ret is not None and len(ret) > 0