Skip to content

Commit

Permalink
feat. mandate kv support
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay S authored and Akshay S committed Apr 10, 2024
1 parent 44e7456 commit dffa098
Show file tree
Hide file tree
Showing 17 changed files with 566 additions and 72 deletions.
23 changes: 22 additions & 1 deletion crates/diesel_models/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
payouts::{Payouts, PayoutsNew, PayoutsUpdate},
refund::{Refund, RefundNew, RefundUpdate},
reverse_lookup::{ReverseLookup, ReverseLookupNew},
PaymentIntent, PaymentMethod, PaymentMethodNew, PaymentMethodUpdateInternal, PgPooledConn,
PaymentIntent, PaymentMethod, PaymentMethodNew, PaymentMethodUpdateInternal, PgPooledConn,Mandate, MandateNew, MandateUpdateInternal,
};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -38,6 +38,7 @@ impl DBOperation {
Insertable::PayoutAttempt(_) => "payout_attempt",
Insertable::ReverseLookUp(_) => "reverse_lookup",
Insertable::PaymentMethod(_) => "payment_method",
Insertable::Mandate(_) => "mandate",
},
Self::Update { updatable } => match updatable {
Updateable::PaymentIntentUpdate(_) => "payment_intent",
Expand All @@ -47,6 +48,7 @@ impl DBOperation {
Updateable::PayoutsUpdate(_) => "payouts",
Updateable::PayoutAttemptUpdate(_) => "payout_attempt",
Updateable::PaymentMethodUpdate(_) => "payment_method",
Updateable::MandateUpdate(_) => " mandate",
},
}
}
Expand All @@ -62,6 +64,7 @@ pub enum DBResult {
Payouts(Box<Payouts>),
PayoutAttempt(Box<PayoutAttempt>),
PaymentMethod(Box<PaymentMethod>),
Mandate(Box<Mandate>),
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -92,6 +95,7 @@ impl DBOperation {
Insertable::PaymentMethod(rev) => {
DBResult::PaymentMethod(Box::new(rev.insert(conn).await?))
}
Insertable::Mandate(m) => DBResult::Mandate(Box::new(m.insert(conn).await?)),
},
Self::Update { updatable } => match updatable {
Updateable::PaymentIntentUpdate(a) => {
Expand All @@ -117,6 +121,15 @@ impl DBOperation {
.update_with_payment_method_id(conn, v.update_data)
.await?,
)),
Updateable::MandateUpdate(m) => DBResult::Mandate(Box::new(
Mandate::update_by_merchant_id_mandate_id(
conn,
&m.orig.merchant_id,
&m.orig.mandate_id,
m.update_data,
)
.await?,
)),
},
})
}
Expand Down Expand Up @@ -154,6 +167,7 @@ pub enum Insertable {
Payouts(PayoutsNew),
PayoutAttempt(PayoutAttemptNew),
PaymentMethod(PaymentMethodNew),
Mandate(MandateNew),
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -166,6 +180,7 @@ pub enum Updateable {
PayoutsUpdate(PayoutsUpdateMems),
PayoutAttemptUpdate(PayoutAttemptUpdateMems),
PaymentMethodUpdate(PaymentMethodUpdateMems),
MandateUpdate(MandateUpdateMems),
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -209,3 +224,9 @@ pub struct PaymentMethodUpdateMems {
pub orig: PaymentMethod,
pub update_data: PaymentMethodUpdateInternal,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MandateUpdateMems {
pub orig: Mandate,
pub update_data: MandateUpdateInternal,
}
80 changes: 77 additions & 3 deletions crates/diesel_models/src/mandate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use time::PrimitiveDateTime;

use crate::{enums as storage_enums, schema::mandate};

#[derive(Clone, Debug, Identifiable, Queryable)]
#[derive(Clone, Debug, Identifiable, Queryable, serde::Serialize, serde::Deserialize)]
#[diesel(table_name = mandate)]
pub struct Mandate {
pub id: i32,
Expand Down Expand Up @@ -35,7 +35,14 @@ pub struct Mandate {
}

#[derive(
router_derive::Setter, Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay,
router_derive::Setter,
Clone,
Debug,
Default,
Insertable,
router_derive::DebugAsDisplay,
serde::Serialize,
serde::Deserialize,
)]
#[diesel(table_name = mandate)]
pub struct MandateNew {
Expand Down Expand Up @@ -89,7 +96,15 @@ pub struct SingleUseMandate {
pub currency: storage_enums::Currency,
}

#[derive(Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay)]
#[derive(
Clone,
Debug,
Default,
AsChangeset,
router_derive::DebugAsDisplay,
serde::Serialize,
serde::Deserialize,
)]
#[diesel(table_name = mandate)]
pub struct MandateUpdateInternal {
mandate_status: Option<storage_enums::MandateStatus>,
Expand Down Expand Up @@ -140,3 +155,62 @@ impl From<MandateUpdate> for MandateUpdateInternal {
}
}
}

impl MandateUpdateInternal {
pub fn apply_changeset(self, source: Mandate) -> Mandate {
let Self {
mandate_status,
amount_captured,
connector_mandate_ids,
connector_mandate_id,
payment_method_id,
original_payment_id,
} = self;

Mandate {
mandate_status: mandate_status.unwrap_or(source.mandate_status),
amount_captured: amount_captured.map_or(source.amount_captured, Some),
connector_mandate_ids: connector_mandate_ids
.map_or(source.connector_mandate_ids, Some),
connector_mandate_id: connector_mandate_id
.map_or(source.connector_mandate_id, Some),
payment_method_id: payment_method_id.unwrap_or(source.payment_method_id),
original_payment_id: original_payment_id
.map_or(source.original_payment_id, Some),
..source
}
}
}

impl From<&MandateNew> for Mandate {
fn from(mandate_new: &MandateNew) -> Self {
Self {
id: 0i32,
mandate_id: mandate_new.mandate_id.clone(),
customer_id: mandate_new.customer_id.clone(),
merchant_id: mandate_new.merchant_id.clone(),
payment_method_id: mandate_new.payment_method_id.clone(),
mandate_status: mandate_new.mandate_status,
mandate_type: mandate_new.mandate_type,
customer_accepted_at: mandate_new.customer_accepted_at,
customer_ip_address: mandate_new.customer_ip_address.clone(),
customer_user_agent: mandate_new.customer_user_agent.clone(),
network_transaction_id: mandate_new.network_transaction_id.clone(),
previous_attempt_id: mandate_new.previous_attempt_id.clone(),
created_at: mandate_new
.created_at
.unwrap_or_else(common_utils::date_time::now),
mandate_amount: mandate_new.mandate_amount,
mandate_currency: mandate_new.mandate_currency,
amount_captured: mandate_new.amount_captured,
connector: mandate_new.connector.clone(),
connector_mandate_id: mandate_new.connector_mandate_id.clone(),
start_date: mandate_new.start_date,
end_date: mandate_new.end_date,
metadata: mandate_new.metadata.clone(),
connector_mandate_ids: mandate_new.connector_mandate_ids.clone(),
original_payment_id: mandate_new.original_payment_id.clone(),
merchant_connector_id: mandate_new.merchant_connector_id.clone(),
}
}
}
4 changes: 2 additions & 2 deletions crates/diesel_models/src/query/mandate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ impl Mandate {
conn: &PgPooledConn,
merchant_id: &str,
mandate_id: &str,
mandate: MandateUpdate,
mandate: MandateUpdateInternal,
) -> StorageResult<Self> {
generics::generic_update_with_results::<<Self as HasTable>::Table, _, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
.and(dsl::mandate_id.eq(mandate_id.to_owned())),
MandateUpdateInternal::from(mandate),
mandate,
)
.await?
.first()
Expand Down
49 changes: 37 additions & 12 deletions crates/router/src/core/mandate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use crate::{
mandates::{self, MandateResponseExt},
ConnectorData, GetToken,
},
domain, storage,
domain,
storage::{self, enums::MerchantStorageScheme},
transformers::ForeignFrom,
},
utils::OptionExt,
Expand All @@ -39,7 +40,11 @@ pub async fn get_mandate(
let mandate = state
.store
.as_ref()
.find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, &req.mandate_id)
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
&req.mandate_id,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
Ok(services::ApplicationResponse::Json(
Expand All @@ -62,7 +67,11 @@ pub async fn revoke_mandate(
) -> RouterResponse<mandates::MandateRevokedResponse> {
let db = state.store.as_ref();
let mandate = db
.find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, &req.mandate_id)
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
&req.mandate_id,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
match mandate.mandate_status {
Expand Down Expand Up @@ -123,6 +132,8 @@ pub async fn revoke_mandate(
storage::MandateUpdate::StatusUpdate {
mandate_status: storage::enums::MandateStatus::Revoked,
},
mandate,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
Expand Down Expand Up @@ -162,6 +173,7 @@ pub async fn update_connector_mandate_id(
mandate_ids_opt: Option<String>,
payment_method_id: Option<String>,
resp: Result<types::PaymentsResponseData, types::ErrorResponse>,
storage_scheme: MerchantStorageScheme,
) -> RouterResponse<mandates::MandateResponse> {
let mandate_details = Option::foreign_from(resp);
let connector_mandate_id = mandate_details
Expand All @@ -176,7 +188,7 @@ pub async fn update_connector_mandate_id(
//Ignore updation if the payment_attempt mandate_id or connector_mandate_id is not present
if let Some((mandate_id, connector_id)) = mandate_ids_opt.zip(connector_mandate_id) {
let mandate = db
.find_mandate_by_merchant_id_mandate_id(&merchant_account, &mandate_id)
.find_mandate_by_merchant_id_mandate_id(&merchant_account, &mandate_id, storage_scheme)
.await
.change_context(errors::ApiErrorResponse::MandateNotFound)?;

Expand All @@ -199,6 +211,8 @@ pub async fn update_connector_mandate_id(
&merchant_account,
&mandate_id,
update_mandate_details,
mandate,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::MandateUpdateFailed)?;
Expand Down Expand Up @@ -262,6 +276,7 @@ pub async fn update_mandate_procedure<F, FData>(
mandate: Mandate,
merchant_id: &str,
pm_id: Option<String>,
storage_scheme: MerchantStorageScheme,
) -> errors::RouterResult<types::RouterData<F, FData, types::PaymentsResponseData>>
where
FData: MandateBehaviour,
Expand All @@ -276,13 +291,14 @@ where
};

let old_record = payments::UpdateHistory {
connector_mandate_id: mandate.connector_mandate_id,
payment_method_id: mandate.payment_method_id,
original_payment_id: mandate.original_payment_id,
connector_mandate_id: mandate.connector_mandate_id.clone(),
payment_method_id: mandate.payment_method_id.clone(),
original_payment_id: mandate.original_payment_id.clone(),
};

let mandate_ref = mandate
.connector_mandate_ids
.clone()
.parse_value::<payments::ConnectorMandateReferenceId>("Connector Reference Id")
.change_context(errors::ApiErrorResponse::MandateDeserializationFailed)?;

Expand All @@ -302,11 +318,12 @@ where
.change_context(errors::ApiErrorResponse::InternalServerError)
.map(masking::Secret::new)?;

let mandate_id = mandate.mandate_id.clone();
let _update_mandate_details = state
.store
.update_mandate_by_merchant_id_mandate_id(
merchant_id,
&mandate.mandate_id,
&mandate_id,
diesel_models::MandateUpdate::ConnectorMandateIdUpdate {
connector_mandate_id: mandate_details
.as_ref()
Expand All @@ -316,6 +333,8 @@ where
.unwrap_or("Error retrieving the payment_method_id".to_string()),
original_payment_id: Some(resp.payment_id.clone()),
},
mandate,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::MandateUpdateFailed)?;
Expand All @@ -327,6 +346,7 @@ pub async fn mandate_procedure<F, FData>(
maybe_customer: &Option<domain::Customer>,
pm_id: Option<String>,
merchant_connector_id: Option<String>,
storage_scheme: MerchantStorageScheme,
) -> errors::RouterResult<types::RouterData<F, FData, types::PaymentsResponseData>>
where
FData: MandateBehaviour,
Expand All @@ -336,15 +356,16 @@ where
Ok(_) => match resp.request.get_mandate_id() {
Some(mandate_id) => {
if let Some(ref mandate_id) = mandate_id.mandate_id {
let mandate = state
let orig_mandate = state
.store
.find_mandate_by_merchant_id_mandate_id(
resp.merchant_id.as_ref(),
mandate_id,
storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
let mandate = match mandate.mandate_type {
let mandate = match orig_mandate.mandate_type {
storage_enums::MandateType::SingleUse => state
.store
.update_mandate_by_merchant_id_mandate_id(
Expand All @@ -353,6 +374,8 @@ where
storage::MandateUpdate::StatusUpdate {
mandate_status: storage_enums::MandateStatus::Revoked,
},
orig_mandate,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::MandateUpdateFailed),
Expand All @@ -363,10 +386,12 @@ where
mandate_id,
storage::MandateUpdate::CaptureAmountUpdate {
amount_captured: Some(
mandate.amount_captured.unwrap_or(0)
orig_mandate.amount_captured.unwrap_or(0)
+ resp.request.get_amount(),
),
},
orig_mandate,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::MandateUpdateFailed),
Expand Down Expand Up @@ -451,7 +476,7 @@ where
}));
state
.store
.insert_mandate(new_mandate_data)
.insert_mandate(new_mandate_data, storage_scheme)
.await
.to_duplicate_response(errors::ApiErrorResponse::DuplicateMandate)?;
metrics::MANDATE_COUNT.add(
Expand Down
1 change: 1 addition & 0 deletions crates/router/src/core/payments/flows/authorize_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl Feature<api::Authorize, types::PaymentsAuthorizeData> for types::PaymentsAu
maybe_customer,
payment_method_id,
connector.merchant_connector_id.clone(),
merchant_account.storage_scheme,
)
.await?)
} else {
Expand Down
Loading

0 comments on commit dffa098

Please sign in to comment.