Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Market -- approval ack #1015

Merged
merged 11 commits into from
Feb 8, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ CREATE TABLE market_agreement(
approved_signature TEXT,
committed_signature TEXT,

CHECK (state in ('Proposal','Pending','Cancelled','Rejected','Approved','Expired','Terminated'))
CHECK (state in ('Proposal','Pending','Cancelled','Rejected','Approved','Expired','Terminated', 'Approving'))
);

-- Change Proposal state from enum to Text value for better database introspection.
Expand Down
125 changes: 119 additions & 6 deletions core/market/src/db/dao/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,49 +162,80 @@ impl<'c> AgreementDao<'c> {
&self,
id: &AgreementId,
session: &AppSessionId,
) -> Result<(), AgreementDaoError> {
signature: &String,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let session = session.clone();
let signature = signature.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Pending)?;
update_proposed_signature(conn, &mut agreement, signature)?;

if let Some(session) = session {
update_session(conn, &mut agreement, session)?;
}
Ok(())
Ok(agreement)
})
.await
}

/// Function won't change appSessionId, if session parameter is None.
pub async fn approve(
/// Signature will be placed in `approved_signature` field.
pub async fn approving(
&self,
id: &AgreementId,
session: &AppSessionId,
) -> Result<(), AgreementDaoError> {
signature: &String,
timestamp: &NaiveDateTime,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let session = session.clone();
let signature = signature.clone();
let timestamp = timestamp.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Approved)?;
update_state(conn, &mut agreement, AgreementState::Approving)?;
update_approved_signature(conn, &mut agreement, signature)?;
update_approve_timestamp(conn, &mut agreement, timestamp)?;

// It's important, that if None AppSessionId comes, we shouldn't update Agreement
// appSessionId field to None. This function can be called in different context, for example
// on Requestor, when appSessionId is already set.
if let Some(session) = session {
update_session(conn, &mut agreement, session)?;
}
Ok(agreement)
})
.await
}

/// Signature will be placed in `committed_signature` field.
pub async fn approve(
&self,
id: &AgreementId,
signature: &String,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
let signature = signature.clone();

do_with_transaction(self.pool, move |conn| {
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Approved)?;
update_committed_signature(conn, &mut agreement, signature)?;

// Always Provider approves.
create_event(conn, &agreement, None, Owner::Provider)?;

Ok(())
Ok(agreement)
})
.await
}
Expand All @@ -229,6 +260,29 @@ impl<'c> AgreementDao<'c> {
.await
}

pub async fn revert_approving(&self, id: &AgreementId) -> Result<bool, AgreementDaoError> {
let id = id.clone();

do_with_transaction(self.pool, move |conn| {
let agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

if agreement.state != AgreementState::Approving {
return Err(AgreementDaoError::InvalidTransition {
from: agreement.state,
to: AgreementState::Pending,
});
}

let num_updated = diesel::update(market_agreement.find(&id))
.set(agreement::state.eq(&AgreementState::Pending))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;
Ok(num_updated > 0)
})
.await
}

pub async fn clean(&self) -> DbResult<()> {
// FIXME use grace time from config file when #460 is merged
log::trace!("Clean market agreements: start");
Expand Down Expand Up @@ -296,6 +350,65 @@ fn update_state(
Ok(num_updated > 0)
}

fn update_proposed_signature(
conn: &ConnType,
agreement: &mut Agreement,
signature: String,
) -> Result<bool, AgreementDaoError> {
let signature = Some(signature);
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::proposed_signature.eq(&signature))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.proposed_signature = signature;
Ok(num_updated > 0)
}

fn update_approved_signature(
conn: &ConnType,
agreement: &mut Agreement,
signature: String,
) -> Result<bool, AgreementDaoError> {
let signature = Some(signature);
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::approved_signature.eq(&signature))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.approved_signature = signature;
nieznanysprawiciel marked this conversation as resolved.
Show resolved Hide resolved
Ok(num_updated > 0)
}

fn update_committed_signature(
conn: &ConnType,
agreement: &mut Agreement,
signature: String,
) -> Result<bool, AgreementDaoError> {
let signature = Some(signature);
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::committed_signature.eq(&signature))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.committed_signature = signature;
Ok(num_updated > 0)
}

fn update_approve_timestamp(
conn: &ConnType,
agreement: &mut Agreement,
timestamp: NaiveDateTime,
) -> Result<bool, AgreementDaoError> {
let num_updated = diesel::update(market_agreement.find(&agreement.id))
.set(agreement::approved_ts.eq(&timestamp))
.execute(conn)
.map_err(|e| AgreementDaoError::DbError(e.into()))?;

agreement.approved_ts = Some(timestamp);
Ok(num_updated > 0)
}

fn update_session(
conn: &ConnType,
agreement: &mut Agreement,
Expand Down
12 changes: 12 additions & 0 deletions core/market/src/db/model/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub enum AgreementState {
Proposal,
/// Confirmed by a Requestor and sent to Provider for approval
Pending,
/// Additional internal state mapped to `Pending` in client structures.
/// This state will appear after Provider will call `approve_agreement`,
/// but before Requestor will send back `AgreementCommitted`.
Approving,
nieznanysprawiciel marked this conversation as resolved.
Show resolved Hide resolved
/// Cancelled by a Requestor
Cancelled,
/// Rejected by a Provider
Expand Down Expand Up @@ -180,6 +184,7 @@ impl From<AgreementState> for ClientAgreementState {
match agreement_state {
AgreementState::Proposal => ClientAgreementState::Proposal,
AgreementState::Pending => ClientAgreementState::Pending,
AgreementState::Approving => ClientAgreementState::Pending,
AgreementState::Cancelled => ClientAgreementState::Cancelled,
AgreementState::Rejected => ClientAgreementState::Rejected,
AgreementState::Approved => ClientAgreementState::Approved,
Expand All @@ -199,8 +204,15 @@ pub fn check_transition(from: AgreementState, to: AgreementState) -> Result<(),
_ => (),
},
AgreementState::Pending => match to {
AgreementState::Approving => return Ok(()),
AgreementState::Cancelled => return Ok(()),
AgreementState::Rejected => return Ok(()),
AgreementState::Expired => return Ok(()),
_ => (),
},
AgreementState::Approving => match to {
AgreementState::Pending => return Ok(()),
Copy link
Contributor

@tworec tworec Feb 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be clear it is intentional; for the curious readers

Suggested change
AgreementState::Pending => return Ok(()),
// we allow transition from `Approving` to the `Pending` back and forth
AgreementState::Pending => return Ok(()),

AgreementState::Cancelled => return Ok(()),
tworec marked this conversation as resolved.
Show resolved Hide resolved
AgreementState::Approved => return Ok(()),
AgreementState::Expired => return Ok(()),
_ => (),
Expand Down
5 changes: 4 additions & 1 deletion core/market/src/db/model/agreement_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ impl NewAgreementEvent {
Ok(Self {
agreement_id: agreement.id.clone(),
event_type: match agreement.state {
AgreementState::Pending | AgreementState::Proposal | AgreementState::Expired => {
AgreementState::Pending
| AgreementState::Proposal
| AgreementState::Expired
| AgreementState::Approving => {
let msg = format!("Wrong [{}] state {}", agreement.id, agreement.state);
log::error!("{}", msg);
return Err(EventFromAgreementError(msg));
Expand Down
8 changes: 7 additions & 1 deletion core/market/src/negotiation/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,13 @@ impl CommonBroker {

validate_transition(&agreement, AgreementState::Terminated)?;

protocol_common::propagate_terminate_agreement(&agreement, reason.clone()).await?;
protocol_common::propagate_terminate_agreement(
&agreement,
reason.clone(),
"NotSigned".to_string(),
Utc::now().naive_utc(),
)
.await?;

let reason_string = CommonBroker::reason2string(&reason);
dao.terminate(&agreement.id, reason_string, agreement.id.owner())
Expand Down
9 changes: 7 additions & 2 deletions core/market/src/negotiation/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use crate::db::{
};
use crate::matcher::error::{DemandError, QueryOfferError};
use crate::protocol::negotiation::error::{
ApproveAgreementError, CounterProposalError as ProtocolProposalError, GsbAgreementError,
NegotiationApiInitError, ProposeAgreementError, RejectProposalError, TerminateAgreementError,
ApproveAgreementError, CommitAgreementError, CounterProposalError as ProtocolProposalError,
GsbAgreementError, NegotiationApiInitError, ProposeAgreementError, RejectProposalError,
TerminateAgreementError,
};

#[derive(Error, Debug)]
Expand Down Expand Up @@ -46,6 +47,8 @@ pub enum MatchValidationError {
pub enum AgreementError {
#[error("Agreement [{0}] not found.")]
NotFound(String),
#[error("Agreement [{0}] expired.")]
Expired(AgreementId),
#[error("Can't create Agreement for Proposal {0}. Proposal {1} not found.")]
ProposalNotFound(ProposalId, ProposalId),
#[error("Can't create second Agreement [{0}] for Proposal [{1}].")]
Expand Down Expand Up @@ -74,6 +77,8 @@ pub enum AgreementError {
ProtocolApprove(#[from] ApproveAgreementError),
#[error("Protocol error while terminating: {0}")]
ProtocolTerminate(#[from] TerminateAgreementError),
#[error("Protocol error while committing: {0}")]
ProtocolCommit(#[from] CommitAgreementError),
#[error("Internal error: {0}")]
Internal(String),
}
Expand Down
4 changes: 2 additions & 2 deletions core/market/src/negotiation/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ where
Type: Debug + PartialEq + Clone + EnableDisplay<Type> + 'static,
for<'a> DisplayEnabler<'a, Type>: std::fmt::Display,
{
#[error("Timeout while waiting for events for subscription [{}]", .0.display())]
#[error("Timeout while waiting for events for id [{}]", .0.display())]
Timeout(Type),
#[error("Unsubscribed [{}]", .0.display())]
Unsubscribed(Type),
#[error("Channel closed while waiting for events for subscription [{}]", .0.display())]
#[error("Channel closed while waiting for events for id [{}]", .0.display())]
ChannelClosed(Type),
}

Expand Down
Loading