From 83486f267549b34181ab4e9372285919f8645708 Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Wed, 13 Dec 2023 22:03:08 +0530 Subject: [PATCH 01/12] feat(analytics): adding outgoing webhooks event --- crates/api_models/src/mandates.rs | 4 +- crates/router/src/core/errors.rs | 2 +- crates/router/src/core/webhooks.rs | 36 +++++++++-- crates/router/src/events.rs | 1 + crates/router/src/events/api_logs.rs | 97 +++++++++++++++++++++++++++- crates/router/src/services/kafka.rs | 4 ++ 6 files changed, 135 insertions(+), 9 deletions(-) diff --git a/crates/api_models/src/mandates.rs b/crates/api_models/src/mandates.rs index 035f7adec9f7..c474ef58bdd2 100644 --- a/crates/api_models/src/mandates.rs +++ b/crates/api_models/src/mandates.rs @@ -19,7 +19,7 @@ pub struct MandateRevokedResponse { pub status: api_enums::MandateStatus, } -#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone)] +#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone, PartialEq)] pub struct MandateResponse { /// The identifier for mandate pub mandate_id: String, @@ -37,7 +37,7 @@ pub struct MandateResponse { pub customer_acceptance: Option, } -#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone)] +#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone, PartialEq)] pub struct MandateCardDetails { /// The last 4 digits of card pub last4_digits: Option, diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index 054f4053504e..cbc4290f63bb 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -226,7 +226,7 @@ pub enum KmsError { Utf8DecodingFailed, } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, serde::Serialize)] pub enum WebhooksFlowError { #[error("Merchant webhook config not found")] MerchantConfigNotFound, diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index be8d118a47c2..553b245b5f4c 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -8,7 +8,7 @@ use api_models::{ payments::HeaderPayload, webhooks::{self, WebhookResponseTracker}, }; -use common_utils::{errors::ReportSwitchExt, events::ApiEventsType}; +use common_utils::{errors::{ReportSwitchExt,ErrorSwitch}, events::ApiEventsType}; use error_stack::{report, IntoReport, ResultExt}; use masking::ExposeInterface; use router_env::{instrument, tracing, tracing_actix_web::RequestId}; @@ -25,7 +25,7 @@ use crate::{ payments, refunds, }, db::StorageInterface, - events::api_logs::ApiEvent, + events::api_logs::{ApiEvent, OutgoingWebhookEvent, OutgoingWebhookEventMetric}, logger, routes::{app::AppStateInfo, lock_utils, metrics::request::add_attributes, AppState}, services::{self, authentication as auth}, @@ -39,6 +39,7 @@ use crate::{ utils::{self as helper_utils, generate_id, Encode, OptionExt, ValueExt}, }; + const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5; const MERCHANT_ID: &str = "merchant_id"; @@ -732,12 +733,13 @@ pub async fn create_event_and_trigger_outgoing_webhook(business_profile, outgoing_webhook, &state).await; if let Err(e) = result { + is_error.replace(true); + error.replace(serde_json::to_value(e.current_context()) + .into_report() + .attach_printable("Failed to serialize json error response") + .change_context(errors::ApiErrorResponse::InternalServerError.switch()) + .ok() + .into()); logger::error!(?e); } }); + let outgoing_webhook_event_type = content.get_outgoing_webhook_event_type(); + let webhook_event = OutgoingWebhookEvent::new( + merchant_account.merchant_id.clone(), + event.event_id.clone(), + event_type, + outgoing_webhook_event_type, + is_error.unwrap_or(false), + error, + ); + match webhook_event.clone().try_into() { + Ok(event) => { + state.event_handler().log_event(event); + } + Err(err) => { + logger::error!(error=?err, event=?webhook_event, "Error Logging Outgoing Webhook Event"); + } + } } Ok(()) diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 8f980fee504a..961ef56c4ea6 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -29,6 +29,7 @@ pub enum EventType { PaymentAttempt, Refund, ApiLogs, + OutgoingWebhookLogs, } #[derive(Debug, Default, Deserialize, Clone)] diff --git a/crates/router/src/events/api_logs.rs b/crates/router/src/events/api_logs.rs index bfc10f722c1f..74d24fb9053f 100644 --- a/crates/router/src/events/api_logs.rs +++ b/crates/router/src/events/api_logs.rs @@ -20,7 +20,7 @@ use crate::{ AttachEvidenceRequest, Config, ConfigUpdate, CreateFileRequest, DisputeId, FileId, }, }; - +use api_models::{enums::EventType as OutgoingWebhookEventType, webhooks::OutgoingWebhookContent, payments, refunds, disputes, mandates}; #[derive(Clone, Debug, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] pub struct ApiEvent { @@ -100,6 +100,101 @@ impl TryFrom for RawEvent { } } +#[derive(Clone, Debug, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] +pub struct OutgoingWebhookEvent { + merchant_id: String, + event_id: String, + event_type: OutgoingWebhookEventType, + #[serde(flatten)] + content: Option, + is_error: bool, + error: Option, + created_at_timestamp: i128, +} + +#[derive(Clone, Debug, PartialEq, Serialize)] +#[serde( + tag = "outgoing_webhook_event_type", + content = "payload", + rename_all = "snake_case" +)] +pub enum OutgoingWebhookEventContent{ + Payment{payment_id: Option,content: payments::PaymentsResponse,}, + Refund{payment_id: String, refund_id: String, content: refunds::RefundResponse,}, + Dispute{payment_id: String, attempt_id: String, dispute_id: String, content: disputes::DisputeResponse,}, + Mandate{payment_method_id: String, mandate_id: String, content: mandates::MandateResponse,}, +} +pub trait OutgoingWebhookEventMetric { + fn get_outgoing_webhook_event_type(&self) -> Option { + None + } +} +impl OutgoingWebhookEventMetric for OutgoingWebhookContent{ + fn get_outgoing_webhook_event_type(&self) -> Option { + match self { + Self::PaymentDetails(reponse) => Some(OutgoingWebhookEventContent::Payment{ + payment_id: reponse.payment_id.clone(), + content: reponse.clone(), + }), + Self::RefundDetails(reponse) => Some(OutgoingWebhookEventContent::Refund{ + payment_id: reponse.payment_id.clone(), + refund_id: reponse.refund_id.clone(), + content: reponse.clone(), + }), + Self::DisputeDetails(reponse) => Some(OutgoingWebhookEventContent::Dispute{ + payment_id: reponse.payment_id.clone(), + attempt_id: reponse.attempt_id.clone(), + dispute_id: reponse.dispute_id.clone(), + content: *reponse.clone(), + }), + Self::MandateDetails(reponse) => Some(OutgoingWebhookEventContent::Mandate{ + payment_method_id: reponse.payment_method_id.clone(), + mandate_id: reponse.mandate_id.clone(), + content: *reponse.clone(), + }), + } + } +} + + +impl OutgoingWebhookEvent { + pub fn new( + merchant_id: String, + event_id: String, + event_type: OutgoingWebhookEventType, + content: Option, + is_error: bool, + error: Option, + ) -> Self { + Self { + merchant_id, + event_id, + event_type, + content, + is_error, + error, + created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000, + } + } +} + + +impl TryFrom for RawEvent { + type Error = serde_json::Error; + + fn try_from(value: OutgoingWebhookEvent) -> Result { + Ok(Self { + event_type: EventType::OutgoingWebhookLogs, + key: value.merchant_id.clone(), + payload: serde_json::to_value(value)?, + }) + } +} + + + + impl ApiEventMetric for ApplicationResponse { fn get_api_event_type(&self) -> Option { match self { diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index 497ac16721b5..fca623d3ec32 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -83,6 +83,7 @@ pub struct KafkaSettings { attempt_analytics_topic: String, refund_analytics_topic: String, api_logs_topic: String, + outgoing_webhook_logs_topic: String, } impl KafkaSettings { @@ -130,6 +131,7 @@ pub struct KafkaProducer { attempt_analytics_topic: String, refund_analytics_topic: String, api_logs_topic: String, + outgoing_webhook_logs_topic: String, } struct RdKafkaProducer(ThreadedProducer); @@ -166,6 +168,7 @@ impl KafkaProducer { attempt_analytics_topic: conf.attempt_analytics_topic.clone(), refund_analytics_topic: conf.refund_analytics_topic.clone(), api_logs_topic: conf.api_logs_topic.clone(), + outgoing_webhook_logs_topic: conf.outgoing_webhook_logs_topic.clone(), }) } @@ -297,6 +300,7 @@ impl KafkaProducer { EventType::PaymentAttempt => &self.attempt_analytics_topic, EventType::PaymentIntent => &self.intent_analytics_topic, EventType::Refund => &self.refund_analytics_topic, + EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic, } } } From 66ebd60bef1044522d607d9b23ba949ab7869599 Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Fri, 15 Dec 2023 16:11:19 +0530 Subject: [PATCH 02/12] adding outgoing webhooks kafka event --- crates/router/src/core/webhooks.rs | 52 ++++++++++++++-------------- crates/router/src/events/api_logs.rs | 46 +++++++++++++++--------- 2 files changed, 56 insertions(+), 42 deletions(-) diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index 553b245b5f4c..d82c742a34cf 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -8,7 +8,7 @@ use api_models::{ payments::HeaderPayload, webhooks::{self, WebhookResponseTracker}, }; -use common_utils::{errors::{ReportSwitchExt,ErrorSwitch}, events::ApiEventsType}; +use common_utils::{errors::ReportSwitchExt, events::ApiEventsType}; use error_stack::{report, IntoReport, ResultExt}; use masking::ExposeInterface; use router_env::{instrument, tracing, tracing_actix_web::RequestId}; @@ -39,7 +39,6 @@ use crate::{ utils::{self as helper_utils, generate_id, Encode, OptionExt, ValueExt}, }; - const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5; const MERCHANT_ID: &str = "merchant_id"; @@ -735,45 +734,46 @@ pub async fn create_event_and_trigger_outgoing_webhook(business_profile, outgoing_webhook, &state).await; if let Err(e) = result { - is_error.replace(true); - error.replace(serde_json::to_value(e.current_context()) + error.replace( + serde_json::to_value(e.current_context()) .into_report() .attach_printable("Failed to serialize json error response") - .change_context(errors::ApiErrorResponse::InternalServerError.switch()) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) .ok() - .into()); + .into(), + ); logger::error!(?e); } - }); - let outgoing_webhook_event_type = content.get_outgoing_webhook_event_type(); - let webhook_event = OutgoingWebhookEvent::new( - merchant_account.merchant_id.clone(), - event.event_id.clone(), - event_type, - outgoing_webhook_event_type, - is_error.unwrap_or(false), - error, - ); - match webhook_event.clone().try_into() { - Ok(event) => { - state.event_handler().log_event(event); - } - Err(err) => { - logger::error!(error=?err, event=?webhook_event, "Error Logging Outgoing Webhook Event"); + let outgoing_webhook_event_type = content.get_outgoing_webhook_event_type(); + let webhook_event = OutgoingWebhookEvent::new( + merchant_account.merchant_id.clone(), + event.event_id.clone(), + event_type, + outgoing_webhook_event_type, + if error.is_some() { true } else { false }, + error, + ); + match webhook_event.clone().try_into() { + Ok(event) => { + state.event_handler().log_event(event); + } + Err(err) => { + logger::error!(error=?err, event=?webhook_event, "Error Logging Outgoing Webhook Event"); + } } - } + }); } Ok(()) diff --git a/crates/router/src/events/api_logs.rs b/crates/router/src/events/api_logs.rs index 74d24fb9053f..b2f0627ae51e 100644 --- a/crates/router/src/events/api_logs.rs +++ b/crates/router/src/events/api_logs.rs @@ -1,4 +1,8 @@ use actix_web::HttpRequest; +use api_models::{ + disputes, enums::EventType as OutgoingWebhookEventType, mandates, payments, refunds, + webhooks::OutgoingWebhookContent, +}; pub use common_utils::events::{ApiEventMetric, ApiEventsType}; use common_utils::impl_misc_api_event_type; use router_env::{tracing_actix_web::RequestId, types::FlowMetric}; @@ -20,7 +24,6 @@ use crate::{ AttachEvidenceRequest, Config, ConfigUpdate, CreateFileRequest, DisputeId, FileId, }, }; -use api_models::{enums::EventType as OutgoingWebhookEventType, webhooks::OutgoingWebhookContent, payments, refunds, disputes, mandates}; #[derive(Clone, Debug, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] pub struct ApiEvent { @@ -119,36 +122,52 @@ pub struct OutgoingWebhookEvent { content = "payload", rename_all = "snake_case" )] -pub enum OutgoingWebhookEventContent{ - Payment{payment_id: Option,content: payments::PaymentsResponse,}, - Refund{payment_id: String, refund_id: String, content: refunds::RefundResponse,}, - Dispute{payment_id: String, attempt_id: String, dispute_id: String, content: disputes::DisputeResponse,}, - Mandate{payment_method_id: String, mandate_id: String, content: mandates::MandateResponse,}, +pub enum OutgoingWebhookEventContent { + Payment { + payment_id: Option, + content: payments::PaymentsResponse, + }, + Refund { + payment_id: String, + refund_id: String, + content: refunds::RefundResponse, + }, + Dispute { + payment_id: String, + attempt_id: String, + dispute_id: String, + content: disputes::DisputeResponse, + }, + Mandate { + payment_method_id: String, + mandate_id: String, + content: mandates::MandateResponse, + }, } pub trait OutgoingWebhookEventMetric { fn get_outgoing_webhook_event_type(&self) -> Option { None } } -impl OutgoingWebhookEventMetric for OutgoingWebhookContent{ +impl OutgoingWebhookEventMetric for OutgoingWebhookContent { fn get_outgoing_webhook_event_type(&self) -> Option { match self { - Self::PaymentDetails(reponse) => Some(OutgoingWebhookEventContent::Payment{ + Self::PaymentDetails(reponse) => Some(OutgoingWebhookEventContent::Payment { payment_id: reponse.payment_id.clone(), content: reponse.clone(), }), - Self::RefundDetails(reponse) => Some(OutgoingWebhookEventContent::Refund{ + Self::RefundDetails(reponse) => Some(OutgoingWebhookEventContent::Refund { payment_id: reponse.payment_id.clone(), refund_id: reponse.refund_id.clone(), content: reponse.clone(), }), - Self::DisputeDetails(reponse) => Some(OutgoingWebhookEventContent::Dispute{ + Self::DisputeDetails(reponse) => Some(OutgoingWebhookEventContent::Dispute { payment_id: reponse.payment_id.clone(), attempt_id: reponse.attempt_id.clone(), dispute_id: reponse.dispute_id.clone(), content: *reponse.clone(), }), - Self::MandateDetails(reponse) => Some(OutgoingWebhookEventContent::Mandate{ + Self::MandateDetails(reponse) => Some(OutgoingWebhookEventContent::Mandate { payment_method_id: reponse.payment_method_id.clone(), mandate_id: reponse.mandate_id.clone(), content: *reponse.clone(), @@ -157,7 +176,6 @@ impl OutgoingWebhookEventMetric for OutgoingWebhookContent{ } } - impl OutgoingWebhookEvent { pub fn new( merchant_id: String, @@ -179,7 +197,6 @@ impl OutgoingWebhookEvent { } } - impl TryFrom for RawEvent { type Error = serde_json::Error; @@ -192,9 +209,6 @@ impl TryFrom for RawEvent { } } - - - impl ApiEventMetric for ApplicationResponse { fn get_api_event_type(&self) -> Option { match self { From a1f65bc51c8dd2a4e17c2138906f0ebe5681747d Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Fri, 15 Dec 2023 16:28:47 +0530 Subject: [PATCH 03/12] added outgoing-webhook-event-topic --- config/development.toml | 1 + crates/router/src/core/webhooks.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/config/development.toml b/config/development.toml index 9646a0a0456d..5e60c44b4eb9 100644 --- a/config/development.toml +++ b/config/development.toml @@ -518,6 +518,7 @@ attempt_analytics_topic = "hyperswitch-payment-attempt-events" refund_analytics_topic = "hyperswitch-refund-events" api_logs_topic = "hyperswitch-api-log-events" connector_logs_topic = "hyperswitch-connector-api-events" +outgoing_webhook_logs_topic = "hyperswitch-connector-outgoing-webhook-events" [analytics] source = "sqlx" diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index df4d25136020..a11e2b05a34e 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -736,7 +736,7 @@ pub async fn create_event_and_trigger_outgoing_webhook { - state.event_handler().log_event(event); + state_clone.event_handler().log_event(event); } Err(err) => { logger::error!(error=?err, event=?webhook_event, "Error Logging Outgoing Webhook Event"); From 47be290634906e99df7c4b817ef562c6f5918dff Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Fri, 15 Dec 2023 16:40:49 +0530 Subject: [PATCH 04/12] added outgoing-webhook-event-topic --- crates/router/src/core/webhooks.rs | 5 +- crates/router/src/events.rs | 1 + crates/router/src/events/api_logs.rs | 111 +---------------- .../src/events/outgoing_webhook_logs.rs | 114 ++++++++++++++++++ 4 files changed, 120 insertions(+), 111 deletions(-) create mode 100644 crates/router/src/events/outgoing_webhook_logs.rs diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index a11e2b05a34e..8959be755ecf 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -25,7 +25,10 @@ use crate::{ payments, refunds, }, db::StorageInterface, - events::api_logs::{ApiEvent, OutgoingWebhookEvent, OutgoingWebhookEventMetric}, + events::{ + api_logs::ApiEvent, + outgoing_webhook_logs::{OutgoingWebhookEvent, OutgoingWebhookEventMetric}, + }, logger, routes::{app::AppStateInfo, lock_utils, metrics::request::add_attributes, AppState}, services::{self, authentication as auth}, diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 1af71ae815e7..0091de588f13 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -9,6 +9,7 @@ pub mod api_logs; pub mod connector_api_logs; pub mod event_logger; pub mod kafka_handler; +pub mod outgoing_webhook_logs; pub(super) trait EventHandler: Sync + Send + dyn_clone::DynClone { fn log_event(&self, event: RawEvent); diff --git a/crates/router/src/events/api_logs.rs b/crates/router/src/events/api_logs.rs index b2f0627ae51e..bfc10f722c1f 100644 --- a/crates/router/src/events/api_logs.rs +++ b/crates/router/src/events/api_logs.rs @@ -1,8 +1,4 @@ use actix_web::HttpRequest; -use api_models::{ - disputes, enums::EventType as OutgoingWebhookEventType, mandates, payments, refunds, - webhooks::OutgoingWebhookContent, -}; pub use common_utils::events::{ApiEventMetric, ApiEventsType}; use common_utils::impl_misc_api_event_type; use router_env::{tracing_actix_web::RequestId, types::FlowMetric}; @@ -24,6 +20,7 @@ use crate::{ AttachEvidenceRequest, Config, ConfigUpdate, CreateFileRequest, DisputeId, FileId, }, }; + #[derive(Clone, Debug, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] pub struct ApiEvent { @@ -103,112 +100,6 @@ impl TryFrom for RawEvent { } } -#[derive(Clone, Debug, PartialEq, Serialize)] -#[serde(rename_all = "snake_case")] -pub struct OutgoingWebhookEvent { - merchant_id: String, - event_id: String, - event_type: OutgoingWebhookEventType, - #[serde(flatten)] - content: Option, - is_error: bool, - error: Option, - created_at_timestamp: i128, -} - -#[derive(Clone, Debug, PartialEq, Serialize)] -#[serde( - tag = "outgoing_webhook_event_type", - content = "payload", - rename_all = "snake_case" -)] -pub enum OutgoingWebhookEventContent { - Payment { - payment_id: Option, - content: payments::PaymentsResponse, - }, - Refund { - payment_id: String, - refund_id: String, - content: refunds::RefundResponse, - }, - Dispute { - payment_id: String, - attempt_id: String, - dispute_id: String, - content: disputes::DisputeResponse, - }, - Mandate { - payment_method_id: String, - mandate_id: String, - content: mandates::MandateResponse, - }, -} -pub trait OutgoingWebhookEventMetric { - fn get_outgoing_webhook_event_type(&self) -> Option { - None - } -} -impl OutgoingWebhookEventMetric for OutgoingWebhookContent { - fn get_outgoing_webhook_event_type(&self) -> Option { - match self { - Self::PaymentDetails(reponse) => Some(OutgoingWebhookEventContent::Payment { - payment_id: reponse.payment_id.clone(), - content: reponse.clone(), - }), - Self::RefundDetails(reponse) => Some(OutgoingWebhookEventContent::Refund { - payment_id: reponse.payment_id.clone(), - refund_id: reponse.refund_id.clone(), - content: reponse.clone(), - }), - Self::DisputeDetails(reponse) => Some(OutgoingWebhookEventContent::Dispute { - payment_id: reponse.payment_id.clone(), - attempt_id: reponse.attempt_id.clone(), - dispute_id: reponse.dispute_id.clone(), - content: *reponse.clone(), - }), - Self::MandateDetails(reponse) => Some(OutgoingWebhookEventContent::Mandate { - payment_method_id: reponse.payment_method_id.clone(), - mandate_id: reponse.mandate_id.clone(), - content: *reponse.clone(), - }), - } - } -} - -impl OutgoingWebhookEvent { - pub fn new( - merchant_id: String, - event_id: String, - event_type: OutgoingWebhookEventType, - content: Option, - is_error: bool, - error: Option, - ) -> Self { - Self { - merchant_id, - event_id, - event_type, - content, - is_error, - error, - created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000, - } - } -} - -impl TryFrom for RawEvent { - type Error = serde_json::Error; - - fn try_from(value: OutgoingWebhookEvent) -> Result { - Ok(Self { - event_type: EventType::OutgoingWebhookLogs, - key: value.merchant_id.clone(), - payload: serde_json::to_value(value)?, - }) - } -} - impl ApiEventMetric for ApplicationResponse { fn get_api_event_type(&self) -> Option { match self { diff --git a/crates/router/src/events/outgoing_webhook_logs.rs b/crates/router/src/events/outgoing_webhook_logs.rs new file mode 100644 index 000000000000..eb924aa52005 --- /dev/null +++ b/crates/router/src/events/outgoing_webhook_logs.rs @@ -0,0 +1,114 @@ +use api_models::{ + disputes, enums::EventType as OutgoingWebhookEventType, mandates, payments, refunds, + webhooks::OutgoingWebhookContent, +}; +use serde::Serialize; +use time::OffsetDateTime; + +use super::{EventType, RawEvent}; + +#[derive(Clone, Debug, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] +pub struct OutgoingWebhookEvent { + merchant_id: String, + event_id: String, + event_type: OutgoingWebhookEventType, + #[serde(flatten)] + content: Option, + is_error: bool, + error: Option, + created_at_timestamp: i128, +} + +#[derive(Clone, Debug, PartialEq, Serialize)] +#[serde( + tag = "outgoing_webhook_event_type", + content = "payload", + rename_all = "snake_case" +)] +pub enum OutgoingWebhookEventContent { + Payment { + payment_id: Option, + content: payments::PaymentsResponse, + }, + Refund { + payment_id: String, + refund_id: String, + content: refunds::RefundResponse, + }, + Dispute { + payment_id: String, + attempt_id: String, + dispute_id: String, + content: disputes::DisputeResponse, + }, + Mandate { + payment_method_id: String, + mandate_id: String, + content: mandates::MandateResponse, + }, +} +pub trait OutgoingWebhookEventMetric { + fn get_outgoing_webhook_event_type(&self) -> Option { + None + } +} +impl OutgoingWebhookEventMetric for OutgoingWebhookContent { + fn get_outgoing_webhook_event_type(&self) -> Option { + match self { + Self::PaymentDetails(reponse) => Some(OutgoingWebhookEventContent::Payment { + payment_id: reponse.payment_id.clone(), + content: reponse.clone(), + }), + Self::RefundDetails(reponse) => Some(OutgoingWebhookEventContent::Refund { + payment_id: reponse.payment_id.clone(), + refund_id: reponse.refund_id.clone(), + content: reponse.clone(), + }), + Self::DisputeDetails(reponse) => Some(OutgoingWebhookEventContent::Dispute { + payment_id: reponse.payment_id.clone(), + attempt_id: reponse.attempt_id.clone(), + dispute_id: reponse.dispute_id.clone(), + content: *reponse.clone(), + }), + Self::MandateDetails(reponse) => Some(OutgoingWebhookEventContent::Mandate { + payment_method_id: reponse.payment_method_id.clone(), + mandate_id: reponse.mandate_id.clone(), + content: *reponse.clone(), + }), + } + } +} + +impl OutgoingWebhookEvent { + pub fn new( + merchant_id: String, + event_id: String, + event_type: OutgoingWebhookEventType, + content: Option, + is_error: bool, + error: Option, + ) -> Self { + Self { + merchant_id, + event_id, + event_type, + content, + is_error, + error, + created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000, + } + } +} + +impl TryFrom for RawEvent { + type Error = serde_json::Error; + + fn try_from(value: OutgoingWebhookEvent) -> Result { + Ok(Self { + event_type: EventType::OutgoingWebhookLogs, + key: value.merchant_id.clone(), + payload: serde_json::to_value(value)?, + }) + } +} From 0060f7628b056ff1c8b989662d02ab5617d876e2 Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Fri, 15 Dec 2023 16:49:50 +0530 Subject: [PATCH 05/12] added webhook event metric --- crates/router/src/core/webhooks.rs | 2 +- .../src/events/outgoing_webhook_logs.rs | 32 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index 8959be755ecf..c7e7548f00bd 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -764,7 +764,7 @@ pub async fn create_event_and_trigger_outgoing_webhook Option { match self { - Self::PaymentDetails(reponse) => Some(OutgoingWebhookEventContent::Payment { - payment_id: reponse.payment_id.clone(), - content: reponse.clone(), + Self::PaymentDetails(payment_payload) => Some(OutgoingWebhookEventContent::Payment { + payment_id: payment_payload.payment_id.clone(), + content: payment_payload.clone(), }), - Self::RefundDetails(reponse) => Some(OutgoingWebhookEventContent::Refund { - payment_id: reponse.payment_id.clone(), - refund_id: reponse.refund_id.clone(), - content: reponse.clone(), + Self::RefundDetails(refund_payload) => Some(OutgoingWebhookEventContent::Refund { + payment_id: refund_payload.payment_id.clone(), + refund_id: refund_payload.refund_id.clone(), + content: refund_payload.clone(), }), - Self::DisputeDetails(reponse) => Some(OutgoingWebhookEventContent::Dispute { - payment_id: reponse.payment_id.clone(), - attempt_id: reponse.attempt_id.clone(), - dispute_id: reponse.dispute_id.clone(), - content: *reponse.clone(), + Self::DisputeDetails(dispute_payload) => Some(OutgoingWebhookEventContent::Dispute { + payment_id: dispute_payload.payment_id.clone(), + attempt_id: dispute_payload.attempt_id.clone(), + dispute_id: dispute_payload.dispute_id.clone(), + content: *dispute_payload.clone(), }), - Self::MandateDetails(reponse) => Some(OutgoingWebhookEventContent::Mandate { - payment_method_id: reponse.payment_method_id.clone(), - mandate_id: reponse.mandate_id.clone(), - content: *reponse.clone(), + Self::MandateDetails(mandate_payload) => Some(OutgoingWebhookEventContent::Mandate { + payment_method_id: mandate_payload.payment_method_id.clone(), + mandate_id: mandate_payload.mandate_id.clone(), + content: *mandate_payload.clone(), }), } } From 2df2e9e9ec795f2f75bde5b4070e57f335ce1343 Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Fri, 15 Dec 2023 19:03:22 +0530 Subject: [PATCH 06/12] flatten outgoingwebhookeventcontent --- crates/router/src/events/outgoing_webhook_logs.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/router/src/events/outgoing_webhook_logs.rs b/crates/router/src/events/outgoing_webhook_logs.rs index 2511f372fcaf..927f03e5312a 100644 --- a/crates/router/src/events/outgoing_webhook_logs.rs +++ b/crates/router/src/events/outgoing_webhook_logs.rs @@ -23,7 +23,6 @@ pub struct OutgoingWebhookEvent { #[derive(Clone, Debug, PartialEq, Serialize)] #[serde( tag = "outgoing_webhook_event_type", - content = "payload", rename_all = "snake_case" )] pub enum OutgoingWebhookEventContent { From 626499b416f1de39e04401e398fbf61ff1ed1282 Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Fri, 15 Dec 2023 13:33:58 +0000 Subject: [PATCH 07/12] chore: run formatter --- crates/router/src/events/outgoing_webhook_logs.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/router/src/events/outgoing_webhook_logs.rs b/crates/router/src/events/outgoing_webhook_logs.rs index 927f03e5312a..10e45489f390 100644 --- a/crates/router/src/events/outgoing_webhook_logs.rs +++ b/crates/router/src/events/outgoing_webhook_logs.rs @@ -21,10 +21,7 @@ pub struct OutgoingWebhookEvent { } #[derive(Clone, Debug, PartialEq, Serialize)] -#[serde( - tag = "outgoing_webhook_event_type", - rename_all = "snake_case" -)] +#[serde(tag = "outgoing_webhook_event_type", rename_all = "snake_case")] pub enum OutgoingWebhookEventContent { Payment { payment_id: Option, From 347ebaa39ddb1812074cdc3e1c5e2d4deb1cb2d6 Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Wed, 27 Dec 2023 13:27:12 +0530 Subject: [PATCH 08/12] updated config files to add outgoing webhook events --- config/config.example.toml | 13 +++++++------ config/development.toml | 2 +- config/docker_compose.toml | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index 7b4380ba2db0..e9cbaf17cb40 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -523,9 +523,10 @@ enabled = true # Switch to enable or disable PayPal onboarding source = "logs" # The event sink to push events supports kafka or logs (stdout) [events.kafka] -brokers = [] # Kafka broker urls for bootstrapping the client -intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events -attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events -refund_analytics_topic = "topic" # Kafka topic to be used for Refund events -api_logs_topic = "topic" # Kafka topic to be used for incoming api events -connector_logs_topic = "topic" # Kafka topic to be used for connector api events \ No newline at end of file +brokers = [] # Kafka broker urls for bootstrapping the client +intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events +attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events +refund_analytics_topic = "topic" # Kafka topic to be used for Refund events +api_logs_topic = "topic" # Kafka topic to be used for incoming api events +connector_logs_topic = "topic" # Kafka topic to be used for connector api events +outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events \ No newline at end of file diff --git a/config/development.toml b/config/development.toml index bc36ee564ac1..71e1c8897a13 100644 --- a/config/development.toml +++ b/config/development.toml @@ -518,7 +518,7 @@ attempt_analytics_topic = "hyperswitch-payment-attempt-events" refund_analytics_topic = "hyperswitch-refund-events" api_logs_topic = "hyperswitch-api-log-events" connector_logs_topic = "hyperswitch-connector-api-events" -outgoing_webhook_logs_topic = "hyperswitch-connector-outgoing-webhook-events" +outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events" [analytics] source = "sqlx" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index eab1ea5408c0..3b61cecefff7 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -365,6 +365,7 @@ attempt_analytics_topic = "hyperswitch-payment-attempt-events" refund_analytics_topic = "hyperswitch-refund-events" api_logs_topic = "hyperswitch-api-log-events" connector_logs_topic = "hyperswitch-connector-api-events" +outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events" [analytics] source = "sqlx" From 877ab89b5383e167b11ac74bdd1c27b694e2752d Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Wed, 27 Dec 2023 13:56:55 +0530 Subject: [PATCH 09/12] masking the content --- config/config.example.toml | 2 +- crates/api_models/src/mandates.rs | 4 +-- .../src/events/outgoing_webhook_logs.rs | 26 +++++++++---------- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index e0450b0a8866..21a3ba6de93c 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -530,4 +530,4 @@ attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAt refund_analytics_topic = "topic" # Kafka topic to be used for Refund events api_logs_topic = "topic" # Kafka topic to be used for incoming api events connector_logs_topic = "topic" # Kafka topic to be used for connector api events -outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events \ No newline at end of file +outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events diff --git a/crates/api_models/src/mandates.rs b/crates/api_models/src/mandates.rs index c474ef58bdd2..035f7adec9f7 100644 --- a/crates/api_models/src/mandates.rs +++ b/crates/api_models/src/mandates.rs @@ -19,7 +19,7 @@ pub struct MandateRevokedResponse { pub status: api_enums::MandateStatus, } -#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone, PartialEq)] +#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone)] pub struct MandateResponse { /// The identifier for mandate pub mandate_id: String, @@ -37,7 +37,7 @@ pub struct MandateResponse { pub customer_acceptance: Option, } -#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone, PartialEq)] +#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone)] pub struct MandateCardDetails { /// The last 4 digits of card pub last4_digits: Option, diff --git a/crates/router/src/events/outgoing_webhook_logs.rs b/crates/router/src/events/outgoing_webhook_logs.rs index 10e45489f390..ad898f69456d 100644 --- a/crates/router/src/events/outgoing_webhook_logs.rs +++ b/crates/router/src/events/outgoing_webhook_logs.rs @@ -1,8 +1,6 @@ -use api_models::{ - disputes, enums::EventType as OutgoingWebhookEventType, mandates, payments, refunds, - webhooks::OutgoingWebhookContent, -}; +use api_models::{enums::EventType as OutgoingWebhookEventType, webhooks::OutgoingWebhookContent}; use serde::Serialize; +use serde_json::Value; use time::OffsetDateTime; use super::{EventType, RawEvent}; @@ -16,7 +14,7 @@ pub struct OutgoingWebhookEvent { #[serde(flatten)] content: Option, is_error: bool, - error: Option, + error: Option, created_at_timestamp: i128, } @@ -25,23 +23,23 @@ pub struct OutgoingWebhookEvent { pub enum OutgoingWebhookEventContent { Payment { payment_id: Option, - content: payments::PaymentsResponse, + content: Value, }, Refund { payment_id: String, refund_id: String, - content: refunds::RefundResponse, + content: Value, }, Dispute { payment_id: String, attempt_id: String, dispute_id: String, - content: disputes::DisputeResponse, + content: Value, }, Mandate { payment_method_id: String, mandate_id: String, - content: mandates::MandateResponse, + content: Value, }, } pub trait OutgoingWebhookEventMetric { @@ -54,23 +52,23 @@ impl OutgoingWebhookEventMetric for OutgoingWebhookContent { match self { Self::PaymentDetails(payment_payload) => Some(OutgoingWebhookEventContent::Payment { payment_id: payment_payload.payment_id.clone(), - content: payment_payload.clone(), + content: masking::masked_serialize(&payment_payload).unwrap_or_default(), }), Self::RefundDetails(refund_payload) => Some(OutgoingWebhookEventContent::Refund { payment_id: refund_payload.payment_id.clone(), refund_id: refund_payload.refund_id.clone(), - content: refund_payload.clone(), + content: masking::masked_serialize(&refund_payload).unwrap_or_default(), }), Self::DisputeDetails(dispute_payload) => Some(OutgoingWebhookEventContent::Dispute { payment_id: dispute_payload.payment_id.clone(), attempt_id: dispute_payload.attempt_id.clone(), dispute_id: dispute_payload.dispute_id.clone(), - content: *dispute_payload.clone(), + content: masking::masked_serialize(&dispute_payload).unwrap_or_default(), }), Self::MandateDetails(mandate_payload) => Some(OutgoingWebhookEventContent::Mandate { payment_method_id: mandate_payload.payment_method_id.clone(), mandate_id: mandate_payload.mandate_id.clone(), - content: *mandate_payload.clone(), + content: masking::masked_serialize(&mandate_payload).unwrap_or_default(), }), } } @@ -83,7 +81,7 @@ impl OutgoingWebhookEvent { event_type: OutgoingWebhookEventType, content: Option, is_error: bool, - error: Option, + error: Option, ) -> Self { Self { merchant_id, From 6b58e913b5dc18a696bd5e0f160136918917be45 Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Thu, 28 Dec 2023 12:08:17 +0530 Subject: [PATCH 10/12] added default failed json serializer case and clickhouse sql script for outgoing webhook events --- .../scripts/outgoing_webhook_events.sql | 109 ++++++++++++++++++ .../src/events/outgoing_webhook_logs.rs | 16 +-- 2 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql diff --git a/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql b/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql new file mode 100644 index 000000000000..7f3cda6d2ef6 --- /dev/null +++ b/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql @@ -0,0 +1,109 @@ +CREATE TABLE + outgoing_webhook_events_queue ( + `merchant_id` String, + `event_id` Nullable(String), + `event_type` LowCardinality(String), + `outgoing_webhook_event_type` LowCardinality(String), + `payment_id` Nullable(String), + `refund_id` Nullable(String), + `attempt_id` Nullable(String), + `dispute_id` Nullable(String), + `payment_method_id` Nullable(String), + `mandate_id` Nullable(String), + `content` Nullable(String), + `is_error` Bool, + `error` Nullable(String), + `created_at_timestamp` DateTime64(3) + ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', + kafka_topic_list = 'hyperswitch-outgoing-webhook-events', + kafka_group_name = 'hyper-c1', + kafka_format = 'JSONEachRow', + kafka_handle_error_mode = 'stream'; + +CREATE TABLE + outgoing_webhook_events_cluster ( + `merchant_id` String, + `event_id` Nullable(String), + `event_type` LowCardinality(String), + `outgoing_webhook_event_type` LowCardinality(String), + `payment_id` Nullable(String), + `refund_id` Nullable(String), + `attempt_id` Nullable(String), + `dispute_id` Nullable(String), + `payment_method_id` Nullable(String), + `mandate_id` Nullable(String), + `content` Nullable(String), + `is_error` Bool, + `error` Nullable(String), + `created_at_timestamp` DateTime64(3), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + INDEX eventIndex event_type TYPE bloom_filter GRANULARITY 1, + INDEX webhookeventIndex outgoing_webhook_event_type TYPE bloom_filter GRANULARITY 1 + ) ENGINE = MergeTree PARTITION BY toStartOfDay(created_at) +ORDER BY ( + created_at, + merchant_id, + flow_type, + status_code, + api_flow + ) TTL created_at + toIntervalMonth(6); + +CREATE MATERIALIZED VIEW outgoing_webhook_events_mv TO outgoing_webhook_events_cluster ( + `merchant_id` String, + `event_id` Nullable(String), + `event_type` LowCardinality(String), + `outgoing_webhook_event_type` LowCardinality(String), + `payment_id` Nullable(String), + `refund_id` Nullable(String), + `attempt_id` Nullable(String), + `dispute_id` Nullable(String), + `payment_method_id` Nullable(String), + `mandate_id` Nullable(String), + `content` Nullable(String), + `is_error` Bool, + `error` Nullable(String), + `created_at_timestamp` DateTime64(3), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), +) AS +SELECT + merchant_id, + event_id, + event_type, + outgoing_webhook_event_type, + payment_id, + refund_id, + attempt_id, + dispute_id, + payment_method_id, + mandate_id, + content, + is_error, + error, + created_at_timestamp, + now() AS inserted_at +FROM + outgoing_webhook_events_queue +where length(_error) = 0; + +CREATE MATERIALIZED VIEW outgoing_webhook_parse_errors ( + `topic` String, + `partition` Int64, + `offset` Int64, + `raw` String, + `error` String +) ENGINE = MergeTree +ORDER BY ( + topic, partition, + offset + ) SETTINGS index_granularity = 8192 AS +SELECT + _topic AS topic, + _partition AS partition, + _offset AS +offset +, + _raw_message AS raw, + _error AS error +FROM + outgoing_webhook_events_queue +WHERE length(_error) > 0; \ No newline at end of file diff --git a/crates/router/src/events/outgoing_webhook_logs.rs b/crates/router/src/events/outgoing_webhook_logs.rs index ad898f69456d..ebf36caf706e 100644 --- a/crates/router/src/events/outgoing_webhook_logs.rs +++ b/crates/router/src/events/outgoing_webhook_logs.rs @@ -43,32 +43,34 @@ pub enum OutgoingWebhookEventContent { }, } pub trait OutgoingWebhookEventMetric { - fn get_outgoing_webhook_event_type(&self) -> Option { - None - } + fn get_outgoing_webhook_event_type(&self) -> Option; } impl OutgoingWebhookEventMetric for OutgoingWebhookContent { fn get_outgoing_webhook_event_type(&self) -> Option { match self { Self::PaymentDetails(payment_payload) => Some(OutgoingWebhookEventContent::Payment { payment_id: payment_payload.payment_id.clone(), - content: masking::masked_serialize(&payment_payload).unwrap_or_default(), + content: masking::masked_serialize(&payment_payload) + .unwrap_or(serde_json::json!({"error":"failed to serialize"})), }), Self::RefundDetails(refund_payload) => Some(OutgoingWebhookEventContent::Refund { payment_id: refund_payload.payment_id.clone(), refund_id: refund_payload.refund_id.clone(), - content: masking::masked_serialize(&refund_payload).unwrap_or_default(), + content: masking::masked_serialize(&refund_payload) + .unwrap_or(serde_json::json!({"error":"failed to serialize"})), }), Self::DisputeDetails(dispute_payload) => Some(OutgoingWebhookEventContent::Dispute { payment_id: dispute_payload.payment_id.clone(), attempt_id: dispute_payload.attempt_id.clone(), dispute_id: dispute_payload.dispute_id.clone(), - content: masking::masked_serialize(&dispute_payload).unwrap_or_default(), + content: masking::masked_serialize(&dispute_payload) + .unwrap_or(serde_json::json!({"error":"failed to serialize"})), }), Self::MandateDetails(mandate_payload) => Some(OutgoingWebhookEventContent::Mandate { payment_method_id: mandate_payload.payment_method_id.clone(), mandate_id: mandate_payload.mandate_id.clone(), - content: masking::masked_serialize(&mandate_payload).unwrap_or_default(), + content: masking::masked_serialize(&mandate_payload) + .unwrap_or(serde_json::json!({"error":"failed to serialize"})), }), } } From 78e66e602067c907fe695010d777bd7a9315a6d1 Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Thu, 28 Dec 2023 12:42:42 +0530 Subject: [PATCH 11/12] fix script --- .../scripts/outgoing_webhook_events.sql | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql b/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql index 7f3cda6d2ef6..0379886b9f85 100644 --- a/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql +++ b/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql @@ -14,7 +14,7 @@ CREATE TABLE `is_error` Bool, `error` Nullable(String), `created_at_timestamp` DateTime64(3) - ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', + ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:9092', kafka_topic_list = 'hyperswitch-outgoing-webhook-events', kafka_group_name = 'hyper-c1', kafka_format = 'JSONEachRow', @@ -23,7 +23,7 @@ CREATE TABLE CREATE TABLE outgoing_webhook_events_cluster ( `merchant_id` String, - `event_id` Nullable(String), + `event_id` String, `event_type` LowCardinality(String), `outgoing_webhook_event_type` LowCardinality(String), `payment_id` Nullable(String), @@ -39,14 +39,14 @@ CREATE TABLE `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), INDEX eventIndex event_type TYPE bloom_filter GRANULARITY 1, INDEX webhookeventIndex outgoing_webhook_event_type TYPE bloom_filter GRANULARITY 1 - ) ENGINE = MergeTree PARTITION BY toStartOfDay(created_at) + ) ENGINE = MergeTree PARTITION BY toStartOfDay(created_at_timestamp) ORDER BY ( - created_at, + created_at_timestamp, merchant_id, - flow_type, - status_code, - api_flow - ) TTL created_at + toIntervalMonth(6); + event_id, + event_type, + outgoing_webhook_event_type + ) TTL inserted_at + toIntervalMonth(6); CREATE MATERIALIZED VIEW outgoing_webhook_events_mv TO outgoing_webhook_events_cluster ( `merchant_id` String, From bbd7fbb3aa26ea2dfeca7cfe5d30df827dfb539e Mon Sep 17 00:00:00 2001 From: harsh_sharma_juspay Date: Thu, 28 Dec 2023 12:43:28 +0530 Subject: [PATCH 12/12] fix script --- .../docs/clickhouse/scripts/outgoing_webhook_events.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql b/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql index 0379886b9f85..3dc907629d0a 100644 --- a/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql +++ b/crates/analytics/docs/clickhouse/scripts/outgoing_webhook_events.sql @@ -14,7 +14,7 @@ CREATE TABLE `is_error` Bool, `error` Nullable(String), `created_at_timestamp` DateTime64(3) - ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:9092', + ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', kafka_topic_list = 'hyperswitch-outgoing-webhook-events', kafka_group_name = 'hyper-c1', kafka_format = 'JSONEachRow',