Skip to content

Commit

Permalink
Rewrote and fixed checkpoint test
Browse files Browse the repository at this point in the history
Signed-off-by: Spencer Gilbert <spencer.gilbert@gmail.com>
  • Loading branch information
spencergilbert committed Jun 8, 2021
1 parent c83c1e2 commit 7c0dc25
Showing 1 changed file with 27 additions and 77 deletions.
104 changes: 27 additions & 77 deletions lib/k8s-e2e-tests/tests/vector-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,9 @@ async fn simple_checkpoint() -> Result<(), Box<dyn std::error::Error>> {
.test_pod(test_pod::Config::from_pod(&make_test_pod(
"test-vector-test-pod",
"test-pod",
"echo MARKER",
// This allows us to read and checkpoint the first log
// then ensure we just read the new marker after restarting Vector
"echo CHECKED_MARKER; sleep 60; echo MARKER",
vec![],
vec![],
))?)
Expand All @@ -1833,7 +1835,7 @@ async fn simple_checkpoint() -> Result<(), Box<dyn std::error::Error>> {
}

// Ensure we got the marker.
assert_eq!(val["message"], "MARKER");
assert_eq!(val["message"], "CHECKED_MARKER");

if got_marker {
// We've already seen one marker! This is not good, we only emitted
Expand All @@ -1854,94 +1856,42 @@ async fn simple_checkpoint() -> Result<(), Box<dyn std::error::Error>> {
.restart_rollout("test-vector", "daemonset/vector-agent", vec![])
.await?;
// We need to wait for the new pod to start
// framework
// .wait_for_rollout(
// "test-vector",
// "daemonset/vector-agent",
// vec!["--timeout=60s"],
// )
// .await?;
framework
.wait_for_rollout(
"test-vector",
"daemonset/vector-agent",
vec!["--timeout=60s"],
)
.await?;
got_marker = false;
// We need to start reading from the newly started pod
let mut log_reader = framework.logs("test-vector", "daemonset/vector-agent")?;
look_for_log_line(&mut log_reader, |val| {
if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" {
return FlowControlCommand::GoOn;
}

if val["message"].eq("MARKER") {
panic!("Checkpointed, marker should not be found");
if val["message"].eq("CHECKED_MARKER") {
panic!("Checkpointed marker should not be found");
};

assert_eq!(val["message"], "MARKER");

if got_marker {
// We've already seen one marker! This is not good, we only emitted
// one.
panic!("Marker seen more than once");
}

// If we did, remember it.
got_marker = true;

// Request to stop the flow.
FlowControlCommand::Terminate
})
.await?;
// We need to re-create the log_reader to connect to the new pod
// let mut log_reader = framework.logs("test-vector", "daemonset/vector-agent")?;
// let mut lines_till_we_give_up: usize = 10000;
// let (stop_tx, mut stop_rx) = futures::channel::mpsc::channel(0);
// loop {
// let line = tokio::select! {
// result = stop_rx.next() => {
// result.unwrap();
// log_reader.kill().await?;
// continue;
// }
// line = log_reader.read_line() => line,
// };
// let line = match line {
// Some(line) => line,
// None => break,
// };
// println!("Got line: {:?}", line);

// lines_till_we_give_up -= 1;
// if lines_till_we_give_up == 0 {
// println!("Giving up");
// log_reader.kill().await?;
// break;
// }

// if !line.starts_with('{') {
// This isn't a json, must be an entry from Vector's own log stream.
// continue;
// }

// let val = parse_json(&line)?;

//if val["kubernetes"]["pod_namespace"] != "test-vector-test-pod" {
// A log from something other than our test pod, pretend we don't
// see it.
// continue;
//}

//if val["message"].eq("MARKER") {
// panic!("Checkpointed, marker should not be found");
//};

// Request termination in a while.
// let mut stop_tx = stop_tx.clone();
// tokio::spawn(async move {
// Wait for two minutes - a reasonable time for vector internals to
// pick up new `Pod` and collect events from them in idle load.
// Here, we're assuming that if the `Pod` that was supposed to be
// ignored was in fact collected (meaning something's wrong with
// the exclusion logic), we'd see it's data within this time frame.
// It's not enough to just wait for `Pod` complete, we should still
// apply a reasonably big timeout before we stop waiting for the
// logs to appear to have high confidence that Vector has enough
// time to pick them up and spit them out.
// let duration = std::time::Duration::from_secs(120);
// println!("Starting stop timer, due in {} seconds", duration.as_secs());
// tokio::time::sleep(duration).await;
// println!("Stop timer complete");
// stop_tx.send(()).await.unwrap();
// });
// }

// Ensure log reader exited.
// log_reader.wait().await.expect("log reader wait failed");

assert!(!got_marker);
assert!(got_marker);

drop(test_pod);
drop(test_namespace);
Expand Down

0 comments on commit 7c0dc25

Please sign in to comment.