Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(analytics): adding outgoing webhooks kafka event #3140

Merged
merged 18 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
83486f2
feat(analytics): adding outgoing webhooks event
harsh-sharma-juspay Dec 13, 2023
66ebd60
adding outgoing webhooks kafka event
harsh-sharma-juspay Dec 15, 2023
98a29bf
resolved conflicts
harsh-sharma-juspay Dec 15, 2023
a1f65bc
added outgoing-webhook-event-topic
harsh-sharma-juspay Dec 15, 2023
47be290
added outgoing-webhook-event-topic
harsh-sharma-juspay Dec 15, 2023
0060f76
added webhook event metric
harsh-sharma-juspay Dec 15, 2023
2df2e9e
flatten outgoingwebhookeventcontent
harsh-sharma-juspay Dec 15, 2023
626499b
chore: run formatter
hyperswitch-bot[bot] Dec 15, 2023
f2ddb68
Merge branch 'main' of github.com:juspay/hyperswitch into outgoing_we…
harsh-sharma-juspay Dec 18, 2023
8f04ea6
Merge branch 'outgoing_webhook_event' of github.com:juspay/hyperswitc…
harsh-sharma-juspay Dec 18, 2023
347ebaa
updated config files to add outgoing webhook events
harsh-sharma-juspay Dec 27, 2023
1a73560
Merge branch 'main' of github.com:juspay/hyperswitch into outgoing_we…
harsh-sharma-juspay Dec 27, 2023
877ab89
masking the content
harsh-sharma-juspay Dec 27, 2023
6b58e91
added default failed json serializer case and clickhouse sql script f…
harsh-sharma-juspay Dec 28, 2023
6a6e517
Merge branch 'main' of github.com:juspay/hyperswitch into outgoing_we…
harsh-sharma-juspay Dec 28, 2023
78e66e6
fix script
harsh-sharma-juspay Dec 28, 2023
bbd7fbb
fix script
harsh-sharma-juspay Dec 28, 2023
b992f58
Merge branch 'main' into outgoing_webhook_event
harsh-sharma-juspay Jan 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/development.toml
harsh-sharma-juspay marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ attempt_analytics_topic = "hyperswitch-payment-attempt-events"
refund_analytics_topic = "hyperswitch-refund-events"
api_logs_topic = "hyperswitch-api-log-events"
connector_logs_topic = "hyperswitch-connector-api-events"
outgoing_webhook_logs_topic = "hyperswitch-connector-outgoing-webhook-events"
harsh-sharma-juspay marked this conversation as resolved.
Show resolved Hide resolved

[analytics]
source = "sqlx"
Expand Down
4 changes: 2 additions & 2 deletions crates/api_models/src/mandates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct MandateRevokedResponse {
pub status: api_enums::MandateStatus,
}

#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone)]
#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone, PartialEq)]
pub struct MandateResponse {
/// The identifier for mandate
pub mandate_id: String,
Expand All @@ -37,7 +37,7 @@ pub struct MandateResponse {
pub customer_acceptance: Option<payments::CustomerAcceptance>,
}

#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone)]
#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone, PartialEq)]
pub struct MandateCardDetails {
/// The last 4 digits of card
pub last4_digits: Option<String>,
Expand Down
2 changes: 1 addition & 1 deletion crates/router/src/core/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ pub enum KmsError {
Utf8DecodingFailed,
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, serde::Serialize)]
pub enum WebhooksFlowError {
#[error("Merchant webhook config not found")]
MerchantConfigNotFound,
Expand Down
37 changes: 33 additions & 4 deletions crates/router/src/core/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use crate::{
payments, refunds,
},
db::StorageInterface,
events::api_logs::ApiEvent,
events::{
api_logs::ApiEvent,
outgoing_webhook_logs::{OutgoingWebhookEvent, OutgoingWebhookEventMetric},
},
logger,
routes::{app::AppStateInfo, lock_utils, metrics::request::add_attributes, AppState},
services::{self, authentication as auth},
Expand Down Expand Up @@ -731,21 +734,47 @@ pub async fn create_event_and_trigger_outgoing_webhook<W: types::OutgoingWebhook
if state.conf.webhooks.outgoing_enabled {
let outgoing_webhook = api::OutgoingWebhook {
merchant_id: merchant_account.merchant_id.clone(),
event_id: event.event_id,
event_id: event.event_id.clone(),
event_type: event.event_type,
content,
content: content.clone(),
timestamp: event.created_at,
};

let state_clone = state.clone();
// Using a tokio spawn here and not arbiter because not all caller of this function
// may have an actix arbiter
tokio::spawn(async move {
let mut error = None;
let result =
trigger_webhook_to_merchant::<W>(business_profile, outgoing_webhook, state).await;

if let Err(e) = result {
error.replace(
serde_json::to_value(e.current_context())
.into_report()
.attach_printable("Failed to serialize json error response")
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.ok()
.into(),
);
logger::error!(?e);
}
let outgoing_webhook_event_type = content.get_outgoing_webhook_event_type();
let webhook_event = OutgoingWebhookEvent::new(
merchant_account.merchant_id.clone(),
event.event_id.clone(),
event_type,
outgoing_webhook_event_type,
error.is_some(),
error,
);
match webhook_event.clone().try_into() {
Ok(event) => {
state_clone.event_handler().log_event(event);
}
Err(err) => {
logger::error!(error=?err, event=?webhook_event, "Error Logging Outgoing Webhook Event");
}
}
});
}

Expand Down
2 changes: 2 additions & 0 deletions crates/router/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod api_logs;
pub mod connector_api_logs;
pub mod event_logger;
pub mod kafka_handler;
pub mod outgoing_webhook_logs;

pub(super) trait EventHandler: Sync + Send + dyn_clone::DynClone {
fn log_event(&self, event: RawEvent);
Expand All @@ -31,6 +32,7 @@ pub enum EventType {
Refund,
ApiLogs,
ConnectorApiLogs,
OutgoingWebhookLogs,
}

#[derive(Debug, Default, Deserialize, Clone)]
Expand Down
110 changes: 110 additions & 0 deletions crates/router/src/events/outgoing_webhook_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use api_models::{
disputes, enums::EventType as OutgoingWebhookEventType, mandates, payments, refunds,
webhooks::OutgoingWebhookContent,
};
use serde::Serialize;
use time::OffsetDateTime;

use super::{EventType, RawEvent};

#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct OutgoingWebhookEvent {
merchant_id: String,
event_id: String,
event_type: OutgoingWebhookEventType,
#[serde(flatten)]
content: Option<OutgoingWebhookEventContent>,
is_error: bool,
error: Option<serde_json::Value>,
created_at_timestamp: i128,
}

#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "outgoing_webhook_event_type", rename_all = "snake_case")]
pub enum OutgoingWebhookEventContent {
Payment {
payment_id: Option<String>,
content: payments::PaymentsResponse,
},
Refund {
payment_id: String,
refund_id: String,
content: refunds::RefundResponse,
},
Dispute {
payment_id: String,
attempt_id: String,
dispute_id: String,
content: disputes::DisputeResponse,
},
Mandate {
payment_method_id: String,
mandate_id: String,
content: mandates::MandateResponse,
},
}
pub trait OutgoingWebhookEventMetric {
fn get_outgoing_webhook_event_type(&self) -> Option<OutgoingWebhookEventContent> {
None
}
harsh-sharma-juspay marked this conversation as resolved.
Show resolved Hide resolved
}
impl OutgoingWebhookEventMetric for OutgoingWebhookContent {
fn get_outgoing_webhook_event_type(&self) -> Option<OutgoingWebhookEventContent> {
match self {
Self::PaymentDetails(payment_payload) => Some(OutgoingWebhookEventContent::Payment {
payment_id: payment_payload.payment_id.clone(),
content: payment_payload.clone(),
harsh-sharma-juspay marked this conversation as resolved.
Show resolved Hide resolved
}),
Self::RefundDetails(refund_payload) => Some(OutgoingWebhookEventContent::Refund {
payment_id: refund_payload.payment_id.clone(),
refund_id: refund_payload.refund_id.clone(),
content: refund_payload.clone(),
}),
Self::DisputeDetails(dispute_payload) => Some(OutgoingWebhookEventContent::Dispute {
payment_id: dispute_payload.payment_id.clone(),
attempt_id: dispute_payload.attempt_id.clone(),
dispute_id: dispute_payload.dispute_id.clone(),
content: *dispute_payload.clone(),
}),
Self::MandateDetails(mandate_payload) => Some(OutgoingWebhookEventContent::Mandate {
payment_method_id: mandate_payload.payment_method_id.clone(),
mandate_id: mandate_payload.mandate_id.clone(),
content: *mandate_payload.clone(),
}),
}
}
}

impl OutgoingWebhookEvent {
pub fn new(
merchant_id: String,
event_id: String,
event_type: OutgoingWebhookEventType,
content: Option<OutgoingWebhookEventContent>,
is_error: bool,
error: Option<serde_json::Value>,
) -> Self {
Self {
merchant_id,
event_id,
event_type,
content,
is_error,
error,
created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000,
}
}
}

impl TryFrom<OutgoingWebhookEvent> for RawEvent {
type Error = serde_json::Error;

fn try_from(value: OutgoingWebhookEvent) -> Result<Self, Self::Error> {
Ok(Self {
event_type: EventType::OutgoingWebhookLogs,
key: value.merchant_id.clone(),
payload: serde_json::to_value(value)?,
})
}
}
4 changes: 4 additions & 0 deletions crates/router/src/services/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub struct KafkaSettings {
refund_analytics_topic: String,
api_logs_topic: String,
connector_logs_topic: String,
outgoing_webhook_logs_topic: String,
}

impl KafkaSettings {
Expand Down Expand Up @@ -140,6 +141,7 @@ pub struct KafkaProducer {
refund_analytics_topic: String,
api_logs_topic: String,
connector_logs_topic: String,
outgoing_webhook_logs_topic: String,
}

struct RdKafkaProducer(ThreadedProducer<DefaultProducerContext>);
Expand Down Expand Up @@ -177,6 +179,7 @@ impl KafkaProducer {
refund_analytics_topic: conf.refund_analytics_topic.clone(),
api_logs_topic: conf.api_logs_topic.clone(),
connector_logs_topic: conf.connector_logs_topic.clone(),
outgoing_webhook_logs_topic: conf.outgoing_webhook_logs_topic.clone(),
})
}

Expand Down Expand Up @@ -309,6 +312,7 @@ impl KafkaProducer {
EventType::PaymentIntent => &self.intent_analytics_topic,
EventType::Refund => &self.refund_analytics_topic,
EventType::ConnectorApiLogs => &self.connector_logs_topic,
EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
}
}
}
Expand Down
Loading