Market -- approval ack #1015

Merged
merged 11 commits into from
Feb 8, 2021
12 changes: 12 additions & 0 deletions core/market/src/db/model/agreement.rs
@@ -38,6 +38,10 @@ pub enum AgreementState {
Proposal,
/// Confirmed by a Requestor and sent to Provider for approval
Pending,
/// Additional internal state mapped to `Pending` in client structures.
/// This state appears after the Provider calls `approve_agreement`,
/// but before the Requestor sends back `AgreementCommitted`.
Approving,
/// Cancelled by a Requestor
Cancelled,
/// Rejected by a Provider
@@ -180,6 +184,7 @@ impl From<AgreementState> for ClientAgreementState {
match agreement_state {
AgreementState::Proposal => ClientAgreementState::Proposal,
AgreementState::Pending => ClientAgreementState::Pending,
AgreementState::Approving => ClientAgreementState::Pending,
AgreementState::Cancelled => ClientAgreementState::Cancelled,
AgreementState::Rejected => ClientAgreementState::Rejected,
AgreementState::Approved => ClientAgreementState::Approved,
@@ -199,8 +204,15 @@ pub fn check_transition(from: AgreementState, to: AgreementState) -> Result<(),
_ => (),
},
AgreementState::Pending => match to {
AgreementState::Approving => return Ok(()),
AgreementState::Cancelled => return Ok(()),
AgreementState::Rejected => return Ok(()),
AgreementState::Expired => return Ok(()),
_ => (),
},
AgreementState::Approving => match to {
AgreementState::Pending => return Ok(()),
@tworec (Contributor) commented on Feb 5, 2021:

To make it clear to curious readers that this is intentional:

Suggested change
AgreementState::Pending => return Ok(()),
// we allow transition from `Approving` to the `Pending` back and forth
AgreementState::Pending => return Ok(()),

AgreementState::Cancelled => return Ok(()),
AgreementState::Approved => return Ok(()),
AgreementState::Expired => return Ok(()),
_ => (),
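The transition rows visible in this hunk can be tried out in isolation. The following is a minimal sketch, not the crate's code: only the `Pending` and `Approving` rows shown above are modeled, every other transition is rejected, and the `String` error type is a stand-in assumed for illustration.

```rust
// Standalone sketch: only the `Pending` and `Approving` rows from the hunk are modeled.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AgreementState {
    Proposal,
    Pending,
    Approving,
    Cancelled,
    Rejected,
    Approved,
    Expired,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ClientAgreementState {
    Proposal,
    Pending,
    Cancelled,
    Rejected,
    Approved,
    Expired,
}

impl From<AgreementState> for ClientAgreementState {
    fn from(state: AgreementState) -> Self {
        match state {
            AgreementState::Proposal => ClientAgreementState::Proposal,
            // The internal `Approving` state is reported to clients as `Pending`.
            AgreementState::Pending | AgreementState::Approving => ClientAgreementState::Pending,
            AgreementState::Cancelled => ClientAgreementState::Cancelled,
            AgreementState::Rejected => ClientAgreementState::Rejected,
            AgreementState::Approved => ClientAgreementState::Approved,
            AgreementState::Expired => ClientAgreementState::Expired,
        }
    }
}

// Assumption: a plain `String` error replaces the crate's own error type.
fn check_transition(from: AgreementState, to: AgreementState) -> Result<(), String> {
    use AgreementState::*;
    let allowed = matches!(
        (from, to),
        (Pending, Approving | Cancelled | Rejected | Expired)
            | (Approving, Pending | Cancelled | Approved | Expired)
    );
    if allowed {
        Ok(())
    } else {
        Err(format!("invalid transition: {:?} -> {:?}", from, to))
    }
}

fn main() {
    let path = [
        AgreementState::Pending,
        AgreementState::Approving,
        AgreementState::Approved,
    ];
    for pair in path.windows(2) {
        println!("{:?} -> {:?}: {:?}", pair[0], pair[1], check_transition(pair[0], pair[1]));
    }
    println!(
        "client view of Approving: {:?}",
        ClientAgreementState::from(AgreementState::Approving)
    );
}
```

Because `Approving` converts to `ClientAgreementState::Pending`, moving back and forth between `Pending` and `Approving` never changes what a client observes.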
12 changes: 2 additions & 10 deletions core/market/src/negotiation/common.rs
@@ -357,11 +357,7 @@ impl CommonBroker {
// that could have been called by other party at the same time. Termination
// consists of 2 operations: sending message to other party and updating state.
// Race conditions could appear in this situation.
self.agreement_lock
.get_lock(&agreement.id)
.await
.lock()
.await;
let _hold = self.agreement_lock.lock(&agreement.id).await;

validate_transition(&agreement, AgreementState::Terminated)?;

@@ -437,11 +433,7 @@ impl CommonBroker {
// that could have been called by one of our Agents. Termination consists
// of 2 operations: sending message to other party and updating state.
// Race conditions could appear in this situation.
self.agreement_lock
.get_lock(&agreement_id)
.await
.lock()
.await;
let _hold = self.agreement_lock.lock(&agreement_id).await;

let agreement = dao
.select(&agreement_id, None, Utc::now().naive_utc())
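Both hunks in this file replace a chained lock call used as a bare statement with a `let _hold = ...` binding. If the removed chain returned a guard, as mutex APIs typically do (an assumption, since the lock type is not shown in this diff), the guard would have been dropped at the end of that statement. The sketch below shows the difference using `std::sync::Mutex` in place of the project's async lock; the helper names are hypothetical.

```rust
use std::sync::Mutex;

/// Returns true if the mutex is currently held.
fn is_locked(m: &Mutex<()>) -> bool {
    m.try_lock().is_err()
}

/// The guard is a temporary here: it is dropped at the end of the statement,
/// so the code after it runs WITHOUT the lock.
#[allow(unused_must_use)]
fn locked_when_guard_discarded(m: &Mutex<()>) -> bool {
    m.lock().unwrap();
    is_locked(m)
}

/// Binding the guard to a named variable keeps the lock held until the end of the scope.
fn locked_when_guard_bound(m: &Mutex<()>) -> bool {
    let _hold = m.lock().unwrap();
    is_locked(m)
}

fn main() {
    let m = Mutex::new(());
    println!("guard discarded, still locked: {}", locked_when_guard_discarded(&m));
    println!("guard bound, still locked: {}", locked_when_guard_bound(&m));
}
```

A named binding such as `_hold` is needed here. A plain `let _ = ...` does not bind the value, so the guard would again be released immediately.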
123 changes: 99 additions & 24 deletions core/market/src/negotiation/provider.rs
@@ -21,6 +21,7 @@ use super::notifier::EventNotifier;
use crate::config::Config;
use crate::negotiation::common::validate_transition;
use crate::utils::display::EnableDisplay;
use ya_core_model::NodeId;

#[derive(Clone, Debug, Eq, PartialEq, derive_more::Display)]
pub enum ApprovalResult {
@@ -51,6 +52,7 @@ impl ProviderBroker {
let broker3 = broker.clone();
let broker_proposal_reject = broker.clone();
let broker_terminated = broker.clone();
let commit_broker = broker.clone();
Contributor comment:

In some old branch I have a builder for Brokers that would let us get rid of this, but I'm too shy (read: it is too close to the Mainnet release) to reveal it now ;)


let api = NegotiationApi::new(
move |caller: String, msg: InitialProposalReceived| {
@@ -75,6 +77,11 @@ impl ProviderBroker {
.clone()
.on_agreement_terminated(msg, caller, Owner::Requestor)
},
move |caller: String, msg: AgreementCommitted| {
commit_broker
.clone()
.on_agreement_committed(msg, caller, Owner::Requestor)
},
);

// Initialize counters to 0 value. Otherwise they won't appear on metrics endpoint
@@ -212,12 +219,7 @@ impl ProviderBroker {
) -> Result<ApprovalResult, AgreementError> {
let dao = self.common.db.as_dao::<AgreementDao>();
let agreement = {
self.common
.agreement_lock
.get_lock(&agreement_id)
.await
.lock()
.await;
let _hold = self.common.agreement_lock.lock(&agreement_id).await;

let agreement = match dao
.select(agreement_id, None, Utc::now().naive_utc())
@@ -228,37 +230,110 @@ impl ProviderBroker {
Some(agreement) => agreement,
};

validate_transition(&agreement, AgreementState::Approved)?;
validate_transition(&agreement, AgreementState::Approving)?;

// `db.update_state` must be invoked after successful approve_agreement
// TODO: if dao.approve fails, Provider and Requestor have inconsistent state.
self.api.approve_agreement(&agreement, timeout).await?;
dao.approve(agreement_id, &app_session_id)
.await
.map_err(|e| AgreementError::UpdateState(agreement_id.clone(), e))?;
// TODO: Update app_session_id here.
if let Some(session) = app_session_id {
log::info!(
"AppSession id [{}] set for Agreement [{}].",
&session,
&agreement.id
);
}

// TODO: Update state to `AgreementState::Approving`.

agreement
};

self.common.notify_agreement(&agreement).await;
// This doesn't have to happen under the lock, since we have the `Approving` state.
// Note that switching between `Approving` and `Pending` in either direction
// is invisible to REST and GSB users, because `Approving` is only our
// internal state and is mapped to `Pending`.
//
// Note: There is a reason why this CAN'T be done under the lock. If we held the lock the whole
// time, the Requestor wouldn't be able to cancel their Agreement proposal, which they are allowed to do.
// TODO: Send signature and `approved_ts` from Agreement.
self.api.approve_agreement(&agreement, timeout).await?;
// TODO: Reverse state to `Pending` in case of error (reverse under lock).
// TODO: During reversing it can turn out, that we are in `Cancelled` or `Approved` state
// since we weren't under lock during `self.api.approve_agreement` execution. In such a case,
// we shouldn't return error from here.
Contributor comment:

We should check carefully what the API error is in such a case. For a state error it is fine not to return an error here, but in other cases (e.g. a network error) we should still return an error, I think.
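One way to read the distinction raised in this comment, as a hedged sketch only: every type and function name below is hypothetical and none of it is taken from the PR. A failed approval call is swallowed only when the failure is a state error and the local Agreement has meanwhile reached `Approved` or `Cancelled`. Anything else, including network errors, is still returned to the caller.

```rust
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LocalState {
    Pending,
    Approving,
    Approved,
    Cancelled,
}

/// Hypothetical classification of what the approval call can fail with.
#[derive(Clone, Debug, PartialEq, Eq)]
enum ApproveCallError {
    /// The other side refused because of the Agreement state.
    InvalidState,
    /// The message could not be delivered.
    Network(String),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Resolution {
    Approved,
    Cancelled,
}

/// Decides what to report after the approval call failed, given the state
/// found when re-reading the Agreement under the lock.
fn resolve_failed_approve(
    err: ApproveCallError,
    state_now: LocalState,
) -> Result<Resolution, ApproveCallError> {
    match (err, state_now) {
        // A state error plus a state already settled by a concurrent call: not an error.
        (ApproveCallError::InvalidState, LocalState::Approved) => Ok(Resolution::Approved),
        (ApproveCallError::InvalidState, LocalState::Cancelled) => Ok(Resolution::Cancelled),
        // Everything else (e.g. network failures) is still reported.
        (err, _) => Err(err),
    }
}

fn main() {
    let settled = resolve_failed_approve(ApproveCallError::InvalidState, LocalState::Cancelled);
    println!("state error, already cancelled: {:?}", settled);

    let failed = resolve_failed_approve(
        ApproveCallError::Network("timeout".to_string()),
        LocalState::Approving,
    );
    println!("network error while approving: {:?}", failed);
}
```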


counter!("market.agreements.provider.approved", 1);
log::info!(
"Provider {} approved Agreement [{}].",
"Provider {} approved Agreement [{}]. Waiting for commit from Requestor [{}].",
id.display(),
&agreement_id,
&agreement.id,
agreement.requestor_id
);
if let Some(session) = app_session_id {
log::info!(
"AppSession id [{}] set for Agreement [{}].",
&session,
&agreement_id
);
}

// TODO: Here we must wait until `AgreementCommitted` message, since `approve_agreement`
// is supposed to return after approval.
// TODO: Waiting should set timeout.
// TODO: What in case of timeout?? Reverse to `Pending` state?
// Note: This function isn't responsible for changing Agreement state to `Approved`.

// TODO: Check Agreement state here, because it could be `Cancelled`.
return Ok(ApprovalResult::Approved);
}
}

async fn on_agreement_committed(
broker: CommonBroker,
caller: String,
msg: AgreementCommitted,
) -> Result<(), CommitAgreementError> {
let agreement_id = msg.agreement_id.clone();
let caller_id = CommonBroker::parse_caller(&caller)?;
agreement_committed(broker, caller_id, msg)
.await
.map_err(|e| CommitAgreementError::Remote(e, agreement_id))
}

async fn agreement_committed(
broker: CommonBroker,
caller: NodeId,
msg: AgreementCommitted,
) -> Result<(), RemoteCommitAgreementError> {
let dao = broker.db.as_dao::<AgreementDao>();
let agreement = {
let _hold = broker.agreement_lock.lock(&msg.agreement_id).await;

// Note: we still validate the caller here, because we can't be sure that we were called
// by the same Requestor.
let agreement = match dao
.select(&msg.agreement_id, Some(caller), Utc::now().naive_utc())
.await
.map_err(|e| AgreementError::Get(msg.agreement_id.to_string(), e))?
{
None => Err(AgreementError::NotFound(msg.agreement_id.to_string()))?,
Some(agreement) => agreement,
};

// Note: We can find out here, that our Agreement is already in `Cancelled` state, because
// Requestor is allowed to call `cancel_agreement` at any time, before we commit Agreement.
// In this case we should return here, but we still must call `notify_agreement` to wake other threads.
Contributor comment:

I think they have already been woken up as a side effect of cancel_agreement or another commit_agreement, which is also possible in between. But notifying more is better than less.

validate_transition(&agreement, AgreementState::Approving)?;

// TODO: Validate committed signature from message.

// TODO: `approve` shouldn't set AppSessionId anymore.
dao.approve(&msg.agreement_id, &app_session_id)
.await
.map_err(|e| AgreementError::UpdateState(msg.agreement_id.clone(), e))?;
agreement
};

broker.notify_agreement(&agreement).await;
counter!("market.agreements.provider.approved", 1);
log::info!(
"Agreement [{}] approved (committed) by [{}].",
&agreement.id,
&caller
);
Ok(())
}

// TODO: We need more elegant solution than this. This function still returns
// CounterProposalError, which should be hidden in negotiation API and implementations
// of handlers should return RemoteProposalError.
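The notes in `agreement_committed` describe a race between the Requestor's commit and a possible `cancel_agreement`. A minimal sketch of how validating the transition under the lock settles that race is given below. It uses `std::sync::Mutex` and a reduced three-state enum as assumptions for illustration, and treats `Approved` and `Cancelled` as final within the sketch only.

```rust
use std::sync::Mutex;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    Approving,
    Approved,
    Cancelled,
}

/// Hypothetical stand-in for an Agreement row guarded by a per-Agreement lock.
struct Agreement {
    state: Mutex<State>,
}

impl Agreement {
    fn new() -> Self {
        Agreement {
            state: Mutex::new(State::Approving),
        }
    }

    /// Validates and applies the transition while holding the lock.
    /// On failure, returns the state that was found instead.
    fn transition(&self, to: State) -> Result<(), State> {
        let mut state = self.state.lock().unwrap();
        match (*state, to) {
            (State::Approving, State::Approved) | (State::Approving, State::Cancelled) => {
                *state = to;
                Ok(())
            }
            (from, _) => Err(from),
        }
    }
}

fn main() {
    // Cancel arrives first: the later commit is rejected.
    let agreement = Agreement::new();
    println!("cancel: {:?}", agreement.transition(State::Cancelled));
    println!("commit: {:?}", agreement.transition(State::Approved));

    // Commit arrives first: the later cancel is rejected.
    let agreement = Agreement::new();
    println!("commit: {:?}", agreement.transition(State::Approved));
    println!("cancel: {:?}", agreement.transition(State::Cancelled));
}
```

Whichever call takes the lock first wins. The loser sees the already-updated state and can report it, rather than overwriting it.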
97 changes: 70 additions & 27 deletions core/market/src/negotiation/requestor.rs
@@ -67,7 +67,10 @@ impl RequestorBroker {
move |caller: String, msg: AgreementApproved| {
on_agreement_approved(broker2.clone(), caller, msg)
},
move |_caller: String, _msg: AgreementRejected| async move { unimplemented!() },
move |_caller: String, _msg: AgreementRejected| async move {
counter!("market.agreements.requestor.rejected", 1);
unimplemented!()
},
move |caller: String, msg: AgreementTerminated| {
broker_terminated
.clone()
@@ -311,15 +314,12 @@ impl RequestorBroker {

match agreement.state {
AgreementState::Approved => {
counter!("market.agreements.requestor.approved", 1);
return Ok(ApprovalStatus::Approved);
}
AgreementState::Rejected => {
counter!("market.agreements.requestor.rejected", 1);
return Ok(ApprovalStatus::Rejected);
}
AgreementState::Cancelled => {
counter!("market.agreements.requestor.cancelled", 1);
return Ok(ApprovalStatus::Cancelled);
}
AgreementState::Expired => return Err(WaitForApprovalError::Expired(id.clone())),
@@ -357,12 +357,7 @@ impl RequestorBroker {
// We won't be able to process `on_agreement_approved`, before we
// finish execution under this lock. This avoids errors related to
// Provider approving Agreement before we set proper state in database.
self.common
.agreement_lock
.get_lock(&agreement_id)
.await
.lock()
.await;
let _hold = self.common.agreement_lock.lock(&agreement_id).await;

let agreement = match dao
.select(
@@ -428,15 +423,12 @@ async fn agreement_approved(
caller: NodeId,
msg: AgreementApproved,
) -> Result<(), RemoteAgreementError> {
// TODO: We should check as many conditions here as possible, because we want
// to return a meaningful message to the Provider, which is impossible from `commit_agreement`.
let agreement = {
// We aren't sure, if `confirm_agreement` execution is finished,
// so we must lock, to avoid attempt to change database state before.
broker
.agreement_lock
.get_lock(&msg.agreement_id)
.await
.lock()
.await;
let _hold = broker.agreement_lock.lock(&msg.agreement_id).await;

let agreement = broker
.db
@@ -451,41 +443,92 @@ async fn agreement_approved(
Err(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
}

// TODO: Validate Agreement `valid_to` timestamp. In the previous version we got
// an error from the database update, but now we want to escape early, because
// otherwise we can't respond with this error to the Provider.
validate_transition(&agreement, AgreementState::Approving)?;

// TODO: Validate agreement signature.
// TODO: Sign Agreement and update `approved_signature`.
// TODO: Update `approved_ts` Agreement field.
// TODO: Update state to `AgreementState::Approving`.

agreement
};

// TODO: Commit Agreement. We must spawn committing later, because we need to
// return from this function to the Provider.
tokio::task::spawn_local(commit_agreement(broker, agreement.id));
log::info!(
"Agreement [{}] approved by [{}].",
&agreement.id,
&agreement.provider_id
);
Ok(())
}

async fn commit_agreement(broker: CommonBroker, agreement_id: AgreementId) {
// Note: in this scenario, we update database after Provider already
// got `AgreementCommitted` and updated Agreement state to `Approved`, so we will
// wake up `wait_for_agreement` after Provider.
let agreement = match {
let _hold = broker.agreement_lock.lock(&agreement_id).await;

let dao = broker.db.as_dao::<AgreementDao>();
let agreement = dao
.select(&msg.agreement_id, None, Utc::now().naive_utc())
.await
.map_err(|_e| RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
.ok_or(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?;

// TODO: Send `AgreementCommitted` message to Provider.
// TODO: We can't make the GSB call here, since we don't have the `api` object.
// We must place this function in `protocol/negotiation/common`.

// Note: This GSB call is racing with potential `cancel_agreement` call.
// In this case Provider code will decide, which call won the race.

// Note: session must be None, because either we already set this value in ConfirmAgreement,
// or we purposely left it None.
broker
.db
.as_dao::<AgreementDao>()
.approve(&msg.agreement_id, &None)
dao.approve(&agreement_id, &None)
.await
.map_err(|err| match err {
AgreementDaoError::InvalidTransition { from, .. } => {
match from {
// An expired Agreement could also be reported as InvalidState, but we want to
// tell the Provider explicitly that the Agreement has expired.
AgreementState::Expired => {
RemoteAgreementError::Expired(msg.agreement_id.clone())
RemoteAgreementError::Expired(agreement_id.clone())
}
_ => RemoteAgreementError::InvalidState(msg.agreement_id.clone(), from),
_ => RemoteAgreementError::InvalidState(agreement_id.clone(), from),
}
}
e => {
// Log our internal error, but don't reveal error message to Provider.
log::warn!(
"Approve Agreement [{}] internal error: {}",
&msg.agreement_id,
&agreement_id,
e
);
RemoteAgreementError::InternalError(msg.agreement_id.clone())
RemoteAgreementError::InternalError(agreement_id.clone())
}
})?;
agreement
Ok(agreement)
} {
Ok(agreement) => agreement,
Err(e) => {
// TODO: Return to pending state here.
}
};

broker.notify_agreement(&agreement).await;
log::info!("Agreement [{}] approved by [{}].", &agreement.id, &caller);
Ok(())

counter!("market.agreements.requestor.approved", 1);
log::info!(
"Agreement [{}] approved (committed) by [{}].",
&agreement.id,
&agreement.provider_id
);
}

pub async fn proposal_receiver_thread(