Merge branch 'main' of github.com:juspay/hyperswitch into refactor-rustman-runner

* 'main' of github.com:juspay/hyperswitch:
  chore(version): 2024.02.29.0
  chore(postman): update Postman collection files
  feat(analytics): add force retrieve call for force retrieve calls (#3565)
  refactor(connector): [Mollie] Mask PII data  (#3856)
  refactor(connector): [Gocardless] Mask PII data (#3844)
  feat(analytics): adding metric api for dispute analytics (#3810)
  feat(payment_methods): Add default payment method column in customers table and last used column in payment_methods table (#3790)
  fix(tests/postman/adyen): enable sepa payment method type for payout flows (#3861)
  feat(payouts): Implement Smart Retries for Payout (#3580)
  refactor(payment_link): add Miscellaneous charges in cart (#3645)
pixincreate committed Feb 29, 2024
2 parents e761c84 + 6b078fa commit 6c2efa3
Showing 77 changed files with 2,382 additions and 212 deletions.
29 changes: 29 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,35 @@ All notable changes to HyperSwitch will be documented here.

- - -

## 2024.02.29.0

### Features

- **analytics:**
- Adding metric api for dispute analytics ([#3810](https://github.com/juspay/hyperswitch/pull/3810)) ([`de6b16b`](https://github.com/juspay/hyperswitch/commit/de6b16bed98280a4ed8fc8cdad920a759662aa19))
- Add force retrieve call for force retrieve calls ([#3565](https://github.com/juspay/hyperswitch/pull/3565)) ([`032d58c`](https://github.com/juspay/hyperswitch/commit/032d58cdbbf388cf25cbf2e43b0117b83f7d076d))
- **payment_methods:** Add default payment method column in customers table and last used column in payment_methods table ([#3790](https://github.com/juspay/hyperswitch/pull/3790)) ([`f3931cf`](https://github.com/juspay/hyperswitch/commit/f3931cf484f61a4d9c107c362d0f3f6ee872e0e7))
- **payouts:** Implement Smart Retries for Payout ([#3580](https://github.com/juspay/hyperswitch/pull/3580)) ([`8b32dff`](https://github.com/juspay/hyperswitch/commit/8b32dffe324a4cdbfde173cffe3fad2e839a52aa))

### Bug Fixes

- **tests/postman/adyen:** Enable sepa payment method type for payout flows ([#3861](https://github.com/juspay/hyperswitch/pull/3861)) ([`53559c2`](https://github.com/juspay/hyperswitch/commit/53559c22527dde9536aa493ad7cd3bf353335c1a))

### Refactors

- **connector:**
- [Gocardless] Mask PII data ([#3844](https://github.com/juspay/hyperswitch/pull/3844)) ([`2f3ec7f`](https://github.com/juspay/hyperswitch/commit/2f3ec7f951967359d3995f743a486f3b380dd1f8))
- [Mollie] Mask PII data ([#3856](https://github.com/juspay/hyperswitch/pull/3856)) ([`ffbe042`](https://github.com/juspay/hyperswitch/commit/ffbe042fdccde4a721d329d6b85c95203234368e))
- **payment_link:** Add Miscellaneous charges in cart ([#3645](https://github.com/juspay/hyperswitch/pull/3645)) ([`15b367e`](https://github.com/juspay/hyperswitch/commit/15b367eb792448fb3f3312484ab13dd8241d4a14))

### Miscellaneous Tasks

- **postman:** Update Postman collection files ([`5c91a94`](https://github.com/juspay/hyperswitch/commit/5c91a9440e098490cc00a54ead34989da81babc0))

**Full Changelog:** [`2024.02.28.0...2024.02.29.0`](https://github.com/juspay/hyperswitch/compare/2024.02.28.0...2024.02.29.0)

- - -

## 2024.02.28.0

### Features
1 change: 0 additions & 1 deletion config/development.toml
@@ -71,7 +71,6 @@ mock_locker = true
basilisk_host = ""
locker_enabled = true


[forex_api]
call_delay = 21600
local_fetch_retry_count = 5
14 changes: 13 additions & 1 deletion crates/analytics/src/clickhouse.rs
@@ -23,7 +23,7 @@ use crate::{
metrics::{latency::LatencyAvg, ApiEventMetricRow},
},
connector_events::events::ConnectorEventsResult,
disputes::filters::DisputeFilterRow,
disputes::{filters::DisputeFilterRow, metrics::DisputeMetricRow},
outgoing_webhook_event::events::OutgoingWebhookLogsResult,
sdk_events::events::SdkEventsResult,
types::TableEngine,
@@ -170,6 +170,7 @@ impl super::outgoing_webhook_event::events::OutgoingWebhookLogsFilterAnalytics
{
}
impl super::disputes::filters::DisputeFilterAnalytics for ClickhouseClient {}
impl super::disputes::metrics::DisputeMetricAnalytics for ClickhouseClient {}

#[derive(Debug, serde::Serialize)]
struct CkhQuery {
@@ -278,6 +279,17 @@ impl TryInto<RefundFilterRow> for serde_json::Value {
))
}
}
impl TryInto<DisputeMetricRow> for serde_json::Value {
type Error = Report<ParsingError>;

fn try_into(self) -> Result<DisputeMetricRow, Self::Error> {
serde_json::from_value(self)
.into_report()
.change_context(ParsingError::StructParseFailure(
"Failed to parse DisputeMetricRow in clickhouse results",
))
}
}

impl TryInto<DisputeFilterRow> for serde_json::Value {
type Error = Report<ParsingError>;
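The new `TryInto<DisputeMetricRow>` impl follows the same pattern as the existing `RefundFilterRow` and `DisputeFilterRow` conversions: each ClickHouse result row arrives as a `serde_json::Value` and is deserialized into a typed row struct, with failures wrapped in `ParsingError::StructParseFailure`. A minimal sketch of that pattern, using a hypothetical `ExampleMetricRow` and plain `serde_json` errors instead of `error_stack` (assumes the `serde`/`serde_json` crates with the `derive` feature):

```rust
use serde::Deserialize;

// Illustrative row type only; the real DisputeMetricRow lives in
// crates/analytics/src/disputes/metrics.rs and uses different field types.
#[derive(Debug, Deserialize)]
struct ExampleMetricRow {
    dispute_status: Option<String>,
    total: Option<f64>,
    count: Option<i64>,
}

// Deserialize one JSON row into the typed struct, as the TryInto impls do.
fn parse_row(value: serde_json::Value) -> Result<ExampleMetricRow, serde_json::Error> {
    serde_json::from_value(value)
}

fn main() {
    let row = serde_json::json!({
        "dispute_status": "dispute_won",
        "total": 1250.0,
        "count": 3
    });
    println!("{:?}", parse_row(row).unwrap());
}
```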
8 changes: 6 additions & 2 deletions crates/analytics/src/disputes.rs
@@ -1,5 +1,9 @@
pub mod accumulators;
mod core;

pub mod filters;
pub mod metrics;
pub mod types;
pub use accumulators::{DisputeMetricAccumulator, DisputeMetricsAccumulator};

pub use self::core::get_filters;
pub trait DisputeAnalytics: metrics::DisputeMetricAnalytics {}
pub use self::core::{get_filters, get_metrics};
100 changes: 100 additions & 0 deletions crates/analytics/src/disputes/accumulators.rs
@@ -0,0 +1,100 @@
use api_models::analytics::disputes::DisputeMetricsBucketValue;
use diesel_models::enums as storage_enums;

use super::metrics::DisputeMetricRow;
#[derive(Debug, Default)]
pub struct DisputeMetricsAccumulator {
pub disputes_status_rate: RateAccumulator,
pub total_amount_disputed: SumAccumulator,
pub total_dispute_lost_amount: SumAccumulator,
}
#[derive(Debug, Default)]
pub struct RateAccumulator {
pub won_count: i64,
pub challenged_count: i64,
pub lost_count: i64,
pub total: i64,
}
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct SumAccumulator {
pub total: Option<i64>,
}

pub trait DisputeMetricAccumulator {
type MetricOutput;

fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow);

fn collect(self) -> Self::MetricOutput;
}

impl DisputeMetricAccumulator for SumAccumulator {
type MetricOutput = Option<u64>;
#[inline]
fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow) {
self.total = match (
self.total,
metrics
.total
.as_ref()
.and_then(bigdecimal::ToPrimitive::to_i64),
) {
(None, None) => None,
(None, i @ Some(_)) | (i @ Some(_), None) => i,
(Some(a), Some(b)) => Some(a + b),
}
}
#[inline]
fn collect(self) -> Self::MetricOutput {
self.total.and_then(|i| u64::try_from(i).ok())
}
}

impl DisputeMetricAccumulator for RateAccumulator {
type MetricOutput = Option<(Option<u64>, Option<u64>, Option<u64>, Option<u64>)>;

fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow) {
if let Some(ref dispute_status) = metrics.dispute_status {
if dispute_status.as_ref() == &storage_enums::DisputeStatus::DisputeChallenged {
self.challenged_count += metrics.count.unwrap_or_default();
}
if dispute_status.as_ref() == &storage_enums::DisputeStatus::DisputeWon {
self.won_count += metrics.count.unwrap_or_default();
}
if dispute_status.as_ref() == &storage_enums::DisputeStatus::DisputeLost {
self.lost_count += metrics.count.unwrap_or_default();
}
};

self.total += metrics.count.unwrap_or_default();
}

fn collect(self) -> Self::MetricOutput {
if self.total <= 0 {
Some((None, None, None, None))
} else {
Some((
u64::try_from(self.challenged_count).ok(),
u64::try_from(self.won_count).ok(),
u64::try_from(self.lost_count).ok(),
u64::try_from(self.total).ok(),
))
}
}
}

impl DisputeMetricsAccumulator {
pub fn collect(self) -> DisputeMetricsBucketValue {
let (challenge_rate, won_rate, lost_rate, total_dispute) =
self.disputes_status_rate.collect().unwrap_or_default();
DisputeMetricsBucketValue {
disputes_challenged: challenge_rate,
disputes_won: won_rate,
disputes_lost: lost_rate,
total_amount_disputed: self.total_amount_disputed.collect(),
total_dispute_lost_amount: self.total_dispute_lost_amount.collect(),
total_dispute,
}
}
}
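These accumulators follow a fold-then-collect shape: `add_metrics_bucket` merges one `DisputeMetricRow` into the running state, and `collect` converts the signed totals into the `Option<u64>` values that `DisputeMetricsBucketValue` expects. A self-contained sketch of the summing case, with simplified types and no dependency on the real row struct:

```rust
// Simplified stand-in for SumAccumulator: fold nullable per-bucket totals,
// then convert the signed total to u64 on collect.
#[derive(Debug, Default)]
struct SumAccumulator {
    total: Option<i64>,
}

impl SumAccumulator {
    fn add(&mut self, value: Option<i64>) {
        self.total = match (self.total, value) {
            (None, None) => None,
            (None, v @ Some(_)) | (v @ Some(_), None) => v,
            (Some(a), Some(b)) => Some(a + b),
        };
    }

    fn collect(self) -> Option<u64> {
        self.total.and_then(|t| u64::try_from(t).ok())
    }
}

fn main() {
    let mut amount_disputed = SumAccumulator::default();
    for bucket_total in [Some(1_000), None, Some(2_500)] {
        amount_disputed.add(bucket_total);
    }
    assert_eq!(amount_disputed.collect(), Some(3_500));
}
```

The mixed `None`/`Some` arms keep the total `None` only when no bucket ever reported a value, which is how the real accumulator distinguishes "no data" from a zero sum.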
122 changes: 115 additions & 7 deletions crates/analytics/src/disputes/core.rs
@@ -1,15 +1,125 @@
use std::collections::HashMap;

use api_models::analytics::{
disputes::DisputeDimensions, DisputeFilterValue, DisputeFiltersResponse,
GetDisputeFilterRequest,
disputes::{
DisputeDimensions, DisputeMetrics, DisputeMetricsBucketIdentifier,
DisputeMetricsBucketResponse,
},
AnalyticsMetadata, DisputeFilterValue, DisputeFiltersResponse, GetDisputeFilterRequest,
GetDisputeMetricRequest, MetricsResponse,
};
use error_stack::{IntoReport, ResultExt};
use router_env::{
logger,
tracing::{self, Instrument},
};
use error_stack::ResultExt;

use super::filters::{get_dispute_filter_for_dimension, DisputeFilterRow};
use super::{
filters::{get_dispute_filter_for_dimension, DisputeFilterRow},
DisputeMetricsAccumulator,
};
use crate::{
disputes::DisputeMetricAccumulator,
errors::{AnalyticsError, AnalyticsResult},
AnalyticsProvider,
metrics, AnalyticsProvider,
};

pub async fn get_metrics(
pool: &AnalyticsProvider,
merchant_id: &String,
req: GetDisputeMetricRequest,
) -> AnalyticsResult<MetricsResponse<DisputeMetricsBucketResponse>> {
let mut metrics_accumulator: HashMap<
DisputeMetricsBucketIdentifier,
DisputeMetricsAccumulator,
> = HashMap::new();
let mut set = tokio::task::JoinSet::new();
for metric_type in req.metrics.iter().cloned() {
let req = req.clone();
let pool = pool.clone();
let task_span = tracing::debug_span!(
"analytics_dispute_query",
refund_metric = metric_type.as_ref()
);
// Currently JoinSet works with only static lifetime references even if the task pool does not outlive the given reference
// We can optimize away this clone once that is fixed
let merchant_id_scoped = merchant_id.to_owned();
set.spawn(
async move {
let data = pool
.get_dispute_metrics(
&metric_type,
&req.group_by_names.clone(),
&merchant_id_scoped,
&req.filters,
&req.time_series.map(|t| t.granularity),
&req.time_range,
)
.await
.change_context(AnalyticsError::UnknownError);
(metric_type, data)
}
.instrument(task_span),
);
}

while let Some((metric, data)) = set
.join_next()
.await
.transpose()
.into_report()
.change_context(AnalyticsError::UnknownError)?
{
let data = data?;
let attributes = &[
metrics::request::add_attributes("metric_type", metric.to_string()),
metrics::request::add_attributes("source", pool.to_string()),
];

let value = u64::try_from(data.len());
if let Ok(val) = value {
metrics::BUCKETS_FETCHED.record(&metrics::CONTEXT, val, attributes);
logger::debug!("Attributes: {:?}, Buckets fetched: {}", attributes, val);
}

for (id, value) in data {
logger::debug!(bucket_id=?id, bucket_value=?value, "Bucket row for metric {metric}");
let metrics_builder = metrics_accumulator.entry(id).or_default();
match metric {
DisputeMetrics::DisputeStatusMetric => metrics_builder
.disputes_status_rate
.add_metrics_bucket(&value),
DisputeMetrics::TotalAmountDisputed => metrics_builder
.total_amount_disputed
.add_metrics_bucket(&value),
DisputeMetrics::TotalDisputeLostAmount => metrics_builder
.total_dispute_lost_amount
.add_metrics_bucket(&value),
}
}

logger::debug!(
"Analytics Accumulated Results: metric: {}, results: {:#?}",
metric,
metrics_accumulator
);
}
let query_data: Vec<DisputeMetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| DisputeMetricsBucketResponse {
values: val.collect(),
dimensions: id,
})
.collect();

Ok(MetricsResponse {
query_data,
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
}],
})
}

pub async fn get_filters(
pool: &AnalyticsProvider,
req: GetDisputeFilterRequest,
@@ -76,9 +186,7 @@ pub async fn get_filters(
.change_context(AnalyticsError::UnknownError)?
.into_iter()
.filter_map(|fil: DisputeFilterRow| match dim {
DisputeDimensions::DisputeStatus => fil.dispute_status,
DisputeDimensions::DisputeStage => fil.dispute_stage,
DisputeDimensions::ConnectorStatus => fil.connector_status,
DisputeDimensions::Connector => fil.connector,
})
.collect::<Vec<String>>();
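`get_metrics` fans the requested metrics out as one query task each via `tokio::task::JoinSet`, cloning `merchant_id` because spawned futures must be `'static` (as the in-code comment notes), then joins the tasks and folds every returned row into the per-bucket `DisputeMetricsAccumulator`. A minimal, self-contained sketch of that fan-out/join shape, with a stand-in expression in place of `pool.get_dispute_metrics` (assumes tokio with the `rt-multi-thread` and `macros` features; metric names are illustrative):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let merchant_id = String::from("merchant_123");
    let metrics = ["dispute_status_metric", "total_amount_disputed"];

    let mut set = JoinSet::new();
    for metric in metrics {
        // Clone per task: JoinSet::spawn requires the future to own its data ('static).
        let merchant_id_scoped = merchant_id.clone();
        set.spawn(async move {
            // Stand-in for pool.get_dispute_metrics(...).await
            (metric, format!("rows for {merchant_id_scoped}/{metric}"))
        });
    }

    // Drain completed tasks in whatever order they finish.
    while let Some(joined) = set.join_next().await {
        let (metric, data) = joined.expect("task panicked");
        println!("{metric}: {data}");
    }
}
```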
