diff --git a/core/market/src/db/dao/agreement.rs b/core/market/src/db/dao/agreement.rs index 1d619a52f1..faa2c03f76 100644 --- a/core/market/src/db/dao/agreement.rs +++ b/core/market/src/db/dao/agreement.rs @@ -162,38 +162,48 @@ impl<'c> AgreementDao<'c> { &self, id: &AgreementId, session: &AppSessionId, - ) -> Result<(), AgreementDaoError> { + signature: &String, + ) -> Result { let id = id.clone(); let session = session.clone(); + let signature = signature.clone(); do_with_transaction(self.pool, move |conn| { let mut agreement: Agreement = market_agreement.filter(agreement::id.eq(&id)).first(conn)?; update_state(conn, &mut agreement, AgreementState::Pending)?; + update_proposed_signature(conn, &mut agreement, signature)?; if let Some(session) = session { update_session(conn, &mut agreement, session)?; } - Ok(()) + Ok(agreement) }) .await } /// Function won't change appSessionId, if session parameter is None. - pub async fn approve( + /// Signature will be placed in `approved_signature` field. + pub async fn approving( &self, id: &AgreementId, session: &AppSessionId, - ) -> Result<(), AgreementDaoError> { + signature: &String, + timestamp: &NaiveDateTime, + ) -> Result { let id = id.clone(); let session = session.clone(); + let signature = signature.clone(); + let timestamp = timestamp.clone(); do_with_transaction(self.pool, move |conn| { let mut agreement: Agreement = market_agreement.filter(agreement::id.eq(&id)).first(conn)?; - update_state(conn, &mut agreement, AgreementState::Approved)?; + update_state(conn, &mut agreement, AgreementState::Approving)?; + update_approved_signature(conn, &mut agreement, signature)?; + update_approve_timestamp(conn, &mut agreement, timestamp)?; // It's important, that if None AppSessionId comes, we shouldn't update Agreement // appSessionId field to None. This function can be called in different context, for example @@ -201,10 +211,31 @@ impl<'c> AgreementDao<'c> { if let Some(session) = session { update_session(conn, &mut agreement, session)?; } + Ok(agreement) + }) + .await + } + + /// Signature will be placed in `committed_signature` field. + pub async fn approve( + &self, + id: &AgreementId, + signature: &String, + ) -> Result { + let id = id.clone(); + let signature = signature.clone(); + + do_with_transaction(self.pool, move |conn| { + let mut agreement: Agreement = + market_agreement.filter(agreement::id.eq(&id)).first(conn)?; + + update_state(conn, &mut agreement, AgreementState::Approved)?; + update_committed_signature(conn, &mut agreement, signature)?; + // Always Provider approves. create_event(conn, &agreement, None, Owner::Provider)?; - Ok(()) + Ok(agreement) }) .await } @@ -229,6 +260,29 @@ impl<'c> AgreementDao<'c> { .await } + pub async fn revert_approving(&self, id: &AgreementId) -> Result { + let id = id.clone(); + + do_with_transaction(self.pool, move |conn| { + let agreement: Agreement = + market_agreement.filter(agreement::id.eq(&id)).first(conn)?; + + if agreement.state != AgreementState::Approving { + return Err(AgreementDaoError::InvalidTransition { + from: agreement.state, + to: AgreementState::Pending, + }); + } + + let num_updated = diesel::update(market_agreement.find(&id)) + .set(agreement::state.eq(&AgreementState::Pending)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + Ok(num_updated > 0) + }) + .await + } + pub async fn clean(&self) -> DbResult<()> { // FIXME use grace time from config file when #460 is merged log::trace!("Clean market agreements: start"); @@ -296,6 +350,65 @@ fn update_state( Ok(num_updated > 0) } +fn update_proposed_signature( + conn: &ConnType, + agreement: &mut Agreement, + signature: String, +) -> Result { + let signature = Some(signature); + let num_updated = diesel::update(market_agreement.find(&agreement.id)) + .set(agreement::proposed_signature.eq(&signature)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + + agreement.proposed_signature = signature; + Ok(num_updated > 0) +} + +fn update_approved_signature( + conn: &ConnType, + agreement: &mut Agreement, + signature: String, +) -> Result { + let signature = Some(signature); + let num_updated = diesel::update(market_agreement.find(&agreement.id)) + .set(agreement::approved_signature.eq(&signature)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + + agreement.approved_signature = signature; + Ok(num_updated > 0) +} + +fn update_committed_signature( + conn: &ConnType, + agreement: &mut Agreement, + signature: String, +) -> Result { + let signature = Some(signature); + let num_updated = diesel::update(market_agreement.find(&agreement.id)) + .set(agreement::committed_signature.eq(&signature)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + + agreement.committed_signature = signature; + Ok(num_updated > 0) +} + +fn update_approve_timestamp( + conn: &ConnType, + agreement: &mut Agreement, + timestamp: NaiveDateTime, +) -> Result { + let num_updated = diesel::update(market_agreement.find(&agreement.id)) + .set(agreement::approved_ts.eq(×tamp)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + + agreement.approved_ts = Some(timestamp); + Ok(num_updated > 0) +} + fn update_session( conn: &ConnType, agreement: &mut Agreement, diff --git a/core/market/src/db/model/agreement_events.rs b/core/market/src/db/model/agreement_events.rs index 5326d525d3..f88641595f 100644 --- a/core/market/src/db/model/agreement_events.rs +++ b/core/market/src/db/model/agreement_events.rs @@ -64,7 +64,10 @@ impl NewAgreementEvent { Ok(Self { agreement_id: agreement.id.clone(), event_type: match agreement.state { - AgreementState::Pending | AgreementState::Proposal | AgreementState::Expired => { + AgreementState::Pending + | AgreementState::Proposal + | AgreementState::Expired + | AgreementState::Approving => { let msg = format!("Wrong [{}] state {}", agreement.id, agreement.state); log::error!("{}", msg); return Err(EventFromAgreementError(msg)); diff --git a/core/market/src/negotiation/common.rs b/core/market/src/negotiation/common.rs index 2a8382aba5..bbc0834da2 100644 --- a/core/market/src/negotiation/common.rs +++ b/core/market/src/negotiation/common.rs @@ -361,7 +361,13 @@ impl CommonBroker { validate_transition(&agreement, AgreementState::Terminated)?; - protocol_common::propagate_terminate_agreement(&agreement, reason.clone()).await?; + protocol_common::propagate_terminate_agreement( + &agreement, + reason.clone(), + "NotSigned".to_string(), + Utc::now().naive_utc(), + ) + .await?; let reason_string = CommonBroker::reason2string(&reason); dao.terminate(&agreement.id, reason_string, agreement.id.owner()) diff --git a/core/market/src/negotiation/error.rs b/core/market/src/negotiation/error.rs index 1d4750ac37..d3d28d7483 100644 --- a/core/market/src/negotiation/error.rs +++ b/core/market/src/negotiation/error.rs @@ -15,8 +15,9 @@ use crate::db::{ }; use crate::matcher::error::{DemandError, QueryOfferError}; use crate::protocol::negotiation::error::{ - ApproveAgreementError, CounterProposalError as ProtocolProposalError, GsbAgreementError, - NegotiationApiInitError, ProposeAgreementError, RejectProposalError, TerminateAgreementError, + ApproveAgreementError, CommitAgreementError, CounterProposalError as ProtocolProposalError, + GsbAgreementError, NegotiationApiInitError, ProposeAgreementError, RejectProposalError, + TerminateAgreementError, }; #[derive(Error, Debug)] @@ -46,6 +47,8 @@ pub enum MatchValidationError { pub enum AgreementError { #[error("Agreement [{0}] not found.")] NotFound(String), + #[error("Agreement [{0}] expired.")] + Expired(AgreementId), #[error("Can't create Agreement for Proposal {0}. Proposal {1} not found.")] ProposalNotFound(ProposalId, ProposalId), #[error("Can't create second Agreement [{0}] for Proposal [{1}].")] @@ -74,6 +77,8 @@ pub enum AgreementError { ProtocolApprove(#[from] ApproveAgreementError), #[error("Protocol error while terminating: {0}")] ProtocolTerminate(#[from] TerminateAgreementError), + #[error("Protocol error while committing: {0}")] + ProtocolCommit(#[from] CommitAgreementError), #[error("Internal error: {0}")] Internal(String), } diff --git a/core/market/src/negotiation/notifier.rs b/core/market/src/negotiation/notifier.rs index 0fa31fe241..ca6fbe86a1 100644 --- a/core/market/src/negotiation/notifier.rs +++ b/core/market/src/negotiation/notifier.rs @@ -11,11 +11,11 @@ where Type: Debug + PartialEq + Clone + EnableDisplay + 'static, for<'a> DisplayEnabler<'a, Type>: std::fmt::Display, { - #[error("Timeout while waiting for events for subscription [{}]", .0.display())] + #[error("Timeout while waiting for events for id [{}]", .0.display())] Timeout(Type), #[error("Unsubscribed [{}]", .0.display())] Unsubscribed(Type), - #[error("Channel closed while waiting for events for subscription [{}]", .0.display())] + #[error("Channel closed while waiting for events for id [{}]", .0.display())] ChannelClosed(Type), } diff --git a/core/market/src/negotiation/provider.rs b/core/market/src/negotiation/provider.rs index 10843746ed..d7e4b7986f 100644 --- a/core/market/src/negotiation/provider.rs +++ b/core/market/src/negotiation/provider.rs @@ -4,8 +4,10 @@ use metrics::counter; use std::sync::Arc; use ya_client::model::market::{event::ProviderEvent, NewProposal, Reason}; +use ya_core_model::NodeId; use ya_persistence::executor::DbExecutor; use ya_service_api_web::middleware::Identity; +use ya_std_utils::LogErr; use crate::db::{ dao::{AgreementDao, NegotiationEventsDao, ProposalDao, SaveAgreementError}, @@ -19,9 +21,11 @@ use super::common::CommonBroker; use super::error::*; use super::notifier::EventNotifier; use crate::config::Config; +use crate::db::dao::AgreementDaoError; use crate::negotiation::common::validate_transition; +use crate::negotiation::notifier::NotifierError; use crate::utils::display::EnableDisplay; -use ya_core_model::NodeId; +use std::time::Instant; #[derive(Clone, Debug, Eq, PartialEq, derive_more::Display)] pub enum ApprovalResult { @@ -78,9 +82,7 @@ impl ProviderBroker { .on_agreement_terminated(msg, caller, Owner::Requestor) }, move |caller: String, msg: AgreementCommitted| { - commit_broker - .clone() - .on_agreement_committed(msg, caller, Owner::Requestor) + on_agreement_committed(commit_broker.clone(), caller, msg) }, ); @@ -217,22 +219,28 @@ impl ProviderBroker { app_session_id: AppSessionId, timeout: f32, ) -> Result { + let start_now = Instant::now(); let dao = self.common.db.as_dao::(); let agreement = { let _hold = self.common.agreement_lock.lock(&agreement_id).await; - let agreement = match dao - .select(agreement_id, None, Utc::now().naive_utc()) + let agreement = dao + .select(agreement_id, Some(id.identity), Utc::now().naive_utc()) .await .map_err(|e| AgreementError::Get(agreement_id.to_string(), e))? - { - None => Err(AgreementError::NotFound(agreement_id.to_string()))?, - Some(agreement) => agreement, - }; + .ok_or(AgreementError::NotFound(agreement_id.to_string()))?; validate_transition(&agreement, AgreementState::Approving)?; - // TODO: Update app_session_id here. + // TODO: Sign Agreement. + let signature = "NoSignature".to_string(); + let timestamp = Utc::now().naive_utc(); + + let agreement = dao + .approving(&agreement.id, &app_session_id, &signature, ×tamp) + .await + .map_err(|e| AgreementError::UpdateState(agreement.id.clone(), e))?; + if let Some(session) = app_session_id { log::info!( "AppSession id [{}] set for Agreement [{}].", @@ -240,20 +248,20 @@ impl ProviderBroker { &agreement.id ); } - - // TODO: Update state to `AgreementState::Approving`. - agreement }; + // Listen to Agreements notification before we start sending message, because otherwise + // we can lose events. + let mut notifier = self.common.agreement_notifier.listen(&agreement.id); + // It doesn't have to be under lock, since we have `Approving` state. // Note that this state change between `Approving` and `Pending` in both // ways is invisible for REST and GSB user, because `Approving` is only our // internal state and is mapped to `Pending`. // - // Note: There's reason, that it CAN'T be done under lock. If we hold lock whole time + // Note: There's reason, that it CAN'T be done under lock. If we hold lock whole time // Requestor won't be able to cancel his Agreement proposal and he is allowed to do it. - // TODO: Send signature and `approved_ts` from Agreement. self.api.approve_agreement(&agreement, timeout).await?; // TODO: Reverse state to `Pending` in case of error (reverse under lock). // TODO: During reversing it can turn out, that we are in `Cancelled` or `Approved` state @@ -267,14 +275,46 @@ impl ProviderBroker { agreement.requestor_id ); - // TODO: Here we must wait until `AgreementCommitted` message, since `approve_agreement` - // is supposed to return after approval. - // TODO: Waiting should set timeout. - // TODO: What in case of timeout?? Reverse to `Pending` state? - // Note: This function isn't responsible for changing Agreement state to `Approved`. + // TODO: Adjust timeout to elapsed time. + // Here we must wait until `AgreementCommitted` message, since `approve_agreement` + // is supposed to return after approval. + let timeout = + std::time::Duration::from_secs_f64(timeout as f64) - (Instant::now() - start_now); + match notifier.wait_for_event_with_timeout(timeout).await { + Err(NotifierError::Timeout(_)) => { + // TODO: What in case of timeout?? Reverse to `Pending` state? + Err(ApproveAgreementError::Timeout(agreement.id.clone()))? + } + Err(error) => Err(AgreementError::Internal(format!( + "Code logic error. Agreement events notifier shouldn't return: {}.", + error + )))?, + Ok(_) => (), + } // TODO: Check Agreement state here, because it could be `Cancelled`. - return Ok(ApprovalResult::Approved); + { + let _hold = self.common.agreement_lock.lock(&agreement_id).await; + + let agreement = dao + .select(agreement_id, None, Utc::now().naive_utc()) + .await + .map_err(|e| AgreementError::Get(agreement_id.to_string(), e))? + .ok_or(AgreementError::Internal(format!( + "Agreement [{}], which existed previously, disappeared.", + agreement_id + )))?; + + match agreement.state { + AgreementState::Cancelled => Ok(ApprovalResult::Cancelled), + AgreementState::Approved => Ok(ApprovalResult::Approved), + AgreementState::Expired => Err(AgreementError::Expired(agreement.id.clone()))?, + _ => Err(AgreementError::Internal(format!( + "Agreement [{}] has unexpected state [{}]", + agreement.id, agreement.state + ))), + } + } } } @@ -301,30 +341,49 @@ async fn agreement_committed( // Note: we still validate caller here, because we can't be sure, that we were caller // by the same Requestor. - let agreement = match dao + let agreement = dao .select(&msg.agreement_id, Some(caller), Utc::now().naive_utc()) .await - .map_err(|e| AgreementError::Get(msg.agreement_id.to_string(), e))? - { - None => Err(AgreementError::NotFound(msg.agreement_id.to_string()))?, - Some(agreement) => agreement, - }; + .map_err(|e| RemoteCommitAgreementError::Unexpected { + public_msg: "Internal Error getting Agreement".to_string(), + original_msg: e.to_string(), + }) + .log_err()? + .ok_or(RemoteCommitAgreementError::NotFound)?; // Note: We can find out here, that our Agreement is already in `Cancelled` state, because // Requestor is allowed to call `cancel_agreement` at any time, before we commit Agreement. - // In this case we should return here, but we still must call `notify_agreement` to wake other threads. - validate_transition(&agreement, AgreementState::Approving)?; + // In this case we should return here and `cancel_agreement` handler is responsible for + // calling `notify_agreement`. + match validate_transition(&agreement, AgreementState::Approving) { + Ok(_) => Ok(()), + Err(AgreementError::UpdateState( + _, + AgreementDaoError::InvalidTransition { from, .. }, + )) => match from { + AgreementState::Cancelled => Err(RemoteCommitAgreementError::Cancelled), + _ => Err(RemoteCommitAgreementError::InvalidState(from)), + }, + Err(e) => Err(RemoteCommitAgreementError::Unexpected { + public_msg: "Failed to validate Agreement state".to_string(), + original_msg: e.to_string(), + }), + } + .log_err()?; // TODO: Validate committed signature from message. - // TODO: `approve` shouldn't set AppSessionId anymore. - dao.approve(&msg.agreement_id, &app_session_id) + dao.approve(&msg.agreement_id, &msg.signature) .await - .map_err(|e| AgreementError::UpdateState(msg.agreement_id.clone(), e))?; - agreement + .map_err(|e| RemoteCommitAgreementError::Unexpected { + public_msg: "Failed to update Agreement state to `Approved`.".to_string(), + original_msg: e.to_string(), + }) + .log_err()? }; broker.notify_agreement(&agreement).await; + counter!("market.agreements.provider.approved", 1); log::info!( "Agreement [{}] approved (committed) by [{}].", @@ -434,6 +493,9 @@ async fn agreement_received( Owner::Provider, ); agreement.state = AgreementState::Pending; + agreement.proposed_signature = Some(msg.signature); + + // TODO: Validate signature. // Check if we generated the same id, as Requestor sent us. If not, reject // it, because wrong generated ids could be not unique. diff --git a/core/market/src/negotiation/requestor.rs b/core/market/src/negotiation/requestor.rs index 016867d758..2655fe4d3e 100644 --- a/core/market/src/negotiation/requestor.rs +++ b/core/market/src/negotiation/requestor.rs @@ -9,6 +9,7 @@ use ya_client::model::market::{event::RequestorEvent, NewProposal, Reason}; use ya_client::model::{node_id::ParseError, NodeId}; use ya_persistence::executor::DbExecutor; use ya_service_api_web::middleware::Identity; +use ya_std_utils::LogErr; use crate::db::{ dao::{AgreementDao, AgreementDaoError, SaveAgreementError}, @@ -329,7 +330,7 @@ impl RequestorBroker { AgreementState::Terminated => { return Err(WaitForApprovalError::Terminated(id.clone())) } - AgreementState::Pending => (), // Still waiting for approval. + AgreementState::Pending | AgreementState::Approving => (), // Still waiting for approval. }; if let Err(error) = notifier.wait_for_event_with_timeout(timeout).await { @@ -359,7 +360,7 @@ impl RequestorBroker { // Provider approving Agreement before we set proper state in database. let _hold = self.common.agreement_lock.lock(&agreement_id).await; - let agreement = match dao + let mut agreement = match dao .select( agreement_id, Some(id.identity.clone()), @@ -374,11 +375,12 @@ impl RequestorBroker { validate_transition(&agreement, AgreementState::Pending)?; - // TODO : possible race condition here ISSUE#430 - // 1. this state check should be also `db.update_state` - // 2. `dao.confirm` must be invoked after successful propose_agreement + // TODO: Sign Agreement. + let signature = "NoSignature".to_string(); + agreement.proposed_signature = Some(signature.clone()); + self.api.propose_agreement(&agreement).await?; - dao.confirm(agreement_id, &app_session_id) + dao.confirm(agreement_id, &app_session_id, &signature) .await .map_err(|e| AgreementError::UpdateState(agreement_id.clone(), e))?; } @@ -423,16 +425,15 @@ async fn agreement_approved( caller: NodeId, msg: AgreementApproved, ) -> Result<(), RemoteAgreementError> { - // TODO: We should check as many conditions here, as possible, because we want + // TODO: We should check here as many condition, as possible, because we want // to return meaningful message to Provider, what is impossible from `commit_agreement`. + let dao = broker.db.as_dao::(); let agreement = { // We aren't sure, if `confirm_agreement` execution is finished, // so we must lock, to avoid attempt to change database state before. let _hold = broker.agreement_lock.lock(&msg.agreement_id).await; - let agreement = broker - .db - .as_dao::() + let agreement = dao .select(&msg.agreement_id, None, Utc::now().naive_utc()) .await .map_err(|_e| RemoteAgreementError::NotFound(msg.agreement_id.clone()))? @@ -443,24 +444,53 @@ async fn agreement_approved( Err(RemoteAgreementError::NotFound(msg.agreement_id.clone()))? } - // TODO: Validate Agreement `valid_to` timestamp. In previous version we got + validate_transition(&agreement, AgreementState::Approving).map_err(|_| { + RemoteAgreementError::InvalidState(agreement.id.clone(), agreement.state.clone()) + })?; + + // Validate Agreement `valid_to` timestamp. In previous version we got // error from database update, but know we want to escape early, because // otherwise we can't response with this error to Provider. - validate_transition(&agreement, AgreementState::Approving)?; + let margin = chrono::Duration::milliseconds(30); + if agreement.valid_to <= (Utc::now() + margin).naive_utc() { + return Err(RemoteAgreementError::Expired(agreement.id.clone())); + } // TODO: Validate agreement signature. - // TODO: Sign Agreement and update `approved_signature`. - // TODO: Update `approved_ts` Agreement field. - // TODO: Update state to `AgreementState::Approving`. + let signature = "NoSignature".to_string(); - agreement + // Note: session must be None, because either we already set this value in ConfirmAgreement, + // or we purposely left it None. + dao.approving(&agreement.id, &None, &signature, &msg.approved_ts) + .await + .map_err(|err| match err { + AgreementDaoError::InvalidTransition { from, .. } => { + match from { + // Expired Agreement could be InvalidState either, but we want to explicit + // say to provider, that Agreement has expired. + AgreementState::Expired => { + RemoteAgreementError::Expired(agreement.id.clone()) + } + _ => RemoteAgreementError::InvalidState(agreement.id.clone(), from), + } + } + e => { + // Log our internal error, but don't reveal error message to Provider. + log::warn!( + "Approve Agreement [{}] internal error: {}", + &agreement.id, + e + ); + RemoteAgreementError::InternalError(agreement.id.clone()) + } + })? }; - /// TODO: Commit Agreement. We must spawn committing later, because we need to - /// return from this function to provider. - tokio::task::spawn_local(commit_agreement(broker, agreement.id)); + // Commit Agreement. We must spawn committing later, because we need to + // return from this function to provider. + tokio::task::spawn_local(commit_agreement(broker, agreement.id.clone())); log::info!( - "Agreement [{}] approved by [{}].", + "Agreement [{}] approved by [{}]. Committing...", &agreement.id, &agreement.provider_id ); @@ -471,53 +501,59 @@ async fn commit_agreement(broker: CommonBroker, agreement_id: AgreementId) { // Note: in this scenario, we update database after Provider already // got `AgreementCommitted` and updated Agreement state to `Approved`, so we will // wake up `wait_for_agreement` after Provider. - let agreement = match { + let dao = broker.db.as_dao::(); + let agreement = match async { let _hold = broker.agreement_lock.lock(&agreement_id).await; let dao = broker.db.as_dao::(); - let agreement = dao - .select(&msg.agreement_id, None, Utc::now().naive_utc()) + let mut agreement = dao + .select(&agreement_id, None, Utc::now().naive_utc()) .await - .map_err(|_e| RemoteAgreementError::NotFound(msg.agreement_id.clone()))? - .ok_or(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?; + .map_err(|_e| AgreementError::NotFound(agreement_id.to_string()))? + .ok_or(AgreementError::NotFound(agreement_id.to_string()))?; - // TODO: Send `AgreementCommited` message to Provider. - // TODO: We have problem to make GSB call here, since we don't have `api` object. - // we must place this function in `protocol/negotiation/common` + // TODO: Sign Agreement. + let signature = "NoSignature".to_string(); + agreement.committed_signature = Some(signature.clone()); // Note: This GSB call is racing with potential `cancel_agreement` call. // In this case Provider code will decide, which call won the race. + NegotiationApi::commit_agreement(&agreement).await?; - // Note: session must be None, because either we already set this value in ConfirmAgreement, - // or we purposely left it None. - dao.approve(&agreement_id, &None) + // We approve Agreement in database, when we are sure, that committing succeeded. + Ok(dao + .approve(&agreement.id, &signature) .await - .map_err(|err| match err { - AgreementDaoError::InvalidTransition { from, .. } => { - match from { - // Expired Agreement could be InvalidState either, but we want to explicit - // say to provider, that Agreement has expired. - AgreementState::Expired => { - RemoteAgreementError::Expired(agreement_id.clone()) - } - _ => RemoteAgreementError::InvalidState(agreement_id.clone(), from), - } - } - e => { - // Log our internal error, but don't reveal error message to Provider. - log::warn!( - "Approve Agreement [{}] internal error: {}", - &agreement_id, - e - ); - RemoteAgreementError::InternalError(agreement_id.clone()) - } - })?; - Ok(agreement) - } { + .map_err(|e| AgreementError::UpdateState(agreement.id.clone(), e))?) + } + .await + { Ok(agreement) => agreement, + // Return to `Pending` state here unless we are in `Cancelled` state. + Err(AgreementError::ProtocolCommit(CommitAgreementError::Remote( + RemoteCommitAgreementError::Cancelled, + _, + ))) => { + return log::info!( + "Didn't commit Agreement [{}] since it was cancelled.", + agreement_id + ); + } Err(e) => { - // TODO: Return to pending state here. + log::warn!( + "Failed to commit Agreement [{}]. {}. Reverting state to `Pending`.", + e, + agreement_id + ); + + dao.revert_approving(&agreement_id) + .await + .log_err_msg(&format!( + "Failed revert state to `Pending` for Agreement [{}]", + agreement_id + )) + .ok(); + return; } }; @@ -525,7 +561,7 @@ async fn commit_agreement(broker: CommonBroker, agreement_id: AgreementId) { counter!("market.agreements.requestor.approved", 1); log::info!( - "Agreement [{}] approved (committed) by [{}].", + "Agreement [{}] committed (approved) by [{}].", &agreement.id, &agreement.provider_id ); diff --git a/core/market/src/protocol/negotiation.rs b/core/market/src/protocol/negotiation.rs index ad38fc8203..a907a30027 100644 --- a/core/market/src/protocol/negotiation.rs +++ b/core/market/src/protocol/negotiation.rs @@ -14,14 +14,20 @@ pub mod common { use ya_net::{self as net, RemoteEndpoint}; use ya_service_bus::RpcEndpoint; + use chrono::NaiveDateTime; + /// Sent to notify other side about termination. pub async fn propagate_terminate_agreement( agreement: &Agreement, reason: Option, + signature: String, + timestamp: NaiveDateTime, ) -> Result<(), TerminateAgreementError> { let msg = AgreementTerminated { agreement_id: agreement.id.clone().swap_owner(), reason, + signature, + termination_ts: timestamp, }; log::debug!( diff --git a/core/market/src/protocol/negotiation/error.rs b/core/market/src/protocol/negotiation/error.rs index 09c696fb06..5c1775c5d7 100644 --- a/core/market/src/protocol/negotiation/error.rs +++ b/core/market/src/protocol/negotiation/error.rs @@ -49,6 +49,7 @@ pub enum RejectProposalError { } #[derive(Error, Debug, Serialize, Deserialize)] +#[non_exhaustive] pub enum RemoteProposalError { #[error(transparent)] Validation(#[from] ProposalValidationError), @@ -72,9 +73,12 @@ pub enum ProposeAgreementError { Gsb(#[from] GsbAgreementError), #[error("Agreement [{1}] remote error: {0}")] Remote(RemoteProposeAgreementError, AgreementId), + #[error("Agreement [{0}] not signed.")] + NotSigned(AgreementId), } #[derive(Error, Debug, Serialize, Deserialize)] +#[non_exhaustive] pub enum RemoteProposeAgreementError { #[error("Proposal [{0}] not found.")] NotFound(ProposalId), @@ -110,6 +114,10 @@ pub enum ApproveAgreementError { }, #[error("Timeout while sending approval of Agreement [{0}]")] Timeout(AgreementId), + #[error("Agreement [{0}] doesn't contain approval timestamp.")] + NoApprovalTimestamp(AgreementId), + #[error("Agreement [{0}] not signed.")] + NotSigned(AgreementId), } #[derive(Error, Debug, Serialize, Deserialize)] @@ -130,6 +138,7 @@ pub enum TerminateAgreementError { } #[derive(Error, Debug, Serialize, Deserialize)] +#[non_exhaustive] pub enum RemoteAgreementError { #[error("Agreement [{0}] not found.")] NotFound(AgreementId), @@ -143,18 +152,32 @@ pub enum RemoteAgreementError { #[derive(Error, Debug, Serialize, Deserialize)] pub enum CommitAgreementError { - #[error("Remote Commit Agreement [{1}] error: {0}")] + #[error("Commit Agreement {0}.")] + Gsb(#[from] GsbAgreementError), + #[error("Remote commit Agreement [{1}] error: {0}")] Remote(RemoteCommitAgreementError, AgreementId), #[error(transparent)] CallerParse(#[from] CallerParseError), + #[error("Agreement [{0}] not signed.")] + NotSigned(AgreementId), } #[derive(Error, Debug, Serialize, Deserialize)] +#[non_exhaustive] pub enum RemoteCommitAgreementError { - #[error("Agreement [{0}] expired.")] - Expired(AgreementId), - #[error("Agreement [{0}] cancelled.")] - Cancelled(AgreementId), + #[error("Agreement expired.")] + Expired, + #[error("Agreement cancelled.")] + Cancelled, + #[error("Agreement not found.")] + NotFound, + #[error("Agreement in state {0}, can't be committed.")] + InvalidState(AgreementState), + #[error("Unexpected error: {public_msg} {original_msg}.")] + Unexpected { + public_msg: String, + original_msg: String, + }, } impl RemoteSensitiveError for RemoteProposeAgreementError { diff --git a/core/market/src/protocol/negotiation/messages.rs b/core/market/src/protocol/negotiation/messages.rs index fb1132eb68..cb109c714a 100644 --- a/core/market/src/protocol/negotiation/messages.rs +++ b/core/market/src/protocol/negotiation/messages.rs @@ -5,7 +5,9 @@ use ya_client::model::market::Reason; use ya_service_bus::RpcMessage; use crate::db::model::{AgreementId, DbProposal, Owner, Proposal, ProposalId, SubscriptionId}; -use crate::protocol::negotiation::error::{ProposeAgreementError, RejectProposalError}; +use crate::protocol::negotiation::error::{ + CommitAgreementError, ProposeAgreementError, RejectProposalError, +}; use super::super::callback::CallbackMessage; use super::error::{ @@ -197,9 +199,9 @@ pub struct AgreementCommitted { } impl RpcMessage for AgreementCommitted { - const ID: &'static str = "CommitAgreement"; + const ID: &'static str = "AgreementCommitted"; type Item = (); - type Error = AgreementCommitted; + type Error = CommitAgreementError; } /// The same messaged will be used on GSB and as messages in callbacks. diff --git a/core/market/src/protocol/negotiation/provider.rs b/core/market/src/protocol/negotiation/provider.rs index 8fb8d887b9..a8273ef2cf 100644 --- a/core/market/src/protocol/negotiation/provider.rs +++ b/core/market/src/protocol/negotiation/provider.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use ya_client::model::market::Reason; use ya_client::model::NodeId; use ya_core_model::market::BUS_ID; use ya_net::{self as net, RemoteEndpoint}; @@ -14,13 +15,11 @@ use super::error::{ NegotiationApiInitError, TerminateAgreementError, }; use super::messages::{ - provider, requestor, AgreementApproved, AgreementCancelled, AgreementReceived, - AgreementRejected, AgreementTerminated, InitialProposalReceived, ProposalContent, - ProposalReceived, ProposalRejected, + provider, requestor, AgreementApproved, AgreementCancelled, AgreementCommitted, + AgreementReceived, AgreementRejected, AgreementTerminated, InitialProposalReceived, + ProposalContent, ProposalReceived, ProposalRejected, }; use crate::protocol::negotiation::error::{ProposeAgreementError, RejectProposalError}; -use crate::testing::negotiation::messages::AgreementCommitted; -use ya_client::model::market::Reason; /// Responsible for communication with markets on other nodes /// during negotiation phase. @@ -39,6 +38,9 @@ struct NegotiationImpl { agreement_committed: HandlerSlot, } +// TODO: Most of these functions don't need to be members of NegotiationApi. +// We should make them plain functions in `provider` module, since it doesn't +// seem, that they will ever need self. impl NegotiationApi { pub fn new( initial_proposal_received: impl CallbackHandler, @@ -105,15 +107,23 @@ impl NegotiationApi { Ok(()) } - /// TODO: pass agreement signature. pub async fn approve_agreement( &self, agreement: &Agreement, timeout: f32, ) -> Result<(), ApproveAgreementError> { let timeout = Duration::from_secs_f32(timeout.max(0.0)); + let id = agreement.id.clone(); + let msg = AgreementApproved { - agreement_id: agreement.id.clone(), + agreement_id: id.clone(), + signature: agreement + .approved_signature + .clone() + .ok_or(ApproveAgreementError::NotSigned(id.clone()))?, + approved_ts: agreement + .approved_ts + .ok_or(ApproveAgreementError::NoApprovalTimestamp(id.clone()))?, }; let net_send_fut = net::from(agreement.provider_id) .to(agreement.requestor_id) @@ -121,8 +131,8 @@ impl NegotiationApi { .send(msg); tokio::time::timeout(timeout, net_send_fut) .await - .map_err(|_| ApproveAgreementError::Timeout(agreement.id.clone()))? - .map_err(|e| GsbAgreementError(e.to_string(), agreement.id.clone()))??; + .map_err(|_| ApproveAgreementError::Timeout(id.clone()))? + .map_err(|e| GsbAgreementError(e.to_string(), id.clone()))??; Ok(()) } @@ -134,6 +144,7 @@ impl NegotiationApi { ) -> Result<(), GsbAgreementError> { let msg = AgreementRejected { agreement_id: agreement_id.clone(), + reason: None, }; net::from(id) .to(owner) diff --git a/core/market/src/protocol/negotiation/requestor.rs b/core/market/src/protocol/negotiation/requestor.rs index 5aa784cae4..2e7db95ee6 100644 --- a/core/market/src/protocol/negotiation/requestor.rs +++ b/core/market/src/protocol/negotiation/requestor.rs @@ -1,6 +1,7 @@ use futures::future::TryFutureExt; use std::sync::Arc; +use ya_client::model::market::Reason; use ya_client::model::NodeId; use ya_core_model::market::BUS_ID; use ya_net::{self as net, RemoteEndpoint}; @@ -18,8 +19,10 @@ use super::messages::{ AgreementRejected, AgreementTerminated, InitialProposalReceived, ProposalContent, ProposalReceived, ProposalRejected, }; -use crate::protocol::negotiation::error::{ProposeAgreementError, RejectProposalError}; -use ya_client::model::market::Reason; +use crate::protocol::negotiation::error::{ + CommitAgreementError, ProposeAgreementError, RejectProposalError, +}; +use crate::protocol::negotiation::messages::AgreementCommitted; /// Responsible for communication with markets on other nodes /// during negotiation phase. @@ -36,6 +39,9 @@ struct NegotiationImpl { agreement_terminated: HandlerSlot, } +// TODO: Most of these functions don't need to be members of NegotiationApi. +// We should make them plain functions in `requestor` module, since it doesn't +// seem, that they will ever need self. impl NegotiationApi { pub fn new( proposal_received: impl CallbackHandler, @@ -132,18 +138,42 @@ impl NegotiationApi { ) -> Result<(), ProposeAgreementError> { let requestor_id = agreement.requestor_id.clone(); let provider_id = agreement.provider_id.clone(); - let agreement_id = agreement.id.clone(); + let id = agreement.id.clone(); let msg = AgreementReceived { agreement_id: agreement.id.clone(), valid_to: agreement.valid_to.clone(), creation_ts: agreement.creation_ts.clone(), proposal_id: agreement.offer_proposal_id.clone(), + signature: agreement + .proposed_signature + .clone() + .ok_or(ProposeAgreementError::NotSigned(id.clone()))?, + }; + net::from(requestor_id) + .to(provider_id) + .service(&provider::agreement_addr(BUS_ID)) + .send(msg) + .map_err(|e| GsbAgreementError(e.to_string(), id)) + .await??; + Ok(()) + } + + pub async fn commit_agreement(agreement: &Agreement) -> Result<(), CommitAgreementError> { + let requestor_id = agreement.requestor_id.clone(); + let provider_id = agreement.provider_id.clone(); + let id = agreement.id.clone(); + let msg = AgreementCommitted { + agreement_id: agreement.id.clone(), + signature: agreement + .committed_signature + .clone() + .ok_or(CommitAgreementError::NotSigned(id.clone()))?, }; net::from(requestor_id) .to(provider_id) .service(&provider::agreement_addr(BUS_ID)) .send(msg) - .map_err(|e| GsbAgreementError(e.to_string(), agreement_id)) + .map_err(|e| GsbAgreementError(e.to_string(), id)) .await??; Ok(()) } @@ -159,6 +189,7 @@ impl NegotiationApi { ) -> Result<(), GsbAgreementError> { let msg = AgreementCancelled { agreement_id: agreement_id.clone(), + reason: None, }; net::from(id) .to(owner) diff --git a/core/market/src/rest_api/error.rs b/core/market/src/rest_api/error.rs index dd71f42d2a..0c368553b8 100644 --- a/core/market/src/rest_api/error.rs +++ b/core/market/src/rest_api/error.rs @@ -193,6 +193,7 @@ impl ResponseError for AgreementError { let msg = ErrorMessage::new(self.to_string()); match self { AgreementError::NotFound(_) => HttpResponse::NotFound().json(msg), + AgreementError::Expired(_) => HttpResponse::Gone().json(msg), AgreementError::AlreadyExists(_, _) => HttpResponse::Conflict().json(msg), AgreementError::UpdateState(_, e) => e.error_response(), AgreementError::NoNegotiations(_) @@ -207,6 +208,7 @@ impl ResponseError for AgreementError { | AgreementError::ProtocolCreate(_) | AgreementError::ProtocolApprove(_) | AgreementError::ProtocolTerminate(_) + | AgreementError::ProtocolCommit(_) | AgreementError::Internal(_) => HttpResponse::InternalServerError().json(msg), } } @@ -219,6 +221,7 @@ impl ResponseError for AgreementDaoError { AgreementDaoError::InvalidTransition { from, .. } => match from { AgreementState::Proposal => HttpResponse::Conflict().json(msg), AgreementState::Pending + | AgreementState::Approving | AgreementState::Cancelled | AgreementState::Rejected | AgreementState::Expired diff --git a/core/market/src/rest_api/requestor.rs b/core/market/src/rest_api/requestor.rs index dec8ec0004..cc6465f3f1 100644 --- a/core/market/src/rest_api/requestor.rs +++ b/core/market/src/rest_api/requestor.rs @@ -1,9 +1,11 @@ use actix_web::web::{Data, Json, Path, Query}; use actix_web::{HttpResponse, Responder, Scope}; +use metrics::counter; use std::str::FromStr; use std::sync::Arc; use ya_client::model::market::{AgreementProposal, NewDemand, NewProposal, Reason}; +use ya_client::model::ErrorMessage; use ya_service_api_web::middleware::Identity; use ya_std_utils::LogErr; @@ -16,7 +18,6 @@ use super::{ }; use crate::negotiation::ApprovalStatus; use crate::rest_api::QueryAppSessionId; -use ya_client::model::ErrorMessage; pub fn register_endpoints(scope: Scope) -> Scope { scope diff --git a/core/market/src/testing/mock_node.rs b/core/market/src/testing/mock_node.rs index 8f143196ad..5213bec522 100644 --- a/core/market/src/testing/mock_node.rs +++ b/core/market/src/testing/mock_node.rs @@ -243,6 +243,7 @@ impl MarketsNetwork { prov_agreement_received: impl CallbackHandler, prov_agreement_cancelled: impl CallbackHandler, prov_agreement_terminated: impl CallbackHandler, + prov_agreement_committed: impl CallbackHandler, ) -> Self { self.add_negotiation_api( name, @@ -252,6 +253,7 @@ impl MarketsNetwork { prov_agreement_received, prov_agreement_cancelled, prov_agreement_terminated, + prov_agreement_committed, default::empty_on_proposal_received, default::empty_on_proposal_rejected, default::empty_on_agreement_approved, @@ -278,6 +280,7 @@ impl MarketsNetwork { default::empty_on_agreement_received, default::empty_on_agreement_cancelled, default::empty_on_agreement_terminated, + default::empty_on_agreement_committed, req_proposal_received, req_proposal_rejected, req_agreement_approved, @@ -296,6 +299,7 @@ impl MarketsNetwork { prov_agreement_received: impl CallbackHandler, prov_agreement_cancelled: impl CallbackHandler, prov_agreement_terminated: impl CallbackHandler, + prov_agreement_committed: impl CallbackHandler, req_proposal_received: impl CallbackHandler, req_proposal_rejected: impl CallbackHandler, req_agreement_approved: impl CallbackHandler, @@ -309,6 +313,7 @@ impl MarketsNetwork { prov_agreement_received, prov_agreement_cancelled, prov_agreement_terminated, + prov_agreement_committed, ); let requestor = requestor::NegotiationApi::new( @@ -588,8 +593,8 @@ impl MarketServiceExt for MarketService { pub mod default { use super::*; use crate::protocol::negotiation::error::{ - ApproveAgreementError, CounterProposalError, GsbAgreementError, ProposeAgreementError, - RejectProposalError, TerminateAgreementError, + ApproveAgreementError, CommitAgreementError, CounterProposalError, GsbAgreementError, + ProposeAgreementError, RejectProposalError, TerminateAgreementError, }; pub async fn empty_on_offers_retrieved( @@ -669,6 +674,13 @@ pub mod default { Ok(()) } + pub async fn empty_on_agreement_committed( + _caller: String, + _msg: AgreementCommitted, + ) -> Result<(), CommitAgreementError> { + Ok(()) + } + pub async fn empty_on_agreement_terminated( _caller: String, _msg: AgreementTerminated,