From a97059aa9b038201cfebe740e6f25da3798c32af Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 10 Nov 2024 15:24:00 +0500 Subject: [PATCH] Check service readiness once per decoded item (#463) --- ntex-io/CHANGES.md | 4 ++++ ntex-io/Cargo.toml | 2 +- ntex-io/src/dispatcher.rs | 22 ++++++++++++++++------ 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index c4d0aa3c..42a1e056 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.8.3] - 2024-11-10 + +* Check service readiness once per decoded item + ## [2.8.2] - 2024-11-05 * Do not rely on not_ready(), always check service readiness diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index f4a5b406..6f5faacf 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.8.2" +version = "2.8.3" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 4d2c467b..aa0d61f0 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -126,12 +126,12 @@ pin_project_lite::pin_project! { bitflags::bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] struct Flags: u8 { - const READY_ERR = 0b00001; - const IO_ERR = 0b00010; - const KA_ENABLED = 0b00100; - const KA_TIMEOUT = 0b01000; - const READ_TIMEOUT = 0b10000; - const READY = 0b100000; + const READY_ERR = 0b000001; + const IO_ERR = 0b000010; + const KA_ENABLED = 0b000100; + const KA_TIMEOUT = 0b001000; + const READ_TIMEOUT = 0b010000; + const READY = 0b100000; } } @@ -342,6 +342,7 @@ where PollService::Continue => continue, }; + slf.flags.remove(Flags::READY); slf.call_service(cx, item); } // handle write back-pressure @@ -471,9 +472,18 @@ where } fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.flags.contains(Flags::READY) { + if self.shared.service.poll_not_ready(cx).is_ready() { + self.flags.remove(Flags::READY); + } else { + return Poll::Ready(self.check_error()); + } + } + // wait until service becomes ready match self.shared.service.poll_ready(cx) { Poll::Ready(Ok(_)) => { + self.flags.insert(Flags::READY); let _ = self.shared.service.poll_not_ready(cx); Poll::Ready(self.check_error()) }