From 683cd7ceeb8e597d7b0e52537e01eb669b151b85 Mon Sep 17 00:00:00 2001 From: Ian Henry Date: Mon, 14 Jun 2021 15:25:18 -0400 Subject: [PATCH] chore(kubernetes_logs source): Adds tests for checkpointing regressions in k8s (#6564) * Adds tests for checkpointing regressions in k8s Signed-off-by: Ian Henry * Identified problem with checkpoint test, unable to properly time out the test now Signed-off-by: Spencer Gilbert * Rewrote and fixed checkpoint test Signed-off-by: Spencer Gilbert Co-authored-by: Spencer Gilbert --- lib/k8s-e2e-tests/tests/vector-agent.rs | 122 ++++++++++++++++++ lib/k8s-test-framework/src/framework.rs | 16 ++- lib/k8s-test-framework/src/lib.rs | 1 + lib/k8s-test-framework/src/restart_rollout.rs | 37 ++++++ 4 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 lib/k8s-test-framework/src/restart_rollout.rs diff --git a/lib/k8s-e2e-tests/tests/vector-agent.rs b/lib/k8s-e2e-tests/tests/vector-agent.rs index 9a3e572dc49756..86582a8f3150a7 100644 --- a/lib/k8s-e2e-tests/tests/vector-agent.rs +++ b/lib/k8s-e2e-tests/tests/vector-agent.rs @@ -1825,3 +1825,125 @@ async fn host_metrics() -> Result<(), Box> { drop(vector); Ok(()) } + +#[tokio::test] +async fn simple_checkpoint() -> Result<(), Box> { + let _guard = lock(); + let framework = make_framework(); + + let vector = framework + .vector( + "test-vector", + HELM_CHART_VECTOR_AGENT, + VectorConfig { + custom_helm_values: HELM_VALUES_STDOUT_SINK, + ..Default::default() + }, + ) + .await?; + framework + .wait_for_rollout( + "test-vector", + "daemonset/vector-agent", + vec!["--timeout=60s"], + ) + .await?; + + let test_namespace = framework.namespace("test-vector-test-pod").await?; + + let test_pod = framework + .test_pod(test_pod::Config::from_pod(&make_test_pod( + "test-vector-test-pod", + "test-pod", + // This allows us to read and checkpoint the first log + // then ensure we just read the new marker after restarting Vector + "echo CHECKED_MARKER; sleep 60; echo MARKER", + vec![], + vec![], + ))?) + .await?; + framework + .wait( + "test-vector-test-pod", + vec!["pods/test-pod"], + WaitFor::Condition("initialized"), + vec!["--timeout=60s"], + ) + .await?; + + let mut log_reader = framework.logs("test-vector", "daemonset/vector-agent")?; + smoke_check_first_line(&mut log_reader).await; + + // Read the rest of the log lines. + let mut got_marker = false; + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + // A log from something other than our test pod, pretend we don't + // see it. + return FlowControlCommand::GoOn; + } + + // Ensure we got the marker. + assert_eq!(val["message"], "CHECKED_MARKER"); + + if got_marker { + // We've already seen one marker! This is not good, we only emitted + // one. + panic!("Marker seen more than once"); + } + + // If we did, remember it. + got_marker = true; + + // Request to stop the flow. + FlowControlCommand::Terminate + }) + .await?; + assert!(got_marker); + + framework + .restart_rollout("test-vector", "daemonset/vector-agent", vec![]) + .await?; + // We need to wait for the new pod to start + framework + .wait_for_rollout( + "test-vector", + "daemonset/vector-agent", + vec!["--timeout=60s"], + ) + .await?; + got_marker = false; + // We need to start reading from the newly started pod + let mut log_reader = framework.logs("test-vector", "daemonset/vector-agent")?; + look_for_log_line(&mut log_reader, |val| { + if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" { + return FlowControlCommand::GoOn; + } + + if val["message"].eq("CHECKED_MARKER") { + panic!("Checkpointed marker should not be found"); + }; + + assert_eq!(val["message"], "MARKER"); + + if got_marker { + // We've already seen one marker! This is not good, we only emitted + // one. + panic!("Marker seen more than once"); + } + + // If we did, remember it. + got_marker = true; + + // Request to stop the flow. + FlowControlCommand::Terminate + }) + .await?; + + assert!(got_marker); + + drop(test_pod); + drop(test_namespace); + drop(vector); + Ok(()) +} diff --git a/lib/k8s-test-framework/src/framework.rs b/lib/k8s-test-framework/src/framework.rs index 3ca05265b406a2..9c977f10a2031f 100644 --- a/lib/k8s-test-framework/src/framework.rs +++ b/lib/k8s-test-framework/src/framework.rs @@ -1,8 +1,9 @@ //! The test framework main entry point. use super::{ - exec_tail, kubernetes_version, log_lookup, namespace, pod, port_forward, test_pod, up_down, - vector, wait_for_resource, wait_for_rollout, Interface, PortForwarder, Reader, Result, + exec_tail, kubernetes_version, log_lookup, namespace, pod, port_forward, restart_rollout, + test_pod, up_down, vector, wait_for_resource, wait_for_rollout, Interface, PortForwarder, + Reader, Result, }; /// Framework wraps the interface to the system with an easy-to-use rust API @@ -162,6 +163,17 @@ impl Framework { wait_for_rollout::run(&self.interface.kubectl_command, namespace, resource, extra).await } + /// Trigger a restart for a rollout of a `resource`. + /// Use `extr + pub async fn restart_rollout<'a>( + &self, + namespace: &str, + resources: &str, + extra: impl IntoIterator, + ) -> Result<()> { + restart_rollout::run(&self.interface.kubectl_command, namespace, resources, extra).await + } + /// Gets the node for a given pod. async fn get_node_for_pod(&self, namespace: &str, pod: &str) -> Result { pod::get_node(&self.interface.kubectl_command, namespace, pod).await diff --git a/lib/k8s-test-framework/src/lib.rs b/lib/k8s-test-framework/src/lib.rs index 5f4daf5219fdca..ca073de0d15bd1 100644 --- a/lib/k8s-test-framework/src/lib.rs +++ b/lib/k8s-test-framework/src/lib.rs @@ -27,6 +27,7 @@ mod pod; mod port_forward; mod reader; mod resource_file; +pub mod restart_rollout; mod temp_file; pub mod test_pod; mod up_down; diff --git a/lib/k8s-test-framework/src/restart_rollout.rs b/lib/k8s-test-framework/src/restart_rollout.rs new file mode 100644 index 00000000000000..87e1b9f7b49277 --- /dev/null +++ b/lib/k8s-test-framework/src/restart_rollout.rs @@ -0,0 +1,37 @@ +//! Restart a resource rollout. + +use super::Result; +use crate::util::run_command; +use std::{ffi::OsStr, process::Stdio}; +use tokio::process::Command; + +/// Restart a rollout of a `resource` within a `namespace` to complete +/// via the specified `kubectl_command`. +/// Use the `extra` field to pass additional args to `kubectl` +pub async fn run( + kubectl_command: Cmd, + namespace: NS, + resource: R, + extra: impl IntoIterator, +) -> Result<()> +where + Cmd: AsRef, + NS: AsRef, + R: AsRef, + EX: AsRef, +{ + let mut command = Command::new(kubectl_command); + + command + .stdin(Stdio::null()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()); + + command.arg("rollout").arg("restart"); + command.arg("-n").arg(namespace); + command.arg(resource); + command.args(extra); + + run_command(command).await?; + Ok(()) +}