From a312c07ebee563b836e93cdd62b9dfeae83ec100 Mon Sep 17 00:00:00 2001 From: Sathishkumar Date: Mon, 15 Jul 2024 18:15:31 -0400 Subject: [PATCH] Http ws nonjsonrpc support (#554) * feat: Supporting Non Json Rpc HTTP/WS Brokers * feat: add unsubscribe support * feat: Make subscription more cleaner * feat: add more coverag * feat: Add mock websocket support * feat: Add cleanup tests * fix: mock unit test * fix: cleaner mock * fix: add more refactoring * fix: Changes to support subscription * fix: Handle cases where handshake is unsuccessful * feat: Add resiliency support for WS * fix: for reconnect * fix: unit tests --- core/main/src/bootstrap/boot.rs | 6 +- .../bootstrap/start_communication_broker.rs | 8 + .../src/bootstrap/start_fbgateway_step.rs | 9 +- core/main/src/broker/broker_utils.rs | 69 +++ core/main/src/broker/endpoint_broker.rs | 473 +++++++++++++----- core/main/src/broker/http_broker.rs | 33 +- core/main/src/broker/mod.rs | 1 + core/main/src/broker/rules_engine.rs | 8 + core/main/src/broker/thunder_broker.rs | 104 ++-- core/main/src/broker/websocket_broker.rs | 268 ++++++++-- core/main/src/firebolt/firebolt_gateway.rs | 79 ++- core/main/src/service/extn/ripple_client.rs | 3 +- core/main/src/state/platform_state.rs | 4 +- core/main/src/utils/test_utils.rs | 116 ++++- core/sdk/src/api/gateway/rpc_gateway_api.rs | 51 ++ core/sdk/src/extn/client/extn_client.rs | 103 +++- core/sdk/src/extn/extn_client_message.rs | 42 ++ 17 files changed, 1109 insertions(+), 268 deletions(-) create mode 100644 core/main/src/broker/broker_utils.rs diff --git a/core/main/src/bootstrap/boot.rs b/core/main/src/bootstrap/boot.rs index 6ef3c49f6..bc758226d 100644 --- a/core/main/src/bootstrap/boot.rs +++ b/core/main/src/bootstrap/boot.rs @@ -53,8 +53,8 @@ use super::{ /// 4. [LoadExtensionsStep] - Loads the Extensions in to [crate::state::extn_state::ExtnState] /// 5. [StartExtnChannelsStep] - Starts the Device channel extension /// 6. [StartAppManagerStep] - Starts the App Manager and other supporting services -/// 7. [LoadDistributorValuesStep] - Loads the values from distributor like Session -/// 8. [StartOtherBrokers] - Start Other brokers if they are setup in endpoints for rules +/// 7. [StartOtherBrokers] - Start Other brokers if they are setup in endpoints for rules +/// 8. [LoadDistributorValuesStep] - Loads the values from distributor like Session /// 9. [CheckLauncherStep] - Checks the presence of launcher extension and starts default app /// 10. [StartWsStep] - Starts the Websocket to accept external and internal connections /// 11. [FireboltGatewayStep] - Starts the firebolt gateway and blocks the thread to keep it alive till interruption. @@ -68,8 +68,8 @@ pub async fn boot(state: BootstrapState) -> RippleResponse { execute_step(LoadExtensionsStep, &bootstrap).await?; execute_step(StartExtnChannelsStep, &bootstrap).await?; execute_step(StartAppManagerStep, &bootstrap).await?; - execute_step(LoadDistributorValuesStep, &bootstrap).await?; execute_step(StartOtherBrokers, &bootstrap).await?; + execute_step(LoadDistributorValuesStep, &bootstrap).await?; execute_step(CheckLauncherStep, &bootstrap).await?; execute_step(StartWsStep, &bootstrap).await?; execute_step(FireboltGatewayStep, &bootstrap).await?; diff --git a/core/main/src/bootstrap/start_communication_broker.rs b/core/main/src/bootstrap/start_communication_broker.rs index 1ee508c53..01d2a1524 100644 --- a/core/main/src/bootstrap/start_communication_broker.rs +++ b/core/main/src/bootstrap/start_communication_broker.rs @@ -20,6 +20,7 @@ use ripple_sdk::{ }; use crate::broker::endpoint_broker::BrokerOutputForwarder; +use crate::processor::rpc_gateway_processor::RpcGatewayProcessor; use crate::state::bootstrap_state::BootstrapState; pub struct StartCommunicationBroker; @@ -32,6 +33,13 @@ impl Bootstep for StartCommunicationBroker { async fn setup(&self, state: BootstrapState) -> Result<(), RippleError> { let ps = state.platform_state.clone(); + // When endpoint broker starts up enable RPC processor there might be internal services which might need + // brokering data + state + .platform_state + .get_client() + .add_request_processor(RpcGatewayProcessor::new(state.platform_state.get_client())); + // Start the Broker Reciever if let Ok(rx) = state.channels_state.get_broker_receiver() { BrokerOutputForwarder::start_forwarder(ps.clone(), rx) diff --git a/core/main/src/bootstrap/start_fbgateway_step.rs b/core/main/src/bootstrap/start_fbgateway_step.rs index 739b9ece0..4d6323587 100644 --- a/core/main/src/bootstrap/start_fbgateway_step.rs +++ b/core/main/src/bootstrap/start_fbgateway_step.rs @@ -36,7 +36,6 @@ use crate::{ }, rpc::RippleRPCProvider, }, - processor::rpc_gateway_processor::RpcGatewayProcessor, service::telemetry_builder::TelemetryBuilder, state::{bootstrap_state::BootstrapState, platform_state::PlatformState}, }; @@ -102,12 +101,6 @@ impl Bootstep for FireboltGatewayStep { .await; let gateway = FireboltGateway::new(state.clone(), methods); debug!("Handlers initialized"); - // Main can now recieve RPC requests - state - .platform_state - .get_client() - .add_request_processor(RpcGatewayProcessor::new(state.platform_state.get_client())); - debug!("Adding RPC gateway processor"); #[cfg(feature = "sysd")] if sd_notify::booted().is_ok() && sd_notify::notify(false, &[sd_notify::NotifyState::Ready]).is_err() @@ -117,6 +110,6 @@ impl Bootstep for FireboltGatewayStep { TelemetryBuilder::send_ripple_telemetry(&state.platform_state); gateway.start().await; - Ok(()) + Err(RippleError::ServiceError) } } diff --git a/core/main/src/broker/broker_utils.rs b/core/main/src/broker/broker_utils.rs new file mode 100644 index 000000000..642997fff --- /dev/null +++ b/core/main/src/broker/broker_utils.rs @@ -0,0 +1,69 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::time::Duration; + +use crate::utils::rpc_utils::extract_tcp_port; +use futures::stream::{SplitSink, SplitStream}; +use futures_util::StreamExt; +use ripple_sdk::{ + log::{error, info}, + tokio::{self, net::TcpStream}, +}; +use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream}; + +pub struct BrokerUtils; + +impl BrokerUtils { + pub async fn get_ws_broker( + endpoint: &str, + alias: Option, + ) -> ( + SplitSink, Message>, + SplitStream>, + ) { + info!("Broker Endpoint url {}", endpoint); + let url_path = if let Some(a) = alias { + format!("{}{}", endpoint, a) + } else { + endpoint.to_owned() + }; + let url = url::Url::parse(&url_path).unwrap(); + let port = extract_tcp_port(endpoint); + info!("Url host str {}", url.host_str().unwrap()); + let mut index = 0; + + loop { + // Try connecting to the tcp port first + if let Ok(v) = TcpStream::connect(&port).await { + // Setup handshake for websocket with the tcp port + // Some WS servers lock on to the Port but not setup handshake till they are fully setup + if let Ok((stream, _)) = client_async(url_path.clone(), v).await { + break stream.split(); + } + } + if (index % 10).eq(&0) { + error!( + "Broker with {} failed with retry for last {} secs in {}", + url_path, index, port + ); + } + index += 1; + tokio::time::sleep(Duration::from_secs(1)).await; + } + } +} diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index ecc92f089..88911f430 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -23,12 +23,12 @@ use ripple_sdk::{ }, session::AccountSession, }, - extn::extn_client_message::ExtnMessage, + extn::extn_client_message::{ExtnEvent, ExtnMessage}, framework::RippleResponse, log::error, tokio::{ self, - sync::mpsc::{Receiver, Sender}, + sync::mpsc::{self, Receiver, Sender}, }, utils::error::RippleError, }; @@ -43,7 +43,8 @@ use std::{ }; use crate::{ - firebolt::firebolt_gateway::JsonRpcError, + firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError}, + service::extn::ripple_client::RippleClient, state::platform_state::PlatformState, utils::router_utils::{return_api_message_for_transport, return_extn_response}, }; @@ -60,6 +61,21 @@ pub struct BrokerSender { pub sender: Sender, } +#[derive(Clone, Debug)] +pub struct BrokerCleaner { + pub cleaner: Option>, +} + +impl BrokerCleaner { + async fn cleanup_session(&self, appid: &str) { + if let Some(cleaner) = self.cleaner.clone() { + if let Err(e) = cleaner.send(appid.to_owned()).await { + error!("Couldnt cleanup {} {:?}", appid, e) + } + } + } +} + #[derive(Clone, Debug)] pub struct BrokerRequest { pub rpc: RpcRequest, @@ -67,12 +83,68 @@ pub struct BrokerRequest { pub subscription_processed: Option, } +pub type BrokerSubMap = HashMap>; + +#[derive(Clone, Debug)] +pub struct BrokerConnectRequest { + pub key: String, + pub endpoint: RuleEndpoint, + pub sub_map: BrokerSubMap, + pub session: Option, + pub reconnector: Sender, +} + +impl BrokerConnectRequest { + pub fn new( + key: String, + endpoint: RuleEndpoint, + reconnector: Sender, + ) -> Self { + Self { + key, + endpoint, + sub_map: HashMap::new(), + session: None, + reconnector, + } + } + + pub fn new_with_sesssion( + key: String, + endpoint: RuleEndpoint, + reconnector: Sender, + session: Option, + ) -> Self { + Self { + key, + endpoint, + sub_map: HashMap::new(), + session, + reconnector, + } + } +} + impl BrokerRequest { pub fn is_subscription_processed(&self) -> bool { self.subscription_processed.is_some() } } +impl BrokerRequest { + pub fn new(rpc_request: &RpcRequest, rule: Rule) -> BrokerRequest { + BrokerRequest { + rpc: rpc_request.clone(), + rule, + subscription_processed: None, + } + } + + pub fn get_id(&self) -> String { + self.rpc.ctx.session_id.clone() + } +} + /// BrokerCallback will be used by the communication broker to send the firebolt response /// back to the gateway for client consumption #[derive(Clone, Debug)] @@ -161,17 +233,47 @@ pub struct EndpointBrokerState { request_map: Arc>>, extension_request_map: Arc>>, rule_engine: RuleEngine, + cleaner_list: Arc>>, + reconnect_tx: Sender, } impl EndpointBrokerState { - pub fn new(tx: Sender, rule_engine: RuleEngine) -> Self { - Self { + pub fn new( + tx: Sender, + rule_engine: RuleEngine, + ripple_client: RippleClient, + ) -> Self { + let (reconnect_tx, rec_tr) = mpsc::channel(2); + let state = Self { endpoint_map: Arc::new(RwLock::new(HashMap::new())), callback: BrokerCallback { sender: tx }, request_map: Arc::new(RwLock::new(HashMap::new())), extension_request_map: Arc::new(RwLock::new(HashMap::new())), rule_engine, - } + cleaner_list: Arc::new(RwLock::new(Vec::new())), + reconnect_tx, + }; + state.reconnect_thread(rec_tr, ripple_client); + state + } + + fn reconnect_thread(&self, mut rx: Receiver, client: RippleClient) { + let mut state = self.clone(); + tokio::spawn(async move { + while let Some(v) = rx.recv().await { + if matches!(v.endpoint.protocol, RuleEndpointProtocol::Thunder) { + if client + .send_gateway_command(FireboltGatewayCommand::StopServer) + .is_err() + { + error!("Stopping server") + } + break; + } else { + state.build_endpoint(v) + } + } + }); } fn get_request(&self, id: u64) -> Result { @@ -195,11 +297,20 @@ impl EndpointBrokerState { } } - fn get_extn_message(&self, id: u64) -> Result { - let result = { self.extension_request_map.write().unwrap().remove(&id) }; - match result { - Some(v) => Ok(v), - None => Err(RippleError::NotAvailable), + fn get_extn_message(&self, id: u64, is_event: bool) -> Result { + if is_event { + let v = { self.extension_request_map.read().unwrap().get(&id).cloned() }; + if let Some(v1) = v { + Ok(v1) + } else { + Err(RippleError::NotAvailable) + } + } else { + let result = { self.extension_request_map.write().unwrap().remove(&id) }; + match result { + Some(v) => Ok(v), + None => Err(RippleError::NotAvailable), + } } } @@ -234,53 +345,68 @@ impl EndpointBrokerState { } rpc_request_c.ctx.call_id = id; - self.get_broker_request(&rpc_request_c, rule) + BrokerRequest::new(&rpc_request_c, rule) } pub fn build_thunder_endpoint(&mut self) { if let Some(endpoint) = self.rule_engine.rules.endpoints.get("thunder").cloned() { - let mut endpoint_map = self.endpoint_map.write().unwrap(); - endpoint_map.insert( + let request = BrokerConnectRequest::new( "thunder".to_owned(), - ThunderBroker::get_broker(endpoint.clone(), self.callback.clone()).get_sender(), + endpoint.clone(), + self.reconnect_tx.clone(), ); + self.build_endpoint(request); } } pub fn build_other_endpoints(&mut self, session: Option) { for (key, endpoint) in self.rule_engine.rules.endpoints.clone() { - match endpoint.protocol { - RuleEndpointProtocol::Http => { - let mut endpoint_map = self.endpoint_map.write().unwrap(); - endpoint_map.insert( - key, - HttpBroker::get_broker_with_session( - endpoint.clone(), - self.callback.clone(), - session.clone(), - ) - .get_sender(), - ); - } - RuleEndpointProtocol::Websocket => { - let mut endpoint_map = self.endpoint_map.write().unwrap(); - endpoint_map.insert( - key, - WebsocketBroker::get_broker(endpoint.clone(), self.callback.clone()) - .get_sender(), - ); - } - _ => {} - } + let request = BrokerConnectRequest::new_with_sesssion( + key, + endpoint.clone(), + self.reconnect_tx.clone(), + session.clone(), + ); + self.build_endpoint(request); } } - fn get_sender(&self, hash: &str) -> Option { + fn build_endpoint(&mut self, request: BrokerConnectRequest) { + let endpoint = request.endpoint.clone(); + let key = request.key.clone(); + let (broker, cleaner) = match endpoint.protocol { + RuleEndpointProtocol::Http => ( + HttpBroker::get_broker(request, self.callback.clone()).get_sender(), + None, + ), + RuleEndpointProtocol::Websocket => { + let ws_broker = WebsocketBroker::get_broker(request, self.callback.clone()); + (ws_broker.get_sender(), Some(ws_broker.get_cleaner())) + } + RuleEndpointProtocol::Thunder => { + let thunder_broker = ThunderBroker::get_broker(request, self.callback.clone()); + ( + thunder_broker.get_sender(), + Some(thunder_broker.get_cleaner()), + ) + } + }; + { - self.endpoint_map.read().unwrap().get(hash).cloned() + let mut endpoint_map = self.endpoint_map.write().unwrap(); + endpoint_map.insert(key, broker); + } + + if let Some(cleaner) = cleaner { + let mut cleaner_list = self.cleaner_list.write().unwrap(); + cleaner_list.push(cleaner); } } + fn get_sender(&self, hash: &str) -> Option { + self.endpoint_map.read().unwrap().get(hash).cloned() + } + /// Main handler method whcih checks for brokerage and then sends the request for /// asynchronous processing pub fn handle_brokerage( @@ -321,6 +447,14 @@ impl EndpointBrokerState { self.rule_engine.get_rule(rpc_request) } + // Method to cleanup all subscription on App termination + pub async fn cleanup_for_app(&self, app_id: &str) { + let cleaners = { self.cleaner_list.read().unwrap().clone() }; + for cleaner in cleaners { + cleaner.cleanup_session(app_id).await + } + } + // Get Broker Request from rpc_request pub fn get_broker_request(&self, rpc_request: &RpcRequest, rule: Rule) -> BrokerRequest { BrokerRequest { @@ -334,18 +468,8 @@ impl EndpointBrokerState { /// Trait which contains all the abstract methods for a Endpoint Broker /// There could be Websocket or HTTP protocol implementations of the given trait pub trait EndpointBroker { - fn get_broker_with_session( - endpoint: RuleEndpoint, - callback: BrokerCallback, - _: Option, - ) -> Self - where - Self: Sized, - { - Self::get_broker(endpoint, callback) - } + fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self; - fn get_broker(endpoint: RuleEndpoint, callback: BrokerCallback) -> Self; fn get_sender(&self) -> BrokerSender; fn prepare_request(&self, rpc_request: &BrokerRequest) -> Result, RippleError> { @@ -404,7 +528,7 @@ pub trait EndpointBroker { /// Default handler method for the broker to remove the context and send it back to the /// client for consumption - fn handle_response(result: &[u8], callback: BrokerCallback) { + fn handle_jsonrpc_response(result: &[u8], callback: BrokerCallback) { let mut final_result = Err(RippleError::ParseError); if let Ok(data) = serde_json::from_slice::(result) { final_result = Ok(BrokerOutput { data }); @@ -415,6 +539,8 @@ pub trait EndpointBroker { error!("Bad broker response {}", String::from_utf8_lossy(result)); } } + + fn get_cleaner(&self) -> BrokerCleaner; } /// Forwarder gets the BrokerOutput and forwards the response to the gateway. @@ -425,97 +551,80 @@ impl BrokerOutputForwarder { tokio::spawn(async move { while let Some(mut v) = rx.recv().await { let mut is_event = false; + // First validate the id check if it could be an event let id = if let Some(e) = v.get_event() { is_event = true; Some(e) } else { v.data.id }; + if let Some(id) = id { if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) { let sub_processed = broker_request.is_subscription_processed(); - let rpc_request = broker_request.rpc; + let rpc_request = broker_request.rpc.clone(); let session_id = rpc_request.ctx.get_id(); let is_subscription = rpc_request.is_subscription(); - if let Some(session) = platform_state - .session_state - .get_session_for_connection_id(&session_id) - { - let request_id = rpc_request.ctx.call_id; - v.data.id = Some(request_id); - if let Some(result) = v.data.result.clone() { - if is_event { - if let Some(filter) = broker_request - .rule - .transform - .get_filter(super::rules_engine::RuleTransformType::Event) - { - if let Ok(r) = jq_compile( - result, - &filter, - format!("{}_event", rpc_request.ctx.method), - ) { - v.data.result = Some(r); - } - } - } else if is_subscription { - if sub_processed { - continue; - } - v.data.result = Some(json!({ - "listening" : rpc_request.is_listening(), - "event" : rpc_request.ctx.method - })); - platform_state.endpoint_state.update_unsubscribe_request(id); - } else if let Some(filter) = broker_request - .rule - .transform - .get_filter(super::rules_engine::RuleTransformType::Response) - { - match jq_compile( - result.clone(), - &filter, - format!("{}_response", rpc_request.ctx.method), - ) { - Ok(r) => { - if r.to_string().to_lowercase().contains("null") { - v.data.error = None; - v.data.result = Some(Value::Null); - } else if result.get("success").is_some() { - v.data.result = Some(r); - v.data.error = None; - } else { - v.data.error = Some(r); - v.data.result = None; - } - } - Err(e) => error!("jq_compile error {:?}", e), - } + + // Step 1: Create the data + if let Some(result) = v.data.result.clone() { + if is_event { + apply_rule_for_event( + &broker_request, + &result, + &rpc_request, + &mut v, + ); + } else if is_subscription { + if sub_processed { + continue; } + v.data.result = Some(json!({ + "listening" : rpc_request.is_listening(), + "event" : rpc_request.ctx.method + })); + platform_state.endpoint_state.update_unsubscribe_request(id); + } else if let Some(filter) = broker_request + .rule + .transform + .get_filter(super::rules_engine::RuleTransformType::Response) + { + apply_response(result, filter, &rpc_request, &mut v); } - let message = ApiMessage { - request_id: request_id.to_string(), - protocol: rpc_request.ctx.protocol.clone(), - jsonrpc_msg: serde_json::to_string(&v.data).unwrap(), - }; - - match rpc_request.ctx.protocol.clone() { - ApiProtocol::Extn => { - if let Ok(v) = - platform_state.endpoint_state.get_extn_message(id) - { - return_extn_response(message, v) - } - } - _ => { - return_api_message_for_transport( - session, - message, - platform_state.clone(), - ) - .await + } + + let request_id = rpc_request.ctx.call_id; + v.data.id = Some(request_id); + + // Step 2: Create the message + let message = ApiMessage { + request_id: request_id.to_string(), + protocol: rpc_request.ctx.protocol.clone(), + jsonrpc_msg: serde_json::to_string(&v.data).unwrap(), + }; + + // Step 3: Handle Non Extension + if matches!(rpc_request.ctx.protocol, ApiProtocol::Extn) { + if let Ok(extn_message) = + platform_state.endpoint_state.get_extn_message(id, is_event) + { + if is_event { + forward_extn_event(&extn_message, v.data, &platform_state) + .await; + } else { + return_extn_response(message, extn_message) } } + } else if let Some(session) = platform_state + .session_state + .get_session_for_connection_id(&session_id) + { + return_api_message_for_transport( + session, + message, + platform_state.clone(), + ) + .await } } } else { @@ -524,6 +633,98 @@ impl BrokerOutputForwarder { } }); } + + pub fn handle_non_jsonrpc_response( + data: &[u8], + callback: BrokerCallback, + request: BrokerRequest, + ) -> RippleResponse { + // find if its event + let method = if request.rpc.is_subscription() { + Some(format!( + "{}.{}", + request.rpc.ctx.call_id, request.rpc.ctx.method + )) + } else { + None + }; + let parse_result = serde_json::from_slice::(data); + if parse_result.is_err() { + return Err(RippleError::ParseError); + } + let result = Some(parse_result.unwrap()); + // build JsonRpcApiResponse + let data = JsonRpcApiResponse { + jsonrpc: "2.0".to_owned(), + id: Some(request.rpc.ctx.call_id), + method, + result, + error: None, + params: None, + }; + let output = BrokerOutput { data }; + tokio::spawn(async move { callback.sender.send(output).await }); + Ok(()) + } +} + +async fn forward_extn_event( + extn_message: &ExtnMessage, + v: JsonRpcApiResponse, + platform_state: &PlatformState, +) { + if let Ok(event) = extn_message.get_event(ExtnEvent::Value(serde_json::to_value(v).unwrap())) { + if let Err(e) = platform_state + .get_client() + .get_extn_client() + .send_message(event) + .await + { + error!("couldnt send back event {:?}", e) + } + } +} + +fn apply_response(result: Value, filter: String, rpc_request: &RpcRequest, v: &mut BrokerOutput) { + match jq_compile( + result.clone(), + &filter, + format!("{}_response", rpc_request.ctx.method), + ) { + Ok(r) => { + if r.to_string().to_lowercase().contains("null") { + v.data.result = Some(Value::Null) + } else if result.get("success").is_some() { + v.data.result = Some(r); + v.data.error = None; + } else { + v.data.error = Some(r); + v.data.result = None; + } + } + Err(e) => error!("jq_compile error {:?}", e), + } +} + +fn apply_rule_for_event( + broker_request: &BrokerRequest, + result: &Value, + rpc_request: &RpcRequest, + v: &mut BrokerOutput, +) { + if let Some(filter) = broker_request + .rule + .transform + .get_filter(super::rules_engine::RuleTransformType::Event) + { + if let Ok(r) = jq_compile( + result.clone(), + &filter, + format!("{}_event", rpc_request.ctx.method), + ) { + v.data.result = Some(r); + } + } } #[cfg(test)] @@ -596,21 +797,29 @@ mod tests { mod endpoint_broker_state { use ripple_sdk::{ - api::gateway::rpc_gateway_api::RpcRequest, tokio::sync::mpsc::channel, Mockable, + api::gateway::rpc_gateway_api::RpcRequest, tokio, tokio::sync::mpsc::channel, Mockable, }; - use crate::broker::rules_engine::{Rule, RuleEngine, RuleSet, RuleTransform}; + use crate::{ + broker::{ + endpoint_broker::tests::RippleClient, + rules_engine::{Rule, RuleEngine, RuleSet, RuleTransform}, + }, + state::bootstrap_state::ChannelsState, + }; use super::EndpointBrokerState; - #[test] - fn get_request() { + #[tokio::test] + async fn get_request() { let (tx, _) = channel(2); + let client = RippleClient::new(ChannelsState::new()); let state = EndpointBrokerState::new( tx, RuleEngine { rules: RuleSet::default(), }, + client, ); let mut request = RpcRequest::mock(); state.update_request( diff --git a/core/main/src/broker/http_broker.rs b/core/main/src/broker/http_broker.rs index 4c5151256..2831ee33c 100644 --- a/core/main/src/broker/http_broker.rs +++ b/core/main/src/broker/http_broker.rs @@ -21,20 +21,22 @@ use ripple_sdk::{ tokio::{self, sync::mpsc}, }; -use super::{ - endpoint_broker::{BrokerCallback, BrokerSender, EndpointBroker}, - rules_engine::RuleEndpoint, +use super::endpoint_broker::{ + BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerSender, + EndpointBroker, }; pub struct HttpBroker { sender: BrokerSender, + cleaner: BrokerCleaner, } impl EndpointBroker for HttpBroker { - fn get_broker(endpoint: RuleEndpoint, callback: BrokerCallback) -> Self { + fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + let endpoint = request.endpoint.clone(); let (tx, mut tr) = mpsc::channel(10); let broker = BrokerSender { sender: tx }; - + let is_json_rpc = endpoint.jsonrpc; let uri: Uri = endpoint.url.parse().unwrap(); // let mut headers = HeaderMap::new(); // headers.insert("Content-Type", "application/json".parse().unwrap()); @@ -69,16 +71,33 @@ impl EndpointBroker for HttpBroker { if let Ok(bytes) = hyper::body::to_bytes(body).await { let value: Vec = bytes.into(); let value = value.as_slice(); - Self::handle_response(value, callback.clone()); + if is_json_rpc { + Self::handle_jsonrpc_response(value, callback.clone()); + } else if let Err(e) = + BrokerOutputForwarder::handle_non_jsonrpc_response( + value, + callback.clone(), + request.clone(), + ) + { + error!("Error forwarding {:?}", e) + } } } } } }); - Self { sender: broker } + Self { + sender: broker, + cleaner: BrokerCleaner { cleaner: None }, + } } fn get_sender(&self) -> BrokerSender { self.sender.clone() } + + fn get_cleaner(&self) -> BrokerCleaner { + self.cleaner.clone() + } } diff --git a/core/main/src/broker/mod.rs b/core/main/src/broker/mod.rs index f25290781..da26923a9 100644 --- a/core/main/src/broker/mod.rs +++ b/core/main/src/broker/mod.rs @@ -14,6 +14,7 @@ // // SPDX-License-Identifier: Apache-2.0 // +pub mod broker_utils; pub mod endpoint_broker; pub mod http_broker; pub mod rules_engine; diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index ce1614e91..bacd9214d 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -45,6 +45,12 @@ impl RuleSet { pub struct RuleEndpoint { pub protocol: RuleEndpointProtocol, pub url: String, + #[serde(default = "default_autostart")] + pub jsonrpc: bool, +} + +fn default_autostart() -> bool { + true } #[derive(Deserialize, Debug, Clone)] @@ -59,6 +65,8 @@ pub enum RuleEndpointProtocol { #[derive(Debug, Clone, Deserialize)] pub struct Rule { pub alias: String, + // Not every rule needs transform + #[serde(default)] pub transform: RuleTransform, pub endpoint: Option, } diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 9f587a2db..9bc270c6f 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -15,83 +15,65 @@ // SPDX-License-Identifier: Apache-2.0 // use super::{ - endpoint_broker::{BrokerCallback, BrokerOutput, BrokerRequest, BrokerSender, EndpointBroker}, - rules_engine::RuleEndpoint, + endpoint_broker::{ + BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutput, BrokerRequest, + BrokerSender, BrokerSubMap, EndpointBroker, + }, thunder::thunder_plugins_status_mgr::StatusManager, }; -use crate::utils::rpc_utils::extract_tcp_port; +use crate::broker::broker_utils::BrokerUtils; use futures_util::{SinkExt, StreamExt}; + use ripple_sdk::{ api::gateway::rpc_gateway_api::JsonRpcApiResponse, log::{debug, error, info}, - tokio::{self, net::TcpStream, sync::mpsc}, + tokio::{self, sync::mpsc}, utils::error::RippleError, }; use serde_json::json; use std::{ - collections::HashMap, sync::{Arc, RwLock}, - time::Duration, vec, }; -use tokio_tungstenite::client_async; #[derive(Debug, Clone)] pub struct ThunderBroker { sender: BrokerSender, - subscription_map: Arc>>>, + subscription_map: Arc>, + cleaner: BrokerCleaner, status_manager: StatusManager, } impl ThunderBroker { - fn start(endpoint: RuleEndpoint, callback: BrokerCallback) -> Self { + fn start(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + let endpoint = request.endpoint.clone(); let (tx, mut tr) = mpsc::channel(10); + let (c_tx, mut c_tr) = mpsc::channel(2); let sender = BrokerSender { sender: tx }; - let subscription_map = Arc::new(RwLock::new(HashMap::new())); + let subscription_map = Arc::new(RwLock::new(request.sub_map.clone())); let broker = Self { sender, subscription_map, + cleaner: BrokerCleaner { + cleaner: Some(c_tx.clone()), + }, status_manager: StatusManager::new(), }; let broker_c = broker.clone(); + let broker_for_cleanup = broker.clone(); let callback_for_sender = callback.clone(); + let broker_for_reconnect = broker.clone(); tokio::spawn(async move { - info!("Broker Endpoint url {}", endpoint.url); - let url = url::Url::parse(&endpoint.url).unwrap(); - let port = extract_tcp_port(&endpoint.url); - info!("Url host str {}", url.host_str().unwrap()); - let mut index = 0; - //let tcp_url = url.host_str() - let tcp = loop { - let resp = TcpStream::connect(&port).await; - match resp { - Ok(v) => { - break v; - } - Err(e) => { - if (index % 10).eq(&0) { - error!( - "Thunder Broker failed with retry for last {} secs in {} {:?}", - index, port, e - ); - } - index += 1; - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }; - - let (stream, _) = client_async(url, tcp).await.unwrap(); - let (mut ws_tx, mut ws_rx) = stream.split(); + let (mut ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.url, None).await; // send the first request to the broker. This is the controller statechange subscription request - let request = broker_c + let status_request = broker_c .status_manager .generate_state_change_subscribe_request(); let _feed = ws_tx .feed(tokio_tungstenite::tungstenite::Message::Text( - request.to_string(), + status_request.to_string(), )) .await; let _flush = ws_tx.flush().await; @@ -110,13 +92,14 @@ impl ThunderBroker { } else { // send the incoming text without context back to the sender - Self::handle_response(t.as_bytes(),callback.clone()) + Self::handle_jsonrpc_response(t.as_bytes(),callback.clone()) } } }, Err(e) => { error!("Broker Websocket error on read {:?}", e); - break false + // Time to reconnect Thunder with existing subscription + break; } } @@ -141,9 +124,37 @@ impl ThunderBroker { } } } + }, + Some(cleanup_request) = c_tr.recv() => { + let value = { + broker_for_cleanup.subscription_map.write().unwrap().remove(&cleanup_request) + }; + if let Some(mut cleanup) = value { + let sender = broker_for_cleanup.get_sender(); + while let Some(mut v) = cleanup.pop() { + v.rpc = v.rpc.get_unsubscribe(); + if (sender.send(v).await).is_err() { + error!("Cleanup Error for {}",&cleanup_request); + } + } + + } + + } } + } + + let mut reconnect_request = request.clone(); + // Thunder Disconnected try reconnecting. + { + let mut subs = broker_for_reconnect.subscription_map.write().unwrap(); + for (k, v) in subs.drain().take(1) { + let _ = reconnect_request.sub_map.insert(k, v); } } + if request.reconnector.send(reconnect_request).await.is_err() { + error!("Error reconnecting to thunder"); + } }); broker } @@ -187,7 +198,6 @@ impl ThunderBroker { method, app_id ); response = Some(v.remove(i)); - //let _ = response.insert(v.remove(i)); } if listen { v.push(request.clone()); @@ -201,14 +211,18 @@ impl ThunderBroker { } impl EndpointBroker for ThunderBroker { - fn get_broker(endpoint: RuleEndpoint, callback: BrokerCallback) -> Self { - Self::start(endpoint, callback) + fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + Self::start(request, callback) } fn get_sender(&self) -> BrokerSender { self.sender.clone() } + fn get_cleaner(&self) -> BrokerCleaner { + self.cleaner.clone() + } + fn prepare_request( &self, rpc_request: &super::endpoint_broker::BrokerRequest, @@ -304,7 +318,7 @@ impl EndpointBroker for ThunderBroker { /// Default handler method for the broker to remove the context and send it back to the /// client for consumption - fn handle_response(result: &[u8], callback: BrokerCallback) { + fn handle_jsonrpc_response(result: &[u8], callback: BrokerCallback) { let mut final_result = Err(RippleError::ParseError); if let Ok(data) = serde_json::from_slice::(result) { let updated_data = Self::update_response(&data); diff --git a/core/main/src/broker/websocket_broker.rs b/core/main/src/broker/websocket_broker.rs index e4d3493a9..e5b6a5219 100644 --- a/core/main/src/broker/websocket_broker.rs +++ b/core/main/src/broker/websocket_broker.rs @@ -15,88 +15,286 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::utils::rpc_utils::extract_tcp_port; +use crate::broker::broker_utils::BrokerUtils; -use super::{ - endpoint_broker::{BrokerCallback, BrokerSender, EndpointBroker}, - rules_engine::RuleEndpoint, +use super::endpoint_broker::{ + BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerRequest, + BrokerSender, EndpointBroker, }; use futures_util::{SinkExt, StreamExt}; use ripple_sdk::{ - log::{debug, error, info}, - tokio::{self, net::TcpStream, sync::mpsc}, + log::{debug, error}, + tokio::{self, sync::mpsc}, +}; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, }; -use std::time::Duration; -use tokio_tungstenite::client_async; pub struct WebsocketBroker { sender: BrokerSender, + cleaner: BrokerCleaner, } impl WebsocketBroker { - fn start(endpoint: RuleEndpoint, callback: BrokerCallback) -> Self { + fn start(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + let endpoint = request.endpoint.clone(); let (tx, mut tr) = mpsc::channel(10); + let (cleaner_tx, mut cleaner_tr) = mpsc::channel::(1); + let non_json_rpc_map: Arc>>>> = + Arc::new(RwLock::new(HashMap::new())); + let map_clone = non_json_rpc_map.clone(); let broker = BrokerSender { sender: tx }; tokio::spawn(async move { - info!("Broker Endpoint url {}", endpoint.url); - let url = url::Url::parse(&endpoint.url).unwrap(); - let port = extract_tcp_port(&endpoint.url); - info!("Url host str {}", url.host_str().unwrap()); - //let tcp_url = url.host_str() - let tcp = loop { - if let Ok(v) = TcpStream::connect(&port).await { - break v; - } else { - error!("Broker Wait for a sec and retry {}", port); - tokio::time::sleep(Duration::from_secs(1)).await; + if endpoint.jsonrpc { + let (mut ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.url, None).await; + + tokio::pin! { + let read = ws_rx.next(); } - }; + loop { + tokio::select! { + Some(value) = &mut read => { + match value { + Ok(v) => { + if let tokio_tungstenite::tungstenite::Message::Text(t) = v { + // send the incoming text without context back to the sender + Self::handle_jsonrpc_response(t.as_bytes(),callback.clone()) + } + }, + Err(e) => { + error!("Broker Websocket error on read {:?}", e); + break false + } + } - let (stream, _) = client_async(url, tcp).await.unwrap(); - let (mut ws_tx, mut ws_rx) = stream.split(); + }, + Some(request) = tr.recv() => { + debug!("Got request from receiver for broker {:?}", request); + if let Ok(updated_request) = Self::update_request(&request) { + debug!("Sending request to broker {}", updated_request); + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(updated_request)).await; + let _flush = ws_tx.flush().await; + } + + } + } + } + } else { + let cleaner_clone = non_json_rpc_map.clone(); + tokio::spawn(async move { + while let Some(v) = cleaner_tr.recv().await { + { + if let Some(cleaner_list) = + { cleaner_clone.write().unwrap().remove(&v) } + { + for sender in cleaner_list { + if sender.try_send(v.clone()).is_err() { + error!("Cleaning up listener"); + } + } + } + } + } + }); + + while let Some(v) = tr.recv().await { + let id = v.get_id(); + let cleaner = WSNotificationBroker::start( + v.clone(), + callback.clone(), + endpoint.url.clone(), + ); + { + let mut map = map_clone.write().unwrap(); + let mut sender_list = map.remove(&id).unwrap_or_default(); + sender_list.push(cleaner); + let _ = map.insert(id, sender_list); + } + } + + true + } + }); + + Self { + sender: broker, + cleaner: BrokerCleaner { + cleaner: Some(cleaner_tx), + }, + } + } +} + +pub struct WSNotificationBroker; + +impl WSNotificationBroker { + fn start( + request_c: BrokerRequest, + callback_c: BrokerCallback, + url: String, + ) -> mpsc::Sender { + let (tx, mut tr) = mpsc::channel::(1); + tokio::spawn(async move { + let app_id = request_c.get_id(); + let alias = request_c.rule.alias.clone(); + let (mut ws_tx, mut ws_rx) = + BrokerUtils::get_ws_broker(&url, Some(alias.clone())).await; tokio::pin! { let read = ws_rx.next(); } + loop { - tokio::select! { + tokio::select!( Some(value) = &mut read => { match value { Ok(v) => { if let tokio_tungstenite::tungstenite::Message::Text(t) = v { // send the incoming text without context back to the sender - Self::handle_response(t.as_bytes(),callback.clone()) + if let Err(e) = BrokerOutputForwarder::handle_non_jsonrpc_response( + t.as_bytes(), + callback_c.clone(), + request_c.clone(), + ) { + error!("error forwarding {}", e); + } } }, Err(e) => { error!("Broker Websocket error on read {:?}", e); - break false + break; } } }, Some(request) = tr.recv() => { - debug!("Got request from receiver for broker {:?}", request); - if let Ok(updated_request) = Self::update_request(&request) { - debug!("Sending request to broker {}", updated_request); - let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(updated_request)).await; + debug!("Recieved cleaner request for {}", request); + if request.eq(&app_id) { + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Close(None)).await; let _flush = ws_tx.flush().await; + break; } - } - } + ) } }); - Self { sender: broker } + tx } } impl EndpointBroker for WebsocketBroker { - fn get_broker(endpoint: RuleEndpoint, callback: BrokerCallback) -> Self { - Self::start(endpoint, callback) + fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + Self::start(request, callback) } fn get_sender(&self) -> BrokerSender { self.sender.clone() } + + fn get_cleaner(&self) -> BrokerCleaner { + self.cleaner.clone() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::{ + broker::{ + endpoint_broker::{BrokerOutput, BrokerRequest}, + rules_engine::{Rule, RuleEndpoint, RuleTransform}, + }, + utils::test_utils::{MockWebsocket, WSMockData}, + }; + use ripple_sdk::api::gateway::rpc_gateway_api::RpcRequest; + use serde_json::json; + + use super::*; + + async fn setup_broker( + tx: mpsc::Sender, + send_data: Vec, + sender: mpsc::Sender, + on_close: bool, + ) -> WebsocketBroker { + // setup mock websocket server + let port = MockWebsocket::start(send_data, Vec::new(), tx, on_close).await; + + let endpoint = RuleEndpoint { + url: format!("ws://127.0.0.1:{}", port), + protocol: crate::broker::rules_engine::RuleEndpointProtocol::Websocket, + jsonrpc: false, + }; + let (tx, _) = mpsc::channel(1); + let request = BrokerConnectRequest::new("somekey".to_owned(), endpoint, tx); + let callback = BrokerCallback { sender }; + // Setup websocket broker + WebsocketBroker::start(request, callback) + } + + #[tokio::test] + async fn connect_non_json_rpc_websocket() { + let (tx, mut tr) = mpsc::channel(1); + let (sender, mut rec) = mpsc::channel(1); + let send_data = vec![WSMockData::get(json!({"key":"value"}).to_string())]; + + let broker = setup_broker(tx, send_data, sender, false).await; + // Use Broker to connect to it + let request = BrokerRequest { + rpc: RpcRequest::get_new_internal("some_method".to_owned(), None), + rule: Rule { + alias: "".to_owned(), + transform: RuleTransform::default(), + endpoint: None, + }, + subscription_processed: None, + }; + + broker.sender.send(request).await.unwrap(); + + // See if broker output gets the value + + let v = tokio::time::timeout(Duration::from_secs(2), rec.recv()) + .await + .unwrap() + .unwrap(); + assert!(v + .data + .result + .unwrap() + .get("key") + .unwrap() + .as_str() + .unwrap() + .eq("value")); + + assert!(tr.recv().await.unwrap()) + } + + #[tokio::test] + async fn cleanup_non_json_rpc_websocket() { + let (tx, mut tr) = mpsc::channel(1); + let (sender, _) = mpsc::channel(1); + + let broker = setup_broker(tx, Vec::new(), sender, true).await; + // Use Broker to connect to it + let request = BrokerRequest { + rpc: RpcRequest::get_new_internal("some_method".to_owned(), None), + rule: Rule { + alias: "".to_owned(), + transform: RuleTransform::default(), + endpoint: None, + }, + subscription_processed: None, + }; + let id = request.get_id(); + + broker.sender.send(request).await.unwrap(); + + broker.cleaner.cleaner.unwrap().send(id).await.unwrap(); + // See if ws is closed + assert!(tr.recv().await.unwrap()) + } } diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index 0e423e440..d4bdc8831 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -38,7 +38,10 @@ use serde::Serialize; use crate::{ firebolt::firebolt_gatekeeper::FireboltGatekeeper, - service::{apps::app_events::AppEvents, telemetry_builder::TelemetryBuilder}, + service::{ + apps::{app_events::AppEvents, provider_broker::ProviderBroker}, + telemetry_builder::TelemetryBuilder, + }, state::{ bootstrap_state::BootstrapState, openrpc_state::OpenRpcState, platform_state::PlatformState, session_state::Session, @@ -82,6 +85,7 @@ pub enum FireboltGatewayCommand { HandleRpcForExtn { msg: ExtnMessage, }, + StopServer, } impl FireboltGateway { @@ -115,6 +119,13 @@ impl FireboltGateway { } UnregisterSession { session_id, cid } => { AppEvents::remove_session(&self.state.platform_state, session_id.clone()); + ProviderBroker::unregister_session(&self.state.platform_state, cid.clone()) + .await; + self.state + .platform_state + .endpoint_state + .cleanup_for_app(&cid) + .await; self.state.platform_state.session_state.clear_session(&cid); } HandleRpc { request } => self.handle(request, None).await, @@ -125,6 +136,10 @@ impl FireboltGateway { error!("Not a valid RPC Request {:?}", msg); } } + StopServer => { + error!("Stopping server"); + break; + } } } } @@ -134,11 +149,17 @@ impl FireboltGateway { "firebolt_gateway Received Firebolt request {} {} {}", request.ctx.request_id, request.method, request.params_json ); + let mut extn_request = false; // First check sender if no sender no need to process let callback_c = extn_msg.clone(); match request.ctx.protocol { ApiProtocol::Extn => { - if callback_c.is_none() || callback_c.unwrap().callback.is_none() { + extn_request = true; + // extn protocol with subscription requests means there is no need for callback + // it is using extn_client::subscribe method which uses id field to resolve. + if !request.is_subscription() + && (callback_c.is_none() || callback_c.unwrap().callback.is_none()) + { error!("No callback for request {:?} ", request); return; } @@ -176,36 +197,40 @@ impl FireboltGateway { tokio::spawn(async move { let start = Utc::now().timestamp_millis(); + let result = if extn_request { + // extn protocol means its an internal Ripple request skip permissions. + Ok(()) + } else { + // Validate incoming request parameters. + if let Err(error_string) = validate_request(open_rpc_state, &request_c) { + let now = Utc::now().timestamp_millis(); - // Validate incoming request parameters. - if let Err(error_string) = validate_request(open_rpc_state, &request_c) { - let now = Utc::now().timestamp_millis(); - - RpcRouter::log_rdk_telemetry_message( - &request.ctx.app_id, - &request.method, - JSON_RPC_STANDARD_ERROR_INVALID_PARAMS, - now - start, - ); + RpcRouter::log_rdk_telemetry_message( + &request.ctx.app_id, + &request.method, + JSON_RPC_STANDARD_ERROR_INVALID_PARAMS, + now - start, + ); - TelemetryBuilder::stop_and_send_firebolt_metrics_timer( - &platform_state.clone(), - metrics_timer, - format!("{}", JSON_RPC_STANDARD_ERROR_INVALID_PARAMS), - ) - .await; + TelemetryBuilder::stop_and_send_firebolt_metrics_timer( + &platform_state.clone(), + metrics_timer, + format!("{}", JSON_RPC_STANDARD_ERROR_INVALID_PARAMS), + ) + .await; - let json_rpc_error = JsonRpcError { - code: JSON_RPC_STANDARD_ERROR_INVALID_PARAMS, - message: error_string, - data: None, - }; + let json_rpc_error = JsonRpcError { + code: JSON_RPC_STANDARD_ERROR_INVALID_PARAMS, + message: error_string, + data: None, + }; - send_json_rpc_error(&platform_state, &request, json_rpc_error).await; - return; - } + send_json_rpc_error(&platform_state, &request, json_rpc_error).await; + return; + } - let result = FireboltGatekeeper::gate(platform_state.clone(), request_c.clone()).await; + FireboltGatekeeper::gate(platform_state.clone(), request_c.clone()).await + }; match result { Ok(_) => { diff --git a/core/main/src/service/extn/ripple_client.rs b/core/main/src/service/extn/ripple_client.rs index 4ea1ac50c..d65e8c6e7 100644 --- a/core/main/src/service/extn/ripple_client.rs +++ b/core/main/src/service/extn/ripple_client.rs @@ -147,7 +147,8 @@ impl RippleClient { self.get_extn_client().clone().request(payload).await } pub fn send_extn_request_transient(&self, payload: impl ExtnPayloadProvider) -> RippleResponse { - self.get_extn_client().request_transient(payload) + self.get_extn_client().request_transient(payload)?; + Ok(()) } pub async fn respond(&self, msg: ExtnMessage) -> Result<(), RippleError> { diff --git a/core/main/src/state/platform_state.rs b/core/main/src/state/platform_state.rs index ba6eae4bc..8e14e5b16 100644 --- a/core/main/src/state/platform_state.rs +++ b/core/main/src/state/platform_state.rs @@ -125,7 +125,7 @@ impl PlatformState { cap_state: CapState::new(manifest.clone()), session_state: SessionState::default(), device_manifest: manifest.clone(), - ripple_client: client, + ripple_client: client.clone(), app_library_state: AppLibraryState::new(app_library), app_events_state: AppEventsState::default(), provider_broker_state: ProviderBrokerState::default(), @@ -137,7 +137,7 @@ impl PlatformState { device_session_id: DeviceSessionIdentifier::default(), ripple_cache: RippleCache::default(), version, - endpoint_state: EndpointBrokerState::new(broker_sender, rule_engine), + endpoint_state: EndpointBrokerState::new(broker_sender, rule_engine, client), } } diff --git a/core/main/src/utils/test_utils.rs b/core/main/src/utils/test_utils.rs index 7e3dda9fb..40cefb9b1 100644 --- a/core/main/src/utils/test_utils.rs +++ b/core/main/src/utils/test_utils.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + // Copyright 2023 Comcast Cable Communications Management, LLC // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,6 +16,7 @@ // // SPDX-License-Identifier: Apache-2.0 // +use futures_util::{SinkExt, StreamExt}; use ripple_sdk::{ api::{ firebolt::fb_capabilities::{ @@ -21,7 +24,14 @@ use ripple_sdk::{ }, gateway::rpc_gateway_api::{ApiMessage, CallContext}, }, - tokio::sync::mpsc::{self, Receiver}, + log::debug, + tokio::{ + self, + net::{TcpListener, TcpStream}, + sync::mpsc::{self, Receiver}, + time::sleep, + }, + utils::logger::init_logger, }; use ripple_tdk::utils::test_utils::Mockable; @@ -102,3 +112,107 @@ impl MockCallContext { } } } + +#[derive(Clone)] +pub struct WSMockData { + pub data: String, + pub delay: Option, +} + +impl WSMockData { + pub fn get(data: String) -> Self { + Self { data, delay: None } + } +} + +pub struct MockWebsocket; + +impl MockWebsocket { + pub async fn start( + send_data: Vec, + recv_data: Vec, + result: mpsc::Sender, + on_close: bool, + ) -> u32 { + let _ = init_logger("mock websocket tests".to_owned()); + let mut port: u32 = 34743; + + loop { + let url = format!("127.0.0.1:{}", port); + match TcpListener::bind(&url).await { + Ok(l) => { + tokio::spawn(async move { + if let Ok((stream, _)) = l.accept().await { + tokio::spawn(Self::accept_connection( + stream, send_data, recv_data, result, on_close, + )); + } + }); + break; + } + Err(_) => port += 1, + } + } + + port + } + + async fn accept_connection( + stream: TcpStream, + send_data: Vec, + recv_data: Vec, + result: mpsc::Sender, + on_close: bool, + ) { + let addr = stream + .peer_addr() + .expect("connected streams should have a peer address"); + debug!("Peer address: {}", addr); + + let ws_stream = tokio_tungstenite::accept_async(stream) + .await + .expect("Error during the websocket handshake occurred"); + + debug!("New WebSocket connection: {}", addr); + + let (mut write, mut read) = ws_stream.split(); + + for send in send_data { + if let Some(d) = send.delay { + sleep(Duration::from_millis(d)).await; + } + write + .send(tokio_tungstenite::tungstenite::Message::Text(send.data)) + .await + .unwrap(); + write.flush().await.unwrap(); + } + + if recv_data.is_empty() && on_close { + while let Some(Ok(v)) = read.next().await { + if let tokio_tungstenite::tungstenite::Message::Close(_) = v { + result.send(true).await.unwrap(); + } + } + } else { + for r in recv_data { + let value = read.next().await.unwrap().unwrap(); + if let tokio_tungstenite::tungstenite::Message::Text(v) = value { + if !r.data.eq_ignore_ascii_case(&v) { + result.send(false).await.unwrap(); + return; + } + } else if let tokio_tungstenite::tungstenite::Message::Close(_) = value { + if on_close { + result.send(true).await.unwrap(); + } + } + } + } + + write.close().await.unwrap(); + if !on_close { + result.send(true).await.unwrap(); + } + } +} diff --git a/core/sdk/src/api/gateway/rpc_gateway_api.rs b/core/sdk/src/api/gateway/rpc_gateway_api.rs index 0892989eb..bb8b87a0e 100644 --- a/core/sdk/src/api/gateway/rpc_gateway_api.rs +++ b/core/sdk/src/api/gateway/rpc_gateway_api.rs @@ -19,6 +19,7 @@ use log::debug; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; use crate::{ api::firebolt::{fb_general::ListenRequest, fb_openrpc::FireboltOpenRpcMethod}, @@ -176,6 +177,17 @@ pub struct JsonRpcApiRequest { pub params: Option, } +impl JsonRpcApiRequest { + pub fn new(method: String, params: Option) -> Self { + Self { + jsonrpc: "2.0".to_owned(), + id: None, + method, + params, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JsonRpcApiResponse { pub jsonrpc: String, @@ -323,6 +335,12 @@ impl RpcRequest { false } + pub fn get_unsubscribe(&self) -> RpcRequest { + let mut rpc_request = self.clone(); + rpc_request.params_json = serde_json::to_string(&ListenRequest { listen: false }).unwrap(); + rpc_request + } + pub fn get_params(&self) -> Option { if let Ok(mut v) = serde_json::from_str::>(&self.params_json) { if v.len() > 1 { @@ -331,6 +349,25 @@ impl RpcRequest { } None } + + pub fn get_new_internal(method: String, params: Option) -> Self { + let ctx = CallContext::new( + Uuid::new_v4().to_string(), + Uuid::new_v4().to_string(), + "internal".into(), + 1, + crate::api::gateway::rpc_gateway_api::ApiProtocol::Extn, + method.clone(), + None, + false, + ); + let request = serde_json::to_value(JsonRpcApiRequest::new(method.clone(), params)).unwrap(); + RpcRequest { + params_json: Self::prepend_ctx(Some(request), &ctx), + ctx, + method, + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -366,6 +403,7 @@ mod tests { use super::*; use crate::api::gateway::rpc_gateway_api::{ApiProtocol, CallContext}; use crate::utils::test_utils::test_extn_payload_provider; + use crate::Mockable; #[test] fn test_caller_session_from_call_context() { @@ -655,4 +693,17 @@ mod tests { let error_code = result.unwrap(); assert_eq!(error_code, None); } + + #[test] + fn new_json_rpc_methods() { + let request = JsonRpcApiRequest::new("some_method".to_owned(), None); + assert!(request.method.eq("some_method")); + } + + #[test] + fn test_rpc_unsubscribe() { + let new = RpcRequest::mock().get_unsubscribe(); + let request = serde_json::from_str::(&new.params_json).unwrap(); + assert!(!request.listen); + } } diff --git a/core/sdk/src/extn/client/extn_client.rs b/core/sdk/src/extn/client/extn_client.rs index 057a136fd..552c9ef3a 100644 --- a/core/sdk/src/extn/client/extn_client.rs +++ b/core/sdk/src/extn/client/extn_client.rs @@ -23,6 +23,7 @@ use std::{ use async_channel::{bounded, Receiver as CReceiver, Sender as CSender}; use chrono::Utc; +use log::warn; #[cfg(not(test))] use log::{debug, error, info, trace}; @@ -452,12 +453,18 @@ impl ExtnClient { ) { let id_c = msg.target.as_clear_string(); let mut gc_sender_indexes: Vec = Vec::new(); + let mut processed = false; let read_processor = processor.clone(); { let processors = read_processor.read().unwrap(); - let v = processors.get(&id_c).cloned(); + let v = if let Some(id_proc) = processors.get(&msg.id).cloned() { + Some(id_proc) + } else { + processors.get(&id_c).cloned() + }; if let Some(v) = v { for (index, s) in v.iter().enumerate() { + processed = true; if !s.is_closed() { let sndr = s.clone(); let m = msg.clone(); @@ -470,10 +477,13 @@ impl ExtnClient { gc_sender_indexes.push(index); } } + } else { + warn!("No valid processors for the event {:?}", msg) } }; - if RippleContext::is_ripple_context(&msg.payload).is_none() { + // Print this message only for messages which were not processed + if !processed && RippleContext::is_ripple_context(&msg.payload).is_none() { // Not every extension will have a context listener error!("No Event Processor for {:?}", msg); } @@ -596,6 +606,37 @@ impl ExtnClient { Err(RippleError::ExtnError) } + /// Subscribe method which accepts a impl [ExtnPayloadProvider] and a [MSender] + /// As part of the subscription process it will automatically callback the Sender with the value recieved from the Publisher + pub async fn subscribe( + &mut self, + payload: impl ExtnPayloadProvider, + sender: MSender, + ) -> Result { + let id = self.request_transient(payload)?; + add_vec_stream_processor(id.clone(), sender, self.event_processors.clone()); + Ok(id) + } + + // Unsubscribe is an antonym for subscribe accepts a impl [ExtnPayLoadProvider] and a [MSender, + ) -> RippleResponse { + remove_processor(id, self.event_processors.clone()); + if sender.is_closed() { + // send the unsubscription payload to the subscriber + self.request_transient(payload)?; + Ok(()) + } else { + // if sender is closed automatically it gets cleaned up from event processors + error!("Expect sender to be closed before unsubscription"); + Err(RippleError::InvalidInput) + } + } + pub async fn main_internal_request( &mut self, payload: impl ExtnPayloadProvider, @@ -682,10 +723,15 @@ impl ExtnClient { /// /// # Arguments /// `payload` - impl [ExtnPayloadProvider] - pub fn request_transient(&self, payload: impl ExtnPayloadProvider) -> RippleResponse { + pub fn request_transient( + &self, + payload: impl ExtnPayloadProvider, + ) -> Result { let id = uuid::Uuid::new_v4().to_string(); let other_sender = self.get_extn_sender_with_contract(payload.get_contract()); - self.sender.send_request(id, payload, other_sender, None) + self.sender + .send_request(id.clone(), payload, other_sender, None)?; + Ok(id) } pub fn get_stack_size(&self) -> Option { @@ -805,17 +851,20 @@ pub mod tests { }, extn_sender::tests::Mockable as extn_sender_mockable, }, - extn_client_message::{ExtnPayload, ExtnRequest}, + extn_client_message::{ExtnEvent, ExtnPayload, ExtnRequest}, extn_id::{ExtnClassId, ExtnId}, }, - utils::mock_utils::{get_mock_extn_client, MockEvent, MockRequest}, + utils::{ + logger::init_logger, + mock_utils::{get_mock_extn_client, MockEvent, MockRequest}, + }, }; use async_channel::unbounded; use core::panic; use rstest::rstest; use std::collections::HashMap; use testing_logger::{self, validate}; - use tokio::sync::oneshot; + use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use uuid::Uuid; @@ -2297,4 +2346,44 @@ pub mod tests { let result = extn_client.get_timezone(); assert_eq!(result, Some(test_timezone)); } + + #[tokio::test] + async fn test_subscription() { + let _ = init_logger("subscription_test".to_owned()); + let mut extn_client = ExtnClient::mock(); + let (tx, mut tr) = mpsc::channel(1); + let request = MockRequest { + app_id: "test_app_id".to_string(), + contract: RippleContract::Internal, + expected_response: None, + }; + let id = extn_client.subscribe(request.clone(), tx).await.unwrap(); + + let msg = ExtnMessage { + id: id.clone(), + requestor: ExtnId::get_main_target("main".into()), + target: RippleContract::Internal, + target_id: None, + payload: request.get_extn_payload(), + callback: None, + ts: Some(Utc::now().timestamp_millis()), + }; + + let event = msg.get_event(ExtnEvent::String("some".to_owned())).unwrap(); + let _ = extn_client.send_message(event).await; + tokio::spawn(async move { + extn_client.initialize().await; + }); + let r = tokio::time::timeout(Duration::from_secs(2), tr.recv()) + .await + .unwrap() + .unwrap(); + + if let Some(ExtnEvent::String(s)) = r.payload.extract() { + assert!(s.eq("some")) + } else { + // more readable and can help detect panic! definitions in code + unreachable!() + } + } } diff --git a/core/sdk/src/extn/extn_client_message.rs b/core/sdk/src/extn/extn_client_message.rs index 770986b81..d66926eb6 100644 --- a/core/sdk/src/extn/extn_client_message.rs +++ b/core/sdk/src/extn/extn_client_message.rs @@ -125,6 +125,28 @@ impl ExtnMessage { } } + /// This method can be used to create [ExtnEvent] payload message from a given [ExtnRequest] + /// payload. + /// + /// Note: If used in a processor this method can be safely unwrapped + pub fn get_event(&self, event: ExtnEvent) -> Result { + match self.payload { + ExtnPayload::Request(_) => Ok(ExtnMessage { + callback: self.callback.clone(), + id: self.id.clone(), + payload: ExtnPayload::Event(event), + requestor: self.requestor.clone(), + target: self.target.clone(), + target_id: self.target_id.clone(), + ts: None, + }), + _ => { + error!("can only event for a request message"); + Err(RippleError::InvalidInput) + } + } + } + pub fn ack(&self) -> ExtnMessage { ExtnMessage { id: self.id.clone(), @@ -402,6 +424,7 @@ mod tests { extn_id::ExtnClassId, }; use rstest::rstest; + use serde_json::json; #[test] fn test_extract_response() { @@ -570,4 +593,23 @@ mod tests { let expected_contract = RippleContract::Internal; assert_eq!(ExtnEvent::contract(), expected_contract); } + + #[test] + fn test_get_event() { + // Create a sample ExtnMessage for testing + let original_message = ExtnMessage { + id: "test_id".to_string(), + requestor: ExtnId::get_main_target("main".into()), + target: RippleContract::Internal, + target_id: Some(ExtnId::get_main_target("main".into())), + payload: ExtnPayload::Request(ExtnRequest::Config(Config::DefaultName)), + callback: None, + ts: Some(1234567890), + }; + let event_payload = ExtnEvent::Value(json!(1)); + let value = original_message.get_event(event_payload.clone()).unwrap(); + let extn_event_payload = ExtnPayload::Event(event_payload); + assert!(value.id.eq_ignore_ascii_case("test_id")); + assert!(value.payload.eq(&extn_event_payload)); + } }