diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 559be99e1b5a2..7b0334fa0e905 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -368,6 +368,7 @@ nixpkgs nokia nslookup nsupdate +ntapi ntfs opendal opensearch diff --git a/Cargo.lock b/Cargo.lock index 741b8ac7c55d0..c19e929958c33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3816,12 +3816,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.2.6" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" -dependencies = [ - "libc", -] +checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" [[package]] name = "hex" @@ -4235,14 +4232,14 @@ dependencies = [ [[package]] name = "is-terminal" -version = "0.4.1" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927609f78c2913a6f6ac3c27a4fe87f43e2a35367c0c4b0f8265e8f49a104330" +checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ - "hermit-abi 0.2.6", + "hermit-abi 0.3.1", "io-lifetimes 1.0.3", - "rustix 0.36.4", - "windows-sys 0.42.0", + "rustix 0.37.5", + "windows-sys 0.48.0", ] [[package]] @@ -4655,12 +4652,6 @@ version = "0.0.46" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d" -[[package]] -name = "linux-raw-sys" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f" - [[package]] name = "linux-raw-sys" version = "0.3.0" @@ -5317,8 +5308,7 @@ dependencies = [ [[package]] name = "ntapi" version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +source = "git+https://github.com/MSxDOS/ntapi.git?rev=24fc1e47677fc9f6e38e5f154e6011dc9b270da6#24fc1e47677fc9f6e38e5f154e6011dc9b270da6" dependencies = [ "winapi", ] @@ -7007,20 +6997,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "rustix" -version = "0.36.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb93e85278e08bb5788653183213d3a60fc242b10cb9be96586f5a73dcb67c23" -dependencies = [ - "bitflags", - "errno 0.2.8", - "io-lifetimes 1.0.3", - "libc", - "linux-raw-sys 0.1.3", - "windows-sys 0.42.0", -] - [[package]] name = "rustix" version = "0.37.5" diff --git a/Cargo.toml b/Cargo.toml index a5d4de65f0164..34de723f65910 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -372,6 +372,9 @@ chrono = { git = "https://github.com/vectordotdev/chrono.git", tag = "v0.4.24-no # The upgrade for `tokio-util` >= 0.6.9 is blocked on https://github.com/vectordotdev/vector/issues/11257. tokio-util = { git = "https://github.com/vectordotdev/tokio", branch = "tokio-util-0.7.4-framed-read-continue-on-error" } nix = { git = "https://github.com/vectordotdev/nix.git", branch = "memfd/gnu/musl" } +# The `heim` crates depend on `ntapi` 0.3.7 on Windows, but that version has an +# unaligned access bug fixed in the following revision. 
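The pin below takes effect because it lives alongside the `chrono`/`tokio-util`/`nix` entries, which appear to sit in the manifest's `[patch.crates-io]` table: a patch entry swaps the crates.io source for a git revision across the entire dependency graph, which is how `heim`'s transitive `ntapi` dependency picks up the fix without forking `heim`. The Cargo.lock hunk above shows the effect: the version stays 0.3.7 and only the source changes. A minimal sketch of such an override, assuming a workspace that pulls `ntapi` in transitively:

    [patch.crates-io]
    # Every crate in the graph that resolves `ntapi` from crates.io now gets
    # this pinned revision instead.
    ntapi = { git = "https://github.com/MSxDOS/ntapi.git", rev = "24fc1e47677fc9f6e38e5f154e6011dc9b270da6" }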
+ntapi = { git = "https://github.com/MSxDOS/ntapi.git", rev = "24fc1e47677fc9f6e38e5f154e6011dc9b270da6" } [features] # Default features for *-unknown-linux-gnu and *-apple-darwin diff --git a/Tiltfile b/Tiltfile index 1cb7236c5f9d5..1766beef83e69 100644 --- a/Tiltfile +++ b/Tiltfile @@ -7,7 +7,7 @@ load('ext://helm_resource', 'helm_resource', 'helm_repo') docker_build( ref='timberio/vector', context='.', - build_args={'RUST_VERSION': '1.66.1'}, + build_args={'RUST_VERSION': '1.69.0'}, dockerfile='tilt/Dockerfile' ) diff --git a/lib/codecs/src/decoding/framing/octet_counting.rs b/lib/codecs/src/decoding/framing/octet_counting.rs index 98fdfbc78fdf0..0e9f2d15a4b4d 100644 --- a/lib/codecs/src/decoding/framing/octet_counting.rs +++ b/lib/codecs/src/decoding/framing/octet_counting.rs @@ -406,7 +406,7 @@ mod tests { let mut buffer = BytesMut::with_capacity(32); buffer.put(&b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit"[..]); - let _ = decoder.decode(&mut buffer); + _ = decoder.decode(&mut buffer); assert_eq!(decoder.octet_decoding, Some(State::DiscardingToEol)); buffer.put(&b"wemustcontinuetodiscard\n32 something valid"[..]); diff --git a/lib/file-source/benches/buffer.rs b/lib/file-source/benches/buffer.rs index 525c3ad89dc1a..f1a187961d269 100644 --- a/lib/file-source/benches/buffer.rs +++ b/lib/file-source/benches/buffer.rs @@ -57,7 +57,7 @@ fn read_until_bench(c: &mut Criterion) { let delimiter: [u8; 1] = [param.delim]; group.bench_with_input(BenchmarkId::new("read_until", param), ¶m, |b, _| { b.iter(|| { - let _ = read_until_with_max_size( + _ = read_until_with_max_size( &mut reader, &mut position, &delimiter, diff --git a/lib/file-source/src/file_watcher/tests/experiment.rs b/lib/file-source/src/file_watcher/tests/experiment.rs index bf435c513e1d9..decdbdab98240 100644 --- a/lib/file-source/src/file_watcher/tests/experiment.rs +++ b/lib/file-source/src/file_watcher/tests/experiment.rs @@ -48,7 +48,8 @@ fn experiment(actions: Vec) { for action in actions.iter() { match *action { FileWatcherAction::DeleteFile => { - let _ = fs::remove_file(&path); + _ = fs::remove_file(&path); + #[cfg(not(windows))] // Windows will only remove after the file is closed. assert!(!path.exists()); fwfiles[0].reset(); break; diff --git a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs index b1a184984f087..ee8a24a9f95bf 100644 --- a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs +++ b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs @@ -32,7 +32,8 @@ fn experiment_no_truncations(actions: Vec) { for action in actions.iter() { match *action { FileWatcherAction::DeleteFile => { - let _ = fs::remove_file(&path); + _ = fs::remove_file(&path); + #[cfg(not(windows))] // Windows will only remove after the file is closed. 
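The new `cfg` gate codifies a real platform difference: deleting an open file on Windows only marks it delete-pending, and the directory entry survives until the last handle (here, the watcher's) closes, so asserting `!path.exists()` right after `remove_file` is only sound off-Windows. A std-only sketch of the behavior, using a hypothetical file name:

    use std::{fs, path::Path};

    fn main() {
        let path = Path::new("watched.log"); // hypothetical path
        fs::write(path, b"line\n").unwrap();
        let handle = fs::File::open(path).unwrap(); // a watcher-style open handle
        fs::remove_file(path).unwrap();
        // On unix the name vanishes at once; on Windows the file is merely
        // marked delete-pending until `handle` closes.
        #[cfg(not(windows))]
        assert!(!path.exists());
        drop(handle);
    }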
assert!(!path.exists()); fwfiles[0].reset(); break; diff --git a/lib/k8s-e2e-tests/src/lib.rs b/lib/k8s-e2e-tests/src/lib.rs index 22bcaaaee12e7..6d5788cfa767c 100644 --- a/lib/k8s-e2e-tests/src/lib.rs +++ b/lib/k8s-e2e-tests/src/lib.rs @@ -17,7 +17,7 @@ pub mod metrics; pub const BUSYBOX_IMAGE: &str = "busybox:1.28"; pub fn init() { - let _ = env_logger::builder().is_test(true).try_init(); + _ = env_logger::builder().is_test(true).try_init(); } pub fn get_namespace() -> String { diff --git a/lib/k8s-test-framework/src/temp_file.rs b/lib/k8s-test-framework/src/temp_file.rs index 6eb122de3ca1e..d1535b6b5a658 100644 --- a/lib/k8s-test-framework/src/temp_file.rs +++ b/lib/k8s-test-framework/src/temp_file.rs @@ -23,7 +23,7 @@ impl TempFile { impl Drop for TempFile { fn drop(&mut self) { if let Some(dir) = self.path.parent() { - let _ = std::fs::remove_dir_all(dir); + _ = std::fs::remove_dir_all(dir); } } } diff --git a/lib/tracing-limit/benches/limit.rs b/lib/tracing-limit/benches/limit.rs index ebf861b709a06..f53677b206fc7 100644 --- a/lib/tracing-limit/benches/limit.rs +++ b/lib/tracing-limit/benches/limit.rs @@ -97,7 +97,7 @@ struct Visitor<'a>(MutexGuard<'a, String>); impl<'a> field::Visit for Visitor<'a> { fn record_debug(&mut self, _field: &field::Field, value: &dyn fmt::Debug) { use std::fmt::Write; - let _ = write!(&mut *self.0, "{:?}", value); + _ = write!(&mut *self.0, "{:?}", value); } } @@ -110,7 +110,7 @@ where } fn enabled(&self, metadata: &Metadata<'_>, _ctx: Context<'_, S>) -> bool { - let _ = metadata; + _ = metadata; true } @@ -130,23 +130,23 @@ where } fn on_follows_from(&self, id: &span::Id, follows: &span::Id, _ctx: Context<'_, S>) { - let _ = (id, follows); + _ = (id, follows); } fn on_enter(&self, id: &span::Id, _ctx: Context<'_, S>) { - let _ = id; + _ = id; } fn on_exit(&self, id: &span::Id, _ctx: Context<'_, S>) { - let _ = id; + _ = id; } fn on_close(&self, id: span::Id, _ctx: Context<'_, S>) { - let _ = id; + _ = id; } fn on_id_change(&self, old: &span::Id, new: &span::Id, _ctx: Context<'_, S>) { - let _ = (old, new); + _ = (old, new); } } diff --git a/lib/vector-api-client/src/subscription.rs b/lib/vector-api-client/src/subscription.rs index f05dc6645cc7c..86929cda81e9a 100644 --- a/lib/vector-api-client/src/subscription.rs +++ b/lib/vector-api-client/src/subscription.rs @@ -108,7 +108,7 @@ impl SubscriptionClient { _ = &mut shutdown_rx => { let subscriptions = subscriptions_clone.lock().unwrap(); for id in subscriptions.keys() { - let _ = tx_clone.send(Payload::stop(*id)); + _ = tx_clone.send(Payload::stop(*id)); } break }, @@ -120,7 +120,7 @@ impl SubscriptionClient { let subscriptions = subscriptions_clone.lock().unwrap(); let s: Option<&Sender> = subscriptions.get::(&p.id); if let Some(s) = s { - let _ = s.send(p); + _ = s.send(p); } } None => { @@ -159,8 +159,8 @@ impl SubscriptionClient { self.subscriptions.lock().unwrap().insert(id, tx); // Initialize the connection with the relevant control messages. 
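The `- let _ =` / `+ _ =` pairs that follow here, and recur throughout this patch, ride on destructuring assignment, stable since Rust 1.59 and unproblematic on the 1.69 toolchain this diff adopts: `_` on the left of a plain assignment discards a `#[must_use]` value without introducing a binding. A minimal sketch:

    fn main() {
        let (tx, rx) = std::sync::mpsc::channel::<u32>();
        drop(rx);
        // Both statements silence the #[must_use] warning on the Result
        // returned by `send`; the second form needs no `let`.
        let _ = tx.send(1);
        _ = tx.send(2);
    }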
- let _ = self.tx.send(Payload::init(id)); - let _ = self.tx.send(Payload::start::(id, request_body)); + _ = self.tx.send(Payload::init(id)); + _ = self.tx.send(Payload::start::(id, request_body)); Box::pin( BroadcastStream::new(rx) @@ -185,7 +185,7 @@ pub async fn connect_subscription_client( // Forwarded received messages back upstream to the GraphQL server tokio::spawn(async move { while let Some(p) = send_rx.recv().await { - let _ = ws_tx + _ = ws_tx .send(Message::Text(serde_json::to_string(&p).unwrap())) .await; } @@ -195,7 +195,7 @@ pub async fn connect_subscription_client( tokio::spawn(async move { while let Some(Ok(Message::Text(m))) = ws_rx.next().await { if let Ok(p) = serde_json::from_str::(&m) { - let _ = recv_tx.send(p); + _ = recv_tx.send(p); } } }); diff --git a/lib/vector-buffers/benches/common.rs b/lib/vector-buffers/benches/common.rs index c258407a488b3..1a07505215583 100644 --- a/lib/vector-buffers/benches/common.rs +++ b/lib/vector-buffers/benches/common.rs @@ -102,7 +102,7 @@ impl FixedEncodable for Message { let id = buffer.get_u64(); for _ in 0..N { // this covers self._padding - let _ = buffer.get_u64(); + _ = buffer.get_u64(); } Ok(Message::new(id)) } @@ -177,6 +177,6 @@ pub async fn war_measurement( ) { for msg in messages.into_iter() { sender.send(msg).await.unwrap(); - let _ = receiver.next().await.unwrap(); + _ = receiver.next().await.unwrap(); } } diff --git a/lib/vector-buffers/src/topology/channel/tests.rs b/lib/vector-buffers/src/topology/channel/tests.rs index d3b48bacff769..f68604a406894 100644 --- a/lib/vector-buffers/src/topology/channel/tests.rs +++ b/lib/vector-buffers/src/topology/channel/tests.rs @@ -44,7 +44,7 @@ where // Synchronize with sender and then wait for a small period of time to simulate a // blocking delay. - let _ = recv_baton.wait().await; + _ = recv_baton.wait().await; sleep(recv_delay).await; // Grab all messages and then return the results. @@ -58,7 +58,7 @@ where // task correctly exits. If we didn't drop it, the receiver task would just assume that we // had no more messages to send, waiting for-ev-er for the next one. let start = Instant::now(); - let _ = send_baton.wait().await; + _ = send_baton.wait().await; assert!(sender.send(send_value.into()).await.is_ok()); let send_delay = start.elapsed(); assert!(send_delay > recv_delay); diff --git a/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs b/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs index 0aaea625980e1..baf5323872ba6 100644 --- a/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs +++ b/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs @@ -71,7 +71,7 @@ where for<'a> T::Archived: CheckBytes>, { // Validate that the input is, well, valid. - let _ = check_archived_root::(backing.as_ref())?; + _ = check_archived_root::(backing.as_ref())?; // Now that we know the buffer fits T, we're good to go! Ok(Self { @@ -110,7 +110,7 @@ where { // Serialize our value so we can shove it into the backing. 
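`serialize_value` writes the archived bytes into the serializer and returns the root object's position inside that buffer; since `BackedArchive` recovers the root later via `check_archived_root`, the position is dead weight and is discarded. A sketch of the same round-trip in isolation, assuming rkyv 0.7 with its derive macros (hypothetical `Record` type):

    use rkyv::{
        ser::{serializers::AllocSerializer, Serializer},
        Archive, Serialize,
    };

    #[derive(Archive, Serialize)]
    struct Record {
        id: u64,
    }

    fn main() {
        let mut serializer = AllocSerializer::<256>::default();
        // serialize_value returns Ok(root position); only the backing bytes
        // matter here, so the position is discarded.
        _ = serializer.serialize_value(&Record { id: 1 }).unwrap();
        let bytes = serializer.into_serializer().into_inner();
        assert!(!bytes.is_empty());
    }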
let mut serializer = DefaultSerializer::default(); - let _ = serializer + _ = serializer .serialize_value(&value) .map_err(|e| SerializeError::FailedToSerialize(e.to_string()))?; diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs index 5dc43198c9889..d723af0fa6ca9 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs @@ -349,7 +349,7 @@ impl ReaderModel { // We have enough unconsumed event acknowledgements to fully acknowledge this // record. Remove it, consume the event acknowledgements, add a record // acknowledgement, and update the buffer size. - let _ = self.pending_record_acks.pop_front().unwrap(); + _ = self.pending_record_acks.pop_front().unwrap(); self.unconsumed_event_acks -= 1; self.unconsumed_record_acks += 1; @@ -369,7 +369,7 @@ impl ReaderModel { if self.unconsumed_record_acks >= required_record_acks { // We have enough unconsumed record acknowledgements to fully acknowledge this data // file. Remove it, consume the record acknowledgements, and delete the data file. - let _ = self.pending_data_file_acks.pop_front().unwrap(); + _ = self.pending_data_file_acks.pop_front().unwrap(); self.unconsumed_record_acks -= required_record_acks; assert!( diff --git a/lib/vector-common/src/event_test_util.rs b/lib/vector-common/src/event_test_util.rs index f98b90b1fe9a7..cbcfadf663ee9 100644 --- a/lib/vector-common/src/event_test_util.rs +++ b/lib/vector-common/src/event_test_util.rs @@ -21,7 +21,7 @@ pub fn contains_name_once(pattern: &str) -> Result<(), String> { names.push_str(", "); } n_events += 1; - let _ = write!(names, "`{event}`"); + _ = write!(names, "`{event}`"); } } if n_events == 0 { diff --git a/lib/vector-common/src/finalization.rs b/lib/vector-common/src/finalization.rs index 4510e74c60e23..396760b5200cb 100644 --- a/lib/vector-common/src/finalization.rs +++ b/lib/vector-common/src/finalization.rs @@ -266,7 +266,7 @@ impl OwnedBatchNotifier { let status = self.status.load(); // Ignore the error case, as it will happen during normal // source shutdown and we can't detect that here. - let _ = notifier.send(status); + _ = notifier.send(status); } } } diff --git a/lib/vector-core/src/event/metric/tags.rs b/lib/vector-core/src/event/metric/tags.rs index e70dc88f4204e..9a2070b0f4d30 100644 --- a/lib/vector-core/src/event/metric/tags.rs +++ b/lib/vector-core/src/event/metric/tags.rs @@ -269,7 +269,7 @@ impl<'a> IntoIterator for &'a TagValueSet { // The impl for `Hash` here follows the guarantees for the derived `PartialEq`, The resulting hash // will always be the same if the contents compare equal, so we can ignore the clippy lint. 
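Only the allow attribute changes in the next hunk: Clippy renamed `derive_hash_xor_eq` to `derived_hash_with_manual_eq` in the run-up to the 1.69 toolchain this patch adopts, and the lint polices exactly the invariant the comment states, that `k1 == k2` must imply `hash(k1) == hash(k2)`. A sketch of the vouched-for shape, with a hypothetical type pairing a manual `Hash` with a derived `PartialEq`:

    use std::hash::{Hash, Hasher};

    #[derive(PartialEq)]
    enum TagValue {
        Single(u64),
        Set(Vec<u64>),
    }

    #[allow(clippy::derived_hash_with_manual_eq)]
    impl Hash for TagValue {
        fn hash<H: Hasher>(&self, h: &mut H) {
            // Mirror the derived PartialEq field for field, so values that
            // compare equal always hash equally.
            match self {
                TagValue::Single(v) => v.hash(h),
                TagValue::Set(vs) => vs.hash(h),
            }
        }
    }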
-#[allow(clippy::derive_hash_xor_eq)] +#[allow(clippy::derived_hash_with_manual_eq)] impl Hash for TagValueSet { fn hash(&self, hasher: &mut H) { match self { diff --git a/lib/vector-core/src/metrics/recency.rs b/lib/vector-core/src/metrics/recency.rs index 9d3747f10b8b8..e7c33cfe8be5a 100644 --- a/lib/vector-core/src/metrics/recency.rs +++ b/lib/vector-core/src/metrics/recency.rs @@ -110,7 +110,7 @@ impl Generational { F: Fn(&T) -> V, { let result = f(&self.inner); - let _ = self.gen.fetch_add(1, Ordering::AcqRel); + _ = self.gen.fetch_add(1, Ordering::AcqRel); result } } diff --git a/lib/vector-core/src/metrics/tests/mod.rs b/lib/vector-core/src/metrics/tests/mod.rs index 1e60807aa196b..b5c55a506092b 100644 --- a/lib/vector-core/src/metrics/tests/mod.rs +++ b/lib/vector-core/src/metrics/tests/mod.rs @@ -6,7 +6,7 @@ use crate::event::Event; #[ignore] #[test] fn test_labels_injection() { - let _ = super::init(); + _ = super::init(); let span = span!( Level::ERROR, @@ -46,7 +46,7 @@ fn test_labels_injection() { #[test] fn test_cardinality_metric() { - let _ = super::init(); + _ = super::init(); let capture_value = || { let metric = super::Controller::get() diff --git a/rust-toolchain.toml b/rust-toolchain.toml index c727eb35b1d1d..f238a797e8764 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.66.1" +channel = "1.69.0" profile = "default" diff --git a/src/api/schema/components/mod.rs b/src/api/schema/components/mod.rs index d6ba5ade90874..bd8cdb2f682fe 100644 --- a/src/api/schema/components/mod.rs +++ b/src/api/schema/components/mod.rs @@ -327,7 +327,7 @@ pub fn update_config(config: &Config) { existing_component_keys .difference(&new_component_keys) .for_each(|component_key| { - let _ = COMPONENT_CHANGED.send(ComponentChanged::Removed( + _ = COMPONENT_CHANGED.send(ComponentChanged::Removed( state::component_by_component_key(component_key) .expect("Couldn't get component by key"), )); @@ -337,7 +337,7 @@ pub fn update_config(config: &Config) { new_component_keys .difference(&existing_component_keys) .for_each(|component_key| { - let _ = COMPONENT_CHANGED.send(ComponentChanged::Added( + _ = COMPONENT_CHANGED.send(ComponentChanged::Added( new_components.get(component_key).unwrap().clone(), )); }); diff --git a/src/api/tap.rs b/src/api/tap.rs index bc189154db32e..38603f8aace4e 100644 --- a/src/api/tap.rs +++ b/src/api/tap.rs @@ -179,7 +179,7 @@ fn shutdown_trigger(control_tx: ControlChannel, sink_id: ComponentKey) -> Shutdo let (shutdown_tx, shutdown_rx) = oneshot::channel(); tokio::spawn(async move { - let _ = shutdown_rx.await; + _ = shutdown_rx.await; if control_tx .send(fanout::ControlMessage::Remove(sink_id.clone())) .is_err() diff --git a/src/api/tests.rs b/src/api/tests.rs index b951e986af622..9abe2c6277e7c 100644 --- a/src/api/tests.rs +++ b/src/api/tests.rs @@ -84,8 +84,8 @@ async fn sink_events() { MetricValue::Counter { value: 1.0 }, )); - let _ = fanout.send(metric_event).await.unwrap(); - let _ = fanout.send(log_event).await.unwrap(); + _ = fanout.send(metric_event).await.unwrap(); + _ = fanout.send(log_event).await.unwrap(); // 3rd payload should be the metric event assert!(matches!( diff --git a/src/app.rs b/src/app.rs index 17e0ed6c4bc28..c66adb2463375 100644 --- a/src/app.rs +++ b/src/app.rs @@ -129,7 +129,7 @@ impl ApplicationConfig { } Err(e) => { error!("An error occurred that Vector couldn't handle: {}.", e); - let _ = self.graceful_crash_sender.send(()); + _ = self.graceful_crash_sender.send(()); None } } @@ -154,7 +154,7 
@@ impl Application { pub fn prepare() -> Result<(Runtime, Self), ExitCode> { let opts = Opts::get_matches().map_err(|error| { // Printing to stdout/err can itself fail; ignore it. - let _ = error.print(); + _ = error.print(); exitcode::USAGE })?; diff --git a/src/async_read.rs b/src/async_read.rs index 75d9a238f3e2b..00a3c53024684 100644 --- a/src/async_read.rs +++ b/src/async_read.rs @@ -91,7 +91,7 @@ mod tests { // Test one of the AsyncBufRead extension functions let mut line_one = String::new(); - let _ = reader.read_line(&mut line_one).await; + _ = reader.read_line(&mut line_one).await; assert_eq!("First line\n", line_one); @@ -99,7 +99,7 @@ mod tests { writer.flush().await.unwrap(); let mut line_two = String::new(); - let _ = reader.read_line(&mut line_two).await; + _ = reader.read_line(&mut line_two).await; assert_eq!("Second line\n", line_two); @@ -124,7 +124,7 @@ mod tests { // Test one of the AsyncBufRead extension functions let mut line_one = String::new(); - let _ = reader.read_line(&mut line_one).await; + _ = reader.read_line(&mut line_one).await; assert_eq!("First line\n", line_one); @@ -134,7 +134,7 @@ mod tests { writer.flush().await.unwrap(); let mut line_two = String::new(); - let _ = reader.read_line(&mut line_two).await; + _ = reader.read_line(&mut line_two).await; assert_eq!("", line_two); diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index a2ee1b349afe5..8fa066b6f3f4a 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -164,7 +164,7 @@ fn spawn_input_http_server( // Mark ourselves as completed now that we've sent all inputs to the source, and // additionally signal the HTTP server to also gracefully shutdown. - let _ = http_server_shutdown_tx.send(()); + _ = http_server_shutdown_tx.send(()); resource_completed.mark_as_done(); debug!("HTTP server external input resource completed."); @@ -278,7 +278,7 @@ fn spawn_output_http_server( debug!("HTTP server external output resource started."); resource_shutdown_rx.wait().await; - let _ = http_server_shutdown_tx.send(()); + _ = http_server_shutdown_tx.send(()); resource_completed.mark_as_done(); debug!("HTTP server external output resource completed."); diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 3bf9cab4c2f62..2c2a066b17806 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -446,7 +446,7 @@ fn spawn_component_topology( config.healthchecks.set_require_healthy(Some(true)); let config_diff = ConfigDiff::initial(&config); - let _ = std::thread::spawn(move || { + _ = std::thread::spawn(move || { let test_runtime = Builder::new_current_thread() .enable_all() .build() diff --git a/src/components/validation/sync.rs b/src/components/validation/sync.rs index bff640db4b130..0e842b66eb87f 100644 --- a/src/components/validation/sync.rs +++ b/src/components/validation/sync.rs @@ -132,7 +132,7 @@ impl WaitTrigger { // We don't care if our trigger is actually received, because the receiver side may // intentionally not be used i.e. if the code is generic in a way where only some codepaths // wait to be triggered and others don't, but the trigger must always be called regardless. 
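The comment above is the whole story for the discarded `Result` below: a `tokio::sync::oneshot::Sender` consumed by `send` returns `Err` exactly when the receiver has been dropped, and some callers drop the receiver on purpose. A minimal sketch of that expected-failure path:

    use tokio::sync::oneshot;

    fn fire_trigger() {
        let (tx, rx) = oneshot::channel::<()>();
        drop(rx); // this codepath never waits on the trigger
        // Err(()) here only means nobody was listening, which is fine.
        _ = tx.send(());
    }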
- let _ = self.tx.send(()); + _ = self.tx.send(()); } } diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 9047795b3f9a5..1616a8670969d 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -102,7 +102,7 @@ pub fn spawn_thread<'a>( #[cfg(unix)] fn raise_sighup() { use nix::sys::signal; - let _ = signal::raise(signal::Signal::SIGHUP).map_err(|error| { + _ = signal::raise(signal::Signal::SIGHUP).map_err(|error| { error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) }); } diff --git a/src/docker.rs b/src/docker.rs index 4befd12ea301d..805a24436aad3 100644 --- a/src/docker.rs +++ b/src/docker.rs @@ -149,7 +149,7 @@ async fn pull_image(docker: &Docker, image: &str, tag: &str) { async fn remove_container(docker: &Docker, id: &str) { trace!("Stopping container."); - let _ = docker + _ = docker .stop_container(id, None) .await .map_err(|e| error!(%e)); @@ -157,7 +157,7 @@ async fn remove_container(docker: &Docker, id: &str) { trace!("Removing container."); // Don't panic, as this is unrelated to the test - let _ = docker + _ = docker .remove_container(id, None) .await .map_err(|e| error!(%e)); diff --git a/src/internal_events/template.rs b/src/internal_events/template.rs index 7ee283c3f40d4..b1265d5b83924 100644 --- a/src/internal_events/template.rs +++ b/src/internal_events/template.rs @@ -15,7 +15,7 @@ impl<'a> InternalEvent for TemplateRenderingError<'a> { let mut msg = "Failed to render template".to_owned(); if let Some(field) = self.field { use std::fmt::Write; - let _ = write!(msg, " for \"{}\"", field); + _ = write!(msg, " for \"{}\"", field); } msg.push('.'); diff --git a/src/internal_telemetry/allocations/allocator/token.rs b/src/internal_telemetry/allocations/allocator/token.rs index ca1dec48de235..8409320e0132f 100644 --- a/src/internal_telemetry/allocations/allocator/token.rs +++ b/src/internal_telemetry/allocations/allocator/token.rs @@ -80,11 +80,11 @@ pub struct AllocationGroupToken { impl AllocationGroupToken { pub fn enter(&self) { - let _ = LOCAL_ALLOCATION_GROUP_STACK.try_with(|stack| stack.borrow_mut().push(self.id)); + _ = LOCAL_ALLOCATION_GROUP_STACK.try_with(|stack| stack.borrow_mut().push(self.id)); } pub fn exit(&self) { - let _ = LOCAL_ALLOCATION_GROUP_STACK.try_with(|stack| stack.borrow_mut().pop()); + _ = LOCAL_ALLOCATION_GROUP_STACK.try_with(|stack| stack.borrow_mut().pop()); } } diff --git a/src/internal_telemetry/allocations/mod.rs b/src/internal_telemetry/allocations/mod.rs index 6436bee6ca4ab..6cbd433b00b62 100644 --- a/src/internal_telemetry/allocations/mod.rs +++ b/src/internal_telemetry/allocations/mod.rs @@ -93,7 +93,7 @@ impl Tracer for MainTracer { #[inline(always)] fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) { // Handle the case when thread local destructor is ran. - let _ = GROUP_MEM_STATS.try_with(|t| { + _ = GROUP_MEM_STATS.try_with(|t| { t.stats.allocations[group_id.as_raw() as usize] .fetch_add(object_size as u64, Ordering::Relaxed) }); @@ -102,7 +102,7 @@ impl Tracer for MainTracer { #[inline(always)] fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) { // Handle the case when thread local destructor is ran. 
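The case being handled is teardown ordering: these allocator hooks can run from inside another thread-local's destructor, after the stats slot has already been destroyed, where `LocalKey::with` would panic; `try_with` surfaces that as an `AccessError` the tracer can safely drop. A std-only sketch with a hypothetical counter:

    use std::cell::Cell;

    thread_local! {
        static GROUP_BYTES: Cell<u64> = Cell::new(0);
    }

    fn trace_allocation(object_size: u64) {
        // `with` would panic once GROUP_BYTES is gone during thread exit;
        // `try_with` returns Err(AccessError) there instead.
        _ = GROUP_BYTES.try_with(|b| b.set(b.get() + object_size));
    }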
- let _ = GROUP_MEM_STATS.try_with(|t| { + _ = GROUP_MEM_STATS.try_with(|t| { t.stats.deallocations[source_group_id.as_raw() as usize] .fetch_add(object_size as u64, Ordering::Relaxed) }); diff --git a/src/main.rs b/src/main.rs index 65aac7c9e080f..1859eff381c06 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ fn main() { let opts = vector::cli::Opts::get_matches() .map_err(|error| { // Printing to stdout/err can itself fail; ignore it. - let _ = error.print(); + _ = error.print(); exitcode::USAGE }) .unwrap_or_else(|code| { diff --git a/src/signal.rs b/src/signal.rs index c7bbfe5bbcd1d..cb8666325f361 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -130,7 +130,7 @@ impl SignalHandler { pub fn clear(&mut self) { for shutdown_tx in self.shutdown_txs.drain(..) { // An error just means the channel was already shut down; safe to ignore. - let _ = shutdown_tx.send(()); + _ = shutdown_tx.send(()); } } } diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 31747b2b3caee..a8b85e8428504 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -474,7 +474,7 @@ async fn create_client_test() -> CloudwatchLogsClient { async fn ensure_group() { let client = create_client_test().await; - let _ = client + _ = client .create_log_group() .log_group_name(GROUP_NAME) .send() diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index 7f627456e040c..92f90377931b1 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -89,8 +89,8 @@ impl StreamSink for BlackholeSink { let message_len = events.estimated_json_encoded_size_of(); - let _ = self.total_events.fetch_add(events.len(), Ordering::AcqRel); - let _ = self + _ = self.total_events.fetch_add(events.len(), Ordering::AcqRel); + _ = self .total_raw_bytes .fetch_add(message_len, Ordering::AcqRel); @@ -99,7 +99,7 @@ impl StreamSink for BlackholeSink { } // Notify the reporting task to shutdown. 
- let _ = shutdown.send(()); + _ = shutdown.send(()); Ok(()) } diff --git a/src/sinks/databend/service.rs b/src/sinks/databend/service.rs index 16a4ab76d381a..23da26f560d41 100644 --- a/src/sinks/databend/service.rs +++ b/src/sinks/databend/service.rs @@ -187,7 +187,7 @@ impl DatabendService { Some(self.file_format_options.clone()), Some(self.copy_options.clone()), ); - let _ = self.client.query(req).await?; + _ = self.client.query(req).await?; Ok(()) } } diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index ce9601e470a06..a2bd8330c5f35 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -706,9 +706,9 @@ mod tests { fn get_compressed_empty_series_payload() -> Bytes { let mut compressor = get_compressor(); - let _ = write_payload_header(DatadogMetricsEndpoint::Series, &mut compressor) + _ = write_payload_header(DatadogMetricsEndpoint::Series, &mut compressor) .expect("should not fail"); - let _ = write_payload_footer(DatadogMetricsEndpoint::Series, &mut compressor) + _ = write_payload_footer(DatadogMetricsEndpoint::Series, &mut compressor) .expect("should not fail"); compressor.finish().expect("should not fail").freeze() @@ -1001,7 +1001,7 @@ mod tests { compressed_limit, ); if let Ok(mut encoder) = result { - let _ = encoder.try_encode(metric); + _ = encoder.try_encode(metric); if let Ok((payload, _processed, _raw_bytes)) = encoder.finish() { prop_assert!(payload.len() <= compressed_limit); diff --git a/src/sinks/datadog/traces/apm_stats/flusher.rs b/src/sinks/datadog/traces/apm_stats/flusher.rs index 0dcdbd885e34e..d8a670473fc18 100644 --- a/src/sinks/datadog/traces/apm_stats/flusher.rs +++ b/src/sinks/datadog/traces/apm_stats/flusher.rs @@ -65,7 +65,7 @@ pub async fn flush_apm_stats_thread( sender.flush_apm_stats(true).await; // signal the sink (who tripped the tripwire), that we are done flushing - let _ = sink_shutdown_ack_sender.send(()); + _ = sink_shutdown_ack_sender.send(()); break; } Err(_) => { diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index 878011e430b55..910e108dfebe0 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -129,7 +129,7 @@ where let (sender, receiver) = channel(); // Signal the stats thread task to flush remaining payloads and shutdown. - let _ = self.shutdown.send(sender); + _ = self.shutdown.send(sender); // The stats flushing thread has until the component shutdown grace period to end // gracefully. 
Otherwise the sink + stats flushing thread will be killed and an error diff --git a/src/sinks/datadog_archives.rs b/src/sinks/datadog_archives.rs index a855179595630..c8fc0b1bce155 100644 --- a/src/sinks/datadog_archives.rs +++ b/src/sinks/datadog_archives.rs @@ -930,7 +930,7 @@ mod tests { let mut writer = Cursor::new(Vec::new()); let encoding = DatadogArchivesEncoding::new(Default::default()); - let _ = encoding.encode_input(vec![event], &mut writer); + _ = encoding.encode_input(vec![event], &mut writer); let encoded = writer.into_inner(); let json: BTreeMap = @@ -1016,7 +1016,7 @@ mod tests { let log1 = Event::Log(LogEvent::from("test event 1")); let mut writer = Cursor::new(Vec::new()); let encoding = DatadogArchivesEncoding::new(Default::default()); - let _ = encoding.encode_input(vec![log1], &mut writer); + _ = encoding.encode_input(vec![log1], &mut writer); let encoded = writer.into_inner(); let json: BTreeMap = serde_json::from_slice(encoded.as_slice()).unwrap(); @@ -1030,7 +1030,7 @@ mod tests { // check that id is different for the next event let log2 = Event::Log(LogEvent::from("test event 2")); let mut writer = Cursor::new(Vec::new()); - let _ = encoding.encode_input(vec![log2], &mut writer); + _ = encoding.encode_input(vec![log2], &mut writer); let encoded = writer.into_inner(); let json: BTreeMap = serde_json::from_slice(encoded.as_slice()).unwrap(); @@ -1048,7 +1048,7 @@ mod tests { let log = Event::Log(LogEvent::from("test message")); let mut writer = Cursor::new(Vec::new()); let encoding = DatadogArchivesEncoding::new(Default::default()); - let _ = encoding.encode_input(vec![log], &mut writer); + _ = encoding.encode_input(vec![log], &mut writer); let encoded = writer.into_inner(); let json: BTreeMap = serde_json::from_slice(encoded.as_slice()).unwrap(); diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 8e0bde24255c0..0558e8c44684c 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -134,7 +134,7 @@ mod tests { fn suppress_type_with_id() { let mut writer = Vec::new(); - let _ = write_bulk_action( + _ = write_bulk_action( &mut writer, "ACTION", "INDEX", @@ -162,7 +162,7 @@ mod tests { fn suppress_type_without_id() { let mut writer = Vec::new(); - let _ = write_bulk_action(&mut writer, "ACTION", "INDEX", "TYPE", true, &None); + _ = write_bulk_action(&mut writer, "ACTION", "INDEX", "TYPE", true, &None); let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); let value = value.as_object().unwrap(); @@ -182,7 +182,7 @@ mod tests { fn type_with_id() { let mut writer = Vec::new(); - let _ = write_bulk_action( + _ = write_bulk_action( &mut writer, "ACTION", "INDEX", @@ -211,7 +211,7 @@ mod tests { fn type_without_id() { let mut writer = Vec::new(); - let _ = write_bulk_action(&mut writer, "ACTION", "INDEX", "TYPE", false, &None); + _ = write_bulk_action(&mut writer, "ACTION", "INDEX", "TYPE", false, &None); let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); let value = value.as_object().unwrap(); diff --git a/src/sinks/elasticsearch/integration_tests.rs b/src/sinks/elasticsearch/integration_tests.rs index ad11b86215e87..2adc6eec701a4 100644 --- a/src/sinks/elasticsearch/integration_tests.rs +++ b/src/sinks/elasticsearch/integration_tests.rs @@ -224,7 +224,7 @@ async fn auto_version_http() { batch: batch_settings(), ..Default::default() }; - let _ = ElasticsearchCommon::parse_single(&config) + _ = ElasticsearchCommon::parse_single(&config) .await .expect("Config 
error"); } @@ -249,7 +249,7 @@ async fn auto_version_https() { batch: batch_settings(), ..Default::default() }; - let _ = ElasticsearchCommon::parse_single(&config) + _ = ElasticsearchCommon::parse_single(&config) .await .expect("Config error"); } @@ -270,7 +270,7 @@ async fn auto_version_aws() { ..Default::default() }; - let _ = ElasticsearchCommon::parse_single(&config) + _ = ElasticsearchCommon::parse_single(&config) .await .expect("Config error"); } @@ -669,7 +669,7 @@ async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) // make sure writes are all visible for common in commons { - let _ = flush(common).await; + _ = flush(common).await; } let client = create_http_client(); diff --git a/src/sinks/http.rs b/src/sinks/http.rs index bf3bce0969b36..e85bf8b1f4365 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -631,7 +631,7 @@ mod tests { let cx = SinkContext::new_test(); - let _ = config.build(cx).await.unwrap(); + _ = config.build(cx).await.unwrap(); } #[tokio::test] diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 7aa28db6d8830..93e61aed8c7dd 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -768,7 +768,7 @@ mod tests { let (mut config, cx) = load_sink::(&config).unwrap(); // Make sure we can build the config - let _ = config.build(cx.clone()).await.unwrap(); + _ = config.build(cx.clone()).await.unwrap(); let addr = next_addr(); // Swap out the host so we can force send it diff --git a/src/sinks/influxdb/mod.rs b/src/sinks/influxdb/mod.rs index 6823253f521f2..4cefce45e3992 100644 --- a/src/sinks/influxdb/mod.rs +++ b/src/sinks/influxdb/mod.rs @@ -613,7 +613,7 @@ mod tests { database = "my-database" "#; let config: InfluxDbTestConfig = toml::from_str(config).unwrap(); - let _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap(); + _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap(); } #[test] @@ -624,7 +624,7 @@ mod tests { token = "my-token" "#; let config: InfluxDbTestConfig = toml::from_str(config).unwrap(); - let _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap(); + _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap(); } #[test] diff --git a/src/sinks/mezmo.rs b/src/sinks/mezmo.rs index cae05fabe0357..aed7049244f26 100644 --- a/src/sinks/mezmo.rs +++ b/src/sinks/mezmo.rs @@ -471,7 +471,7 @@ mod tests { .unwrap(); // Make sure we can build the config - let _ = config.build(cx.clone()).await.unwrap(); + _ = config.build(cx.clone()).await.unwrap(); let addr = next_addr(); // Swap out the host so we can force send it diff --git a/src/sinks/prometheus/collector.rs b/src/sinks/prometheus/collector.rs index e97023a7a528f..0a3714485e11d 100644 --- a/src/sinks/prometheus/collector.rs +++ b/src/sinks/prometheus/collector.rs @@ -255,7 +255,7 @@ impl MetricCollector for StringCollector { result.push_str(name); result.push_str(suffix); Self::encode_tags(result, tags, extra); - let _ = match timestamp_millis { + _ = match timestamp_millis { None => writeln!(result, " {}", value), Some(timestamp) => writeln!(result, " {} {}", value, timestamp), }; diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs index 58d5d8cace551..a5dcb0da9434c 100644 --- a/src/sinks/sematext/logs.rs +++ b/src/sinks/sematext/logs.rs @@ -185,7 +185,7 @@ mod tests { .unwrap(); // Make sure we can build the config - let _ = config.build(cx.clone()).await.unwrap(); + _ = 
config.build(cx.clone()).await.unwrap(); let addr = next_addr(); // Swap out the host so we can force send it diff --git a/src/sinks/splunk_hec/common/acknowledgements.rs b/src/sinks/splunk_hec/common/acknowledgements.rs index 1528141cb6397..9ef7fb9e9bb09 100644 --- a/src/sinks/splunk_hec/common/acknowledgements.rs +++ b/src/sinks/splunk_hec/common/acknowledgements.rs @@ -163,7 +163,7 @@ impl HecAckClient { let mut removed_count = 0.0; for ack_id in ack_ids { if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) { - let _ = ack_event_status_sender.send(EventStatus::Delivered); + _ = ack_event_status_sender.send(EventStatus::Delivered); removed_count += 1.0; debug!(message = "Finalized ack id.", ?ack_id); } @@ -198,7 +198,7 @@ impl HecAckClient { let mut removed_count = 0.0; for ack_id in expired_ack_ids { if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) { - let _ = ack_event_status_sender.send(status); + _ = ack_event_status_sender.send(status); removed_count += 1.0; } } @@ -328,7 +328,7 @@ mod tests { fn test_get_ack_query_body() { let mut ack_client = get_ack_client(1); let ack_ids = (0..100).collect::>(); - let _ = populate_ack_client(&mut ack_client, &ack_ids); + _ = populate_ack_client(&mut ack_client, &ack_ids); let expected_ack_body = HecAckStatusRequest { acks: ack_ids }; let mut ack_request_body = ack_client.get_ack_query_body(); @@ -340,7 +340,7 @@ mod tests { fn test_decrement_retries() { let mut ack_client = get_ack_client(1); let ack_ids = (0..100).collect::>(); - let _ = populate_ack_client(&mut ack_client, &ack_ids); + _ = populate_ack_client(&mut ack_client, &ack_ids); let mut ack_request_body = ack_client.get_ack_query_body(); ack_request_body.acks.sort_unstable(); diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index d720f6c26c6ff..7f44cfa90e4df 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -562,7 +562,7 @@ mod tests { let mock_server = get_hec_mock_server(true, ack_response_always_fail).await; let mut service = get_hec_service(mock_server.uri(), Default::default()); - let _ = service.call(get_hec_request()).await; + _ = service.call(get_hec_request()).await; } #[tokio::test] diff --git a/src/sinks/statsd.rs b/src/sinks/statsd.rs index fb8f08e47ac03..95978bd54ec88 100644 --- a/src/sinks/statsd.rs +++ b/src/sinks/statsd.rs @@ -105,7 +105,7 @@ fn default_address() -> SocketAddr { impl GenerateConfig for StatsdSinkConfig { fn generate_config() -> toml::Value { - toml::Value::try_from(&Self { + toml::Value::try_from(Self { default_namespace: None, mode: Mode::Udp(StatsdUdpConfig { batch: Default::default(), diff --git a/src/sinks/util/request_builder.rs b/src/sinks/util/request_builder.rs index 81c0f82c4da55..87280d2ec966e 100644 --- a/src/sinks/util/request_builder.rs +++ b/src/sinks/util/request_builder.rs @@ -74,7 +74,7 @@ pub trait RequestBuilder { // of clash-y with `Self::Metadata`. 
let mut compressor = Compressor::from(self.compression()); let is_compressed = compressor.is_compressed(); - let _ = self.encoder().encode_input(events, &mut compressor)?; + _ = self.encoder().encode_input(events, &mut compressor)?; let payload = compressor.into_inner().freeze(); let result = if is_compressed { diff --git a/src/sinks/util/sink.rs b/src/sinks/util/sink.rs index 9651b457f154f..000b3d82be265 100644 --- a/src/sinks/util/sink.rs +++ b/src/sinks/util/sink.rs @@ -468,7 +468,7 @@ where // If the rx end is dropped we still completed // the request so this is a weird case that we can // ignore for now. - let _ = tx.send(()); + _ = tx.send(()); }) .instrument(info_span!("request", %request_id).or_current()) .boxed() diff --git a/src/sinks/util/udp.rs b/src/sinks/util/udp.rs index 3e9ea874aa690..4899a66b84959 100644 --- a/src/sinks/util/udp.rs +++ b/src/sinks/util/udp.rs @@ -246,7 +246,7 @@ impl Service for UdpService { Box::pin(async move { // TODO: Add reconnect support as TCP/Unix? let result = udp_send(&mut socket, &msg).await.context(SendSnafu); - let _ = sender.send(socket); + _ = sender.send(socket); if result.is_ok() { // NOTE: This is obviously not happening before things like compression, etc, so it's currently a diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 2a307e0a61b69..07ced35ba2a59 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -353,7 +353,7 @@ impl StreamSink for WebSocketSink { .await .is_ok() { - let _ = ws_sink.close().await; + _ = ws_sink.close().await; } } diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 6544589845738..22aab58e2c2a4 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -839,7 +839,7 @@ impl EventStreamBuilder { fn finish(self, result: Result) { // This can legally fail when shutting down, and any other // reason should have been logged in the main future. 
- let _ = self.main_send.send(result); + _ = self.main_send.send(result); } } diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs index 0f8cd0981c069..a9af612c30db3 100644 --- a/src/sources/docker_logs/tests.rs +++ b/src/sources/docker_logs/tests.rs @@ -224,7 +224,7 @@ mod integration_tests { trace!("Removing container."); // Don't panic, as this is unrelated to the test, and there are possibly other containers that need to be removed - let _ = docker + _ = docker .remove_container(id, None::) .await .map_err(|e| error!(%e)); @@ -633,7 +633,7 @@ mod integration_tests { let out = source_with(&[name], None, None).await; let events = collect_n(out, 1).await; - let _ = container_kill(&id, &docker).await; + _ = container_kill(&id, &docker).await; container_remove(&id, &docker).await; schema_definition.assert_valid_for_event(&events[0]); @@ -736,8 +736,8 @@ mod integration_tests { let exclude_out = source_with_config(config_ex).await; let include_out = source_with_config(config_in).await; - let _ = collect_n(include_out, 1).await; - let _ = container_kill(&id, &docker).await; + _ = collect_n(include_out, 1).await; + _ = container_kill(&id, &docker).await; container_remove(&id, &docker).await; assert!(is_empty(exclude_out)); @@ -766,7 +766,7 @@ mod integration_tests { let out = source_with(&[name], None, None).await; let events = collect_n(out, 1).await; - let _ = container_kill(&id, &docker).await; + _ = container_kill(&id, &docker).await; container_remove(&id, &docker).await; schema_definition.assert_valid_for_event(&events[0]); diff --git a/src/sources/http_client/integration_tests.rs b/src/sources/http_client/integration_tests.rs index 6f99b598d617b..ec7b9f11d71fc 100644 --- a/src/sources/http_client/integration_tests.rs +++ b/src/sources/http_client/integration_tests.rs @@ -303,5 +303,5 @@ async fn shutdown() { assert!(shutdown_success); // Ensure source actually shut down successfully. 
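The `source_handle.await.unwrap()` assertion that follows recurs through the remaining test hunks: awaiting a `tokio::task::JoinHandle` yields `Result<T, JoinError>`, so the `unwrap` is what verifies the spawned source exited without panicking or being aborted, and the `Ok` payload is then discarded. Sketched in isolation:

    use tokio::task::JoinHandle;

    async fn assert_clean_shutdown(source_handle: JoinHandle<()>) {
        // JoinError covers panics and aborts; unwrap turns either into a
        // test failure, and the unit output is discarded.
        _ = source_handle.await.unwrap();
    }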
- let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 61288dbf33584..d6d91ceef9256 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -678,7 +678,7 @@ struct RunningJournalctl(Child); impl Drop for RunningJournalctl { fn drop(&mut self) { if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) { - let _ = kill(Pid::from_raw(pid), Signal::SIGTERM); + _ = kill(Pid::from_raw(pid), Signal::SIGTERM); } } } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index e8d4f1e4e56d1..4a7a9cc88e930 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -420,7 +420,7 @@ async fn kafka_source( if let Ok(current_assignment) = consumer.assignment() { // not logging on error because it will error if there are no offsets stored for a partition, // and this is best-effort cleanup anyway - let _ = consumer.commit(¤t_assignment, CommitMode::Sync); + _ = consumer.commit(¤t_assignment, CommitMode::Sync); } Ok(()) } diff --git a/src/sources/kubernetes_logs/lifecycle.rs b/src/sources/kubernetes_logs/lifecycle.rs index 58937ab9a5be6..93d17478eeb7d 100644 --- a/src/sources/kubernetes_logs/lifecycle.rs +++ b/src/sources/kubernetes_logs/lifecycle.rs @@ -131,7 +131,7 @@ impl Future for ShutdownHandle { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let _ = ready!(self.0.poll_unpin(cx)); + _ = ready!(self.0.poll_unpin(cx)); Poll::Ready(()) } } diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index a5d5efa6359bd..86b6fefa31d85 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -243,7 +243,7 @@ const fn default_read_from() -> ReadFromConfig { impl GenerateConfig for Config { fn generate_config() -> toml::Value { - toml::Value::try_from(&Self { + toml::Value::try_from(Self { self_node_name: default_self_node_name_env_template(), auto_partial_merge: true, ..Default::default() diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 23419aaed5beb..280a57d92e6f2 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -106,7 +106,7 @@ async fn receive_grpc_logs_vector_namespace() { schema_url: "v1".into(), }], }); - let _ = client.export(req).await; + _ = client.export(req).await; let mut output = test_util::collect_ready(logs_output).await; // we just send one, so only one output assert_eq!(output.len(), 1); @@ -246,7 +246,7 @@ async fn receive_grpc_logs_legacy_namespace() { schema_url: "v1".into(), }], }); - let _ = client.export(req).await; + _ = client.export(req).await; let mut output = test_util::collect_ready(logs_output).await; // we just send one, so only one output assert_eq!(output.len(), 1); diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index ebfcec8b000cc..5ca1473907bde 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -707,7 +707,7 @@ mod test { assert!(shutdown_success); // Ensure source actually shut down successfully. 
- let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); }) .await; } @@ -1112,7 +1112,7 @@ mod test { let (tx, rx) = SourceSender::new_test(); let address = init_udp(tx, false).await; - let _ = send_lines_udp(address, vec!["test".to_string()]); + _ = send_lines_udp(address, vec!["test".to_string()]); let events = collect_n(rx, 1).await; assert_eq!( @@ -1148,7 +1148,7 @@ mod test { assert!(shutdown_success); // Ensure source actually shut down successfully. - let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); }) .await; } @@ -1187,7 +1187,7 @@ mod test { assert!(shutdown_success); // Ensure that the source has actually shut down. - let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); // Stop the pump from sending lines forever. run_pump_atomic_sender.store(false, Ordering::Relaxed); diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs index d6bbee7f45dc6..50fd6a7f6f408 100644 --- a/src/sources/socket/udp.rs +++ b/src/sources/socket/udp.rs @@ -215,7 +215,7 @@ pub(super) fn udp( Ok((mut events, _byte_size)) => { if last && truncated { // The last event in this payload was truncated, so we want to drop it. - let _ = events.pop(); + _ = events.pop(); warn!( message = "Discarding frame larger than max_length.", max_length = max_length, diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index b239921ce5b9e..c0aca7babff59 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -2169,7 +2169,7 @@ mod tests { .unwrap(); assert_eq!("Success", event_res.text.as_str()); assert_eq!(0, event_res.code); - let _ = collect_n(source, 1).await; + _ = collect_n(source, 1).await; let ack_message = serde_json::to_string(&HecAckStatusRequest { acks: vec![event_res.ack_id], @@ -2214,7 +2214,7 @@ mod tests { .unwrap(); assert_eq!("Success", event_res.text.as_str()); assert_eq!(0, event_res.code); - let _ = collect_n(source, 1).await; + _ = collect_n(source, 1).await; let ack_message = serde_json::to_string(&HecAckStatusRequest { acks: vec![event_res.ack_id], @@ -2257,7 +2257,7 @@ mod tests { .json::() .await .unwrap(); - let _ = collect_n(source, 1).await; + _ = collect_n(source, 1).await; let ack_message = serde_json::to_string(&HecAckStatusRequest { acks: vec![event_res.ack_id], @@ -2363,7 +2363,7 @@ mod tests { .json::() .await .unwrap(); - let _ = collect_n(source, 11).await; + _ = collect_n(source, 11).await; let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest { acks: (0..10).collect::>(), diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index ba088ebb85ef6..8ed437d9bbaed 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -152,7 +152,7 @@ impl FrameStreamReader { None } else if self.state.expect_control_frame { self.state.expect_control_frame = false; - let _ = self.handle_control_frame(frame); + _ = self.handle_control_frame(frame); None } else { //data frame @@ -196,7 +196,7 @@ impl FrameStreamReader { } ControlHeader::Start => { //check for content type - let _ = self.process_fields(header, &mut frame)?; + _ = self.process_fields(header, &mut frame)?; //if didn't error, then we are ok to change state self.state.control_state = ControlState::ReadingData; self.state.is_bidirectional = false; //if first message was START then we are unidirectional (no responses) @@ -208,7 +208,7 @@ impl FrameStreamReader { match header { ControlHeader::Start => { //check for content type - let _ = 
self.process_fields(header, &mut frame)?; + _ = self.process_fields(header, &mut frame)?; //if didn't error, then we are ok to change state self.state.control_state = ControlState::ReadingData; } @@ -219,7 +219,7 @@ impl FrameStreamReader { match header { ControlHeader::Stop => { //check there aren't any fields - let _ = self.process_fields(header, &mut frame)?; + _ = self.process_fields(header, &mut frame)?; if self.state.is_bidirectional { //send FINISH frame -- but only if we are bidirectional self.send_control_frame(Self::make_frame(ControlHeader::Finish, None)); @@ -390,7 +390,7 @@ pub fn build_framestream_unix_source( // system's 'net.core.rmem_max' might have to be changed if socket receive buffer is not updated properly if let Some(socket_receive_buffer_size) = frame_handler.socket_receive_buffer_size() { - let _ = nix::sys::socket::setsockopt( + _ = nix::sys::socket::setsockopt( listener.as_raw_fd(), nix::sys::socket::sockopt::RcvBuf, &(socket_receive_buffer_size), @@ -405,7 +405,7 @@ pub fn build_framestream_unix_source( // system's 'net.core.wmem_max' might have to be changed if socket send buffer is not updated properly if let Some(socket_send_buffer_size) = frame_handler.socket_send_buffer_size() { - let _ = nix::sys::socket::setsockopt( + _ = nix::sys::socket::setsockopt( listener.as_raw_fd(), nix::sys::socket::sockopt::SndBuf, &(socket_send_buffer_size), @@ -762,7 +762,7 @@ mod test { ) { let mut stream = stream::iter(frames.into_iter()); //send and send_all consume the sink - let _ = sock_sink.send_all(&mut stream).await; + _ = sock_sink.send_all(&mut stream).await; } async fn send_control_frame + Unpin>( @@ -870,7 +870,7 @@ mod test { // Ensure source actually shut down successfully. signal_shutdown(source_name, &mut shutdown).await; - let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -918,7 +918,7 @@ mod test { // Ensure source actually shut down successfully. signal_shutdown(source_name, &mut shutdown).await; - let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -948,7 +948,7 @@ mod test { // Ensure source actually shut down successfully. signal_shutdown(source_name, &mut shutdown).await; - let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -983,7 +983,7 @@ mod test { // Ensure source actually shut down successfully. signal_shutdown(source_name, &mut shutdown).await; - let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -1041,7 +1041,7 @@ mod test { // Ensure source actually shut down successfully. signal_shutdown(source_name, &mut shutdown).await; - let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -1079,7 +1079,7 @@ mod test { // Ensure source actually shut down successfully. 
signal_shutdown(source_name, &mut shutdown).await; - let _ = source_handle.await.unwrap(); + _ = source_handle.await.unwrap(); } #[tokio::test(flavor = "multi_thread")] diff --git a/src/sources/util/net/tcp/mod.rs b/src/sources/util/net/tcp/mod.rs index 5d0a8f2d406f1..e260e67872eb3 100644 --- a/src/sources/util/net/tcp/mod.rs +++ b/src/sources/util/net/tcp/mod.rs @@ -143,7 +143,7 @@ where let tripwire = cx.shutdown.clone(); let tripwire = async move { - let _ = tripwire.await; + _ = tripwire.await; sleep(shutdown_timeout_secs).await; } .shared(); diff --git a/src/test_util/mock/sinks/basic.rs b/src/test_util/mock/sinks/basic.rs index 0b4d3905a443b..0182b56d77bf0 100644 --- a/src/test_util/mock/sinks/basic.rs +++ b/src/test_util/mock/sinks/basic.rs @@ -75,7 +75,7 @@ impl SinkConfig for BasicSinkConfig { let health_tx = if self.healthy { Some(tx) } else { - let _ = tx.send(Err(HealthcheckError::Unhealthy.into())); + _ = tx.send(Err(HealthcheckError::Unhealthy.into())); None }; @@ -109,7 +109,7 @@ impl StreamSink for MockSink { match self.sink { Mode::Normal(mut sink) => { if let Some(tx) = self.health_tx.take() { - let _ = tx.send(Ok(())); + _ = tx.send(Ok(())); } // We have an inner sink, so forward the input normally diff --git a/src/test_util/mock/sinks/oneshot.rs b/src/test_util/mock/sinks/oneshot.rs index 300b93eac0c66..084efdb107e83 100644 --- a/src/test_util/mock/sinks/oneshot.rs +++ b/src/test_util/mock/sinks/oneshot.rs @@ -68,7 +68,7 @@ impl StreamSink for OneshotSink { .next() .await .expect("must always get an item in oneshot sink"); - let _ = tx.send(events); + _ = tx.send(events); Ok(()) } diff --git a/src/test_util/mod.rs b/src/test_util/mod.rs index f126404251e3f..99f9f1b51badb 100644 --- a/src/test_util/mod.rs +++ b/src/test_util/mod.rs @@ -593,7 +593,7 @@ impl Future for CountReceiver { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); if let Some(trigger) = this.trigger.take() { - let _ = trigger.send(()); + _ = trigger.send(()); } let result = ready!(this.handle.poll_unpin(cx)); diff --git a/src/top/cmd.rs b/src/top/cmd.rs index 577a80d8c9818..f967b7d2c98ce 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -71,7 +71,7 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { continue; } }; - let _ = tx.send(EventType::InitializeState(state)).await; + _ = tx.send(EventType::InitializeState(state)).await; let subscription_client = match connect_subscription_client(ws_url.clone()).await { Ok(c) => c, @@ -85,21 +85,21 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { let finished = metrics::subscribe(subscription_client, tx.clone(), opts_clone.interval as i64); - let _ = tx + _ = tx .send(EventType::ConnectionUpdated(ConnectionStatus::Connected)) .await; // Tasks spawned in metrics::subscribe finish when the subscription // streams have completed. Currently, subscription streams only // complete when the underlying web socket connection to the GraphQL // server drops. 
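Given that lifecycle, the `join_all` below acts purely as a barrier: it resolves once every subscription task's `JoinHandle` has completed (i.e. the web socket dropped), and the collected results carry nothing the dashboard needs. A sketch of the pattern:

    use futures::future::join_all;
    use tokio::task::JoinHandle;

    async fn wait_until_disconnected(finished: Vec<JoinHandle<()>>) {
        // Only the "all tasks ended" signal matters; the returned
        // Vec<Result<(), JoinError>> is discarded.
        _ = join_all(finished).await;
    }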
- let _ = join_all(finished).await; - let _ = tx + _ = join_all(finished).await; + _ = tx .send(EventType::ConnectionUpdated( ConnectionStatus::Disconnected(RECONNECT_DELAY), )) .await; if opts_clone.no_reconnect { - let _ = shutdown_tx.send(()); + _ = shutdown_tx.send(()); break; } } diff --git a/src/top/dashboard.rs b/src/top/dashboard.rs index 2b3edb888dbf9..3f73b3424685e 100644 --- a/src/top/dashboard.rs +++ b/src/top/dashboard.rs @@ -361,12 +361,12 @@ pub async fn init_dashboard<'a>( }, k = key_press_rx.recv() => { if let KeyCode::Esc | KeyCode::Char('q') = k.unwrap() { - let _ = key_press_kill_tx.send(()); + _ = key_press_kill_tx.send(()); break } } _ = &mut shutdown_rx => { - let _ = key_press_kill_tx.send(()); + _ = key_press_kill_tx.send(()); break } } diff --git a/src/top/events.rs b/src/top/events.rs index 82f807a7381eb..5696e696c2910 100644 --- a/src/top/events.rs +++ b/src/top/events.rs @@ -17,7 +17,7 @@ pub fn capture_key_press() -> (mpsc::UnboundedReceiver, oneshot::Sender _ = &mut kill_rx => return, Some(Ok(event)) = events.next() => { if let Event::Key(k) = event { - let _ = tx.clone().send(k.code); + _ = tx.clone().send(k.code); }; } } diff --git a/src/top/metrics.rs b/src/top/metrics.rs index 188cbd6dc4f5f..8afbdbb9727f5 100644 --- a/src/top/metrics.rs +++ b/src/top/metrics.rs @@ -23,7 +23,7 @@ async fn component_added(client: Arc, tx: state::EventTx) { if let Some(d) = res.data { let c = d.component_added; let key = ComponentKey::from(c.component_id); - let _ = tx + _ = tx .send(state::EventType::ComponentAdded(state::ComponentRow { key, kind: c.on.to_string(), @@ -54,7 +54,7 @@ async fn allocated_bytes(client: Arc, tx: state::EventTx, in while let Some(Some(res)) = stream.next().await { if let Some(d) = res.data { let c = d.component_allocated_bytes; - let _ = tx + _ = tx .send(state::EventType::AllocatedBytes( c.into_iter() .map(|c| { @@ -79,7 +79,7 @@ async fn component_removed(client: Arc, tx: state::EventTx) if let Some(d) = res.data { let c = d.component_removed; let id = ComponentKey::from(c.component_id.as_str()); - let _ = tx.send(state::EventType::ComponentRemoved(id)).await; + _ = tx.send(state::EventType::ComponentRemoved(id)).await; } } } @@ -96,7 +96,7 @@ async fn received_events_totals( while let Some(Some(res)) = stream.next().await { if let Some(d) = res.data { let c = d.component_received_events_totals; - let _ = tx + _ = tx .send(state::EventType::ReceivedEventsTotals( c.into_iter() .map(|c| { @@ -124,7 +124,7 @@ async fn received_events_throughputs( while let Some(Some(res)) = stream.next().await { if let Some(d) = res.data { let c = d.component_received_events_throughputs; - let _ = tx + _ = tx .send(state::EventType::ReceivedEventsThroughputs( interval, c.into_iter() @@ -144,7 +144,7 @@ async fn sent_events_totals(client: Arc, tx: state::EventTx, while let Some(Some(res)) = stream.next().await { if let Some(d) = res.data { let c = d.component_sent_events_totals; - let _ = tx + _ = tx .send(state::EventType::SentEventsTotals( c.into_iter() .map(|c| SentEventsMetric { @@ -171,7 +171,7 @@ async fn sent_events_throughputs( while let Some(Some(res)) = stream.next().await { if let Some(d) = res.data { let c = d.component_sent_events_throughputs; - let _ = tx + _ = tx .send(state::EventType::SentEventsThroughputs( interval, c.into_iter() @@ -199,7 +199,7 @@ async fn processed_bytes_totals( while let Some(Some(res)) = stream.next().await { if let Some(d) = res.data { let c = d.component_processed_bytes_totals; - let _ = tx + _ = tx 
diff --git a/src/top/metrics.rs b/src/top/metrics.rs
index 188cbd6dc4f5f..8afbdbb9727f5 100644
--- a/src/top/metrics.rs
+++ b/src/top/metrics.rs
@@ -23,7 +23,7 @@ async fn component_added(client: Arc<SubscriptionClient>, tx: state::EventTx) {
         if let Some(d) = res.data {
             let c = d.component_added;
             let key = ComponentKey::from(c.component_id);
-            let _ = tx
+            _ = tx
                 .send(state::EventType::ComponentAdded(state::ComponentRow {
                     key,
                     kind: c.on.to_string(),
@@ -54,7 +54,7 @@ async fn allocated_bytes(client: Arc<SubscriptionClient>, tx: state::EventTx, in
     while let Some(Some(res)) = stream.next().await {
         if let Some(d) = res.data {
             let c = d.component_allocated_bytes;
-            let _ = tx
+            _ = tx
                 .send(state::EventType::AllocatedBytes(
                     c.into_iter()
                         .map(|c| {
@@ -79,7 +79,7 @@ async fn component_removed(client: Arc<SubscriptionClient>, tx: state::EventTx)
         if let Some(d) = res.data {
             let c = d.component_removed;
             let id = ComponentKey::from(c.component_id.as_str());
-            let _ = tx.send(state::EventType::ComponentRemoved(id)).await;
+            _ = tx.send(state::EventType::ComponentRemoved(id)).await;
         }
     }
 }
@@ -96,7 +96,7 @@ async fn received_events_totals(
     while let Some(Some(res)) = stream.next().await {
         if let Some(d) = res.data {
             let c = d.component_received_events_totals;
-            let _ = tx
+            _ = tx
                 .send(state::EventType::ReceivedEventsTotals(
                     c.into_iter()
                         .map(|c| {
@@ -124,7 +124,7 @@ async fn received_events_throughputs(
     while let Some(Some(res)) = stream.next().await {
         if let Some(d) = res.data {
             let c = d.component_received_events_throughputs;
-            let _ = tx
+            _ = tx
                 .send(state::EventType::ReceivedEventsThroughputs(
                     interval,
                     c.into_iter()
@@ -144,7 +144,7 @@ async fn sent_events_totals(client: Arc<SubscriptionClient>, tx: state::EventTx,
     while let Some(Some(res)) = stream.next().await {
         if let Some(d) = res.data {
             let c = d.component_sent_events_totals;
-            let _ = tx
+            _ = tx
                 .send(state::EventType::SentEventsTotals(
                     c.into_iter()
                         .map(|c| SentEventsMetric {
@@ -171,7 +171,7 @@ async fn sent_events_throughputs(
     while let Some(Some(res)) = stream.next().await {
         if let Some(d) = res.data {
             let c = d.component_sent_events_throughputs;
-            let _ = tx
+            _ = tx
                 .send(state::EventType::SentEventsThroughputs(
                     interval,
                     c.into_iter()
@@ -199,7 +199,7 @@ async fn processed_bytes_totals(
     while let Some(Some(res)) = stream.next().await {
         if let Some(d) = res.data {
             let c = d.component_processed_bytes_totals;
-            let _ = tx
+            _ = tx
                 .send(state::EventType::ProcessedBytesTotals(
                     c.into_iter()
                         .map(|c| {
@@ -227,7 +227,7 @@ async fn processed_bytes_throughputs(
     while let Some(Some(res)) = stream.next().await {
         if let Some(d) = res.data {
             let c = d.component_processed_bytes_throughputs;
-            let _ = tx
+            _ = tx
                 .send(state::EventType::ProcessedBytesThroughputs(
                     interval,
                     c.into_iter()
@@ -247,7 +247,7 @@ async fn errors_totals(client: Arc<SubscriptionClient>, tx: state::EventTx, inte
     while let Some(Some(res)) = stream.next().await {
         if let Some(d) = res.data {
             let c = d.component_errors_totals;
-            let _ = tx
+            _ = tx
                 .send(state::EventType::ErrorsTotals(
                     c.into_iter()
                         .map(|c| {
diff --git a/src/top/state.rs b/src/top/state.rs
index 157ca058036db..dcc10409d4136 100644
--- a/src/top/state.rs
+++ b/src/top/state.rs
@@ -221,10 +221,10 @@ pub async fn updater(mut event_rx: EventRx) -> StateRx {
                 }
             }
             EventType::ComponentAdded(c) => {
-                let _ = state.components.insert(c.key.clone(), c);
+                _ = state.components.insert(c.key.clone(), c);
             }
             EventType::ComponentRemoved(key) => {
-                let _ = state.components.remove(&key);
+                _ = state.components.remove(&key);
             }
             EventType::ConnectionUpdated(status) => {
                 state.connection_status = status;
@@ -232,7 +232,7 @@ pub async fn updater(mut event_rx: EventRx) -> StateRx {
             }
 
             // Send updated map to listeners
-            let _ = tx.send(state.clone()).await;
+            _ = tx.send(state.clone()).await;
         }
     });
diff --git a/src/topology/builder.rs b/src/topology/builder.rs
index 4c2c31531a9ca..722213cd965ad 100644
--- a/src/topology/builder.rs
+++ b/src/topology/builder.rs
@@ -239,7 +239,7 @@ pub async fn build_pieces(
             if let Err(e) = output {
                 // Immediately send the error to the source's wrapper future, but ignore any
                 // errors during the send, since nested errors wouldn't make any sense here.
-                let _ = pump_error_tx.send(e);
+                _ = pump_error_tx.send(e);
                 had_pump_error = true;
                 break;
             }
diff --git a/src/topology/mod.rs b/src/topology/mod.rs
index d77cc334e6eba..bb5299839d025 100644
--- a/src/topology/mod.rs
+++ b/src/topology/mod.rs
@@ -160,7 +160,7 @@ async fn handle_errors(
         .and_then(|res| res)
         .map_err(|e| {
             error!("An error occurred that Vector couldn't handle: {}.", e);
-            let _ = abort_tx.send(());
+            _ = abort_tx.send(());
             e
         })
 }
@@ -172,7 +172,7 @@ fn retain<T>(vec: &mut Vec<T>, mut retain_filter: impl FnMut(&mut T) -> bool) {
         if retain_filter(data) {
             i += 1;
         } else {
-            let _ = vec.remove(i);
+            _ = vec.remove(i);
        }
    }
 }
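The `retain` helper touched above differs from `Vec::retain` in that its filter takes `&mut T`, so callers can mutate entries while deciding whether to keep them; `Vec::remove` returns the removed element, which the patch now discards with `_ =`. A completed, standalone sketch of the helper with a usage example (the loop scaffolding outside the hunk is assumed):

```rust
// Completed sketch of the hand-rolled `retain` in `src/topology/mod.rs`;
// the `while` loop setup is assumed, since the hunk only shows the body.
// Unlike `Vec::retain`, the filter gets `&mut T` and may mutate entries.
fn retain<T>(vec: &mut Vec<T>, mut retain_filter: impl FnMut(&mut T) -> bool) {
    let mut i = 0;
    while i < vec.len() {
        let data = &mut vec[i];
        if retain_filter(data) {
            i += 1;
        } else {
            // `Vec::remove` returns the removed element; discard it.
            _ = vec.remove(i);
        }
    }
}

fn main() {
    // Keep even numbers, doubling them in place as a side effect.
    let mut values = vec![1, 2, 3, 4, 5, 6];
    retain(&mut values, |v| {
        if *v % 2 == 0 {
            *v *= 2;
            true
        } else {
            false
        }
    });
    assert_eq!(values, vec![4, 8, 12]);
}
```

Since Rust 1.61, the standard library's `Vec::retain_mut` covers the same `&mut T` use case.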
diff --git a/src/topology/running.rs b/src/topology/running.rs
index 0ab7a830dfe2e..903d36be042f4 100644
--- a/src/topology/running.rs
+++ b/src/topology/running.rs
@@ -685,7 +685,7 @@ impl RunningTopology {
             // output for the first time, since there's nothing to actually replace at this point.
             debug!(component = %key, fanout_id = %input, "Adding component input to fanout.");
 
-            let _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
+            _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
         } else {
             // We know that if this component is connected to a given input, and neither
             // components were changed, then the output must still exist, which means we paused
@@ -693,7 +693,7 @@ impl RunningTopology {
             // now:
             debug!(component = %key, fanout_id = %input, "Replacing component input in fanout.");
 
-            let _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
+            _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
         }
     }
 
@@ -737,14 +737,14 @@ impl RunningTopology {
             // Case 3: This component is no longer connected to the input from new config.
             debug!(component = %key, fanout_id = %input, "Removing component input from fanout.");
 
-            let _ = output.send(ControlMessage::Remove(key.clone()));
+            _ = output.send(ControlMessage::Remove(key.clone()));
         } else {
             // We know that if this component is connected to a given input, and it isn't being
             // changed, then it will exist when we reconnect inputs, so we should pause it
             // now to pause further sends through that component until we reconnect:
             debug!(component = %key, fanout_id = %input, "Pausing component input in fanout.");
 
-            let _ = output.send(ControlMessage::Pause(key.clone()));
+            _ = output.send(ControlMessage::Pause(key.clone()));
         }
     }
 }
@@ -762,7 +762,7 @@ impl RunningTopology {
             let input = self.inputs.get(transform_key).cloned().unwrap();
             let output = self.outputs.get_mut(&output_id).unwrap();
 
-            let _ = output.send(ControlMessage::Add(transform_key.clone(), input));
+            _ = output.send(ControlMessage::Add(transform_key.clone(), input));
         }
     }
 
@@ -777,7 +777,7 @@ impl RunningTopology {
             let input = self.inputs.get(sink_key).cloned().unwrap();
             let output = self.outputs.get_mut(&output_id).unwrap();
 
-            let _ = output.send(ControlMessage::Add(sink_key.clone(), input));
+            _ = output.send(ControlMessage::Add(sink_key.clone(), input));
         }
     }
 }
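The `running.rs` hunks above drive a fanout task over a control channel: topology reloads send `Add`/`Replace`/`Remove`/`Pause` messages, and send errors are dropped because a closed channel only means the fanout has already shut down. A rough, hypothetical sketch of that shape (the real `ControlMessage` lives in Vector's fanout module and carries buffer senders; `ComponentKey` and `EventSender` here are stand-ins):

```rust
// Hypothetical sketch of the fanout control-channel pattern; types are
// stand-ins, not Vector's actual fanout API.
use tokio::sync::mpsc;

type ComponentKey = String;
type EventSender = mpsc::Sender<String>; // stand-in for a component's input

enum ControlMessage {
    Add(ComponentKey, EventSender),
    Remove(ComponentKey),
    Replace(ComponentKey, EventSender),
    Pause(ComponentKey),
}

// The fanout task owns the set of downstream inputs and applies control
// messages as reloads add, replace, remove, or pause them.
async fn run_fanout(mut control_rx: mpsc::UnboundedReceiver<ControlMessage>) {
    // `None` marks a paused input awaiting a `Replace` from the reload.
    let mut inputs: Vec<(ComponentKey, Option<EventSender>)> = Vec::new();
    while let Some(msg) = control_rx.recv().await {
        match msg {
            ControlMessage::Add(key, tx) => inputs.push((key, Some(tx))),
            ControlMessage::Remove(key) => inputs.retain(|(k, _)| *k != key),
            ControlMessage::Replace(key, tx) => {
                if let Some(slot) = inputs.iter_mut().find(|(k, _)| *k == key) {
                    slot.1 = Some(tx);
                }
            }
            ControlMessage::Pause(key) => {
                if let Some(slot) = inputs.iter_mut().find(|(k, _)| *k == key) {
                    slot.1 = None;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (control_tx, control_rx) = mpsc::unbounded_channel();
    let fanout = tokio::spawn(run_fanout(control_rx));

    let (tx, _rx) = mpsc::channel(16);
    // As in the patch, send results are discarded: an error only means the
    // fanout task has already shut down.
    _ = control_tx.send(ControlMessage::Add("sink-a".into(), tx));
    _ = control_tx.send(ControlMessage::Pause("sink-a".into()));

    drop(control_tx); // closing the channel lets the fanout task finish
    fanout.await.unwrap();
}
```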
diff --git a/src/topology/test/crash.rs b/src/topology/test/crash.rs
index 001a7e4917b6b..254dce3662fb8 100644
--- a/src/topology/test/crash.rs
+++ b/src/topology/test/crash.rs
@@ -90,7 +90,7 @@ async fn test_source_panic() {
     let input_lines = random_lines(100).take(num_lines).collect::<Vec<_>>();
     send_lines(in_addr, input_lines.clone()).await.unwrap();
     sleep(Duration::from_secs(1)).await;
-    let _ = std::panic::take_hook();
+    _ = std::panic::take_hook();
 
     // Our panic source should have panicked, but since the sink was also pulling from the other source, it should have
     // still been able to get all the events it sent.
@@ -191,7 +191,7 @@ async fn test_sink_panic() {
     // Our panic sink should have panicked, but the other sink should have still been able to finish processing as it was not
     // directly attached.
-    let _ = std::panic::take_hook();
+    _ = std::panic::take_hook();
 
     assert!(UnboundedReceiverStream::new(crash).next().await.is_some());
     topology.stop().await;
diff --git a/src/topology/test/end_to_end.rs b/src/topology/test/end_to_end.rs
index 33022aa0c250c..e9b1f8e499945 100644
--- a/src/topology/test/end_to_end.rs
+++ b/src/topology/test/end_to_end.rs
@@ -24,7 +24,7 @@ pub async fn respond(
     tx.send(())
         .await
         .expect("Error sending 'before' status from test server");
-    let _ = waiter.lock().await;
+    _ = waiter.lock().await;
     Ok(Response::builder()
         .status(status)
         .body(Body::empty())
diff --git a/src/trace.rs b/src/trace.rs
index 2d7d78bcb3343..e5291a4d9d1e3 100644
--- a/src/trace.rs
+++ b/src/trace.rs
@@ -100,7 +100,7 @@ pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64)
             RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
         let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
 
-        let _ = subscriber.try_init();
+        _ = subscriber.try_init();
     } else {
         let formatter = tracing_subscriber::fmt::layer()
             .with_ansi(color)
@@ -113,7 +113,7 @@ pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64)
             RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
         let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
 
-        let _ = subscriber.try_init();
+        _ = subscriber.try_init();
     }
 }
 
@@ -154,7 +154,7 @@ fn try_buffer_event(log: &LogEvent) -> bool {
 
 /// If no subscribers are connected, this does nothing.
 fn try_broadcast_event(log: LogEvent) {
     if let Some(sender) = maybe_get_trace_sender() {
-        let _ = sender.send(log);
+        _ = sender.send(log);
     }
 }
 
@@ -237,7 +237,7 @@ pub fn stop_early_buffering() {
         let buffered_events = consume_early_buffer();
         for subscriber_tx in subscribers_tx {
             // Ignore any errors sending since the caller may have dropped or something else.
-            let _ = subscriber_tx.send(buffered_events.clone());
+            _ = subscriber_tx.send(buffered_events.clone());
         }
     }
 }
diff --git a/src/transforms/lua/v2/mod.rs b/src/transforms/lua/v2/mod.rs
index 13f9d76e503ac..b472368f62247 100644
--- a/src/transforms/lua/v2/mod.rs
+++ b/src/transforms/lua/v2/mod.rs
@@ -319,7 +319,7 @@ impl Lua {
             emit!(LuaGcTriggered {
                 used_memory: self.lua.used_memory()
             });
-            let _ = self
+            _ = self
                 .lua
                 .gc_collect()
                 .context(RuntimeErrorGcSnafu)
@@ -349,7 +349,7 @@ impl RuntimeTransform for Lua {
         F: FnMut(Event),
     {
         let lua = &self.lua;
-        let _ = lua
+        _ = lua
             .scope(|scope| -> mlua::Result<()> {
                 lua.registry_value::<mlua::Function>(&self.hook_process)?
                     .call((
@@ -371,7 +371,7 @@ impl RuntimeTransform for Lua {
         F: FnMut(Event),
     {
         let lua = &self.lua;
-        let _ = lua
+        _ = lua
             .scope(|scope| -> mlua::Result<()> {
                 match &self.hook_init {
                     Some(key) => lua
@@ -391,7 +391,7 @@ impl RuntimeTransform for Lua {
         F: FnMut(Event),
     {
         let lua = &self.lua;
-        let _ = lua
+        _ = lua
             .scope(|scope| -> mlua::Result<()> {
                 match &self.hook_shutdown {
                     Some(key) => lua
@@ -411,7 +411,7 @@ impl RuntimeTransform for Lua {
         F: FnMut(Event),
     {
         let lua = &self.lua;
-        let _ = lua
+        _ = lua
             .scope(|scope| -> mlua::Result<()> {
                 let handler_key = &self.timers[timer.id as usize].1;
                 lua.registry_value::<mlua::Function>(handler_key)?