Skip to content

Commit

Permalink
Add tshark at tests monitoring and stop threads
Browse files Browse the repository at this point in the history
Signed-off-by: Marius Sincovici <marius.sincovici@nordsec.com>

* add mechanism to stop the running threads when tests are finished
* record traffic with tshart
  • Loading branch information
mariusSincovici committed Jan 16, 2025
1 parent a70aae6 commit 3e4f6d8
Showing 1 changed file with 52 additions and 31 deletions.
83 changes: 52 additions & 31 deletions test/qa/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import datetime
import io
import subprocess
import signal
import threading
import time

Expand Down Expand Up @@ -39,57 +41,76 @@ def _print_with_timestamp(*args, **kwargs):

@pytest.fixture(scope="function", autouse=True)
def setup_check_internet_connection():
print("~~~setup_check_internet_connection: Check internet connection before starting tests")
if network.is_available():
print("~~~setup_check_internet_connection: BEFORE TEST network.is_available SUCCESS")
else:
print("~~~setup_check_internet_connection: BEFORE TEST network.is_available FAILURE")
if not network.is_available():
print("setup_check_internet_connection: no internet available before running the tests")


@pytest.fixture(scope="session", autouse=True)
def start_system_monitoring():
print("~~~start_system_monitoring: Start system monitoring")
print("Run start_system_monitoring")

connection_check_thread = threading.Thread(target=_check_connection_to_ip, args=("1.1.1.1",), daemon=True)
connection_out_vpn_check_thread = threading.Thread(target=_check_connection_to_ip_outside_vpn, args=("1.1.1.1",), daemon=True)
dns_resolver_thread = threading.Thread(target=_check_dns_resolution, args=("nordvpn.com",), daemon=True)
connection_check_thread.start()
connection_out_vpn_check_thread.start()
dns_resolver_thread.start()
# control running threads execution
stop_event = threading.Event()

threads = []

threads.append(threading.Thread(target=_check_connection_to_ip, args=["1.1.1.1", stop_event], daemon=True))
threads.append(threading.Thread(target=_check_connection_to_ip_outside_vpn, args=["1.1.1.1", stop_event], daemon=True))
threads.append(threading.Thread(target=_check_dns_resolution, args=["nordvpn.com", stop_event], daemon=True))
threads.append(threading.Thread(target=_capture_traffic, args=[stop_event], daemon=True))
print(threads)

for thread in threads:
thread.start()

# execute tests
yield

# stop monitoring after execution
stop_event.set()
for thread in threads:
thread.join()

def _check_connection_to_ip(ip_address):
while True:
def _check_connection_to_ip(ip_address, stop_event):
print("Start _check_connection_to_ip")
while not stop_event.is_set():
try:
print(f"~~~_check_connection_to_ip: {ip_address}")
"icmp_seq=" in sh.ping("-c", "3", "-W", "3", ip_address) # noqa: B015
print(f"~~~_check_connection_to_ip: IN-PING {ip_address} SUCCESS")
except sh.ErrorReturnCode as e:
print(f"~~~_check_connection_to_ip: IN-PING {ip_address} FAILURE: {e}.")
time.sleep(_CHECK_FREQUENCY)
except Exception as e: # noqa: BLE001
print(f"_check_connection_to_ip: IN-PING {ip_address} FAILURE: {e}.")
stop_event.wait(_CHECK_FREQUENCY)


def _check_connection_to_ip_outside_vpn(ip_address):
while True:
def _check_connection_to_ip_outside_vpn(ip_address, stop_event):
print("Start _check_connection_to_ip_outside_vpn")
while not stop_event.is_set():
try:
print(f"~~~_check_connection_to_ip_outside_vpn: {ip_address}")
"icmp_seq=" in sh.sudo.ping("-c", "3", "-W", "3", "-m", "57841", ip_address) # noqa: B015
print(f"~~~_check_connection_to_ip_outside_vpn: OUT-PING {ip_address} SUCCESS")
except sh.ErrorReturnCode as e:
print(f"~~~_check_connection_to_ip_outside_vpn: OUT-PING {ip_address} FAILURE: {e}.")
time.sleep(_CHECK_FREQUENCY)
except Exception as e: # noqa: BLE001
print(f"~~~_check_connection_to_ip_outside_vpn: {ip_address} FAILURE: {e}.")
stop_event.wait(_CHECK_FREQUENCY)


def _check_dns_resolution(domain):
while True:
def _check_dns_resolution(domain, stop_event):
print("Start _check_dns_resolution")
while not stop_event.is_set():
try:
print(f"~~~_check_dns_resolution: {domain}")
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8']
resolver.resolve(domain, 'A') # 'A' for IPv4
print(f"~~~_check_dns_resolution: DNS {domain} SUCCESS")
except Exception as e: # noqa: BLE001
print(f"~~~_check_dns_resolution: DNS {domain} FAILURE. Error: {e}")
time.sleep(_CHECK_FREQUENCY)
stop_event.wait(_CHECK_FREQUENCY)


def _capture_traffic(stop_event):
print("Start _capture_traffic")
# use circular log files, keep only 2 latest each 10MB size
command = ["tshark", "-a", "filesize:10240", "-b", "files:2", "-i", "any", "-w", os.environ["WORKDIR"] + "/dist/logs/tshark_capture.pcap"]
print("Starting tshark")
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stop_event.wait()
print("Stopping tshark with Ctrl+C")
process.send_signal(signal.SIGINT)
print(f"tshark out {process.stdout.read().strip()} - {process.stderr.read().strip()}")
time.sleep(1)

0 comments on commit 3e4f6d8

Please sign in to comment.