Skip to content

Commit

Permalink
Merge pull request #3148 from golemfactory/release/v0.15
Browse files Browse the repository at this point in the history
Release/v0.15
  • Loading branch information
scx1332 authored Apr 3, 2024
2 parents 2949cf3 + d2d88ad commit 2d37874
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 328 deletions.
36 changes: 32 additions & 4 deletions core/model/src/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ pub mod local {
InvalidDefaultToken(String, String),
#[error("Invalid default network specified: {0}")]
InvalidDefaultNetwork(String),
#[error("Internal timeout")]
InternalTimeout,
}

impl RpcMessage for RegisterDriver {
Expand All @@ -168,13 +170,19 @@ pub mod local {
type Error = RegisterDriverError;
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum UnregisterDriverError {
#[error("Internal timeout")]
InternalTimeout,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnregisterDriver(pub String);

impl RpcMessage for UnregisterDriver {
const ID: &'static str = "UnregisterDriver";
type Item = ();
type Error = NoError;
type Error = UnregisterDriverError;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -198,6 +206,8 @@ pub mod local {
UnsupportedToken(String, String, String),
#[error("Error while registering account: {0}")]
Other(String),
#[error("Internal timeout")]
InternalTimeout,
}

impl RpcMessage for RegisterAccount {
Expand All @@ -206,6 +216,12 @@ pub mod local {
type Error = RegisterAccountError;
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum UnregisterAccountError {
#[error("Internal timeout")]
InternalTimeout,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnregisterAccount {
pub platform: String,
Expand All @@ -215,7 +231,7 @@ pub mod local {
impl RpcMessage for UnregisterAccount {
const ID: &'static str = "UnregisterAccount";
type Item = ();
type Error = NoError;
type Error = UnregisterAccountError;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -348,10 +364,16 @@ pub mod local {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GetAccounts {}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum GetAccountsError {
#[error("Internal timeout")]
InternalTimeout,
}

impl RpcMessage for GetAccounts {
const ID: &'static str = "GetAccounts";
type Item = Vec<Account>;
type Error = GenericError;
type Error = GetAccountsError;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -431,10 +453,16 @@ pub mod local {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GetDrivers {}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum GetDriversError {
#[error("Internal timeout")]
InternalTimeout,
}

impl RpcMessage for GetDrivers {
const ID: &'static str = "GetDrivers";
type Item = HashMap<String, DriverDetails>;
type Error = NoError;
type Error = GetDriversError;
}

// ********************* STATUS ********************************
Expand Down
4 changes: 2 additions & 2 deletions core/payment-driver/erc20/config-payments.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ external-source-check-interval = 300

[[chain.holesky.rpc-endpoints]]
names = """
holesky.geth.golem.network
holesky.rpc-node.dev.golem.network
"""

endpoints = """
https://holesky.geth.golem.network,
https://holesky.rpc-node.dev.golem.network,
"""
priority = 0
max-timeout-ms = 5000
Expand Down
2 changes: 1 addition & 1 deletion core/payment/examples/payment_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async fn main() -> anyhow::Result<()> {

log::debug!("bind_gsb_router()");

let processor = PaymentProcessor::new(db.clone());
let processor = Arc::new(PaymentProcessor::new(db.clone()));
ya_payment::service::bind_service(&db, processor, BindOptions::default().run_sync_job(false));
log::debug!("bind_service()");

Expand Down
13 changes: 11 additions & 2 deletions core/payment/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ pub enum Error {
ExtService(#[from] ExternalServiceError),
#[error("RPC error: {0}")]
Rpc(#[from] RpcMessageError),
#[error("Timeout")]
Timeout(#[from] tokio::time::error::Elapsed),
}

impl From<ya_core_model::activity::RpcMessageError> for Error {
Expand Down Expand Up @@ -113,6 +111,7 @@ pub mod processor {
use crate::models::order::ReadObj as Order;
use bigdecimal::BigDecimal;
use std::fmt::Display;
use tokio::time::error::Elapsed;
use ya_core_model::driver::AccountMode;
use ya_core_model::payment::local::{
GenericError, ValidateAllocationError as GsbValidateAllocationError,
Expand Down Expand Up @@ -154,6 +153,8 @@ pub mod processor {
Database(#[from] DbError),
#[error("Payment service is shutting down")]
Shutdown,
#[error("Internal timeout")]
InternalTimeout(#[from] Elapsed),
}

impl From<SchedulePaymentError> for GenericError {
Expand Down Expand Up @@ -219,6 +220,8 @@ pub mod processor {
Database(#[from] DbError),
#[error("Singning error: {0}")]
Sign(#[from] ya_core_model::driver::GenericError),
#[error("Internal timeout")]
InternalTimeout(#[from] Elapsed),
}

impl NotifyPaymentError {
Expand Down Expand Up @@ -252,6 +255,8 @@ pub mod processor {
Database(#[from] DbError),
#[error("{0}")]
Validation(String),
#[error("Internal timeout")]
InternalTimeout(#[from] Elapsed),
}

impl VerifyPaymentError {
Expand Down Expand Up @@ -369,6 +374,8 @@ pub mod processor {
ServiceBus(#[from] ya_service_bus::error::Error),
#[error("Error while sending payment: {0}")]
Driver(#[from] ya_core_model::driver::GenericError),
#[error("Internal timeout")]
InternalTimeout(#[from] Elapsed),
}

#[derive(thiserror::Error, Debug)]
Expand All @@ -383,6 +390,8 @@ pub mod processor {
Database(#[from] DbError),
#[error("Payment service is shutting down")]
Shutdown,
#[error("Internal timeout")]
InternalTimeout(#[from] Elapsed),
}

impl From<ValidateAllocationError> for GsbValidateAllocationError {
Expand Down
5 changes: 3 additions & 2 deletions core/payment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::processor::PaymentProcessor;
use futures::FutureExt;
use service::BindOptions;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use ya_core_model::payment::local as pay_local;
use ya_persistence::executor::DbExecutor;
use ya_service_api_interfaces::*;
Expand All @@ -22,6 +22,7 @@ pub mod payment_sync;
pub mod processor;
pub mod schema;
pub mod service;
pub mod timeout_lock;
pub mod utils;
mod wallet;

Expand Down Expand Up @@ -52,7 +53,7 @@ impl PaymentService {
let db = context.component();
db.apply_migration(migrations::run_with_output)?;

let mut processor = PaymentProcessor::new(db.clone());
let processor = Arc::new(PaymentProcessor::new(db.clone()));
self::service::bind_service(&db, processor.clone(), BindOptions::default());

tokio::task::spawn(async move {
Expand Down
18 changes: 14 additions & 4 deletions core/payment/src/payment_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ use ya_core_model::{
};
use ya_net::RemoteEndpoint;
use ya_persistence::executor::DbExecutor;
use ya_service_bus::{typed, RpcEndpoint};
use ya_service_bus::{timeout::IntoTimeoutFuture, typed, RpcEndpoint};

use crate::dao::{DebitNoteDao, InvoiceDao, InvoiceEventDao, PaymentDao, SyncNotifsDao};

const SYNC_NOTIF_DELAY_0: Duration = Duration::from_secs(30);
const SYNC_NOTIF_RATIO: u32 = 6;
const SYNC_NOTIF_MAX_RETRIES: u32 = 7;

const REMOTE_CALL_TIMEOUT: Duration = Duration::from_secs(30);

async fn payment_sync(db: &DbExecutor, peer_id: NodeId) -> anyhow::Result<PaymentSync> {
let payment_dao: PaymentDao = db.as_dao();
let invoice_dao: InvoiceDao = db.as_dao();
Expand Down Expand Up @@ -180,9 +182,10 @@ async fn send_sync_notifs(db: &DbExecutor) -> anyhow::Result<Option<Duration>> {
.to(peer)
.service(ya_core_model::payment::public::BUS_ID)
.call(msg.clone())
.timeout(Some(REMOTE_CALL_TIMEOUT))
.await;

if matches!(&result, Ok(Ok(_))) {
if matches!(&result, Ok(Ok(Ok(_)))) {
mark_all_sent(db, msg).await?;
dao.drop(peer).await?;
} else {
Expand Down Expand Up @@ -247,10 +250,17 @@ async fn send_sync_requests_impl(db: DbExecutor) -> anyhow::Result<()> {
.to(peer_id)
.service(payment::public::BUS_ID)
.call(PaymentSyncRequest)
.timeout(Some(REMOTE_CALL_TIMEOUT))
.await;

if let Err(e) = result {
log::debug!("Couldn't deliver PaymentSyncRequest to [{peer_id}]: {e}");
match result {
Err(_) => {
log::debug!("Couldn't deliver PaymentSyncRequest to [{peer_id}]: timeout");
}
Ok(Err(e)) => {
log::debug!("Couldn't deliver PaymentSyncRequest to [{peer_id}]: {e}");
}
Ok(Ok(_)) => {}
}
}
}
Expand Down
Loading

0 comments on commit 2d37874

Please sign in to comment.