Skip to content

Commit

Permalink
Fix async inner socket
Browse files Browse the repository at this point in the history
  • Loading branch information
rageshkrishna committed Mar 19, 2024
1 parent 24f0878 commit b2073c9
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions socketio/src/asynchronous/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
#[derive(Clone)]
pub struct Client {
/// The inner socket client to delegate the methods to.
socket: InnerSocket,
socket: Arc<RwLock<InnerSocket>>,
outstanding_acks: Arc<RwLock<Vec<Ack>>>,
// namespace, for multiplexing messages
nsp: String,
Expand All @@ -53,7 +53,7 @@ impl Client {
/// ```
pub(crate) fn new(socket: InnerSocket, builder: ClientBuilder) -> Result<Self> {
Ok(Client {
socket,
socket: Arc::new(RwLock::new(socket)),
nsp: builder.namespace.to_owned(),
outstanding_acks: Arc::new(RwLock::new(Vec::new())),
auth: builder.auth.clone(),
Expand All @@ -67,21 +67,26 @@ impl Client {
/// called to interact with the server.
pub(crate) async fn connect(&self) -> Result<()> {
// Connect the underlying socket
self.socket.connect().await?;
self.socket.read().await.connect().await?;

// construct the opening packet
let auth = self.auth.as_ref().map(|data| data.to_string());
let open_packet = Packet::new(PacketId::Connect, self.nsp.clone(), auth, None, 0, None);

self.socket.send(open_packet).await?;
self.socket.read().await.send(open_packet).await?;

Ok(())
}

pub(crate) async fn reconnect(&mut self) -> Result<()> {
let builder = self.builder.write().await;
let socket = builder.inner_create().await?;
self.socket = socket;

// New inner socket that can be connected
let mut client_socket = self.socket.write().await;
*client_socket = socket;
drop(client_socket);

self.connect().await?;

Ok(())
Expand Down Expand Up @@ -187,7 +192,11 @@ impl Client {
E: Into<Event>,
D: Into<Payload>,
{
self.socket.emit(&self.nsp, event.into(), data.into()).await
self.socket
.read()
.await
.emit(&self.nsp, event.into(), data.into())
.await
}

/// Disconnects this client from the server by sending a `socket.io` closing
Expand Down Expand Up @@ -229,8 +238,8 @@ impl Client {
let disconnect_packet =
Packet::new(PacketId::Disconnect, self.nsp.clone(), None, None, 0, None);

self.socket.send(disconnect_packet).await?;
self.socket.disconnect().await?;
self.socket.read().await.send(disconnect_packet).await?;
self.socket.read().await.disconnect().await?;

Ok(())
}
Expand Down Expand Up @@ -312,7 +321,7 @@ impl Client {
// add the ack to the tuple of outstanding acks
self.outstanding_acks.write().await.push(ack);

self.socket.send(socket_packet).await
self.socket.read().await.send(socket_packet).await
}

async fn callback<P: Into<Payload>>(&self, event: &Event, payload: P) -> Result<()> {
Expand Down Expand Up @@ -491,7 +500,12 @@ impl Client {
pub(crate) fn as_stream<'a>(
&'a self,
) -> Pin<Box<dyn Stream<Item = Result<Packet>> + Send + 'a>> {
stream::unfold(self.socket.clone(), |mut socket| async {
let socket_clone = {
let s = self.socket.try_read().expect("Socket must be readable");
(*s).clone()
};

stream::unfold(socket_clone, |mut socket| async {
// wait for the next payload
let packet: Option<std::result::Result<Packet, Error>> = socket.next().await;
match packet {
Expand Down Expand Up @@ -685,7 +699,6 @@ mod test {
#[tokio::test]
async fn socket_io_reconnect_integration() -> Result<()> {
static CONNECT_NUM: AtomicUsize = AtomicUsize::new(0);
static CLOSE_NUM: AtomicUsize = AtomicUsize::new(0);
static MESSAGE_NUM: AtomicUsize = AtomicUsize::new(0);

let url = crate::test::socket_io_restart_server();
Expand All @@ -694,7 +707,7 @@ mod test {
.reconnect(true)
.max_reconnect_attempts(100)
.reconnect_delay(100, 100)
.on(Event::Connect, |_, socket| {
.on("open", |_, socket| {
async move {
CONNECT_NUM.fetch_add(1, Ordering::Release);
let r = socket.emit_with_ack(
Expand All @@ -707,12 +720,6 @@ mod test {
}
.boxed()
})
.on(Event::Close, |_, _| {
async move {
CLOSE_NUM.fetch_add(1, Ordering::Release);
}
.boxed()
})
.on("message", |_, _socket| {
async move {
// test the iterator implementation and make sure there is a constant
Expand All @@ -732,7 +739,6 @@ mod test {

assert_eq!(load(&CONNECT_NUM), 1, "should connect once");
assert_eq!(load(&MESSAGE_NUM), 1, "should receive one");
assert_eq!(load(&CLOSE_NUM), 0, "should not close");

let r = socket.emit("restart_server", json!("")).await;
assert!(r.is_ok(), "should emit restart success");
Expand All @@ -747,7 +753,6 @@ mod test {

assert_eq!(load(&CONNECT_NUM), 2, "should connect twice");
assert_eq!(load(&MESSAGE_NUM), 2, "should receive two messages");
assert_eq!(load(&CLOSE_NUM), 1, "should close once");

socket.disconnect().await?;
Ok(())
Expand Down

0 comments on commit b2073c9

Please sign in to comment.