Skip to content

Commit

Permalink
Fixes for production websockets
Browse files Browse the repository at this point in the history
This patch changes the way we construct the websocket client object to
be compatible with HTTPS servers and to avoid the initial HTTP/2 negotiation,
which doesn't support connection upgrade to websockets.

This patch also corrects a nesting issue where the websocket client was
re-created each loop iteration instead of being maintained over multiple
loops.
  • Loading branch information
jkilpatr committed Sep 17, 2024
1 parent 7f100c9 commit 8e6ac0d
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 83 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions rita_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ futures = { version = "0.3", features = ["compat"] }
tokio = "1.39"
actix-web-actors = "4.3"

[dev-dependencies]
actix-rt = "2.10"

[lib]
name = "rita_client"
path = "src/lib.rs"

[features]
# changes operator urls
operator_debug = []
Expand Down
180 changes: 97 additions & 83 deletions rita_client/src/operator_update/ops_websocket.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::{
str, thread,
time::{Duration, Instant},
use crate::operator_update::{
get_client_mbps, get_exit_con, get_hardware_info_update, get_neighbor_info, get_relay_mbps,
get_rita_uptime, get_user_bandwidth_usage, handle_operator_update,
};

use actix_async::System;
use actix_web_actors::ws;
use althea_types::{
Expand All @@ -15,12 +14,11 @@ use settings::{
get_billing_details, get_contact_info, get_install_details, get_operator_address,
get_system_chain, get_user_bandwidth_limit,
};
use tokio::time::timeout;

use crate::operator_update::{
get_client_mbps, get_exit_con, get_hardware_info_update, get_neighbor_info, get_relay_mbps,
get_rita_uptime, get_user_bandwidth_usage, handle_operator_update,
use std::{
str, thread,
time::{Duration, Instant},
};
use tokio::time::timeout;

/// This function spawns a thread solely responsible for performing the websocket operator update
pub fn start_operator_socket_update_loop() {
Expand Down Expand Up @@ -52,85 +50,101 @@ pub fn send_websocket_update() {
thread::spawn(move || {
// this will always be an error, so it's really just a loop statement
// with some fancy destructuring
while let Err(e) =
{
thread::spawn(move || loop {
let runner = System::new();
runner.block_on(async move {
let res = awc::Client::new().ws(url).connect().await;
match res {
Ok((res, mut ws)) => {
info!("Websocket actor is connected {:?}", res);
let mut ops_last_seen_usage_hour: Option<u64> = None;
// we only need to get the identity once
let rita_client = settings::get_rita_client();
let id = rita_client.get_identity().unwrap();

let mut ten_minute_timer: Instant = Instant::now();
let mut five_minute_timer: Instant = Instant::now();
while let Err(e) = {
thread::spawn(move || {
let runner = System::new();
runner.block_on(async move {
let client = awc::Client::builder()
.max_http_version(awc::http::Version::HTTP_11)
.finish();
loop {
// check if there is anything to read first
while let Ok(Some(msg)) = timeout(SOCKET_CHECKER_TIMEOUT, ws.next()).await {
let msg = msg.unwrap();
if let Some(hour) = handle_received_operator_message(msg) {
ops_last_seen_usage_hour = Some(hour);
}
}

// then send over new checkin data where applicable
if Instant::now() - ten_minute_timer > TEN_MINUTES {
info!("Ten minutes have passed, sending data to operator server");
let messages = get_ten_minute_update_data(id);
for message in messages {
// if this unwrap panics, send has failed because the socket has disconnected;
// the thread will simply reconnect the socket and retry
ws.send(message).await.unwrap();
info!("Websocket connecting to {:?}", url);
let res = client.ws(url).connect().await;
match res {
Ok((res, mut ws)) => {
info!("Websocket actor is connected {:?}", res);
let mut ops_last_seen_usage_hour: Option<u64> = None;
// we only need to get the identity once
let rita_client = settings::get_rita_client();
let id = rita_client.get_identity().unwrap();

let mut ten_minute_timer: Instant = Instant::now();
let mut five_minute_timer: Instant = Instant::now();
loop {
// check if there is anything to read first
while let Ok(Some(msg)) =
timeout(SOCKET_CHECKER_TIMEOUT, ws.next()).await
{
// we will panic here with a connection reset if the socket has disconnected
// that will then fall out of this loop into the outer loop, restarting the whole thing
// and reconnecting the socket
let msg = msg.unwrap();
if let Some(hour) = handle_received_operator_message(msg) {
ops_last_seen_usage_hour = Some(hour);
}
}

// then send over new checkin data where applicable
if Instant::now() - ten_minute_timer > TEN_MINUTES {
info!(
"Ten minutes have passed, sending data to operator server"
);
let messages = get_ten_minute_update_data(id);
for message in messages {
// if this unwrap panics, send has failed because the socket has disconnected;
// the thread will simply reconnect the socket and retry
ws.send(message).await.unwrap();
}
info!("Ten minute websocket update sent");
ten_minute_timer = Instant::now();
}
if Instant::now() - five_minute_timer > FIVE_MINUTES {
info!(
"Five minutes have passed, sending data to operator server"
);
let messages = get_five_minute_update_data(
id,
ops_last_seen_usage_hour,
);
for message in messages {
ws.send(message).await.unwrap();
}
info!("Five minute websocket update sent");
five_minute_timer = Instant::now();
}
// the rest of these are 10 second interval updates and run every iteration of the loop
let messages = get_ten_second_update_data(id);
for message in messages {
ws.send(message).await.unwrap();
}
info!("Ten second websocket update sent");

// check again for any responses to read
while let Ok(Some(msg)) =
timeout(SOCKET_CHECKER_TIMEOUT, ws.next()).await
{
let msg = msg.unwrap();
if let Some(hour) = handle_received_operator_message(msg) {
ops_last_seen_usage_hour = Some(hour);
}
}
info!("Sleeping until next checkin...");
thread::sleep(SOCKET_UPDATE_FREQUENCY);
}
}
info!("Ten minute websocket update sent");
ten_minute_timer = Instant::now();
}
if Instant::now() - five_minute_timer > FIVE_MINUTES {
info!("Five minutes have passed, sending data to operator server");
let messages =
get_five_minute_update_data(id, ops_last_seen_usage_hour);
for message in messages {
ws.send(message).await.unwrap();
}
info!("Five minute websocket update sent");
five_minute_timer = Instant::now();
}
// the rest of these are 10 second interval updates and run every iteration of the loop
let messages = get_ten_second_update_data(id);
for message in messages {
ws.send(message).await.unwrap();
}
info!("Ten second websocket update sent");

// check again for any responses to read
while let Ok(Some(msg)) = timeout(SOCKET_CHECKER_TIMEOUT, ws.next()).await {
let msg = msg.unwrap();
if let Some(hour) = handle_received_operator_message(msg) {
ops_last_seen_usage_hour = Some(hour);
Err(e) => {
error!(
"Failed to connect to websocket; attempting to restart loop... {:?}",
e
);
thread::sleep(SOCKET_UPDATE_FREQUENCY);
}
}
info!("Sleeping until next checkin...");
thread::sleep(SOCKET_UPDATE_FREQUENCY);
}
}
Err(e) => {
error!(
"Failed to connect to websocket; attempting to restart loop... {:?}",
e
);
thread::sleep(SOCKET_UPDATE_FREQUENCY);
}
}
});
info!("Restarting websocket loop...");
})
.join()
}
{
});
})
.join()
} {
error!("Websocket loop thread panicked! Respawning {:?}", e);
}
});
Expand Down

0 comments on commit 8e6ac0d

Please sign in to comment.