Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load tests dynamically + Parameterize the test pipeline #2745

Merged
merged 4 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions tests_e2e/orchestrator/lib/agent_test_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Microsoft Azure Linux Agent
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code to load the tests from their JSON files (you can find them under the "testsuites" directory)

#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import importlib.util
import json

from pathlib import Path
from typing import Any, Dict, List, Type

from tests_e2e.scenarios.lib.agent_test import AgentTest


class TestSuiteDescription(object):
"""
Description of the test suite loaded from its JSON file.
"""
name: str
tests: List[Type[AgentTest]]


class AgentTestLoader(object):
"""
Loads the description of a set of test suites
"""
def __init__(self, test_source_directory: Path):
"""
The test_source_directory parameter must be the root directory of the end-to-end tests (".../WALinuxAgent/tests_e2e")
"""
self._root: Path = test_source_directory/"scenarios"

def load(self, test_suites: str) -> List[TestSuiteDescription]:
"""
Loads the specified 'test_suites', which are given as a string of comma-separated suite names or a JSON description
of a single test_suite.

When given as a comma-separated list, each item must correspond to the name of the JSON files describing s suite (those
files are located under the .../WALinuxAgent/tests_e2e/scenarios/testsuites directory). For example,
if test_suites == "agent_bvt, fast-track" then this method will load files agent_bvt.json and fast-track.json.

When given as a JSON string, the value must correspond to the description a single test suite, for example

{
"name": "AgentBvt",

"tests": [
"bvts/extension_operations.py",
"bvts/run_command.py",
"bvts/vm_access.py"
]
}
"""
# Attempt to parse 'test_suites' as the JSON description for a single suite
try:
return [self._load_test_suite(json.loads(test_suites))]
except json.decoder.JSONDecodeError:
pass

# Else, it should be a comma-separated list of description files
description_files: List[Path] = [self._root/"testsuites"/f"{t.strip()}.json" for t in test_suites.split(',')]
return [self._load_test_suite(AgentTestLoader._load_file(s)) for s in description_files]

def _load_test_suite(self, test_suite: Dict[str, Any]) -> TestSuiteDescription:
"""
Creates a TestSuiteDescription from its JSON representation, which has been loaded by JSON.loads and is passed
to this method as a dictionary
"""
suite = TestSuiteDescription()
suite.name = test_suite["name"]
suite.tests = []
for source_file in [self._root/"tests"/t for t in test_suite["tests"]]:
suite.tests.extend(AgentTestLoader._load_tests(source_file))
return suite

@staticmethod
def _load_tests(source_file: Path) -> List[Type[AgentTest]]:
"""
Takes a 'source_file', which must be a Python module, and returns a list of all the classes derived from AgentTest.
"""
spec = importlib.util.spec_from_file_location(f"tests_e2e.scenarios.{source_file.name}", str(source_file))
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# return all the classes in the module that are subclasses of AgentTest but are not AgentTest itself.
return [v for v in module.__dict__.values() if isinstance(v, type) and issubclass(v, AgentTest) and v != AgentTest]

@staticmethod
def _load_file(file: Path):
"""Helper to load a JSON file"""
try:
with file.open() as f:
return json.load(f)
except Exception as e:
raise Exception(f"Can't load {file}: {e}")


173 changes: 114 additions & 59 deletions tests_e2e/orchestrator/lib/agent_test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import contextlib
import logging
import re

from assertpy import fail
from enum import Enum
from pathlib import Path
from threading import current_thread, RLock
from typing import List, Type
from typing import Any, Dict, List

# Disable those warnings, since 'lisa' is an external, non-standard, dependency
# E0401: Unable to import 'lisa' (import-error)
Expand All @@ -31,17 +33,19 @@
Logger,
Node,
TestSuite,
TestSuiteMetadata
TestSuiteMetadata,
TestCaseMetadata,
)
from lisa.sut_orchestrator import AZURE # pylint: disable=E0401
from lisa.sut_orchestrator.azure.common import get_node_context, AzureNodeSchema # pylint: disable=E0401

import makepkg
from azurelinuxagent.common.version import AGENT_VERSION
from tests_e2e.scenarios.lib.agent_test import AgentTest
from tests_e2e.orchestrator.lib.agent_test_loader import AgentTestLoader, TestSuiteDescription
from tests_e2e.scenarios.lib.agent_test_context import AgentTestContext
from tests_e2e.scenarios.lib.identifiers import VmIdentifier
from tests_e2e.scenarios.lib.logging import log as agent_test_logger # Logger used by the tests
from tests_e2e.scenarios.lib.logging import set_current_thread_log


def _initialize_lisa_logger():
Expand All @@ -68,6 +72,29 @@ def _initialize_lisa_logger():
_initialize_lisa_logger()


#
# Helper to change the current thread name temporarily
#
@contextlib.contextmanager
def _set_thread_name(name: str):
initial_name = current_thread().name
current_thread().name = name
try:
yield
finally:
current_thread().name = initial_name


#
# Possible values for the collect_logs parameter
#
class CollectLogs(Enum):
Always = 'always' # Always collect logs
Failed = 'failed' # Collect logs only on test failures
No = 'no' # Never collect logs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These enum values match the values for LISA's 'keep_environment' runbook variable (which is used to prevent LISA from deleting the test VMs at the end of the runbook)


@TestSuiteMetadata(area="waagent", category="", description="")
class AgentTestSuite(TestSuite):
"""
Base class for Agent test suites. It provides facilities for setup, execution of tests and reporting results. Derived
Expand All @@ -81,14 +108,17 @@ def __init__(self, vm: VmIdentifier, paths: AgentTestContext.Paths, connection:
self.log: Logger = None
self.node: Node = None
self.runbook_name: str = None
self.suite_name: str = None
self.image_name: str = None
self.test_suites: List[str] = None
self.collect_logs: str = None
self.skip_setup: bool = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming the single test run from command line will not change with this addition of new parameters.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, don't get your question... Previously the runbook was executed all the tests (methods marked with TestMetadata), now it executes the tests passed as parameters in the 'test_suites' variable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question is this change the behavior of single test run that we execute from command line (developer scenario) like running vmaccess.py etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Without my changes, one cannot pass the test to run as parameter. The runbook executes all the methods marked as tests with metadata. With my changes, one can pass the test suite to run and the runbook will execute only that suite. To execute a single test instead of a test suite, one can pass a piece of JSON describing a test suite comprised of that single test, e.g. { "name": "MyTestRun", "tests": [ "bvts/vmaccess.py"] } (however, the current pipeline has a bug and it cannot take spaces in is parameters, that is already fixed in my next PR. the parameter for the runbook itself can take spaces)


def __init__(self, metadata: TestSuiteMetadata) -> None:
super().__init__(metadata)
# The context is initialized by _set_context() via the call to execute()
self.__context: AgentTestSuite._Context = None

def _set_context(self, node: Node, log: Logger):
def _set_context(self, node: Node, variables: Dict[str, Any], log: Logger):
connection_info = node.connection_info
node_context = get_node_context(node)
runbook = node.capability.get_extended_runbook(AzureNodeSchema, AZURE)
Expand All @@ -113,7 +143,23 @@ def _set_context(self, node: Node, log: Logger):

self.__context.log = log
self.__context.node = node
self.__context.suite_name = f"{self._metadata.full_name}_{runbook.marketplace.offer}-{runbook.marketplace.sku}"
self.__context.image_name = f"{runbook.marketplace.offer}-{runbook.marketplace.sku}"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we had a single test suite and we were using its name during logging and to create files. Now we have multiple suites and I am using the image name, or the concatenation of test suite name + image name as appropriate.

self.__context.test_suites = AgentTestSuite._get_required_parameter(variables, "test_suites")
self.__context.collect_logs = AgentTestSuite._get_required_parameter(variables, "collect_logs")
self.__context.skip_setup = AgentTestSuite._get_required_parameter(variables, "skip_setup")

self._log.info(
"Test suite parameters: [skip_setup: %s] [collect_logs: %s] [test_suites: %s]",
self.context.skip_setup,
self.context.collect_logs,
self.context.test_suites)

@staticmethod
def _get_required_parameter(variables: Dict[str, Any], name: str) -> Any:
value = variables.get(name)
if value is None:
raise Exception(f"The runbook is missing required parameter '{name}'")
return value
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the new Pipeline parameters that are then passed to the AgentTestSuite via the LISA runbook.


@property
def context(self):
Expand Down Expand Up @@ -234,94 +280,103 @@ def _collect_node_logs(self) -> None:

# Copy the tarball to the local logs directory
remote_path = "/tmp/waagent-logs.tgz"
local_path = Path.home()/'logs'/'{0}.tgz'.format(self.context.suite_name)
local_path = Path.home()/'logs'/'{0}.tgz'.format(self.context.image_name)
self._log.info("Copying %s:%s to %s", self.context.node.name, remote_path, local_path)
self.context.node.shell.copy_back(remote_path, local_path)
except: # pylint: disable=bare-except
self._log.exception("Failed to collect logs from the test machine")

def execute(self, node: Node, log: Logger, test_suite: List[Type[AgentTest]]) -> None:
@TestCaseMetadata(description="", priority=0)
def execute(self, node: Node, variables: Dict[str, Any], log: Logger) -> None:
"""
Executes each of the AgentTests in the given List. Note that 'test_suite' is a list of test classes, rather than
instances of the test class (this method will instantiate each of these test classes).
"""
self._set_context(node, log)
self._set_context(node, variables, log)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'variables' are the variables from the runbook


failed: List[str] = [] # List of failed tests (names only)

# The thread name is added to self._log, set it to the current test suite while we execute it
thread_name = current_thread().name
current_thread().name = self.context.suite_name
with _set_thread_name(self.context.image_name): # The thread name is added to self._log
try:
if not self.context.skip_setup:
self._setup()

# We create a separate log file for the test suite.
suite_log_file: Path = Path.home()/'logs'/f"{self.context.suite_name}.log"
agent_test_logger.set_current_thread_log(suite_log_file)
try:
if not self.context.skip_setup:
self._setup_node()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skipping setup is mainly for developer scenarios... when testing something small, one can skip the setup and execute the runbook faster.


try:
self._setup()
test_suites: List[TestSuiteDescription] = AgentTestLoader(self.context.test_source_directory).load(self.context.test_suites)

try:
self._setup_node()
for suite in test_suites:
failed.extend(self._execute_test_suite(suite))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored out the test execution to its own method


finally:
collect = self.context.collect_logs
if collect == CollectLogs.Always or collect == CollectLogs.Failed and len(failed) > 0:
self._collect_node_logs()

except: # pylint: disable=bare-except
# Note that we report the error to the LISA log and then re-raise it. We log it here
# so that the message is decorated with the thread name in the LISA log; we re-raise
# to let LISA know the test errored out (LISA will report that error one more time
# in its log)
self._log.exception("UNHANDLED EXCEPTION")
raise

finally:
self._clean_up()

# Fail the entire test suite if any test failed; this exception is handled by LISA
if len(failed) > 0:
fail(f"{[self.context.image_name]} One or more tests failed: {failed}")

def _execute_test_suite(self, suite: TestSuiteDescription) -> List[str]:
suite_name = suite.name
suite_full_name = f"{suite_name}-{self.context.image_name}"

with _set_thread_name(suite_full_name): # The thread name is added to self._log
with set_current_thread_log(Path.home()/'logs'/f"{suite_full_name}.log"):
agent_test_logger.info("")
agent_test_logger.info("**************************************** %s ****************************************", self.context.suite_name)
agent_test_logger.info("**************************************** %s ****************************************", suite_name)
agent_test_logger.info("")

results: List[str] = []
failed: List[str] = []
summary: List[str] = []

for test in suite.tests:
test_name = test.__name__
test_full_name = f"{suite_name}-{test_name}"

for test in test_suite:
result: str = "[UNKNOWN]"
test_full_name = f"{self.context.suite_name} {test.__name__}"
agent_test_logger.info("******** Executing %s", test_full_name)
agent_test_logger.info("******** Executing %s", test_name)
self._log.info("******** Executing %s", test_full_name)
agent_test_logger.info("")

try:

test(self.context).run()
result = f"[Passed] {test_full_name}"

summary.append(f"[Passed] {test_name}")
agent_test_logger.info("******** [Passed] %s", test_name)
self._log.info("******** [Passed] %s", test_full_name)
except AssertionError as e:
failed.append(test.__name__)
result = f"[Failed] {test_full_name}"
agent_test_logger.error("%s", e)
self._log.error("%s", e)
summary.append(f"[Failed] {test_name}")
failed.append(test_full_name)
agent_test_logger.error("******** [Failed] %s: %s", test_name, e)
self._log.error("******** [Failed] %s", test_full_name)
except: # pylint: disable=bare-except
failed.append(test.__name__)
result = f"[Error] {test_full_name}"
agent_test_logger.exception("UNHANDLED EXCEPTION IN %s", test_full_name)
summary.append(f"[Error] {test_name}")
failed.append(test_full_name)
agent_test_logger.exception("UNHANDLED EXCEPTION IN %s", test_name)
self._log.exception("UNHANDLED EXCEPTION IN %s", test_full_name)

agent_test_logger.info("******** %s", result)
agent_test_logger.info("")
self._log.info("******** %s", result)
results.append(result)

agent_test_logger.info("")
agent_test_logger.info("********* [Test Results]")
agent_test_logger.info("")
for r in results:
for r in summary:
agent_test_logger.info("\t%s", r)
agent_test_logger.info("")

finally:
self._collect_node_logs()

except: # pylint: disable=bare-except
agent_test_logger.exception("UNHANDLED EXCEPTION IN %s", self.context.suite_name)
# Note that we report the error to the LISA log and then re-raise it. We log it here
# so that the message is decorated with the thread name in the LISA log; we re-raise
# to let LISA know the test errored out (LISA will report that error one more time
# in its log)
self._log.exception("UNHANDLED EXCEPTION IN %s", self.context.suite_name)
raise

finally:
self._clean_up()
agent_test_logger.close_current_thread_log()
current_thread().name = thread_name

# Fail the entire test suite if any test failed; this exception is handled by LISA
if len(failed) > 0:
fail(f"{[self.context.suite_name]} One or more tests failed: {failed}")
return failed

def execute_script_on_node(self, script_path: Path, parameters: str = "", sudo: bool = False) -> int:
"""
Expand Down
Loading