diff --git a/kunai/src/bin/main.rs b/kunai/src/bin/main.rs index b5cd333..db3925c 100644 --- a/kunai/src/bin/main.rs +++ b/kunai/src/bin/main.rs @@ -1492,7 +1492,7 @@ impl EventProducer { )), ) .unwrap(); - let event_reader = shared.clone(); + let event_producer = shared.clone(); let bar = barrier.clone(); let conf = config.clone(); @@ -1528,13 +1528,13 @@ impl EventProducer { ); { - let er = event_reader.lock().await; + let ep = event_producer.lock().await; for ty in Type::variants() { if ty.is_configurable() { error!( "stats {}: {}", ty, - er.stats.get(&ty, 0).unwrap_or_default() + ep.stats.get(&ty, 0).unwrap_or_default() ); } } @@ -1546,7 +1546,7 @@ impl EventProducer { // and is always <= buffers.len() for buf in buffers.iter().take(events.read) { let mut dec = EncodedEvent::from_bytes(buf); - let mut er = event_reader.lock().await; + let mut er = event_producer.lock().await; // we make sure here that only events for which we can grab info for // are pushed to the pipe. It is simplifying the error handling process @@ -1588,7 +1588,7 @@ impl EventProducer { // only one task needs to reduce if cpu_id == reducer_cpu_id { - let mut ep = event_reader.lock().await; + let mut ep = event_producer.lock().await; if ep.has_pending_events() { ep.process_piped_events().await; ep.batch += 1; @@ -1600,7 +1600,7 @@ impl EventProducer { bar.wait().await; // we break the loop if processor is stopped - if event_reader.lock().await.stop { + if event_producer.lock().await.stop { break; } }