Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better error handling #482

Merged
merged 3 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.9.2] - 2024-12-05

* Better error handling

## [2.9.1] - 2024-12-04

* Check service readiness for every turn
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "2.9.1"
version = "2.9.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand Down
44 changes: 24 additions & 20 deletions ntex-io/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,23 @@ impl IoState {
}
}

/// Get current io error
pub(super) fn error(&self) -> Option<io::Error> {
if let Some(err) = self.error.take() {
self.error
.set(Some(io::Error::new(err.kind(), format!("{}", err))));
Some(err)
} else {
None
}
}

/// Get current io result
pub(super) fn error_or_disconnected(&self) -> io::Error {
self.error()
.unwrap_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "Disconnected"))
}

pub(super) fn io_stopped(&self, err: Option<io::Error>) {
if err.is_some() {
self.error.set(err);
Expand Down Expand Up @@ -257,19 +274,6 @@ impl<F> Io<F> {
fn io_ref(&self) -> &IoRef {
unsafe { &*self.0.get() }
}

/// Get current io error
fn error(&self) -> Option<io::Error> {
self.st().error.take()
}

/// Get current io error
fn error_or_disconnected(&self) -> io::Error {
self.st()
.error
.take()
.unwrap_or_else(|| io::Error::new(io::ErrorKind::Other, "Disconnected"))
}
}

impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
Expand Down Expand Up @@ -333,7 +337,7 @@ impl<F> Io<F> {
"Timeout",
))),
Err(RecvError::Stop) => Err(Either::Right(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::UnexpectedEof,
"Dispatcher stopped",
))),
Err(RecvError::WriteBackpressure) => {
Expand Down Expand Up @@ -423,7 +427,7 @@ impl<F> Io<F> {
let mut flags = st.flags.get();

if flags.is_stopped() {
Poll::Ready(Err(self.error_or_disconnected()))
Poll::Ready(Err(st.error_or_disconnected()))
} else {
st.dispatch_task.register(cx.waker());

Expand Down Expand Up @@ -511,7 +515,7 @@ impl<F> Io<F> {
let st = self.st();
let flags = st.flags.get();
if flags.is_stopped() {
Err(RecvError::PeerGone(self.error()))
Err(RecvError::PeerGone(st.error()))
} else if flags.contains(Flags::DSP_STOP) {
st.remove_flags(Flags::DSP_STOP);
Err(RecvError::Stop)
Expand Down Expand Up @@ -545,12 +549,12 @@ impl<F> Io<F> {
/// otherwise wake up when size of write buffer is lower than
/// buffer max size.
pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll<io::Result<()>> {
let st = self.st();
let flags = self.flags();

if flags.is_stopped() {
Poll::Ready(Err(self.error_or_disconnected()))
Poll::Ready(Err(st.error_or_disconnected()))
} else {
let st = self.st();
let len = st.buffer.write_destination_size();
if len > 0 {
if full {
Expand All @@ -575,7 +579,7 @@ impl<F> Io<F> {
let flags = st.flags.get();

if flags.is_stopped() {
if let Some(err) = self.error() {
if let Some(err) = st.error() {
Poll::Ready(Err(err))
} else {
Poll::Ready(Ok(()))
Expand Down Expand Up @@ -611,7 +615,7 @@ impl<F> Io<F> {
let st = self.st();
let flags = st.flags.get();
if flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
Poll::Ready(IoStatusUpdate::PeerGone(self.error()))
Poll::Ready(IoStatusUpdate::PeerGone(st.error()))
} else if flags.contains(Flags::DSP_STOP) {
st.remove_flags(Flags::DSP_STOP);
Poll::Ready(IoStatusUpdate::Stop)
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/src/ioref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl IoRef {
F: FnOnce(&mut BytesVec) -> R,
{
if self.0.flags.get().contains(Flags::IO_STOPPED) {
Err(io::Error::new(io::ErrorKind::Other, "Disconnected"))
Err(self.0.error_or_disconnected())
} else {
let result = self.0.buffer.with_write_source(self, f);
self.0.filter().process_write_buf(self, &self.0.buffer, 0)?;
Expand Down
2 changes: 1 addition & 1 deletion ntex-server/src/net/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ pub fn bind_addr<S: net::ToSocketAddrs>(
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::InvalidInput,
"Cannot bind to address.",
))
}
Expand Down
9 changes: 6 additions & 3 deletions ntex-tls/src/openssl/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@
log::trace!("{}: SSL Handshake start for: {:?}", io.tag(), host);

match openssl.configure() {
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e).into()),
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidInput, e).into()),

Check warning on line 54 in ntex-tls/src/openssl/connect.rs

View check run for this annotation

Codecov / codecov/patch

ntex-tls/src/openssl/connect.rs#L54

Added line #L54 was not covered by tests
Ok(config) => {
let ssl = config
.into_ssl(&host)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let tag = io.tag();
match connect_io(io, ssl).await {
Ok(io) => {
Expand All @@ -64,7 +64,10 @@
}
Err(e) => {
log::trace!("{}: SSL Handshake error: {:?}", tag, e);
Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into())
Err(
io::Error::new(io::ErrorKind::InvalidInput, format!("{}", e))
.into(),
)

Check warning on line 70 in ntex-tls/src/openssl/connect.rs

View check run for this annotation

Codecov / codecov/patch

ntex-tls/src/openssl/connect.rs#L67-L70

Added lines #L67 - L70 were not covered by tests
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion ntex-tls/src/openssl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@
ssl::ErrorCode::WANT_READ => {
let res = io.read_notify().await;
match res? {
None => Err(io::Error::new(io::ErrorKind::Other, "disconnected")),
None => {
Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected"))

Check warning on line 254 in ntex-tls/src/openssl/mod.rs

View check run for this annotation

Codecov / codecov/patch

ntex-tls/src/openssl/mod.rs#L254

Added line #L254 was not covered by tests
}
_ => Ok(None),
}
}
Expand Down
11 changes: 7 additions & 4 deletions ntex/src/http/client/h2proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,17 @@
err
);
pl.set_error(
io::Error::new(io::ErrorKind::Other, err)
.into(),
io::Error::new(
io::ErrorKind::UnexpectedEof,
err,
)
.into(),

Check warning on line 194 in ntex/src/http/client/h2proto.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/client/h2proto.rs#L190-L194

Added lines #L190 - L194 were not covered by tests
);
}
_ => {
pl.set_error(
io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::Unsupported,

Check warning on line 200 in ntex/src/http/client/h2proto.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/client/h2proto.rs#L200

Added line #L200 was not covered by tests
"unexpected h2 message",
)
.into(),
Expand All @@ -216,7 +219,7 @@
}
}
_ => Err(SendRequestError::Error(Box::new(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::Unsupported,

Check warning on line 222 in ntex/src/http/client/h2proto.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/client/h2proto.rs#L222

Added line #L222 was not covered by tests
"unexpected h2 message",
)))),
}
Expand Down
2 changes: 1 addition & 1 deletion ntex/src/http/encoding/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Err(Box::new(e)))),
Poll::Ready(Err(_)) => {
return Poll::Ready(Some(Err(Box::new(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::Interrupted,

Check warning on line 120 in ntex/src/http/encoding/encoder.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/encoding/encoder.rs#L120

Added line #L120 was not covered by tests
"Canceled",
)))));
}
Expand Down
4 changes: 2 additions & 2 deletions ntex/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@
impl From<crate::rt::JoinError> for PayloadError {
fn from(_: crate::rt::JoinError) -> Self {
PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::Interrupted,

Check warning on line 220 in ntex/src/http/error.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/error.rs#L220

Added line #L220 was not covered by tests
"Operation is canceled",
))
}
Expand All @@ -228,7 +228,7 @@
match err {
BlockingError::Error(e) => PayloadError::Io(e),
BlockingError::Canceled => PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::Interrupted,
"Operation is canceled",
)),
}
Expand Down
4 changes: 3 additions & 1 deletion ntex/src/http/h2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@
h2::MessageKind::Disconnect(err) => {
log::debug!("Connection is disconnected {:?}", err);
if let Some(mut sender) = self.streams.borrow_mut().remove(&stream.id()) {
sender.set_error(io::Error::new(io::ErrorKind::Other, err).into());
sender.set_error(
io::Error::new(io::ErrorKind::UnexpectedEof, err).into(),
);

Check warning on line 413 in ntex/src/http/h2/service.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/h2/service.rs#L411-L413

Added lines #L411 - L413 were not covered by tests
}
return Ok(());
}
Expand Down
2 changes: 1 addition & 1 deletion ntex/src/http/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ where
server,
client: Client::build().finish(),
}
.set_client_timeout(Seconds(30), Millis(30_000))
.set_client_timeout(Seconds(45), Millis(45_000))
}

#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion ntex/src/web/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::InvalidInput,

Check warning on line 470 in ntex/src/web/server.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/web/server.rs#L470

Added line #L470 was not covered by tests
"Cannot bind to address.",
))
}
Expand Down
8 changes: 4 additions & 4 deletions ntex/src/web/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,8 @@ where
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default()
.lifetime(Seconds::ZERO)
.keep_alive(Seconds(30))
.timeout(Millis(30_000))
.keep_alive(Seconds(45))
.timeout(Millis(45_000))
.disconnect_timeout(Seconds(5))
.openssl(builder.build())
.finish()
Expand All @@ -727,14 +727,14 @@ where
{
Connector::default()
.lifetime(Seconds::ZERO)
.timeout(Millis(30_000))
.timeout(Millis(45_000))
.finish()
}
};

Client::build()
.connector(connector)
.timeout(Seconds(30))
.timeout(Seconds(45))
.finish()
};

Expand Down
8 changes: 4 additions & 4 deletions ntex/src/ws/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
Ok(())
} else {
self.insert_flags(Flags::PROTO_ERR);
Err(io::Error::new(io::ErrorKind::Other, err_message))
Err(io::Error::new(io::ErrorKind::InvalidData, err_message))

Check warning on line 57 in ntex/src/ws/transport.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/ws/transport.rs#L57

Added line #L57 was not covered by tests
}
}
}
Expand Down Expand Up @@ -96,7 +96,7 @@
self.codec.decode_vec(&mut src).map_err(|e| {
log::trace!("Failed to decode ws codec frames: {:?}", e);
self.insert_flags(Flags::PROTO_ERR);
io::Error::new(io::ErrorKind::Other, e)
io::Error::new(io::ErrorKind::InvalidData, e)

Check warning on line 99 in ntex/src/ws/transport.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/ws/transport.rs#L99

Added line #L99 was not covered by tests
})? {
frame
} else {
Expand All @@ -123,14 +123,14 @@
Frame::Continuation(Item::FirstText(_)) => {
self.insert_flags(Flags::PROTO_ERR);
return Err(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::InvalidData,

Check warning on line 126 in ntex/src/ws/transport.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/ws/transport.rs#L126

Added line #L126 was not covered by tests
"WebSocket Text continuation frames are not supported",
));
}
Frame::Text(_) => {
self.insert_flags(Flags::PROTO_ERR);
return Err(io::Error::new(
io::ErrorKind::Other,
io::ErrorKind::InvalidData,

Check warning on line 133 in ntex/src/ws/transport.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/ws/transport.rs#L133

Added line #L133 was not covered by tests
"WebSockets Text frames are not supported",
));
}
Expand Down
Loading