Skip to content

Commit

Permalink
Distinguish owner type based on node_id vs agreement
Browse files Browse the repository at this point in the history
  • Loading branch information
jiivan committed Dec 3, 2020
1 parent 476e5d0 commit 8682a4d
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 104 deletions.
33 changes: 33 additions & 0 deletions core/market/src/db/dao/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,39 @@ impl<'c> AgreementDao<'c> {
.await
}

pub async fn select_by_node(
&self,
id: AgreementId,
node_id: NodeId,
validation_ts: NaiveDateTime,
) -> DbResult<Option<Agreement>> {
// Because we explicitly disallow agreements between the same identities
// (i.e. provider_id != requestor_id), we'll always get the right db row
// with this query.
let id_swapped = id.clone().swap_owner();
let id_orig = id.clone();
do_with_transaction(self.pool, move |conn| {
let query = market_agreement
.filter(agreement::id.eq_any(vec![id_orig, id_swapped]))
.filter(
agreement::provider_id
.eq(node_id)
.or(agreement::requestor_id.eq(node_id)),
);
Ok(match query.first::<Agreement>(conn).optional()? {
Some(mut agreement) => {
if agreement.valid_to < validation_ts {
agreement.state = AgreementState::Expired;
update_state(conn, &id, &agreement.state)?;
}
Some(agreement)
}
None => None,
})
})
.await
}

pub async fn terminate(
&self,
id: &AgreementId,
Expand Down
7 changes: 7 additions & 0 deletions core/market/src/matcher/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ impl Resolver {
}

fn matches(offer: &Offer, demand: &Demand) -> bool {
if offer.node_id == demand.node_id {
log::info!(
"Rejecting Demand Offer pair from single identity. node_id: {}",
offer.node_id
);
return false;
}
match match_demand_offer(
&demand.properties,
&demand.constraints,
Expand Down
99 changes: 54 additions & 45 deletions core/market/src/negotiation/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use ya_client::model::market::proposal::Proposal as ClientProposal;
use ya_client::model::market::reason::{ConvertReason, JsonReason, Reason};
use ya_client::model::market::NewProposal;
use ya_client::model::NodeId;
use ya_core_model::market::BUS_ID;
Expand All @@ -30,7 +31,7 @@ use crate::matcher::{
use crate::negotiation::{
error::{
AgreementError, AgreementEventsError, AgreementStateError, GetProposalError,
MatchValidationError, ProposalError, QueryEventsError,
MatchValidationError, ProposalError, QueryEventsError, ReasonError,
},
notifier::NotifierError,
EventNotifier,
Expand Down Expand Up @@ -295,54 +296,53 @@ impl CommonBroker {
pub async fn terminate_agreement(
&self,
id: Identity,
agreement_id: &AgreementId,
agreement_id: AgreementId,
reason: Option<String>,
owner_type: OwnerType,
) -> Result<(), AgreementError> {
verify_reason(reason.as_ref())?;
let dao = self.db.as_dao::<AgreementDao>();
let mut agreement = match dao
.select(
agreement_id,
Some(id.identity.clone()),
.select_by_node(
agreement_id.clone(),
id.identity.clone(),
Utc::now().naive_utc(),
)
.await
.map_err(|e| AgreementError::Get(agreement_id.clone(), e))?
{
None => return Err(AgreementError::NotFound(agreement_id.clone())),
None => return Err(AgreementError::NotFound(agreement_id)),
Some(agreement) => agreement,
};
// from now on agreement_id is invalid. Use only agreement.id
// (which has valid owner)
expect_state(&agreement, AgreementState::Approved)?;
agreement.state = AgreementState::Terminated;
let owner_type = agreement.id.owner();
self.propagate_terminate_agreement(
&agreement,
id.identity.clone(),
match owner_type {
OwnerType::Requestor => agreement.provider_id,
OwnerType::Provider => agreement.requestor_id,
},
reason.clone(),
owner_type,
)
.await?;
dao.terminate(&agreement.id, reason, owner_type)
.await
.map_err(|e| AgreementError::Get(agreement.id.clone(), e))?;

Err(match agreement.state {
AgreementState::Proposal => AgreementStateError::Proposed(agreement.id),
AgreementState::Pending => AgreementStateError::Confirmed(agreement.id),
AgreementState::Cancelled => AgreementStateError::Cancelled(agreement.id),
AgreementState::Rejected => AgreementStateError::Rejected(agreement.id),
AgreementState::Approved => {
agreement.state = AgreementState::Terminated;
self.propagate_terminate_agreement(
&agreement,
id.identity.clone(),
agreement.provider_id,
reason.clone(),
owner_type,
)
.await?;
dao.terminate(agreement_id, reason, owner_type)
.await
.map_err(|e| AgreementError::Get(agreement_id.clone(), e))?;

counter!("market.agreements.requestor.terminated", 1);
log::info!(
"Requestor {} terminated Agreement [{}] and sent to Provider.",
&id.identity,
&agreement_id,
);
return Ok(());
}
AgreementState::Expired => AgreementStateError::Expired(agreement.id),
AgreementState::Terminated => AgreementStateError::Terminated(agreement.id),
})?
match owner_type {
OwnerType::Provider => counter!("market.agreements.provider.terminated", 1),
OwnerType::Requestor => counter!("market.agreements.requestor.terminated", 1),
};
log::info!(
"Requestor {} terminated Agreement [{}] and sent to Provider.",
&id.identity,
&agreement.id,
);
Ok(())
}
/// Sent to notify other side about termination.
pub async fn propagate_terminate_agreement(
Expand Down Expand Up @@ -400,32 +400,32 @@ impl CommonBroker {
msg: AgreementTerminated,
owner_type: OwnerType,
) -> Result<(), RemoteAgreementError> {
let err_msg = msg.clone();
let dao = self.db.as_dao::<AgreementDao>();
let agreement_id = msg.agreement_id.translate(owner_type);
let agreement = dao
.select(&msg.agreement_id, None, Utc::now().naive_utc())
.select(&agreement_id, None, Utc::now().naive_utc())
.await
.map_err(|_e| RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
.ok_or(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?;
.map_err(|_e| RemoteAgreementError::NotFound(agreement_id.clone()))?
.ok_or(RemoteAgreementError::NotFound(agreement_id.clone()))?;

match owner_type {
OwnerType::Requestor => {
if agreement.provider_id != caller {
// Don't reveal, that we know this Agreement id.
Err(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
Err(RemoteAgreementError::NotFound(agreement_id.clone()))?
}
}
OwnerType::Provider => {
if agreement.requestor_id != caller {
// Don't reveal, that we know this Agreement id.
Err(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?
Err(RemoteAgreementError::NotFound(agreement_id.clone()))?
}
}
}

dao.terminate(&msg.agreement_id, msg.reason, owner_type)
dao.terminate(&agreement_id, msg.reason, owner_type)
.await
.map_err(|_e| RemoteAgreementError::InternalError(err_msg.agreement_id.clone()))?;
.map_err(|_e| RemoteAgreementError::InternalError(agreement_id.clone()))?;
Ok(())
}

Expand Down Expand Up @@ -595,3 +595,12 @@ pub fn expect_state(
AgreementState::Terminated => AgreementStateError::Terminated(agreement.id.clone()),
})?
}

fn verify_reason(reason: Option<&String>) -> Result<(), ReasonError> {
if let Some(s) = reason {
Reason::from_json_reason(JsonReason {
json: serde_json::from_str(s)?,
})?;
};
Ok(())
}
6 changes: 6 additions & 0 deletions core/market/src/negotiation/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,16 @@ pub enum AgreementError {
ProtocolApprove(#[from] ApproveAgreementError),
#[error("Protocol error while terminating: {0}")]
ProtocolTerminate(#[from] TerminateAgreementError),
#[error(transparent)]
ReasonError(#[from] ReasonError),
#[error("Internal error: {0}")]
Internal(String),
}

#[derive(Error, Debug)]
#[error("Reason parse error: {0}")]
pub struct ReasonError(#[from] pub serde_json::Error);

#[derive(Error, Debug)]
pub enum WaitForApprovalError {
#[error("Agreement [{0}] not found.")]
Expand Down
11 changes: 0 additions & 11 deletions core/market/src/negotiation/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,17 +230,6 @@ impl ProviderBroker {
}
return Ok(());
}

pub async fn terminate_agreement(
&self,
id: Identity,
agreement_id: &AgreementId,
reason: Option<String>,
) -> Result<(), AgreementError> {
self.common
.terminate_agreement(id, agreement_id, reason, OwnerType::Provider)
.await
}
}

// TODO: We need more elegant solution than this. This function still returns
Expand Down
11 changes: 0 additions & 11 deletions core/market/src/negotiation/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,17 +400,6 @@ impl RequestorBroker {
}
return Ok(());
}

pub async fn terminate_agreement(
&self,
id: Identity,
agreement_id: &AgreementId,
reason: Option<String>,
) -> Result<(), AgreementError> {
self.common
.terminate_agreement(id, agreement_id, reason, OwnerType::Requestor)
.await
}
}

async fn on_agreement_approved(
Expand Down
19 changes: 19 additions & 0 deletions core/market/src/rest_api/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub fn register_endpoints(scope: Scope) -> Scope {
scope
.service(collect_agreement_events)
.service(get_agreement)
.service(terminate_agreement)
}

#[actix_web::get("/agreements/{agreement_id}")]
Expand Down Expand Up @@ -69,3 +70,21 @@ async fn collect_agreement_events(
.log_err()
.map(|events| HttpResponse::Ok().json(events))
}

#[actix_web::post("/agreements/{agreement_id}/terminate")]
async fn terminate_agreement(
market: Data<Arc<MarketService>>,
path: Path<PathAgreement>,
id: Identity,
reason: Option<String>,
) -> impl Responder {
// We won't attach ourselves too much to owner type here. It will be replaced in CommonBroker
let agreement_id = path.into_inner().to_id(OwnerType::Requestor)?;
market
.requestor_engine
.common
.terminate_agreement(id, agreement_id, reason)
.await
.log_err()
.map(|_| HttpResponse::Ok().finish())
}
1 change: 1 addition & 0 deletions core/market/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl ResponseError for AgreementError {
| AgreementError::OwnProposal(..)
| AgreementError::ProposalNotFound(..)
| AgreementError::ProposalCountered(..)
| AgreementError::ReasonError(..)
| AgreementError::InvalidId(..) => HttpResponse::BadRequest().json(msg),
AgreementError::GetProposal(..)
| AgreementError::Save(..)
Expand Down
17 changes: 0 additions & 17 deletions core/market/src/rest_api/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub fn register_endpoints(scope: Scope) -> Scope {
.service(reject_proposal)
.service(approve_agreement)
.service(reject_agreement)
.service(terminate_agreement)
}

#[actix_web::post("/offers")]
Expand Down Expand Up @@ -164,19 +163,3 @@ async fn reject_agreement(
) -> HttpResponse {
HttpResponse::NotImplemented().finish()
}

#[actix_web::post("/agreements/{agreement_id}/terminate")]
async fn terminate_agreement(
market: Data<Arc<MarketService>>,
path: Path<PathAgreement>,
id: Identity,
reason: Option<String>,
) -> impl Responder {
let agreement_id = path.into_inner().to_id(OwnerType::Provider)?;
market
.provider_engine
.terminate_agreement(id, &agreement_id, reason)
.await
.log_err()
.map(|_| HttpResponse::Ok().finish())
}
17 changes: 0 additions & 17 deletions core/market/src/rest_api/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub fn register_endpoints(scope: Scope) -> Scope {
.service(confirm_agreement)
.service(wait_for_approval)
.service(cancel_agreement)
.service(terminate_agreement)
}

#[actix_web::post("/demands")]
Expand Down Expand Up @@ -201,19 +200,3 @@ async fn cancel_agreement(
) -> HttpResponse {
HttpResponse::NotImplemented().finish()
}

#[actix_web::post("/agreements/{agreement_id}/terminate")]
async fn terminate_agreement(
market: Data<Arc<MarketService>>,
path: Path<PathAgreement>,
id: Identity,
reason: Option<String>,
) -> impl Responder {
let agreement_id = path.into_inner().to_id(OwnerType::Requestor)?;
market
.requestor_engine
.terminate_agreement(id, &agreement_id, reason)
.await
.log_err()
.map(|_| HttpResponse::Ok().finish())
}
Loading

0 comments on commit 8682a4d

Please sign in to comment.