Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(payment_methods): Implement Process tracker workflow for Payment method Status update #4668

Merged
merged 16 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/diesel_models/src/process_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub enum ProcessTrackerRunner {
ApiKeyExpiryWorkflow,
OutgoingWebhookRetryWorkflow,
AttachPayoutAccountWorkflow,
PaymentMethodStatusUpdateWorkflow,
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions crates/router/src/bin/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
)
}
}
storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow => Ok(Box::new(
workflows::payment_method_status_update::PaymentMethodStatusUpdateWorkflow,
)),
}
};

Expand Down
72 changes: 71 additions & 1 deletion crates/router/src/core/payment_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,28 @@ use api_models::payments::CardToken;
#[cfg(feature = "payouts")]
pub use api_models::{enums::PayoutConnectors, payouts as payout_types};
use diesel_models::enums;
use error_stack::ResultExt;
use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
use router_env::{instrument, tracing};

use crate::{
core::{errors::RouterResult, payments::helpers, pm_auth as core_pm_auth},
consts,
core::{
errors::{self, RouterResult},
payments::helpers,
pm_auth as core_pm_auth,
},
db,
routes::SessionState,
types::{
api::{self, payments},
domain, storage,
},
};

const PAYMENT_METHOD_STATUS_UPDATE_TASK: &str = "PAYMENT_METHOD_STATUS_UPDATE";
const PAYMENT_METHOD_STATUS_TAG: &str = "PAYMENT_METHOD_STATUS";

#[instrument(skip_all)]
pub async fn retrieve_payment_method(
pm_data: &Option<payments::PaymentMethodData>,
Expand Down Expand Up @@ -95,6 +105,66 @@ pub async fn retrieve_payment_method(
}
}

fn generate_task_id_for_payment_method_status_update_workflow(
key_id: &str,
runner: &storage::ProcessTrackerRunner,
task: &str,
) -> String {
format!("{runner}_{task}_{key_id}")
}

pub async fn add_payment_method_status_update_task(
db: &dyn db::StorageInterface,
payment_method: &diesel_models::PaymentMethod,
prev_status: enums::PaymentMethodStatus,
curr_status: enums::PaymentMethodStatus,
merchant_id: &str,
) -> Result<(), errors::ProcessTrackerError> {
let created_at = payment_method.created_at;
let schedule_time =
created_at.saturating_add(time::Duration::seconds(consts::DEFAULT_SESSION_EXPIRY));

let tracking_data = storage::PaymentMethodStatusTrackingData {
payment_method_id: payment_method.payment_method_id.clone(),
prev_status,
curr_status,
merchant_id: merchant_id.to_string(),
};

let runner = storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow;
let task = PAYMENT_METHOD_STATUS_UPDATE_TASK;
let tag = [PAYMENT_METHOD_STATUS_TAG];

let process_tracker_id = generate_task_id_for_payment_method_status_update_workflow(
payment_method.payment_method_id.as_str(),
&runner,
task,
);
let process_tracker_entry = storage::ProcessTrackerNew::new(
process_tracker_id,
task,
runner,
tag,
tracking_data,
schedule_time,
)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct PAYMENT_METHOD_STATUS_UPDATE process tracker task")?;

db
.insert_process(process_tracker_entry)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable_lazy(|| {
format!(
"Failed while inserting PAYMENT_METHOD_STATUS_UPDATE reminder to process_tracker for payment_method_id: {}",
payment_method.payment_method_id.clone()
)
})?;

Ok(())
}

#[instrument(skip_all)]
pub async fn retrieve_payment_method_with_token(
state: &SessionState,
Expand Down
17 changes: 16 additions & 1 deletion crates/router/src/core/payment_methods/cards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::{
core::{
errors::{self, StorageErrorExt},
payment_methods::{
transformers as payment_methods,
add_payment_method_status_update_task, transformers as payment_methods,
utils::{get_merchant_pm_filter_graph, make_pm_graph, refresh_pm_filters_cache},
vault,
},
Expand Down Expand Up @@ -310,6 +310,21 @@ pub async fn get_client_secret_or_add_payment_method(
)
.await?;

if res.status == enums::PaymentMethodStatus::AwaitingData {
add_payment_method_status_update_task(
db,
&res,
enums::PaymentMethodStatus::AwaitingData,
enums::PaymentMethodStatus::Inactive,
merchant_id,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"Failed to add payment method status update task in process tracker",
)?;
}

Ok(services::api::ApplicationResponse::Json(
api::PaymentMethodResponse::foreign_from(res),
))
Expand Down
8 changes: 8 additions & 0 deletions crates/router/src/types/storage/payment_method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,11 @@ impl DerefMut for PaymentsMandateReference {
&mut self.0
}
}

#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
pub struct PaymentMethodStatusTrackingData {
pub payment_method_id: String,
pub prev_status: enums::PaymentMethodStatus,
pub curr_status: enums::PaymentMethodStatus,
pub merchant_id: String,
}
1 change: 1 addition & 0 deletions crates/router/src/workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod api_key_expiry;
#[cfg(feature = "payouts")]
pub mod attach_payout_account_workflow;
pub mod outgoing_webhook_retry;
pub mod payment_method_status_update;
pub mod payment_sync;
pub mod refund_router;
pub mod tokenized_data;
111 changes: 111 additions & 0 deletions crates/router/src/workflows/payment_method_status_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use common_utils::ext_traits::ValueExt;
// use router_env::logger;
use scheduler::{
consumer::types::process_data, utils as pt_utils, workflows::ProcessTrackerWorkflow,
};

use crate::{
errors,
logger::error,
routes::{metrics, SessionState},
types::storage::{self, PaymentMethodStatusTrackingData},
};

pub struct PaymentMethodStatusUpdateWorkflow;

#[async_trait::async_trait]
impl ProcessTrackerWorkflow<SessionState> for PaymentMethodStatusUpdateWorkflow {
async fn execute_workflow<'a>(
Sarthak1799 marked this conversation as resolved.
Show resolved Hide resolved
&'a self,
state: &'a SessionState,
process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
let db = &*state.store;
let tracking_data: PaymentMethodStatusTrackingData = process
.tracking_data
.clone()
.parse_value("PaymentMethodStatusTrackingData")?;

let retry_count = process.retry_count;
let pm_id = tracking_data.payment_method_id;
let prev_pm_status = tracking_data.prev_status;
let curr_pm_status = tracking_data.curr_status;
let merchant_id = tracking_data.merchant_id;

let key_store = state
.store
.get_merchant_key_store_by_merchant_id(
merchant_id.as_str(),
&state.store.get_master_key().to_vec().into(),
)
.await?;

let merchant_account = db
.find_merchant_account_by_merchant_id(&merchant_id, &key_store)
.await?;

let payment_method = db
.find_payment_method(&pm_id, merchant_account.storage_scheme)
.await?;

if payment_method.status != prev_pm_status {
return db
.as_scheduler()
.finish_process_with_business_status(
process,
"PROCESS_ALREADY_COMPLETED".to_string(),
)
.await
.map_err(Into::<errors::ProcessTrackerError>::into);
}

let pm_update = storage::PaymentMethodUpdate::StatusUpdate {
status: Some(curr_pm_status),
};

let res = db
.update_payment_method(payment_method, pm_update, merchant_account.storage_scheme)
.await
.map_err(errors::ProcessTrackerError::EStorageError);

if let Ok(_pm) = res {
db.as_scheduler()
.finish_process_with_business_status(process, "COMPLETED_BY_PT".to_string())
.await?;
} else {
let mapping = process_data::PaymentMethodsPTMapping::default();
let time_delta = if retry_count == 0 {
Some(mapping.default_mapping.start_after)
} else {
pt_utils::get_delay(retry_count + 1, &mapping.default_mapping.frequencies)
};

let schedule_time = pt_utils::get_time_from_delta(time_delta);

match schedule_time {
Some(s_time) => db
.as_scheduler()
.retry_process(process, s_time)
.await
.map_err(Into::<errors::ProcessTrackerError>::into)?,
None => db
.as_scheduler()
.finish_process_with_business_status(process, "RETRIES_EXCEEDED".to_string())
.await
.map_err(Into::<errors::ProcessTrackerError>::into)?,
};
};

Ok(())
}

async fn error_handler<'a>(
&'a self,
_state: &'a SessionState,
process: storage::ProcessTracker,
_error: errors::ProcessTrackerError,
) -> errors::CustomResult<(), errors::ProcessTrackerError> {
error!(%process.id, "Failed while executing workflow");
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/scheduler/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub fn get_outgoing_webhook_retry_schedule_time(
}

/// Get the delay based on the retry count
fn get_delay<'a>(
pub fn get_delay<'a>(
retry_count: i32,
frequencies: impl IntoIterator<Item = &'a (i32, i32)>,
) -> Option<i32> {
Expand Down
Loading