From 154d27b743bf0448aba34f3a2011bc40b2677e27 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 5 Dec 2024 13:24:09 +0500 Subject: [PATCH 1/3] Better error handling --- ntex-io/CHANGES.md | 4 ++++ ntex-io/Cargo.toml | 2 +- ntex-io/src/io.rs | 42 +++++++++++++++++++++++------------------- ntex-io/src/ioref.rs | 2 +- 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 29791179..14f04654 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.9.2] - 2024-12-05 + +* Better error handling + ## [2.9.1] - 2024-12-04 * Check service readiness for every turn diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 2b369e62..696c7476 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.9.1" +version = "2.9.2" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 19cd2d6f..24f6e42e 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -80,6 +80,23 @@ impl IoState { } } + /// Get current io error + pub(super) fn error(&self) -> Option { + if let Some(err) = self.error.take() { + self.error + .set(Some(io::Error::new(err.kind(), format!("{}", err)))); + Some(err) + } else { + None + } + } + + /// Get current io result + pub(super) fn error_or_disconnected(&self) -> io::Error { + self.error() + .unwrap_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "Disconnected")) + } + pub(super) fn io_stopped(&self, err: Option) { if err.is_some() { self.error.set(err); @@ -257,19 +274,6 @@ impl Io { fn io_ref(&self) -> &IoRef { unsafe { &*self.0.get() } } - - /// Get current io error - fn error(&self) -> Option { - self.st().error.take() - } - - /// Get current io error - fn error_or_disconnected(&self) -> io::Error { - self.st() - .error - .take() - .unwrap_or_else(|| io::Error::new(io::ErrorKind::Other, "Disconnected")) - } } impl Io> { @@ -423,7 +427,7 @@ impl Io { let mut flags = st.flags.get(); if flags.is_stopped() { - Poll::Ready(Err(self.error_or_disconnected())) + Poll::Ready(Err(st.error_or_disconnected())) } else { st.dispatch_task.register(cx.waker()); @@ -511,7 +515,7 @@ impl Io { let st = self.st(); let flags = st.flags.get(); if flags.is_stopped() { - Err(RecvError::PeerGone(self.error())) + Err(RecvError::PeerGone(st.error())) } else if flags.contains(Flags::DSP_STOP) { st.remove_flags(Flags::DSP_STOP); Err(RecvError::Stop) @@ -545,12 +549,12 @@ impl Io { /// otherwise wake up when size of write buffer is lower than /// buffer max size. pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll> { + let st = self.st(); let flags = self.flags(); if flags.is_stopped() { - Poll::Ready(Err(self.error_or_disconnected())) + Poll::Ready(Err(st.error_or_disconnected())) } else { - let st = self.st(); let len = st.buffer.write_destination_size(); if len > 0 { if full { @@ -575,7 +579,7 @@ impl Io { let flags = st.flags.get(); if flags.is_stopped() { - if let Some(err) = self.error() { + if let Some(err) = st.error() { Poll::Ready(Err(err)) } else { Poll::Ready(Ok(())) @@ -611,7 +615,7 @@ impl Io { let st = self.st(); let flags = st.flags.get(); if flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) { - Poll::Ready(IoStatusUpdate::PeerGone(self.error())) + Poll::Ready(IoStatusUpdate::PeerGone(st.error())) } else if flags.contains(Flags::DSP_STOP) { st.remove_flags(Flags::DSP_STOP); Poll::Ready(IoStatusUpdate::Stop) diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 054a1906..96fc32a0 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -191,7 +191,7 @@ impl IoRef { F: FnOnce(&mut BytesVec) -> R, { if self.0.flags.get().contains(Flags::IO_STOPPED) { - Err(io::Error::new(io::ErrorKind::Other, "Disconnected")) + Err(self.0.error_or_disconnected()) } else { let result = self.0.buffer.with_write_source(self, f); self.0.filter().process_write_buf(self, &self.0.buffer, 0)?; From 6cc37b2d3163745901af00f0feafb7e9eb577e1d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 5 Dec 2024 13:39:07 +0500 Subject: [PATCH 2/3] wip --- ntex-io/src/io.rs | 2 +- ntex-server/src/net/builder.rs | 2 +- ntex-tls/src/openssl/connect.rs | 9 ++++++--- ntex-tls/src/openssl/mod.rs | 4 +++- ntex/src/http/client/h2proto.rs | 11 +++++++---- ntex/src/http/encoding/encoder.rs | 2 +- ntex/src/http/error.rs | 4 ++-- ntex/src/http/h2/service.rs | 4 +++- ntex/src/web/server.rs | 2 +- ntex/src/ws/transport.rs | 8 ++++---- 10 files changed, 29 insertions(+), 19 deletions(-) diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 24f6e42e..e0b48976 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -337,7 +337,7 @@ impl Io { "Timeout", ))), Err(RecvError::Stop) => Err(Either::Right(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::UnexpectedEof, "Dispatcher stopped", ))), Err(RecvError::WriteBackpressure) => { diff --git a/ntex-server/src/net/builder.rs b/ntex-server/src/net/builder.rs index 95be84b9..0569f36a 100644 --- a/ntex-server/src/net/builder.rs +++ b/ntex-server/src/net/builder.rs @@ -360,7 +360,7 @@ pub fn bind_addr( Err(e) } else { Err(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::InvalidInput, "Cannot bind to address.", )) } diff --git a/ntex-tls/src/openssl/connect.rs b/ntex-tls/src/openssl/connect.rs index 3df5debf..c2ffe528 100644 --- a/ntex-tls/src/openssl/connect.rs +++ b/ntex-tls/src/openssl/connect.rs @@ -51,11 +51,11 @@ impl SslConnector { log::trace!("{}: SSL Handshake start for: {:?}", io.tag(), host); match openssl.configure() { - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e).into()), + Err(e) => Err(io::Error::new(io::ErrorKind::InvalidInput, e).into()), Ok(config) => { let ssl = config .into_ssl(&host) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; let tag = io.tag(); match connect_io(io, ssl).await { Ok(io) => { @@ -64,7 +64,10 @@ impl SslConnector { } Err(e) => { log::trace!("{}: SSL Handshake error: {:?}", tag, e); - Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into()) + Err( + io::Error::new(io::ErrorKind::InvalidInput, format!("{}", e)) + .into(), + ) } } } diff --git a/ntex-tls/src/openssl/mod.rs b/ntex-tls/src/openssl/mod.rs index 45ed1fcd..429b1e41 100644 --- a/ntex-tls/src/openssl/mod.rs +++ b/ntex-tls/src/openssl/mod.rs @@ -250,7 +250,9 @@ async fn handle_result( ssl::ErrorCode::WANT_READ => { let res = io.read_notify().await; match res? { - None => Err(io::Error::new(io::ErrorKind::Other, "disconnected")), + None => { + Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected")) + } _ => Ok(None), } } diff --git a/ntex/src/http/client/h2proto.rs b/ntex/src/http/client/h2proto.rs index e04d4763..e98209f7 100644 --- a/ntex/src/http/client/h2proto.rs +++ b/ntex/src/http/client/h2proto.rs @@ -187,14 +187,17 @@ async fn get_response( err ); pl.set_error( - io::Error::new(io::ErrorKind::Other, err) - .into(), + io::Error::new( + io::ErrorKind::UnexpectedEof, + err, + ) + .into(), ); } _ => { pl.set_error( io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::Unsupported, "unexpected h2 message", ) .into(), @@ -216,7 +219,7 @@ async fn get_response( } } _ => Err(SendRequestError::Error(Box::new(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::Unsupported, "unexpected h2 message", )))), } diff --git a/ntex/src/http/encoding/encoder.rs b/ntex/src/http/encoding/encoder.rs index 92003e60..7c24edf3 100644 --- a/ntex/src/http/encoding/encoder.rs +++ b/ntex/src/http/encoding/encoder.rs @@ -117,7 +117,7 @@ impl MessageBody for Encoder { Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Err(Box::new(e)))), Poll::Ready(Err(_)) => { return Poll::Ready(Some(Err(Box::new(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::Interrupted, "Canceled", ))))); } diff --git a/ntex/src/http/error.rs b/ntex/src/http/error.rs index 85642d6c..f1eba14a 100644 --- a/ntex/src/http/error.rs +++ b/ntex/src/http/error.rs @@ -217,7 +217,7 @@ pub enum BlockingError { impl From for PayloadError { fn from(_: crate::rt::JoinError) -> Self { PayloadError::Io(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::Interrupted, "Operation is canceled", )) } @@ -228,7 +228,7 @@ impl From> for PayloadError { match err { BlockingError::Error(e) => PayloadError::Io(e), BlockingError::Canceled => PayloadError::Io(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::Interrupted, "Operation is canceled", )), } diff --git a/ntex/src/http/h2/service.rs b/ntex/src/http/h2/service.rs index 2ca77f4e..8c32e3f4 100644 --- a/ntex/src/http/h2/service.rs +++ b/ntex/src/http/h2/service.rs @@ -408,7 +408,9 @@ where h2::MessageKind::Disconnect(err) => { log::debug!("Connection is disconnected {:?}", err); if let Some(mut sender) = self.streams.borrow_mut().remove(&stream.id()) { - sender.set_error(io::Error::new(io::ErrorKind::Other, err).into()); + sender.set_error( + io::Error::new(io::ErrorKind::UnexpectedEof, err).into(), + ); } return Ok(()); } diff --git a/ntex/src/web/server.rs b/ntex/src/web/server.rs index 9efe14cb..5b774b97 100644 --- a/ntex/src/web/server.rs +++ b/ntex/src/web/server.rs @@ -467,7 +467,7 @@ where Err(e) } else { Err(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::InvalidInput, "Cannot bind to address.", )) } diff --git a/ntex/src/ws/transport.rs b/ntex/src/ws/transport.rs index f6e27f7a..78473fab 100644 --- a/ntex/src/ws/transport.rs +++ b/ntex/src/ws/transport.rs @@ -54,7 +54,7 @@ impl WsTransport { Ok(()) } else { self.insert_flags(Flags::PROTO_ERR); - Err(io::Error::new(io::ErrorKind::Other, err_message)) + Err(io::Error::new(io::ErrorKind::InvalidData, err_message)) } } } @@ -96,7 +96,7 @@ impl FilterLayer for WsTransport { self.codec.decode_vec(&mut src).map_err(|e| { log::trace!("Failed to decode ws codec frames: {:?}", e); self.insert_flags(Flags::PROTO_ERR); - io::Error::new(io::ErrorKind::Other, e) + io::Error::new(io::ErrorKind::InvalidData, e) })? { frame } else { @@ -123,14 +123,14 @@ impl FilterLayer for WsTransport { Frame::Continuation(Item::FirstText(_)) => { self.insert_flags(Flags::PROTO_ERR); return Err(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::InvalidData, "WebSocket Text continuation frames are not supported", )); } Frame::Text(_) => { self.insert_flags(Flags::PROTO_ERR); return Err(io::Error::new( - io::ErrorKind::Other, + io::ErrorKind::InvalidData, "WebSockets Text frames are not supported", )); } From 6f570638a83c62d60180a8ca96ad6b63691141b6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 5 Dec 2024 13:42:41 +0500 Subject: [PATCH 3/3] wip --- ntex/src/http/test.rs | 2 +- ntex/src/web/test.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 1a395eae..8a1dd17c 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -257,7 +257,7 @@ where server, client: Client::build().finish(), } - .set_client_timeout(Seconds(30), Millis(30_000)) + .set_client_timeout(Seconds(45), Millis(45_000)) } #[derive(Debug)] diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 20c65fd0..c75aad23 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -717,8 +717,8 @@ where .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); Connector::default() .lifetime(Seconds::ZERO) - .keep_alive(Seconds(30)) - .timeout(Millis(30_000)) + .keep_alive(Seconds(45)) + .timeout(Millis(45_000)) .disconnect_timeout(Seconds(5)) .openssl(builder.build()) .finish() @@ -727,14 +727,14 @@ where { Connector::default() .lifetime(Seconds::ZERO) - .timeout(Millis(30_000)) + .timeout(Millis(45_000)) .finish() } }; Client::build() .connector(connector) - .timeout(Seconds(30)) + .timeout(Seconds(45)) .finish() };