From 1a8ff1aaeca3e1c5d5d107acdd5921cf79bb46b7 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Fri, 7 Apr 2023 12:10:27 -0700 Subject: [PATCH] Revert "fix(reduce transform): Fix flush not occuring when events arrive in high rate (#16146)" This reverts commit ab459399a7ca58c088dfbd30dd6c08f5799c929e. --- src/transforms/reduce/mod.rs | 54 ++---------------------------------- 1 file changed, 3 insertions(+), 51 deletions(-) diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 53230c348a508..618f44fcbb1f2 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -1,6 +1,5 @@ use std::collections::BTreeMap; use std::{ - cmp::min, collections::{hash_map, HashMap}, num::NonZeroUsize, pin::Pin, @@ -230,7 +229,6 @@ struct ReduceState { events: usize, fields: HashMap>, stale_since: Instant, - last_flushed_at: Instant, metadata: EventMetadata, } @@ -242,7 +240,6 @@ impl ReduceState { Self { events: 0, stale_since: Instant::now(), - last_flushed_at: Instant::now(), fields, metadata, } @@ -346,10 +343,9 @@ impl Reduce { fn flush_into(&mut self, output: &mut Vec) { let mut flush_discriminants = Vec::new(); let now = Instant::now(); - for (k, t) in &mut self.reduce_merge_states { - if now - min(t.stale_since, t.last_flushed_at) >= self.expire_after { + for (k, t) in &self.reduce_merge_states { + if (now - t.stale_since) >= self.expire_after { flush_discriminants.push(k.clone()); - t.last_flushed_at = Instant::now(); } } for k in &flush_discriminants { @@ -476,8 +472,7 @@ impl TaskTransform for Reduce { #[cfg(test)] mod test { use serde_json::json; - use tokio::sync::mpsc::{self, Sender}; - use tokio::time::sleep; + use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use value::Kind; @@ -908,47 +903,4 @@ merge_strategies.bar = "concat" }) .await; } - - /// Tests the case where both starts_when and ends_when are not defined, - /// and aggregation continues on and on, without flushing as long as events - /// arrive in rate that is faster than the rate of expire_ms between events. - #[tokio::test] - async fn last_flush_at() { - let reduce_config = toml::from_str::( - r#" -group_by = [ "user_id" ] -expire_after_ms = 200 -flush_period_ms = 250 - "#, - ) - .unwrap(); - - assert_transform_compliance(async move { - let (tx, rx) = mpsc::channel(1); - let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; - - async fn send_event(tx: &Sender, user_id: i32) { - let mut log_event = LogEvent::from("test message"); - log_event.insert("user_id", user_id.to_string()); - tx.send(log_event.into()).await.unwrap(); - } - - // send in a rate that is double than the rate of of expire_ms between events - for _ in 0..5 { - send_event(&tx, 1).await; - sleep(Duration::from_millis(50)).await; - send_event(&tx, 2).await; - sleep(Duration::from_millis(50)).await; - } - - // verify messages arrive during this time - out.try_recv().expect("No message arrived"); - sleep(Duration::from_millis(10)).await; - out.try_recv().expect("No message arrived"); - - drop(tx); - topology.stop().await; - }) - .await; - } }