Skip to content

Commit

Permalink
feat: mock device extension (#238)
Browse files Browse the repository at this point in the history
* chore: added extn folder

* feat: added mockdevice contract

* chore: renamed extn dir

* chore: added start of ffi

* chore: renamed contract

* tools: fix incorrect paths in setup script

* chore: mock_device extn loading

* chore: added mock ws server

* wip: mock server start

* feat: mock ws server listening for connections

* chore: added ability to modify mock_data at run time

* feat: ripple boots from mock device extn

* fix: added missing event payload to mock data

* chore: cleared warnings

* chore: mock web socket server stands up with shared state in extn

* feat: added rpc interface for mock device extn

* feat: mock device addRequestResponse working

* feat: mock data can be removed from mock device

* chore: removed Arc around mock_data mutex

* feat: added method to emit events

* chore: mock_websocket_server from firebolt

* chore: removed debug logs

* refactor: added more flexible api surface

* chore: self-review todying

* chore: fix manifest examples

* refactor: better error handling and messages.

* feat: mock server responding accross connections. dynamic jsonrpc id

* refactor: added payload types so that we can explicitly support json-rpc

* refactor: simplified names

* chore: fixed clippy error

* chore: licenses

* test: unit tests for mock_data module

* refactor: removed message type in favour of payload type

* test: added mock websocket server tests

* chore: removed unneeded test

* chore: added files for docs

* refactor: use rpc_error util

* feat: mock data file location is now configurable

* refactor: fixed issue with contracts

* refactor: clean up errors. tried to add ffi test

* test: tried adding tests to mock device controller

* test: added placeholder test for processor

* docs: added usage docs

* chore: copyright notice

* chore: missing copyright noticer

* refactor: boot server panic message

* feat: Self contained extension provider (#301)

* feat: Self contained extension provider

* chore: tidied use statements

* test: fixed test for contract name change

* docs: updated mock device examples

* Add ADR for passthrough rpc

* feat: Communication Broker

* fix: atomicity and gateway

* fix: Http Broker

* fix: build errors

* fix: clippy

* feat: Working one

* fix: Http broker

* fix: errors

* fix: PR Ready

* fix: changes to the open rpc to remove user interest

* fix: for making mock device rpc work

* fix: cleanup Mock data hash

* fix: Adding delay

* fix: Eventing mechanism

* fix: Changes for eventing

* fix: cleanup pass through from mock

* fix: cleanup dependencies

* fix: Dependecies

* fix: Update unit tests

* fix: clippy errors

* fix: logger initialization issues

* fix: unit tests

* fix: logging error

* fix: Add emit support

* fix: unwrap

* fix: remove PartialEq impl

* fix: dependency errors

* fix: cleanup manifests

* fix: Add and remove requests

* Update docs

* fix: remove requests

* fix: for events

* fix: Emit events update

* fix: clippy errors

* fix: unit tests

---------

Co-authored-by: Sathishkumar Deena Kirupakaran <Sathishkumar_deenakirupakaran@comcast.com>
Co-authored-by: Sathishkumar <satlead@gmail.com>
Co-authored-by: Kevin Pearson <kevin_pearson@cable.comcast.com>
  • Loading branch information
4 people committed Apr 3, 2024
1 parent 275b5c0 commit afcb10c
Show file tree
Hide file tree
Showing 43 changed files with 3,219 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ members = [
"core/main",
"core/launcher",
"device/thunder",
"device/mock_device",
"distributor/general",
"examples/rpc_extn",
"examples/tm_extn",
Expand Down
1 change: 0 additions & 1 deletion core/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ sd-notify = { version = "0.4.1", optional = true }
exitcode = "1.1.2"
rand = "0.8"


[build-dependencies]
vergen = "1"

Expand Down
2 changes: 1 addition & 1 deletion core/main/src/bootstrap/extn/load_extn_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Bootstep<BootstrapState> for LoadExtensionsStep {
return Err(RippleError::BootstrapError);
}
} else {
error!("invalid channel builder in {}", path);
error!("failed loading builder in {}", path);
return Err(RippleError::BootstrapError);
}
} else {
Expand Down
1 change: 1 addition & 0 deletions core/main/src/state/bootstrap_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl ChannelsState {
let (gateway_tx, gateway_tr) = mpsc::channel(32);
let (app_req_tx, app_req_tr) = mpsc::channel(32);
let (ctx, ctr) = unbounded();

ChannelsState {
gateway_channel: TransientChannel::new(gateway_tx, gateway_tr),
app_req_channel: TransientChannel::new(app_req_tx, app_req_tr),
Expand Down
1 change: 0 additions & 1 deletion core/main/src/state/platform_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ impl PlatformState {
version: Option<String>,
) -> PlatformState {
let exclusory = ExclusoryImpl::get(&manifest);

Self {
extn_manifest,
cap_state: CapState::new(manifest.clone()),
Expand Down
6 changes: 2 additions & 4 deletions core/main/src/utils/rpc_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ use crate::{
state::platform_state::PlatformState,
};

pub use ripple_sdk::utils::rpc_utils::rpc_err;

pub const FIRE_BOLT_DEEPLINK_ERROR_CODE: i32 = -40400;
pub const DOWNSTREAM_SERVICE_UNAVAILABLE_ERROR_CODE: i32 = -50200;
pub const SESSION_NO_INTENT_ERROR_CODE: i32 = -40000;

pub fn rpc_err(msg: impl Into<String>) -> Error {
Error::Custom(msg.into())
}

/// Awaits a oneshot to respond. If the oneshot fails to repond, creates a generic
/// RPC internal error
pub async fn rpc_await_oneshot<T>(rx: oneshot::Receiver<T>) -> RpcResult<T> {
Expand Down
18 changes: 17 additions & 1 deletion core/sdk/src/extn/client/extn_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,22 @@ impl ExtnClient {
{
self.context_update(request);
}
// if its a request coming as an extn provider the extension is calling on itself.
// for eg an extension has a RPC Method provider and also a channel to process the
// requests this below impl will take care of sending the data back to the Extension
else if let Some(extn_id) = target_contract.is_extn_provider() {
if let Some(s) = self.get_extn_sender_with_extn_id(&extn_id) {
let new_message = message.clone();
tokio::spawn(async move {
if let Err(e) = s.send(new_message.into()).await {
error!("Error forwarding request {:?}", e)
}
});
} else {
error!("couldn't find the extension id registered the extn channel {:?} is not available", extn_id);
self.handle_no_processor_error(message);
}
}
// Forward the message to an extn sender
else if let Some(sender) = self.get_extn_sender_with_contract(target_contract)
{
Expand Down Expand Up @@ -600,7 +616,7 @@ impl ExtnClient {

/// Request method which accepts a impl [ExtnPayloadProvider] and uses the capability provided by the trait to send the request.
/// As part of the send process it adds a callback to asynchronously respond back to the caller when the response does get
/// received. This method can be called synchrnously with a timeout
/// received. This method can be called synchronously with a timeout
///
/// # Arguments
/// `payload` - impl [ExtnPayloadProvider]
Expand Down
7 changes: 3 additions & 4 deletions core/sdk/src/extn/client/extn_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,9 @@ pub trait ExtnRequestProcessor: ExtnStreamProcessor + Send + Sync + 'static {
///
/// # Returns
///
/// `Option<bool>` -> Used by [ExtnClient] to handle post processing
/// None - means not processed
/// Some(true) - Successful processing with status success
/// Some(false) - Successful processing with status error
/// `bool` -> Used by [ExtnClient] to handle post processing
/// `true` - Successful processing with status success
/// `false` - Successful processing with status error
async fn process_request(
state: Self::STATE,
msg: ExtnMessage,
Expand Down
6 changes: 4 additions & 2 deletions core/sdk/src/extn/client/extn_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ impl ExtnSender {
}

pub fn check_contract_fulfillment(&self, contract: RippleContract) -> bool {
if self.id.is_main() {
if self.id.is_main() || self.fulfills.contains(&contract.as_clear_string()) {
true
} else if let Ok(extn_id) = ExtnId::try_from(contract.as_clear_string()) {
self.id.eq(&extn_id)
} else {
self.fulfills.contains(&contract.as_clear_string())
false
}
}

Expand Down
131 changes: 126 additions & 5 deletions core/sdk/src/extn/extn_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@
// SPDX-License-Identifier: Apache-2.0
//

use crate::utils::error::RippleError;
use crate::{
framework::ripple_contract::{ContractAdjective, RippleContract},
utils::error::RippleError,
};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;

use super::extn_client_message::{ExtnPayload, ExtnPayloadProvider, ExtnRequest, ExtnResponse};

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExtnClassId {
Expand Down Expand Up @@ -133,13 +140,36 @@ impl ExtnClassType {
/// Below capability means the given plugin offers a JsonRpsee rpc extension for a service named bridge
///
/// `ripple:extn:jsonrpsee:bridge`
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct ExtnId {
pub _type: ExtnType,
pub class: ExtnClassId,
pub service: String,
}

impl Serialize for ExtnId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.to_string())
}
}

impl<'de> Deserialize<'de> for ExtnId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
if let Ok(str) = String::deserialize(deserializer) {
if let Ok(id) = ExtnId::try_from(str) {
return Ok(id);
}
}
Err(serde::de::Error::unknown_variant("unknown", &["unknown"]))
}
}

impl ToString for ExtnId {
fn to_string(&self) -> String {
let r = format!(
Expand Down Expand Up @@ -406,9 +436,100 @@ impl ExtnId {
}
}

impl PartialEq for ExtnId {
fn eq(&self, other: &ExtnId) -> bool {
self._type == other._type && self.class == other.class
#[derive(Debug, Clone, PartialEq)]
pub struct ExtnProviderAdjective {
pub id: ExtnId,
}

impl Serialize for ExtnProviderAdjective {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.id.to_string())
}
}

impl<'de> Deserialize<'de> for ExtnProviderAdjective {
fn deserialize<D>(deserializer: D) -> Result<ExtnProviderAdjective, D::Error>
where
D: Deserializer<'de>,
{
if let Ok(str) = String::deserialize(deserializer) {
if let Ok(id) = ExtnId::try_from(str) {
return Ok(ExtnProviderAdjective { id });
}
}
Err(serde::de::Error::unknown_variant("unknown", &["unknown"]))
}
}

impl ContractAdjective for ExtnProviderAdjective {
fn get_contract(&self) -> RippleContract {
RippleContract::ExtnProvider(self.clone())
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ExtnProviderRequest {
pub value: Value,
pub id: ExtnId,
}

impl ExtnPayloadProvider for ExtnProviderRequest {
fn get_from_payload(payload: ExtnPayload) -> Option<Self> {
if let ExtnPayload::Request(ExtnRequest::Extn(value)) = payload {
if let Ok(v) = serde_json::from_value::<ExtnProviderRequest>(value) {
return Some(v);
}
}

None
}

fn get_extn_payload(&self) -> ExtnPayload {
ExtnPayload::Request(ExtnRequest::Extn(
serde_json::to_value(self.clone()).unwrap(),
))
}

fn contract() -> RippleContract {
// Will be replaced by the IEC before CExtnMessage conversion
RippleContract::ExtnProvider(ExtnProviderAdjective {
id: ExtnId::get_main_target("default".into()),
})
}

fn get_contract(&self) -> RippleContract {
RippleContract::ExtnProvider(ExtnProviderAdjective {
id: self.id.clone(),
})
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ExtnProviderResponse {
pub value: Value,
}

impl ExtnPayloadProvider for ExtnProviderResponse {
fn get_from_payload(payload: ExtnPayload) -> Option<Self> {
if let ExtnPayload::Response(ExtnResponse::Value(value)) = payload {
return Some(ExtnProviderResponse { value });
}

None
}

fn get_extn_payload(&self) -> ExtnPayload {
ExtnPayload::Response(ExtnResponse::Value(self.value.clone()))
}

fn contract() -> RippleContract {
// Will be replaced by the IEC before CExtnMessage conversion
RippleContract::ExtnProvider(ExtnProviderAdjective {
id: ExtnId::get_main_target("default".into()),
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/sdk/src/extn/ffi/ffi_library.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct ExtnSymbolMetadata {
}

#[repr(C)]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct CExtnMetadata {
pub name: String,
pub metadata: String,
Expand Down
7 changes: 2 additions & 5 deletions core/sdk/src/extn/ffi/ffi_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {
// Create a mock CExtnMessage
let c_extn_message = CExtnMessage {
id: "test_id".to_string(),
requestor,
requestor: requestor.clone(),
target,
target_id: "".to_string(),
payload,
Expand All @@ -201,10 +201,7 @@ mod tests {
assert!(extn_message.is_ok(), "Expected Ok, but got Err");
if let Ok(extn_message) = extn_message {
assert_eq!(extn_message.id, "test_id");
assert_eq!(
extn_message.requestor,
ExtnId::new_channel(ExtnClassId::Device, "info".to_string())
);
assert_eq!(extn_message.requestor.to_string(), requestor);
assert_eq!(extn_message.target, RippleContract::DeviceInfo);
assert_eq!(extn_message.target_id, None);
assert_eq!(
Expand Down
28 changes: 27 additions & 1 deletion core/sdk/src/framework/ripple_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ use crate::{
session::{EventAdjective, PubSubAdjective, SessionAdjective},
storage_property::StorageAdjective,
},
extn::extn_id::ExtnProviderAdjective,
utils::{error::RippleError, serde_utils::SerdeClearString},
};
use jsonrpsee_core::DeserializeOwned;
use log::error;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -122,13 +124,14 @@ pub enum RippleContract {
/// the Session information based on their policies. Used by [crate::api::session::AccountSession]
Session(SessionAdjective),
RippleContext,
ExtnProvider(ExtnProviderAdjective),
AppCatalog,
Apps,
// Runtime ability for a given distributor to turn off a certian feature
RemoteFeatureControl,
}

pub trait ContractAdjective: serde::ser::Serialize {
pub trait ContractAdjective: serde::ser::Serialize + DeserializeOwned {
fn as_string(&self) -> String {
let adjective = SerdeClearString::as_clear_string(self);
if let Some(contract) = self.get_contract().get_adjective_contract() {
Expand Down Expand Up @@ -185,13 +188,27 @@ impl RippleContract {
Self::Session(adj) => Some(adj.as_string()),
Self::PubSub(adj) => Some(adj.as_string()),
Self::DeviceEvents(adj) => Some(adj.as_string()),
Self::ExtnProvider(adj) => Some(adj.id.to_string()),
_ => None,
}
}

fn get_contract_from_adjective<T: ContractAdjective>(str: &str) -> Option<RippleContract> {
match serde_json::from_str::<T>(str) {
Ok(v) => Some(v.get_contract()),
Err(e) => {
error!("contract parser_error={:?}", e);
None
}
}
}

pub fn from_adjective_string(contract: &str, adjective: &str) -> Option<Self> {
let adjective = format!("\"{}\"", adjective);
match contract {
"extn_provider" => {
return Self::get_contract_from_adjective::<ExtnProviderAdjective>(&adjective)
}
"storage" => match serde_json::from_str::<StorageAdjective>(&adjective) {
Ok(v) => return Some(v.get_contract()),
Err(e) => error!("contract parser_error={:?}", e),
Expand All @@ -217,6 +234,7 @@ impl RippleContract {
match self {
Self::Storage(_) => Some("storage".to_owned()),
Self::Session(_) => Some("session".to_owned()),
Self::ExtnProvider(_) => Some("extn_provider".to_owned()),
Self::PubSub(_) => Some("pubsub".to_owned()),
Self::DeviceEvents(_) => Some("device_events".to_owned()),
_ => None,
Expand All @@ -238,6 +256,14 @@ impl RippleContract {
}
None
}

pub fn is_extn_provider(&self) -> Option<String> {
if let RippleContract::ExtnProvider(e) = self {
Some(e.id.to_string())
} else {
None
}
}
}

#[derive(Debug, Clone, PartialEq)]
Expand Down
4 changes: 4 additions & 0 deletions core/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ pub extern crate serde_json;
pub extern crate serde_yaml;
pub extern crate tokio;
pub extern crate uuid;

pub trait Mockable {
fn mock() -> Self;
}
Loading

0 comments on commit afcb10c

Please sign in to comment.