Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Always pass port to jsonrpsee WebSocket client #2339

Merged
merged 3 commits into from
Mar 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ pub struct ReconnectingWsClient {
to_worker_channel: TokioSender<RpcDispatcherMessage>,
}

/// Format url and force addition of a port
fn url_to_string_with_port(url: Url) -> Option<String> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you just use set_port?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_port checks internally and drops the port if it is the default one. Basically the url standard says that the port should be null if it is the default one. jsonrpsee behaves out of line here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not just fix jsonrpsee?

Copy link
Contributor Author

@skunert skunert Mar 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, this is a quick fix to make it work for now. But yeah I will open an issue 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue about this, paritytech/jsonrpsee#1048.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bkchr can we go forward with this? If this gets changed in jsonrpsee I will adjust here again, but for now this is an easy fix and I would like to have this functionality working asap.

// This is already validated on CLI side, just defensive here
if (url.scheme() != "ws" && url.scheme() != "wss") || url.host_str().is_none() {
tracing::warn!(target: LOG_TARGET, ?url, "Non-WebSocket URL or missing host.");
return None
}

// Either we have a user-supplied port or use the default for 'ws' or 'wss' here
Some(format!(
"{}://{}:{}{}{}",
url.scheme(),
url.host_str()?,
url.port_or_known_default()?,
url.path(),
url.query().map(|query| format!("?{}", query)).unwrap_or_default()
skunert marked this conversation as resolved.
Show resolved Hide resolved
))
}

impl ReconnectingWsClient {
/// Create a new websocket client frontend.
pub async fn new(urls: Vec<Url>, task_manager: &mut TaskManager) -> RelayChainResult<Self> {
Expand Down Expand Up @@ -144,7 +163,7 @@ impl ReconnectingWsClient {

/// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners.
struct ReconnectingWebsocketWorker {
ws_urls: Vec<Url>,
ws_urls: Vec<String>,
/// Communication channel with the RPC client
client_receiver: TokioReceiver<RpcDispatcherMessage>,

Expand Down Expand Up @@ -176,7 +195,7 @@ fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>
/// and reconnections.
#[derive(Debug)]
struct ClientManager {
urls: Vec<Url>,
urls: Vec<String>,
active_client: Arc<JsonRpcClient>,
active_index: usize,
}
Expand All @@ -189,7 +208,7 @@ struct RelayChainSubscriptions {

/// Try to find a new RPC server to connect to.
async fn connect_next_available_rpc_server(
urls: &Vec<Url>,
urls: &Vec<String>,
starting_position: usize,
) -> Result<(usize, Arc<JsonRpcClient>), ()> {
tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server.");
Expand All @@ -198,18 +217,19 @@ async fn connect_next_available_rpc_server(
tracing::info!(
target: LOG_TARGET,
index,
?url,
url,
"Trying to connect to next external relaychain node.",
);
if let Ok(ws_client) = WsClientBuilder::default().build(url).await {
return Ok((index, Arc::new(ws_client)))
match WsClientBuilder::default().build(&url).await {
Ok(ws_client) => return Ok((index, Arc::new(ws_client))),
Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."),
};
}
Err(())
}

impl ClientManager {
pub async fn new(urls: Vec<Url>) -> Result<Self, ()> {
pub async fn new(urls: Vec<String>) -> Result<Self, ()> {
if urls.is_empty() {
return Err(())
}
Expand Down Expand Up @@ -325,6 +345,8 @@ impl ReconnectingWebsocketWorker {
async fn new(
urls: Vec<Url>,
) -> (ReconnectingWebsocketWorker, TokioSender<RpcDispatcherMessage>) {
let urls = urls.into_iter().filter_map(url_to_string_with_port).collect();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This silently drops urls without a port.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean either the url has a user-provided port or we use the default. If the URL has no host, it fails at initial parsing already. So this should never filter something out. I will add a log message in that case however, since filtering out anything would be undesired (but passing it on is also undesired since jsonrpsee will also not take it without a port).


let (tx, rx) = tokio_channel(100);
let worker = ReconnectingWebsocketWorker {
ws_urls: urls,
Expand Down Expand Up @@ -518,3 +540,36 @@ impl ReconnectingWebsocketWorker {
}
}
}

#[cfg(test)]
mod test {
use super::url_to_string_with_port;
use url::Url;

#[test]
fn url_to_string_works() {
let url = Url::parse("wss://something/path").unwrap();
assert_eq!(Some("wss://something:443/path".to_string()), url_to_string_with_port(url));

let url = Url::parse("ws://something/path").unwrap();
assert_eq!(Some("ws://something:80/path".to_string()), url_to_string_with_port(url));

let url = Url::parse("wss://something:100/path").unwrap();
assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url));

let url = Url::parse("wss://something:100/path").unwrap();
assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url));

let url = Url::parse("wss://something/path?query=yes").unwrap();
assert_eq!(
Some("wss://something:443/path?query=yes".to_string()),
url_to_string_with_port(url)
);

let url = Url::parse("wss://something:9090/path?query=yes").unwrap();
assert_eq!(
Some("wss://something:9090/path?query=yes".to_string()),
url_to_string_with_port(url)
);
}
}