Skip to content

Commit

Permalink
Solution proposal (not compiling)
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Feb 4, 2021
1 parent 0f1df34 commit 2f7e130
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 34 deletions.
11 changes: 11 additions & 0 deletions core/market/src/db/model/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub enum AgreementState {
Proposal,
/// Confirmed by a Requestor and sent to Provider for approval
Pending,
/// Additional internal state mapped to `Pending` in client structures.
/// This state will appear after Provider will call `approve_agreement`,
/// but before Requestor will send back `AgreementCommitted`.
Approving,
/// Cancelled by a Requestor
Cancelled,
/// Rejected by a Provider
Expand Down Expand Up @@ -180,6 +184,7 @@ impl From<AgreementState> for ClientAgreementState {
match agreement_state {
AgreementState::Proposal => ClientAgreementState::Proposal,
AgreementState::Pending => ClientAgreementState::Pending,
AgreementState::Approving => ClientAgreementState::Pending,
AgreementState::Cancelled => ClientAgreementState::Cancelled,
AgreementState::Rejected => ClientAgreementState::Rejected,
AgreementState::Approved => ClientAgreementState::Approved,
Expand All @@ -199,8 +204,14 @@ pub fn check_transition(from: AgreementState, to: AgreementState) -> Result<(),
_ => (),
},
AgreementState::Pending => match to {
AgreementState::Approving => return Ok(()),
AgreementState::Cancelled => return Ok(()),
AgreementState::Rejected => return Ok(()),
AgreementState::Expired => return Ok(()),
_ => (),
},
AgreementState::Approving => match to {
AgreementState::Cancelled => return Ok(()),
AgreementState::Approved => return Ok(()),
AgreementState::Expired => return Ok(()),
_ => (),
Expand Down
89 changes: 72 additions & 17 deletions core/market/src/negotiation/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::notifier::EventNotifier;
use crate::config::Config;
use crate::negotiation::common::validate_transition;
use crate::utils::display::EnableDisplay;
use ya_core_model::NodeId;

#[derive(Clone, Debug, Eq, PartialEq, derive_more::Display)]
pub enum ApprovalResult {
Expand Down Expand Up @@ -51,6 +52,7 @@ impl ProviderBroker {
let broker3 = broker.clone();
let broker_proposal_reject = broker.clone();
let broker_terminated = broker.clone();
let commit_broker = broker.clone();

let api = NegotiationApi::new(
move |caller: String, msg: InitialProposalReceived| {
Expand All @@ -75,6 +77,11 @@ impl ProviderBroker {
.clone()
.on_agreement_terminated(msg, caller, Owner::Requestor)
},
move |caller: String, msg: AgreementCommitted| {
commit_broker
.clone()
.on_agreement_committed(msg, caller, Owner::Requestor)
},
);

// Initialize counters to 0 value. Otherwise they won't appear on metrics endpoint
Expand Down Expand Up @@ -223,37 +230,85 @@ impl ProviderBroker {
Some(agreement) => agreement,
};

validate_transition(&agreement, AgreementState::Approved)?;
validate_transition(&agreement, AgreementState::Approving)?;

// `db.update_state` must be invoked after successful approve_agreement
// TODO: if dao.approve fails, Provider and Requestor have inconsistent state.
self.api.approve_agreement(&agreement, timeout).await?;
dao.approve(agreement_id, &app_session_id)
.await
.map_err(|e| AgreementError::UpdateState(agreement_id.clone(), e))?;
// TODO: Update app_session_id here.
if let Some(session) = app_session_id {
log::info!(
"AppSession id [{}] set for Agreement [{}].",
&session,
&agreement.id
);
}

// TODO: Update state to `AgreementState::Approving`.

// TODO: Send signature and `approved_ts` from Agreement.
self.api.approve_agreement(&agreement, timeout).await?;
agreement
};

self.common.notify_agreement(&agreement).await;
// TODO: Here we must wait until `AgreementCommitted` message, since `approve_agreement`
// is supposed to return after approval.

counter!("market.agreements.provider.approved", 1);
log::info!(
"Provider {} approved Agreement [{}].",
id.display(),
&agreement_id,
&agreement.id,
);
if let Some(session) = app_session_id {
log::info!(
"AppSession id [{}] set for Agreement [{}].",
&session,
&agreement_id
);
}
return Ok(ApprovalResult::Approved);
}
}

async fn on_agreement_committed(
broker: CommonBroker,
caller: String,
msg: AgreementCommitted,
) -> Result<(), CommitAgreementError> {
let agreement_id = msg.agreement_id.clone();
let caller_id = CommonBroker::parse_caller(&caller)?;
commit_agreement(broker, caller_id, msg)
.await
.map_err(|e| CommitAgreementError::Remote(e, agreement_id))
}

async fn commit_agreement(
broker: CommonBroker,
caller: NodeId,
msg: AgreementCommitted,
) -> Result<(), RemoteCommitAgreementError> {
let dao = broker.db.as_dao::<AgreementDao>();
let agreement = {
let _hold = broker.agreement_lock.lock(&msg.agreement_id).await;

// Note: we still validate caller here, because we can't be sure, that we were caller
// by the same Requestor.
let agreement = match dao
.select(&msg.agreement_id, Some(caller), Utc::now().naive_utc())
.await
.map_err(|e| AgreementError::Get(msg.agreement_id.to_string(), e))?
{
None => Err(AgreementError::NotFound(msg.agreement_id.to_string()))?,
Some(agreement) => agreement,
};

/// TODO: `approve` shouldn't set AppSessionId anymore.
dao.approve(&msg.agreement_id, &app_session_id)
.await
.map_err(|e| AgreementError::UpdateState(msg.agreement_id.clone(), e))?;
agreement
};

broker.notify_agreement(&agreement).await;
counter!("market.agreements.provider.approved", 1);
log::info!(
"Agreement [{}] approved (committed) by [{}].",
&agreement.id,
&caller
);
Ok(())
}

// TODO: We need more elegant solution than this. This function still returns
// CounterProposalError, which should be hidden in negotiation API and implementations
// of handlers should return RemoteProposalError.
Expand Down
79 changes: 64 additions & 15 deletions core/market/src/negotiation/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ impl RequestorBroker {
move |caller: String, msg: AgreementApproved| {
on_agreement_approved(broker2.clone(), caller, msg)
},
move |_caller: String, _msg: AgreementRejected| async move { unimplemented!() },
move |_caller: String, _msg: AgreementRejected| async move {
counter!("market.agreements.requestor.rejected", 1);
unimplemented!()
},
move |caller: String, msg: AgreementTerminated| {
broker_terminated
.clone()
Expand Down Expand Up @@ -311,15 +314,12 @@ impl RequestorBroker {

match agreement.state {
AgreementState::Approved => {
counter!("market.agreements.requestor.approved", 1);
return Ok(ApprovalStatus::Approved);
}
AgreementState::Rejected => {
counter!("market.agreements.requestor.rejected", 1);
return Ok(ApprovalStatus::Rejected);
}
AgreementState::Cancelled => {
counter!("market.agreements.requestor.cancelled", 1);
return Ok(ApprovalStatus::Cancelled);
}
AgreementState::Expired => return Err(WaitForApprovalError::Expired(id.clone())),
Expand Down Expand Up @@ -423,6 +423,8 @@ async fn agreement_approved(
caller: NodeId,
msg: AgreementApproved,
) -> Result<(), RemoteAgreementError> {
// TODO: We should check as many conditions here, as possible, because we want
// to return meaningful message to Provider, what is impossible from `commit_agreement`.
let agreement = {
// We aren't sure, if `confirm_agreement` execution is finished,
// so we must lock, to avoid attempt to change database state before.
Expand All @@ -441,41 +443,88 @@ async fn agreement_approved(
Err(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
}

// TODO: Validate Agreement `valid_to` timestamp. In previous version we got
// error from database update, but know we want to escape early, because
// otherwise we can't response with this error to Provider.
validate_transition(&agreement, AgreementState::Approving)?;

// TODO: Validate agreement signature.
// TODO: Sign Agreement and update `approved_signature`.
// TODO: Update `approved_ts` Agreement field.
// TODO: Update state to `AgreementState::Approving`.

agreement
};

/// TODO: Commit Agreement. We must spawn committing later, because we need to
/// return from this function to provider.
tokio::task::spawn_local(commit_agreement(broker, agreement.id));
log::info!(
"Agreement [{}] approved by [{}].",
&agreement.id,
&agreement.provider_id
);
Ok(())
}

async fn commit_agreement(broker: CommonBroker, agreement_id: AgreementId) {
let agreement = match {
let _hold = broker.agreement_lock.lock(&agreement_id).await;

let dao = broker.db.as_dao::<AgreementDao>();
let agreement = dao
.select(&msg.agreement_id, None, Utc::now().naive_utc())
.await
.map_err(|_e| RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
.ok_or(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?;

// TODO: Send `AgreementCommited` message to Provider.
// TODO: We have problem to make GSB call here, since we don't have `api` object.
// Note that in this scenario, we update database after Provider already
// got `AgreementCommitted` and updated Agreement state to `Approved`, so we will
// wake up `wait_for_agreement` after Provider.

// Note: session must be None, because either we already set this value in ConfirmAgreement,
// or we purposely left it None.
broker
.db
.as_dao::<AgreementDao>()
.approve(&msg.agreement_id, &None)
dao.approve(&agreement_id, &None)
.await
.map_err(|err| match err {
AgreementDaoError::InvalidTransition { from, .. } => {
match from {
// Expired Agreement could be InvalidState either, but we want to explicit
// say to provider, that Agreement has expired.
AgreementState::Expired => {
RemoteAgreementError::Expired(msg.agreement_id.clone())
RemoteAgreementError::Expired(agreement_id.clone())
}
_ => RemoteAgreementError::InvalidState(msg.agreement_id.clone(), from),
_ => RemoteAgreementError::InvalidState(agreement_id.clone(), from),
}
}
e => {
// Log our internal error, but don't reveal error message to Provider.
log::warn!(
"Approve Agreement [{}] internal error: {}",
&msg.agreement_id,
&agreement_id,
e
);
RemoteAgreementError::InternalError(msg.agreement_id.clone())
RemoteAgreementError::InternalError(agreement_id.clone())
}
})?;
agreement
Ok(agreement)
} {
Ok(agreement) => agreement,
Err(e) => {
// TODO: Return to pending state here.
}
};

broker.notify_agreement(&agreement).await;
log::info!("Agreement [{}] approved by [{}].", &agreement.id, &caller);
Ok(())

counter!("market.agreements.requestor.approved", 1);
log::info!(
"Agreement [{}] approved (committed) by [{}].",
&agreement.id,
&agreement.provider_id
);
}

pub async fn proposal_receiver_thread(
Expand Down
11 changes: 11 additions & 0 deletions core/market/src/protocol/negotiation/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ pub enum RemoteAgreementError {
InternalError(AgreementId),
}

#[derive(Error, Debug, Serialize, Deserialize)]
pub enum CommitAgreementError {
#[error("Remote Commit Agreement [{1}] error: {0}")]
Remote(RemoteCommitAgreementError, AgreementId),
#[error(transparent)]
CallerParse(#[from] CallerParseError),
}

#[derive(Error, Debug, Serialize, Deserialize)]
pub enum RemoteCommitAgreementError {}

impl RemoteSensitiveError for RemoteProposeAgreementError {
fn hide_sensitive_info(self) -> RemoteProposeAgreementError {
match self {
Expand Down
Loading

0 comments on commit 2f7e130

Please sign in to comment.