Skip to content

Commit

Permalink
Implementation of commit agreement
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Feb 8, 2021
1 parent 3856475 commit 1587512
Show file tree
Hide file tree
Showing 15 changed files with 440 additions and 126 deletions.
125 changes: 119 additions & 6 deletions core/market/src/db/dao/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,49 +162,80 @@ impl<'c> AgreementDao<'c> {
&self,
id: &AgreementId,
session: &AppSessionId,
) -> Result<(), AgreementDaoError> {
signature: &String,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let session = session.clone();
let signature = signature.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Pending)?;
update_proposed_signature(conn, &mut agreement, signature)?;

if let Some(session) = session {
update_session(conn, &mut agreement, session)?;
}
Ok(())
Ok(agreement)
})
.await
}

/// Function won't change appSessionId, if session parameter is None.
pub async fn approve(
/// Signature will be placed in `approved_signature` field.
pub async fn approving(
&self,
id: &AgreementId,
session: &AppSessionId,
) -> Result<(), AgreementDaoError> {
signature: &String,
timestamp: &NaiveDateTime,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let session = session.clone();
let signature = signature.clone();
let timestamp = timestamp.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Approved)?;
update_state(conn, &mut agreement, AgreementState::Approving)?;
update_approved_signature(conn, &mut agreement, signature)?;
update_approve_timestamp(conn, &mut agreement, timestamp)?;

// It's important, that if None AppSessionId comes, we shouldn't update Agreement
// appSessionId field to None. This function can be called in different context, for example
// on Requestor, when appSessionId is already set.
if let Some(session) = session {
update_session(conn, &mut agreement, session)?;
}
Ok(agreement)
})
.await
}

/// Signature will be placed in `committed_signature` field.
pub async fn approve(
&self,
id: &AgreementId,
signature: &String,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let signature = signature.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Approved)?;
update_committed_signature(conn, &mut agreement, signature)?;

// Always Provider approves.
create_event(conn, &agreement, None, Owner::Provider)?;

Ok(())
Ok(agreement)
})
.await
}
Expand All @@ -229,6 +260,29 @@ impl<'c> AgreementDao<'c> {
.await
}

pub async fn revert_approving(&self, id: &AgreementId) -> Result<bool, AgreementDaoError> {
let id = id.clone();

do_with_transaction(self.pool, move |conn| {
let agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

if agreement.state != AgreementState::Approving {
return Err(AgreementDaoError::InvalidTransition {
from: agreement.state,
to: AgreementState::Pending,
});
}

let num_updated = diesel::update(market_agreement.find(&id))
.set(agreement::state.eq(&AgreementState::Pending))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;
Ok(num_updated > 0)
})
.await
}

pub async fn clean(&self) -> DbResult<()> {
// FIXME use grace time from config file when #460 is merged
log::trace!("Clean market agreements: start");
Expand Down Expand Up @@ -296,6 +350,65 @@ fn update_state(
Ok(num_updated > 0)
}

fn update_proposed_signature(
conn: &ConnType,
agreement: &mut Agreement,
signature: String,
) -> Result<bool, AgreementDaoError> {
let signature = Some(signature);
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::proposed_signature.eq(&signature))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.proposed_signature = signature;
Ok(num_updated > 0)
}

fn update_approved_signature(
conn: &ConnType,
agreement: &mut Agreement,
signature: String,
) -> Result<bool, AgreementDaoError> {
let signature = Some(signature);
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::approved_signature.eq(&signature))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.approved_signature = signature;
Ok(num_updated > 0)
}

fn update_committed_signature(
conn: &ConnType,
agreement: &mut Agreement,
signature: String,
) -> Result<bool, AgreementDaoError> {
let signature = Some(signature);
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::committed_signature.eq(&signature))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.committed_signature = signature;
Ok(num_updated > 0)
}

fn update_approve_timestamp(
conn: &ConnType,
agreement: &mut Agreement,
timestamp: NaiveDateTime,
) -> Result<bool, AgreementDaoError> {
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::approved_ts.eq(&timestamp))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.approved_ts = Some(timestamp);
Ok(num_updated > 0)
}

fn update_session(
conn: &ConnType,
agreement: &mut Agreement,
Expand Down
5 changes: 4 additions & 1 deletion core/market/src/db/model/agreement_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ impl NewAgreementEvent {
Ok(Self {
agreement_id: agreement.id.clone(),
event_type: match agreement.state {
AgreementState::Pending | AgreementState::Proposal | AgreementState::Expired => {
AgreementState::Pending
| AgreementState::Proposal
| AgreementState::Expired
| AgreementState::Approving => {
let msg = format!("Wrong [{}] state {}", agreement.id, agreement.state);
log::error!("{}", msg);
return Err(EventFromAgreementError(msg));
Expand Down
8 changes: 7 additions & 1 deletion core/market/src/negotiation/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,13 @@ impl CommonBroker {

validate_transition(&agreement, AgreementState::Terminated)?;

protocol_common::propagate_terminate_agreement(&agreement, reason.clone()).await?;
protocol_common::propagate_terminate_agreement(
&agreement,
reason.clone(),
"NotSigned".to_string(),
Utc::now().naive_utc(),
)
.await?;

let reason_string = CommonBroker::reason2string(&reason);
dao.terminate(&agreement.id, reason_string, agreement.id.owner())
Expand Down
9 changes: 7 additions & 2 deletions core/market/src/negotiation/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use crate::db::{
};
use crate::matcher::error::{DemandError, QueryOfferError};
use crate::protocol::negotiation::error::{
ApproveAgreementError, CounterProposalError as ProtocolProposalError, GsbAgreementError,
NegotiationApiInitError, ProposeAgreementError, RejectProposalError, TerminateAgreementError,
ApproveAgreementError, CommitAgreementError, CounterProposalError as ProtocolProposalError,
GsbAgreementError, NegotiationApiInitError, ProposeAgreementError, RejectProposalError,
TerminateAgreementError,
};

#[derive(Error, Debug)]
Expand Down Expand Up @@ -46,6 +47,8 @@ pub enum MatchValidationError {
pub enum AgreementError {
#[error("Agreement [{0}] not found.")]
NotFound(String),
#[error("Agreement [{0}] expired.")]
Expired(AgreementId),
#[error("Can't create Agreement for Proposal {0}. Proposal {1} not found.")]
ProposalNotFound(ProposalId, ProposalId),
#[error("Can't create second Agreement [{0}] for Proposal [{1}].")]
Expand Down Expand Up @@ -74,6 +77,8 @@ pub enum AgreementError {
ProtocolApprove(#[from] ApproveAgreementError),
#[error("Protocol error while terminating: {0}")]
ProtocolTerminate(#[from] TerminateAgreementError),
#[error("Protocol error while committing: {0}")]
ProtocolCommit(#[from] CommitAgreementError),
#[error("Internal error: {0}")]
Internal(String),
}
Expand Down
4 changes: 2 additions & 2 deletions core/market/src/negotiation/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ where
Type: Debug + PartialEq + Clone + EnableDisplay<Type> + 'static,
for<'a> DisplayEnabler<'a, Type>: std::fmt::Display,
{
#[error("Timeout while waiting for events for subscription [{}]", .0.display())]
#[error("Timeout while waiting for events for id [{}]", .0.display())]
Timeout(Type),
#[error("Unsubscribed [{}]", .0.display())]
Unsubscribed(Type),
#[error("Channel closed while waiting for events for subscription [{}]", .0.display())]
#[error("Channel closed while waiting for events for id [{}]", .0.display())]
ChannelClosed(Type),
}

Expand Down
Loading

0 comments on commit 1587512

Please sign in to comment.