Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPPL-2452: Firebolt Ripple JQ Rules: Support for static response data #607

Merged
merged 11 commits into from
Aug 23, 2024
66 changes: 49 additions & 17 deletions core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl EndpointBrokerState {
rpc_request: &RpcRequest,
rule: Rule,
extn_message: Option<ExtnMessage>,
) -> BrokerRequest {
) -> (u64, BrokerRequest) {
let id = Self::get_next_id();
let mut rpc_request_c = rpc_request.clone();
{
Expand All @@ -346,7 +346,7 @@ impl EndpointBrokerState {
}

rpc_request_c.ctx.call_id = id;
BrokerRequest::new(&rpc_request_c, rule)
(id, BrokerRequest::new(&rpc_request_c, rule))
}

pub fn build_thunder_endpoint(&mut self) {
Expand Down Expand Up @@ -404,6 +404,23 @@ impl EndpointBrokerState {
}
}

fn handle_static_request(
&self,
rpc_request: RpcRequest,
extn_message: Option<ExtnMessage>,
rule: Rule,
callback: BrokerCallback,
) {
let (id, _updated_request) = self.update_request(&rpc_request, rule.clone(), extn_message);
let mut data = JsonRpcApiResponse::default();
SKumarMetro marked this conversation as resolved.
Show resolved Hide resolved
// return em[ty result and handle the rest with jq rule
let jv: Value = "".into();
data.result = Some(jv);
data.id = Some(id);
let output = BrokerOutput { data };
tokio::spawn(async move { callback.sender.send(output).await });
}

fn get_sender(&self, hash: &str) -> Option<BrokerSender> {
self.endpoint_map.read().unwrap().get(hash).cloned()
}
Expand All @@ -415,6 +432,7 @@ impl EndpointBrokerState {
rpc_request: RpcRequest,
extn_message: Option<ExtnMessage>,
) -> bool {
let mut handled: bool = true;
let callback = self.callback.clone();
let mut broker_sender = None;
let mut found_rule = None;
Expand All @@ -424,24 +442,34 @@ impl EndpointBrokerState {
if let Some(endpoint) = self.get_sender(&endpoint) {
let _ = broker_sender.insert(endpoint);
}
} else if let Some(endpoint) = self.get_sender("thunder") {
let _ = broker_sender.insert(endpoint);
} else if rule.alias != "static" {
if let Some(endpoint) = self.get_sender("thunder") {
let _ = broker_sender.insert(endpoint);
}
}
}

if broker_sender.is_none() || found_rule.is_none() {
return false;
}
let rule = found_rule.unwrap();
let broker = broker_sender.unwrap();
let updated_request = self.update_request(&rpc_request, rule, extn_message);
tokio::spawn(async move {
if let Err(e) = broker.send(updated_request.clone()).await {
// send some rpc error
callback.send_error(updated_request, e).await
if found_rule.is_some() {
let rule = found_rule.unwrap();

if rule.alias == "static" {
self.handle_static_request(rpc_request, extn_message, rule, callback);
} else if broker_sender.is_some() {
let broker = broker_sender.unwrap();
let (_, updated_request) = self.update_request(&rpc_request, rule, extn_message);
tokio::spawn(async move {
if let Err(e) = broker.send(updated_request.clone()).await {
callback.send_error(updated_request, e).await
}
});
} else {
handled = false;
}
});
true
} else {
handled = false;
}

handled
}

pub fn get_rule(&self, rpc_request: &RpcRequest) -> Option<Rule> {
Expand Down Expand Up @@ -653,6 +681,8 @@ impl BrokerOutputForwarder {
{
apply_response(v.data.clone(), filter, &rpc_request, &mut v);
}
} else {
trace!("start_forwarder: null result");
}

let request_id = rpc_request.ctx.call_id;
Expand Down Expand Up @@ -688,6 +718,8 @@ impl BrokerOutputForwarder {
)
.await
}
} else {
error!("start_forwarder:{} request not found {:?}", line!(), v);
}
} else {
error!("Error couldnt broker the event {:?}", v)
Expand Down Expand Up @@ -761,7 +793,7 @@ fn apply_response(
format!("{}_response", rpc_request.ctx.method),
) {
Ok(r) => {
ripple_sdk::log::trace!(
trace!(
"jq rendered output {:?} original input {:?} for filter {}",
r,
v,
Expand Down
13 changes: 13 additions & 0 deletions core/sdk/src/api/gateway/rpc_gateway_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,19 @@ pub struct JsonRpcApiResponse {
pub params: Option<Value>,
}

impl Default for JsonRpcApiResponse {
fn default() -> Self {
JsonRpcApiResponse {
id: None,
jsonrpc: "2.0".to_string(),
result: None,
error: None,
method: None,
params: None,
}
}
}

impl crate::Mockable for JsonRpcApiResponse {
fn mock() -> Self {
JsonRpcApiResponse {
Expand Down
Loading