diff --git a/tests/docker/ducktape-deps/tinygo-wasi-transforms b/tests/docker/ducktape-deps/tinygo-wasi-transforms index bce38013c01f1..565df01315e22 100644 --- a/tests/docker/ducktape-deps/tinygo-wasi-transforms +++ b/tests/docker/ducktape-deps/tinygo-wasi-transforms @@ -10,7 +10,7 @@ function retry { local count=0 until "$@"; do exit=$? - count=$(($count + 1)) + count=$((count + 1)) if [ $count -lt $retries ]; then echo "Retry $count/$retries exited $exit, retrying..." else diff --git a/tests/rptest/clients/kubectl.py b/tests/rptest/clients/kubectl.py index 6cf3615865b82..37158d27ced6f 100644 --- a/tests/rptest/clients/kubectl.py +++ b/tests/rptest/clients/kubectl.py @@ -139,6 +139,33 @@ def _install(self): def logger(self) -> Logger: return self._redpanda.logger + def _local_captured(self, cmd: list[str]): + """Runs kubectl subcommands on a Cloud Agent + with streaming stdout and stderr to output + + Args: + cmd (list[str]): kubectl commands to run + + Raises: + RuntimeError: when return code is present + + Yields: + Generator[str]: Generator of output line by line + """ + self._redpanda.logger.info(cmd) + process = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + return_code = process.returncode + + if return_code: + raise RuntimeError(process.stdout, return_code) + + for line in process.stdout: # type: ignore + yield line + + return + def _local_cmd(self, cmd: list[str]): """Run the given command locally and return the stdout as bytes. 
Logs stdout and stderr on failure.""" @@ -146,18 +173,33 @@ def _local_cmd(self, cmd: list[str]): try: return subprocess.check_output(cmd, stderr=subprocess.PIPE) except subprocess.CalledProcessError as e: - self.logger.info( - f'Command failed (rc={e.returncode}).\n' + - f'--------- stdout -----------\n{e.stdout.decode()}' + - f'--------- stderr -----------\n{e.stderr.decode()}') + self.logger.warning( + f"Command failed '{cmd}' (rc={e.returncode}).\n" + + f"--------- stdout -----------\n{e.stdout.decode()}" + + f"--------- stderr -----------\n{e.stderr.decode()}") raise - def _ssh_cmd(self, cmd: list[str]): + def _ssh_cmd(self, cmd: list[str], capture: bool = False): """Execute a command on a the remote node using ssh/tsh as appropriate.""" - return self._local_cmd(self._ssh_prefix() + cmd) + local_cmd = self._ssh_prefix() + cmd + if capture: + return self._local_captured(local_cmd) + else: + return self._local_cmd(local_cmd) - def cmd(self, kcmd: list[str] | str): + def cmd(self, kcmd: list[str] | str, capture=False): """Execute a kubectl command on the agent node. + Capture mode streams data from process stdout via Generator + Non-capture mode returns the whole output as bytes + + Args: + kcmd (list[str] | str): command to run on agent with kubectl + capture (bool, optional): Whether return whole result or + iterate line by line. Defaults to False. + + Returns: + bytes | Generator: Return is either the whole output as bytes + or a Generator with lines as items """ # prepare self._install() @@ -167,7 +209,7 @@ def cmd(self, kcmd: list[str] | str): _kcmd = kcmd if isinstance(kcmd, list) else kcmd.split() # Format command cmd = _kubectl + _kcmd - return self._ssh_cmd(cmd) + return self._ssh_cmd(cmd, capture=capture) def exec(self, remote_cmd, pod_name=None): """Execute a command inside of a redpanda pod container. 
@@ -301,7 +343,10 @@ def exec_privileged(self, remote_cmd, pod_name=None): class KubeNodeShell(): - def __init__(self, kubectl: KubectlTool, node_name: str) -> None: + def __init__(self, + kubectl: KubectlTool, + node_name: str, + clean=False) -> None: self.kubectl = kubectl self.node_name = node_name # It is bad, but it works @@ -317,8 +362,8 @@ def __init__(self, kubectl: KubectlTool, node_name: str) -> None: # Cut them to fit self.pod_name = self.pod_name[:63] - # In case of concurrent tests, just reuse existing pod - self.pod_reused = True if self._is_shell_running() else False + # Set cleaning flag on exit + self.clean = clean def _is_shell_running(self): # Check if such pod exists @@ -370,10 +415,13 @@ def _build_overrides(self): } } - def __enter__(self): - if not self.pod_reused: + def initialize_nodeshell(self): + if not self._is_shell_running(): # Init node shell overrides = self._build_overrides() + # We do not require timeout option to fail + # if pod is not running at this point + # Feel free to uncomment _out = self.kubectl.cmd([ f"--context={self.current_context}", "run", @@ -386,7 +434,24 @@ def __enter__(self): self.logger.debug(f"Response: {_out.decode()}") return self - def __call__(self, cmd: list[str] | str) -> list: + def destroy_nodeshell(self): + if self._is_shell_running(): + try: + self.kubectl.cmd(f"delete pod {self.pod_name}") + except Exception as e: + self.logger.warning("Failed to delete node shell pod " + f"'{self.pod_name}': {e}") + return + + def __enter__(self): + return self.initialize_nodeshell() + + def __exit__(self, *args, **kwargs): + if self.clean: + self.destroy_nodeshell() + return + + def __call__(self, cmd: list[str] | str, capture=False): self.logger.info(f"Running command inside node '{self.node_name}'") # Prefix for running inside proper pod _kcmd = ["exec", self.pod_name, "--"] @@ -394,15 +459,8 @@ def __call__(self, cmd: list[str] | str) -> list: _cmd = cmd if isinstance(cmd, list) else cmd.split() _kcmd += _cmd # 
exception handler is inside subclass - _out = self.kubectl.cmd(_kcmd) - return _out.decode().splitlines() - - def __exit__(self, *args, **kwargs): - # If this instance created this pod, delete it - if not self.pod_reused: - try: - self.kubectl.cmd(f"delete pod {self.pod_name}") - except Exception as e: - self.logger.warning("Failed to delete node shell pod " - f"'{self.pod_name}': {e}") - return + _out = self.kubectl.cmd(_kcmd, capture=capture) + if capture: + return _out + else: + return _out.decode().splitlines() diff --git a/tests/rptest/remote_scripts/cloud/pod_log_extract.sh b/tests/rptest/remote_scripts/cloud/pod_log_extract.sh new file mode 100644 index 0000000000000..a3301d6af9b33 --- /dev/null +++ b/tests/rptest/remote_scripts/cloud/pod_log_extract.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -eu + +files=($(find /var/log/pods/ -type f | grep "$1" | grep -v "configurator")) + +for item in ${files[*]}; do + echo "# $item" + tail --lines=+$(cat $item | grep -n -m 1 "$2" | cut -d":" -f1) $item + echo "# End of log" +done diff --git a/tests/rptest/services/cloud_broker.py b/tests/rptest/services/cloud_broker.py index 9ccf89ebfa71c..a389a0c7cad89 100644 --- a/tests/rptest/services/cloud_broker.py +++ b/tests/rptest/services/cloud_broker.py @@ -1,7 +1,23 @@ import json +import os +import subprocess +from rptest.services.utils import KubeNodeShell + +from dataclasses import dataclass + + +@dataclass +class DummyAccount(): + node_name: str + pod_name: str + + @property + def hostname(self) -> str: + return f"{self.node_name}/{self.pod_name}" class CloudBroker(): + # Mimic ducktape cluster node structure def __init__(self, pod, kubectl, logger) -> None: self.logger = logger # Validate @@ -16,6 +32,11 @@ def __init__(self, pod, kubectl, logger) -> None: self._meta = pod['metadata'] self.name = self._meta['name'] + # Backward compatibility + # Various classes will use this to hash and compare nodes + self.account = DummyAccount(node_name=pod['spec']['nodeName'], + 
pod_name=pod['metadata']['name']) + # It appears that the node-id label will only be added if the cluster # is still being managed by the operator - if managed=false then this # label won't be updated/added @@ -30,11 +51,44 @@ def __init__(self, pod, kubectl, logger) -> None: self._spec = pod['spec'] self._status = pod['status'] - # Backward compatibility - # Various classes will use this var to hash and compare nodes - self.account = self._meta # Mimic Ducktape cluster node hostname field self.hostname = f"{self._spec['nodeName']}/{self.name}" + self.nodename = self._spec['nodeName'] + # Create node shell pod + self.nodeshell = KubeNodeShell(self._kubeclient, + self.nodename, + clean=False) + # Init node shell pod beforehand + self.nodeshell.initialize_nodeshell() + # Prepare log extraction script + self.inject_script("pod_log_extract.sh") + + def inject_script(self, script_name): + # Modified version of inject_script func + self.logger.info(f"Injecting '{script_name}' to {self.nodename}") + rptest_dir = os.path.dirname(os.path.realpath(__file__)) + scripts_dir = os.path.join(rptest_dir, os.path.pardir, + "remote_scripts", "cloud") + scripts_dir = os.path.abspath(scripts_dir) + assert os.path.exists(scripts_dir) + script_path = os.path.join(scripts_dir, script_name) + assert os.path.exists(script_path) + + # not using paramiko due to complexity of routing to actual node + # Copy ducktape -> agent + _scp_cmd = self._kubeclient._scp_cmd( + script_path, f"{self._kubeclient._remote_uri}:") + self.logger.debug(_scp_cmd) + res = subprocess.check_output(_scp_cmd) + # Copy agent -> broker node + remote_path = os.path.join("/tmp", script_name) + _cp_cmd = self._kubeclient._ssh_prefix() + [ + 'kubectl', 'cp', script_name, + f"{self.nodeshell.pod_name}:{remote_path}" + ] + self.logger.debug(_cp_cmd) + res = subprocess.check_output(_cp_cmd) + return def _query_broker(self, path, port=None): """ diff --git a/tests/rptest/services/cluster.py b/tests/rptest/services/cluster.py 
index b8eb177b99a75..700bdf0f78212 100644 --- a/tests/rptest/services/cluster.py +++ b/tests/rptest/services/cluster.py @@ -134,6 +134,12 @@ def wrapped(self: HasRedpanda, *args, **kwargs): # Also skip if we are running against the cloud return r + if isinstance(self.redpanda, RedpandaServiceCloud): + # Call copy logs function for RedpandaServiceCloud + # It can't be called from usual ducktape routing due + # to different inheritance structure + self.redpanda.copy_cloud_logs(t_initial) + log_local_load(self.test_context.test_name, self.redpanda.logger, t_initial, disk_stats_initial) diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index ae275d3b4cbfc..8e6e0a83e8f7d 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -66,7 +66,7 @@ from rptest.services.rolling_restarter import RollingRestarter from rptest.services.storage import ClusterStorage, NodeStorage, NodeCacheStorage from rptest.services.storage_failure_injection import FailureInjectionConfig -from rptest.services.utils import NodeCrash, LogSearchLocal, LogSearchCloud +from rptest.services.utils import NodeCrash, LogSearchLocal, LogSearchCloud, Stopwatch from rptest.util import inject_remote_script, ssh_output_stderr, wait_until_result from rptest.utils.allow_logs_on_predicate import AllowLogsOnPredicate @@ -2164,6 +2164,85 @@ def raise_on_bad_logs(self, allow_list=None, test_start_time=None): test_start_time=test_start_time) lsearcher.search_logs(self.pods) + def copy_cloud_logs(self, test_start_time): + """Method makes sure that agent and cloud logs are copied after the test + """ + def create_dest_path(service_name): + # Create directory into which service logs will be copied + dest = os.path.join( + TestContext.results_dir(self._context, + self._context.test_index), + service_name) + if not os.path.isdir(dest): + mkdir_p(dest) + + return dest + + def copy_from_agent(since): + service_name = f"{self._cloud_cluster.cluster_id}-agent" + # 
Example path: + # '/home/ubuntu/redpanda/tests/results/2024-04-11--019/SelfRedpandaCloudTest/test_healthy/2/coc12bfs0etj2dg9a5ig-agent' + dest = create_dest_path(service_name) + + logfile = os.path.join(dest, "agent.log") + # Query journalctl to copy logs from + with open(logfile, 'wb') as lfile: + for line in self.kubectl._ssh_cmd( + f"journalctl -u redpanda-agent -S '{since}'".split(), + capture=True): + lfile.writelines([line]) + return + + def copy_from_pod(params): + """Function copies logs from agent and all RP pods + """ + pod = params["pod"] + test_start_time = params["s_time"] + dest = create_dest_path(pod.name) + try: + remote_path = os.path.join("/tmp", "pod_log_extract.sh") + logfile = os.path.join(dest, f"{pod.name}.log") + with open(logfile, 'wb') as lfile: + for line in pod.nodeshell( + f"bash {remote_path} '{pod.name}' " + f"'{test_start_time}'".split(), + capture=True): + lfile.writelines([line]) # type: ignore + except Exception as e: + self.logger.warning(f"Error getting logs for {pod.name}: {e}") + return pod.name + + # Safeguard if CloudService not created + if self.pods is None or self._cloud_cluster is None: + return {} + # Prepare time for different occasions + t_start_time = time.gmtime(test_start_time) + + # Do not include seconds on purpose to improve chances + time_format = "%Y-%m-%d %H:%M" + f_start_time = time.strftime(time_format, t_start_time) + + # Collect-agent-logs + self._context.logger.debug("Copying cloud agent logs...") + sw = Stopwatch() + sw.start() + copy_from_agent(f_start_time) + sw.split() + self.logger.info(sw.elapsedf("# Done log copy from agent")) + + # Collect pod logs + # Use CloudBrokers as a source of metadata and the rest + pool = concurrent.futures.ThreadPoolExecutor(max_workers=3) + params = [] + for pod in self.pods: + params.append({"pod": pod, "s_time": f_start_time}) + sw.start() + for name in pool.map(copy_from_pod, params): + # Calculate time for this node + self.logger.info( + sw.elapsedf(f"# Done log 
copy for {name} (interim)")) + return {} + class RedpandaService(RedpandaServiceBase): diff --git a/tests/rptest/services/utils.py b/tests/rptest/services/utils.py index 520ccd31de4e1..010854acb3173 100644 --- a/tests/rptest/services/utils.py +++ b/tests/rptest/services/utils.py @@ -6,20 +6,46 @@ from typing import Generator, Optional from rptest.clients.kubectl import KubectlTool, KubeNodeShell -from rptest.services.cloud_broker import CloudBroker + + +class Stopwatch(): + def __init__(self): + self._start = 0 + self._end = 0 + + def start(self) -> None: + self._start = time.time() + self._end = self._start + + def split(self) -> None: + self._end = time.time() + if self._start == 0: + self._start = self._end + + @property + def elapsed(self) -> float: + """Calculates time elapsed from _start to _end. + If split() was not called after start(), returns current elapsed time + + Returns: + float: elapsed time + """ + if self._end != self._start: + return self._end - self._start + else: + return time.time() - self._start + + def elapseds(self) -> str: + return f"{self.elapsed:.2f}s" + + def elapsedf(self, note) -> str: + return f"{note}: {self.elapseds()}" class BadLogLines(Exception): def __init__(self, node_to_lines): self.node_to_lines = node_to_lines - @staticmethod - def _get_hostname(node): - if isinstance(node, CloudBroker): - return node.hostname - else: - return node.account.hostname - def __str__(self): # Pick the first line from the first node as an example, and include it # in the string output so that for single line failures, it isn't necessary @@ -28,7 +54,7 @@ def __str__(self): example = next(iter(example_lines)) summary = ','.join([ - f'{self._get_hostname(i[0])}({len(i[1])})' + f'{i[0].account.hostname}({len(i[1])})' for i in self.node_to_lines.items() ]) return f"" @@ -135,7 +161,9 @@ def _check_oversized_allocations(self, line): def _search(self, nodes): bad_lines = collections.defaultdict(list) test_name = self._context.function_name + sw = Stopwatch() for 
node in nodes: + sw.start() hostname = self._get_hostname(node) self.logger.info(f"Scanning node {hostname} log for errors...") # Prepare/Build capture func shortcut @@ -155,6 +183,9 @@ def _search(self, nodes): bad_lines[node].append(line) self.logger.warn(f"[{test_name}] Unexpected log line on " f"{hostname}: {line}") + self.logger.info( + sw.elapsedf( + f"##### Time spent to scan bad logs on '{hostname}'")) return bad_lines def search_logs(self, nodes): @@ -204,28 +235,28 @@ def parse_k8s_time(logline, tz): # Load log, output is in binary form loglines = [] - node_name = pod._spec['nodeName'] tz = "+00:00" - with KubeNodeShell(self.kubectl, node_name) as ksh: - try: - # get time zone in +00:00 format - tz = ksh("date +'%:z'") - # Assume UTC if output is empty - # But this should never happen - tz = tz[0] if len(tz) > 0 else "+00:00" - # Find all log files for target pod - logfiles = ksh(f"find /var/log/pods -type f") - for logfile in logfiles: - if pod.name in logfile and \ - 'redpanda-configurator' not in logfile: - self.logger.info(f"Inspecting '{logfile}'") - lines = ksh(f"cat {logfile} | grep {expr}") - loglines += lines - except Exception as e: - self.logger.warning(f"Error getting logs for {pod.name}: {e}") - else: - _size = len(loglines) - self.logger.debug(f"Received {_size}B of data from {pod.name}") + try: + # get time zone in +00:00 format + tz = pod.nodeshell("date +'%:z'") + # Assume UTC if output is empty + # But this should never happen + tz = tz[0] if len(tz) > 0 else "+00:00" + # Find all log files for target pod + # Return type without capture is always str, so ignore type + logfiles = pod.nodeshell( + f"find /var/log/pods -type f") # type: ignore + for logfile in logfiles: + if pod.name in logfile and \ + 'redpanda-configurator' not in logfile: # type: ignore + self.logger.info(f"Inspecting '{logfile}'") + lines = pod.nodeshell(f"cat {logfile} | grep {expr}") + loglines += lines + except Exception as e: + self.logger.warning(f"Error getting 
logs for {pod.name}: {e}") + else: + _size = len(loglines) + self.logger.debug(f"Received {_size}B of data from {pod.name}") # check log lines for proper timing. # Log lines will have two timing objects: