Skip to content

Commit

Permalink
Http ws nonjsonrpc support (#554)
Browse files Browse the repository at this point in the history
* feat: Supporting Non Json Rpc HTTP/WS Brokers

* feat: add unsubscribe support

* feat: Make subscription more cleaner

* feat: add more coverag

* feat: Add mock websocket support

* feat: Add cleanup tests

* fix: mock unit test

* fix: cleaner mock

* fix: add more refactoring

* fix: Changes to support subscription

* fix: Handle cases where handshake is unsuccessful

* feat: Add resiliency support for WS

* fix: for reconnect

* fix: unit tests
  • Loading branch information
satlead committed Jul 15, 2024
1 parent b9600e6 commit a312c07
Show file tree
Hide file tree
Showing 17 changed files with 1,109 additions and 268 deletions.
6 changes: 3 additions & 3 deletions core/main/src/bootstrap/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ use super::{
/// 4. [LoadExtensionsStep] - Loads the Extensions in to [crate::state::extn_state::ExtnState]
/// 5. [StartExtnChannelsStep] - Starts the Device channel extension
/// 6. [StartAppManagerStep] - Starts the App Manager and other supporting services
/// 7. [LoadDistributorValuesStep] - Loads the values from distributor like Session
/// 8. [StartOtherBrokers] - Start Other brokers if they are setup in endpoints for rules
/// 7. [StartOtherBrokers] - Start Other brokers if they are setup in endpoints for rules
/// 8. [LoadDistributorValuesStep] - Loads the values from distributor like Session
/// 9. [CheckLauncherStep] - Checks the presence of launcher extension and starts default app
/// 10. [StartWsStep] - Starts the Websocket to accept external and internal connections
/// 11. [FireboltGatewayStep] - Starts the firebolt gateway and blocks the thread to keep it alive till interruption.
Expand All @@ -68,8 +68,8 @@ pub async fn boot(state: BootstrapState) -> RippleResponse {
execute_step(LoadExtensionsStep, &bootstrap).await?;
execute_step(StartExtnChannelsStep, &bootstrap).await?;
execute_step(StartAppManagerStep, &bootstrap).await?;
execute_step(LoadDistributorValuesStep, &bootstrap).await?;
execute_step(StartOtherBrokers, &bootstrap).await?;
execute_step(LoadDistributorValuesStep, &bootstrap).await?;
execute_step(CheckLauncherStep, &bootstrap).await?;
execute_step(StartWsStep, &bootstrap).await?;
execute_step(FireboltGatewayStep, &bootstrap).await?;
Expand Down
8 changes: 8 additions & 0 deletions core/main/src/bootstrap/start_communication_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use ripple_sdk::{
};

use crate::broker::endpoint_broker::BrokerOutputForwarder;
use crate::processor::rpc_gateway_processor::RpcGatewayProcessor;
use crate::state::bootstrap_state::BootstrapState;

pub struct StartCommunicationBroker;
Expand All @@ -32,6 +33,13 @@ impl Bootstep<BootstrapState> for StartCommunicationBroker {

async fn setup(&self, state: BootstrapState) -> Result<(), RippleError> {
let ps = state.platform_state.clone();
// When endpoint broker starts up enable RPC processor there might be internal services which might need
// brokering data
state
.platform_state
.get_client()
.add_request_processor(RpcGatewayProcessor::new(state.platform_state.get_client()));

// Start the Broker Reciever
if let Ok(rx) = state.channels_state.get_broker_receiver() {
BrokerOutputForwarder::start_forwarder(ps.clone(), rx)
Expand Down
9 changes: 1 addition & 8 deletions core/main/src/bootstrap/start_fbgateway_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::{
},
rpc::RippleRPCProvider,
},
processor::rpc_gateway_processor::RpcGatewayProcessor,
service::telemetry_builder::TelemetryBuilder,
state::{bootstrap_state::BootstrapState, platform_state::PlatformState},
};
Expand Down Expand Up @@ -102,12 +101,6 @@ impl Bootstep<BootstrapState> for FireboltGatewayStep {
.await;
let gateway = FireboltGateway::new(state.clone(), methods);
debug!("Handlers initialized");
// Main can now recieve RPC requests
state
.platform_state
.get_client()
.add_request_processor(RpcGatewayProcessor::new(state.platform_state.get_client()));
debug!("Adding RPC gateway processor");
#[cfg(feature = "sysd")]
if sd_notify::booted().is_ok()
&& sd_notify::notify(false, &[sd_notify::NotifyState::Ready]).is_err()
Expand All @@ -117,6 +110,6 @@ impl Bootstep<BootstrapState> for FireboltGatewayStep {
TelemetryBuilder::send_ripple_telemetry(&state.platform_state);
gateway.start().await;

Ok(())
Err(RippleError::ServiceError)
}
}
69 changes: 69 additions & 0 deletions core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2023 Comcast Cable Communications Management, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
//

use std::time::Duration;

use crate::utils::rpc_utils::extract_tcp_port;
use futures::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use ripple_sdk::{
log::{error, info},
tokio::{self, net::TcpStream},
};
use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream};

pub struct BrokerUtils;

impl BrokerUtils {
pub async fn get_ws_broker(
endpoint: &str,
alias: Option<String>,
) -> (
SplitSink<WebSocketStream<TcpStream>, Message>,
SplitStream<WebSocketStream<TcpStream>>,
) {
info!("Broker Endpoint url {}", endpoint);
let url_path = if let Some(a) = alias {
format!("{}{}", endpoint, a)
} else {
endpoint.to_owned()
};
let url = url::Url::parse(&url_path).unwrap();
let port = extract_tcp_port(endpoint);
info!("Url host str {}", url.host_str().unwrap());
let mut index = 0;

loop {
// Try connecting to the tcp port first
if let Ok(v) = TcpStream::connect(&port).await {
// Setup handshake for websocket with the tcp port
// Some WS servers lock on to the Port but not setup handshake till they are fully setup
if let Ok((stream, _)) = client_async(url_path.clone(), v).await {
break stream.split();
}
}
if (index % 10).eq(&0) {
error!(
"Broker with {} failed with retry for last {} secs in {}",
url_path, index, port
);
}
index += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
Loading

0 comments on commit a312c07

Please sign in to comment.