Skip to content

Commit

Permalink
Merge pull request redpanda-data#17772 from redpanda-data/PESDLC-1113…
Browse files Browse the repository at this point in the history
…-add-htt-logs-copy

PESDLC-1113 Copy logs from agent and RP pods
  • Loading branch information
savex authored May 9, 2024
2 parents b0816de + a8909aa commit 1acaf9d
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 61 deletions.
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/tinygo-wasi-transforms
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ function retry {
local count=0
until "$@"; do
exit=$?
count=$(($count + 1))
count=$((count + 1))
if [ $count -lt $retries ]; then
echo "Retry $count/$retries exited $exit, retrying..."
else
Expand Down
110 changes: 84 additions & 26 deletions tests/rptest/clients/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,25 +139,67 @@ def _install(self):
def logger(self) -> Logger:
return self._redpanda.logger

def _local_captured(self, cmd: list[str]):
"""Runs kubectl subcommands on a Cloud Agent
with streaming stdout and stderr to output
Args:
cmd (list[str]): kubectl commands to run
Raises:
RuntimeError: when return code is present
Yields:
Generator[str]: Generator of output line by line
"""
self._redpanda.logger.info(cmd)
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
return_code = process.returncode

if return_code:
raise RuntimeError(process.stdout, return_code)

for line in process.stdout: # type: ignore
yield line

return

def _local_cmd(self, cmd: list[str]):
"""Run the given command locally and return the stdout as bytes.
Logs stdout and stderr on failure."""
self._redpanda.logger.info(cmd)
try:
return subprocess.check_output(cmd, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
self.logger.info(
f'Command failed (rc={e.returncode}).\n' +
f'--------- stdout -----------\n{e.stdout.decode()}' +
f'--------- stderr -----------\n{e.stderr.decode()}')
self.logger.warning(
f"Command failed '{cmd}' (rc={e.returncode}).\n" +
f"--------- stdout -----------\n{e.stdout.decode()}" +
f"--------- stderr -----------\n{e.stderr.decode()}")
raise

def _ssh_cmd(self, cmd: list[str]):
def _ssh_cmd(self, cmd: list[str], capture: bool = False):
"""Execute a command on a the remote node using ssh/tsh as appropriate."""
return self._local_cmd(self._ssh_prefix() + cmd)
local_cmd = self._ssh_prefix() + cmd
if capture:
return self._local_captured(local_cmd)
else:
return self._local_cmd(local_cmd)

def cmd(self, kcmd: list[str] | str):
def cmd(self, kcmd: list[str] | str, capture=False):
"""Execute a kubectl command on the agent node.
Capture mode streams data from the process stdout via a generator.
Non-capture mode returns the whole output at once as bytes.
Args:
kcmd (list[str] | str): command to run on agent with kubectl
capture (bool, optional): Whether return whole result or
iterate line by line. Defaults to False.
Returns:
list[str / bytes]: Return is either a whole lines list
or a Generator with lines as items
"""
# prepare
self._install()
Expand All @@ -167,7 +209,7 @@ def cmd(self, kcmd: list[str] | str):
_kcmd = kcmd if isinstance(kcmd, list) else kcmd.split()
# Format command
cmd = _kubectl + _kcmd
return self._ssh_cmd(cmd)
return self._ssh_cmd(cmd, capture=capture)

def exec(self, remote_cmd, pod_name=None):
"""Execute a command inside of a redpanda pod container.
Expand Down Expand Up @@ -301,7 +343,10 @@ def exec_privileged(self, remote_cmd, pod_name=None):


class KubeNodeShell():
def __init__(self, kubectl: KubectlTool, node_name: str) -> None:
def __init__(self,
kubectl: KubectlTool,
node_name: str,
clean=False) -> None:
self.kubectl = kubectl
self.node_name = node_name
# It is bad, but it works
Expand All @@ -317,8 +362,8 @@ def __init__(self, kubectl: KubectlTool, node_name: str) -> None:
# Cut them to fit
self.pod_name = self.pod_name[:63]

# In case of concurrent tests, just reuse existing pod
self.pod_reused = True if self._is_shell_running() else False
# Set cleaning flag on exit
self.clean = clean

def _is_shell_running(self):
# Check if such pod exists
Expand Down Expand Up @@ -370,10 +415,13 @@ def _build_overrides(self):
}
}

def __enter__(self):
if not self.pod_reused:
def initialize_nodeshell(self):
if not self._is_shell_running():
# Init node shell
overrides = self._build_overrides()
# We do not require timeout option to fail
# if pod is not running at this point
# Feel free to uncomment
_out = self.kubectl.cmd([
f"--context={self.current_context}",
"run",
Expand All @@ -386,23 +434,33 @@ def __enter__(self):
self.logger.debug(f"Response: {_out.decode()}")
return self

def __call__(self, cmd: list[str] | str) -> list:
def destroy_nodeshell(self):
    """Delete the node shell pod when _is_shell_running() reports it.

    Deletion is best effort: any exception from kubectl is logged as a
    warning and not propagated.
    """
    if not self._is_shell_running():
        return
    delete_cmd = f"delete pod {self.pod_name}"
    try:
        self.kubectl.cmd(delete_cmd)
    except Exception as err:
        self.logger.warning("Failed to delete node shell pod "
                            f"'{self.pod_name}': {err}")

def __enter__(self):
return self.initialize_nodeshell()

def __exit__(self, *args, **kwargs):
if self.clean:
self.destroy_nodeshell()
return

def __call__(self, cmd: list[str] | str, capture=False):
self.logger.info(f"Running command inside node '{self.node_name}'")
# Prefix for running inside proper pod
_kcmd = ["exec", self.pod_name, "--"]
# Universal for list and str
_cmd = cmd if isinstance(cmd, list) else cmd.split()
_kcmd += _cmd
# exception handler is inside subclass
_out = self.kubectl.cmd(_kcmd)
return _out.decode().splitlines()

def __exit__(self, *args, **kwargs):
# If this instance created this pod, delete it
if not self.pod_reused:
try:
self.kubectl.cmd(f"delete pod {self.pod_name}")
except Exception as e:
self.logger.warning("Failed to delete node shell pod "
f"'{self.pod_name}': {e}")
return
_out = self.kubectl.cmd(_kcmd, capture=capture)
if capture:
return _out
else:
return _out.decode().splitlines()
11 changes: 11 additions & 0 deletions tests/rptest/remote_scripts/cloud/pod_log_extract.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash
#
# Print the logs of matching pods, starting at a given marker line.
#
# Usage: pod_log_extract.sh <path-pattern> <start-pattern>
#   $1  grep pattern selecting log files under /var/log/pods/ by path;
#       paths containing "configurator" are always excluded
#   $2  grep pattern; each file is printed from its first matching line
#
# For every selected file the script prints "# <path>", the log lines and
# a closing "# End of log".

set -eu

path_pattern="$1"
start_pattern="$2"

# `|| true` stops a grep that selects nothing from failing the pipeline.
# Reading line by line keeps paths intact instead of word-splitting them.
find /var/log/pods/ -type f \
    | { grep -- "$path_pattern" || true; } \
    | { grep -v "configurator" || true; } \
    | while IFS= read -r item; do
        echo "# $item"
        first_line=$(grep -n -m 1 -- "$start_pattern" "$item" | cut -d":" -f1)
        if [ -z "$first_line" ]; then
            # No line matches the start pattern: `tail --lines=+` with an
            # empty number would fail and abort the remaining files, so
            # print the whole file instead of losing it.
            echo "# Start pattern not found, printing whole file"
            first_line=1
        fi
        tail --lines="+${first_line}" "$item"
        echo "# End of log"
    done
60 changes: 57 additions & 3 deletions tests/rptest/services/cloud_broker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
import json
import os
import subprocess
from rptest.services.utils import KubeNodeShell

from dataclasses import dataclass


# unsafe_hash=True: a plain @dataclass defines __eq__ and therefore sets
# __hash__ to None; this keeps instances usable as dict keys / set members
# while leaving the fields assignable.
@dataclass(unsafe_hash=True)
class DummyAccount():
    """Minimal stand-in for a node account, identified by node and pod name.

    Instances compare equal and hash equal when both fields match.
    """
    # Name of the Kubernetes node the pod runs on
    node_name: str
    # Name of the pod itself
    pod_name: str

    @property
    def hostname(self) -> str:
        """Return the identifier in the form '<node_name>/<pod_name>'."""
        return f"{self.node_name}/{self.pod_name}"


class CloudBroker():
# Mimic ducktape cluster node structure
def __init__(self, pod, kubectl, logger) -> None:
self.logger = logger
# Validate
Expand All @@ -16,6 +32,11 @@ def __init__(self, pod, kubectl, logger) -> None:
self._meta = pod['metadata']
self.name = self._meta['name']

# Backward compatibility
# Various classes will use this to hash and compare nodes
self.account = DummyAccount(node_name=pod['spec']['nodeName'],
pod_name=pod['metadata']['name'])

# It appears that the node-id label will only be added if the cluster
# is still being managed by the operator - if managed=false then this
# label won't be updated/added
Expand All @@ -30,11 +51,44 @@ def __init__(self, pod, kubectl, logger) -> None:
self._spec = pod['spec']
self._status = pod['status']

# Backward compatibility
# Various classes will use this var to hash and compare nodes
self.account = self._meta
# Mimic Ducktape cluster node hostname field
self.hostname = f"{self._spec['nodeName']}/{self.name}"
self.nodename = self._spec['nodeName']
# Create node shell pod
self.nodeshell = KubeNodeShell(self._kubeclient,
self.nodename,
clean=False)
# Init node shell pod beforehand
self.nodeshell.initialize_nodeshell()
# Prepare log extraction script
self.inject_script("pod_log_extract.sh")

def inject_script(self, script_name):
    """Copy a script from remote_scripts/cloud into the node shell pod.

    The script is looked up in ../remote_scripts/cloud relative to this
    file and copied in two hops: first to the agent with the scp command
    built by the kube client, then from the agent into the node shell
    pod with `kubectl cp`, where it lands at /tmp/<script_name>.

    Args:
        script_name: file name of the script inside remote_scripts/cloud

    Raises:
        FileNotFoundError: if the script does not exist locally
        subprocess.CalledProcessError: if either copy command fails
    """
    # Modified version of inject_script func
    self.logger.info(f"Injecting '{script_name}' to {self.nodename}")
    rptest_dir = os.path.dirname(os.path.realpath(__file__))
    scripts_dir = os.path.abspath(
        os.path.join(rptest_dir, os.path.pardir, "remote_scripts", "cloud"))
    script_path = os.path.join(scripts_dir, script_name)
    # Explicit check rather than assert: asserts are stripped under -O
    if not os.path.exists(script_path):
        raise FileNotFoundError(
            f"Script '{script_name}' not found at '{script_path}'")

    # not using paramiko due to complexity of routing to actual node
    # Copy ducktape -> agent
    _scp_cmd = self._kubeclient._scp_cmd(
        script_path, f"{self._kubeclient._remote_uri}:")
    self.logger.debug(_scp_cmd)
    subprocess.check_output(_scp_cmd)
    # Copy agent -> broker node
    # NOTE(review): the source is the bare file name, so this presumably
    # relies on scp and the ssh session sharing a working directory on
    # the agent - confirm
    remote_path = os.path.join("/tmp", script_name)
    _cp_cmd = self._kubeclient._ssh_prefix() + [
        'kubectl', 'cp', script_name,
        f"{self.nodeshell.pod_name}:{remote_path}"
    ]
    self.logger.debug(_cp_cmd)
    subprocess.check_output(_cp_cmd)

def _query_broker(self, path, port=None):
"""
Expand Down
6 changes: 6 additions & 0 deletions tests/rptest/services/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ def wrapped(self: HasRedpanda, *args, **kwargs):
# Also skip if we are running against the cloud
return r

if isinstance(self.redpanda, RedpandaServiceCloud):
# Call copy logs function for RedpandaServiceCloud
# It can't be called from usual ducktape routing due
# to different inheritance structure
self.redpanda.copy_cloud_logs(t_initial)

log_local_load(self.test_context.test_name,
self.redpanda.logger, t_initial,
disk_stats_initial)
Expand Down
81 changes: 80 additions & 1 deletion tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from rptest.services.rolling_restarter import RollingRestarter
from rptest.services.storage import ClusterStorage, NodeStorage, NodeCacheStorage
from rptest.services.storage_failure_injection import FailureInjectionConfig
from rptest.services.utils import NodeCrash, LogSearchLocal, LogSearchCloud
from rptest.services.utils import NodeCrash, LogSearchLocal, LogSearchCloud, Stopwatch
from rptest.util import inject_remote_script, ssh_output_stderr, wait_until_result
from rptest.utils.allow_logs_on_predicate import AllowLogsOnPredicate

Expand Down Expand Up @@ -2164,6 +2164,85 @@ def raise_on_bad_logs(self, allow_list=None, test_start_time=None):
test_start_time=test_start_time)
lsearcher.search_logs(self.pods)

def copy_cloud_logs(self, test_start_time):
    """Copy cloud agent and Redpanda pod logs into the test results dir.

    Agent logs are read from journalctl on the agent. Pod logs are
    produced by running /tmp/pod_log_extract.sh through each pod's node
    shell. Collection is best effort: a failure for the agent or for a
    single pod is logged as a warning and does not stop the others.

    Args:
        test_start_time: test start as seconds since the epoch
            (it is passed to time.gmtime)

    Returns:
        dict: always an empty dict
    """
    def create_dest_path(service_name):
        # Create directory into which service logs will be copied
        dest = os.path.join(
            TestContext.results_dir(self._context,
                                    self._context.test_index),
            service_name)
        if not os.path.isdir(dest):
            mkdir_p(dest)

        return dest

    def copy_from_agent(since):
        """Write the agent's journal since the given time to agent.log"""
        service_name = f"{self._cloud_cluster.cluster_id}-agent"
        # Example path:
        # '/home/ubuntu/redpanda/tests/results/2024-04-11--019/SelfRedpandaCloudTest/test_healthy/2/coc12bfs0etj2dg9a5ig-agent'
        dest = create_dest_path(service_name)

        logfile = os.path.join(dest, "agent.log")
        # NOTE(review): split() breaks the quoted timestamp into two
        # arguments; presumably the remote shell re-joins them - confirm
        journal_cmd = f"journalctl -u redpanda-agent -S '{since}'".split()
        # Query journalctl to copy logs from
        with open(logfile, 'wb') as lfile:
            lfile.writelines(
                self.kubectl._ssh_cmd(journal_cmd, capture=True))

    def copy_from_pod(params):
        """Write one pod's extracted log to <pod name>.log

        Returns the pod name, also when the copy failed.
        """
        pod = params["pod"]
        start_time = params["s_time"]
        dest = create_dest_path(pod.name)
        try:
            remote_path = os.path.join("/tmp", "pod_log_extract.sh")
            logfile = os.path.join(dest, f"{pod.name}.log")
            extract_cmd = (f"bash {remote_path} '{pod.name}' "
                           f"'{start_time}'".split())
            with open(logfile, 'wb') as lfile:
                lfile.writelines(
                    pod.nodeshell(extract_cmd,
                                  capture=True))  # type: ignore
        except Exception as e:
            self.logger.warning(f"Error getting logs for {pod.name}: {e}")
        return pod.name

    # Safeguard if CloudService not created
    if self.pods is None or self._cloud_cluster is None:
        return {}
    # Prepare time for different occasions
    t_start_time = time.gmtime(test_start_time)

    # Do not include seconds on purpose to improve chances
    time_format = "%Y-%m-%d %H:%M"
    f_start_time = time.strftime(time_format, t_start_time)

    # Collect agent logs
    self._context.logger.debug("Copying cloud agent logs...")
    sw = Stopwatch()
    sw.start()
    try:
        copy_from_agent(f_start_time)
    except Exception as e:
        # Same best-effort policy as for pods: keep going so that pod
        # logs are still collected
        self.logger.warning(f"Error getting agent logs: {e}")
    sw.split()
    self.logger.info(sw.elapsedf("# Done log copy from agent"))

    # Collect pod logs
    # Use CloudBrokers as a source of metadata and the rest
    params = [{"pod": pod, "s_time": f_start_time} for pod in self.pods]
    sw.start()
    # The with-block shuts the pool down once all copies are done
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
        for name in pool.map(copy_from_pod, params):
            # Calculate time for this node
            self.logger.info(
                sw.elapsedf(f"# Done log copy for {name} (interim)"))
    return {}


class RedpandaService(RedpandaServiceBase):

Expand Down
Loading

0 comments on commit 1acaf9d

Please sign in to comment.