Skip to content

Commit

Permalink
Fix async reconnect test
Browse files Browse the repository at this point in the history
  • Loading branch information
rageshkrishna committed Mar 19, 2024
1 parent b2073c9 commit 167edd4
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions socketio/src/asynchronous/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ impl Client {
let mut stream = client_clone.as_stream();
// Consume the stream until it returns None and the stream is closed.
while let Some(item) = stream.next().await {
if let e @ Err(Error::IncompleteResponseFromEngineIo(_)) = item {
trace!("Network error occurred: {}", e.unwrap_err());
if let Err(e) = item {
trace!("Network error occurred: {}", e);
}
}

Expand Down Expand Up @@ -233,7 +233,7 @@ impl Client {
/// }
/// ```
pub async fn disconnect(&self) -> Result<()> {
self.manually_disconnected.store(true, Ordering::Relaxed);
self.manually_disconnected.store(true, Ordering::Release);

let disconnect_packet =
Packet::new(PacketId::Disconnect, self.nsp.clone(), None, None, 0, None);
Expand Down Expand Up @@ -500,14 +500,15 @@ impl Client {
pub(crate) fn as_stream<'a>(
&'a self,
) -> Pin<Box<dyn Stream<Item = Result<Packet>> + Send + 'a>> {
let socket_clone = {
let s = self.socket.try_read().expect("Socket must be readable");
(*s).clone()
};
let socket_clone = self.socket.clone();

stream::unfold(socket_clone, |mut socket| async {
stream::unfold(socket_clone, |socket| async {
let mut socket_read = {
let s = socket.read().await;
(*s).clone()
};
// wait for the next payload
let packet: Option<std::result::Result<Packet, Error>> = socket.next().await;
let packet: Option<std::result::Result<Packet, Error>> = socket_read.next().await;
match packet {
// end the stream if the underlying one is closed
None => None,
Expand Down Expand Up @@ -552,7 +553,7 @@ mod test {
asynchronous::client::{builder::ClientBuilder, client::Client},
error::Result,
packet::{Packet, PacketId},
Event, Payload, TransportType,
Payload, TransportType,
};

#[tokio::test]
Expand Down Expand Up @@ -735,7 +736,7 @@ mod test {
let socket = socket.unwrap();

// waiting for server to emit message
std::thread::sleep(std::time::Duration::from_millis(500));
sleep(Duration::from_millis(500)).await;

assert_eq!(load(&CONNECT_NUM), 1, "should connect once");
assert_eq!(load(&MESSAGE_NUM), 1, "should receive one");
Expand All @@ -745,7 +746,7 @@ mod test {

// waiting for server to restart
for _ in 0..10 {
std::thread::sleep(std::time::Duration::from_millis(400));
sleep(Duration::from_millis(400)).await;
if load(&CONNECT_NUM) == 2 && load(&MESSAGE_NUM) == 2 {
break;
}
Expand Down

0 comments on commit 167edd4

Please sign in to comment.