Skip to content

Commit

Permalink
Use updated Service trait
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 3, 2024
1 parent 32ca384 commit 3dbbeca
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 28 deletions.
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.8.0] - 2024-11-04

* Use updated Service trait

## [2.7.1] - 2024-10-15

* Disconnect on error from service readiness check
Expand Down
4 changes: 2 additions & 2 deletions ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "2.7.1"
version = "2.8.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -19,7 +19,7 @@ path = "src/lib.rs"
ntex-codec = "0.6"
ntex-bytes = "0.1"
ntex-util = "2.5"
ntex-service = "3"
ntex-service = "3.3"

bitflags = "2"
log = "0.4"
Expand Down
63 changes: 37 additions & 26 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,30 +446,38 @@ where
}
}

fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
// check for errors
Poll::Ready(if let Some(err) = self.shared.error.take() {
log::trace!(
"{}: Error occured, stopping dispatcher",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
fn check_error(&mut self) -> PollService<U> {
// check for errors
if let Some(err) = self.shared.error.take() {
log::trace!(
"{}: Error occured, stopping dispatcher",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;

match err {
DispatcherError::Encoder(err) => {
PollService::Item(DispatchItem::EncoderError(err))
}
DispatcherError::Service(err) => {
self.error = Some(err);
PollService::Continue
}
}
} else {
PollService::Ready
})
match err {
DispatcherError::Encoder(err) => {
PollService::Item(DispatchItem::EncoderError(err))
}
DispatcherError::Service(err) => {
self.error = Some(err);
PollService::Continue
}
}
} else {
PollService::Ready
}
}

fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
// check service readiness
if self.shared.service.poll_not_ready(cx).is_pending() {
return Poll::Ready(self.check_error());
}

// wait until service becomes ready
match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(self.check_error()),
// pause io read task
Poll::Pending => {
log::trace!(
Expand Down Expand Up @@ -850,13 +858,15 @@ mod tests {

impl Service<DispatchItem<BytesCodec>> for Srv {
type Response = Option<Response<BytesCodec>>;
type Error = ();
type Error = &'static str;

async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.0.set(self.0.get() + 1);
Err(())
Err("test")
}

async fn not_ready(&self) {}

async fn call(
&self,
_: DispatchItem<BytesCodec>,
Expand All @@ -868,7 +878,8 @@ mod tests {

let (disp, state) = Dispatcher::debug(server, BytesCodec, Srv(counter.clone()));
spawn(async move {
let _ = disp.await;
let res = disp.await;
assert_eq!(res, Err("test"));
});

state
Expand Down

0 comments on commit 3dbbeca

Please sign in to comment.