Skip to content

Commit

Permalink
Implementation of commit agreement
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Feb 5, 2021
1 parent 3856475 commit ed4174a
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 117 deletions.
96 changes: 90 additions & 6 deletions core/market/src/db/dao/agreement.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use chrono::NaiveDateTime;
use diesel::expression::AsExpression;
use diesel::prelude::*;
use diesel::query_builder::QueryFragment;

use ya_client::model::NodeId;
use ya_persistence::executor::{do_with_transaction, AsDao, ConnType, PoolType};
Expand All @@ -17,6 +19,8 @@ use crate::db::schema::market_agreement_event::dsl as event;
use crate::db::schema::market_agreement_event::dsl::market_agreement_event;
use crate::db::{DbError, DbResult};
use crate::market::EnvConfig;
use diesel::helper_types::Nullable;
use diesel::sql_types::Text;

const AGREEMENT_STORE_DAYS: EnvConfig<'static, u64> = EnvConfig {
name: "YAGNA_MARKET_AGREEMENT_STORE_DAYS",
Expand Down Expand Up @@ -162,49 +166,95 @@ impl<'c> AgreementDao<'c> {
&self,
id: &AgreementId,
session: &AppSessionId,
) -> Result<(), AgreementDaoError> {
signature: &String,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let session = session.clone();
let signature = signature.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Pending)?;
update_signature(
conn,
agreement::proposed_signature,
&mut agreement,
signature,
)?;

if let Some(session) = session {
update_session(conn, &mut agreement, session)?;
}
Ok(())
Ok(agreement)
})
.await
}

/// Function won't change appSessionId, if session parameter is None.
pub async fn approve(
/// Signature will be placed in `approved_signature` field.
pub async fn approving(
&self,
id: &AgreementId,
session: &AppSessionId,
) -> Result<(), AgreementDaoError> {
signature: &String,
timestamp: &NaiveDateTime,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let session = session.clone();
let signature = signature.clone();
let timestamp = timestamp.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Approved)?;
update_state(conn, &mut agreement, AgreementState::Approving)?;
update_signature(
conn,
agreement::approved_signature,
&mut agreement,
signature,
)?;
update_approve_timestamp(conn, &mut agreement, timestamp)?;

// It's important, that if None AppSessionId comes, we shouldn't update Agreement
// appSessionId field to None. This function can be called in different context, for example
// on Requestor, when appSessionId is already set.
if let Some(session) = session {
update_session(conn, &mut agreement, session)?;
}
Ok(agreement)
})
.await
}

/// Signature will be placed in `committed_signature` field.
pub async fn approve(
&self,
id: &AgreementId,
signature: &String,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let signature = signature.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Approved)?;
update_signature(
conn,
agreement::committed_signature,
&mut agreement,
signature,
)?;

// Always Provider approves.
create_event(conn, &agreement, None, Owner::Provider)?;

Ok(())
Ok(agreement)
})
.await
}
Expand Down Expand Up @@ -296,6 +346,40 @@ fn update_state(
Ok(num_updated > 0)
}

fn update_signature<ColumnSelector>(
conn: &ConnType,
column: ColumnSelector,
agreement: &mut Agreement,
signature: String,
) -> Result<bool, AgreementDaoError>
where
ColumnSelector:
diesel::expression_methods::ExpressionMethods + Column<Table = market_agreement>,
{
let signature = Some(signature);
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(column.eq(&signature))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.approved_signature = signature;
Ok(num_updated > 0)
}

fn update_approve_timestamp(
conn: &ConnType,
agreement: &mut Agreement,
timestamp: NaiveDateTime,
) -> Result<bool, AgreementDaoError> {
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::approved_ts.eq(&timestamp))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.approved_ts = Some(timestamp);
Ok(num_updated > 0)
}

fn update_session(
conn: &ConnType,
agreement: &mut Agreement,
Expand Down
8 changes: 7 additions & 1 deletion core/market/src/negotiation/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,13 @@ impl CommonBroker {

validate_transition(&agreement, AgreementState::Terminated)?;

protocol_common::propagate_terminate_agreement(&agreement, reason.clone()).await?;
protocol_common::propagate_terminate_agreement(
&agreement,
reason.clone(),
"NotSigned".to_string(),
Utc::now().naive_utc(),
)
.await?;

let reason_string = CommonBroker::reason2string(&reason);
dao.terminate(&agreement.id, reason_string, agreement.id.owner())
Expand Down
4 changes: 2 additions & 2 deletions core/market/src/negotiation/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ where
Type: Debug + PartialEq + Clone + EnableDisplay<Type> + 'static,
for<'a> DisplayEnabler<'a, Type>: std::fmt::Display,
{
#[error("Timeout while waiting for events for subscription [{}]", .0.display())]
#[error("Timeout while waiting for events for id [{}]", .0.display())]
Timeout(Type),
#[error("Unsubscribed [{}]", .0.display())]
Unsubscribed(Type),
#[error("Channel closed while waiting for events for subscription [{}]", .0.display())]
#[error("Channel closed while waiting for events for id [{}]", .0.display())]
ChannelClosed(Type),
}

Expand Down
123 changes: 89 additions & 34 deletions core/market/src/negotiation/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use metrics::counter;
use std::sync::Arc;

use ya_client::model::market::{event::ProviderEvent, NewProposal, Reason};
use ya_core_model::NodeId;
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::Identity;

Expand All @@ -20,8 +21,9 @@ use super::error::*;
use super::notifier::EventNotifier;
use crate::config::Config;
use crate::negotiation::common::validate_transition;
use crate::negotiation::notifier::NotifierError;
use crate::utils::display::EnableDisplay;
use ya_core_model::NodeId;
use ya_std_utils::LogErr;

#[derive(Clone, Debug, Eq, PartialEq, derive_more::Display)]
pub enum ApprovalResult {
Expand Down Expand Up @@ -78,9 +80,7 @@ impl ProviderBroker {
.on_agreement_terminated(msg, caller, Owner::Requestor)
},
move |caller: String, msg: AgreementCommitted| {
commit_broker
.clone()
.on_agreement_committed(msg, caller, Owner::Requestor)
on_agreement_committed(commit_broker.clone(), caller, msg)
},
);

Expand Down Expand Up @@ -221,39 +221,44 @@ impl ProviderBroker {
let agreement = {
let _hold = self.common.agreement_lock.lock(&agreement_id).await;

let agreement = match dao
.select(agreement_id, None, Utc::now().naive_utc())
let agreement = dao
.select(agreement_id, Some(id.identity), Utc::now().naive_utc())
.await
.map_err(|e| AgreementError::Get(agreement_id.to_string(), e))?
{
None => Err(AgreementError::NotFound(agreement_id.to_string()))?,
Some(agreement) => agreement,
};
.ok_or(AgreementError::NotFound(agreement_id.to_string()))?;

validate_transition(&agreement, AgreementState::Approving)?;

// TODO: Update app_session_id here.
// TODO: Sign Agreement.
let signature = "NoSignature".to_string();
let timestamp = Utc::now().naive_utc();

let agreement = dao
.approving(&agreement.id, &app_session_id, &signature, &timestamp)
.await
.map_err(|e| AgreementError::UpdateState(agreement.id.clone(), e))?;

if let Some(session) = app_session_id {
log::info!(
"AppSession id [{}] set for Agreement [{}].",
&session,
&agreement.id
);
}

// TODO: Update state to `AgreementState::Approving`.

agreement
};

// Listen to Agreements notification before we start sending message, because otherwise
// we can lose events.
let mut notifier = self.common.agreement_notifier.listen(&agreement.id);

// It doesn't have to be under lock, since we have `Approving` state.
// Note that this state change between `Approving` and `Pending` in both
// ways is invisible for REST and GSB user, because `Approving` is only our
// internal state and is mapped to `Pending`.
//
// Note: There's reason, that it CAN'T be done under lock. If we hold lock whole time
// Note: There's reason, that it CAN'T be done under lock. If we hold lock whole time
// Requestor won't be able to cancel his Agreement proposal and he is allowed to do it.
// TODO: Send signature and `approved_ts` from Agreement.
self.api.approve_agreement(&agreement, timeout).await?;
// TODO: Reverse state to `Pending` in case of error (reverse under lock).
// TODO: During reversing it can turn out, that we are in `Cancelled` or `Approved` state
Expand All @@ -267,14 +272,44 @@ impl ProviderBroker {
agreement.requestor_id
);

// TODO: Here we must wait until `AgreementCommitted` message, since `approve_agreement`
// is supposed to return after approval.
// TODO: Waiting should set timeout.
// TODO: What in case of timeout?? Reverse to `Pending` state?
// Note: This function isn't responsible for changing Agreement state to `Approved`.
// TODO: Adjust timeout to elapsed time.
// Here we must wait until `AgreementCommitted` message, since `approve_agreement`
// is supposed to return after approval.
match notifier.wait_for_event_with_timeout(timeout).await {
Err(NotifierError::Timeout(_)) => {
// TODO: What in case of timeout?? Reverse to `Pending` state?
Err(ApproveAgreementError::Timeout(agreement.id.clone()))?
}
Err(error) => Err(AgreementError::Internal(format!(
"Code logic error. Agreement events notifier shouldn't return: {}.",
error
)))?,
Ok(_) => (),
}

// TODO: Check Agreement state here, because it could be `Cancelled`.
return Ok(ApprovalResult::Approved);
{
let _hold = self.common.agreement_lock.lock(&agreement_id).await;

let agreement = dao
.select(agreement_id, None, Utc::now().naive_utc())
.await
.map_err(|e| AgreementError::Get(agreement_id.to_string(), e))?
.ok_or(AgreementError::Internal(format!(
"Agreement [{}], which existed previously, disappeared.",
agreement_id
)))?;

match agreement.state {
AgreementState::Cancelled => Ok(ApprovalResult::Cancelled),
AgreementState::Approved => Ok(ApprovalResult::Approved),
AgreementState::Expired => RemoteAgreementError::Expired(agreement.id.clone())?,
_ => Err(AgreementError::Internal(format!(
"Agreement [{}] has unexpected state [{}]",
agreement.id, agreement.state
))),
}
}
}
}

Expand All @@ -301,30 +336,47 @@ async fn agreement_committed(

// Note: we still validate caller here, because we can't be sure, that we were caller
// by the same Requestor.
let agreement = match dao
let agreement = dao
.select(&msg.agreement_id, Some(caller), Utc::now().naive_utc())
.await
.map_err(|e| AgreementError::Get(msg.agreement_id.to_string(), e))?
{
None => Err(AgreementError::NotFound(msg.agreement_id.to_string()))?,
Some(agreement) => agreement,
};
.map_err(|e| RemoteCommitAgreementError::Unexpected {
public_msg: "Internal Error getting Agreement".to_string(),
original_msg: e.to_string(),
})
.log_err()?
.ok_or(RemoteCommitAgreementError::NotFound(
msg.agreement_id.clone(),
))?;

// Note: We can find out here, that our Agreement is already in `Cancelled` state, because
// Requestor is allowed to call `cancel_agreement` at any time, before we commit Agreement.
// In this case we should return here, but we still must call `notify_agreement` to wake other threads.
validate_transition(&agreement, AgreementState::Approving)?;
// In this case we should return here and `cancel_agreement` handler is responsible for
// calling `notify_agreement`.
match validate_transition(&agreement, AgreementState::Approving) {
Ok(_) => Ok(()),
Err(AgreementError::ProtocolApprove(ApproveAgreementError::Remote(
RemoteAgreementError::InvalidState(id, state),
))) => match state {
AgreementState::Cancelled => Err(RemoteCommitAgreementError::Cancelled(id)),
_ => Err(RemoteCommitAgreementError::InvalidState(id, state)),
},
Err(e) => Err(RemoteCommitAgreementError::Unexpected {
public_msg: "Invalid Agreement state".to_string(),
original_msg: e.to_string(),
}),
}
.log_err()?;

// TODO: Validate committed signature from message.

// TODO: `approve` shouldn't set AppSessionId anymore.
dao.approve(&msg.agreement_id, &app_session_id)
dao.approve(&msg.agreement_id, &msg.signature)
.await
.map_err(|e| AgreementError::UpdateState(msg.agreement_id.clone(), e))?;
agreement
.log_err()
.map_err(|e| AgreementError::UpdateState(agreement.id.clone(), e))?
};

broker.notify_agreement(&agreement).await;

counter!("market.agreements.provider.approved", 1);
log::info!(
"Agreement [{}] approved (committed) by [{}].",
Expand Down Expand Up @@ -434,6 +486,9 @@ async fn agreement_received(
Owner::Provider,
);
agreement.state = AgreementState::Pending;
agreement.proposed_signature = Some(msg.signature);

// TODO: Validate signature.

// Check if we generated the same id, as Requestor sent us. If not, reject
// it, because wrong generated ids could be not unique.
Expand Down
Loading

0 comments on commit ed4174a

Please sign in to comment.