Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http ws nonjsonrpc support #554

Merged
merged 22 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 215 additions & 90 deletions core/main/src/broker/endpoint_broker.rs

Large diffs are not rendered by default.

28 changes: 24 additions & 4 deletions core/main/src/broker/http_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ use ripple_sdk::{
};

use super::{
endpoint_broker::{BrokerCallback, BrokerSender, EndpointBroker},
endpoint_broker::{
BrokerCallback, BrokerCleaner, BrokerOutputForwarder, BrokerSender, EndpointBroker,
},
rules_engine::RuleEndpoint,
};

pub struct HttpBroker {
sender: BrokerSender,
cleaner: BrokerCleaner,
}

impl EndpointBroker for HttpBroker {
fn get_broker(endpoint: RuleEndpoint, callback: BrokerCallback) -> Self {
let (tx, mut tr) = mpsc::channel(10);
let broker = BrokerSender { sender: tx };

let is_json_rpc = endpoint.jsonrpc;
let uri: Uri = endpoint.url.parse().unwrap();
// let mut headers = HeaderMap::new();
// headers.insert("Content-Type", "application/json".parse().unwrap());
Expand Down Expand Up @@ -69,16 +72,33 @@ impl EndpointBroker for HttpBroker {
if let Ok(bytes) = hyper::body::to_bytes(body).await {
let value: Vec<u8> = bytes.into();
let value = value.as_slice();
Self::handle_response(value, callback.clone());
if is_json_rpc {
Self::handle_jsonrpc_response(value, callback.clone());
} else if let Err(e) =
BrokerOutputForwarder::handle_non_jsonrpc_response(
value,
callback.clone(),
request.clone(),
)
{
error!("Error forwarding {:?}", e)
}
}
}
}
}
});
Self { sender: broker }
Self {
sender: broker,
cleaner: BrokerCleaner { cleaner: None },
}
}

fn get_sender(&self) -> BrokerSender {
self.sender.clone()
}

fn get_cleaner(&self) -> BrokerCleaner {
self.cleaner.clone()
}
}
6 changes: 6 additions & 0 deletions core/main/src/broker/rules_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ impl RuleSet {
pub struct RuleEndpoint {
pub protocol: RuleEndpointProtocol,
pub url: String,
#[serde(default = "default_autostart")]
pub jsonrpc: bool,
}

fn default_autostart() -> bool {
true
}

#[derive(Deserialize, Debug, Clone)]
Expand Down
36 changes: 32 additions & 4 deletions core/main/src/broker/thunder_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use crate::utils::rpc_utils::extract_tcp_port;
// SPDX-License-Identifier: Apache-2.0
//
use super::{
endpoint_broker::{BrokerCallback, BrokerOutput, BrokerRequest, BrokerSender, EndpointBroker},
endpoint_broker::{
BrokerCallback, BrokerCleaner, BrokerOutput, BrokerRequest, BrokerSender, EndpointBroker,
},
rules_engine::RuleEndpoint,
};
use futures_util::{SinkExt, StreamExt};
Expand All @@ -40,18 +42,24 @@ use tokio_tungstenite::client_async;
pub struct ThunderBroker {
sender: BrokerSender,
subscription_map: Arc<RwLock<HashMap<String, Vec<BrokerRequest>>>>,
cleaner: BrokerCleaner,
}

impl ThunderBroker {
fn start(endpoint: RuleEndpoint, callback: BrokerCallback) -> Self {
let (tx, mut tr) = mpsc::channel(10);
let (c_tx, mut c_tr) = mpsc::channel(2);
let sender = BrokerSender { sender: tx };
let subscription_map = Arc::new(RwLock::new(HashMap::new()));
let broker = Self {
sender,
subscription_map,
cleaner: BrokerCleaner {
cleaner: Some(c_tx.clone()),
},
};
let broker_c = broker.clone();
let broker_for_cleanup = broker.clone();
let callback_for_sender = callback.clone();
tokio::spawn(async move {
info!("Broker Endpoint url {}", endpoint.url);
Expand Down Expand Up @@ -88,7 +96,7 @@ impl ThunderBroker {
Ok(v) => {
if let tokio_tungstenite::tungstenite::Message::Text(t) = v {
// send the incoming text without context back to the sender
Self::handle_response(t.as_bytes(),callback.clone())
Self::handle_jsonrpc_response(t.as_bytes(),callback.clone())
}
},
Err(e) => {
Expand All @@ -105,11 +113,27 @@ impl ThunderBroker {
debug!("Sending request to broker {:?}", updated_request);
for r in updated_request {
let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await;
let _flush = ws_tx.flush().await;
let _flush = ws_tx.flush().await;
}
}
Err(e) => callback_for_sender.send_error(request,e).await
}
},
Some(cleanup_request) = c_tr.recv() => {
let value = {
broker_for_cleanup.subscription_map.write().unwrap().remove(&cleanup_request)
};
if let Some(mut cleanup) = value {
let sender = broker_for_cleanup.get_sender();
while let Some(mut v) = cleanup.pop() {
v.rpc = v.rpc.get_unsubscribe();
if (sender.send(v).await).is_err() {
error!("Cleanup Error for {}",&cleanup_request);
}
}

}

}
}
}
Expand Down Expand Up @@ -178,6 +202,10 @@ impl EndpointBroker for ThunderBroker {
self.sender.clone()
}

fn get_cleaner(&self) -> BrokerCleaner {
self.cleaner.clone()
}

fn prepare_request(
&self,
rpc_request: &super::endpoint_broker::BrokerRequest,
Expand Down Expand Up @@ -236,7 +264,7 @@ impl EndpointBroker for ThunderBroker {

/// Default handler method for the broker to remove the context and send it back to the
/// client for consumption
fn handle_response(result: &[u8], callback: BrokerCallback) {
fn handle_jsonrpc_response(result: &[u8], callback: BrokerCallback) {
let mut final_result = Err(RippleError::ParseError);
if let Ok(data) = serde_json::from_slice::<JsonRpcApiResponse>(result) {
let updated_data = Self::update_response(&data);
Expand Down
Loading
Loading