Skip to content

Commit

Permalink
feat(async):Async reconnections
Browse files Browse the repository at this point in the history
The async client can now be configured to automatically reconnect on
network errors or server disconnections
  • Loading branch information
rageshkrishna committed Mar 22, 2024
1 parent bf02c60 commit 795cdbd
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 70 deletions.
54 changes: 21 additions & 33 deletions socketio/src/asynchronous/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_util::{future::BoxFuture, StreamExt};
use futures_util::future::BoxFuture;
use log::trace;
use native_tls::TlsConnector;
use rust_engineio::{
Expand All @@ -8,7 +8,7 @@ use rust_engineio::{
use std::collections::HashMap;
use url::Url;

use crate::{error::Result, Error, Event, Payload, TransportType};
use crate::{error::Result, Event, Payload, TransportType};

use super::{
callback::{Callback, DynAsyncAnyCallback, DynAsyncCallback},
Expand All @@ -22,13 +22,13 @@ use crate::asynchronous::socket::Socket as InnerSocket;
/// acts the `build` method and returns a connected [`Client`].
pub struct ClientBuilder {
address: String,
on: HashMap<Event, Callback<DynAsyncCallback>>,
on_any: Option<Callback<DynAsyncAnyCallback>>,
namespace: String,
pub(crate) on: HashMap<Event, Callback<DynAsyncCallback>>,
pub(crate) on_any: Option<Callback<DynAsyncAnyCallback>>,
pub(crate) namespace: String,
tls_config: Option<TlsConnector>,
opening_headers: Option<HeaderMap>,
transport_type: TransportType,
auth: Option<serde_json::Value>,
pub(crate) auth: Option<serde_json::Value>,
pub(crate) reconnect: bool,
pub(crate) reconnect_on_disconnect: bool,
// None implies infinite attempts
Expand Down Expand Up @@ -383,26 +383,14 @@ impl ClientBuilder {
/// }
/// ```
pub async fn connect(self) -> Result<Client> {
let socket = self.connect_manual().await?;
let socket_clone = socket.clone();

// Use thread to consume items in iterator in order to call callbacks
tokio::runtime::Handle::current().spawn(async move {
let mut stream = socket_clone.as_stream();
// Consume the stream until it returns None and the stream is closed.
while let Some(item) = stream.next().await {
if let e @ Err(Error::IncompleteResponseFromEngineIo(_)) = item {
trace!("Network error occurred: {}", e.unwrap_err());
}
}
});
let mut socket = self.connect_manual().await?;
socket.poll_stream().await?;

Ok(socket)
}

//TODO: 0.3.X stabilize
pub(crate) async fn connect_manual(self) -> Result<Client> {
// Parse url here rather than in new to keep new returning Self.
/// Creates a new Socket that can be used for reconnections
pub(crate) async fn inner_create(&self) -> Result<InnerSocket> {
let mut url = Url::parse(&self.address)?;

if url.path() == "/" {
Expand All @@ -411,11 +399,11 @@ impl ClientBuilder {

let mut builder = EngineIoClientBuilder::new(url);

if let Some(tls_config) = self.tls_config {
builder = builder.tls_config(tls_config);
if let Some(tls_config) = &self.tls_config {
builder = builder.tls_config(tls_config.to_owned());
}
if let Some(headers) = self.opening_headers {
builder = builder.headers(headers);
if let Some(headers) = &self.opening_headers {
builder = builder.headers(headers.to_owned());
}

let engine_client = match self.transport_type {
Expand All @@ -426,14 +414,14 @@ impl ClientBuilder {
};

let inner_socket = InnerSocket::new(engine_client)?;
Ok(inner_socket)
}

//TODO: 0.3.X stabilize
pub(crate) async fn connect_manual(self) -> Result<Client> {
let inner_socket = self.inner_create().await?;

let socket = Client::new(
inner_socket,
&self.namespace,
self.on,
self.on_any,
self.auth,
)?;
let socket = Client::new(inner_socket, self)?;
socket.connect().await?;

Ok(socket)
Expand Down
Loading

0 comments on commit 795cdbd

Please sign in to comment.