Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Changes for Rule engine and better run script #580

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/main/src/broker/http_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl EndpointBroker for HttpBroker {
let (tx, mut tr) = mpsc::channel(10);
let broker = BrokerSender { sender: tx };
let is_json_rpc = endpoint.jsonrpc;
let uri: Uri = endpoint.url.parse().unwrap();
let uri: Uri = endpoint.get_url().parse().unwrap();
// let mut headers = HeaderMap::new();
// headers.insert("Content-Type", "application/json".parse().unwrap());
// if let Some(auth) = &endpoint.authentication {
Expand Down
37 changes: 32 additions & 5 deletions core/main/src/broker/rules_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ pub struct RuleSet {
impl RuleSet {
pub fn append(&mut self, rule_set: RuleSet) {
self.endpoints.extend(rule_set.endpoints);
self.rules.extend(rule_set.rules);
let rules: HashMap<String, Rule> = rule_set
.rules
.into_iter()
.map(|(k, v)| {
debug!("Loading JQ Rule for {}", k.to_lowercase());
(k.to_lowercase(), v)
})
.collect();
self.rules.extend(rules);
}
}

Expand All @@ -49,6 +57,17 @@ pub struct RuleEndpoint {
pub jsonrpc: bool,
}

impl RuleEndpoint {
pub fn get_url(&self) -> String {
if cfg!(feature = "local_dev") {
if let Ok(host_override) = std::env::var("DEVICE_HOST") {
return self.url.replace("127.0.0.1", &host_override);
}
}
self.url.clone()
}
}

fn default_autostart() -> bool {
true
}
Expand Down Expand Up @@ -131,8 +150,7 @@ impl RuleEngine {
if let Some(p) = Path::new(&path_for_rule).to_str() {
if let Ok(contents) = fs::read_to_string(p) {
debug!("Rule content {}", contents);
if let Ok((path, rule_set)) = Self::load_from_content(contents) {
debug!("Rules loaded from path={}", path);
if let Ok((_, rule_set)) = Self::load_from_content(contents) {
engine.rules.append(rule_set)
} else {
warn!("invalid rule found in path {}", path)
Expand All @@ -158,13 +176,22 @@ impl RuleEngine {
}

pub fn has_rule(&self, request: &RpcRequest) -> bool {
self.rules.rules.contains_key(&request.ctx.method)
self.rules
.rules
.contains_key(&request.ctx.method.to_lowercase())
}

pub fn get_rule(&self, rpc_request: &RpcRequest) -> Option<Rule> {
if let Some(mut rule) = self.rules.rules.get(&rpc_request.method).cloned() {
if let Some(mut rule) = self
.rules
.rules
.get(&rpc_request.method.to_lowercase())
.cloned()
{
rule.transform.apply_context(rpc_request);
return Some(rule);
} else {
info!("Rule not available for {}", rpc_request.method);
}
None
}
Expand Down
3 changes: 2 additions & 1 deletion core/main/src/broker/thunder_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl ThunderBroker {
let callback_for_sender = callback.clone();
let broker_for_reconnect = broker.clone();
tokio::spawn(async move {
let (mut ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.url, None).await;
let (mut ws_tx, mut ws_rx) =
BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await;

// send the first request to the broker. This is the controller statechange subscription request
let status_request = broker_c
Expand Down
5 changes: 3 additions & 2 deletions core/main/src/broker/websocket_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ impl WebsocketBroker {
let broker = BrokerSender { sender: tx };
tokio::spawn(async move {
if endpoint.jsonrpc {
let (mut ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.url, None).await;
let (mut ws_tx, mut ws_rx) =
BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await;

tokio::pin! {
let read = ws_rx.next();
Expand Down Expand Up @@ -103,7 +104,7 @@ impl WebsocketBroker {
let cleaner = WSNotificationBroker::start(
v.clone(),
callback.clone(),
endpoint.url.clone(),
endpoint.get_url().clone(),
);
{
let mut map = map_clone.write().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl ThunderGetConfigStep {
GATEWAY_DEFAULT
);
}
if let Ok(host_override) = std::env::var("THUNDER_HOST") {
if let Ok(host_override) = std::env::var("DEVICE_HOST") {
gateway_url.set_host(Some(&host_override)).ok();
}
return Ok(ThunderBootstrapStateWithConfig {
Expand Down
12 changes: 9 additions & 3 deletions device/thunder_ripple_sdk/src/client/thunder_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,9 +677,15 @@ pub struct ThunderRawBoolRequest {

impl ThunderRawBoolRequest {
async fn send_request(self: Box<Self>) -> Value {
let host = match env::var("THUNDER_HOST") {
Ok(h) => h,
Err(_) => String::from("127.0.0.1"),
let host = {
if cfg!(feature = "local_dev") {
match env::var("DEVICE_HOST") {
Ok(h) => h,
Err(_) => String::from("127.0.0.1"),
}
} else {
String::from("127.0.0.1")
}
};

if let Ok(t) = env::var("THUNDER_TOKEN") {
Expand Down
2 changes: 1 addition & 1 deletion ripple
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ case ${1} in
;;
"run")
cargo build --features local_dev
THUNDER_HOST=${2} cargo run --features local_dev core/main
DEVICE_HOST=${2} cargo run --features local_dev core/main
;;
"run-mock")
workspace_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
Expand Down
Loading