From 167edd45dd9501715bf21bea4350ffdbb425f46d Mon Sep 17 00:00:00 2001 From: Ragesh Krishna Date: Tue, 19 Mar 2024 13:18:41 +0530 Subject: [PATCH] Fix async reconnect test --- socketio/src/asynchronous/client/client.rs | 25 +++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index 1f9cf057..2043f97f 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -107,8 +107,8 @@ impl Client { let mut stream = client_clone.as_stream(); // Consume the stream until it returns None and the stream is closed. while let Some(item) = stream.next().await { - if let e @ Err(Error::IncompleteResponseFromEngineIo(_)) = item { - trace!("Network error occurred: {}", e.unwrap_err()); + if let Err(e) = item { + trace!("Network error occurred: {}", e); } } @@ -233,7 +233,7 @@ impl Client { /// } /// ``` pub async fn disconnect(&self) -> Result<()> { - self.manually_disconnected.store(true, Ordering::Relaxed); + self.manually_disconnected.store(true, Ordering::Release); let disconnect_packet = Packet::new(PacketId::Disconnect, self.nsp.clone(), None, None, 0, None); @@ -500,14 +500,15 @@ impl Client { pub(crate) fn as_stream<'a>( &'a self, ) -> Pin> + Send + 'a>> { - let socket_clone = { - let s = self.socket.try_read().expect("Socket must be readable"); - (*s).clone() - }; + let socket_clone = self.socket.clone(); - stream::unfold(socket_clone, |mut socket| async { + stream::unfold(socket_clone, |socket| async { + let mut socket_read = { + let s = socket.read().await; + (*s).clone() + }; // wait for the next payload - let packet: Option> = socket.next().await; + let packet: Option> = socket_read.next().await; match packet { // end the stream if the underlying one is closed None => None, @@ -552,7 +553,7 @@ mod test { asynchronous::client::{builder::ClientBuilder, client::Client}, error::Result, packet::{Packet, PacketId}, - Event, Payload, TransportType, + Payload, TransportType, }; #[tokio::test] @@ -735,7 +736,7 @@ mod test { let socket = socket.unwrap(); // waiting for server to emit message - std::thread::sleep(std::time::Duration::from_millis(500)); + sleep(Duration::from_millis(500)).await; assert_eq!(load(&CONNECT_NUM), 1, "should connect once"); assert_eq!(load(&MESSAGE_NUM), 1, "should receive one"); @@ -745,7 +746,7 @@ mod test { // waiting for server to restart for _ in 0..10 { - std::thread::sleep(std::time::Duration::from_millis(400)); + sleep(Duration::from_millis(400)).await; if load(&CONNECT_NUM) == 2 && load(&MESSAGE_NUM) == 2 { break; }