From d15bf3e95a19d18c4c03ab90fb661fe333ccac11 Mon Sep 17 00:00:00 2001 From: VinodSathyaseelan Date: Wed, 26 Jun 2024 22:18:06 -0700 Subject: [PATCH] fix: Code cleanup --- .../thunder/thunder_plugins_status_mgr.rs | 319 +++++++++--------- 1 file changed, 160 insertions(+), 159 deletions(-) diff --git a/core/main/src/broker/thunder/thunder_plugins_status_mgr.rs b/core/main/src/broker/thunder/thunder_plugins_status_mgr.rs index 85b372a33..ccdec1f9b 100644 --- a/core/main/src/broker/thunder/thunder_plugins_status_mgr.rs +++ b/core/main/src/broker/thunder/thunder_plugins_status_mgr.rs @@ -21,7 +21,7 @@ use std::{ }; use ripple_sdk::{ - api::gateway::rpc_gateway_api::JsonRpcApiResponse, log::info, tokio, utils::error::RippleError, + api::gateway::rpc_gateway_api::JsonRpcApiResponse, log::info, utils::error::RippleError, }; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -107,7 +107,7 @@ impl State { #[derive(Debug, Clone)] pub struct ThunderPluginState { pub state: State, - pub last_seen: Duration, + pub last_attempt_time: Duration, pub pending_requests: Vec, } #[derive(Debug, Clone)] @@ -145,7 +145,7 @@ impl StatusManager { plugin_name, ThunderPluginState { state, - last_seen: Duration::from_secs(0), + last_attempt_time: Duration::from_secs(0), pending_requests: Vec::new(), }, ); @@ -161,14 +161,14 @@ impl StatusManager { plugin_name.clone(), ThunderPluginState { state: State::Unknown, - last_seen: Duration::from_secs(0), + last_attempt_time: Duration::from_secs(0), pending_requests: vec![request], }, ); } // update the last seen time if let Some(plugin_state) = status.get_mut(&plugin_name) { - plugin_state.last_seen = Duration::from_secs(0); + plugin_state.last_attempt_time = Duration::from_secs(0); } } @@ -180,7 +180,7 @@ impl StatusManager { let pending_requests = plugin_state.pending_requests.clone(); plugin_state.pending_requests.clear(); // check if the activation time has expired. - if plugin_state.last_seen.as_secs() > DEFAULT_PLUGIN_ACTIVATION_TIMEOUT { + if plugin_state.last_attempt_time.as_secs() > DEFAULT_PLUGIN_ACTIVATION_TIMEOUT { return (pending_requests, true); } else { return (pending_requests, false); @@ -280,52 +280,55 @@ impl StatusManager { callback: BrokerCallback, result: &[u8], ) -> bool { - if let Ok(data) = serde_json::from_slice::(result) { - if let Some(method) = data.method { - if method == "client.events.1.statechange" { - // intercept the statechange event and update plugin status. - if let Some(params) = data.params { - let event: Result = - serde_json::from_value(params); - if let Ok(event) = event { - self.update_status(event.callsign.clone(), event.state.clone()); - if event.state.is_activated() { - // get the pending BrokerRequest and process. - let (pending_requests, expired) = - self.retrive_pending_request(event.callsign); - if !pending_requests.is_empty() { - for pending_request in pending_requests { - if expired { - info!("Expired request: {:?}", pending_request); - callback - .send_error( - pending_request, - RippleError::ServiceError, - ) - .await; - } else { - let _ = sender.send(pending_request).await; - } - } - } + let data = match serde_json::from_slice::(result) { + Ok(data) => data, + Err(_) => return false, + }; + + if let Some(method) = data.method { + if method == "client.events.1.statechange" { + // intercept the statechange event and update plugin status. + let params = match data.params { + Some(params) => params, + None => return false, + }; + + let event: StateChangeEvent = match serde_json::from_value(params) { + Ok(event) => event, + Err(_) => return false, + }; + + self.update_status(event.callsign.clone(), event.state.clone()); + + if event.state.is_activated() { + // get the pending BrokerRequest and process. + let (pending_requests, expired) = self.retrive_pending_request(event.callsign); + if !pending_requests.is_empty() { + for pending_request in pending_requests { + if expired { + info!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } else { + let _ = sender.send(pending_request).await; } } } - // return false from here so that other subscribers can also process the response - return false; } + + // return false from here so that other subscribers can also process the response + return false; } - if let Some(id) = data.id { - let active_plugins_request = self.active_plugins_request.read().unwrap(); - active_plugins_request.contains_key(&id) - } else { - false - } - } else { - false } - } + if let Some(id) = data.id { + let active_plugins_request = self.active_plugins_request.read().unwrap(); + return active_plugins_request.contains_key(&id); + } + + false + } async fn on_activate_response( &self, sender: BrokerSender, @@ -333,31 +336,33 @@ impl StatusManager { data: &JsonRpcApiResponse, request: &str, ) { - if let Some(result) = &data.result { - if let Some(callsign) = request.split("callsign\":").last() { - let plugin_name = callsign.trim_matches(|c| c == '"' || c == '}'); - // get the pending BrokerRequest and process. - let (pending_requests, expired) = - self.retrive_pending_request(plugin_name.to_string()); - if result.is_null() { - self.update_status(plugin_name.to_string(), State::Activated); - if !pending_requests.is_empty() { - for pending_request in pending_requests { - if expired { - info!("Expired request: {:?}", pending_request); - callback - .send_error(pending_request, RippleError::ServiceError) - .await; - } else { - let _ = sender.send(pending_request).await; - } - } - } - } else if let Some(_e) = &data.error { - Self::on_thunder_error_response(self, callback, data, &plugin_name.to_string()) + let result = match &data.result { + Some(result) => result, + None => return, + }; + + let callsign = match request.split("callsign\":").last() { + Some(callsign) => callsign.trim_matches(|c| c == '"' || c == '}'), + None => return, + }; + + let (pending_requests, expired) = self.retrive_pending_request(callsign.to_string()); + + if result.is_null() { + self.update_status(callsign.to_string(), State::Activated); + + for pending_request in pending_requests { + if expired { + info!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) .await; + } else { + let _ = sender.send(pending_request).await; } } + } else if let Some(_e) = &data.error { + Self::on_thunder_error_response(self, callback, data, &callsign.to_string()).await; } } @@ -368,53 +373,42 @@ impl StatusManager { data: &JsonRpcApiResponse, request: &str, ) { - // handle status response - if let Some(result) = &data.result { - if let Some(callsign) = request.split('@').last() { - let plugin_name = callsign.trim_matches(|c| c == '"' || c == '}'); - let status_res: Result, serde_json::Error> = - serde_json::from_value(result.clone()); - match status_res { - Ok(status_res) => { - for status in status_res { - if status.callsign == plugin_name { - self.update_status( - plugin_name.to_string(), - status.to_plugin_state(), - ); - - if status.to_plugin_state().is_activated() { - // get the pending BrokerRequest and process. - let (pending_requests, expired) = - self.retrive_pending_request(plugin_name.to_string()); - if !pending_requests.is_empty() { - for pending_request in pending_requests { - if expired { - info!("Expired request: {:?}", pending_request); - callback - .send_error( - pending_request, - RippleError::ServiceError, - ) - .await; - } else { - let _ = sender.send(pending_request).await; - } - } - } - } - } + let result = match &data.result { + Some(result) => result, + None => return, + }; + + let callsign = match request.split('@').last() { + Some(callsign) => callsign.trim_matches(|c| c == '"' || c == '}'), + None => return, + }; + + let status_res: Vec = match serde_json::from_value(result.clone()) { + Ok(status_res) => status_res, + Err(_) => { + Self::on_thunder_error_response(self, callback, data, &callsign.to_string()).await; + return; + } + }; + + for status in status_res { + if status.callsign == callsign { + self.update_status(callsign.to_string(), status.to_plugin_state()); + + if status.to_plugin_state().is_activated() { + let (pending_requests, expired) = + self.retrive_pending_request(callsign.to_string()); + + for pending_request in pending_requests { + if expired { + info!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } else { + let _ = sender.send(pending_request).await; } } - Err(_e) => { - Self::on_thunder_error_response( - self, - callback, - data, - &plugin_name.to_string(), - ) - .await; - } } } } @@ -426,26 +420,31 @@ impl StatusManager { data: &JsonRpcApiResponse, plugin_name: &String, ) { - if let Some(error) = &data.error { - info!( - "Error Received from Thunder on getting the status of the plugin : {:?}", - error - ); - if let Ok(error) = serde_json::from_value::(error.clone()) { - let state = error.get_plugin_state(); - self.update_status(plugin_name.to_string(), state.clone()); - if state.is_unavailable() { - // get the pending BrokerRequest and send error resposne. - let (pending_requests, _) = - self.retrive_pending_request(plugin_name.to_string()); - if !pending_requests.is_empty() { - for pending_request in pending_requests { - callback - .send_error(pending_request, RippleError::ServiceError) - .await; - } - } - } + let error = match &data.error { + Some(error) => error, + None => return, + }; + + info!( + "Error Received from Thunder on getting the status of the plugin: {:?}", + error + ); + + let thunder_error: ThunderError = match serde_json::from_value(error.clone()) { + Ok(error) => error, + Err(_) => return, + }; + + let state = thunder_error.get_plugin_state(); + self.update_status(plugin_name.to_string(), state.clone()); + + if state.is_unavailable() { + let (pending_requests, _) = self.retrive_pending_request(plugin_name.to_string()); + + for pending_request in pending_requests { + callback + .send_error(pending_request, RippleError::ServiceError) + .await; } } } @@ -461,34 +460,36 @@ impl StatusManager { callback: BrokerCallback, result: &[u8], ) { - if let Ok(data) = serde_json::from_slice::(result) { - if let Some(id) = data.id { - if let Some(request) = self.get_request_from_active_plugins_request(id) { - if request.contains("Controller.1.activate") { - // move this to spawned task - let state_mgr = StatusManager { - status: self.status.clone(), - active_plugins_request: self.active_plugins_request.clone(), - }; - //let request_c = request.clone(); - tokio::spawn(async move { - state_mgr - .on_activate_response(sender, callback, &data, &request) - .await; - }); - //Self::on_activate_response(self, sender, callback, &data, request).await; - } else if request.contains("Controller.1.status@") { - // handle status response - Self::on_status_response(self, sender, callback, &data, &request).await; - } else if request.contains("Controller.1.register") { - // nothing to do here - info!("StatusManger Received response for register request"); - } - } - let mut active_plugins_request = self.active_plugins_request.write().unwrap(); - active_plugins_request.remove(&id); - } + let data = match serde_json::from_slice::(result) { + Ok(data) => data, + Err(_) => return, + }; + + let id = match data.id { + Some(id) => id, + None => return, + }; + + let request = match self.get_request_from_active_plugins_request(id) { + Some(request) => request, + None => return, + }; + + if request.contains("Controller.1.activate") { + // handle activate response + self.on_activate_response(sender, callback, &data, &request) + .await; + } else if request.contains("Controller.1.status@") { + // handle status response + self.on_status_response(sender, callback, &data, &request) + .await; + } else if request.contains("Controller.1.register") { + // nothing to do here + info!("StatusManger Received response for register request"); } + + let mut active_plugins_request = self.active_plugins_request.write().unwrap(); + active_plugins_request.remove(&id); } }