diff --git a/core/main/src/broker/http_broker.rs b/core/main/src/broker/http_broker.rs index df3fffecf..c5be0a6e0 100644 --- a/core/main/src/broker/http_broker.rs +++ b/core/main/src/broker/http_broker.rs @@ -15,78 +15,154 @@ // SPDX-License-Identifier: Apache-2.0 // -use hyper::{Body, Client, Method, Request, Uri}; +use std::vec; + +use hyper::{client::HttpConnector, Body, Client, Method, Request, Response, Uri}; use ripple_sdk::{ - log::error, + log::{debug, error, trace}, tokio::{self, sync::mpsc}, + utils::error::RippleError, }; +use tokio_tungstenite::tungstenite::http::uri::InvalidUri; + use super::endpoint_broker::{ - BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerSender, - EndpointBroker, + BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerRequest, + BrokerSender, EndpointBroker, }; pub struct HttpBroker { sender: BrokerSender, cleaner: BrokerCleaner, } +/* + +*/ + +async fn send_http_request( + client: &Client, + method: Method, + uri: &Uri, + path: &str, +) -> Result, RippleError> { + /* + TODO? we may need to support body for POST request in the future + */ + let http_request = Request::new(Body::empty()); + let (mut parts, _) = http_request.into_parts(); + //TODO, need to refactor to support other methods + parts.method = method.clone(); + /* + mix endpoint url with method + */ + /* + TODONT: unwraps are bad, need to handle errors + */ + + let uri: Uri = format!("{}{}", uri, path) + .parse() + .map_err(|e: InvalidUri| RippleError::BrokerError(e.to_string()))?; + let new_request = Request::builder() + .uri(uri) + .body(Body::empty()) + .map_err(|e| RippleError::BrokerError(e.to_string()))?; + let (uri_parts, _) = new_request.into_parts(); + + parts.uri = uri_parts.uri; + + let http_request = Request::from_parts(parts, Body::empty()); + + debug!( + "http_broker sending {} request={}", + method, + http_request.uri(), + ); + match client.request(http_request).await { + Ok(v) => Ok(v), + Err(e) => { + error!("Error in server"); + Err(RippleError::BrokerError(e.to_string())) + } + } +} +async fn send_broker_response(callback: &BrokerCallback, request: &BrokerRequest, body: &[u8]) { + match BrokerOutputForwarder::handle_non_jsonrpc_response( + body, + callback.clone(), + request.clone(), + ) { + Ok(_) => {} + Err(e) => { + error!("Error message from http broker {:?}", e) + } + } +} +fn error_string_to_json(msg: &str) -> serde_json::Value { + serde_json::json!({ + "error": msg + }) +} +async fn body_to_bytes(body: Body) -> Vec { + match hyper::body::to_bytes(body).await { + Ok(bytes) => { + let value: Vec = bytes.into(); + value.as_slice().to_vec() + } + Err(e) => format!("error in http broker transforming body to bytes {}", e) + .to_string() + .as_bytes() + .to_vec(), + } +} impl EndpointBroker for HttpBroker { fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { let endpoint = request.endpoint.clone(); let (tx, mut tr) = mpsc::channel(10); let broker = BrokerSender { sender: tx }; - let is_json_rpc = endpoint.jsonrpc; - let uri: Uri = endpoint.get_url().parse().unwrap(); - // let mut headers = HeaderMap::new(); - // headers.insert("Content-Type", "application/json".parse().unwrap()); - // if let Some(auth) = &endpoint.authentication { - // if auth.to_lowercase().contains("bearer") { - // if let Some(session) = session { - // headers.insert( - // "Authorization", - // format!("Bearer {}", session.token).parse().unwrap(), - // ); - // } - // } - // } - let client = Client::new(); - tokio::spawn(async move { + let _ = endpoint.get_url().parse().map_err(|e| error!("broker url {:?} in endpoint is invalid, cannot start http broker. error={}",endpoint,e) ).map(|uri| tokio::spawn(async move { while let Some(request) = tr.recv().await { - if let Ok(broker_request) = Self::update_request(&request) { - let body = Body::from(broker_request); - let http_request = Request::new(body); - let (mut parts, body) = http_request.into_parts(); - parts.method = Method::POST; - parts.uri = uri.clone(); - //parts.headers = headers.clone(); - - let http_request = Request::from_parts(parts, body); - if let Ok(v) = client.request(http_request).await { - let (parts, body) = v.into_parts(); - if !parts.status.is_success() { - error!("Error in server"); - } - if let Ok(bytes) = hyper::body::to_bytes(body).await { - let value: Vec = bytes.into(); - let value = value.as_slice(); - if is_json_rpc { - Self::handle_jsonrpc_response(value, callback.clone()); - } else if let Err(e) = - BrokerOutputForwarder::handle_non_jsonrpc_response( - value, - callback.clone(), - request.clone(), - ) - { - error!("Error forwarding {:?}", e) + debug!("http broker received request={:?}", request); + match send_http_request(&client, Method::GET, &uri, &request.clone().rule.alias) + .await + { + Ok(response) => { + let (parts, body) = response.into_parts(); + let body = body_to_bytes(body).await; + let mut request = request; + if let Ok(json_str) = serde_json::from_slice::(&body).map(|v| vec![v]) + .and_then(|v| serde_json::to_string(&v)) + { + request.rpc.params_json = json_str; + let response = Self::update_request(&request); + trace!( + "http broker response={:?} to request: {:?} using rule={:?}", + response, request, request.rule + ); + + send_broker_response(&callback, &request, &body).await; + if !parts.status.is_success() { + error!( + "http error {} returned from http service in http broker {:?}", + parts.status, body + ); } + } else { + let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",uri, parts.status); + error!("{}",msg); + send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await; } } + Err(err) => { + let msg = format!("An error message from calling the downstream http service={} in http broker {:?}", uri, err); + error!("{}",msg); + send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await; + } } } - }); + })); + Self { sender: broker, cleaner: BrokerCleaner { cleaner: None }, diff --git a/core/sdk/src/utils/error.rs b/core/sdk/src/utils/error.rs index a5e31c166..8d11ae733 100644 --- a/core/sdk/src/utils/error.rs +++ b/core/sdk/src/utils/error.rs @@ -39,6 +39,7 @@ pub enum RippleError { NotAvailable, RuleError, ServiceNotReady, + BrokerError(String), } impl std::fmt::Display for RippleError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -61,6 +62,10 @@ impl std::fmt::Display for RippleError { RippleError::NotAvailable => write!(f, "NotAvailable"), RippleError::RuleError => write!(f, "RuleError"), RippleError::ServiceNotReady => write!(f, "ServiceNotReady"), + RippleError::BrokerError(msg) => { + let msg = format!("BrokerError {}", msg); + write!(f, "{}", msg) + } } } }