Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PESDLC-1113 Copy logs from agent and RP pods #17772

Merged
merged 11 commits into from
May 9, 2024
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/tinygo-wasi-transforms
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ function retry {
local count=0
until "$@"; do
exit=$?
count=$(($count + 1))
count=$((count + 1))
if [ $count -lt $retries ]; then
echo "Retry $count/$retries exited $exit, retrying..."
else
Expand Down
110 changes: 84 additions & 26 deletions tests/rptest/clients/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,25 +139,67 @@ def _install(self):
def logger(self) -> Logger:
    """Expose the logger of the owning Redpanda service."""
    return self._redpanda.logger

def _local_captured(self, cmd: list[str]):
    """Run *cmd* locally, streaming its combined stdout/stderr line by line.

    Used for long-running kubectl subcommands on a Cloud Agent where the
    whole output should not be buffered in memory.

    Args:
        cmd (list[str]): full command line to execute.

    Raises:
        RuntimeError: if the process exits with a non-zero return code.

    Yields:
        bytes: one line of process output at a time.
    """
    self._redpanda.logger.info(cmd)
    process = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    # Stream output as it is produced. The return code is only known once
    # the process has finished, so it must be checked AFTER the stream is
    # exhausted: the original read process.returncode right after Popen(),
    # where it is always None, so failures were silently ignored.
    for line in process.stdout:  # type: ignore
        yield line
    return_code = process.wait()
    if return_code:
        raise RuntimeError(cmd, return_code)

def _local_cmd(self, cmd: list[str]):
    """Run the given command locally and return its stdout as bytes.

    Logs stdout and stderr at WARNING level on failure, then re-raises.

    Args:
        cmd (list[str]): command line to execute.

    Returns:
        bytes: captured stdout of the command.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    self._redpanda.logger.info(cmd)
    try:
        return subprocess.check_output(cmd, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # The flattened diff had left both the superseded info() call and
        # the warning() call in place, logging the same failure twice;
        # only the warning-level message is kept.
        self.logger.warning(
            f"Command failed '{cmd}' (rc={e.returncode}).\n" +
            f"--------- stdout -----------\n{e.stdout.decode()}" +
            f"--------- stderr -----------\n{e.stderr.decode()}")
        raise

def _ssh_cmd(self, cmd: list[str], capture: bool = False):
    """Execute a command on the remote node using ssh/tsh as appropriate.

    Args:
        cmd (list[str]): command to run on the remote node.
        capture (bool, optional): when True, stream output line by line
            via _local_captured(); otherwise return the whole output
            from _local_cmd(). Defaults to False.
    """
    # The flattened diff had two def lines and an unconditional early
    # return from the old version; this is the merged implementation.
    local_cmd = self._ssh_prefix() + cmd
    if capture:
        return self._local_captured(local_cmd)
    return self._local_cmd(local_cmd)

def cmd(self, kcmd: list[str] | str):
def cmd(self, kcmd: list[str] | str, capture=False):
"""Execute a kubectl command on the agent node.
savex marked this conversation as resolved.
Show resolved Hide resolved
Capture mode streams data from process stdout via Generator
Non-capture mode returns the whole output as a list

Args:
kcmd (list[str] | str): command to run on agent with kubectl
capture (bool, optional): Whether return whole result or
iterate line by line. Defaults to False.

Returns:
list[str / bytes]: Return is either a whole lines list
or a Generator with lines as items
"""
# prepare
self._install()
Expand All @@ -167,7 +209,7 @@ def cmd(self, kcmd: list[str] | str):
_kcmd = kcmd if isinstance(kcmd, list) else kcmd.split()
# Format command
cmd = _kubectl + _kcmd
return self._ssh_cmd(cmd)
return self._ssh_cmd(cmd, capture=capture)

def exec(self, remote_cmd, pod_name=None):
"""Execute a command inside of a redpanda pod container.
Expand Down Expand Up @@ -301,7 +343,10 @@ def exec_privileged(self, remote_cmd, pod_name=None):


class KubeNodeShell():
def __init__(self, kubectl: KubectlTool, node_name: str) -> None:
def __init__(self,
kubectl: KubectlTool,
node_name: str,
clean=False) -> None:
self.kubectl = kubectl
self.node_name = node_name
# It is bad, but it works
Expand All @@ -317,8 +362,8 @@ def __init__(self, kubectl: KubectlTool, node_name: str) -> None:
# Cut them to fit
self.pod_name = self.pod_name[:63]

# In case of concurrent tests, just reuse existing pod
self.pod_reused = True if self._is_shell_running() else False
# Set cleaning flag on exit
self.clean = clean

def _is_shell_running(self):
# Check if such pod exists
Expand Down Expand Up @@ -370,10 +415,13 @@ def _build_overrides(self):
}
}

def __enter__(self):
if not self.pod_reused:
def initialize_nodeshell(self):
if not self._is_shell_running():
# Init node shell
overrides = self._build_overrides()
# We do not require timeout option to fail
# if pod is not running at this point
# Feel free to uncomment
_out = self.kubectl.cmd([
f"--context={self.current_context}",
"run",
Expand All @@ -386,23 +434,33 @@ def __enter__(self):
self.logger.debug(f"Response: {_out.decode()}")
return self

def destroy_nodeshell(self):
    """Delete the node shell pod if it is currently running.

    Deletion failures are logged at WARNING level and never raised, so
    teardown cannot fail a test.
    """
    # The flattened diff had left the superseded `def __call__` header
    # orphaned above this method; it is removed here.
    if self._is_shell_running():
        try:
            self.kubectl.cmd(f"delete pod {self.pod_name}")
        except Exception as e:
            self.logger.warning("Failed to delete node shell pod "
                                f"'{self.pod_name}': {e}")

def __enter__(self):
    """Context-manager entry point.

    Brings up the node shell pod (if needed) via initialize_nodeshell()
    and returns its result for use in the `with` block.
    """
    shell = self.initialize_nodeshell()
    return shell

def __exit__(self, *args, **kwargs):
    """Context-manager exit point.

    Tears down the node shell pod only when this instance was created
    with clean=True; otherwise the pod is left running for reuse.
    """
    if not self.clean:
        return
    self.destroy_nodeshell()

def __call__(self, cmd: list[str] | str, capture=False):
self.logger.info(f"Running command inside node '{self.node_name}'")
# Prefix for running inside proper pod
_kcmd = ["exec", self.pod_name, "--"]
# Universal for list and str
_cmd = cmd if isinstance(cmd, list) else cmd.split()
_kcmd += _cmd
# exception handler is inside subclass
_out = self.kubectl.cmd(_kcmd)
return _out.decode().splitlines()

def __exit__(self, *args, **kwargs):
# If this instance created this pod, delete it
if not self.pod_reused:
try:
self.kubectl.cmd(f"delete pod {self.pod_name}")
except Exception as e:
self.logger.warning("Failed to delete node shell pod "
f"'{self.pod_name}': {e}")
return
_out = self.kubectl.cmd(_kcmd, capture=capture)
if capture:
return _out
else:
return _out.decode().splitlines()
11 changes: 11 additions & 0 deletions tests/rptest/remote_scripts/cloud/pod_log_extract.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash
# Extract pod logs for a given pod.
#
# Usage: pod_log_extract.sh <pod-name-substring> <start-pattern>
#
# $1 - substring matched against file paths under /var/log/pods/
# $2 - pattern marking the first line to print (e.g. a timestamp)
#
# NOTE(review): the scraped file contained GitHub review-comment junk
# lines after the shebang; removed. Paths under /var/log/pods/ are
# assumed to contain no whitespace.

set -eu

# Log files for the pod, excluding the configurator container
files=($(find /var/log/pods/ -type f | grep "$1" | grep -v "configurator"))

for item in "${files[@]}"; do
    echo "# $item"
    # Print from the first line matching the pattern to the end of file
    # (avoid the useless `cat`; quote the filename)
    start_line=$(grep -n -m 1 "$2" "$item" | cut -d":" -f1)
    tail --lines=+"$start_line" "$item"
    echo "# End of log"
done
60 changes: 57 additions & 3 deletions tests/rptest/services/cloud_broker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
import json
import os
import subprocess
from rptest.services.utils import KubeNodeShell

from dataclasses import dataclass


@dataclass(frozen=True)
class DummyAccount():
    """Minimal stand-in for a ducktape node account.

    Frozen so instances are hashable: per the surrounding code's own
    comments, other classes use these objects to hash and compare
    nodes, and a plain (eq=True, frozen=False) dataclass sets
    __hash__ = None, making it unhashable.
    """
    # Kubernetes node hosting the pod
    node_name: str
    # Name of the redpanda pod on that node
    pod_name: str

    @property
    def hostname(self) -> str:
        """Ducktape-style '<node>/<pod>' identifier."""
        return f"{self.node_name}/{self.pod_name}"


class CloudBroker():
# Mimic ducktape cluster node structure
def __init__(self, pod, kubectl, logger) -> None:
self.logger = logger
# Validate
Expand All @@ -16,6 +32,11 @@ def __init__(self, pod, kubectl, logger) -> None:
self._meta = pod['metadata']
self.name = self._meta['name']

# Backward compatibility
# Various classes will use this to hash and compare nodes
self.account = DummyAccount(node_name=pod['spec']['nodeName'],
pod_name=pod['metadata']['name'])

# It appears that the node-id label will only be added if the cluster
# is still being managed by the operator - if managed=false then this
# label won't be updated/added
Expand All @@ -30,11 +51,44 @@ def __init__(self, pod, kubectl, logger) -> None:
self._spec = pod['spec']
self._status = pod['status']

# Backward compatibility
# Various classes will use this var to hash and compare nodes
self.account = self._meta
# Mimic Ducktape cluster node hostname field
self.hostname = f"{self._spec['nodeName']}/{self.name}"
self.nodename = self._spec['nodeName']
# Create node shell pod
self.nodeshell = KubeNodeShell(self._kubeclient,
self.nodename,
clean=False)
# Init node shell pod beforehand
self.nodeshell.initialize_nodeshell()
# Prepare log extraction script
self.inject_script("pod_log_extract.sh")

def inject_script(self, script_name):
    """Copy a helper script from the test tree onto the node shell pod.

    The copy is two-hop: ducktape host -> cloud agent (scp/tsh), then
    agent -> node shell pod (kubectl cp). The script ends up at
    /tmp/<script_name> inside the pod.

    Args:
        script_name (str): filename under rptest/remote_scripts/cloud/.
    """
    # Modified version of inject_script func
    self.logger.info(f"Injecting '{script_name}' to {self.nodename}")
    rptest_dir = os.path.dirname(os.path.realpath(__file__))
    scripts_dir = os.path.abspath(
        os.path.join(rptest_dir, os.path.pardir, "remote_scripts",
                     "cloud"))
    assert os.path.exists(scripts_dir)
    script_path = os.path.join(scripts_dir, script_name)
    assert os.path.exists(script_path)

    # not using paramiko due to complexity of routing to actual node
    # Copy ducktape -> agent (lands in the agent user's home dir)
    _scp_cmd = self._kubeclient._scp_cmd(
        script_path, f"{self._kubeclient._remote_uri}:")
    self.logger.debug(_scp_cmd)
    # Output is unused; the unused `res` assignments and the redundant
    # trailing `return` of the original are dropped.
    subprocess.check_output(_scp_cmd)
    # Copy agent -> broker node shell pod
    remote_path = os.path.join("/tmp", script_name)
    _cp_cmd = self._kubeclient._ssh_prefix() + [
        'kubectl', 'cp', script_name,
        f"{self.nodeshell.pod_name}:{remote_path}"
    ]
    self.logger.debug(_cp_cmd)
    subprocess.check_output(_cp_cmd)

def _query_broker(self, path, port=None):
"""
Expand Down
6 changes: 6 additions & 0 deletions tests/rptest/services/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ def wrapped(self: HasRedpanda, *args, **kwargs):
# Also skip if we are running against the cloud
return r

if isinstance(self.redpanda, RedpandaServiceCloud):
# Call copy logs function for RedpandaServiceCloud
# It can't be called from usual ducktape routing due
# to different inheritance structure
self.redpanda.copy_cloud_logs(t_initial)

log_local_load(self.test_context.test_name,
self.redpanda.logger, t_initial,
disk_stats_initial)
Expand Down
81 changes: 80 additions & 1 deletion tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from rptest.services.rolling_restarter import RollingRestarter
from rptest.services.storage import ClusterStorage, NodeStorage, NodeCacheStorage
from rptest.services.storage_failure_injection import FailureInjectionConfig
from rptest.services.utils import NodeCrash, LogSearchLocal, LogSearchCloud
from rptest.services.utils import NodeCrash, LogSearchLocal, LogSearchCloud, Stopwatch
from rptest.util import inject_remote_script, ssh_output_stderr, wait_until_result
from rptest.utils.allow_logs_on_predicate import AllowLogsOnPredicate

Expand Down Expand Up @@ -2164,6 +2164,85 @@ def raise_on_bad_logs(self, allow_list=None, test_start_time=None):
test_start_time=test_start_time)
lsearcher.search_logs(self.pods)

def copy_cloud_logs(self, test_start_time):
"""Method makes sure that agent and cloud logs is copied after the test
"""
def create_dest_path(service_name):
# Create directory into which service logs will be copied
dest = os.path.join(
TestContext.results_dir(self._context,
self._context.test_index),
service_name)
if not os.path.isdir(dest):
mkdir_p(dest)

return dest

def copy_from_agent(since):
service_name = f"{self._cloud_cluster.cluster_id}-agent"
# Example path:
# '/home/ubuntu/redpanda/tests/results/2024-04-11--019/SelfRedpandaCloudTest/test_healthy/2/coc12bfs0etj2dg9a5ig-agent'
dest = create_dest_path(service_name)

logfile = os.path.join(dest, "agent.log")
# Query journalctl to copy logs from
with open(logfile, 'wb') as lfile:
for line in self.kubectl._ssh_cmd(
f"journalctl -u redpanda-agent -S '{since}'".split(),
capture=True):
lfile.writelines([line])
return

def copy_from_pod(params):
"""Function copies logs from agent and all RP pods
"""
pod = params["pod"]
test_start_time = params["s_time"]
dest = create_dest_path(pod.name)
try:
remote_path = os.path.join("/tmp", "pod_log_extract.sh")
logfile = os.path.join(dest, f"{pod.name}.log")
with open(logfile, 'wb') as lfile:
for line in pod.nodeshell(
f"bash {remote_path} '{pod.name}' "
f"'{test_start_time}'".split(),
capture=True):
lfile.writelines([line]) # type: ignore
except Exception as e:
self.logger.warning(f"Error getting logs for {pod.name}: {e}")
return pod.name

# Safeguard if CloudService not created
if self.pods is None or self._cloud_cluster is None:
return {}
# Prepare time for different occasions
t_start_time = time.gmtime(test_start_time)

# Do not include seconds on purpose to improve chances
time_format = "%Y-%m-%d %H:%M"
f_start_time = time.strftime(time_format, t_start_time)

# Collect-agent-logs
self._context.logger.debug("Copying cloud agent logs...")
sw = Stopwatch()
sw.start()
copy_from_agent(f_start_time)
sw.split()
self.logger.info(sw.elapsedf("# Done log copy from agent"))

# Collect pod logs
# Use CloudBrokers as a source of metadata and the rest
pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
params = []
for pod in self.pods:
params.append({"pod": pod, "s_time": f_start_time})
sw.start()
for name in pool.map(copy_from_pod, params):
# Calculate time for this node
self.logger.info(
sw.elapsedf(f"# Done log copy for {name} (interim)"))
return {}


class RedpandaService(RedpandaServiceBase):

Expand Down
Loading
Loading