Skip to content

Commit

Permalink
fixes: http_broker bugfixes (#606)
Browse files Browse the repository at this point in the history
* fixes: http_broker bugfixes

* pr: feedback

* pr: feedback
  • Loading branch information
brendanobra committed Aug 26, 2024
1 parent 0206187 commit 465e394
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 48 deletions.
172 changes: 124 additions & 48 deletions core/main/src/broker/http_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,78 +15,154 @@
// SPDX-License-Identifier: Apache-2.0
//

use hyper::{Body, Client, Method, Request, Uri};
use std::vec;

use hyper::{client::HttpConnector, Body, Client, Method, Request, Response, Uri};
use ripple_sdk::{
log::error,
log::{debug, error, trace},
tokio::{self, sync::mpsc},
utils::error::RippleError,
};

use tokio_tungstenite::tungstenite::http::uri::InvalidUri;

use super::endpoint_broker::{
BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerSender,
EndpointBroker,
BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerRequest,
BrokerSender, EndpointBroker,
};

pub struct HttpBroker {
sender: BrokerSender,
cleaner: BrokerCleaner,
}
/*
*/

async fn send_http_request(
client: &Client<HttpConnector>,
method: Method,
uri: &Uri,
path: &str,
) -> Result<Response<Body>, RippleError> {
/*
TODO? we may need to support body for POST request in the future
*/
let http_request = Request::new(Body::empty());
let (mut parts, _) = http_request.into_parts();
//TODO, need to refactor to support other methods
parts.method = method.clone();
/*
mix endpoint url with method
*/
/*
TODONT: unwraps are bad, need to handle errors
*/

let uri: Uri = format!("{}{}", uri, path)
.parse()
.map_err(|e: InvalidUri| RippleError::BrokerError(e.to_string()))?;
let new_request = Request::builder()
.uri(uri)
.body(Body::empty())
.map_err(|e| RippleError::BrokerError(e.to_string()))?;
let (uri_parts, _) = new_request.into_parts();

parts.uri = uri_parts.uri;

let http_request = Request::from_parts(parts, Body::empty());

debug!(
"http_broker sending {} request={}",
method,
http_request.uri(),
);
match client.request(http_request).await {
Ok(v) => Ok(v),
Err(e) => {
error!("Error in server");
Err(RippleError::BrokerError(e.to_string()))
}
}
}
async fn send_broker_response(callback: &BrokerCallback, request: &BrokerRequest, body: &[u8]) {
match BrokerOutputForwarder::handle_non_jsonrpc_response(
body,
callback.clone(),
request.clone(),
) {
Ok(_) => {}
Err(e) => {
error!("Error message from http broker {:?}", e)
}
}
}
fn error_string_to_json(msg: &str) -> serde_json::Value {
serde_json::json!({
"error": msg
})
}
async fn body_to_bytes(body: Body) -> Vec<u8> {
match hyper::body::to_bytes(body).await {
Ok(bytes) => {
let value: Vec<u8> = bytes.into();
value.as_slice().to_vec()
}
Err(e) => format!("error in http broker transforming body to bytes {}", e)
.to_string()
.as_bytes()
.to_vec(),
}
}

impl EndpointBroker for HttpBroker {
fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self {
let endpoint = request.endpoint.clone();
let (tx, mut tr) = mpsc::channel(10);
let broker = BrokerSender { sender: tx };
let is_json_rpc = endpoint.jsonrpc;
let uri: Uri = endpoint.get_url().parse().unwrap();
// let mut headers = HeaderMap::new();
// headers.insert("Content-Type", "application/json".parse().unwrap());
// if let Some(auth) = &endpoint.authentication {
// if auth.to_lowercase().contains("bearer") {
// if let Some(session) = session {
// headers.insert(
// "Authorization",
// format!("Bearer {}", session.token).parse().unwrap(),
// );
// }
// }
// }

let client = Client::new();
tokio::spawn(async move {
let _ = endpoint.get_url().parse().map_err(|e| error!("broker url {:?} in endpoint is invalid, cannot start http broker. error={}",endpoint,e) ).map(|uri| tokio::spawn(async move {
while let Some(request) = tr.recv().await {
if let Ok(broker_request) = Self::update_request(&request) {
let body = Body::from(broker_request);
let http_request = Request::new(body);
let (mut parts, body) = http_request.into_parts();
parts.method = Method::POST;
parts.uri = uri.clone();
//parts.headers = headers.clone();

let http_request = Request::from_parts(parts, body);
if let Ok(v) = client.request(http_request).await {
let (parts, body) = v.into_parts();
if !parts.status.is_success() {
error!("Error in server");
}
if let Ok(bytes) = hyper::body::to_bytes(body).await {
let value: Vec<u8> = bytes.into();
let value = value.as_slice();
if is_json_rpc {
Self::handle_jsonrpc_response(value, callback.clone());
} else if let Err(e) =
BrokerOutputForwarder::handle_non_jsonrpc_response(
value,
callback.clone(),
request.clone(),
)
{
error!("Error forwarding {:?}", e)
debug!("http broker received request={:?}", request);
match send_http_request(&client, Method::GET, &uri, &request.clone().rule.alias)
.await
{
Ok(response) => {
let (parts, body) = response.into_parts();
let body = body_to_bytes(body).await;
let mut request = request;
if let Ok(json_str) = serde_json::from_slice::<serde_json::Value>(&body).map(|v| vec![v])
.and_then(|v| serde_json::to_string(&v))
{
request.rpc.params_json = json_str;
let response = Self::update_request(&request);
trace!(
"http broker response={:?} to request: {:?} using rule={:?}",
response, request, request.rule
);

send_broker_response(&callback, &request, &body).await;
if !parts.status.is_success() {
error!(
"http error {} returned from http service in http broker {:?}",
parts.status, body
);
}
} else {
let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",uri, parts.status);
error!("{}",msg);
send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await;
}
}
Err(err) => {
let msg = format!("An error message from calling the downstream http service={} in http broker {:?}", uri, err);
error!("{}",msg);
send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await;
}
}
}
});
}));

Self {
sender: broker,
cleaner: BrokerCleaner { cleaner: None },
Expand Down
5 changes: 5 additions & 0 deletions core/sdk/src/utils/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub enum RippleError {
NotAvailable,
RuleError,
ServiceNotReady,
BrokerError(String),
}
impl std::fmt::Display for RippleError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -61,6 +62,10 @@ impl std::fmt::Display for RippleError {
RippleError::NotAvailable => write!(f, "NotAvailable"),
RippleError::RuleError => write!(f, "RuleError"),
RippleError::ServiceNotReady => write!(f, "ServiceNotReady"),
RippleError::BrokerError(msg) => {
let msg = format!("BrokerError {}", msg);
write!(f, "{}", msg)
}
}
}
}
Expand Down

0 comments on commit 465e394

Please sign in to comment.