Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Market GSB GetAgreement with Role #864

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions core/activity/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn web_scope(db: &DbExecutor) -> Scope {
mod common {
use actix_web::{web, Responder};

use ya_core_model::activity;
use ya_core_model::{activity, market::Role};
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::Identity;
use ya_service_bus::{timeout::IntoTimeoutFuture, RpcEndpoint};
Expand Down Expand Up @@ -49,7 +49,7 @@ mod common {
log::debug!("get_activity_state_web");

// check if caller is the Provider
if authorize_activity_executor(&db, id.identity, &path.activity_id)
if authorize_activity_executor(&db, id.identity, &path.activity_id, Role::Provider)
.await
.is_ok()
{
Expand All @@ -62,7 +62,7 @@ mod common {
log::trace!("get_activity_state_web: Not provider, maybe requestor?");

// check if caller is the Requestor
authorize_activity_initiator(&db, id.identity, &path.activity_id).await?;
authorize_activity_initiator(&db, id.identity, &path.activity_id, Role::Requestor).await?;

log::trace!("get_activity_state_web: I'm the requestor");

Expand All @@ -74,7 +74,7 @@ mod common {
}

// Retrieve and persist activity state
let agreement = get_activity_agreement(&db, &path.activity_id).await?;
let agreement = get_activity_agreement(&db, &path.activity_id, Role::Requestor).await?;
let provider_service = agreement_provider_service(&id, &agreement)?;
let state = provider_service
.send(activity::GetState {
Expand All @@ -97,7 +97,7 @@ mod common {
id: Identity,
) -> impl Responder {
// check if caller is the Provider
if authorize_activity_executor(&db, id.identity, &path.activity_id)
if authorize_activity_executor(&db, id.identity, &path.activity_id, Role::Provider)
.await
.is_ok()
{
Expand All @@ -107,7 +107,7 @@ mod common {
}

// check if caller is the Requestor
authorize_activity_initiator(&db, id.identity, &path.activity_id).await?;
authorize_activity_initiator(&db, id.identity, &path.activity_id, Role::Requestor).await?;

// Return locally persisted usage if activity has been already terminated or terminating
if get_persisted_state(&db, &path.activity_id).await?.alive() {
Expand All @@ -117,7 +117,7 @@ mod common {
}

// Retrieve and persist activity usage
let agreement = get_activity_agreement(&db, &path.activity_id).await?;
let agreement = get_activity_agreement(&db, &path.activity_id, Role::Requestor).await?;
let provider_service = agreement_provider_service(&id, &agreement)?;
let usage = provider_service
.send(activity::GetUsage {
Expand Down
27 changes: 18 additions & 9 deletions core/activity/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,15 @@ pub(crate) async fn set_persisted_usage(
.await?)
}

pub(crate) async fn get_agreement(agreement_id: impl ToString) -> Result<Agreement, Error> {
pub(crate) async fn get_agreement(
agreement_id: impl ToString,
role: market::Role,
) -> Result<Agreement, Error> {
Ok(bus::service(market::BUS_ID)
.send(market::GetAgreement {
agreement_id: agreement_id.to_string(),
})
.send(market::GetAgreement::as_role(
agreement_id.to_string(),
role,
))
.await??)
}

Expand All @@ -126,40 +130,45 @@ pub(crate) async fn get_agreement_id(db: &DbExecutor, activity_id: &str) -> Resu
pub(crate) async fn get_activity_agreement(
db: &DbExecutor,
activity_id: &str,
role: market::Role,
) -> Result<Agreement, Error> {
get_agreement(get_agreement_id(db, activity_id).await?).await
get_agreement(get_agreement_id(db, activity_id).await?, role).await
}

pub(crate) async fn authorize_activity_initiator(
db: &DbExecutor,
caller: impl ToString,
activity_id: &str,
role: market::Role,
) -> Result<(), Error> {
authorize_agreement_initiator(caller, &get_agreement_id(db, activity_id).await?).await
authorize_agreement_initiator(caller, &get_agreement_id(db, activity_id).await?, role).await
}

pub(crate) async fn authorize_activity_executor(
db: &DbExecutor,
caller: impl ToString,
activity_id: &str,
role: market::Role,
) -> Result<(), Error> {
authorize_agreement_executor(caller, &get_agreement_id(db, activity_id).await?).await
authorize_agreement_executor(caller, &get_agreement_id(db, activity_id).await?, role).await
}

pub(crate) async fn authorize_agreement_initiator(
caller: impl ToString,
agreement_id: &str,
role: market::Role,
) -> Result<(), Error> {
let agreement = get_agreement(agreement_id).await?;
let agreement = get_agreement(agreement_id, role).await?;

authorize_caller(&caller.to_string().parse()?, agreement.requestor_id())
}

pub(crate) async fn authorize_agreement_executor(
caller: impl ToString,
agreement_id: &str,
role: market::Role,
) -> Result<(), Error> {
let agreement = get_agreement(agreement_id).await?;
let agreement = get_agreement(agreement_id, role).await?;

authorize_caller(&caller.to_string().parse()?, agreement.provider_id())
}
Expand Down
3 changes: 2 additions & 1 deletion core/activity/src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use actix_web::{web, Responder};

use ya_client_model::activity::{ActivityState, ProviderEvent};
use ya_core_model::market::Role;
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::Identity;
use ya_service_bus::timeout::IntoTimeoutFuture;
Expand All @@ -28,7 +29,7 @@ async fn set_activity_state_web(
id: Identity,
) -> impl Responder {
log::debug!("set_activity_state_web {:?}", state);
authorize_activity_executor(&db, id.identity, &path.activity_id).await?;
authorize_activity_executor(&db, id.identity, &path.activity_id, Role::Provider).await?;

set_persisted_state(&db, &path.activity_id, state.into_inner())
.await
Expand Down
19 changes: 10 additions & 9 deletions core/activity/src/provider/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::convert::From;
use std::time::Duration;

use ya_client_model::activity::{ActivityState, ActivityUsage, State, StatePair};
use ya_client_model::NodeId;
use ya_core_model::activity;
use ya_core_model::activity::local::Credentials;
use ya_core_model::market::Role;
use ya_persistence::executor::DbExecutor;
use ya_service_bus::{timeout::*, typed::ServiceBinder};

Expand All @@ -19,8 +22,6 @@ use crate::common::{
use crate::dao::*;
use crate::db::models::ActivityEventType;
use crate::error::Error;
use ya_client_model::NodeId;
use ya_core_model::activity::local::Credentials;

const INACTIVITY_LIMIT_SECONDS_ENV_VAR: &str = "INACTIVITY_LIMIT_SECONDS";
const UNRESPONSIVE_LIMIT_SECONDS_ENV_VAR: &str = "UNRESPONSIVE_LIMIT_SECONDS";
Expand Down Expand Up @@ -78,10 +79,10 @@ async fn create_activity_gsb(
caller: String,
msg: activity::Create,
) -> RpcMessageResult<activity::Create> {
authorize_agreement_initiator(caller, &msg.agreement_id).await?;
authorize_agreement_initiator(caller, &msg.agreement_id, Role::Provider).await?;

let activity_id = generate_id();
let agreement = get_agreement(&msg.agreement_id).await?;
let agreement = get_agreement(&msg.agreement_id, Role::Provider).await?;
let provider_id = agreement.provider_id().clone();

db.as_dao::<ActivityDao>()
Expand Down Expand Up @@ -173,13 +174,13 @@ async fn destroy_activity_gsb(
caller: String,
msg: activity::Destroy,
) -> RpcMessageResult<activity::Destroy> {
authorize_activity_initiator(&db, caller, &msg.activity_id).await?;
authorize_activity_initiator(&db, caller, &msg.activity_id, Role::Provider).await?;

if !get_persisted_state(&db, &msg.activity_id).await?.alive() {
return Ok(());
}

let agreement = get_agreement(&msg.agreement_id).await?;
let agreement = get_agreement(&msg.agreement_id, Role::Provider).await?;
db.as_dao::<EventDao>()
.create(
&msg.activity_id,
Expand Down Expand Up @@ -211,7 +212,7 @@ async fn get_activity_state_gsb(
caller: String,
msg: activity::GetState,
) -> RpcMessageResult<activity::GetState> {
authorize_activity_initiator(&db, caller, &msg.activity_id).await?;
authorize_activity_initiator(&db, caller, &msg.activity_id, Role::Provider).await?;

Ok(get_persisted_state(&db, &msg.activity_id).await?)
}
Expand All @@ -221,7 +222,7 @@ async fn get_activity_usage_gsb(
caller: String,
msg: activity::GetUsage,
) -> RpcMessageResult<activity::GetUsage> {
authorize_activity_initiator(&db, caller, &msg.activity_id).await?;
authorize_activity_initiator(&db, caller, &msg.activity_id, Role::Provider).await?;

Ok(get_persisted_usage(&db, &msg.activity_id).await?)
}
Expand Down Expand Up @@ -401,7 +402,7 @@ mod local {
_caller: String,
msg: activity::local::GetAgreementId,
) -> RpcMessageResult<activity::local::GetAgreementId> {
let agreement = get_activity_agreement(&db, &msg.activity_id).await?;
let agreement = get_activity_agreement(&db, &msg.activity_id, msg.role).await?;
Ok(agreement.agreement_id)
}
}
21 changes: 11 additions & 10 deletions core/activity/src/requestor/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use ya_client_model::activity::{
};
use ya_client_model::market::Agreement;
use ya_core_model::activity;
use ya_core_model::market::Role;
use ya_net::{self as net, RemoteEndpoint};
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::Identity;
Expand Down Expand Up @@ -79,9 +80,9 @@ async fn create_activity(
id: Identity,
) -> impl Responder {
let agreement_id = body.agreement_id();
authorize_agreement_initiator(id.identity, agreement_id).await?;
authorize_agreement_initiator(id.identity, agreement_id, Role::Requestor).await?;

let agreement = get_agreement(&agreement_id).await?;
let agreement = get_agreement(&agreement_id, Role::Requestor).await?;
log::debug!("agreement: {:#?}", agreement);

let provider_id = agreement.provider_id().clone();
Expand Down Expand Up @@ -124,9 +125,9 @@ async fn destroy_activity(
query: web::Query<QueryTimeout>,
id: Identity,
) -> impl Responder {
authorize_activity_initiator(&db, id.identity, &path.activity_id).await?;
authorize_activity_initiator(&db, id.identity, &path.activity_id, Role::Requestor).await?;

let agreement = get_activity_agreement(&db, &path.activity_id).await?;
let agreement = get_activity_agreement(&db, &path.activity_id, Role::Requestor).await?;
let msg = activity::Destroy {
activity_id: path.activity_id.to_string(),
agreement_id: agreement.agreement_id.clone(),
Expand Down Expand Up @@ -162,11 +163,11 @@ async fn exec(
body: web::Json<ExeScriptRequest>,
id: Identity,
) -> impl Responder {
authorize_activity_initiator(&db, id.identity, &path.activity_id).await?;
authorize_activity_initiator(&db, id.identity, &path.activity_id, Role::Requestor).await?;

let commands: Vec<ExeScriptCommand> =
serde_json::from_str(&body.text).map_err(|e| Error::BadRequest(format!("{:?}", e)))?;
let agreement = get_activity_agreement(&db, &path.activity_id).await?;
let agreement = get_activity_agreement(&db, &path.activity_id, Role::Requestor).await?;
let batch_id = generate_id();
let msg = activity::Exec {
activity_id: path.activity_id.clone(),
Expand Down Expand Up @@ -195,8 +196,8 @@ async fn get_batch_results(
id: Identity,
request: HttpRequest,
) -> Result<impl Responder> {
authorize_activity_initiator(&db, id.identity, &path.activity_id).await?;
let agreement = get_activity_agreement(&db, &path.activity_id).await?;
authorize_activity_initiator(&db, id.identity, &path.activity_id, Role::Requestor).await?;
let agreement = get_activity_agreement(&db, &path.activity_id, Role::Requestor).await?;

if let Some(value) = request.headers().get(header::ACCEPT) {
if value.eq(mime::TEXT_EVENT_STREAM.essence_str()) {
Expand Down Expand Up @@ -311,7 +312,7 @@ async fn encrypted(
mut body: web::Payload,
id: Identity,
) -> impl Responder {
authorize_activity_initiator(&db, id.identity, &path.activity_id).await?;
authorize_activity_initiator(&db, id.identity, &path.activity_id, Role::Requestor).await?;

let mut bytes = web::BytesMut::new();
while let Some(item) = body.next().await {
Expand All @@ -320,7 +321,7 @@ async fn encrypted(
);
}

let agreement = get_activity_agreement(&db, &path.activity_id).await?;
let agreement = get_activity_agreement(&db, &path.activity_id, Role::Requestor).await?;
let msg = activity::sgx::CallEncryptedService {
activity_id: path.activity_id.clone(),
sender: id.identity,
Expand Down
5 changes: 3 additions & 2 deletions core/activity/src/requestor/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use actix_web::{web, Responder};

use ya_core_model::activity;
use ya_core_model::market::Role;
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::Identity;
use ya_service_bus::{timeout::IntoTimeoutFuture, RpcEndpoint};
Expand All @@ -23,9 +24,9 @@ async fn get_running_command(
query: web::Query<QueryTimeout>,
id: Identity,
) -> impl Responder {
authorize_activity_initiator(&db, id.identity, &path.activity_id).await?;
authorize_activity_initiator(&db, id.identity, &path.activity_id, Role::Requestor).await?;

let agreement = get_activity_agreement(&db, &path.activity_id).await?;
let agreement = get_activity_agreement(&db, &path.activity_id, Role::Requestor).await?;
let msg = activity::GetRunningCommand {
activity_id: path.activity_id.to_string(),
timeout: query.timeout.clone(),
Expand Down
27 changes: 12 additions & 15 deletions core/market/src/market/agreement.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use chrono;

use ya_client::model::market::Agreement as ClientAgreement;
use ya_core_model::market::{GetAgreement, RpcMessageError};
use ya_core_model::market::{GetAgreement, Role, RpcMessageError};
use ya_persistence::executor::DbExecutor;
use ya_service_bus::typed::ServiceBinder;

use crate::db::dao::AgreementDao;
use crate::db::model::{AgreementId, OwnerType};

pub async fn bind_gsb(db: DbExecutor, public_prefix: &str, _local_prefix: &str) {
log::debug!("Binding market agreement public service to service bus");
log::trace!("Binding market agreement public service to service bus");
ServiceBinder::new(public_prefix, &db, ()).bind(get_agreement);
log::debug!("Successfully bound market agreement public service to service bus");
}
Expand All @@ -19,26 +19,23 @@ async fn get_agreement(
_sender_id: String,
msg: GetAgreement,
) -> Result<ClientAgreement, RpcMessageError> {
// On GSB we don't know if Provider or Requestor is calling, so we will try both versions.
let agreement_id = AgreementId::from_client(&msg.agreement_id, OwnerType::Provider)
let owner = match msg.role {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impl From

Role::Provider => OwnerType::Provider,
Role::Requestor => OwnerType::Requestor,
};

let agreement_id = AgreementId::from_client(&msg.agreement_id, owner)
.map_err(|e| RpcMessageError::Market(e.to_string()))?;

// TODO: We should check Agreement owner, like in REST get_agreement implementation, but
// I'm not sure we can trust `sender_id` value from gsb now.
let dao = db.as_dao::<AgreementDao>();
let now = chrono::Utc::now().naive_utc();
Ok(match dao
Ok(dao
.select(&agreement_id, None, now)
.await
.map_err(|e| RpcMessageError::Market(e.to_string()))?
{
None => dao
.select(&agreement_id.swap_owner(), None, now)
.await
.map_err(|e| RpcMessageError::Market(e.to_string()))?,
Some(agreement) => Some(agreement),
}
.ok_or(RpcMessageError::NotFound(msg.agreement_id.clone()))?
.into_client()
.map_err(|e| RpcMessageError::Market(e.to_string()))?)
.ok_or(RpcMessageError::NotFound(msg.agreement_id.clone()))?
.into_client()
.map_err(|e| RpcMessageError::Market(e.to_string()))?)
}
7 changes: 4 additions & 3 deletions core/market/tests/test_agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ async fn test_gsb_get_agreement() -> Result<()> {
let agreement_id = req_engine
.create_agreement(req_id.clone(), &proposal_id, Utc::now())
.await?;

let agreement = bus::service(network.node_gsb_prefixes(REQ_NAME).0)
.send(market::GetAgreement {
agreement_id: agreement_id.into_client(),
})
.send(market::GetAgreement::as_requestor(
agreement_id.into_client(),
))
.await??;
assert_eq!(agreement.agreement_id, agreement_id.into_client());
assert_eq!(agreement.demand.requestor_id, req_id.identity);
Expand Down
Loading