Skip to content

Commit

Permalink
chore(kubernetes_logs source): Adds tests for checkpointing regressio…
Browse files Browse the repository at this point in the history
…ns in k8s (#6564)

* Adds tests for checkpointing regressions in k8s

Signed-off-by: Ian Henry <ianjhenry00@gmail.com>

* Identified problem with checkpoint test, unable to properly time out the test now

Signed-off-by: Spencer Gilbert <spencer.gilbert@gmail.com>

* Rewrote and fixed checkpoint test

Signed-off-by: Spencer Gilbert <spencer.gilbert@gmail.com>

Co-authored-by: Spencer Gilbert <spencer.gilbert@gmail.com>
  • Loading branch information
2 people authored and pull[bot] committed Nov 15, 2022
1 parent 5e2b589 commit 683cd7c
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 2 deletions.
122 changes: 122 additions & 0 deletions lib/k8s-e2e-tests/tests/vector-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1825,3 +1825,125 @@ async fn host_metrics() -> Result<(), Box<dyn std::error::Error>> {
drop(vector);
Ok(())
}

#[tokio::test]
async fn simple_checkpoint() -> Result<(), Box<dyn std::error::Error>> {
let _guard = lock();
let framework = make_framework();

let vector = framework
.vector(
"test-vector",
HELM_CHART_VECTOR_AGENT,
VectorConfig {
custom_helm_values: HELM_VALUES_STDOUT_SINK,
..Default::default()
},
)
.await?;
framework
.wait_for_rollout(
"test-vector",
"daemonset/vector-agent",
vec!["--timeout=60s"],
)
.await?;

let test_namespace = framework.namespace("test-vector-test-pod").await?;

let test_pod = framework
.test_pod(test_pod::Config::from_pod(&make_test_pod(
"test-vector-test-pod",
"test-pod",
// This allows us to read and checkpoint the first log
// then ensure we just read the new marker after restarting Vector
"echo CHECKED_MARKER; sleep 60; echo MARKER",
vec![],
vec![],
))?)
.await?;
framework
.wait(
"test-vector-test-pod",
vec!["pods/test-pod"],
WaitFor::Condition("initialized"),
vec!["--timeout=60s"],
)
.await?;

let mut log_reader = framework.logs("test-vector", "daemonset/vector-agent")?;
smoke_check_first_line(&mut log_reader).await;

// Read the rest of the log lines.
let mut got_marker = false;
look_for_log_line(&mut log_reader, |val| {
if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" {
// A log from something other than our test pod, pretend we don't
// see it.
return FlowControlCommand::GoOn;
}

// Ensure we got the marker.
assert_eq!(val["message"], "CHECKED_MARKER");

if got_marker {
// We've already seen one marker! This is not good, we only emitted
// one.
panic!("Marker seen more than once");
}

// If we did, remember it.
got_marker = true;

// Request to stop the flow.
FlowControlCommand::Terminate
})
.await?;
assert!(got_marker);

framework
.restart_rollout("test-vector", "daemonset/vector-agent", vec![])
.await?;
// We need to wait for the new pod to start
framework
.wait_for_rollout(
"test-vector",
"daemonset/vector-agent",
vec!["--timeout=60s"],
)
.await?;
got_marker = false;
// We need to start reading from the newly started pod
let mut log_reader = framework.logs("test-vector", "daemonset/vector-agent")?;
look_for_log_line(&mut log_reader, |val| {
if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" {
return FlowControlCommand::GoOn;
}

if val["message"].eq("CHECKED_MARKER") {
panic!("Checkpointed marker should not be found");
};

assert_eq!(val["message"], "MARKER");

if got_marker {
// We've already seen one marker! This is not good, we only emitted
// one.
panic!("Marker seen more than once");
}

// If we did, remember it.
got_marker = true;

// Request to stop the flow.
FlowControlCommand::Terminate
})
.await?;

assert!(got_marker);

drop(test_pod);
drop(test_namespace);
drop(vector);
Ok(())
}
16 changes: 14 additions & 2 deletions lib/k8s-test-framework/src/framework.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! The test framework main entry point.
use super::{
exec_tail, kubernetes_version, log_lookup, namespace, pod, port_forward, test_pod, up_down,
vector, wait_for_resource, wait_for_rollout, Interface, PortForwarder, Reader, Result,
exec_tail, kubernetes_version, log_lookup, namespace, pod, port_forward, restart_rollout,
test_pod, up_down, vector, wait_for_resource, wait_for_rollout, Interface, PortForwarder,
Reader, Result,
};

/// Framework wraps the interface to the system with an easy-to-use rust API
Expand Down Expand Up @@ -162,6 +163,17 @@ impl Framework {
wait_for_rollout::run(&self.interface.kubectl_command, namespace, resource, extra).await
}

/// Trigger a restart for a rollout of a `resource`.
/// Use `extr
pub async fn restart_rollout<'a>(
&self,
namespace: &str,
resources: &str,
extra: impl IntoIterator<Item = &'a str>,
) -> Result<()> {
restart_rollout::run(&self.interface.kubectl_command, namespace, resources, extra).await
}

/// Gets the node for a given pod.
async fn get_node_for_pod(&self, namespace: &str, pod: &str) -> Result<String> {
pod::get_node(&self.interface.kubectl_command, namespace, pod).await
Expand Down
1 change: 1 addition & 0 deletions lib/k8s-test-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod pod;
mod port_forward;
mod reader;
mod resource_file;
pub mod restart_rollout;
mod temp_file;
pub mod test_pod;
mod up_down;
Expand Down
37 changes: 37 additions & 0 deletions lib/k8s-test-framework/src/restart_rollout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//! Restart a resource rollout.
use super::Result;
use crate::util::run_command;
use std::{ffi::OsStr, process::Stdio};
use tokio::process::Command;

/// Restart a rollout of a `resource` within a `namespace` to complete
/// via the specified `kubectl_command`.
/// Use the `extra` field to pass additional args to `kubectl`
pub async fn run<Cmd, NS, R, EX>(
kubectl_command: Cmd,
namespace: NS,
resource: R,
extra: impl IntoIterator<Item = EX>,
) -> Result<()>
where
Cmd: AsRef<OsStr>,
NS: AsRef<OsStr>,
R: AsRef<OsStr>,
EX: AsRef<OsStr>,
{
let mut command = Command::new(kubectl_command);

command
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit());

command.arg("rollout").arg("restart");
command.arg("-n").arg(namespace);
command.arg(resource);
command.args(extra);

run_command(command).await?;
Ok(())
}

0 comments on commit 683cd7c

Please sign in to comment.