diff --git a/Cargo.lock b/Cargo.lock index f17cd0594805..5aaf6b672354 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2799,6 +2799,19 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "events" +version = "0.1.0" +dependencies = [ + "error-stack", + "masking", + "router_env", + "serde", + "serde_json", + "thiserror", + "time", +] + [[package]] name = "external_services" version = "0.1.0" @@ -5589,6 +5602,7 @@ dependencies = [ "erased-serde", "error-stack", "euclid", + "events", "external_services", "futures 0.3.30", "hex", diff --git a/crates/events/Cargo.toml b/crates/events/Cargo.toml new file mode 100644 index 000000000000..b0d352f81e8d --- /dev/null +++ b/crates/events/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "events" +description = "Events framework for generating events & some sample implementations" +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true +license.workspace = true + +[dependencies] +# First Party crates +masking = { version = "0.1.0", path = "../masking" } +router_env = { version = "0.1.0", path = "../router_env" } + +# Third Party crates +error-stack = "0.4.1" +serde = "1.0.197" +serde_json = "1.0.114" +thiserror = "1.0.58" +time = "0.3.34" diff --git a/crates/events/src/lib.rs b/crates/events/src/lib.rs new file mode 100644 index 000000000000..03bccbb517ff --- /dev/null +++ b/crates/events/src/lib.rs @@ -0,0 +1,267 @@ +#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg_hide))] +#![cfg_attr(docsrs, doc(cfg_hide(doc)))] +#![forbid(unsafe_code)] +#![warn(missing_docs)] + +//! +//! A generic event handler system. +//! This library consists of 4 parts: +//! Event Sink: A trait that defines how events are published. This could be a simple logger, a message queue, or a database. +//! EventContext: A struct that holds the event sink and metadata about the event. This is used to create events. This can be used to add metadata to all events, such as the user who triggered the event. +//! EventInfo: A trait that defines the metadata that is sent with the event. It works with the EventContext to add metadata to all events. +//! Event: A trait that defines the event itself. This trait is used to define the data that is sent with the event and defines the event's type & identifier. +//! + +use std::{collections::HashMap, sync::Arc}; + +use error_stack::{Result, ResultExt}; +use masking::{ErasedMaskSerialize, Serialize}; +use router_env::logger; +use serde::Serializer; +use serde_json::Value; +use time::PrimitiveDateTime; + +/// Errors that can occur when working with events. +#[derive(Debug, Clone, thiserror::Error)] +pub enum EventsError { + /// An error occurred when publishing the event. + #[error("Generic Error")] + GenericError, + /// An error occurred when serializing the event. + #[error("Event serialization error")] + SerializationError, + /// An error occurred when publishing/producing the event. + #[error("Event publishing error")] + PublishError, +} + +/// An event that can be published. +pub trait Event: EventInfo { + /// The type of the event. + type EventType; + /// The timestamp of the event. + fn timestamp(&self) -> PrimitiveDateTime; + + /// The (unique) identifier of the event. + fn identifier(&self) -> String; + + /// The class/type of the event. This is used to group/categorize events together. + fn class(&self) -> Self::EventType; +} + +/// Hold the context information for any events +#[derive(Clone)] +pub struct EventContext +where + A: MessagingInterface, +{ + message_sink: Arc, + metadata: HashMap, +} + +/// intermediary structure to build events with in-place info. +#[must_use] +pub struct EventBuilder +where + A: MessagingInterface, + E: Event, +{ + message_sink: Arc, + metadata: HashMap, + event: E, +} + +struct RawEvent>(HashMap, A); + +impl EventBuilder +where + A: MessagingInterface, + E: Event, +{ + /// Add metadata to the event. + pub fn with + 'static>( + mut self, + info: G, + ) -> Self { + info.data() + .and_then(|i| { + i.masked_serialize() + .change_context(EventsError::SerializationError) + }) + .map_err(|e| { + logger::error!("Error adding event info: {:?}", e); + }) + .ok() + .and_then(|data| self.metadata.insert(info.key(), data)); + self + } + /// Emit the event and log any errors. + pub fn emit(self) { + self.try_emit() + .map_err(|e| { + logger::error!("Error emitting event: {:?}", e); + }) + .ok(); + } + + /// Emit the event. + #[must_use = "make sure to call `emit` to actually emit the event"] + pub fn try_emit(self) -> Result<(), EventsError> { + let ts = self.event.timestamp(); + self.message_sink + .send_message(RawEvent(self.metadata, self.event), ts) + } +} + +impl Serialize for RawEvent +where + A: Event, +{ + fn serialize(&self, serializer: S) -> core::result::Result + where + S: Serializer, + { + let mut serialize_map: HashMap<_, _> = self + .0 + .iter() + .filter_map(|(k, v)| Some((k.clone(), v.masked_serialize().ok()?))) + .collect(); + match self.1.data().map(|i| i.masked_serialize()) { + Ok(Ok(Value::Object(map))) => { + for (k, v) in map.into_iter() { + serialize_map.insert(k, v); + } + } + Ok(Ok(i)) => { + serialize_map.insert(self.1.key(), i); + } + i => { + logger::error!("Error serializing event: {:?}", i); + } + }; + serialize_map.serialize(serializer) + } +} + +impl EventContext +where + A: MessagingInterface, +{ + /// Create a new event context. + pub fn new(message_sink: A) -> Self { + Self { + message_sink: Arc::new(message_sink), + metadata: HashMap::new(), + } + } + + /// Add metadata to the event context. + #[track_caller] + pub fn record_info + 'static>( + &mut self, + info: E, + ) { + match info.data().and_then(|i| { + i.masked_serialize() + .change_context(EventsError::SerializationError) + }) { + Ok(data) => { + self.metadata.insert(info.key(), data); + } + Err(e) => { + logger::error!("Error recording event info: {:?}", e); + } + } + } + + /// Emit an event. + pub fn try_emit>(&self, event: E) -> Result<(), EventsError> { + EventBuilder { + message_sink: self.message_sink.clone(), + metadata: self.metadata.clone(), + event, + } + .try_emit() + } + + /// Emit an event. + pub fn emit>(&self, event: E) { + EventBuilder { + message_sink: self.message_sink.clone(), + metadata: self.metadata.clone(), + event, + } + .emit() + } + + /// Create an event builder. + pub fn event>( + &self, + event: E, + ) -> EventBuilder { + EventBuilder { + message_sink: self.message_sink.clone(), + metadata: self.metadata.clone(), + event, + } + } +} + +/// Add information/metadata to the current context of an event. +pub trait EventInfo { + /// The data that is sent with the event. + type Data: ErasedMaskSerialize; + /// The data that is sent with the event. + fn data(&self) -> Result; + + /// The key identifying the data for an event. + fn key(&self) -> String; +} + +impl EventInfo for (String, String) { + type Data = String; + fn data(&self) -> Result { + Ok(self.1.clone()) + } + + fn key(&self) -> String { + self.0.clone() + } +} + +/// A messaging interface for sending messages/events. +/// This can be implemented for any messaging system, such as a message queue, a logger, or a database. +pub trait MessagingInterface { + /// The type of the event used for categorization by the event publisher. + type MessageClass; + /// Send a message that follows the defined message class. + fn send_message(&self, data: T, timestamp: PrimitiveDateTime) -> Result<(), EventsError> + where + T: Message + ErasedMaskSerialize; +} + +/// A message that can be sent. +pub trait Message { + /// The type of the event used for categorization by the event publisher. + type Class; + /// The type of the event used for categorization by the event publisher. + fn get_message_class(&self) -> Self::Class; + + /// The (unique) identifier of the event. + fn identifier(&self) -> String; +} + +impl Message for RawEvent +where + A: Event, +{ + type Class = T; + + fn get_message_class(&self) -> Self::Class { + self.1.class() + } + + fn identifier(&self) -> String { + self.1.identifier() + } +} diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index 002e22ec4d82..1adecf37c67e 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -124,6 +124,7 @@ rdkafka = "0.36.2" isocountry = "0.3.2" iso_currency = "0.4.4" actix-http = "3.6.0" +events = { version = "0.1.0", path = "../events" } [build-dependencies] router_env = { version = "0.1.0", path = "../router_env", default-features = false } diff --git a/crates/router/src/analytics.rs b/crates/router/src/analytics.rs index a9c2b8995b56..b3e72008cc43 100644 --- a/crates/router/src/analytics.rs +++ b/crates/router/src/analytics.rs @@ -122,7 +122,7 @@ pub mod routes { state, &req, domain.into_inner(), - |_, _, domain: analytics::AnalyticsDomain| async { + |_, _, domain: analytics::AnalyticsDomain, _| async { analytics::core::get_domain_info(domain) .await .map(ApplicationResponse::Json) @@ -154,7 +154,7 @@ pub mod routes { state, &req, payload, - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::payments::get_metrics( &state.pool, &auth.merchant_account.merchant_id, @@ -190,7 +190,7 @@ pub mod routes { state, &req, payload, - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::refunds::get_metrics( &state.pool, &auth.merchant_account.merchant_id, @@ -226,7 +226,7 @@ pub mod routes { state, &req, payload, - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::sdk_events::get_metrics( &state.pool, auth.merchant_account.publishable_key.as_ref(), @@ -252,7 +252,7 @@ pub mod routes { state, &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::payments::get_filters( &state.pool, req, @@ -278,7 +278,7 @@ pub mod routes { state, &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req: GetRefundFilterRequest| async move { + |state, auth: AuthenticationData, req: GetRefundFilterRequest, _| async move { analytics::refunds::get_filters( &state.pool, req, @@ -304,7 +304,7 @@ pub mod routes { state, &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::sdk_events::get_filters( &state.pool, req, @@ -330,7 +330,7 @@ pub mod routes { state, &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { api_events_core(&state.pool, req, auth.merchant_account.merchant_id) .await .map(ApplicationResponse::Json) @@ -354,7 +354,7 @@ pub mod routes { state, &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { outgoing_webhook_events_core(&state.pool, req, auth.merchant_account.merchant_id) .await .map(ApplicationResponse::Json) @@ -376,7 +376,7 @@ pub mod routes { state, &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { sdk_events_core( &state.pool, req, @@ -402,7 +402,7 @@ pub mod routes { state.clone(), &req, json_payload.into_inner(), - |state, (auth, user_id): auth::AuthenticationDataWithUserId, payload| async move { + |state, (auth, user_id): auth::AuthenticationDataWithUserId, payload, _| async move { let user = UserInterface::find_user_by_id(&*state.store, &user_id) .await .change_context(AnalyticsError::UnknownError)?; @@ -444,7 +444,7 @@ pub mod routes { state.clone(), &req, json_payload.into_inner(), - |state, (auth, user_id): auth::AuthenticationDataWithUserId, payload| async move { + |state, (auth, user_id): auth::AuthenticationDataWithUserId, payload, _| async move { let user = UserInterface::find_user_by_id(&*state.store, &user_id) .await .change_context(AnalyticsError::UnknownError)?; @@ -486,7 +486,7 @@ pub mod routes { state.clone(), &req, json_payload.into_inner(), - |state, (auth, user_id): auth::AuthenticationDataWithUserId, payload| async move { + |state, (auth, user_id): auth::AuthenticationDataWithUserId, payload, _| async move { let user = UserInterface::find_user_by_id(&*state.store, &user_id) .await .change_context(AnalyticsError::UnknownError)?; @@ -538,7 +538,7 @@ pub mod routes { state.clone(), &req, payload, - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::api_event::get_api_event_metrics( &state.pool, &auth.merchant_account.merchant_id, @@ -564,7 +564,7 @@ pub mod routes { state.clone(), &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::api_event::get_filters( &state.pool, req, @@ -590,7 +590,7 @@ pub mod routes { state, &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { connector_events_core(&state.pool, req, auth.merchant_account.merchant_id) .await .map(ApplicationResponse::Json) @@ -612,7 +612,7 @@ pub mod routes { state.clone(), &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::search::msearch_results( req, &auth.merchant_account.merchant_id, @@ -643,7 +643,7 @@ pub mod routes { state.clone(), &req, indexed_req, - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::search::search_results( req, &auth.merchant_account.merchant_id, @@ -669,7 +669,7 @@ pub mod routes { state, &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::disputes::get_filters( &state.pool, req, @@ -704,7 +704,7 @@ pub mod routes { state, &req, payload, - |state, auth: AuthenticationData, req| async move { + |state, auth: AuthenticationData, req, _| async move { analytics::disputes::get_metrics( &state.pool, &auth.merchant_account.merchant_id, diff --git a/crates/router/src/compatibility/stripe/customers.rs b/crates/router/src/compatibility/stripe/customers.rs index cd2ce4835e5c..3628c72a31c9 100644 --- a/crates/router/src/compatibility/stripe/customers.rs +++ b/crates/router/src/compatibility/stripe/customers.rs @@ -35,7 +35,6 @@ pub async fn customer_create( _, _, _, - _, types::CreateCustomerResponse, errors::StripeErrorCode, _, @@ -44,7 +43,7 @@ pub async fn customer_create( state.into_inner(), &req, create_cust_req, - |state, auth, req| { + |state, auth, req, _| { customers::create_customer(state, auth.merchant_account, auth.key_store, req) }, &auth::ApiKeyAuth, @@ -70,7 +69,6 @@ pub async fn customer_retrieve( _, _, _, - _, types::CustomerRetrieveResponse, errors::StripeErrorCode, _, @@ -79,7 +77,7 @@ pub async fn customer_retrieve( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { customers::retrieve_customer(state, auth.merchant_account, auth.key_store, req) }, &auth::ApiKeyAuth, @@ -114,7 +112,6 @@ pub async fn customer_update( _, _, _, - _, types::CustomerUpdateResponse, errors::StripeErrorCode, _, @@ -123,7 +120,7 @@ pub async fn customer_update( state.into_inner(), &req, cust_update_req, - |state, auth, req| { + |state, auth, req, _| { customers::update_customer(state, auth.merchant_account, req, auth.key_store) }, &auth::ApiKeyAuth, @@ -149,7 +146,6 @@ pub async fn customer_delete( _, _, _, - _, types::CustomerDeleteResponse, errors::StripeErrorCode, _, @@ -158,7 +154,7 @@ pub async fn customer_delete( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { customers::delete_customer(state, auth.merchant_account, req, auth.key_store) }, &auth::ApiKeyAuth, @@ -183,7 +179,6 @@ pub async fn list_customer_payment_method_api( _, _, _, - _, types::CustomerPaymentMethodListResponse, errors::StripeErrorCode, _, @@ -192,7 +187,7 @@ pub async fn list_customer_payment_method_api( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { cards::do_list_customer_pm_fetch_customer_if_not_passed( state, auth.merchant_account, diff --git a/crates/router/src/compatibility/stripe/payment_intents.rs b/crates/router/src/compatibility/stripe/payment_intents.rs index 87560032ea4b..609116fd8875 100644 --- a/crates/router/src/compatibility/stripe/payment_intents.rs +++ b/crates/router/src/compatibility/stripe/payment_intents.rs @@ -43,7 +43,6 @@ pub async fn payment_intents_create( _, _, _, - _, types::StripePaymentIntentResponse, errors::StripeErrorCode, _, @@ -52,7 +51,7 @@ pub async fn payment_intents_create( state.into_inner(), &req, create_payment_req, - |state, auth, req| { + |state, auth, req, _| { let eligible_connectors = req.connector.clone(); payments::payments_core::( state, @@ -104,7 +103,6 @@ pub async fn payment_intents_retrieve( _, _, _, - _, types::StripePaymentIntentResponse, errors::StripeErrorCode, _, @@ -113,7 +111,7 @@ pub async fn payment_intents_retrieve( state.into_inner(), &req, payload, - |state, auth, payload| { + |state, auth, payload, _| { payments::payments_core::( state, auth.merchant_account, @@ -174,7 +172,6 @@ pub async fn payment_intents_retrieve_with_gateway_creds( _, _, _, - _, types::StripePaymentIntentResponse, errors::StripeErrorCode, _, @@ -183,7 +180,7 @@ pub async fn payment_intents_retrieve_with_gateway_creds( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::( state, auth.merchant_account, @@ -239,7 +236,6 @@ pub async fn payment_intents_update( _, _, _, - _, types::StripePaymentIntentResponse, errors::StripeErrorCode, _, @@ -248,7 +244,7 @@ pub async fn payment_intents_update( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { let eligible_connectors = req.connector.clone(); payments::payments_core::( state, @@ -311,7 +307,6 @@ pub async fn payment_intents_confirm( _, _, _, - _, types::StripePaymentIntentResponse, errors::StripeErrorCode, _, @@ -320,7 +315,7 @@ pub async fn payment_intents_confirm( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { let eligible_connectors = req.connector.clone(); payments::payments_core::( state, @@ -373,7 +368,6 @@ pub async fn payment_intents_capture( _, _, _, - _, types::StripePaymentIntentResponse, errors::StripeErrorCode, _, @@ -382,7 +376,7 @@ pub async fn payment_intents_capture( state.into_inner(), &req, payload, - |state, auth, payload| { + |state, auth, payload, _| { payments::payments_core::( state, auth.merchant_account, @@ -438,7 +432,6 @@ pub async fn payment_intents_cancel( _, _, _, - _, types::StripePaymentIntentResponse, errors::StripeErrorCode, _, @@ -447,7 +440,7 @@ pub async fn payment_intents_cancel( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::( state, auth.merchant_account, @@ -484,7 +477,6 @@ pub async fn payment_intent_list( _, _, _, - _, types::StripePaymentIntentListResponse, errors::StripeErrorCode, _, @@ -493,7 +485,7 @@ pub async fn payment_intent_list( state.into_inner(), &req, payload, - |state, auth, req| payments::list_payments(state, auth.merchant_account, req), + |state, auth, req, _| payments::list_payments(state, auth.merchant_account, req), &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/compatibility/stripe/refunds.rs b/crates/router/src/compatibility/stripe/refunds.rs index 80ebbc4f84d7..da68e79e4223 100644 --- a/crates/router/src/compatibility/stripe/refunds.rs +++ b/crates/router/src/compatibility/stripe/refunds.rs @@ -40,7 +40,6 @@ pub async fn refund_create( _, _, _, - _, types::StripeRefundResponse, errors::StripeErrorCode, _, @@ -49,7 +48,7 @@ pub async fn refund_create( state.into_inner(), &req, create_refund_req, - |state, auth, req| { + |state, auth, req, _| { refunds::refund_create_core(state, auth.merchant_account, auth.key_store, req) }, &auth::ApiKeyAuth, @@ -85,7 +84,6 @@ pub async fn refund_retrieve_with_gateway_creds( _, _, _, - _, types::StripeRefundResponse, errors::StripeErrorCode, _, @@ -94,7 +92,7 @@ pub async fn refund_retrieve_with_gateway_creds( state.into_inner(), &req, refund_request, - |state, auth, refund_request| { + |state, auth, refund_request, _| { refunds::refund_response_wrapper( state, auth.merchant_account, @@ -128,7 +126,6 @@ pub async fn refund_retrieve( _, _, _, - _, types::StripeRefundResponse, errors::StripeErrorCode, _, @@ -137,7 +134,7 @@ pub async fn refund_retrieve( state.into_inner(), &req, refund_request, - |state, auth, refund_request| { + |state, auth, refund_request, _| { refunds::refund_response_wrapper( state, auth.merchant_account, @@ -169,7 +166,6 @@ pub async fn refund_update( _, _, _, - _, types::StripeRefundResponse, errors::StripeErrorCode, _, @@ -178,7 +174,7 @@ pub async fn refund_update( state.into_inner(), &req, create_refund_update_req, - |state, auth, req| refunds::refund_update_core(state, auth.merchant_account, req), + |state, auth, req, _| refunds::refund_update_core(state, auth.merchant_account, req), &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/compatibility/stripe/setup_intents.rs b/crates/router/src/compatibility/stripe/setup_intents.rs index 6522dc4697c1..d56937e91c43 100644 --- a/crates/router/src/compatibility/stripe/setup_intents.rs +++ b/crates/router/src/compatibility/stripe/setup_intents.rs @@ -44,7 +44,6 @@ pub async fn setup_intents_create( _, _, _, - _, types::StripeSetupIntentResponse, errors::StripeErrorCode, _, @@ -53,7 +52,7 @@ pub async fn setup_intents_create( state.into_inner(), &req, create_payment_req, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::< api_types::SetupMandate, api_types::PaymentsResponse, @@ -111,7 +110,6 @@ pub async fn setup_intents_retrieve( _, _, _, - _, types::StripeSetupIntentResponse, errors::StripeErrorCode, _, @@ -120,7 +118,7 @@ pub async fn setup_intents_retrieve( state.into_inner(), &req, payload, - |state, auth, payload| { + |state, auth, payload, _| { payments::payments_core::( state, auth.merchant_account, @@ -177,7 +175,6 @@ pub async fn setup_intents_update( _, _, _, - _, types::StripeSetupIntentResponse, errors::StripeErrorCode, _, @@ -186,7 +183,7 @@ pub async fn setup_intents_update( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::< api_types::SetupMandate, api_types::PaymentsResponse, @@ -251,7 +248,6 @@ pub async fn setup_intents_confirm( _, _, _, - _, types::StripeSetupIntentResponse, errors::StripeErrorCode, _, @@ -260,7 +256,7 @@ pub async fn setup_intents_confirm( state.into_inner(), &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::< api_types::SetupMandate, api_types::PaymentsResponse, diff --git a/crates/router/src/compatibility/wrap.rs b/crates/router/src/compatibility/wrap.rs index da9a7f3d1634..96163727da37 100644 --- a/crates/router/src/compatibility/wrap.rs +++ b/crates/router/src/compatibility/wrap.rs @@ -8,22 +8,25 @@ use serde::Serialize; use crate::{ core::{api_locking, errors}, events::api_logs::ApiEventMetric, - routes::{app::AppStateInfo, metrics}, + routes::{ + app::{AppStateInfo, ReqState}, + metrics, AppState, + }, services::{self, api, authentication as auth, logger}, }; #[instrument(skip(request, payload, state, func, api_authentication))] -pub async fn compatibility_api_wrap<'a, 'b, A, U, T, Q, F, Fut, S, E, E2>( +pub async fn compatibility_api_wrap<'a, 'b, U, T, Q, F, Fut, S, E, E2>( flow: impl router_env::types::FlowMetric, - state: Arc, + state: Arc, request: &'a HttpRequest, payload: T, func: F, - api_authentication: &dyn auth::AuthenticateAndFetch, + api_authentication: &dyn auth::AuthenticateAndFetch, lock_action: api_locking::LockAction, ) -> HttpResponse where - F: Fn(A, U, T) -> Fut, + F: Fn(AppState, U, T, ReqState) -> Fut, Fut: Future, E2>>, E2: ErrorSwitch + std::error::Error + Send + Sync + 'static, Q: Serialize + std::fmt::Debug + 'a + ApiEventMetric, @@ -32,7 +35,6 @@ where error_stack::Report: services::EmbedError, errors::ApiErrorResponse: ErrorSwitch, T: std::fmt::Debug + Serialize + ApiEventMetric, - A: AppStateInfo + Clone, { let request_method = request.method().as_str(); let url_path = request.path(); @@ -41,11 +43,13 @@ where let start_instant = Instant::now(); logger::info!(tag = ?Tag::BeginRequest, payload = ?payload); + let req_state = state.get_req_state(); let server_wrap_util_res = metrics::request::record_request_time_metric( api::server_wrap_util( &flow, state.clone().into(), + req_state, request, payload, func, diff --git a/crates/router/src/core/connector_onboarding.rs b/crates/router/src/core/connector_onboarding.rs index 21377d0ba0bb..b69b2996a6ac 100644 --- a/crates/router/src/core/connector_onboarding.rs +++ b/crates/router/src/core/connector_onboarding.rs @@ -3,8 +3,9 @@ use masking::Secret; use crate::{ core::errors::{ApiErrorResponse, RouterResponse, RouterResult}, + routes::app::ReqState, services::{authentication as auth, ApplicationResponse}, - types::{self as oss_types}, + types as oss_types, utils::connector_onboarding as utils, AppState, }; @@ -20,6 +21,7 @@ pub async fn get_action_url( state: AppState, user_from_token: auth::UserFromToken, request: api::ActionUrlRequest, + _req_state: ReqState, ) -> RouterResponse { utils::check_if_connector_exists(&state, &request.connector_id, &user_from_token.merchant_id) .await?; @@ -54,6 +56,7 @@ pub async fn sync_onboarding_status( state: AppState, user_from_token: auth::UserFromToken, request: api::OnboardingSyncRequest, + _req_state: ReqState, ) -> RouterResponse { utils::check_if_connector_exists(&state, &request.connector_id, &user_from_token.merchant_id) .await?; @@ -107,6 +110,7 @@ pub async fn reset_tracking_id( state: AppState, user_from_token: auth::UserFromToken, request: api::ResetTrackingIdRequest, + _req_state: ReqState, ) -> RouterResponse<()> { utils::check_if_connector_exists(&state, &request.connector_id, &user_from_token.merchant_id) .await?; diff --git a/crates/router/src/core/user.rs b/crates/router/src/core/user.rs index 50fd094d8d08..364af0f83ddb 100644 --- a/crates/router/src/core/user.rs +++ b/crates/router/src/core/user.rs @@ -15,7 +15,7 @@ use super::errors::{StorageErrorExt, UserErrors, UserResponse, UserResult}; use crate::services::email::types as email_types; use crate::{ consts, - routes::AppState, + routes::{app::ReqState, AppState}, services::{authentication as auth, authorization::roles, ApplicationResponse}, types::{domain, transformers::ForeignInto}, utils, @@ -429,6 +429,7 @@ pub async fn invite_user( state: AppState, request: user_api::InviteUserRequest, user_from_token: auth::UserFromToken, + req_state: ReqState, ) -> UserResponse { let inviter_user = state .store @@ -571,6 +572,9 @@ pub async fn invite_user( let is_email_sent; #[cfg(feature = "email")] { + // Doing this to avoid clippy lints + // will add actual usage for this later + let _ = req_state.clone(); let email_contents = email_types::InviteUser { recipient_email: invitee_email, user_name: domain::UserName::new(new_user.get_name())?, @@ -603,6 +607,7 @@ pub async fn invite_user( state.clone(), invited_user_token, set_metadata_request, + req_state, ) .await?; } @@ -624,6 +629,7 @@ pub async fn invite_multiple_user( state: AppState, user_from_token: auth::UserFromToken, requests: Vec, + req_state: ReqState, ) -> UserResponse> { if requests.len() > 10 { return Err(report!(UserErrors::MaxInvitationsError)) @@ -631,7 +637,7 @@ pub async fn invite_multiple_user( } let responses = futures::future::join_all(requests.iter().map(|request| async { - match handle_invitation(&state, &user_from_token, request).await { + match handle_invitation(&state, &user_from_token, request, &req_state).await { Ok(response) => response, Err(error) => InviteMultipleUserResponse { email: request.email.clone(), @@ -650,6 +656,7 @@ async fn handle_invitation( state: &AppState, user_from_token: &auth::UserFromToken, request: &user_api::InviteUserRequest, + req_state: &ReqState, ) -> UserResult { let inviter_user = user_from_token.get_user_from_db(state).await?; @@ -688,7 +695,7 @@ async fn handle_invitation( .err() .unwrap_or(false) { - handle_new_user_invitation(state, user_from_token, request).await + handle_new_user_invitation(state, user_from_token, request, req_state.clone()).await } else { Err(UserErrors::InternalServerError.into()) } @@ -770,6 +777,7 @@ async fn handle_new_user_invitation( state: &AppState, user_from_token: &auth::UserFromToken, request: &user_api::InviteUserRequest, + req_state: ReqState, ) -> UserResult { let new_user = domain::NewUser::try_from((request.clone(), user_from_token.clone()))?; @@ -810,6 +818,9 @@ async fn handle_new_user_invitation( let is_email_sent; #[cfg(feature = "email")] { + // TODO: Adding this to avoid clippy lints + // Will be adding actual usage for this variable later + let _ = req_state.clone(); let invitee_email = domain::UserEmail::from_pii_email(request.email.clone())?; let email_contents = email_types::InviteUser { recipient_email: invitee_email, @@ -840,8 +851,13 @@ async fn handle_new_user_invitation( }; let set_metadata_request = SetMetaDataRequest::IsChangePasswordRequired; - dashboard_metadata::set_metadata(state.clone(), invited_user_token, set_metadata_request) - .await?; + dashboard_metadata::set_metadata( + state.clone(), + invited_user_token, + set_metadata_request, + req_state, + ) + .await?; } Ok(InviteMultipleUserResponse { @@ -861,6 +877,7 @@ pub async fn resend_invite( state: AppState, user_from_token: auth::UserFromToken, request: user_api::ReInviteUserRequest, + _req_state: ReqState, ) -> UserResponse<()> { let invitee_email = domain::UserEmail::from_pii_email(request.email)?; let user: domain::UserFromStorage = state @@ -1210,6 +1227,7 @@ pub async fn get_user_details_in_merchant_account( state: AppState, user_from_token: auth::UserFromToken, request: user_api::GetUserDetailsRequest, + _req_state: ReqState, ) -> UserResponse { let required_user = utils::user::get_user_from_db_by_email(&state, request.email.try_into()?) .await @@ -1458,6 +1476,7 @@ pub async fn update_user_details( state: AppState, user_token: auth::UserFromToken, req: user_api::UpdateUserAccountDetailsRequest, + _req_state: ReqState, ) -> UserResponse<()> { let user: domain::UserFromStorage = state .store diff --git a/crates/router/src/core/user/dashboard_metadata.rs b/crates/router/src/core/user/dashboard_metadata.rs index 328a54c540b6..bd4b2c3a10d0 100644 --- a/crates/router/src/core/user/dashboard_metadata.rs +++ b/crates/router/src/core/user/dashboard_metadata.rs @@ -10,7 +10,7 @@ use router_env::logger; use crate::{ core::errors::{UserErrors, UserResponse, UserResult}, - routes::AppState, + routes::{app::ReqState, AppState}, services::{authentication::UserFromToken, ApplicationResponse}, types::domain::{user::dashboard_metadata as types, MerchantKeyStore}, utils::user::dashboard_metadata as utils, @@ -22,6 +22,7 @@ pub async fn set_metadata( state: AppState, user: UserFromToken, request: api::SetMetaDataRequest, + _req_state: ReqState, ) -> UserResponse<()> { let metadata_value = parse_set_request(request)?; let metadata_key = DBEnum::from(&metadata_value); @@ -35,6 +36,7 @@ pub async fn get_multiple_metadata( state: AppState, user: UserFromToken, request: GetMultipleMetaDataPayload, + _req_state: ReqState, ) -> UserResponse> { let metadata_keys: Vec = request.results.into_iter().map(parse_get_request).collect(); diff --git a/crates/router/src/core/user/sample_data.rs b/crates/router/src/core/user/sample_data.rs index 19b7d3bd815c..f95f48f19e79 100644 --- a/crates/router/src/core/user/sample_data.rs +++ b/crates/router/src/core/user/sample_data.rs @@ -7,7 +7,7 @@ pub type SampleDataApiResponse = SampleDataResult>; use crate::{ core::errors::sample_data::SampleDataResult, - routes::AppState, + routes::{app::ReqState, AppState}, services::{authentication::UserFromToken, ApplicationResponse}, utils::user::sample_data::generate_sample_data, }; @@ -16,6 +16,7 @@ pub async fn generate_sample_data_for_user( state: AppState, user_from_token: UserFromToken, req: SampleDataRequest, + _req_state: ReqState, ) -> SampleDataApiResponse<()> { let sample_data = generate_sample_data(&state, req, user_from_token.merchant_id.as_str()).await?; @@ -59,6 +60,7 @@ pub async fn delete_sample_data_for_user( state: AppState, user_from_token: UserFromToken, _req: SampleDataRequest, + _req_state: ReqState, ) -> SampleDataApiResponse<()> { let merchant_id_del = user_from_token.merchant_id.as_str(); diff --git a/crates/router/src/core/user_role.rs b/crates/router/src/core/user_role.rs index 2939c7d51b3c..6ddab570aed8 100644 --- a/crates/router/src/core/user_role.rs +++ b/crates/router/src/core/user_role.rs @@ -7,9 +7,9 @@ use router_env::logger; use crate::{ consts, core::errors::{StorageErrorExt, UserErrors, UserResponse}, - routes::AppState, + routes::{app::ReqState, AppState}, services::{ - authentication::{self as auth}, + authentication as auth, authorization::{info, roles}, ApplicationResponse, }, @@ -50,6 +50,7 @@ pub async fn update_user_role( state: AppState, user_from_token: auth::UserFromToken, req: user_role_api::UpdateUserRoleRequest, + _req_state: ReqState, ) -> UserResponse<()> { let role_info = roles::RoleInfo::from_role_id( &state, @@ -120,6 +121,7 @@ pub async fn transfer_org_ownership( state: AppState, user_from_token: auth::UserFromToken, req: user_role_api::TransferOrgOwnershipRequest, + _req_state: ReqState, ) -> UserResponse { if user_from_token.role_id != consts::user_role::ROLE_ID_ORGANIZATION_ADMIN { return Err(report!(UserErrors::InvalidRoleOperation)).attach_printable(format!( @@ -171,6 +173,7 @@ pub async fn accept_invitation( state: AppState, user_token: auth::UserWithoutMerchantFromToken, req: user_role_api::AcceptInvitationRequest, + _req_state: ReqState, ) -> UserResponse { let user_role = futures::future::join_all(req.merchant_ids.iter().map(|merchant_id| async { state @@ -223,6 +226,7 @@ pub async fn delete_user_role( state: AppState, user_from_token: auth::UserFromToken, request: user_role_api::DeleteUserRoleRequest, + _req_state: ReqState, ) -> UserResponse<()> { let user_from_db: domain::UserFromStorage = state .store diff --git a/crates/router/src/core/user_role/role.rs b/crates/router/src/core/user_role/role.rs index a0cf5a6caf96..504d5dd632be 100644 --- a/crates/router/src/core/user_role/role.rs +++ b/crates/router/src/core/user_role/role.rs @@ -7,7 +7,7 @@ use error_stack::{report, ResultExt}; use crate::{ consts, core::errors::{StorageErrorExt, UserErrors, UserResponse}, - routes::AppState, + routes::{app::ReqState, AppState}, services::{ authentication::{blacklist, UserFromToken}, authorization::roles::{self, predefined_roles::PREDEFINED_ROLES}, @@ -57,6 +57,7 @@ pub async fn create_role( state: AppState, user_from_token: UserFromToken, req: role_api::CreateRoleRequest, + _req_state: ReqState, ) -> UserResponse { let now = common_utils::date_time::now(); let role_name = RoleName::new(req.role_name)?; diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index d02b42d41766..d226c2bdbeda 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -1,8 +1,11 @@ use data_models::errors::{StorageError, StorageResult}; use error_stack::ResultExt; +use events::{EventsError, Message, MessagingInterface}; +use masking::ErasedMaskSerialize; use router_env::logger; use serde::{Deserialize, Serialize}; use storage_impl::errors::ApplicationError; +use time::PrimitiveDateTime; use crate::{ db::KafkaProducer, @@ -14,7 +17,6 @@ pub mod audit_events; pub mod connector_api_logs; pub mod event_logger; pub mod outgoing_webhook_logs; - #[derive(Debug, Serialize, Clone, Copy)] #[serde(rename_all = "snake_case")] pub enum EventType { @@ -84,3 +86,21 @@ impl EventsHandler { }; } } + +impl MessagingInterface for EventsHandler { + type MessageClass = EventType; + + fn send_message( + &self, + data: T, + timestamp: PrimitiveDateTime, + ) -> error_stack::Result<(), EventsError> + where + T: Message + ErasedMaskSerialize, + { + match self { + Self::Kafka(a) => a.send_message(data, timestamp), + Self::Logs(a) => a.send_message(data, timestamp), + } + } +} diff --git a/crates/router/src/events/audit_events.rs b/crates/router/src/events/audit_events.rs index 6000d37527d8..90fc8cfe7694 100644 --- a/crates/router/src/events/audit_events.rs +++ b/crates/router/src/events/audit_events.rs @@ -1,31 +1,24 @@ -use data_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent}; +use events::{Event, EventInfo}; use serde::Serialize; - -use crate::services::kafka::KafkaMessage; +use time::PrimitiveDateTime; #[derive(Debug, Clone, Serialize)] pub enum AuditEventType { - Error { - error_message: String, - }, + Error { error_message: String }, PaymentCreated, ConnectorDecided, ConnectorCalled, RefundCreated, RefundSuccess, RefundFail, - PaymentUpdate { - payment_id: String, - merchant_id: String, - payment_intent: PaymentIntent, - payment_attempt: PaymentAttempt, - }, + PaymentCancelled { cancellation_reason: Option }, } #[derive(Debug, Clone, Serialize)] pub struct AuditEvent { event_type: AuditEventType, - created_at: time::PrimitiveDateTime, + #[serde(with = "common_utils::custom_serde::iso8601")] + created_at: PrimitiveDateTime, } impl AuditEvent { @@ -37,12 +30,43 @@ impl AuditEvent { } } -impl KafkaMessage for AuditEvent { - fn key(&self) -> String { - format!("{}", self.created_at.assume_utc().unix_timestamp_nanos()) +impl Event for AuditEvent { + type EventType = super::EventType; + + fn timestamp(&self) -> PrimitiveDateTime { + self.created_at } - fn event_type(&self) -> super::EventType { + fn identifier(&self) -> String { + let event_type = match &self.event_type { + AuditEventType::Error { .. } => "error", + AuditEventType::PaymentCreated => "payment_created", + AuditEventType::ConnectorDecided => "connector_decided", + AuditEventType::ConnectorCalled => "connector_called", + AuditEventType::RefundCreated => "refund_created", + AuditEventType::RefundSuccess => "refund_success", + AuditEventType::RefundFail => "refund_fail", + AuditEventType::PaymentCancelled { .. } => "payment_cancelled", + }; + format!( + "{event_type}-{}", + self.timestamp().assume_utc().unix_timestamp_nanos() + ) + } + + fn class(&self) -> Self::EventType { super::EventType::AuditEvent } } + +impl EventInfo for AuditEvent { + type Data = Self; + + fn data(&self) -> error_stack::Result { + Ok(self.clone()) + } + + fn key(&self) -> String { + "event".to_string() + } +} diff --git a/crates/router/src/events/event_logger.rs b/crates/router/src/events/event_logger.rs index 6851128bf469..235ee4a3e3c2 100644 --- a/crates/router/src/events/event_logger.rs +++ b/crates/router/src/events/event_logger.rs @@ -1,3 +1,8 @@ +use events::{EventsError, Message, MessagingInterface}; +use masking::ErasedMaskSerialize; +use time::PrimitiveDateTime; + +use super::EventType; use crate::services::{kafka::KafkaMessage, logger}; #[derive(Clone, Debug, Default)] @@ -6,6 +11,22 @@ pub struct EventLogger {} impl EventLogger { #[track_caller] pub(super) fn log_event(&self, event: &T) { - logger::info!(event = ?serde_json::to_value(event).unwrap_or(serde_json::json!({"error": "serialization failed"})), event_type =? event.event_type(), event_id =? event.key(), log_type = "event"); + logger::info!(event = ?event.masked_serialize().unwrap_or_else(|e| serde_json::json!({"error": e.to_string()})), event_type =? event.event_type(), event_id =? event.key(), log_type =? "event"); + } +} + +impl MessagingInterface for EventLogger { + type MessageClass = EventType; + + fn send_message( + &self, + data: T, + _timestamp: PrimitiveDateTime, + ) -> error_stack::Result<(), EventsError> + where + T: Message + ErasedMaskSerialize, + { + logger::info!(event =? data.masked_serialize().unwrap_or_else(|e| serde_json::json!({"error": e.to_string()})), event_type =? data.get_message_class(), event_id =? data.identifier(), log_type =? "event"); + Ok(()) } } diff --git a/crates/router/src/routes/admin.rs b/crates/router/src/routes/admin.rs index f5a6a49b9c95..bc534dde3904 100644 --- a/crates/router/src/routes/admin.rs +++ b/crates/router/src/routes/admin.rs @@ -35,7 +35,7 @@ pub async fn merchant_account_create( state, &req, json_payload.into_inner(), - |state, _, req| create_merchant_account(state, req), + |state, _, req, _| create_merchant_account(state, req), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) @@ -74,7 +74,7 @@ pub async fn retrieve_merchant_account( state, &req, payload, - |state, _, req| get_merchant_account(state, req), + |state, _, req, _| get_merchant_account(state, req), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -102,7 +102,7 @@ pub async fn merchant_account_list( state, &req, query_params.into_inner(), - |state, _, request| list_merchant_account(state, request), + |state, _, request, _| list_merchant_account(state, request), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) @@ -139,7 +139,7 @@ pub async fn update_merchant_account( state, &req, json_payload.into_inner(), - |state, _, req| merchant_account_update(state, &merchant_id, req), + |state, _, req, _| merchant_account_update(state, &merchant_id, req), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -184,7 +184,7 @@ pub async fn delete_merchant_account( state, &req, payload, - |state, _, req| merchant_account_delete(state, req.merchant_id), + |state, _, req, _| merchant_account_delete(state, req.merchant_id), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) @@ -219,7 +219,7 @@ pub async fn payment_connector_create( state, &req, json_payload.into_inner(), - |state, _, req| create_payment_connector(state, req, &merchant_id), + |state, _, req, _| create_payment_connector(state, req, &merchant_id), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -270,7 +270,7 @@ pub async fn payment_connector_retrieve( state, &req, payload, - |state, _, req| { + |state, _, req, _| { retrieve_payment_connector(state, req.merchant_id, req.merchant_connector_id) }, auth::auth_type( @@ -317,7 +317,7 @@ pub async fn payment_connector_list( state, &req, merchant_id.to_owned(), - |state, _, merchant_id| list_payment_connectors(state, merchant_id), + |state, _, merchant_id, _| list_payment_connectors(state, merchant_id), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -365,7 +365,9 @@ pub async fn payment_connector_update( state, &req, json_payload.into_inner(), - |state, _, req| update_payment_connector(state, &merchant_id, &merchant_connector_id, req), + |state, _, req, _| { + update_payment_connector(state, &merchant_id, &merchant_connector_id, req) + }, auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -416,7 +418,9 @@ pub async fn payment_connector_delete( state, &req, payload, - |state, _, req| delete_payment_connector(state, req.merchant_id, req.merchant_connector_id), + |state, _, req, _| { + delete_payment_connector(state, req.merchant_id, req.merchant_connector_id) + }, auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -448,7 +452,7 @@ pub async fn merchant_account_toggle_kv( state, &req, payload, - |state, _, payload| kv_for_merchant(state, payload.merchant_id, payload.kv_enabled), + |state, _, payload, _| kv_for_merchant(state, payload.merchant_id, payload.kv_enabled), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) @@ -470,7 +474,7 @@ pub async fn business_profile_create( state, &req, payload, - |state, _, req| create_business_profile(state, req, &merchant_id), + |state, _, req, _| create_business_profile(state, req, &merchant_id), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -497,7 +501,7 @@ pub async fn business_profile_retrieve( state, &req, profile_id, - |state, _, profile_id| retrieve_business_profile(state, profile_id), + |state, _, profile_id, _| retrieve_business_profile(state, profile_id), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -525,7 +529,7 @@ pub async fn business_profile_update( state, &req, json_payload.into_inner(), - |state, _, req| update_business_profile(state, &profile_id, &merchant_id, req), + |state, _, req, _| update_business_profile(state, &profile_id, &merchant_id, req), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -552,7 +556,7 @@ pub async fn business_profile_delete( state, &req, profile_id, - |state, _, profile_id| delete_business_profile(state, profile_id, &merchant_id), + |state, _, profile_id, _| delete_business_profile(state, profile_id, &merchant_id), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) @@ -572,7 +576,7 @@ pub async fn business_profiles_list( state, &req, merchant_id.clone(), - |state, _, merchant_id| list_business_profile(state, merchant_id), + |state, _, merchant_id, _| list_business_profile(state, merchant_id), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -602,7 +606,7 @@ pub async fn merchant_account_kv_status( state, &req, merchant_id, - |state, _, req| check_merchant_account_kv_status(state, req), + |state, _, req, _| check_merchant_account_kv_status(state, req), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) diff --git a/crates/router/src/routes/api_keys.rs b/crates/router/src/routes/api_keys.rs index 2e95fd536cfb..559595891524 100644 --- a/crates/router/src/routes/api_keys.rs +++ b/crates/router/src/routes/api_keys.rs @@ -41,7 +41,7 @@ pub async fn api_key_create( state, &req, payload, - |state, _, payload| async { + |state, _, payload, _| async { api_keys::create_api_key(state, payload, merchant_id.clone()).await }, auth::auth_type( @@ -88,7 +88,7 @@ pub async fn api_key_retrieve( state, &req, (&merchant_id, &key_id), - |state, _, (merchant_id, key_id)| api_keys::retrieve_api_key(state, merchant_id, key_id), + |state, _, (merchant_id, key_id), _| api_keys::retrieve_api_key(state, merchant_id, key_id), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -138,7 +138,7 @@ pub async fn api_key_update( state, &req, payload, - |state, _, payload| api_keys::update_api_key(state, payload), + |state, _, payload, _| api_keys::update_api_key(state, payload), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -184,7 +184,7 @@ pub async fn api_key_revoke( state, &req, (&merchant_id, &key_id), - |state, _, (merchant_id, key_id)| api_keys::revoke_api_key(state, merchant_id, key_id), + |state, _, (merchant_id, key_id), _| api_keys::revoke_api_key(state, merchant_id, key_id), auth::auth_type( &auth::AdminApiAuth, &auth::JWTAuthMerchantFromRoute { @@ -233,7 +233,7 @@ pub async fn api_key_list( state, &req, (limit, offset, merchant_id.clone()), - |state, _, (limit, offset, merchant_id)| async move { + |state, _, (limit, offset, merchant_id), _| async move { api_keys::list_api_keys(state, merchant_id, limit, offset).await }, auth::auth_type( diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index e0e3045aa8aa..74bb8bbc2422 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -57,6 +57,11 @@ pub use crate::{ services::get_store, }; +#[derive(Clone)] +pub struct ReqState { + pub event_context: events::EventContext, +} + #[derive(Clone)] pub struct AppState { pub flow_name: String, @@ -239,6 +244,12 @@ impl AppState { )) .await } + + pub fn get_req_state(&self) -> ReqState { + ReqState { + event_context: events::EventContext::new(self.event_handler.clone()), + } + } } pub struct Health; diff --git a/crates/router/src/routes/blocklist.rs b/crates/router/src/routes/blocklist.rs index 87072d2c7777..bd0ae9f6c66f 100644 --- a/crates/router/src/routes/blocklist.rs +++ b/crates/router/src/routes/blocklist.rs @@ -31,7 +31,7 @@ pub async fn add_entry_to_blocklist( state, &req, json_payload.into_inner(), - |state, auth: auth::AuthenticationData, body| { + |state, auth: auth::AuthenticationData, body, _| { blocklist::add_entry_to_blocklist(state, auth.merchant_account, body) }, auth::auth_type( @@ -67,7 +67,7 @@ pub async fn remove_entry_from_blocklist( state, &req, json_payload.into_inner(), - |state, auth: auth::AuthenticationData, body| { + |state, auth: auth::AuthenticationData, body, _| { blocklist::remove_entry_from_blocklist(state, auth.merchant_account, body) }, auth::auth_type( @@ -105,7 +105,7 @@ pub async fn list_blocked_payment_methods( state, &req, query_payload.into_inner(), - |state, auth: auth::AuthenticationData, query| { + |state, auth: auth::AuthenticationData, query, _| { blocklist::list_blocklist_entries(state, auth.merchant_account, query) }, auth::auth_type( @@ -143,7 +143,7 @@ pub async fn toggle_blocklist_guard( state, &req, query_payload.into_inner(), - |state, auth: auth::AuthenticationData, query| { + |state, auth: auth::AuthenticationData, query, _| { blocklist::toggle_blocklist_guard(state, auth.merchant_account, query) }, auth::auth_type( diff --git a/crates/router/src/routes/cache.rs b/crates/router/src/routes/cache.rs index 520d6ab7db22..e7742d2d18ca 100644 --- a/crates/router/src/routes/cache.rs +++ b/crates/router/src/routes/cache.rs @@ -22,7 +22,7 @@ pub async fn invalidate( state, &req, &key, - |state, _, key| cache::invalidate(state, key), + |state, _, key, _| cache::invalidate(state, key), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) diff --git a/crates/router/src/routes/cards_info.rs b/crates/router/src/routes/cards_info.rs index 79a2061ac4c7..4d65fc720368 100644 --- a/crates/router/src/routes/cards_info.rs +++ b/crates/router/src/routes/cards_info.rs @@ -46,7 +46,7 @@ pub async fn card_iin_info( state, &req, payload, - |state, auth, req| cards_info::retrieve_card_info(state, auth.merchant_account, req), + |state, auth, req, _| cards_info::retrieve_card_info(state, auth.merchant_account, req), &*auth, api_locking::LockAction::NotApplicable, ) diff --git a/crates/router/src/routes/configs.rs b/crates/router/src/routes/configs.rs index d7e96f40235e..74b9cd73b1e6 100644 --- a/crates/router/src/routes/configs.rs +++ b/crates/router/src/routes/configs.rs @@ -22,7 +22,7 @@ pub async fn config_key_create( state, &req, payload, - |state, _, data| configs::set_config(state, data), + |state, _, data, _| configs::set_config(state, data), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) @@ -42,7 +42,7 @@ pub async fn config_key_retrieve( state, &req, &key, - |state, _, key| configs::read_config(state, key), + |state, _, key, _| configs::read_config(state, key), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) @@ -65,7 +65,7 @@ pub async fn config_key_update( state, &req, &payload, - |state, _, payload| configs::update_config(state, payload), + |state, _, payload, _| configs::update_config(state, payload), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) @@ -86,7 +86,7 @@ pub async fn config_key_delete( state, &req, key, - |state, _, key| configs::config_delete(state, key), + |state, _, key, _| configs::config_delete(state, key), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, ) diff --git a/crates/router/src/routes/currency.rs b/crates/router/src/routes/currency.rs index 74a559e88e9f..e80bd53d8df9 100644 --- a/crates/router/src/routes/currency.rs +++ b/crates/router/src/routes/currency.rs @@ -14,7 +14,7 @@ pub async fn retrieve_forex(state: web::Data, req: HttpRequest) -> Htt state, &req, (), - |state, _auth: auth::AuthenticationData, _| currency::retrieve_forex(state), + |state, _auth: auth::AuthenticationData, _, _| currency::retrieve_forex(state), auth::auth_type( &auth::ApiKeyAuth, &auth::DashboardNoPermissionAuth, @@ -39,7 +39,7 @@ pub async fn convert_forex( state.clone(), &req, (), - |state, _, _| { + |state, _, _, _| { currency::convert_forex( state, *amount, diff --git a/crates/router/src/routes/customers.rs b/crates/router/src/routes/customers.rs index a20d2dd026b9..64fa8e83d498 100644 --- a/crates/router/src/routes/customers.rs +++ b/crates/router/src/routes/customers.rs @@ -20,7 +20,7 @@ pub async fn customers_create( state, &req, json_payload.into_inner(), - |state, auth, req| create_customer(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| create_customer(state, auth.merchant_account, auth.key_store, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::CustomerWrite), @@ -57,7 +57,7 @@ pub async fn customers_retrieve( state, &req, payload, - |state, auth, req| retrieve_customer(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| retrieve_customer(state, auth.merchant_account, auth.key_store, req), &*auth, api_locking::LockAction::NotApplicable, ) @@ -73,7 +73,9 @@ pub async fn customers_list(state: web::Data, req: HttpRequest) -> Htt state, &req, (), - |state, auth, _| list_customers(state, auth.merchant_account.merchant_id, auth.key_store), + |state, auth, _, _| { + list_customers(state, auth.merchant_account.merchant_id, auth.key_store) + }, auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::CustomerRead), @@ -99,7 +101,7 @@ pub async fn customers_update( state, &req, json_payload.into_inner(), - |state, auth, req| update_customer(state, auth.merchant_account, req, auth.key_store), + |state, auth, req, _| update_customer(state, auth.merchant_account, req, auth.key_store), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::CustomerWrite), @@ -126,7 +128,7 @@ pub async fn customers_delete( state, &req, payload, - |state, auth, req| delete_customer(state, auth.merchant_account, req, auth.key_store), + |state, auth, req, _| delete_customer(state, auth.merchant_account, req, auth.key_store), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::CustomerWrite), @@ -152,7 +154,7 @@ pub async fn get_customer_mandates( state, &req, customer_id, - |state, auth, req| { + |state, auth, req, _| { crate::core::mandate::get_customer_mandates( state, auth.merchant_account, diff --git a/crates/router/src/routes/disputes.rs b/crates/router/src/routes/disputes.rs index 28b3e0db2119..a4a6c7507a4f 100644 --- a/crates/router/src/routes/disputes.rs +++ b/crates/router/src/routes/disputes.rs @@ -43,7 +43,7 @@ pub async fn retrieve_dispute( state, &req, dispute_id, - |state, auth, req| disputes::retrieve_dispute(state, auth.merchant_account, req), + |state, auth, req, _| disputes::retrieve_dispute(state, auth.merchant_account, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::DisputeRead), @@ -90,7 +90,7 @@ pub async fn retrieve_disputes_list( state, &req, payload, - |state, auth, req| disputes::retrieve_disputes_list(state, auth.merchant_account, req), + |state, auth, req, _| disputes::retrieve_disputes_list(state, auth.merchant_account, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::DisputeRead), @@ -130,7 +130,7 @@ pub async fn accept_dispute( state, &req, dispute_id, - |state, auth, req| { + |state, auth, req, _| { disputes::accept_dispute(state, auth.merchant_account, auth.key_store, req) }, auth::auth_type( @@ -167,7 +167,7 @@ pub async fn submit_dispute_evidence( state, &req, json_payload.into_inner(), - |state, auth, req| { + |state, auth, req, _| { disputes::submit_evidence(state, auth.merchant_account, auth.key_store, req) }, auth::auth_type( @@ -212,7 +212,7 @@ pub async fn attach_dispute_evidence( state, &req, attach_evidence_request, - |state, auth, req| { + |state, auth, req, _| { disputes::attach_evidence(state, auth.merchant_account, auth.key_store, req) }, auth::auth_type( @@ -255,7 +255,9 @@ pub async fn retrieve_dispute_evidence( state, &req, dispute_id, - |state, auth, req| disputes::retrieve_dispute_evidence(state, auth.merchant_account, req), + |state, auth, req, _| { + disputes::retrieve_dispute_evidence(state, auth.merchant_account, req) + }, auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::DisputeRead), @@ -293,7 +295,7 @@ pub async fn delete_dispute_evidence( state, &req, json_payload.into_inner(), - |state, auth, req| disputes::delete_evidence(state, auth.merchant_account, req), + |state, auth, req, _| disputes::delete_evidence(state, auth.merchant_account, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::DisputeWrite), diff --git a/crates/router/src/routes/dummy_connector.rs b/crates/router/src/routes/dummy_connector.rs index 7d2aad7e3482..79338fc620c2 100644 --- a/crates/router/src/routes/dummy_connector.rs +++ b/crates/router/src/routes/dummy_connector.rs @@ -27,7 +27,7 @@ pub async fn dummy_connector_authorize_payment( state, &req, payload, - |state, _, req| core::payment_authorize(state, req), + |state, _, req, _| core::payment_authorize(state, req), &auth::NoAuth, api_locking::LockAction::NotApplicable, ) @@ -51,7 +51,7 @@ pub async fn dummy_connector_complete_payment( state, &req, payload, - |state, _, req| core::payment_complete(state, req), + |state, _, req, _| core::payment_complete(state, req), &auth::NoAuth, api_locking::LockAction::NotApplicable, ) @@ -70,7 +70,7 @@ pub async fn dummy_connector_payment( state, &req, payload, - |state, _, req| core::payment(state, req), + |state, _, req, _| core::payment(state, req), &auth::NoAuth, api_locking::LockAction::NotApplicable, ) @@ -90,7 +90,7 @@ pub async fn dummy_connector_payment_data( state, &req, payload, - |state, _, req| core::payment_data(state, req), + |state, _, req, _| core::payment_data(state, req), &auth::NoAuth, api_locking::LockAction::NotApplicable, ) @@ -111,7 +111,7 @@ pub async fn dummy_connector_refund( state, &req, payload, - |state, _, req| core::refund_payment(state, req), + |state, _, req, _| core::refund_payment(state, req), &auth::NoAuth, api_locking::LockAction::NotApplicable, ) @@ -131,7 +131,7 @@ pub async fn dummy_connector_refund_data( state, &req, payload, - |state, _, req| core::refund_data(state, req), + |state, _, req, _| core::refund_data(state, req), &auth::NoAuth, api_locking::LockAction::NotApplicable, ) diff --git a/crates/router/src/routes/ephemeral_key.rs b/crates/router/src/routes/ephemeral_key.rs index a9e70c1b33c2..bfe7c353c287 100644 --- a/crates/router/src/routes/ephemeral_key.rs +++ b/crates/router/src/routes/ephemeral_key.rs @@ -21,7 +21,7 @@ pub async fn ephemeral_key_create( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { helpers::make_ephemeral_key(state, req.customer_id, auth.merchant_account.merchant_id) }, &auth::ApiKeyAuth, @@ -42,7 +42,7 @@ pub async fn ephemeral_key_delete( state, &req, payload, - |state, _, req| helpers::delete_ephemeral_key(state, req), + |state, _, req, _| helpers::delete_ephemeral_key(state, req), &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, ) diff --git a/crates/router/src/routes/files.rs b/crates/router/src/routes/files.rs index 63dfb38c614a..92be12b2bc91 100644 --- a/crates/router/src/routes/files.rs +++ b/crates/router/src/routes/files.rs @@ -44,7 +44,7 @@ pub async fn files_create( state, &req, create_file_request, - |state, auth, req| files_create_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| files_create_core(state, auth.merchant_account, auth.key_store, req), auth::auth_type( &auth::ApiKeyAuth, &auth::DashboardNoPermissionAuth, @@ -86,7 +86,7 @@ pub async fn files_delete( state, &req, file_id, - |state, auth, req| files_delete_core(state, auth.merchant_account, req), + |state, auth, req, _| files_delete_core(state, auth.merchant_account, req), auth::auth_type( &auth::ApiKeyAuth, &auth::DashboardNoPermissionAuth, @@ -128,7 +128,9 @@ pub async fn files_retrieve( state, &req, file_id, - |state, auth, req| files_retrieve_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| { + files_retrieve_core(state, auth.merchant_account, auth.key_store, req) + }, auth::auth_type( &auth::ApiKeyAuth, &auth::DashboardNoPermissionAuth, diff --git a/crates/router/src/routes/fraud_check.rs b/crates/router/src/routes/fraud_check.rs index d4363a236bb3..f0b73015f3cb 100644 --- a/crates/router/src/routes/fraud_check.rs +++ b/crates/router/src/routes/fraud_check.rs @@ -20,7 +20,7 @@ pub async fn frm_fulfillment( state.clone(), &req, json_payload.into_inner(), - |state, auth, req| { + |state, auth, req, _| { frm_core::frm_fulfillment_core(state, auth.merchant_account, auth.key_store, req) }, &services::authentication::ApiKeyAuth, diff --git a/crates/router/src/routes/gsm.rs b/crates/router/src/routes/gsm.rs index ff70635959fc..77a0cb0a645d 100644 --- a/crates/router/src/routes/gsm.rs +++ b/crates/router/src/routes/gsm.rs @@ -39,7 +39,7 @@ pub async fn create_gsm_rule( state.clone(), &req, payload, - |state, _, payload| gsm::create_gsm_rule(state, payload), + |state, _, payload, _| gsm::create_gsm_rule(state, payload), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) @@ -76,7 +76,7 @@ pub async fn get_gsm_rule( state.clone(), &req, gsm_retrieve_req, - |state, _, gsm_retrieve_req| gsm::retrieve_gsm_rule(state, gsm_retrieve_req), + |state, _, gsm_retrieve_req, _| gsm::retrieve_gsm_rule(state, gsm_retrieve_req), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) @@ -114,7 +114,7 @@ pub async fn update_gsm_rule( state.clone(), &req, payload, - |state, _, payload| gsm::update_gsm_rule(state, payload), + |state, _, payload, _| gsm::update_gsm_rule(state, payload), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) @@ -153,7 +153,7 @@ pub async fn delete_gsm_rule( state, &req, payload, - |state, _, payload| gsm::delete_gsm_rule(state, payload), + |state, _, payload, _| gsm::delete_gsm_rule(state, payload), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 2afb1c064ec5..fbfcd893a636 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -33,7 +33,7 @@ pub async fn deep_health_check( state, &request, (), - |state, _, _| deep_health_check_func(state), + |state, _, _, _| deep_health_check_func(state), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/routes/locker_migration.rs b/crates/router/src/routes/locker_migration.rs index a3df0c3a229b..2a8b1ca7911a 100644 --- a/crates/router/src/routes/locker_migration.rs +++ b/crates/router/src/routes/locker_migration.rs @@ -19,7 +19,7 @@ pub async fn rust_locker_migration( state, &req, &merchant_id, - |state, _, _| locker_migration::rust_locker_migration(state, &merchant_id), + |state, _, _, _| locker_migration::rust_locker_migration(state, &merchant_id), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/routes/mandates.rs b/crates/router/src/routes/mandates.rs index 3e47d78da8a3..365f9a432483 100644 --- a/crates/router/src/routes/mandates.rs +++ b/crates/router/src/routes/mandates.rs @@ -41,7 +41,9 @@ pub async fn get_mandate( state, &req, mandate_id, - |state, auth, req| mandate::get_mandate(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| { + mandate::get_mandate(state, auth.merchant_account, auth.key_store, req) + }, &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, ) @@ -80,7 +82,7 @@ pub async fn revoke_mandate( state, &req, mandate_id, - |state, auth, req| { + |state, auth, req, _| { mandate::revoke_mandate(state, auth.merchant_account, auth.key_store, req) }, &auth::ApiKeyAuth, @@ -124,7 +126,7 @@ pub async fn retrieve_mandates_list( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { mandate::retrieve_mandates_list(state, auth.merchant_account, auth.key_store, req) }, auth::auth_type( diff --git a/crates/router/src/routes/payment_link.rs b/crates/router/src/routes/payment_link.rs index dd6898724c53..7742f6c1c0e0 100644 --- a/crates/router/src/routes/payment_link.rs +++ b/crates/router/src/routes/payment_link.rs @@ -44,7 +44,7 @@ pub async fn payment_link_retrieve( state, &req, payload.clone(), - |state, _auth, _| retrieve_payment_link(state, path.clone()), + |state, _auth, _, _| retrieve_payment_link(state, path.clone()), &*auth_type, api_locking::LockAction::NotApplicable, ) @@ -67,7 +67,7 @@ pub async fn initiate_payment_link( state, &req, payload.clone(), - |state, auth, _| { + |state, auth, _, _| { initiate_payment_link_flow( state, auth.merchant_account, @@ -117,7 +117,7 @@ pub async fn payments_link_list( state, &req, payload, - |state, auth, payload| list_payment_link(state, auth.merchant_account, payload), + |state, auth, payload, _| list_payment_link(state, auth.merchant_account, payload), &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, ) @@ -140,7 +140,7 @@ pub async fn payment_link_status( state, &req, payload.clone(), - |state, auth, _| { + |state, auth, _, _| { get_payment_link_status( state, auth.merchant_account, diff --git a/crates/router/src/routes/payment_methods.rs b/crates/router/src/routes/payment_methods.rs index 7ef20994e2e4..555410a416be 100644 --- a/crates/router/src/routes/payment_methods.rs +++ b/crates/router/src/routes/payment_methods.rs @@ -28,7 +28,7 @@ pub async fn create_payment_method_api( state, &req, json_payload.into_inner(), - |state, auth, req| async move { + |state, auth, req, _| async move { Box::pin(cards::add_payment_method( state, req, @@ -61,7 +61,7 @@ pub async fn list_payment_method_api( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { cards::list_payment_methods(state, auth.merchant_account, auth.key_store, req) }, &*auth, @@ -113,7 +113,7 @@ pub async fn list_customer_payment_method_api( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { cards::do_list_customer_pm_fetch_customer_if_not_passed( state, auth.merchant_account, @@ -169,7 +169,7 @@ pub async fn list_customer_payment_method_api_client( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { cards::do_list_customer_pm_fetch_customer_if_not_passed( state, auth.merchant_account, @@ -201,7 +201,7 @@ pub async fn payment_method_retrieve_api( state, &req, payload, - |state, auth, pm| cards::retrieve_payment_method(state, pm, auth.key_store), + |state, auth, pm, _| cards::retrieve_payment_method(state, pm, auth.key_store), &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, )) @@ -223,7 +223,7 @@ pub async fn payment_method_update_api( state, &req, json_payload.into_inner(), - |state, auth, payload| { + |state, auth, payload, _| { cards::update_customer_payment_method( state, auth.merchant_account, @@ -253,7 +253,7 @@ pub async fn payment_method_delete_api( state, &req, pm, - |state, auth, req| { + |state, auth, req, _| { cards::delete_payment_method(state, auth.merchant_account, req, auth.key_store) }, &auth::ApiKeyAuth, @@ -275,7 +275,7 @@ pub async fn list_countries_currencies_for_connector_payment_method( state, &req, payload, - |state, _auth: auth::AuthenticationData, req| { + |state, _auth: auth::AuthenticationData, req, _| { cards::list_countries_currencies_for_connector_payment_method(state, req) }, #[cfg(not(feature = "release"))] @@ -312,7 +312,7 @@ pub async fn default_payment_method_set_api( state, &req, payload, - |_state, auth: auth::AuthenticationData, default_payment_method| { + |_state, auth: auth::AuthenticationData, default_payment_method, _| { cards::set_default_payment_method( db, auth.merchant_account.merchant_id, diff --git a/crates/router/src/routes/payments.rs b/crates/router/src/routes/payments.rs index eda181f5a8bb..cf12878c2fb5 100644 --- a/crates/router/src/routes/payments.rs +++ b/crates/router/src/routes/payments.rs @@ -123,7 +123,7 @@ pub async fn payments_create( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { authorize_verify_select::<_, Oss>( payments::PaymentCreate, state, @@ -186,7 +186,7 @@ pub async fn payments_start( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::< api_types::Authorize, payment_types::PaymentsResponse, @@ -267,7 +267,7 @@ pub async fn payments_retrieve( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::( state, auth.merchant_account, @@ -339,7 +339,7 @@ pub async fn payments_retrieve_with_gateway_creds( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::( state, auth.merchant_account, @@ -408,7 +408,7 @@ pub async fn payments_update( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { authorize_verify_select::<_, Oss>( payments::PaymentUpdate, state, @@ -485,7 +485,7 @@ pub async fn payments_confirm( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { authorize_verify_select::<_, Oss>( payments::PaymentConfirm, state, @@ -543,7 +543,7 @@ pub async fn payments_capture( state, &req, payload, - |state, auth, payload| { + |state, auth, payload, _| { payments::payments_core::< api_types::Capture, payment_types::PaymentsResponse, @@ -601,7 +601,7 @@ pub async fn payments_connector_session( state, &req, payload, - |state, auth, payload| { + |state, auth, payload, _| { payments::payments_core::< api_types::Session, payment_types::PaymentsSessionResponse, @@ -672,7 +672,7 @@ pub async fn payments_redirect_response( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { >::handle_payments_redirect_response( &payments::PaymentRedirectSync {}, state, @@ -732,7 +732,7 @@ pub async fn payments_redirect_response_with_creds_identifier( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { >::handle_payments_redirect_response( &payments::PaymentRedirectSync {}, state, @@ -774,7 +774,7 @@ pub async fn payments_complete_authorize( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { >::handle_payments_redirect_response( &payments::PaymentRedirectCompleteAuthorize {}, @@ -828,7 +828,7 @@ pub async fn payments_cancel( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::( state, auth.merchant_account, @@ -885,7 +885,7 @@ pub async fn payments_list( state, &req, payload, - |state, auth, req| payments::list_payments(state, auth.merchant_account, req), + |state, auth, req, _| payments::list_payments(state, auth.merchant_account, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::PaymentRead), @@ -909,7 +909,9 @@ pub async fn payments_list_by_filter( state, &req, payload, - |state, auth, req| payments::apply_filters_on_payments(state, auth.merchant_account, req), + |state, auth, req, _| { + payments::apply_filters_on_payments(state, auth.merchant_account, req) + }, auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::PaymentRead), @@ -933,7 +935,7 @@ pub async fn get_filters_for_payments( state, &req, payload, - |state, auth, req| payments::get_filters_for_payments(state, auth.merchant_account, req), + |state, auth, req, _| payments::get_filters_for_payments(state, auth.merchant_account, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::PaymentRead), @@ -968,7 +970,7 @@ pub async fn payments_approve( state, &http_req, payload.clone(), - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::< api_types::Capture, payment_types::PaymentsResponse, @@ -1028,7 +1030,7 @@ pub async fn payments_reject( state, &http_req, payload.clone(), - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::< api_types::Void, payment_types::PaymentsResponse, @@ -1182,7 +1184,7 @@ pub async fn payments_incremental_authorization( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payments_core::< api_types::IncrementalAuthorization, payment_types::PaymentsResponse, @@ -1246,7 +1248,7 @@ pub async fn payments_external_authentication( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { payments::payment_external_authentication( state, auth.merchant_account, @@ -1304,7 +1306,7 @@ pub async fn post_3ds_payments_authorize( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { >::handle_payments_redirect_response( &payments::PaymentAuthenticateCompleteAuthorize {}, state, diff --git a/crates/router/src/routes/payouts.rs b/crates/router/src/routes/payouts.rs index 2a429126c850..f3b6a7cd4774 100644 --- a/crates/router/src/routes/payouts.rs +++ b/crates/router/src/routes/payouts.rs @@ -38,7 +38,9 @@ pub async fn payouts_create( state, &req, json_payload.into_inner(), - |state, auth, req| payouts_create_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| { + payouts_create_core(state, auth.merchant_account, auth.key_store, req) + }, &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, )) @@ -76,7 +78,9 @@ pub async fn payouts_retrieve( state, &req, payout_retrieve_request, - |state, auth, req| payouts_retrieve_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| { + payouts_retrieve_core(state, auth.merchant_account, auth.key_store, req) + }, auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::PayoutRead), @@ -118,7 +122,9 @@ pub async fn payouts_update( state, &req, payout_update_payload, - |state, auth, req| payouts_update_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| { + payouts_update_core(state, auth.merchant_account, auth.key_store, req) + }, &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, )) @@ -156,7 +162,9 @@ pub async fn payouts_cancel( state, &req, payload, - |state, auth, req| payouts_cancel_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| { + payouts_cancel_core(state, auth.merchant_account, auth.key_store, req) + }, &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, )) @@ -194,7 +202,9 @@ pub async fn payouts_fulfill( state, &req, payload, - |state, auth, req| payouts_fulfill_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| { + payouts_fulfill_core(state, auth.merchant_account, auth.key_store, req) + }, &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, )) @@ -228,7 +238,7 @@ pub async fn payouts_list( state, &req, payload, - |state, auth, req| payouts_list_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| payouts_list_core(state, auth.merchant_account, auth.key_store, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::PayoutRead), @@ -266,7 +276,7 @@ pub async fn payouts_list_by_filter( state, &req, payload, - |state, auth, req| { + |state, auth, req, _| { payouts_filtered_list_core(state, auth.merchant_account, auth.key_store, req) }, auth::auth_type( @@ -306,7 +316,9 @@ pub async fn payouts_list_available_filters( state, &req, payload, - |state, auth, req| payouts_list_available_filters_core(state, auth.merchant_account, req), + |state, auth, req, _| { + payouts_list_available_filters_core(state, auth.merchant_account, req) + }, auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::PayoutRead), diff --git a/crates/router/src/routes/pm_auth.rs b/crates/router/src/routes/pm_auth.rs index cfadd787c310..e0cce9c515cb 100644 --- a/crates/router/src/routes/pm_auth.rs +++ b/crates/router/src/routes/pm_auth.rs @@ -24,7 +24,7 @@ pub async fn link_token_create( state, &req, payload, - |state, auth, payload| { + |state, auth, payload, _| { crate::core::pm_auth::create_link_token( state, auth.merchant_account, @@ -58,7 +58,7 @@ pub async fn exchange_token( state, &req, payload, - |state, auth, payload| { + |state, auth, payload, _| { crate::core::pm_auth::exchange_token_core( state, auth.merchant_account, diff --git a/crates/router/src/routes/recon.rs b/crates/router/src/routes/recon.rs index faa41d9d1d25..7845f52c9243 100644 --- a/crates/router/src/routes/recon.rs +++ b/crates/router/src/routes/recon.rs @@ -35,7 +35,7 @@ pub async fn update_merchant( state, &req, json_payload.into_inner(), - |state, _user, req| recon_merchant_account_update(state, req), + |state, _user, req, _| recon_merchant_account_update(state, req), &auth::ReconAdmin, api_locking::LockAction::NotApplicable, )) @@ -49,7 +49,7 @@ pub async fn request_for_recon(state: web::Data, http_req: HttpRequest state, &http_req, (), - |state, user: UserFromToken, _req| send_recon_request(state, user), + |state, user: UserFromToken, _req, _| send_recon_request(state, user), &auth::DashboardNoPermissionAuth, api_locking::LockAction::NotApplicable, )) @@ -63,7 +63,7 @@ pub async fn get_recon_token(state: web::Data, req: HttpRequest) -> Ht state, &req, (), - |state, user: ReconUser, _| generate_recon_token(state, user), + |state, user: ReconUser, _, _| generate_recon_token(state, user), &auth::ReconJWT, api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/routes/refunds.rs b/crates/router/src/routes/refunds.rs index ef9ffb411240..d68c7138213f 100644 --- a/crates/router/src/routes/refunds.rs +++ b/crates/router/src/routes/refunds.rs @@ -36,7 +36,7 @@ pub async fn refunds_create( state, &req, json_payload.into_inner(), - |state, auth, req| refund_create_core(state, auth.merchant_account, auth.key_store, req), + |state, auth, req, _| refund_create_core(state, auth.merchant_account, auth.key_store, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::RefundWrite), @@ -88,7 +88,7 @@ pub async fn refunds_retrieve( state, &req, refund_request, - |state, auth, refund_request| { + |state, auth, refund_request, _| { refund_response_wrapper( state, auth.merchant_account, @@ -139,7 +139,7 @@ pub async fn refunds_retrieve_with_body( state, &req, json_payload.into_inner(), - |state, auth, req| { + |state, auth, req, _| { refund_response_wrapper( state, auth.merchant_account, @@ -187,7 +187,7 @@ pub async fn refunds_update( state, &req, refund_update_req, - |state, auth, req| refund_update_core(state, auth.merchant_account, req), + |state, auth, req, _| refund_update_core(state, auth.merchant_account, req), &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, ) @@ -220,7 +220,7 @@ pub async fn refunds_list( state, &req, payload.into_inner(), - |state, auth, req| refund_list(state, auth.merchant_account, req), + |state, auth, req, _| refund_list(state, auth.merchant_account, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::RefundRead), @@ -257,7 +257,7 @@ pub async fn refunds_filter_list( state, &req, payload.into_inner(), - |state, auth, req| refund_filter_list(state, auth.merchant_account, req), + |state, auth, req, _| refund_filter_list(state, auth.merchant_account, req), auth::auth_type( &auth::ApiKeyAuth, &auth::JWTAuth(Permission::RefundRead), diff --git a/crates/router/src/routes/routing.rs b/crates/router/src/routes/routing.rs index 28d65f66a828..8cae5d5b6450 100644 --- a/crates/router/src/routes/routing.rs +++ b/crates/router/src/routes/routing.rs @@ -31,7 +31,7 @@ pub async fn routing_create_config( state, &req, json_payload.into_inner(), - |state, auth: auth::AuthenticationData, payload| { + |state, auth: auth::AuthenticationData, payload, _| { routing::create_routing_config( state, auth.merchant_account, @@ -67,7 +67,7 @@ pub async fn routing_link_config( state, &req, path.into_inner(), - |state, auth: auth::AuthenticationData, algorithm_id| { + |state, auth: auth::AuthenticationData, algorithm_id, _| { routing::link_routing_config( state, auth.merchant_account, @@ -104,7 +104,7 @@ pub async fn routing_retrieve_config( state, &req, algorithm_id, - |state, auth: auth::AuthenticationData, algorithm_id| { + |state, auth: auth::AuthenticationData, algorithm_id, _| { routing::retrieve_routing_config(state, auth.merchant_account, algorithm_id) }, #[cfg(not(feature = "release"))] @@ -136,7 +136,7 @@ pub async fn list_routing_configs( state, &req, query.into_inner(), - |state, auth: auth::AuthenticationData, query_params| { + |state, auth: auth::AuthenticationData, query_params, _| { routing::retrieve_merchant_routing_dictionary( state, auth.merchant_account, @@ -165,7 +165,7 @@ pub async fn list_routing_configs( state, &req, (), - |state, auth: auth::AuthenticationData, _| { + |state, auth: auth::AuthenticationData, _, _| { routing::retrieve_merchant_routing_dictionary(state, auth.merchant_account) }, #[cfg(not(feature = "release"))] @@ -200,7 +200,7 @@ pub async fn routing_unlink_config( state, &req, payload.into_inner(), - |state, auth: auth::AuthenticationData, payload_req| { + |state, auth: auth::AuthenticationData, payload_req, _| { routing::unlink_routing_config( state, auth.merchant_account, @@ -229,7 +229,7 @@ pub async fn routing_unlink_config( state, &req, (), - |state, auth: auth::AuthenticationData, _| { + |state, auth: auth::AuthenticationData, _, _| { routing::unlink_routing_config( state, auth.merchant_account, @@ -264,7 +264,7 @@ pub async fn routing_update_default_config( state, &req, json_payload.into_inner(), - |state, auth: auth::AuthenticationData, updated_config| { + |state, auth: auth::AuthenticationData, updated_config, _| { routing::update_default_routing_config( state, auth.merchant_account, @@ -297,7 +297,7 @@ pub async fn routing_retrieve_default_config( state, &req, (), - |state, auth: auth::AuthenticationData, _| { + |state, auth: auth::AuthenticationData, _, _| { routing::retrieve_default_routing_config(state, auth.merchant_account, transaction_type) }, #[cfg(not(feature = "release"))] @@ -326,7 +326,7 @@ pub async fn upsert_surcharge_decision_manager_config( state, &req, json_payload.into_inner(), - |state, auth: auth::AuthenticationData, update_decision| { + |state, auth: auth::AuthenticationData, update_decision, _| { surcharge_decision_config::upsert_surcharge_decision_config( state, auth.key_store, @@ -358,7 +358,7 @@ pub async fn delete_surcharge_decision_manager_config( state, &req, (), - |state, auth: auth::AuthenticationData, ()| { + |state, auth: auth::AuthenticationData, (), _| { surcharge_decision_config::delete_surcharge_decision_config( state, auth.key_store, @@ -390,7 +390,7 @@ pub async fn retrieve_surcharge_decision_manager_config( state, &req, (), - |state, auth: auth::AuthenticationData, _| { + |state, auth: auth::AuthenticationData, _, _| { surcharge_decision_config::retrieve_surcharge_decision_config( state, auth.merchant_account, @@ -422,7 +422,7 @@ pub async fn upsert_decision_manager_config( state, &req, json_payload.into_inner(), - |state, auth: auth::AuthenticationData, update_decision| { + |state, auth: auth::AuthenticationData, update_decision, _| { conditional_config::upsert_conditional_config( state, auth.key_store, @@ -455,7 +455,7 @@ pub async fn delete_decision_manager_config( state, &req, (), - |state, auth: auth::AuthenticationData, ()| { + |state, auth: auth::AuthenticationData, (), _| { conditional_config::delete_conditional_config( state, auth.key_store, @@ -487,7 +487,7 @@ pub async fn retrieve_decision_manager_config( state, &req, (), - |state, auth: auth::AuthenticationData, _| { + |state, auth: auth::AuthenticationData, _, _| { conditional_config::retrieve_conditional_config(state, auth.merchant_account) }, #[cfg(not(feature = "release"))] @@ -520,7 +520,7 @@ pub async fn routing_retrieve_linked_config( state, &req, query.into_inner(), - |state, auth: AuthenticationData, query_params| { + |state, auth: AuthenticationData, query_params, _| { routing::retrieve_linked_routing_config( state, auth.merchant_account, @@ -549,7 +549,7 @@ pub async fn routing_retrieve_linked_config( state, &req, (), - |state, auth: auth::AuthenticationData, _| { + |state, auth: auth::AuthenticationData, _, _| { routing::retrieve_linked_routing_config(state, auth.merchant_account) }, #[cfg(not(feature = "release"))] @@ -584,7 +584,7 @@ pub async fn upsert_connector_agnostic_mandate_config( state, &req, json_payload.into_inner(), - |state, _auth: AuthenticationData, mandate_config| { + |state, _auth: AuthenticationData, mandate_config, _| { Box::pin(routing::upsert_connector_agnostic_mandate_config( state, &business_profile_id, @@ -616,7 +616,7 @@ pub async fn routing_retrieve_default_config_for_profiles( state, &req, (), - |state, auth: auth::AuthenticationData, _| { + |state, auth: auth::AuthenticationData, _, _| { routing::retrieve_default_routing_config_for_profiles( state, auth.merchant_account, @@ -658,7 +658,7 @@ pub async fn routing_update_default_config_for_profile( state, &req, routing_payload_wrapper, - |state, auth: auth::AuthenticationData, wrapper| { + |state, auth: auth::AuthenticationData, wrapper, _| { routing::update_default_routing_config_for_profile( state, auth.merchant_account, diff --git a/crates/router/src/routes/user.rs b/crates/router/src/routes/user.rs index 3e5061b401d8..eb34c8ae3213 100644 --- a/crates/router/src/routes/user.rs +++ b/crates/router/src/routes/user.rs @@ -32,7 +32,7 @@ pub async fn user_signup_with_merchant_id( state, &http_req, req_payload.clone(), - |state, _, req_body| user_core::signup_with_merchant_id(state, req_body), + |state, _, req_body, _| user_core::signup_with_merchant_id(state, req_body), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) @@ -51,7 +51,7 @@ pub async fn user_signup( state, &http_req, req_payload.clone(), - |state, _, req_body| user_core::signup(state, req_body), + |state, _, req_body, _| user_core::signup(state, req_body), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -70,7 +70,7 @@ pub async fn user_signin_without_invite_checks( state, &http_req, req_payload.clone(), - |state, _, req_body| user_core::signin_without_invite_checks(state, req_body), + |state, _, req_body, _| user_core::signin_without_invite_checks(state, req_body), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -89,7 +89,7 @@ pub async fn user_signin( state, &http_req, req_payload.clone(), - |state, _, req_body| user_core::signin(state, req_body), + |state, _, req_body, _| user_core::signin(state, req_body), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -109,7 +109,7 @@ pub async fn user_connect_account( state, &http_req, req_payload.clone(), - |state, _, req_body| user_core::connect_account(state, req_body), + |state, _, req_body, _| user_core::connect_account(state, req_body), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -123,7 +123,7 @@ pub async fn signout(state: web::Data, http_req: HttpRequest) -> HttpR state.clone(), &http_req, (), - |state, user, _| user_core::signout(state, user), + |state, user, _, _| user_core::signout(state, user), &auth::DashboardNoPermissionAuth, api_locking::LockAction::NotApplicable, )) @@ -141,7 +141,7 @@ pub async fn change_password( state.clone(), &http_req, json_payload.into_inner(), - |state, user, req| user_core::change_password(state, req, user), + |state, user, req, _| user_core::change_password(state, req, user), &auth::DashboardNoPermissionAuth, api_locking::LockAction::NotApplicable, )) @@ -211,7 +211,7 @@ pub async fn internal_user_signup( state.clone(), &http_req, json_payload.into_inner(), - |state, _, req| user_core::create_internal_user(state, req), + |state, _, req, _| user_core::create_internal_user(state, req), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, )) @@ -229,7 +229,7 @@ pub async fn switch_merchant_id( state.clone(), &http_req, json_payload.into_inner(), - |state, user, req| user_core::switch_merchant_id(state, req, user), + |state, user, req, _| user_core::switch_merchant_id(state, req, user), &auth::DashboardNoPermissionAuth, api_locking::LockAction::NotApplicable, )) @@ -247,7 +247,7 @@ pub async fn user_merchant_account_create( state, &req, json_payload.into_inner(), - |state, auth: auth::UserFromToken, json_payload| { + |state, auth: auth::UserFromToken, json_payload, _| { user_core::create_merchant_account(state, auth, json_payload) }, &auth::JWTAuth(Permission::MerchantAccountCreate), @@ -304,7 +304,7 @@ pub async fn list_merchants_for_user(state: web::Data, req: HttpReques state, &req, (), - |state, user, _| user_core::list_merchants_for_user(state, user), + |state, user, _, _| user_core::list_merchants_for_user(state, user), &auth::DashboardNoPermissionAuth, api_locking::LockAction::NotApplicable, )) @@ -339,7 +339,7 @@ pub async fn list_users_for_merchant_account( state.clone(), &req, (), - |state, user, _| user_core::list_users_for_merchant_account(state, user), + |state, user, _, _| user_core::list_users_for_merchant_account(state, user), &auth::JWTAuth(Permission::UsersRead), api_locking::LockAction::NotApplicable, )) @@ -358,7 +358,7 @@ pub async fn forgot_password( state.clone(), &req, payload.into_inner(), - |state, _, payload| user_core::forgot_password(state, payload), + |state, _, payload, _| user_core::forgot_password(state, payload), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -377,7 +377,7 @@ pub async fn reset_password( state.clone(), &req, payload.into_inner(), - |state, _, payload| user_core::reset_password(state, payload), + |state, _, payload, _| user_core::reset_password(state, payload), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -395,7 +395,7 @@ pub async fn invite_user( state.clone(), &req, payload.into_inner(), - |state, user, payload| user_core::invite_user(state, payload, user), + |state, user, payload, req_state| user_core::invite_user(state, payload, user, req_state), &auth::JWTAuth(Permission::UsersWrite), api_locking::LockAction::NotApplicable, )) @@ -450,7 +450,7 @@ pub async fn accept_invite_from_email( state.clone(), &req, payload.into_inner(), - |state, _, request_payload| user_core::accept_invite_from_email(state, request_payload), + |state, _, request_payload, _| user_core::accept_invite_from_email(state, request_payload), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -469,7 +469,9 @@ pub async fn verify_email_without_invite_checks( state, &http_req, json_payload.into_inner(), - |state, _, req_payload| user_core::verify_email_without_invite_checks(state, req_payload), + |state, _, req_payload, _| { + user_core::verify_email_without_invite_checks(state, req_payload) + }, &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -488,7 +490,7 @@ pub async fn verify_email( state, &http_req, json_payload.into_inner(), - |state, _, req_payload| user_core::verify_email(state, req_payload), + |state, _, req_payload, _| user_core::verify_email(state, req_payload), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -507,7 +509,7 @@ pub async fn verify_email_request( state.clone(), &http_req, json_payload.into_inner(), - |state, _, req_body| user_core::send_verification_mail(state, req_body), + |state, _, req_body, _| user_core::send_verification_mail(state, req_body), &auth::NoAuth, api_locking::LockAction::NotApplicable, )) @@ -522,7 +524,7 @@ pub async fn verify_recon_token(state: web::Data, http_req: HttpReques state.clone(), &http_req, (), - |state, user, _req| user_core::verify_token(state, user), + |state, user, _req, _| user_core::verify_token(state, user), &auth::ReconJWT, api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/routes/user_role.rs b/crates/router/src/routes/user_role.rs index 65e2aa4cdb87..7ffcc8d09df0 100644 --- a/crates/router/src/routes/user_role.rs +++ b/crates/router/src/routes/user_role.rs @@ -27,7 +27,7 @@ pub async fn get_authorization_info( state.clone(), &http_req, (), - |state, _: (), _| async move { + |state, _: (), _, _| async move { // TODO: Permissions to be deprecated once groups are stable if respond_with_groups { user_role_core::get_authorization_info_with_groups(state).await @@ -54,7 +54,7 @@ pub async fn get_role_from_token( state.clone(), &req, (), - |state, user, _| async move { + |state, user, _, _| async move { // TODO: Permissions to be deprecated once groups are stable if respond_with_groups { role_core::get_role_from_token_with_groups(state, user).await @@ -98,7 +98,7 @@ pub async fn list_all_roles( state.clone(), &req, (), - |state, user, _| async move { + |state, user, _, _| async move { // TODO: Permissions to be deprecated once groups are stable if respond_with_groups { role_core::list_invitable_roles_with_groups(state, user).await @@ -128,7 +128,7 @@ pub async fn get_role( state.clone(), &req, request_payload, - |state, user, payload| async move { + |state, user, payload, _| async move { // TODO: Permissions to be deprecated once groups are stable if respond_with_groups { role_core::get_role_with_groups(state, user, payload).await @@ -156,7 +156,7 @@ pub async fn update_role( state.clone(), &req, json_payload.into_inner(), - |state, user, req| role_core::update_role(state, user, req, &role_id), + |state, user, req, _| role_core::update_role(state, user, req, &role_id), &auth::JWTAuth(Permission::UsersWrite), api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/routes/verification.rs b/crates/router/src/routes/verification.rs index 91fd204ba2fc..00662663113a 100644 --- a/crates/router/src/routes/verification.rs +++ b/crates/router/src/routes/verification.rs @@ -22,7 +22,7 @@ pub async fn apple_pay_merchant_registration( state, &req, json_payload.into_inner(), - |state, _, body| { + |state, _, body, _| { verification::verify_merchant_creds_for_applepay( state.clone(), body, @@ -54,7 +54,7 @@ pub async fn retrieve_apple_pay_verified_domains( state, &req, merchant_id.clone(), - |state, _, _| { + |state, _, _, _| { verification::get_verified_apple_domains_with_mid_mca_id( state, merchant_id.to_string(), diff --git a/crates/router/src/routes/verify_connector.rs b/crates/router/src/routes/verify_connector.rs index bfb1b781ada4..c045b3a8e6a5 100644 --- a/crates/router/src/routes/verify_connector.rs +++ b/crates/router/src/routes/verify_connector.rs @@ -20,7 +20,7 @@ pub async fn payment_connector_verify( state, &req, json_payload.into_inner(), - |state, _: (), req| verify_connector::verify_connector_credentials(state, req), + |state, _: (), req, _| verify_connector::verify_connector_credentials(state, req), &auth::JWTAuth(Permission::MerchantConnectorAccountWrite), api_locking::LockAction::NotApplicable, )) diff --git a/crates/router/src/routes/webhook_events.rs b/crates/router/src/routes/webhook_events.rs index 2ee18cbe767c..ef1d64f54e95 100644 --- a/crates/router/src/routes/webhook_events.rs +++ b/crates/router/src/routes/webhook_events.rs @@ -32,7 +32,7 @@ pub async fn list_initial_webhook_delivery_attempts( state, &req, request_internal, - |state, _, request_internal| { + |state, _, request_internal, _| { webhook_events::list_initial_delivery_attempts( state, request_internal.merchant_id_or_profile_id, @@ -71,7 +71,7 @@ pub async fn list_webhook_delivery_attempts( state, &req, request_internal, - |state, _, request_internal| { + |state, _, request_internal, _| { webhook_events::list_delivery_attempts( state, request_internal.merchant_id_or_profile_id, @@ -110,7 +110,7 @@ pub async fn retry_webhook_delivery_attempt( state, &req, request_internal, - |state, _, request_internal| { + |state, _, request_internal, _| { webhook_events::retry_delivery_attempt( state, request_internal.merchant_id_or_profile_id, diff --git a/crates/router/src/routes/webhooks.rs b/crates/router/src/routes/webhooks.rs index 10eb4ef75e4d..b9be68bbd577 100644 --- a/crates/router/src/routes/webhooks.rs +++ b/crates/router/src/routes/webhooks.rs @@ -26,7 +26,7 @@ pub async fn receive_incoming_webhook( state, &req, WebhookBytes(body), - |state, auth, payload| { + |state, auth, payload, _| { webhooks::webhooks_wrapper::( &flow, state.to_owned(), diff --git a/crates/router/src/services/api.rs b/crates/router/src/services/api.rs index 5a139d8da35e..854574069f8d 100644 --- a/crates/router/src/services/api.rs +++ b/crates/router/src/services/api.rs @@ -45,7 +45,7 @@ use crate::{ }, logger, routes::{ - app::AppStateInfo, + app::{AppStateInfo, ReqState}, metrics::{self, request as metrics_request}, AppState, }, @@ -943,23 +943,27 @@ pub enum AuthFlow { Merchant, } -#[instrument(skip(request, payload, state, func, api_auth), fields(merchant_id))] -pub async fn server_wrap_util<'a, 'b, A, U, T, Q, F, Fut, E, OErr>( +#[allow(clippy::too_many_arguments)] +#[instrument( + skip(request, payload, state, func, api_auth, request_state), + fields(merchant_id) +)] +pub async fn server_wrap_util<'a, 'b, U, T, Q, F, Fut, E, OErr>( flow: &'a impl router_env::types::FlowMetric, - state: web::Data, + state: web::Data, + request_state: ReqState, request: &'a HttpRequest, payload: T, func: F, - api_auth: &dyn AuthenticateAndFetch, + api_auth: &dyn AuthenticateAndFetch, lock_action: api_locking::LockAction, ) -> CustomResult, OErr> where - F: Fn(A, U, T) -> Fut, + F: Fn(AppState, U, T, ReqState) -> Fut, 'b: 'a, Fut: Future, E>>, Q: Serialize + Debug + 'a + ApiEventMetric, T: Debug + Serialize + ApiEventMetric, - A: AppStateInfo + Clone, E: ErrorSwitch + error_stack::Context, OErr: ResponseError + error_stack::Context + Serialize, errors::ApiErrorResponse: ErrorSwitch, @@ -969,9 +973,9 @@ where .attach_printable("Unable to extract request id from request") .change_context(errors::ApiErrorResponse::InternalServerError.switch())?; - let mut request_state = state.get_ref().clone(); + let mut app_state = state.get_ref().clone(); - request_state.add_request_id(request_id); + app_state.add_request_id(request_id); let start_instant = Instant::now(); let serialized_request = masking::masked_serialize(&payload) .attach_printable("Failed to serialize json request") @@ -981,7 +985,7 @@ where // Currently auth failures are not recorded as API events let (auth_out, auth_type) = api_auth - .authenticate_and_fetch(request.headers(), &request_state) + .authenticate_and_fetch(request.headers(), &app_state) .await .switch()?; @@ -990,23 +994,23 @@ where .unwrap_or("MERCHANT_ID_NOT_FOUND") .to_string(); - request_state.add_merchant_id(Some(merchant_id.clone())); + app_state.add_merchant_id(Some(merchant_id.clone())); - request_state.add_flow_name(flow.to_string()); + app_state.add_flow_name(flow.to_string()); tracing::Span::current().record("merchant_id", &merchant_id); let output = { lock_action .clone() - .perform_locking_action(&request_state, merchant_id.to_owned()) + .perform_locking_action(&app_state, merchant_id.to_owned()) .await .switch()?; - let res = func(request_state.clone(), auth_out, payload) + let res = func(app_state.clone(), auth_out, payload, request_state) .await .switch(); lock_action - .free_lock_action(&request_state, merchant_id.to_owned()) + .free_lock_action(&app_state, merchant_id.to_owned()) .await .switch()?; res @@ -1082,24 +1086,24 @@ where skip(request, state, func, api_auth, payload), fields(request_method, request_url_path, status_code) )] -pub async fn server_wrap<'a, A, T, U, Q, F, Fut, E>( +pub async fn server_wrap<'a, T, U, Q, F, Fut, E>( flow: impl router_env::types::FlowMetric, - state: web::Data, + state: web::Data, request: &'a HttpRequest, payload: T, func: F, - api_auth: &dyn AuthenticateAndFetch, + api_auth: &dyn AuthenticateAndFetch, lock_action: api_locking::LockAction, ) -> HttpResponse where - F: Fn(A, U, T) -> Fut, + F: Fn(AppState, U, T, ReqState) -> Fut, Fut: Future, E>>, Q: Serialize + Debug + ApiEventMetric + 'a, T: Debug + Serialize + ApiEventMetric, - A: AppStateInfo + Clone, ApplicationResponse: Debug, E: ErrorSwitch + error_stack::Context, { + let req_state = state.get_req_state(); let request_method = request.method().as_str(); let url_path = request.path(); @@ -1136,6 +1140,7 @@ where server_wrap_util( &flow, state.clone(), + req_state, request, payload, func, diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index 166df034cc93..3ae923e246ff 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -1,7 +1,9 @@ use std::sync::Arc; +use bigdecimal::ToPrimitive; use common_utils::errors::CustomResult; use error_stack::{report, ResultExt}; +use events::{EventsError, Message, MessagingInterface}; use rdkafka::{ config::FromClientConfig, producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer}, @@ -16,7 +18,7 @@ mod refund; use data_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent}; use diesel_models::refund::Refund; use serde::Serialize; -use time::OffsetDateTime; +use time::{OffsetDateTime, PrimitiveDateTime}; #[cfg(feature = "payouts")] use self::payout::KafkaPayout; @@ -410,3 +412,41 @@ impl Drop for RdKafkaProducer { } } } + +impl MessagingInterface for KafkaProducer { + type MessageClass = EventType; + + fn send_message( + &self, + data: T, + timestamp: PrimitiveDateTime, + ) -> error_stack::Result<(), EventsError> + where + T: Message + masking::ErasedMaskSerialize, + { + let topic = self.get_topic(data.get_message_class()); + let json_data = data + .masked_serialize() + .and_then(|i| serde_json::to_vec(&i)) + .change_context(EventsError::SerializationError)?; + self.producer + .0 + .send( + BaseRecord::to(topic) + .key(&data.identifier()) + .payload(&json_data) + .timestamp( + (timestamp.assume_utc().unix_timestamp_nanos() / 1_000) + .to_i64() + .unwrap_or_else(|| { + // kafka producer accepts milliseconds + // try converting nanos to millis if that fails convert seconds to millis + timestamp.assume_utc().unix_timestamp() * 1_000 + }), + ), + ) + .map_err(|(error, record)| report!(error).attach_printable(format!("{record:?}"))) + .change_context(KafkaError::GenericError) + .change_context(EventsError::PublishError) + } +}