Skip to content

Commit

Permalink
Solution proposal (not compiling)
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Feb 4, 2021
1 parent 0f1df34 commit a5a8314
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 17 deletions.
4 changes: 4 additions & 0 deletions core/market/src/db/model/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub enum AgreementState {
Proposal,
/// Confirmed by a Requestor and sent to Provider for approval
Pending,
/// Additional internal state mapped to `Pending` in client structures.
/// This state will appear after Provider will call `approve_agreement`,
/// but before Requestor will send back `AgreementCommitted`.
Approving,
/// Cancelled by a Requestor
Cancelled,
/// Rejected by a Provider
Expand Down
25 changes: 25 additions & 0 deletions core/market/src/negotiation/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl ProviderBroker {
let broker3 = broker.clone();
let broker_proposal_reject = broker.clone();
let broker_terminated = broker.clone();
let commit_broker = broker.clone();

let api = NegotiationApi::new(
move |caller: String, msg: InitialProposalReceived| {
Expand All @@ -75,6 +76,11 @@ impl ProviderBroker {
.clone()
.on_agreement_terminated(msg, caller, Owner::Requestor)
},
move |caller: String, msg: AgreementCommitted| {
commit_broker
.clone()
.on_agreement_committed(msg, caller, Owner::Requestor)
},
);

// Initialize counters to 0 value. Otherwise they won't appear on metrics endpoint
Expand Down Expand Up @@ -402,6 +408,25 @@ async fn agreement_received(
Ok(())
}

async fn on_agreement_committed(
broker: CommonBroker,
caller: String,
msg: AgreementCommitted,
) -> Result<(), CommitAgreementError> {
let agreement_id = msg.agreement_id.clone();
commit_agreement(broker, caller, msg)
.await
.map_err(|e| CommitAgreementError::Remote(e, agreement_id))
}

async fn commit_agreement(
broker: CommonBroker,
caller: String,
msg: AgreementCommitted,
) -> Result<(), RemoteCommitAgreementError> {
unimplemented!()
}

impl From<GetProposalError> for RemoteProposeAgreementError {
fn from(e: GetProposalError) -> Self {
match e {
Expand Down
74 changes: 59 additions & 15 deletions core/market/src/negotiation/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ impl RequestorBroker {
move |caller: String, msg: AgreementApproved| {
on_agreement_approved(broker2.clone(), caller, msg)
},
move |_caller: String, _msg: AgreementRejected| async move { unimplemented!() },
move |_caller: String, _msg: AgreementRejected| async move {
counter!("market.agreements.requestor.rejected", 1);
unimplemented!()
},
move |caller: String, msg: AgreementTerminated| {
broker_terminated
.clone()
Expand Down Expand Up @@ -311,15 +314,12 @@ impl RequestorBroker {

match agreement.state {
AgreementState::Approved => {
counter!("market.agreements.requestor.approved", 1);
return Ok(ApprovalStatus::Approved);
}
AgreementState::Rejected => {
counter!("market.agreements.requestor.rejected", 1);
return Ok(ApprovalStatus::Rejected);
}
AgreementState::Cancelled => {
counter!("market.agreements.requestor.cancelled", 1);
return Ok(ApprovalStatus::Cancelled);
}
AgreementState::Expired => return Err(WaitForApprovalError::Expired(id.clone())),
Expand Down Expand Up @@ -423,6 +423,8 @@ async fn agreement_approved(
caller: NodeId,
msg: AgreementApproved,
) -> Result<(), RemoteAgreementError> {
// TODO: We should check as many conditions here, as possible, because we want
// to return meaningful message to Provider, what is impossible from `commit_agreement`.
let agreement = {
// We aren't sure, if `confirm_agreement` execution is finished,
// so we must lock, to avoid attempt to change database state before.
Expand All @@ -441,41 +443,83 @@ async fn agreement_approved(
Err(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
}

// TODO: Validate Agreement `valid_to` timestamp. In previous version we got
// error from database update, but know we want to escape early, because
// otherwise we can't response with this error to Provider.
validate_transition(&agreement, AgreementState::Approving)?;

// TODO: Validate agreement signature.
// TODO: Sign Agreement and update `approved_signature`.
// TODO: Update `approved_ts` Agreement field.
// TODO: Update state to `AgreementState::Approving`.

agreement
};

/// TODO: Commit Agreement. We must spawn committing later, because we need to
/// return from this function to provider.
tokio::task::spawn_local(commit_agreement(broker, agreement.id));
Ok(())
}

async fn commit_agreement(broker: CommonBroker, agreement_id: AgreementId) {
let agreement = match {
let _hold = broker.agreement_lock.lock(&agreement_id).await;

let dao = broker.db.as_dao::<AgreementDao>();
let agreement = dao
.select(&msg.agreement_id, None, Utc::now().naive_utc())
.await
.map_err(|_e| RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
.ok_or(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?;

// TODO: Send `AgreementCommited` message to Provider.
// TODO: We have problem to make GSB call here, since we don't have `api` object.
// Note that in this scenario, we update database after Provider already
// got `AgreementCommitted` and updated Agreement state to `Approved`, so we will
// wake up `wait_for_agreement` after Provider.

// Note: session must be None, because either we already set this value in ConfirmAgreement,
// or we purposely left it None.
broker
.db
.as_dao::<AgreementDao>()
.approve(&msg.agreement_id, &None)
dao.approve(&agreement_id, &None)
.await
.map_err(|err| match err {
AgreementDaoError::InvalidTransition { from, .. } => {
match from {
// Expired Agreement could be InvalidState either, but we want to explicit
// say to provider, that Agreement has expired.
AgreementState::Expired => {
RemoteAgreementError::Expired(msg.agreement_id.clone())
RemoteAgreementError::Expired(agreement_id.clone())
}
_ => RemoteAgreementError::InvalidState(msg.agreement_id.clone(), from),
_ => RemoteAgreementError::InvalidState(agreement_id.clone(), from),
}
}
e => {
// Log our internal error, but don't reveal error message to Provider.
log::warn!(
"Approve Agreement [{}] internal error: {}",
&msg.agreement_id,
&agreement_id,
e
);
RemoteAgreementError::InternalError(msg.agreement_id.clone())
RemoteAgreementError::InternalError(agreement_id.clone())
}
})?;
agreement
Ok(agreement)
} {
Ok(agreement) => agreement,
Err(e) => {
// TODO: Return to pending state here.
}
};

broker.notify_agreement(&agreement).await;
log::info!("Agreement [{}] approved by [{}].", &agreement.id, &caller);
Ok(())

counter!("market.agreements.requestor.approved", 1);
log::info!(
"Agreement [{}] approved by [{}].",
&agreement.id,
&agreement.provider_id
);
}

pub async fn proposal_receiver_thread(
Expand Down
9 changes: 9 additions & 0 deletions core/market/src/protocol/negotiation/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ pub enum RemoteAgreementError {
InternalError(AgreementId),
}

#[derive(Error, Debug, Serialize, Deserialize)]
pub enum CommitAgreementError {
#[error("Remote Commit Agreement [{1}] error: {0}")]
Remote(RemoteCommitAgreementError, AgreementId),
}

#[derive(Error, Debug, Serialize, Deserialize)]
pub enum RemoteCommitAgreementError {}

impl RemoteSensitiveError for RemoteProposeAgreementError {
fn hide_sensitive_info(self) -> RemoteProposeAgreementError {
match self {
Expand Down
37 changes: 35 additions & 2 deletions core/market/src/protocol/negotiation/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ pub struct AgreementReceived {
pub agreement_id: AgreementId,
pub creation_ts: NaiveDateTime,
pub valid_to: NaiveDateTime,
// TODO: We should send here signature.
/// This will be placed in `proposed_signature` Agreement field.
pub signature: String,
}

impl RpcMessage for AgreementReceived {
Expand All @@ -130,7 +131,12 @@ impl RpcMessage for AgreementReceived {
#[serde(rename_all = "camelCase")]
pub struct AgreementApproved {
pub agreement_id: AgreementId,
// TODO: We should send here signature.
/// This will be placed in `approved_signature` Agreement field.
pub signature: String,
/// This timestamp will differ from timestamp, when Agreement will be updated in
/// database to `Approved` state and `AgreementApprovedEvent` timestamp either.
/// But we can't set it to any value, because we should include this field in signature.
pub approved_ts: NaiveDateTime,
}

impl RpcMessage for AgreementApproved {
Expand All @@ -143,6 +149,7 @@ impl RpcMessage for AgreementApproved {
#[serde(rename_all = "camelCase")]
pub struct AgreementRejected {
pub agreement_id: AgreementId,
pub reason: Option<Reason>,
}

impl RpcMessage for AgreementRejected {
Expand All @@ -155,6 +162,7 @@ impl RpcMessage for AgreementRejected {
#[serde(rename_all = "camelCase")]
pub struct AgreementCancelled {
pub agreement_id: AgreementId,
pub reason: Option<Reason>,
}

impl RpcMessage for AgreementCancelled {
Expand All @@ -168,6 +176,10 @@ impl RpcMessage for AgreementCancelled {
pub struct AgreementTerminated {
pub agreement_id: AgreementId,
pub reason: Option<Reason>,
/// Signature for `AgreementTerminatedEvent`.
pub signature: String,
/// Termination timestamp, that will be included in signature.
pub termination_ts: NaiveDateTime,
}

impl RpcMessage for AgreementTerminated {
Expand All @@ -176,6 +188,20 @@ impl RpcMessage for AgreementTerminated {
type Error = TerminateAgreementError;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AgreementCommitted {
pub agreement_id: AgreementId,
/// This will be placed in `committed_signature` Agreement field.
pub signature: String,
}

impl RpcMessage for AgreementCommitted {
const ID: &'static str = "CommitAgreement";
type Item = ();
type Error = AgreementCommitted;
}

/// The same messaged will be used on GSB and as messages in callbacks.
impl<Message: RpcMessage> CallbackMessage for Message {
type Ok = <Message as RpcMessage>::Item;
Expand Down Expand Up @@ -237,3 +263,10 @@ impl AgreementTerminated {
self
}
}

impl AgreementCommitted {
pub fn translate(mut self, owner: Owner) -> Self {
self.agreement_id = self.agreement_id.translate(owner);
self
}
}
4 changes: 4 additions & 0 deletions core/market/src/protocol/negotiation/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::messages::{
ProposalReceived, ProposalRejected,
};
use crate::protocol::negotiation::error::{ProposeAgreementError, RejectProposalError};
use crate::testing::negotiation::messages::AgreementCommitted;
use ya_client::model::market::Reason;

/// Responsible for communication with markets on other nodes
Expand All @@ -35,6 +36,7 @@ struct NegotiationImpl {
agreement_received: HandlerSlot<AgreementReceived>,
agreement_cancelled: HandlerSlot<AgreementCancelled>,
agreement_terminated: HandlerSlot<AgreementTerminated>,
agreement_committed: HandlerSlot<AgreementCommitted>,
}

impl NegotiationApi {
Expand All @@ -45,6 +47,7 @@ impl NegotiationApi {
agreement_received: impl CallbackHandler<AgreementReceived>,
agreement_cancelled: impl CallbackHandler<AgreementCancelled>,
agreement_terminated: impl CallbackHandler<AgreementTerminated>,
agreement_committed: impl CallbackHandler<AgreementCommitted>,
) -> NegotiationApi {
let negotiation_impl = NegotiationImpl {
initial_proposal_received: HandlerSlot::new(initial_proposal_received),
Expand All @@ -53,6 +56,7 @@ impl NegotiationApi {
agreement_received: HandlerSlot::new(agreement_received),
agreement_cancelled: HandlerSlot::new(agreement_cancelled),
agreement_terminated: HandlerSlot::new(agreement_terminated),
agreement_committed: HandlerSlot::new(agreement_committed),
};
NegotiationApi {
inner: Arc::new(negotiation_impl),
Expand Down
2 changes: 2 additions & 0 deletions core/market/src/rest_api/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,7 @@ async fn cancel_agreement(
_id: Identity,
_body: Json<Option<Reason>>,
) -> HttpResponse {
// TODO: Move to final implementation.
counter!("market.agreements.requestor.cancelled", 1);
HttpResponse::NotImplemented().finish()
}

0 comments on commit a5a8314

Please sign in to comment.