diff --git a/Cargo.lock b/Cargo.lock index 086614b94d..00b24a6bce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7667,6 +7667,7 @@ dependencies = [ "ya-compile-time-utils", "ya-core-model", "ya-file-logging", + "ya-std-utils", "ya-utils-actix", "ya-utils-path", "ya-utils-process", diff --git a/agent/provider/Cargo.toml b/agent/provider/Cargo.toml index 49e2804ba9..73d1710f06 100644 --- a/agent/provider/Cargo.toml +++ b/agent/provider/Cargo.toml @@ -25,6 +25,7 @@ ya-file-logging = "0.1" ya-utils-actix = "0.1" ya-utils-path = "0.1" ya-utils-process = { version = "0.1", features = ['lock'] } +ya-std-utils = "0.1" actix = { version = "0.10", default-features = false } actix-rt = "1.1.1" diff --git a/agent/provider/src/execution/task_runner.rs b/agent/provider/src/execution/task_runner.rs index eb1e6b75af..af16503a6f 100644 --- a/agent/provider/src/execution/task_runner.rs +++ b/agent/provider/src/execution/task_runner.rs @@ -27,7 +27,7 @@ use ya_utils_process::ExeUnitExitStatus; use super::exeunits_registry::{ExeUnitDesc, ExeUnitsRegistry}; use super::task::Task; -use crate::market::provider_market::AgreementApproved; +use crate::market::provider_market::NewAgreement; use crate::market::Preset; use crate::tasks::{AgreementBroken, AgreementClosed}; @@ -360,7 +360,7 @@ impl TaskRunner { #[logfn_inputs(Debug, fmt = "{}Got {:?} {:?}")] pub fn on_agreement_approved( &mut self, - msg: AgreementApproved, + msg: NewAgreement, _ctx: &mut Context, ) -> Result<()> { // Agreement waits for first create activity event. @@ -538,7 +538,7 @@ impl Actor for TaskRunner { type Context = Context; } -forward_actix_handler!(TaskRunner, AgreementApproved, on_agreement_approved); +forward_actix_handler!(TaskRunner, NewAgreement, on_agreement_approved); forward_actix_handler!(TaskRunner, ExeUnitProcessFinished, on_exeunit_exited); forward_actix_handler!(TaskRunner, GetExeUnit, get_exeunit); actix_signal_handler!(TaskRunner, ActivityCreated, activity_created); diff --git a/agent/provider/src/market/provider_market.rs b/agent/provider/src/market/provider_market.rs index 9ce6c79743..8f33be1afd 100644 --- a/agent/provider/src/market/provider_market.rs +++ b/agent/provider/src/market/provider_market.rs @@ -19,6 +19,7 @@ use ya_client_model::market::{ agreement_event::AgreementTerminator, Agreement, NewOffer, Proposal, ProviderEvent, Reason, }; use ya_client_model::NodeId; +use ya_std_utils::LogErr; use ya_utils_actix::{ actix_handler::ResultTypeGetter, actix_signal::{SignalSlot, Subscribe}, @@ -30,6 +31,7 @@ use super::negotiator::{AgreementResponse, AgreementResult, NegotiatorAddr, Prop use super::Preset; use crate::market::config::MarketConfig; use crate::market::termination_reason::GolemReason; +use crate::tasks::task_manager::ClosingCause; use crate::tasks::{AgreementBroken, AgreementClosed, CloseAgreement}; // =========================================== // @@ -62,7 +64,7 @@ pub enum OfferKind { /// and broadcasts same event to external world. #[derive(Clone, Debug, Message)] #[rtype(result = "Result<()>")] -pub struct AgreementApproved { +pub struct NewAgreement { pub agreement: AgreementView, } @@ -108,7 +110,7 @@ pub struct ProviderMarket { config: Arc, /// External actors can listen on this signal. - pub agreement_signed_signal: SignalSlot, + pub agreement_signed_signal: SignalSlot, pub agreement_terminated_signal: SignalSlot, /// Infinite tasks requiring to be killed on shutdown. @@ -134,7 +136,7 @@ impl ProviderMarket { negotiator: Arc::new(NegotiatorAddr::default()), config: Arc::new(config), subscriptions: HashMap::new(), - agreement_signed_signal: SignalSlot::::new(), + agreement_signed_signal: SignalSlot::::new(), agreement_terminated_signal: SignalSlot::::new(), handles: HashMap::new(), }; @@ -169,11 +171,7 @@ impl ProviderMarket { // Market internals - proposals and agreements reactions // =========================================== // - fn on_agreement_approved( - &mut self, - msg: AgreementApproved, - _ctx: &mut Context, - ) -> Result<()> { + fn on_agreement_approved(&mut self, msg: NewAgreement, _ctx: &mut Context) -> Result<()> { log::info!("Got approved agreement [{}].", msg.agreement.agreement_id,); // At this moment we only forward agreement to outside world. self.agreement_signed_signal.send_signal(msg) @@ -359,6 +357,16 @@ async fn process_agreement( match action { AgreementResponse::ApproveAgreement => { + // Prepare Provider for Agreement. We aren't sure here, that approval will + // succeed, but we are obligated to reserve all promised resources for Requestor, + // so after `approve_agreement` will return, we are ready to create activities. + ctx.market + .send(NewAgreement { + agreement: agreement.clone(), + }) + .await? + .ok(); + // TODO: We should retry approval, but only a few times, than we should // give up since it's better to take another agreement. let result = ctx @@ -386,11 +394,6 @@ async fn process_agreement( // We negotiated agreement and here responsibility of ProviderMarket ends. // Notify outside world about agreement for further processing. - let message = AgreementApproved { - agreement: agreement.clone(), - }; - - let _ = ctx.market.send(message).await?; } AgreementResponse::RejectAgreement { reason } => { ctx.api @@ -571,16 +574,13 @@ impl Handler for ProviderMarket { self.agreement_terminated_signal .send_signal(CloseAgreement { - is_terminated: true, + cause: ClosingCause::Termination, agreement_id: id.clone(), }) - .map_err(|e| { - log::error!( - "Failed to propagate termination info for agreement [{}]. {}", - id, - e - ) - }) + .log_err_msg(&format!( + "Failed to propagate termination info for agreement [{}]", + id + )) .ok(); Ok(()) } @@ -604,14 +604,10 @@ impl Handler for ProviderMarket { .negotiator .create_offer(&msg.offer_definition) .await - .map_err(|e| { - log::error!( - "Negotiator failed to create offer for preset [{}]. Error: {}", - msg.preset.name, - e - ); - e - })?; + .log_err_msg(&format!( + "Negotiator failed to create offer for preset [{}]", + msg.preset.name, + ))?; log::debug!( "Offer created: {}", @@ -623,14 +619,10 @@ impl Handler for ProviderMarket { let preset_name = msg.preset.name.clone(); subscribe(ctx.market, ctx.api, offer, msg.preset) .await - .map_err(|error| { - log::error!( - "Can't subscribe new offer for preset [{}], error: {}", - preset_name, - error - ); - error - })?; + .log_err_msg(&format!( + "Can't subscribe new offer for preset [{}]", + preset_name, + ))?; Ok(()) } .boxed_local() @@ -702,13 +694,13 @@ async fn resubscribe_offers( let preset = sub.preset; let preset_name = preset.name.clone(); - if let Err(e) = subscribe(market.clone(), api.clone(), offer, preset).await { - log::warn!( - "Unable to create subscription for preset {}: {}", + subscribe(market.clone(), api.clone(), offer, preset) + .await + .log_warn_msg(&format!( + "Unable to create subscription for preset {}", preset_name, - e - ); - } + )) + .ok(); } } @@ -720,18 +712,28 @@ impl Handler for ProviderMarket { let agreement_id = msg.id.clone(); let result = msg.result.clone(); + if let AgreementResult::ApprovalFailed = &msg.result { + self.agreement_terminated_signal + .send_signal(CloseAgreement { + cause: ClosingCause::ApprovalFail, + agreement_id: agreement_id.clone(), + }) + .log_err_msg(&format!( + "Failed to propagate ApprovalFailed info for agreement [{}]", + agreement_id + )) + .ok(); + } + let future = async move { - if let Err(error) = ctx - .negotiator + ctx.negotiator .agreement_finalized(&agreement_id, result) .await - { - log::warn!( - "Negotiator failed while handling agreement [{}] finalize. Error: {}", + .log_err_msg(&format!( + "Negotiator failed while handling agreement [{}] finalize", &agreement_id, - error, - ); - } + )) + .ok(); } .into_actor(self) .map(|_, myself, ctx| { @@ -818,9 +820,9 @@ impl Handler for ProviderMarket { } forward_actix_handler!(ProviderMarket, Subscription, on_subscription); -forward_actix_handler!(ProviderMarket, AgreementApproved, on_agreement_approved); +forward_actix_handler!(ProviderMarket, NewAgreement, on_agreement_approved); actix_signal_handler!(ProviderMarket, CloseAgreement, agreement_terminated_signal); -actix_signal_handler!(ProviderMarket, AgreementApproved, agreement_signed_signal); +actix_signal_handler!(ProviderMarket, NewAgreement, agreement_signed_signal); fn get_backoff() -> backoff::ExponentialBackoff { // TODO: We could have config for Market actor to be able to set at least initial interval. diff --git a/agent/provider/src/payments/payments.rs b/agent/provider/src/payments/payments.rs index 5b8b85060e..486a19ddcc 100644 --- a/agent/provider/src/payments/payments.rs +++ b/agent/provider/src/payments/payments.rs @@ -14,7 +14,7 @@ use super::agreement::{compute_cost, ActivityPayment, AgreementPayment, CostInfo use super::model::PaymentModel; use super::payment_checker::{DeadlineChecker, DeadlineElapsed, StopTracking, TrackDeadline}; use crate::execution::{ActivityCreated, ActivityDestroyed}; -use crate::market::provider_market::AgreementApproved; +use crate::market::provider_market::NewAgreement; use crate::market::termination_reason::BreakReason; use crate::tasks::{AgreementBroken, AgreementClosed, BreakAgreement}; @@ -165,7 +165,7 @@ impl Payments { pub fn on_signed_agreement( &mut self, - msg: AgreementApproved, + msg: NewAgreement, _ctx: &mut Context, ) -> Result<()> { log::info!( @@ -383,7 +383,7 @@ async fn compute_cost_and_send_debit_note( Ok((debit_note, cost_info)) } -forward_actix_handler!(Payments, AgreementApproved, on_signed_agreement); +forward_actix_handler!(Payments, NewAgreement, on_signed_agreement); impl Handler for Payments { type Result = anyhow::Result<()>; diff --git a/agent/provider/src/tasks/task_manager.rs b/agent/provider/src/tasks/task_manager.rs index a2fc27a985..a1990da12d 100644 --- a/agent/provider/src/tasks/task_manager.rs +++ b/agent/provider/src/tasks/task_manager.rs @@ -11,7 +11,7 @@ use ya_utils_actix::forward_actix_handler; use super::task_info::TaskInfo; use super::task_state::{AgreementState, TasksStates}; use crate::execution::{ActivityCreated, ActivityDestroyed, TaskRunner}; -use crate::market::provider_market::{AgreementApproved, ProviderMarket}; +use crate::market::provider_market::{NewAgreement, ProviderMarket}; use crate::market::termination_reason::BreakReason; use crate::payments::Payments; @@ -36,12 +36,19 @@ pub struct BreakAgreement { pub reason: BreakReason, } +#[derive(Clone, PartialEq)] +pub enum ClosingCause { + ApprovalFail, + Termination, + SingleActivity, +} + /// Notifies TaskManager that Requestor close agreement. #[derive(Message, Clone)] #[rtype(result = "Result<()>")] pub struct CloseAgreement { pub agreement_id: String, - pub is_terminated: bool, + pub cause: ClosingCause, } // =========================================== // @@ -200,7 +207,7 @@ impl TaskManager { } } - fn add_new_agreement(&mut self, msg: &AgreementApproved) -> anyhow::Result { + fn add_new_agreement(&mut self, msg: &NewAgreement) -> anyhow::Result { let agreement_id = msg.agreement.agreement_id.clone(); self.tasks.new_agreement(&agreement_id)?; @@ -235,7 +242,7 @@ impl Handler for TaskManager { let future = async move { // Listen to AgreementApproved event. - let msg = Subscribe::(actx.myself.clone().recipient()); + let msg = Subscribe::(actx.myself.clone().recipient()); actx.market.send(msg).await?; // Listen to Agreement terminated event from market. @@ -263,10 +270,10 @@ impl Handler for TaskManager { // Messages modifying agreement state // =========================================== // -impl Handler for TaskManager { +impl Handler for TaskManager { type Result = ActorResponse; - fn handle(&mut self, msg: AgreementApproved, ctx: &mut Context) -> Self::Result { + fn handle(&mut self, msg: NewAgreement, ctx: &mut Context) -> Self::Result { // Add new agreement with it's state. let task_info = match self.add_new_agreement(&msg) { Err(error) => { @@ -398,7 +405,7 @@ impl Handler for TaskManager { actx.myself.do_send(CloseAgreement { agreement_id: agreement_id.to_string(), - is_terminated: false, + cause: ClosingCause::SingleActivity, }); } Ok(()) @@ -462,14 +469,18 @@ impl Handler for TaskManager { let future = async move { start_transition(&actx.myself, &msg.agreement_id, AgreementState::Closed).await?; - let msg = AgreementClosed { + let closed_msg = AgreementClosed { agreement_id: msg.agreement_id.clone(), - send_terminate: !msg.is_terminated, + send_terminate: msg.cause == ClosingCause::Termination, }; - actx.runner.do_send(msg.clone()); - actx.market.do_send(msg.clone()); - actx.payments.send(msg.clone()).await??; + // No need to notify market. + if msg.cause != ClosingCause::ApprovalFail { + actx.market.do_send(closed_msg.clone()); + } + + actx.runner.do_send(closed_msg.clone()); + actx.payments.send(closed_msg.clone()).await??; finish_transition(&actx.myself, &msg.agreement_id, AgreementState::Closed).await?; diff --git a/core/market/migrations/2020-10-04-190200_agreement_events/up.sql b/core/market/migrations/2020-10-04-190200_agreement_events/up.sql index 2218b6d81a..a4ee367ba7 100644 --- a/core/market/migrations/2020-10-04-190200_agreement_events/up.sql +++ b/core/market/migrations/2020-10-04-190200_agreement_events/up.sql @@ -48,7 +48,7 @@ CREATE TABLE market_agreement( approved_signature TEXT, committed_signature TEXT, - CHECK (state in ('Proposal','Pending','Cancelled','Rejected','Approved','Expired','Terminated')) + CHECK (state in ('Proposal','Pending','Cancelled','Rejected','Approved','Expired','Terminated', 'Approving')) ); -- Change Proposal state from enum to Text value for better database introspection. diff --git a/core/market/src/db/dao/agreement.rs b/core/market/src/db/dao/agreement.rs index 1d619a52f1..faa2c03f76 100644 --- a/core/market/src/db/dao/agreement.rs +++ b/core/market/src/db/dao/agreement.rs @@ -162,38 +162,48 @@ impl<'c> AgreementDao<'c> { &self, id: &AgreementId, session: &AppSessionId, - ) -> Result<(), AgreementDaoError> { + signature: &String, + ) -> Result { let id = id.clone(); let session = session.clone(); + let signature = signature.clone(); do_with_transaction(self.pool, move |conn| { let mut agreement: Agreement = market_agreement.filter(agreement::id.eq(&id)).first(conn)?; update_state(conn, &mut agreement, AgreementState::Pending)?; + update_proposed_signature(conn, &mut agreement, signature)?; if let Some(session) = session { update_session(conn, &mut agreement, session)?; } - Ok(()) + Ok(agreement) }) .await } /// Function won't change appSessionId, if session parameter is None. - pub async fn approve( + /// Signature will be placed in `approved_signature` field. + pub async fn approving( &self, id: &AgreementId, session: &AppSessionId, - ) -> Result<(), AgreementDaoError> { + signature: &String, + timestamp: &NaiveDateTime, + ) -> Result { let id = id.clone(); let session = session.clone(); + let signature = signature.clone(); + let timestamp = timestamp.clone(); do_with_transaction(self.pool, move |conn| { let mut agreement: Agreement = market_agreement.filter(agreement::id.eq(&id)).first(conn)?; - update_state(conn, &mut agreement, AgreementState::Approved)?; + update_state(conn, &mut agreement, AgreementState::Approving)?; + update_approved_signature(conn, &mut agreement, signature)?; + update_approve_timestamp(conn, &mut agreement, timestamp)?; // It's important, that if None AppSessionId comes, we shouldn't update Agreement // appSessionId field to None. This function can be called in different context, for example @@ -201,10 +211,31 @@ impl<'c> AgreementDao<'c> { if let Some(session) = session { update_session(conn, &mut agreement, session)?; } + Ok(agreement) + }) + .await + } + + /// Signature will be placed in `committed_signature` field. + pub async fn approve( + &self, + id: &AgreementId, + signature: &String, + ) -> Result { + let id = id.clone(); + let signature = signature.clone(); + + do_with_transaction(self.pool, move |conn| { + let mut agreement: Agreement = + market_agreement.filter(agreement::id.eq(&id)).first(conn)?; + + update_state(conn, &mut agreement, AgreementState::Approved)?; + update_committed_signature(conn, &mut agreement, signature)?; + // Always Provider approves. create_event(conn, &agreement, None, Owner::Provider)?; - Ok(()) + Ok(agreement) }) .await } @@ -229,6 +260,29 @@ impl<'c> AgreementDao<'c> { .await } + pub async fn revert_approving(&self, id: &AgreementId) -> Result { + let id = id.clone(); + + do_with_transaction(self.pool, move |conn| { + let agreement: Agreement = + market_agreement.filter(agreement::id.eq(&id)).first(conn)?; + + if agreement.state != AgreementState::Approving { + return Err(AgreementDaoError::InvalidTransition { + from: agreement.state, + to: AgreementState::Pending, + }); + } + + let num_updated = diesel::update(market_agreement.find(&id)) + .set(agreement::state.eq(&AgreementState::Pending)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + Ok(num_updated > 0) + }) + .await + } + pub async fn clean(&self) -> DbResult<()> { // FIXME use grace time from config file when #460 is merged log::trace!("Clean market agreements: start"); @@ -296,6 +350,65 @@ fn update_state( Ok(num_updated > 0) } +fn update_proposed_signature( + conn: &ConnType, + agreement: &mut Agreement, + signature: String, +) -> Result { + let signature = Some(signature); + let num_updated = diesel::update(market_agreement.find(&agreement.id)) + .set(agreement::proposed_signature.eq(&signature)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + + agreement.proposed_signature = signature; + Ok(num_updated > 0) +} + +fn update_approved_signature( + conn: &ConnType, + agreement: &mut Agreement, + signature: String, +) -> Result { + let signature = Some(signature); + let num_updated = diesel::update(market_agreement.find(&agreement.id)) + .set(agreement::approved_signature.eq(&signature)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + + agreement.approved_signature = signature; + Ok(num_updated > 0) +} + +fn update_committed_signature( + conn: &ConnType, + agreement: &mut Agreement, + signature: String, +) -> Result { + let signature = Some(signature); + let num_updated = diesel::update(market_agreement.find(&agreement.id)) + .set(agreement::committed_signature.eq(&signature)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + + agreement.committed_signature = signature; + Ok(num_updated > 0) +} + +fn update_approve_timestamp( + conn: &ConnType, + agreement: &mut Agreement, + timestamp: NaiveDateTime, +) -> Result { + let num_updated = diesel::update(market_agreement.find(&agreement.id)) + .set(agreement::approved_ts.eq(×tamp)) + .execute(conn) + .map_err(|e| AgreementDaoError::DbError(e.into()))?; + + agreement.approved_ts = Some(timestamp); + Ok(num_updated > 0) +} + fn update_session( conn: &ConnType, agreement: &mut Agreement, diff --git a/core/market/src/db/model/agreement.rs b/core/market/src/db/model/agreement.rs index e405f6249e..a0c20ee306 100644 --- a/core/market/src/db/model/agreement.rs +++ b/core/market/src/db/model/agreement.rs @@ -38,6 +38,10 @@ pub enum AgreementState { Proposal, /// Confirmed by a Requestor and sent to Provider for approval Pending, + /// Additional internal state mapped to `Pending` in client structures. + /// This state will appear after Provider will call `approve_agreement`, + /// but before Requestor will send back `AgreementCommitted`. + Approving, /// Cancelled by a Requestor Cancelled, /// Rejected by a Provider @@ -180,6 +184,7 @@ impl From for ClientAgreementState { match agreement_state { AgreementState::Proposal => ClientAgreementState::Proposal, AgreementState::Pending => ClientAgreementState::Pending, + AgreementState::Approving => ClientAgreementState::Pending, AgreementState::Cancelled => ClientAgreementState::Cancelled, AgreementState::Rejected => ClientAgreementState::Rejected, AgreementState::Approved => ClientAgreementState::Approved, @@ -199,8 +204,15 @@ pub fn check_transition(from: AgreementState, to: AgreementState) -> Result<(), _ => (), }, AgreementState::Pending => match to { + AgreementState::Approving => return Ok(()), AgreementState::Cancelled => return Ok(()), AgreementState::Rejected => return Ok(()), + AgreementState::Expired => return Ok(()), + _ => (), + }, + AgreementState::Approving => match to { + // Reverse transition from `Approving` to `Pending` is forbidden on purpose. It is handled solely in `revert_approving()` + AgreementState::Cancelled => return Ok(()), AgreementState::Approved => return Ok(()), AgreementState::Expired => return Ok(()), _ => (), diff --git a/core/market/src/db/model/agreement_events.rs b/core/market/src/db/model/agreement_events.rs index 5326d525d3..f88641595f 100644 --- a/core/market/src/db/model/agreement_events.rs +++ b/core/market/src/db/model/agreement_events.rs @@ -64,7 +64,10 @@ impl NewAgreementEvent { Ok(Self { agreement_id: agreement.id.clone(), event_type: match agreement.state { - AgreementState::Pending | AgreementState::Proposal | AgreementState::Expired => { + AgreementState::Pending + | AgreementState::Proposal + | AgreementState::Expired + | AgreementState::Approving => { let msg = format!("Wrong [{}] state {}", agreement.id, agreement.state); log::error!("{}", msg); return Err(EventFromAgreementError(msg)); diff --git a/core/market/src/negotiation/common.rs b/core/market/src/negotiation/common.rs index 3806ece7af..dd705a7a63 100644 --- a/core/market/src/negotiation/common.rs +++ b/core/market/src/negotiation/common.rs @@ -361,7 +361,13 @@ impl CommonBroker { validate_transition(&agreement, AgreementState::Terminated)?; - protocol_common::propagate_terminate_agreement(&agreement, reason.clone()).await?; + protocol_common::propagate_terminate_agreement( + &agreement, + reason.clone(), + "NotSigned".to_string(), + Utc::now().naive_utc(), + ) + .await?; let reason_string = CommonBroker::reason2string(&reason); dao.terminate(&agreement.id, reason_string, agreement.id.owner()) diff --git a/core/market/src/negotiation/error.rs b/core/market/src/negotiation/error.rs index 1d4750ac37..d3d28d7483 100644 --- a/core/market/src/negotiation/error.rs +++ b/core/market/src/negotiation/error.rs @@ -15,8 +15,9 @@ use crate::db::{ }; use crate::matcher::error::{DemandError, QueryOfferError}; use crate::protocol::negotiation::error::{ - ApproveAgreementError, CounterProposalError as ProtocolProposalError, GsbAgreementError, - NegotiationApiInitError, ProposeAgreementError, RejectProposalError, TerminateAgreementError, + ApproveAgreementError, CommitAgreementError, CounterProposalError as ProtocolProposalError, + GsbAgreementError, NegotiationApiInitError, ProposeAgreementError, RejectProposalError, + TerminateAgreementError, }; #[derive(Error, Debug)] @@ -46,6 +47,8 @@ pub enum MatchValidationError { pub enum AgreementError { #[error("Agreement [{0}] not found.")] NotFound(String), + #[error("Agreement [{0}] expired.")] + Expired(AgreementId), #[error("Can't create Agreement for Proposal {0}. Proposal {1} not found.")] ProposalNotFound(ProposalId, ProposalId), #[error("Can't create second Agreement [{0}] for Proposal [{1}].")] @@ -74,6 +77,8 @@ pub enum AgreementError { ProtocolApprove(#[from] ApproveAgreementError), #[error("Protocol error while terminating: {0}")] ProtocolTerminate(#[from] TerminateAgreementError), + #[error("Protocol error while committing: {0}")] + ProtocolCommit(#[from] CommitAgreementError), #[error("Internal error: {0}")] Internal(String), } diff --git a/core/market/src/negotiation/notifier.rs b/core/market/src/negotiation/notifier.rs index 0fa31fe241..7ea4aab135 100644 --- a/core/market/src/negotiation/notifier.rs +++ b/core/market/src/negotiation/notifier.rs @@ -1,5 +1,5 @@ use std::fmt::Debug; -use std::time::Duration; +use std::time::{Duration, Instant}; use thiserror::Error; use tokio::sync::broadcast::{channel, Receiver, Sender}; @@ -11,11 +11,11 @@ where Type: Debug + PartialEq + Clone + EnableDisplay + 'static, for<'a> DisplayEnabler<'a, Type>: std::fmt::Display, { - #[error("Timeout while waiting for events for subscription [{}]", .0.display())] + #[error("Timeout while waiting for events for id [{}]", .0.display())] Timeout(Type), #[error("Unsubscribed [{}]", .0.display())] Unsubscribed(Type), - #[error("Channel closed while waiting for events for subscription [{}]", .0.display())] + #[error("Channel closed while waiting for events for id [{}]", .0.display())] ChannelClosed(Type), } @@ -115,4 +115,18 @@ where .await .map_err(|_| NotifierError::Timeout(self.subscription_id.clone()))? } + + pub async fn wait_for_event_until( + &mut self, + timeout: Instant, + ) -> Result<(), NotifierError> { + let now = Instant::now(); + let timeout = if timeout > now { + timeout - Instant::now() + } else { + Duration::from_millis(0) + }; + + self.wait_for_event_with_timeout(timeout).await + } } diff --git a/core/market/src/negotiation/provider.rs b/core/market/src/negotiation/provider.rs index e0fca80001..d1870c5df9 100644 --- a/core/market/src/negotiation/provider.rs +++ b/core/market/src/negotiation/provider.rs @@ -4,8 +4,10 @@ use metrics::counter; use std::sync::Arc; use ya_client::model::market::{event::ProviderEvent, NewProposal, Reason}; +use ya_core_model::NodeId; use ya_persistence::executor::DbExecutor; use ya_service_api_web::middleware::Identity; +use ya_std_utils::LogErr; use crate::db::{ dao::{AgreementDao, NegotiationEventsDao, ProposalDao, SaveAgreementError}, @@ -19,8 +21,11 @@ use super::common::CommonBroker; use super::error::*; use super::notifier::EventNotifier; use crate::config::Config; +use crate::db::dao::AgreementDaoError; use crate::negotiation::common::validate_transition; +use crate::negotiation::notifier::NotifierError; use crate::utils::display::EnableDisplay; +use std::time::Instant; #[derive(Clone, Debug, Eq, PartialEq, derive_more::Display)] pub enum ApprovalResult { @@ -51,6 +56,7 @@ impl ProviderBroker { let broker3 = broker.clone(); let broker_proposal_reject = broker.clone(); let broker_terminated = broker.clone(); + let commit_broker = broker.clone(); let api = NegotiationApi::new( move |caller: String, msg: InitialProposalReceived| { @@ -75,6 +81,9 @@ impl ProviderBroker { .clone() .on_agreement_terminated(msg, caller, Owner::Requestor) }, + move |caller: String, msg: AgreementCommitted| { + on_agreement_committed(commit_broker.clone(), caller, msg) + }, ); // Initialize counters to 0 value. Otherwise they won't appear on metrics endpoint @@ -84,6 +93,8 @@ impl ProviderBroker { counter!("market.agreements.provider.terminated", 0); counter!("market.agreements.provider.terminated.reason", 0, "reason" => "NotSpecified"); counter!("market.agreements.provider.terminated.reason", 0, "reason" => "Success"); + counter!("market.agreements.provider.approving", 0); + counter!("market.agreements.provider.committing", 0); counter!("market.events.provider.queried", 0); counter!("market.proposals.provider.countered", 0); counter!("market.proposals.provider.init-negotiation", 0); @@ -210,50 +221,207 @@ impl ProviderBroker { app_session_id: AppSessionId, timeout: f32, ) -> Result { + let stop_time = Instant::now() + std::time::Duration::from_secs_f64(timeout as f64); let dao = self.common.db.as_dao::(); + let agreement = { let _hold = self.common.agreement_lock.lock(&agreement_id).await; - let agreement = match dao - .select(agreement_id, None, Utc::now().naive_utc()) + let agreement = dao + .select(agreement_id, Some(id.identity), Utc::now().naive_utc()) .await .map_err(|e| AgreementError::Get(agreement_id.to_string(), e))? - { - None => Err(AgreementError::NotFound(agreement_id.to_string()))?, - Some(agreement) => agreement, - }; + .ok_or(AgreementError::NotFound(agreement_id.to_string()))?; - validate_transition(&agreement, AgreementState::Approved)?; + if agreement.state == AgreementState::Cancelled { + return Ok(ApprovalResult::Cancelled); + } - // `db.update_state` must be invoked after successful approve_agreement - // TODO: if dao.approve fails, Provider and Requestor have inconsistent state. - self.api.approve_agreement(&agreement, timeout).await?; - dao.approve(agreement_id, &app_session_id) - .await - .map_err(|e| AgreementError::UpdateState(agreement_id.clone(), e))?; + validate_transition(&agreement, AgreementState::Approving)?; + + // TODO: Sign Agreement. + let signature = "NoSignature".to_string(); + let timestamp = Utc::now().naive_utc(); + let agreement = dao + .approving(&agreement.id, &app_session_id, &signature, ×tamp) + .await + .map_err(|e| AgreementError::UpdateState(agreement.id.clone(), e))?; + + counter!("market.agreements.provider.approving", 1); + if let Some(session) = app_session_id { + log::info!( + "AppSession id [{}] set for Agreement [{}].", + &session, + &agreement.id + ); + } agreement }; - self.common.notify_agreement(&agreement).await; + // Listen to Agreements notification before we start sending message, because otherwise + // we can lose events. + let mut notifier = self.common.agreement_notifier.listen(&agreement.id); + + // It doesn't have to be under lock, since we have `Approving` state. + // Note that this state change between `Approving` and `Pending` in both + // ways is invisible for REST and GSB user, because `Approving` is only our + // internal state and is mapped to `Pending`. + // + // Note: There's reason, that it CAN'T be done under lock. If we hold lock whole time + // Requestor won't be able to cancel his Agreement proposal and he is allowed to do it. + match self.api.approve_agreement(&agreement, timeout).await { + Ok(_) => (), + // It can turn out, that we are in `Cancelled` state since we weren't under + // lock during `self.api.approve_agreement` execution. In such a case, + // we shouldn't return error from here. + Err(ApproveAgreementError::Remote(RemoteAgreementError::InvalidState( + _, + AgreementState::Cancelled, + ))) => return Ok(ApprovalResult::Cancelled), + Err(e) => { + let _hold = self.common.agreement_lock.lock(&agreement_id).await; + + log::warn!( + "Failed to send Approve Agreement [{}] to Requestor. {}. Reverting state to `Pending`.", + agreement_id, + e, + ); + dao.revert_approving(agreement_id).await.log_err().ok(); + Err(e)? + } + } - counter!("market.agreements.provider.approved", 1); + counter!("market.agreements.provider.committing", 1); log::info!( - "Provider {} approved Agreement [{}].", + "Provider {} approved Agreement [{}]. Waiting for commit from Requestor [{}].", id.display(), - &agreement_id, + &agreement.id, + agreement.requestor_id ); - if let Some(session) = app_session_id { - log::info!( - "AppSession id [{}] set for Agreement [{}].", - &session, - &agreement_id - ); + + // Here we must wait until `AgreementCommitted` message, since `approve_agreement` + // is supposed to return after approval. + match notifier.wait_for_event_until(stop_time).await { + Err(NotifierError::Timeout(_)) => { + let _hold = self.common.agreement_lock.lock(&agreement_id).await; + dao.revert_approving(agreement_id).await.log_err().ok(); + + Err(ApproveAgreementError::Timeout(agreement.id.clone()).into()) + } + Err(error) => Err(AgreementError::Internal(format!( + "Code logic error. Agreement events notifier shouldn't return: {}.", + error + ))), + Ok(_) => Ok(()), + } + .log_err()?; + + { + let _hold = self.common.agreement_lock.lock(&agreement_id).await; + + let agreement = dao + .select(agreement_id, None, Utc::now().naive_utc()) + .await + .map_err(|e| AgreementError::Get(agreement_id.to_string(), e))? + .ok_or(AgreementError::Internal(format!( + "Agreement [{}], which existed previously, disappeared.", + agreement_id + )))?; + + match agreement.state { + AgreementState::Cancelled => Ok(ApprovalResult::Cancelled), + AgreementState::Approved => Ok(ApprovalResult::Approved), + AgreementState::Expired => Err(AgreementError::Expired(agreement.id.clone()))?, + _ => Err(AgreementError::Internal(format!( + "Agreement [{}] has unexpected state [{}]", + agreement.id, agreement.state + ))), + } } - return Ok(ApprovalResult::Approved); } } +async fn on_agreement_committed( + broker: CommonBroker, + caller: String, + msg: AgreementCommitted, +) -> Result<(), CommitAgreementError> { + let agreement_id = msg.agreement_id.clone(); + let caller_id = CommonBroker::parse_caller(&caller)?; + agreement_committed(broker, caller_id, msg) + .await + .map_err(|e| CommitAgreementError::Remote(e, agreement_id)) +} + +async fn agreement_committed( + broker: CommonBroker, + caller: NodeId, + msg: AgreementCommitted, +) -> Result<(), RemoteCommitAgreementError> { + let dao = broker.db.as_dao::(); + let agreement = { + let _hold = broker.agreement_lock.lock(&msg.agreement_id).await; + + // Note: we still validate caller here, because we can't be sure, that we were called + // by the same Requestor. + let agreement = dao + .select(&msg.agreement_id, None, Utc::now().naive_utc()) + .await + .map_err(|e| RemoteCommitAgreementError::Unexpected { + public_msg: "Internal Error getting Agreement".to_string(), + original_msg: e.to_string(), + }) + .log_err()? + .ok_or(RemoteCommitAgreementError::NotFound)?; + + if agreement.requestor_id != caller { + // Don't reveal, that we know this Agreement id. + Err(RemoteCommitAgreementError::NotFound)? + } + + // Note: We can find out here, that our Agreement is already in `Cancelled` state, because + // Requestor is allowed to call `cancel_agreement` at any time, before we commit Agreement. + // In this case we should return here and `cancel_agreement` handler is responsible for + // calling `notify_agreement`. + match validate_transition(&agreement, AgreementState::Approved) { + Ok(_) => Ok(()), + Err(AgreementError::UpdateState( + _, + AgreementDaoError::InvalidTransition { from, .. }, + )) => match from { + AgreementState::Cancelled => Err(RemoteCommitAgreementError::Cancelled), + _ => Err(RemoteCommitAgreementError::InvalidState(from)), + }, + Err(e) => Err(RemoteCommitAgreementError::Unexpected { + public_msg: "Failed to validate Agreement state".to_string(), + original_msg: e.to_string(), + }), + } + .log_err()?; + + // TODO: Validate committed signature from message. + + dao.approve(&msg.agreement_id, &msg.signature) + .await + .map_err(|e| RemoteCommitAgreementError::Unexpected { + public_msg: "Failed to update Agreement state to `Approved`.".to_string(), + original_msg: e.to_string(), + }) + .log_err()? + }; + + broker.notify_agreement(&agreement).await; + + counter!("market.agreements.provider.approved", 1); + log::info!( + "Agreement [{}] approved (committed) by [{}].", + &agreement.id, + &caller + ); + Ok(()) +} + // TODO: We need more elegant solution than this. This function still returns // CounterProposalError, which should be hidden in negotiation API and implementations // of handlers should return RemoteProposalError. @@ -354,6 +522,9 @@ async fn agreement_received( Owner::Provider, ); agreement.state = AgreementState::Pending; + agreement.proposed_signature = Some(msg.signature); + + // TODO: Validate signature. // Check if we generated the same id, as Requestor sent us. If not, reject // it, because wrong generated ids could be not unique. diff --git a/core/market/src/negotiation/requestor.rs b/core/market/src/negotiation/requestor.rs index ab3b3a7ca5..f7dc5fb5e2 100644 --- a/core/market/src/negotiation/requestor.rs +++ b/core/market/src/negotiation/requestor.rs @@ -9,6 +9,7 @@ use ya_client::model::market::{event::RequestorEvent, NewProposal, Reason}; use ya_client::model::{node_id::ParseError, NodeId}; use ya_persistence::executor::DbExecutor; use ya_service_api_web::middleware::Identity; +use ya_std_utils::LogErr; use crate::db::{ dao::{AgreementDao, AgreementDaoError, SaveAgreementError}, @@ -67,7 +68,10 @@ impl RequestorBroker { move |caller: String, msg: AgreementApproved| { on_agreement_approved(broker2.clone(), caller, msg) }, - move |_caller: String, _msg: AgreementRejected| async move { unimplemented!() }, + move |_caller: String, _msg: AgreementRejected| async move { + counter!("market.agreements.requestor.rejected", 1); + unimplemented!() + }, move |caller: String, msg: AgreementTerminated| { broker_terminated .clone() @@ -91,6 +95,7 @@ impl RequestorBroker { counter!("market.agreements.requestor.terminated", 0); counter!("market.agreements.requestor.terminated.reason", 0, "reason" => "NotSpecified"); counter!("market.agreements.requestor.terminated.reason", 0, "reason" => "Success"); + counter!("market.agreements.requestor.committing", 0); counter!("market.events.requestor.queried", 0); counter!("market.proposals.requestor.countered", 0); counter!("market.proposals.requestor.generated", 0); @@ -311,15 +316,12 @@ impl RequestorBroker { match agreement.state { AgreementState::Approved => { - counter!("market.agreements.requestor.approved", 1); return Ok(ApprovalStatus::Approved); } AgreementState::Rejected => { - counter!("market.agreements.requestor.rejected", 1); return Ok(ApprovalStatus::Rejected); } AgreementState::Cancelled => { - counter!("market.agreements.requestor.cancelled", 1); return Ok(ApprovalStatus::Cancelled); } AgreementState::Expired => return Err(WaitForApprovalError::Expired(id.clone())), @@ -329,7 +331,7 @@ impl RequestorBroker { AgreementState::Terminated => { return Err(WaitForApprovalError::Terminated(id.clone())) } - AgreementState::Pending => (), // Still waiting for approval. + AgreementState::Pending | AgreementState::Approving => (), // Still waiting for approval. }; if let Err(error) = notifier.wait_for_event_with_timeout(timeout).await { @@ -359,7 +361,7 @@ impl RequestorBroker { // Provider approving Agreement before we set proper state in database. let _hold = self.common.agreement_lock.lock(&agreement_id).await; - let agreement = match dao + let mut agreement = match dao .select( agreement_id, Some(id.identity.clone()), @@ -374,11 +376,12 @@ impl RequestorBroker { validate_transition(&agreement, AgreementState::Pending)?; - // TODO : possible race condition here ISSUE#430 - // 1. this state check should be also `db.update_state` - // 2. `dao.confirm` must be invoked after successful propose_agreement + // TODO: Sign Agreement. + let signature = "NoSignature".to_string(); + agreement.proposed_signature = Some(signature.clone()); + self.api.propose_agreement(&agreement).await?; - dao.confirm(agreement_id, &app_session_id) + dao.confirm(agreement_id, &app_session_id, &signature) .await .map_err(|e| AgreementError::UpdateState(agreement_id.clone(), e))?; } @@ -423,14 +426,15 @@ async fn agreement_approved( caller: NodeId, msg: AgreementApproved, ) -> Result<(), RemoteAgreementError> { + // TODO: We should check here as many condition, as possible, because we want + // to return meaningful message to Provider, what is impossible from `commit_agreement`. + let dao = broker.db.as_dao::(); let agreement = { // We aren't sure, if `confirm_agreement` execution is finished, // so we must lock, to avoid attempt to change database state before. let _hold = broker.agreement_lock.lock(&msg.agreement_id).await; - let agreement = broker - .db - .as_dao::() + let agreement = dao .select(&msg.agreement_id, None, Utc::now().naive_utc()) .await .map_err(|_e| RemoteAgreementError::NotFound(msg.agreement_id.clone()))? @@ -441,13 +445,24 @@ async fn agreement_approved( Err(RemoteAgreementError::NotFound(msg.agreement_id.clone()))? } + validate_transition(&agreement, AgreementState::Approving).map_err(|_| { + RemoteAgreementError::InvalidState(agreement.id.clone(), agreement.state.clone()) + })?; + + // Validate Agreement `valid_to` timestamp. In previous version we got + // error from database update, but know we want to escape early, because + // otherwise we can't response with this error to Provider. + let margin = chrono::Duration::milliseconds(30); + if agreement.valid_to <= (Utc::now() + margin).naive_utc() { + return Err(RemoteAgreementError::Expired(agreement.id.clone())); + } + // TODO: Validate agreement signature. + let signature = "NoSignature".to_string(); + // Note: session must be None, because either we already set this value in ConfirmAgreement, // or we purposely left it None. - broker - .db - .as_dao::() - .approve(&msg.agreement_id, &None) + dao.approving(&agreement.id, &None, &signature, &msg.approved_ts) .await .map_err(|err| match err { AgreementDaoError::InvalidTransition { from, .. } => { @@ -455,29 +470,106 @@ async fn agreement_approved( // Expired Agreement could be InvalidState either, but we want to explicit // say to provider, that Agreement has expired. AgreementState::Expired => { - RemoteAgreementError::Expired(msg.agreement_id.clone()) + RemoteAgreementError::Expired(agreement.id.clone()) } - _ => RemoteAgreementError::InvalidState(msg.agreement_id.clone(), from), + _ => RemoteAgreementError::InvalidState(agreement.id.clone(), from), } } e => { // Log our internal error, but don't reveal error message to Provider. log::warn!( "Approve Agreement [{}] internal error: {}", - &msg.agreement_id, + &agreement.id, e ); - RemoteAgreementError::InternalError(msg.agreement_id.clone()) + RemoteAgreementError::InternalError(agreement.id.clone()) } - })?; - agreement + })? }; - broker.notify_agreement(&agreement).await; - log::info!("Agreement [{}] approved by [{}].", &agreement.id, &caller); + counter!("market.agreements.requestor.committing", 1); + + // Commit Agreement. We must spawn committing later, because we need to + // return from this function to provider. + tokio::task::spawn_local(commit_agreement(broker, agreement.id.clone())); + log::info!( + "Agreement [{}] approved by [{}]. Committing...", + &agreement.id, + &agreement.provider_id + ); Ok(()) } +async fn commit_agreement(broker: CommonBroker, agreement_id: AgreementId) { + // Note: in this scenario, we update database after Provider already + // got `AgreementCommitted` and updated Agreement state to `Approved`, so we will + // wake up `wait_for_agreement` after Provider. + let dao = broker.db.as_dao::(); + let agreement = match async { + let _hold = broker.agreement_lock.lock(&agreement_id).await; + + let dao = broker.db.as_dao::(); + let mut agreement = dao + .select(&agreement_id, None, Utc::now().naive_utc()) + .await + .map_err(|_e| AgreementError::NotFound(agreement_id.to_string()))? + .ok_or(AgreementError::NotFound(agreement_id.to_string()))?; + + // TODO: Sign Agreement. + let signature = "NoSignature".to_string(); + agreement.committed_signature = Some(signature.clone()); + + // Note: This GSB call is racing with potential `cancel_agreement` call. + // In this case Provider code will decide, which call won the race. + NegotiationApi::commit_agreement(&agreement).await?; + + // We approve Agreement in database, when we are sure, that committing succeeded. + Ok(dao + .approve(&agreement.id, &signature) + .await + .map_err(|e| AgreementError::UpdateState(agreement.id.clone(), e))?) + } + .await + { + Ok(agreement) => agreement, + // Return to `Pending` state here unless we are in `Cancelled` state. + Err(AgreementError::ProtocolCommit(CommitAgreementError::Remote( + RemoteCommitAgreementError::Cancelled, + _, + ))) => { + return log::info!( + "Can't commit Agreement [{}] since it was canceled already.", + agreement_id + ); + } + Err(e) => { + log::warn!( + "Failed to commit Agreement [{}]. {}. Reverting state to `Pending`.", + agreement_id, + e, + ); + + dao.revert_approving(&agreement_id) + .await + .log_err_msg(&format!( + "Failed revert state to `Pending` for Agreement [{}]", + agreement_id + )) + .ok(); + return; + } + }; + + broker.notify_agreement(&agreement).await; + + counter!("market.agreements.requestor.approved", 1); + log::info!( + "Agreement [{}] committed (approved) by [{}].", + &agreement.id, + &agreement.provider_id + ); +} + pub async fn proposal_receiver_thread( broker: CommonBroker, mut proposal_receiver: UnboundedReceiver, diff --git a/core/market/src/protocol/negotiation.rs b/core/market/src/protocol/negotiation.rs index ad38fc8203..a907a30027 100644 --- a/core/market/src/protocol/negotiation.rs +++ b/core/market/src/protocol/negotiation.rs @@ -14,14 +14,20 @@ pub mod common { use ya_net::{self as net, RemoteEndpoint}; use ya_service_bus::RpcEndpoint; + use chrono::NaiveDateTime; + /// Sent to notify other side about termination. pub async fn propagate_terminate_agreement( agreement: &Agreement, reason: Option, + signature: String, + timestamp: NaiveDateTime, ) -> Result<(), TerminateAgreementError> { let msg = AgreementTerminated { agreement_id: agreement.id.clone().swap_owner(), reason, + signature, + termination_ts: timestamp, }; log::debug!( diff --git a/core/market/src/protocol/negotiation/error.rs b/core/market/src/protocol/negotiation/error.rs index 7fdf52de7c..5c1775c5d7 100644 --- a/core/market/src/protocol/negotiation/error.rs +++ b/core/market/src/protocol/negotiation/error.rs @@ -49,6 +49,7 @@ pub enum RejectProposalError { } #[derive(Error, Debug, Serialize, Deserialize)] +#[non_exhaustive] pub enum RemoteProposalError { #[error(transparent)] Validation(#[from] ProposalValidationError), @@ -72,9 +73,12 @@ pub enum ProposeAgreementError { Gsb(#[from] GsbAgreementError), #[error("Agreement [{1}] remote error: {0}")] Remote(RemoteProposeAgreementError, AgreementId), + #[error("Agreement [{0}] not signed.")] + NotSigned(AgreementId), } #[derive(Error, Debug, Serialize, Deserialize)] +#[non_exhaustive] pub enum RemoteProposeAgreementError { #[error("Proposal [{0}] not found.")] NotFound(ProposalId), @@ -110,6 +114,10 @@ pub enum ApproveAgreementError { }, #[error("Timeout while sending approval of Agreement [{0}]")] Timeout(AgreementId), + #[error("Agreement [{0}] doesn't contain approval timestamp.")] + NoApprovalTimestamp(AgreementId), + #[error("Agreement [{0}] not signed.")] + NotSigned(AgreementId), } #[derive(Error, Debug, Serialize, Deserialize)] @@ -130,6 +138,7 @@ pub enum TerminateAgreementError { } #[derive(Error, Debug, Serialize, Deserialize)] +#[non_exhaustive] pub enum RemoteAgreementError { #[error("Agreement [{0}] not found.")] NotFound(AgreementId), @@ -141,6 +150,36 @@ pub enum RemoteAgreementError { InternalError(AgreementId), } +#[derive(Error, Debug, Serialize, Deserialize)] +pub enum CommitAgreementError { + #[error("Commit Agreement {0}.")] + Gsb(#[from] GsbAgreementError), + #[error("Remote commit Agreement [{1}] error: {0}")] + Remote(RemoteCommitAgreementError, AgreementId), + #[error(transparent)] + CallerParse(#[from] CallerParseError), + #[error("Agreement [{0}] not signed.")] + NotSigned(AgreementId), +} + +#[derive(Error, Debug, Serialize, Deserialize)] +#[non_exhaustive] +pub enum RemoteCommitAgreementError { + #[error("Agreement expired.")] + Expired, + #[error("Agreement cancelled.")] + Cancelled, + #[error("Agreement not found.")] + NotFound, + #[error("Agreement in state {0}, can't be committed.")] + InvalidState(AgreementState), + #[error("Unexpected error: {public_msg} {original_msg}.")] + Unexpected { + public_msg: String, + original_msg: String, + }, +} + impl RemoteSensitiveError for RemoteProposeAgreementError { fn hide_sensitive_info(self) -> RemoteProposeAgreementError { match self { diff --git a/core/market/src/protocol/negotiation/messages.rs b/core/market/src/protocol/negotiation/messages.rs index 6a02638e39..e5521c6c8e 100644 --- a/core/market/src/protocol/negotiation/messages.rs +++ b/core/market/src/protocol/negotiation/messages.rs @@ -5,7 +5,9 @@ use ya_client::model::market::Reason; use ya_service_bus::RpcMessage; use crate::db::model::{AgreementId, DbProposal, Owner, Proposal, ProposalId, SubscriptionId}; -use crate::protocol::negotiation::error::{ProposeAgreementError, RejectProposalError}; +use crate::protocol::negotiation::error::{ + CommitAgreementError, ProposeAgreementError, RejectProposalError, +}; use super::super::callback::CallbackMessage; use super::error::{ @@ -117,7 +119,8 @@ pub struct AgreementReceived { pub agreement_id: AgreementId, pub creation_ts: NaiveDateTime, pub valid_to: NaiveDateTime, - // TODO: We should send here signature. + /// This will be placed in `proposed_signature` Agreement field. + pub signature: String, } impl RpcMessage for AgreementReceived { @@ -130,7 +133,13 @@ impl RpcMessage for AgreementReceived { #[serde(rename_all = "camelCase")] pub struct AgreementApproved { pub agreement_id: AgreementId, - // TODO: We should send here signature. + /// This will be placed in `approved_signature` Agreement field. + pub signature: String, + /// This timestamp will differ from timestamp, when Agreement will be updated in + /// database to `Approved` state and `AgreementApprovedEvent` timestamp either. + /// But we can't set it to time value, when state changes to `Approved`, because we + /// must include this field in signature. + pub approved_ts: NaiveDateTime, } impl RpcMessage for AgreementApproved { @@ -143,6 +152,7 @@ impl RpcMessage for AgreementApproved { #[serde(rename_all = "camelCase")] pub struct AgreementRejected { pub agreement_id: AgreementId, + pub reason: Option, } impl RpcMessage for AgreementRejected { @@ -155,6 +165,7 @@ impl RpcMessage for AgreementRejected { #[serde(rename_all = "camelCase")] pub struct AgreementCancelled { pub agreement_id: AgreementId, + pub reason: Option, } impl RpcMessage for AgreementCancelled { @@ -168,6 +179,10 @@ impl RpcMessage for AgreementCancelled { pub struct AgreementTerminated { pub agreement_id: AgreementId, pub reason: Option, + /// Signature for `AgreementTerminatedEvent`. + pub signature: String, + /// Termination timestamp, that will be included in signature. + pub termination_ts: NaiveDateTime, } impl RpcMessage for AgreementTerminated { @@ -176,6 +191,20 @@ impl RpcMessage for AgreementTerminated { type Error = TerminateAgreementError; } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgreementCommitted { + pub agreement_id: AgreementId, + /// This will be placed in `committed_signature` Agreement field. + pub signature: String, +} + +impl RpcMessage for AgreementCommitted { + const ID: &'static str = "AgreementCommitted"; + type Item = (); + type Error = CommitAgreementError; +} + /// The same messaged will be used on GSB and as messages in callbacks. impl CallbackMessage for Message { type Ok = ::Item; @@ -237,3 +266,10 @@ impl AgreementTerminated { self } } + +impl AgreementCommitted { + pub fn translate(mut self, owner: Owner) -> Self { + self.agreement_id = self.agreement_id.translate(owner); + self + } +} diff --git a/core/market/src/protocol/negotiation/provider.rs b/core/market/src/protocol/negotiation/provider.rs index e1646623c8..66e54ede5c 100644 --- a/core/market/src/protocol/negotiation/provider.rs +++ b/core/market/src/protocol/negotiation/provider.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use ya_client::model::market::Reason; use ya_client::model::NodeId; use ya_core_model::market::BUS_ID; use ya_net::{self as net, RemoteEndpoint}; @@ -14,12 +15,13 @@ use super::error::{ NegotiationApiInitError, TerminateAgreementError, }; use super::messages::{ - provider, requestor, AgreementApproved, AgreementCancelled, AgreementReceived, - AgreementRejected, AgreementTerminated, InitialProposalReceived, ProposalContent, - ProposalReceived, ProposalRejected, + provider, requestor, AgreementApproved, AgreementCancelled, AgreementCommitted, + AgreementReceived, AgreementRejected, AgreementTerminated, InitialProposalReceived, + ProposalContent, ProposalReceived, ProposalRejected, +}; +use crate::protocol::negotiation::error::{ + CommitAgreementError, ProposeAgreementError, RejectProposalError, }; -use crate::protocol::negotiation::error::{ProposeAgreementError, RejectProposalError}; -use ya_client::model::market::Reason; /// Responsible for communication with markets on other nodes /// during negotiation phase. @@ -35,8 +37,12 @@ struct NegotiationImpl { agreement_received: HandlerSlot, agreement_cancelled: HandlerSlot, agreement_terminated: HandlerSlot, + agreement_committed: HandlerSlot, } +// TODO: Most of these functions don't need to be members of NegotiationApi. +// We should make them plain functions in `provider` module, since it doesn't +// seem, that they will ever need self. impl NegotiationApi { pub fn new( initial_proposal_received: impl CallbackHandler, @@ -45,6 +51,7 @@ impl NegotiationApi { agreement_received: impl CallbackHandler, agreement_cancelled: impl CallbackHandler, agreement_terminated: impl CallbackHandler, + agreement_committed: impl CallbackHandler, ) -> NegotiationApi { let negotiation_impl = NegotiationImpl { initial_proposal_received: HandlerSlot::new(initial_proposal_received), @@ -53,6 +60,7 @@ impl NegotiationApi { agreement_received: HandlerSlot::new(agreement_received), agreement_cancelled: HandlerSlot::new(agreement_cancelled), agreement_terminated: HandlerSlot::new(agreement_terminated), + agreement_committed: HandlerSlot::new(agreement_committed), }; NegotiationApi { inner: Arc::new(negotiation_impl), @@ -101,15 +109,23 @@ impl NegotiationApi { Ok(()) } - /// TODO: pass agreement signature. pub async fn approve_agreement( &self, agreement: &Agreement, timeout: f32, ) -> Result<(), ApproveAgreementError> { let timeout = Duration::from_secs_f32(timeout.max(0.0)); + let id = agreement.id.clone(); + let msg = AgreementApproved { - agreement_id: agreement.id.clone(), + agreement_id: id.clone(), + signature: agreement + .approved_signature + .clone() + .ok_or(ApproveAgreementError::NotSigned(id.clone()))?, + approved_ts: agreement + .approved_ts + .ok_or(ApproveAgreementError::NoApprovalTimestamp(id.clone()))?, }; let net_send_fut = net::from(agreement.provider_id) .to(agreement.requestor_id) @@ -117,8 +133,8 @@ impl NegotiationApi { .send(msg); tokio::time::timeout(timeout, net_send_fut) .await - .map_err(|_| ApproveAgreementError::Timeout(agreement.id.clone()))? - .map_err(|e| GsbAgreementError(e.to_string(), agreement.id.clone()))??; + .map_err(|_| ApproveAgreementError::Timeout(id.clone()))? + .map_err(|e| GsbAgreementError(e.to_string(), id.clone()))??; Ok(()) } @@ -130,6 +146,7 @@ impl NegotiationApi { ) -> Result<(), GsbAgreementError> { let msg = AgreementRejected { agreement_id: agreement_id.clone(), + reason: None, }; net::from(id) .to(owner) @@ -242,6 +259,22 @@ impl NegotiationApi { .await } + async fn on_agreement_committed( + self, + caller: String, + msg: AgreementCommitted, + ) -> Result<(), CommitAgreementError> { + log::debug!( + "Negotiation API: Agreement [{}] committed by [{}].", + &msg.agreement_id, + &caller + ); + self.inner + .agreement_committed + .call(caller, msg.translate(Owner::Provider)) + .await + } + pub async fn bind_gsb( &self, public_prefix: &str, @@ -277,6 +310,10 @@ impl NegotiationApi { .bind_with_processor(move |_, myself, caller: String, msg: AgreementTerminated| { let myself = myself.clone(); myself.on_agreement_terminated(caller, msg) + }) + .bind_with_processor(move |_, myself, caller: String, msg: AgreementCommitted| { + let myself = myself.clone(); + myself.on_agreement_committed(caller, msg) }); Ok(()) } diff --git a/core/market/src/protocol/negotiation/requestor.rs b/core/market/src/protocol/negotiation/requestor.rs index 5aa784cae4..2e7db95ee6 100644 --- a/core/market/src/protocol/negotiation/requestor.rs +++ b/core/market/src/protocol/negotiation/requestor.rs @@ -1,6 +1,7 @@ use futures::future::TryFutureExt; use std::sync::Arc; +use ya_client::model::market::Reason; use ya_client::model::NodeId; use ya_core_model::market::BUS_ID; use ya_net::{self as net, RemoteEndpoint}; @@ -18,8 +19,10 @@ use super::messages::{ AgreementRejected, AgreementTerminated, InitialProposalReceived, ProposalContent, ProposalReceived, ProposalRejected, }; -use crate::protocol::negotiation::error::{ProposeAgreementError, RejectProposalError}; -use ya_client::model::market::Reason; +use crate::protocol::negotiation::error::{ + CommitAgreementError, ProposeAgreementError, RejectProposalError, +}; +use crate::protocol::negotiation::messages::AgreementCommitted; /// Responsible for communication with markets on other nodes /// during negotiation phase. @@ -36,6 +39,9 @@ struct NegotiationImpl { agreement_terminated: HandlerSlot, } +// TODO: Most of these functions don't need to be members of NegotiationApi. +// We should make them plain functions in `requestor` module, since it doesn't +// seem, that they will ever need self. impl NegotiationApi { pub fn new( proposal_received: impl CallbackHandler, @@ -132,18 +138,42 @@ impl NegotiationApi { ) -> Result<(), ProposeAgreementError> { let requestor_id = agreement.requestor_id.clone(); let provider_id = agreement.provider_id.clone(); - let agreement_id = agreement.id.clone(); + let id = agreement.id.clone(); let msg = AgreementReceived { agreement_id: agreement.id.clone(), valid_to: agreement.valid_to.clone(), creation_ts: agreement.creation_ts.clone(), proposal_id: agreement.offer_proposal_id.clone(), + signature: agreement + .proposed_signature + .clone() + .ok_or(ProposeAgreementError::NotSigned(id.clone()))?, + }; + net::from(requestor_id) + .to(provider_id) + .service(&provider::agreement_addr(BUS_ID)) + .send(msg) + .map_err(|e| GsbAgreementError(e.to_string(), id)) + .await??; + Ok(()) + } + + pub async fn commit_agreement(agreement: &Agreement) -> Result<(), CommitAgreementError> { + let requestor_id = agreement.requestor_id.clone(); + let provider_id = agreement.provider_id.clone(); + let id = agreement.id.clone(); + let msg = AgreementCommitted { + agreement_id: agreement.id.clone(), + signature: agreement + .committed_signature + .clone() + .ok_or(CommitAgreementError::NotSigned(id.clone()))?, }; net::from(requestor_id) .to(provider_id) .service(&provider::agreement_addr(BUS_ID)) .send(msg) - .map_err(|e| GsbAgreementError(e.to_string(), agreement_id)) + .map_err(|e| GsbAgreementError(e.to_string(), id)) .await??; Ok(()) } @@ -159,6 +189,7 @@ impl NegotiationApi { ) -> Result<(), GsbAgreementError> { let msg = AgreementCancelled { agreement_id: agreement_id.clone(), + reason: None, }; net::from(id) .to(owner) diff --git a/core/market/src/rest_api/error.rs b/core/market/src/rest_api/error.rs index dd71f42d2a..0c368553b8 100644 --- a/core/market/src/rest_api/error.rs +++ b/core/market/src/rest_api/error.rs @@ -193,6 +193,7 @@ impl ResponseError for AgreementError { let msg = ErrorMessage::new(self.to_string()); match self { AgreementError::NotFound(_) => HttpResponse::NotFound().json(msg), + AgreementError::Expired(_) => HttpResponse::Gone().json(msg), AgreementError::AlreadyExists(_, _) => HttpResponse::Conflict().json(msg), AgreementError::UpdateState(_, e) => e.error_response(), AgreementError::NoNegotiations(_) @@ -207,6 +208,7 @@ impl ResponseError for AgreementError { | AgreementError::ProtocolCreate(_) | AgreementError::ProtocolApprove(_) | AgreementError::ProtocolTerminate(_) + | AgreementError::ProtocolCommit(_) | AgreementError::Internal(_) => HttpResponse::InternalServerError().json(msg), } } @@ -219,6 +221,7 @@ impl ResponseError for AgreementDaoError { AgreementDaoError::InvalidTransition { from, .. } => match from { AgreementState::Proposal => HttpResponse::Conflict().json(msg), AgreementState::Pending + | AgreementState::Approving | AgreementState::Cancelled | AgreementState::Rejected | AgreementState::Expired diff --git a/core/market/src/rest_api/requestor.rs b/core/market/src/rest_api/requestor.rs index b1290cde2e..cc6465f3f1 100644 --- a/core/market/src/rest_api/requestor.rs +++ b/core/market/src/rest_api/requestor.rs @@ -1,9 +1,11 @@ use actix_web::web::{Data, Json, Path, Query}; use actix_web::{HttpResponse, Responder, Scope}; +use metrics::counter; use std::str::FromStr; use std::sync::Arc; use ya_client::model::market::{AgreementProposal, NewDemand, NewProposal, Reason}; +use ya_client::model::ErrorMessage; use ya_service_api_web::middleware::Identity; use ya_std_utils::LogErr; @@ -16,7 +18,6 @@ use super::{ }; use crate::negotiation::ApprovalStatus; use crate::rest_api::QueryAppSessionId; -use ya_client::model::ErrorMessage; pub fn register_endpoints(scope: Scope) -> Scope { scope @@ -205,5 +206,7 @@ async fn cancel_agreement( _id: Identity, _body: Json>, ) -> HttpResponse { + // TODO: Move to final implementation. + counter!("market.agreements.requestor.cancelled", 1); HttpResponse::NotImplemented().finish() } diff --git a/core/market/src/testing/events_helper.rs b/core/market/src/testing/events_helper.rs index 963f8d7b34..cfe4ef3ad9 100644 --- a/core/market/src/testing/events_helper.rs +++ b/core/market/src/testing/events_helper.rs @@ -35,7 +35,7 @@ pub fn generate_event(id: i32, timestamp: NaiveDateTime) -> TestMarketEvent { } } -const QUERY_EVENTS_TIMEOUT: f32 = 1.3; +const QUERY_EVENTS_TIMEOUT: f32 = 2.3; pub mod requestor { use super::*; diff --git a/core/market/src/testing/mock_node.rs b/core/market/src/testing/mock_node.rs index 8f143196ad..5213bec522 100644 --- a/core/market/src/testing/mock_node.rs +++ b/core/market/src/testing/mock_node.rs @@ -243,6 +243,7 @@ impl MarketsNetwork { prov_agreement_received: impl CallbackHandler, prov_agreement_cancelled: impl CallbackHandler, prov_agreement_terminated: impl CallbackHandler, + prov_agreement_committed: impl CallbackHandler, ) -> Self { self.add_negotiation_api( name, @@ -252,6 +253,7 @@ impl MarketsNetwork { prov_agreement_received, prov_agreement_cancelled, prov_agreement_terminated, + prov_agreement_committed, default::empty_on_proposal_received, default::empty_on_proposal_rejected, default::empty_on_agreement_approved, @@ -278,6 +280,7 @@ impl MarketsNetwork { default::empty_on_agreement_received, default::empty_on_agreement_cancelled, default::empty_on_agreement_terminated, + default::empty_on_agreement_committed, req_proposal_received, req_proposal_rejected, req_agreement_approved, @@ -296,6 +299,7 @@ impl MarketsNetwork { prov_agreement_received: impl CallbackHandler, prov_agreement_cancelled: impl CallbackHandler, prov_agreement_terminated: impl CallbackHandler, + prov_agreement_committed: impl CallbackHandler, req_proposal_received: impl CallbackHandler, req_proposal_rejected: impl CallbackHandler, req_agreement_approved: impl CallbackHandler, @@ -309,6 +313,7 @@ impl MarketsNetwork { prov_agreement_received, prov_agreement_cancelled, prov_agreement_terminated, + prov_agreement_committed, ); let requestor = requestor::NegotiationApi::new( @@ -588,8 +593,8 @@ impl MarketServiceExt for MarketService { pub mod default { use super::*; use crate::protocol::negotiation::error::{ - ApproveAgreementError, CounterProposalError, GsbAgreementError, ProposeAgreementError, - RejectProposalError, TerminateAgreementError, + ApproveAgreementError, CommitAgreementError, CounterProposalError, GsbAgreementError, + ProposeAgreementError, RejectProposalError, TerminateAgreementError, }; pub async fn empty_on_offers_retrieved( @@ -669,6 +674,13 @@ pub mod default { Ok(()) } + pub async fn empty_on_agreement_committed( + _caller: String, + _msg: AgreementCommitted, + ) -> Result<(), CommitAgreementError> { + Ok(()) + } + pub async fn empty_on_agreement_terminated( _caller: String, _msg: AgreementTerminated, diff --git a/core/market/tests/test_agreement.rs b/core/market/tests/test_agreement.rs index 2539520460..3938682cc0 100644 --- a/core/market/tests/test_agreement.rs +++ b/core/market/tests/test_agreement.rs @@ -630,7 +630,7 @@ async fn second_approval_should_fail() { agreement_id, AgreementDaoError::InvalidTransition { from: AgreementState::Approved, - to: AgreementState::Approved + to: AgreementState::Approving } ), result diff --git a/utils/std-utils/src/result.rs b/utils/std-utils/src/result.rs index 49ee48ceb3..a6b9cc2015 100644 --- a/utils/std-utils/src/result.rs +++ b/utils/std-utils/src/result.rs @@ -1,18 +1,30 @@ use log::{Level, Record}; +use std::fmt::{Debug, Display}; -pub trait LogErr { +pub trait LogErr { /// If Result is `Err`, this function logs it on error level /// and returns the same Result. In case of `Ok` nothing happens. fn log_err(self) -> Result; fn log_err_msg(self, message: &str) -> Result; + fn log_warn_msg(self, message: &str) -> Result; + + fn log_error(self, message: &str, log_level: Level) -> Result; } -impl LogErr for Result { +impl LogErr for Result { fn log_err(self) -> Result { self.log_err_msg("") } fn log_err_msg(self, message: &str) -> Result { + self.log_error(message, Level::Error) + } + + fn log_warn_msg(self, message: &str) -> Result { + self.log_error(message, Level::Warn) + } + + fn log_error(self, message: &str, log_level: Level) -> Result { if let Err(e) = self { backtrace::trace(|frame| { let mut cont = true; @@ -28,7 +40,7 @@ impl LogErr for Result { } log::logger().log( &Record::builder() - .level(Level::Error) + .level(log_level) .args(format_args!("{}: {}", msg, e)) .module_path(Some(module_path)) .build(),