Skip to content

Commit

Permalink
fixe()integrating the circuit breaker on the different protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
ndefokou committed Dec 18, 2024
1 parent 6bc341b commit 3885322
Show file tree
Hide file tree
Showing 13 changed files with 1,056 additions and 724 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ thiserror.workspace = true
didcomm = { workspace = true, features = ["uniffi"] }
hyper = { workspace = true, features = ["full"] }
axum = { workspace = true, features = ["macros"] }
tokio = "1.27.0"

[dev-dependencies]
keystore = { workspace = true, features = ["test-utils"] }
Expand Down
130 changes: 72 additions & 58 deletions crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,78 +4,84 @@ use didcomm::{AttachmentData, Message};
use mongodb::bson::doc;
use serde_json::{json, Value};
use shared::{
circuit_breaker::CircuitBreaker,
repository::entity::{Connection, RoutedMessage},
retry::{retry_async, RetryOptions},
state::{AppState, AppStateRepository},
};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

/// Mediator receives forwarded messages, extract the next field in the message body, and the attachments in the message
/// then stores the attachment with the next field as key for pickup
pub(crate) async fn mediator_forward_process(
state: Arc<AppState>,
message: Message,
circuit_breaker: Arc<Mutex<CircuitBreaker>>,
) -> Result<Option<Message>, ForwardError> {
let AppStateRepository {
message_repository,
connection_repository,
..
} = state
.repository
.as_ref()
.ok_or_else(|| ForwardError::InternalServerError)?;

let circuit_breaker = state.circuit_breaker.clone();
if circuit_breaker.is_open() {
return Err(ForwardError::CircuitOpen);
}

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
};

let attachments = message.attachments.unwrap_or_default();
for attachment in attachments {
let attached = match attachment.data {
AttachmentData::Json { value: data } => data.json,
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};

let result = retry_async(
|| {
let attached = attached.clone();
let recipient_did = next.as_ref().unwrap().to_owned();

async move {
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did,
})
.await
let mut cb = circuit_breaker.lock().await;

let result = cb
.call_async(|| {
let state = Arc::clone(&state);
let message = message.clone();
async move {
let AppStateRepository {
message_repository,
connection_repository,
..
} = state
.repository
.as_ref()
.ok_or_else(|| ForwardError::InternalServerError)?;

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
}?;

let attachments = message.attachments.unwrap_or_default();
for attachment in attachments {
let attached = match attachment.data {
AttachmentData::Json { value: data } => data.json,
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};
retry_async(
|| {
let attached = attached.clone();
let recipient_did = next.to_owned();

async move {
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did,
})
.await
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1)),
)
.await
.map_err(|_| ForwardError::InternalServerError)?;
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1)),
)
Ok::<Option<Message>, ForwardError>(None)
}
})
.await;

match result {
Ok(_) => circuit_breaker.record_success(),
Err(_) => {
circuit_breaker.record_failure();
return Err(ForwardError::InternalServerError);
}
};
match result {
Some(Ok(None)) => Ok(None),
Some(Ok(Some(_))) => Err(ForwardError::InternalServerError),
Some(Err(err)) => Err(err),
None => Err(ForwardError::CircuitOpen),
}

Ok(None)
}

async fn checks(
Expand Down Expand Up @@ -113,6 +119,7 @@ mod test {
use keystore::Secrets;
use serde_json::json;
use shared::{
circuit_breaker,
repository::{
entity::Connection,
tests::{MockConnectionRepository, MockMessagesRepository},
Expand Down Expand Up @@ -196,9 +203,16 @@ mod test {
.await
.expect("Unable unpack");

let msg = mediator_forward_process(Arc::new(state.clone()), msg)
.await
.unwrap();
// Wrap the CircuitBreaker in Arc and Mutex
let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3));

let msg: Option<Message> = mediator_forward_process(
Arc::new(state.clone()),
msg,
Arc::new(circuit_breaker.into()),
)
.await
.unwrap();

println!("Mediator1 is forwarding message \n{:?}\n", msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use async_trait::async_trait;
use axum::response::{IntoResponse, Response};
use didcomm::Message;
use message_api::{MessageHandler, MessagePlugin, MessageRouter};
use shared::state::AppState;
use std::sync::Arc;
use shared::{circuit_breaker::CircuitBreaker, state::AppState};
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;

pub struct RoutingProtocol;

Expand All @@ -17,7 +18,13 @@ impl MessageHandler for ForwardHandler {
state: Arc<AppState>,
msg: Message,
) -> Result<Option<Message>, Response> {
crate::handler::mediator_forward_process(state, msg)
let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new(
2,
Duration::from_millis(5000),
)));

// Pass the state, msg, and the circuit_breaker as arguments
crate::handler::mediator_forward_process(state, msg, circuit_breaker)
.await
.map_err(|e| e.into_response())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) enum MediationError {
UnexpectedMessageFormat,
#[error("internal server error")]
InternalServerError,
#[error("service unavailable")]
CircuitOpen,
}

impl IntoResponse for MediationError {
Expand All @@ -26,6 +28,7 @@ impl IntoResponse for MediationError {
MediationError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
MediationError::UnexpectedMessageFormat => StatusCode::BAD_REQUEST,
MediationError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
MediationError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
Loading

0 comments on commit 3885322

Please sign in to comment.