Skip to content

Commit

Permalink
[pay] traces for yagna freeze bug hunt
Browse files Browse the repository at this point in the history
  • Loading branch information
tworec committed Mar 17, 2021
1 parent 07c2e3c commit 69c9ac2
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 1 deletion.
Empty file added a
Empty file.
21 changes: 20 additions & 1 deletion core/payment/src/api/debit_notes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ async fn accept_debit_note(
let allocation_id = acceptance.allocation_id.clone();

let dao: DebitNoteDao = db.as_dao();
log::trace!("Querying DB for Debit Note [{}]", debit_note_id);
let debit_note: DebitNote = match dao.get(debit_note_id.clone(), node_id).await {
Ok(Some(debit_note)) => debit_note,
Ok(None) => return response::not_found(),
Expand All @@ -260,6 +261,11 @@ async fn accept_debit_note(
}

let activity_id = debit_note.activity_id.clone();
log::trace!(
"Querying DB for Activity [{}] for Debit Note [{}]",
activity_id,
debit_note_id
);
let activity = match db
.as_dao::<ActivityDao>()
.get(activity_id.clone(), node_id)
Expand All @@ -271,6 +277,11 @@ async fn accept_debit_note(
};
let amount_to_pay = &debit_note.total_amount_due - &activity.total_amount_scheduled.0;

log::trace!(
"Querying DB for Allocation [{}] for Debit Note [{}]",
allocation_id,
debit_note_id
);
let allocation = match db
.as_dao::<AllocationDao>()
.get(allocation_id.clone(), node_id)
Expand All @@ -293,19 +304,27 @@ async fn accept_debit_note(
let timeout = query.timeout.unwrap_or(params::DEFAULT_ACK_TIMEOUT);
with_timeout(timeout, async move {
let issuer_id = debit_note.issuer_id;
let accept_msg = AcceptDebitNote::new(debit_note_id.clone(), acceptance, issuer_id);
let accept_msg = ::new(debit_note_id.clone(), acceptance, issuer_id);
let schedule_msg =
SchedulePayment::from_debit_note(debit_note, allocation_id, amount_to_pay);
match async move {
log::trace!(
"Sending AcceptDebitNote [{}] to [{}]",
debit_note_id,
issuer_id
);
ya_net::from(node_id)
.to(issuer_id)
.service(PUBLIC_SERVICE)
.call(accept_msg)
.await??;
if let Some(msg) = schedule_msg {
log::trace!("Calling SchedulePayment [{}] locally", debit_note_id);
bus::service(LOCAL_SERVICE).send(msg).await??;
}
log::trace!("Accepting Debit Note [{}] in DB", debit_note_id);
dao.accept(debit_note_id, node_id).await?;
log::trace!("Debit Note accepted successfully for [{}]", debit_note_id);

counter!("payment.debit_notes.requestor.accepted", 1);
Ok(())
Expand Down
16 changes: 16 additions & 0 deletions core/payment/src/api/invoices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ async fn accept_invoice(
let allocation_id = acceptance.allocation_id.clone();

let dao: InvoiceDao = db.as_dao();

log::trace!("Querying DB for Invoice [{}]", invoice_id);
let invoice = match dao.get(invoice_id.clone(), node_id).await {
Ok(Some(invoice)) => invoice,
Ok(None) => return response::not_found(),
Expand All @@ -303,6 +305,11 @@ async fn accept_invoice(
}

let agreement_id = invoice.agreement_id.clone();
log::trace!(
"Querying DB for Agreement [{}] for Invoice [{}]",
agreement_id,
invoice_id
);
let agreement = match db
.as_dao::<AgreementDao>()
.get(agreement_id.clone(), node_id)
Expand All @@ -316,6 +323,11 @@ async fn accept_invoice(
};
let amount_to_pay = &invoice.amount - &agreement.total_amount_scheduled.0;

log::trace!(
"Querying DB for Allocation [{}] for Invoice [{}]",
allocation_id,
invoice_id
);
let allocation = match db
.as_dao::<AllocationDao>()
.get(allocation_id.clone(), node_id)
Expand All @@ -341,15 +353,19 @@ async fn accept_invoice(
let accept_msg = AcceptInvoice::new(invoice_id.clone(), acceptance, issuer_id);
let schedule_msg = SchedulePayment::from_invoice(invoice, allocation_id, amount_to_pay);
match async move {
log::trace!("Sending AcceptInvoice [{}] to [{}]", invoice_id, issuer_id);
ya_net::from(node_id)
.to(issuer_id)
.service(PUBLIC_SERVICE)
.call(accept_msg)
.await??;
if let Some(msg) = schedule_msg {
log::trace!("Calling SchedulePayment [{}] locally", invoice_id);
bus::service(LOCAL_SERVICE).send(msg).await??;
}
log::trace!("Accepting Invoice [{}] in DB", invoice_id);
dao.accept(invoice_id, node_id).await?;
log::trace!("Invoice accepted successfully for [{}]", invoice_id);

counter!("payment.invoices.requestor.accepted", 1);
Ok(())
Expand Down
9 changes: 9 additions & 0 deletions core/payment/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,17 @@ impl PaymentProcessor {

pub async fn schedule_payment(&self, msg: SchedulePayment) -> Result<(), SchedulePaymentError> {
let amount = msg.amount.clone();
log::trace!("Getting driver for [{:?}]", msg.title);
let driver = self.registry.lock().await.driver(
&msg.payment_platform,
&msg.payer_addr,
AccountMode::SEND,
)?;
log::trace!(
"Scheduling payment [{:?}] using driver [{}]",
msg.title,
driver
);
let order_id = driver_endpoint(&driver)
.send(driver::SchedulePayment::new(
amount,
Expand All @@ -419,11 +425,14 @@ impl PaymentProcessor {
))
.await??;

log::trace!("Creating payment order in DB for [{:?}]", msg.title);
self.db_executor
.as_dao::<OrderDao>()
.create(msg, order_id, driver)
.await?;

log::trace!("Payment scheduled successfully for [{:?}]", msg.title);

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions core/payment/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ pub async fn with_timeout<Work: Future<Output = HttpResponse>>(
) -> HttpResponse {
let timeout_secs = timeout_secs.into();
if timeout_secs > 0.0 {
log::trace!("Starting timeout for: {}s", timeout_secs);
match tokio::time::timeout(Duration::from_secs_f64(timeout_secs), work).await {
Ok(v) => v,
Err(_) => return HttpResponse::GatewayTimeout().finish(),
}
} else {
log::trace!("Executing /wo timeout.");
work.await
}
}
Expand Down

0 comments on commit 69c9ac2

Please sign in to comment.