Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(events): Add events framework for registering events #4115

Merged
merged 50 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
71912ff
refactor(events): remove intermediary raw event and instead rely on t…
lsampras Feb 27, 2024
bba344e
chore(makefile): update the default behaviour or make fmt to mutate t…
lsampras Feb 27, 2024
1c8c721
Merge remote-tracking branch 'origin/main' into create-payment-lifecy…
lsampras Feb 27, 2024
679f233
feat(events): add support for generating audit events
lsampras Feb 27, 2024
a9a7256
refactor(events): add event setup in payment data
lsampras Feb 27, 2024
3bd533c
refactor(clippy): fix large futures
lsampras Feb 28, 2024
f2f9735
Merge remote-tracking branch 'origin/main' into create-payment-lifecy…
lsampras Mar 7, 2024
8b1eea9
fix(events): remove payment data changes field
lsampras Mar 7, 2024
6121889
chore: run formatter
hyperswitch-bot[bot] Mar 8, 2024
fdd3ca4
feat(events): remove event generation logic
lsampras Mar 14, 2024
60ccc32
Merge branch 'main' into create-payment-lifecycle-events
lsampras Mar 14, 2024
8341a4a
chore: run formatter
hyperswitch-bot[bot] Mar 14, 2024
815aa59
chore(clippy): fix big futures
lsampras Mar 14, 2024
06f72d3
chore(config): update config placeholders
lsampras Mar 15, 2024
f4dedc9
feat(events): add events framework
lsampras Mar 15, 2024
af52920
chore(events): add docstrings for events interface
lsampras Mar 15, 2024
33cb42d
feat(events): imple event sink for kafka
lsampras Mar 15, 2024
6855195
feat(events): use events framework for event handler
lsampras Mar 18, 2024
44c5bf5
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Mar 18, 2024
e21d292
chore: run formatter
hyperswitch-bot[bot] Mar 18, 2024
b32f1ac
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Mar 18, 2024
8302161
Merge branch 'main' into add_events_crate
lsampras Mar 18, 2024
08639ea
feat(events): support extraction of root level data from event
lsampras Mar 18, 2024
4f5da96
chore: run formatter
hyperswitch-bot[bot] Mar 18, 2024
2e1e689
refactor(app): remove generics for appstateinfo from server_wrap
lsampras Mar 18, 2024
f674547
Merge branch 'main' into add_events_crate
lsampras Mar 18, 2024
2025dce
feat(events): add a request context generated from server_wrap
lsampras Mar 19, 2024
1d8ab48
Merge branch 'add_events_crate' of github.com:juspay/hyperswitch into…
lsampras Mar 19, 2024
6bce3aa
Merge branch 'main' into add_events_crate
lsampras Mar 19, 2024
1a10bbd
chore(lints): fix clippy unused lint
lsampras Mar 19, 2024
b941291
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Mar 19, 2024
e6cbb7c
Merge branch 'main' into add_events_crate
lsampras Mar 21, 2024
9a2cffc
fix(events): update events trait
lsampras Mar 21, 2024
438b655
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Mar 21, 2024
5c113c2
fix(events): add req state for payout api
lsampras Mar 21, 2024
cfa1b10
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Mar 28, 2024
77e2b38
feat(events): add message interface
lsampras Apr 1, 2024
ee99221
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Apr 1, 2024
25bd267
chore: run formatter
hyperswitch-bot[bot] Apr 1, 2024
5f983b6
fix(events): update event trait
lsampras Apr 1, 2024
de94a57
chore(lints): fix clippy lints
lsampras Apr 1, 2024
9109757
fix(events): add must use for event builder
lsampras Apr 3, 2024
b0c41c2
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Apr 3, 2024
528339a
feat(events): remove box dyn from event info
lsampras Apr 3, 2024
9f64670
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Apr 3, 2024
84989dc
fix(events): update messaging to use masked serialize
lsampras Apr 4, 2024
91f2f2d
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Apr 4, 2024
6d8325d
fix(events): update server wrap response handle
lsampras Apr 4, 2024
ae24a0b
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Apr 4, 2024
de20710
Merge remote-tracking branch 'origin/main' into add_events_crate
lsampras Apr 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ endif
# Format Rust sources with rustfmt.
#
# Usage :
# make fmt [writing=(no|yes)]
# make fmt [dry_run=(no|yes)]

fmt :
cargo +nightly fmt --all $(if $(call eq,$(writing),yes),,-- --check)
cargo +nightly fmt --all $(if $(call eq,$(dry_run),yes),-- --check,)

# Lint Rust sources with Clippy.
#
Expand Down
1 change: 1 addition & 0 deletions config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ api_logs_topic = "topic" # Kafka topic to be used for incoming api
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events

# File storage configuration
[file_storage]
Expand Down
2 changes: 2 additions & 0 deletions config/deployments/env_specific.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events

# File storage configuration
[file_storage]
Expand Down
1 change: 1 addition & 0 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ api_logs_topic = "hyperswitch-api-log-events"
connector_logs_topic = "hyperswitch-connector-api-events"
outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events"
dispute_analytics_topic = "hyperswitch-dispute-events"
audit_events_topic = "hyperswitch-audit-events"

[analytics]
source = "sqlx"
Expand Down
1 change: 1 addition & 0 deletions config/docker_compose.toml
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ api_logs_topic = "hyperswitch-api-log-events"
connector_logs_topic = "hyperswitch-connector-api-events"
outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events"
dispute_analytics_topic = "hyperswitch-dispute-events"
audit_events_topic = "hyperswitch-audit-events"

[analytics]
source = "sqlx"
Expand Down
2 changes: 1 addition & 1 deletion crates/data_models/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod errors;
pub mod mandates;
pub mod payments;

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub enum RemoteStorageObject<T: ForeignIDRef> {
ForeignID(String),
Object(T),
Expand Down
2 changes: 1 addition & 1 deletion crates/data_models/src/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use common_enums as storage_enums;
use self::payment_attempt::PaymentAttempt;
use crate::RemoteStorageObject;

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct PaymentIntent {
pub id: i32,
pub payment_id: String,
Expand Down
15 changes: 15 additions & 0 deletions crates/events/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "events"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
error-stack = "0.3.1"
masking = { version = "0.1.0", path = "../masking" }
serde_json = "1.0.114"
thiserror = "1.0.58"
time = "0.3.34"
149 changes: 149 additions & 0 deletions crates/events/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg_hide))]
#![cfg_attr(docsrs, doc(cfg_hide(doc)))]
#![forbid(unsafe_code)]
#![warn(missing_docs)]

//!
//! A generic event handler system.
//! This library consists of 4 parts:
//! Event Sink: A trait that defines how events are published. This could be a simple logger, a message queue, or a database.
//! EventContext: A struct that holds the event sink and metadata about the event. This is used to create events. This can be used to add metadata to all events, such as the user who triggered the event.
//! EventInfo: A trait that defines the metadata that is sent with the event. This trait is used to define the data that is sent with the event it works with the EventContext to add metadata to all events.
//! Event: A trait that defines the event itself. This trait is used to define the data that is sent with the event and defines the event's type & identifier.
//!

use std::rc::Rc;

use error_stack::Result;
use time::PrimitiveDateTime;

/// Errors that can occur when working with events.
#[derive(Debug, Clone, thiserror::Error)]
pub enum EventsError {
/// An error occurred when publishing the event.
#[error("Generic Error")]
GenericError,
#[error("Event serialization error: {0}")]
SerializationError(String),
}

/// An event that can be published.
pub trait Event: EventInfo {
/// The type of the event.
type EventType;
/// The timestamp of the event.
fn timestamp(&self) -> PrimitiveDateTime;

/// The (unique) identifier of the event.
fn identifier(&self) -> String;

/// The class/type of the event. This is used to group/categorize events together.
fn class(&self) -> Self::EventType;
}

/// An Event sink that can publish events.
/// This could be a simple logger, a message queue, or a database.
pub trait EventSink<T> {
/// Publish an event.
/// The parameters for this function are determined from the Event trait.
fn publish_event(
&self,
data: serde_json::Value,
identifier: String,
topic: T,
timestamp: PrimitiveDateTime,
) -> Result<(), EventsError>;
}

/// Hold the context information for any events
#[derive(Clone)]
pub struct EventContext<T> {
event_sink: Rc<Box<dyn EventSink<T>>>,
metadata: Vec<Rc<Box<dyn EventInfo>>>,
}

/// intermediary structure to build events with in-place info.
pub struct EventBuilder<T> {
event_sink: Rc<Box<dyn EventSink<T>>>,
src_metadata: Vec<Rc<Box<dyn EventInfo>>>,
event_metadata: Vec<Rc<Box<dyn EventInfo>>>,
event: Box<dyn Event<EventType = T>>,
}

impl<T> EventBuilder<T> {
/// Add metadata to the event.
pub fn with<E: EventInfo + 'static>(mut self, info: E) -> Self {
let boxed_event: Box<dyn EventInfo> = Box::new(info);
self.event_metadata.push(boxed_event.into());
self
}
/// Emit the event.
pub fn emit(self) -> Result<(), EventsError> {
lsampras marked this conversation as resolved.
Show resolved Hide resolved
self.event_sink.publish_event(
self.data()?,
self.event.identifier(),
self.event.class(),
self.event.timestamp(),
)
}
}

impl<T> EventInfo for EventBuilder<T> {
fn data(&self) -> Result<serde_json::Value, EventsError> {
self.src_metadata
.iter()
.chain(self.event_metadata.iter())
.map(|info| info.data().map(|d| (info.key(), d)))
.collect()
}

fn key(&self) -> String {
self.event.key()
}
}

impl<T> EventContext<T> {
/// Create a new event context.
pub fn new(event_sink: Rc<Box<dyn EventSink<T>>>) -> Self {
Self {
event_sink,
metadata: Vec::new(),
}
}

/// Add metadata to the event context.
pub fn record_info<E: EventInfo + 'static>(&mut self, info: E) {
let boxed_event: Box<dyn EventInfo> = Box::new(info);
self.metadata.push(boxed_event.into());
}

/// Emit an event.
pub fn emit(&self, event: Box<dyn Event<EventType = T>>) -> Result<(), EventsError> {
EventBuilder {
event_sink: self.event_sink.clone(),
src_metadata: self.metadata.clone(),
event_metadata: vec![],
event,
}
.emit()
}

/// Create an event builder.
pub fn event(&self, event: Box<dyn Event<EventType = T>>) -> EventBuilder<T> {
EventBuilder {
event_sink: self.event_sink.clone(),
src_metadata: self.metadata.clone(),
event_metadata: vec![],
event,
}
}
}

/// Add information/metadata to the current context of an event.
pub trait EventInfo {
/// The data that is sent with the event.
fn data(&self) -> Result<serde_json::Value, EventsError>;

/// The key identifying the data for an event.
fn key(&self) -> String;
}
1 change: 1 addition & 0 deletions crates/router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ rdkafka = "0.36.0"
isocountry = "0.3.2"
iso_currency = "0.4.4"
actix-http = "3.3.1"
events = { version = "0.1.0", path = "../events" }

[build-dependencies]
router_env = { version = "0.1.0", path = "../router_env", default-features = false }
Expand Down
8 changes: 4 additions & 4 deletions crates/router/src/core/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ where
&connectors,
)
.await?;
call_multiple_connectors_service(
Box::pin(call_multiple_connectors_service(
state,
&merchant_account,
&key_store,
Expand All @@ -402,7 +402,7 @@ where
payment_data,
&customer,
session_surcharge_details,
)
))
.await?
}
};
Expand Down Expand Up @@ -3427,7 +3427,7 @@ pub async fn payment_external_authentication(
id: profile_id.to_string(),
})?;

let authentication_response = authentication_core::perform_authentication(
let authentication_response = Box::pin(authentication_core::perform_authentication(
&state,
authentication_connector,
payment_method_details.0,
Expand All @@ -3451,7 +3451,7 @@ pub async fn payment_external_authentication(
req.sdk_information,
req.threeds_method_comp_ind,
optional_customer.and_then(|customer| customer.email.map(common_utils::pii::Email::from)),
)
))
.await?;
Ok(services::ApplicationResponse::Json(
api_models::payments::PaymentsExternalAuthenticationResponse {
Expand Down
Loading
Loading