Skip to content

Commit

Permalink
Check service readiness once per decoded item (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Nov 10, 2024
1 parent 5700a50 commit a97059a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.8.3] - 2024-11-10

* Check service readiness once per decoded item

## [2.8.2] - 2024-11-05

* Do not rely on not_ready(), always check service readiness
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "2.8.2"
version = "2.8.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand Down
22 changes: 16 additions & 6 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ pin_project_lite::pin_project! {
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const READY_ERR = 0b00001;
const IO_ERR = 0b00010;
const KA_ENABLED = 0b00100;
const KA_TIMEOUT = 0b01000;
const READ_TIMEOUT = 0b10000;
const READY = 0b100000;
const READY_ERR = 0b000001;
const IO_ERR = 0b000010;
const KA_ENABLED = 0b000100;
const KA_TIMEOUT = 0b001000;
const READ_TIMEOUT = 0b010000;
const READY = 0b100000;
}
}

Expand Down Expand Up @@ -342,6 +342,7 @@ where
PollService::Continue => continue,
};

slf.flags.remove(Flags::READY);
slf.call_service(cx, item);
}
// handle write back-pressure
Expand Down Expand Up @@ -471,9 +472,18 @@ where
}

fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
if self.flags.contains(Flags::READY) {
if self.shared.service.poll_not_ready(cx).is_ready() {
self.flags.remove(Flags::READY);
} else {
return Poll::Ready(self.check_error());
}
}

// wait until service becomes ready
match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
self.flags.insert(Flags::READY);
let _ = self.shared.service.poll_not_ready(cx);
Poll::Ready(self.check_error())
}
Expand Down

0 comments on commit a97059a

Please sign in to comment.