Skip to content

Commit

Permalink
extension telemetry pipeline scenario (#2901)
Browse files Browse the repository at this point in the history
* Update version to dummy 1.0.0.0'

* Revert version change

* Barebones for etp

* Scenario should own VM because of conf change

* Add extension telemetry pipeline test

* Clean up code

* Improve log messages

* Fix pylint errors

* Improve logging

* Improve code comments

* VmAccess is not supported on flatcar

* Address PR comments

* Add support_distros in VmExtensionIdentifier

* Fix logic for support_distros in VmExtensionIdentifier

* Use run_remote_test for remote script
  • Loading branch information
maddieford committed Aug 28, 2023
1 parent b138dfe commit e86f1a5
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 3 deletions.
2 changes: 1 addition & 1 deletion tests_e2e/orchestrator/runbook.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ variable:
#
# The test suites to execute
- name: test_suites
value: "agent_bvt, no_outbound_connections, extensions_disabled, agent_not_provisioned, fips, agent_ext_workflow, agent_update, agent_status, multi_config_ext, agent_cgroups, ext_cgroups, agent_firewall"
value: "agent_bvt, no_outbound_connections, extensions_disabled, agent_not_provisioned, fips, agent_ext_workflow, agent_update, agent_status, multi_config_ext, agent_cgroups, ext_cgroups, agent_firewall, ext_telemetry_pipeline"
- name: cloud
value: "AzureCloud"
is_case_visible: true
Expand Down
9 changes: 9 additions & 0 deletions tests_e2e/test_suites/ext_telemetry_pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# This test ensures that the agent does not throw any errors while trying to transmit events to wireserver. It does not
# validate if the events actually make it to wireserver
#
name: "ExtTelemetryPipeline"
tests:
- "agent_bvt/vm_access.py"
- "ext_telemetry_pipeline/ext_telemetry_pipeline.py"
images: "random(endorsed)"
4 changes: 2 additions & 2 deletions tests_e2e/tests/agent_bvt/vm_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
class VmAccessBvt(AgentTest):
def run(self):
ssh: SshClient = self._context.create_ssh_client()
if "-flatcar" in ssh.run_command("uname -a"):
raise TestSkipped("Currently VMAccess is not supported on Flatcar")
if not VmExtensionIds.VmAccess.supports_distro(ssh.run_command("uname -a")):
raise TestSkipped("Currently VMAccess is not supported on this distro")

# Try to use a unique username for each test run (note that we truncate to 32 chars to
# comply with the rules for usernames)
Expand Down
109 changes: 109 additions & 0 deletions tests_e2e/tests/ext_telemetry_pipeline/ext_telemetry_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python3

# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# This test ensures that the agent does not throw any errors while trying to transmit events to wireserver. It does not
# validate if the events actually make it to wireserver
# TODO: Update this test suite to verify that the agent picks up AND sends telemetry produced by extensions
# (work item https://dev.azure.com/msazure/One/_workitems/edit/24903999)
#

import random
from typing import List, Dict, Any

from azurelinuxagent.common.conf import get_etp_collection_period

from tests_e2e.tests.lib.agent_test import AgentTest
from tests_e2e.tests.lib.identifiers import VmExtensionIds
from tests_e2e.tests.lib.logging import log
from tests_e2e.tests.lib.ssh_client import SshClient
from tests_e2e.tests.lib.virtual_machine_extension_client import VirtualMachineExtensionClient


class ExtTelemetryPipeline(AgentTest):
def run(self):
ssh_client: SshClient = self._context.create_ssh_client()

# Extensions we will create events for
extensions = ["Microsoft.Azure.Extensions.CustomScript"]
if VmExtensionIds.VmAccess.supports_distro(ssh_client.run_command("uname -a")):
extensions.append("Microsoft.OSTCExtensions.VMAccessForLinux")

# Set the etp collection period to 30 seconds instead of default 5 minutes
default_collection_period = get_etp_collection_period()
log.info("")
log.info("Set ETP collection period to 30 seconds on the test VM [%s]", self._context.vm.name)
output = ssh_client.run_command("update-waagent-conf Debug.EtpCollectionPeriod=30", use_sudo=True)
log.info("Updated waagent conf with Debug.ETPCollectionPeriod=30 completed:\n%s", output)

# Add CSE to the test VM twice to ensure its events directory still exists after re-enabling
log.info("")
log.info("Add CSE to the test VM...")
cse = VirtualMachineExtensionClient(self._context.vm, VmExtensionIds.CustomScript, resource_name="CustomScript")
cse.enable(settings={'commandToExecute': "echo 'enable'"})
cse.assert_instance_view()

log.info("")
log.info("Add CSE to the test VM again...")
cse.enable(settings={'commandToExecute': "echo 'enable again'"})
cse.assert_instance_view()

# Check agent log to verify ETP is enabled
command = "agent_ext_workflow-check_data_in_agent_log.py --data 'Extension Telemetry pipeline enabled: True'"
log.info("")
log.info("Check agent log to verify ETP is enabled...")
log.info("Remote command [%s] completed:\n%s", command, ssh_client.run_command(command))

# Add good extension events for each extension and check that the TelemetryEventsCollector collects them
# TODO: Update test suite to check that the agent is picking up the events generated by the extension, instead
# of generating on the extensions' behalf
# (work item - https://dev.azure.com/msazure/One/_workitems/edit/24903999)
log.info("")
log.info("Add good extension events and check they are reported...")
max_events = random.randint(10, 50)
self._run_remote_test(f"ext_telemetry_pipeline-add_extension_events.py "
f"--extensions {','.join(extensions)} "
f"--num_events_total {max_events}", use_sudo=True)
log.info("")
log.info("Good extension events were successfully reported.")

# Add invalid events for each extension and check that the TelemetryEventsCollector drops them
log.info("")
log.info("Add bad extension events and check they are reported...")
self._run_remote_test(f"ext_telemetry_pipeline-add_extension_events.py "
f"--extensions {','.join(extensions)} "
f"--num_events_total {max_events} "
f"--num_events_bad {random.randint(5, max_events-5)}", use_sudo=True)
log.info("")
log.info("Bad extension events were successfully dropped.")

# Reset the etp collection period to the default value so this VM can be shared with other suites
log.info("")
log.info("Reset ETP collection period to {0} seconds on the test VM [{1}]".format(default_collection_period, self._context.vm.name))
output = ssh_client.run_command("update-waagent-conf Debug.EtpCollectionPeriod={0}".format(default_collection_period), use_sudo=True)
log.info("Updated waagent conf with default collection period completed:\n%s", output)

def get_ignore_error_rules(self) -> List[Dict[str, Any]]:
return [
{'message': r"Dropped events for Extension.*"}
]


if __name__ == "__main__":
ExtTelemetryPipeline.run_from_command_line()
15 changes: 15 additions & 0 deletions tests_e2e/tests/lib/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# limitations under the License.
#

from typing import Dict, List


class VmIdentifier(object):
def __init__(self, cloud: str, location: str, subscription: str, resource_group: str, name: str):
Expand Down Expand Up @@ -45,6 +47,19 @@ def __init__(self, publisher: str, ext_type: str, version: str):
self.type: str = ext_type
self.version: str = version

unsupported_distros: Dict[str, List[str]] = {
"Microsoft.OSTCExtensions.VMAccessForLinux": ["flatcar"]
}

def supports_distro(self, system_info: str) -> bool:
"""
Returns true if an unsupported distro name for the extension is found in the provided system info
"""
ext_unsupported_distros = VmExtensionIdentifier.unsupported_distros.get(self.publisher + "." + self.type)
if ext_unsupported_distros is not None and any(distro in system_info for distro in ext_unsupported_distros):
return False
return True

def __str__(self):
return f"{self.publisher}.{self.type}"

Expand Down
224 changes: 224 additions & 0 deletions tests_e2e/tests/scripts/ext_telemetry_pipeline-add_extension_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
#!/usr/bin/env pypy3

# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Adds extension events for each provided extension and verifies the TelemetryEventsCollector collected or dropped them
#

import argparse
import json
import os
import sys
import time
import uuid

from assertpy import fail
from datetime import datetime, timedelta
from random import choice
from typing import List

from tests_e2e.tests.lib.agent_log import AgentLog
from tests_e2e.tests.lib.logging import log


def add_extension_events(extensions: List[str], bad_event_count=0, no_of_events_per_extension=50):
def missing_key(bad_event):
key = choice(list(bad_event.keys()))
del bad_event[key]
return "MissingKeyError: {0}".format(key)

def oversize_error(bad_event):
bad_event["EventLevel"] = "ThisIsAnOversizeError\n" * 300
return "OversizeEventError"

def empty_message(bad_event):
bad_event["Message"] = ""
return "EmptyMessageError"

errors = [
missing_key,
oversize_error,
empty_message
]

sample_ext_event = {
"EventLevel": "INFO",
"Message": "Starting IaaS ScriptHandler Extension v1",
"Version": "1.0",
"TaskName": "Extension Info",
"EventPid": "3228",
"EventTid": "1",
"OperationId": "519e4beb-018a-4bd9-8d8e-c5226cf7f56e",
"TimeStamp": "2019-12-12T01:20:05.0950244Z"
}

sample_messages = [
"Starting IaaS ScriptHandler Extension v1",
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
"The quick brown fox jumps over the lazy dog",
"Cursus risus at ultrices mi.",
"Doing Something",
"Iaculis eu non diam phasellus.",
"Doing other thing",
"Look ma, lemons",
"Pretium quam vulputate dignissim suspendisse.",
"Man this is insane",
"I wish it worked as it should and not as it ain't",
"Ut faucibus pulvinar elementum integer enim neque volutpat ac tincidunt."
"Did you get any of that?",
"Non-English message - 此文字不是英文的"
"κόσμε",
"�",
"Quizdeltagerne spiste jordbær med fløde, mens cirkusklovnen Wolther spillede på xylofon.",
"Falsches Üben von Xylophonmusik quält jeden größeren Zwerg",
"Zwölf Boxkämpfer jagten Eva quer über den Sylter Deich",
"Heizölrückstoßabdämpfung",
"Γαζέες καὶ μυρτιὲς δὲν θὰ βρῶ πιὰ στὸ χρυσαφὶ ξέφωτο",
"Ξεσκεπάζω τὴν ψυχοφθόρα βδελυγμία",
"El pingüino Wenceslao hizo kilómetros bajo exhaustiva lluvia y frío, añoraba a su querido cachorro.",
"Portez ce vieux whisky au juge blond qui fume sur son île intérieure, à côté de l'alcôve ovoïde, où les bûches",
"se consument dans l'âtre, ce qui lui permet de penser à la cænogenèse de l'être dont il est question",
"dans la cause ambiguë entendue à Moÿ, dans un capharnaüm qui, pense-t-il, diminue çà et là la qualité de son œuvre.",
"D'fhuascail Íosa, Úrmhac na hÓighe Beannaithe, pór Éava agus Ádhaimh",
"Árvíztűrő tükörfúrógép",
"Kæmi ný öxi hér ykist þjófum nú bæði víl og ádrepa",
"Sævör grét áðan því úlpan var ónýt",
"いろはにほへとちりぬるを わかよたれそつねならむ うゐのおくやまけふこえて あさきゆめみしゑひもせす",
"イロハニホヘト チリヌルヲ ワカヨタレソ ツネナラム ウヰノオクヤマ ケフコエテ アサキユメミシ ヱヒモセスン",
"? דג סקרן שט בים מאוכזב ולפתע מצא לו חברה איך הקליטה"
"Pchnąć w tę łódź jeża lub ośm skrzyń fig",
"В чащах юга жил бы цитрус? Да, но фальшивый экземпляр!",
"๏ เป็นมนุษย์สุดประเสริฐเลิศคุณค่า กว่าบรรดาฝูงสัตว์เดรัจฉาน",
"Pijamalı hasta, yağız şoföre çabucak güvendi."
]

for ext in extensions:
bad_count = bad_event_count
event_dir = os.path.join("/var/log/azure/", ext, "events")
if not os.path.isdir(event_dir):
fail(f"Expected events dir: {event_dir} does not exist")

log.info("")
log.info("Expected dir: {0} exists".format(event_dir))
log.info("Creating random extension events for {0}. No of Good Events: {1}, No of Bad Events: {2}".format(
ext, no_of_events_per_extension - bad_event_count, bad_event_count))

new_opr_id = str(uuid.uuid4())
event_list = []

for _ in range(no_of_events_per_extension):
event = sample_ext_event.copy()
event["OperationId"] = new_opr_id
event["TimeStamp"] = datetime.utcnow().strftime(u'%Y-%m-%dT%H:%M:%S.%fZ')
event["Message"] = choice(sample_messages)

if bad_count != 0:
# Make this event a bad event
reason = choice(errors)(event)
bad_count -= 1

# Missing key error might delete the TaskName key from the event
if "TaskName" in event:
event["TaskName"] = "{0}. This is a bad event: {1}".format(event["TaskName"], reason)
else:
event["EventLevel"] = "{0}. This is a bad event: {1}".format(event["EventLevel"], reason)

event_list.append(event)

file_name = os.path.join(event_dir, '{0}.json'.format(int(time.time() * 1000000)))
log.info("Create json with extension events in event directory: {0}".format(file_name))
with open("{0}.tmp".format(file_name), 'w+') as f:
json.dump(event_list, f)
os.rename("{0}.tmp".format(file_name), file_name)


def wait_for_extension_events_dir_empty(extensions: List[str]):
# By ensuring events dir to be empty, we verify that the telemetry events collector has completed its run
start_time = datetime.now()
timeout = timedelta(minutes=2)
ext_event_dirs = [os.path.join("/var/log/azure/", ext, "events") for ext in extensions]

while (start_time + timeout) >= datetime.now():
log.info("")
log.info("Waiting for extension event directories to be empty...")
all_dir_empty = True
for event_dir in ext_event_dirs:
if not os.path.exists(event_dir) or len(os.listdir(event_dir)) != 0:
log.info("Dir: {0} is not yet empty".format(event_dir))
all_dir_empty = False

if all_dir_empty:
log.info("Extension event directories are empty: \n{0}".format(ext_event_dirs))
return

time.sleep(20)

fail("Extension events dir not empty before 2 minute timeout")


def main():
# This test is a best effort test to ensure that the agent does not throw any errors while trying to transmit
# events to wireserver. We're not validating if the events actually make it to wireserver.

parser = argparse.ArgumentParser()
parser.add_argument("--extensions", dest='extensions', type=str, required=True)
parser.add_argument("--num_events_total", dest='num_events_total', type=int, required=True)
parser.add_argument("--num_events_bad", dest='num_events_bad', type=int, required=False, default=0)
args, _ = parser.parse_known_args()

extensions = args.extensions.split(',')
add_extension_events(extensions=extensions, bad_event_count=args.num_events_bad,
no_of_events_per_extension=args.num_events_total)

# Ensure that the event collector ran after adding the events
wait_for_extension_events_dir_empty(extensions=extensions)

# Sleep for a min to ensure that the TelemetryService has enough time to send events and report errors if any
time.sleep(60)
found_error = False
agent_log = AgentLog()

log.info("")
log.info("Check that the TelemetryEventsCollector did not emit any errors while collecting and reporting events...")
telemetry_event_collector_name = "TelemetryEventsCollector"
for agent_record in agent_log.read():
if agent_record.thread == telemetry_event_collector_name and agent_record.level == "ERROR":
found_error = True
log.info("waagent.log contains the following errors emitted by the {0} thread: \n{1}".format(telemetry_event_collector_name, agent_record))

if found_error:
fail("Found error(s) emitted by the TelemetryEventsCollector, but none were expected.")
log.info("The TelemetryEventsCollector did not emit any errors while collecting and reporting events")

for ext in extensions:
good_count = args.num_events_total - args.num_events_bad
log.info("")
if not agent_log.agent_log_contains("Collected {0} events for extension: {1}".format(good_count, ext)):
fail("The TelemetryEventsCollector did not collect the expected number of events: {0} for {1}".format(good_count, ext))
log.info("All {0} good events for {1} were collected by the TelemetryEventsCollector".format(good_count, ext))

if args.num_events_bad != 0:
log.info("")
if not agent_log.agent_log_contains("Dropped events for Extension: {0}".format(ext)):
fail("The TelemetryEventsCollector did not drop bad events for {0} as expected".format(ext))
log.info("The TelemetryEventsCollector dropped bad events for {0} as expected".format(ext))

sys.exit(0)


if __name__ == "__main__":
main()

0 comments on commit e86f1a5

Please sign in to comment.