Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional tests and fixes to terminate agreement #849

Merged
merged 3 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/market/src/db/dao/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ fn terminate(

let event = NewAgreementEvent {
agreement_id: id.clone(),
reason: reason,
reason,
event_type: AgreementEventType::Terminated,
issuer: owner_type,
};
Expand Down
2 changes: 1 addition & 1 deletion core/market/src/db/model/agreement_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl AgreementEvent {
let event_date = DateTime::<Utc>::from_utc(self.timestamp, Utc);
let reason = self
.reason
.map(|reason| serde_json::from_str::<JsonReason>(&reason))
.map(|reason| serde_json::from_str(&reason).map(|value| JsonReason {json: value}))
.map(|result| result.map_err(|e| {
log::warn!(
"Agreement Event with not parsable Reason in database. Error: {}. Shouldn't happen \
Expand Down
13 changes: 13 additions & 0 deletions core/market/src/market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,19 @@ impl MarketService {
.map(|event| event.into_client())
.collect())
}

pub async fn terminate_agreement(
&self,
id: Identity,
agreement_id: AgreementId,
reason: Option<String>,
) -> Result<(), AgreementError> {
// We won't attach ourselves too much to owner type here. It will be replaced in CommonBroker
self.requestor_engine
.common
.terminate_agreement(id, agreement_id, reason)
.await
}
}

impl Service for MarketService {
Expand Down
19 changes: 11 additions & 8 deletions core/market/src/negotiation/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use ya_client::model::market::proposal::Proposal as ClientProposal;
use ya_client::model::market::reason::{ConvertReason, JsonReason, Reason};
use ya_client::model::market::reason::Reason;
use ya_client::model::market::NewProposal;
use ya_client::model::NodeId;
use ya_core_model::market::BUS_ID;
Expand Down Expand Up @@ -429,7 +429,13 @@ impl CommonBroker {
}
}

dao.terminate(&agreement_id, msg.reason, owner_type)
// Opposite side terminated.
let terminator = match owner_type {
OwnerType::Provider => OwnerType::Requestor,
OwnerType::Requestor => OwnerType::Provider,
};

dao.terminate(&agreement_id, msg.reason, terminator)
.await
.map_err(|e| {
log::info!(
Expand Down Expand Up @@ -610,10 +616,7 @@ pub fn expect_state(
}

fn verify_reason(reason: Option<&String>) -> Result<(), ReasonError> {
if let Some(s) = reason {
Reason::from_json_reason(JsonReason {
json: serde_json::from_str(s)?,
})?;
};
Ok(())
Ok(if let Some(s) = reason {
serde_json::from_str::<Reason>(s)?;
})
}
4 changes: 1 addition & 3 deletions core/market/src/rest_api/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ async fn terminate_agreement(
let agreement_id = path.into_inner().to_id(OwnerType::Requestor)?;
log::debug!("Calling common. qry: {:?}, id: {:?}", query, id); // XXX
market
.requestor_engine
.common
.terminate_agreement(id, agreement_id, query.reason.clone())
.terminate_agreement(id, agreement_id, query.into_inner().reason)
.await
.log_err()
.map(|_| HttpResponse::Ok().finish())
Expand Down
41 changes: 35 additions & 6 deletions core/market/src/testing/agreement_utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use chrono::{DateTime, Duration, Utc};

use crate::db::model::AgreementId;

use crate::testing::proposal_util::{exchange_proposals_exclusive, NegotiationHelper};
use crate::testing::proposal_util::{exchange_proposals_exclusive_with_ids, NegotiationHelper};
use crate::testing::MarketsNetwork;
use crate::testing::OwnerType;

use ya_client::model::market::Reason;
use ya_service_api_web::middleware::Identity;

pub struct NegotiationAgreementHelper {
pub negotiation: NegotiationHelper,
pub p_agreement: AgreementId,
Expand All @@ -21,13 +23,32 @@ pub async fn negotiate_agreement(
r_session: &str,
p_session: &str,
) -> Result<NegotiationAgreementHelper, anyhow::Error> {
let req_mkt = network.get_market(req_name);
let prov_mkt = network.get_market(prov_name);

let req_id = network.get_default_id(req_name);
let prov_id = network.get_default_id(prov_name);

let negotiation = exchange_proposals_exclusive(network, req_name, prov_name, match_on).await?;
negotiate_agreement_with_ids(
network, req_name, prov_name, match_on, r_session, p_session, &req_id, &prov_id,
)
.await
}

pub async fn negotiate_agreement_with_ids(
network: &MarketsNetwork,
req_name: &str,
prov_name: &str,
match_on: &str,
r_session: &str,
p_session: &str,
req_id: &Identity,
prov_id: &Identity,
) -> Result<NegotiationAgreementHelper, anyhow::Error> {
let req_mkt = network.get_market(req_name);
let prov_mkt = network.get_market(prov_name);

let negotiation = exchange_proposals_exclusive_with_ids(
network, req_name, prov_name, match_on, req_id, prov_id,
)
.await?;

let r_agreement = req_mkt
.requestor_engine
Expand Down Expand Up @@ -67,3 +88,11 @@ pub async fn negotiate_agreement(
confirm_timestamp,
})
}

pub fn gen_reason(message: &str) -> String {
let reason = Reason {
message: message.to_string(),
extra: Default::default(),
};
serde_json::to_string(&reason).unwrap()
}
50 changes: 44 additions & 6 deletions core/market/src/testing/mock_identity.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,71 @@
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use crate::identity::{IdentityApi, IdentityError};

use std::collections::HashMap;
use ya_client::model::NodeId;
use ya_service_api_web::middleware::Identity;

pub struct MockIdentity {
inner: Arc<Mutex<MockIdentityInner>>,
}

struct MockIdentityInner {
pub default: Identity,
pub identities: HashMap<String, Identity>,
}

#[async_trait::async_trait(?Send)]
impl IdentityApi for MockIdentity {
async fn default_identity(&self) -> Result<NodeId, IdentityError> {
Ok(self.default.identity.clone())
Ok(self.get_default_id().identity)
}

async fn list(&self) -> Result<Vec<NodeId>, IdentityError> {
Ok(vec![self.default.identity.clone()])
Ok(self
.list_ids()
.into_iter()
.map(|(_, id)| id.identity)
.collect())
}
}

impl MockIdentity {
pub fn new(name: &str) -> Arc<MockIdentity> {
let mock_identity = MockIdentity {
default: generate_identity(name),
let default = generate_identity(name);
let mut identities = HashMap::new();
identities
.entry(name.to_string())
.or_insert(default.clone());

let mock_identity = MockIdentityInner {
default,
identities,
};
Arc::new(mock_identity)

Arc::new(MockIdentity {
inner: Arc::new(Mutex::new(mock_identity)),
})
}
pub fn new_identity(&self, name: &str) -> Identity {
let new_id = generate_identity(name);
self.inner
.lock()
.unwrap()
.identities
.entry(name.to_string())
.or_insert(new_id)
.clone()
}

pub fn get_default_id(&self) -> Identity {
self.inner.lock().unwrap().default.clone()
}

pub fn list_ids(&self) -> HashMap<String, Identity> {
self.inner.lock().unwrap().identities.clone()
}
}

Expand Down
43 changes: 35 additions & 8 deletions core/market/src/testing/mock_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use actix_http::{body::Body, Request};
use actix_service::Service as ActixService;
use actix_web::{dev::ServiceResponse, test, App};
use anyhow::{anyhow, bail, Context, Result};
use std::collections::HashMap;
use std::{fs, path::PathBuf, sync::Arc, time::Duration};

use ya_client::model::market::RequestorEvent;
Expand Down Expand Up @@ -145,7 +146,7 @@ impl MarketsNetwork {
kind: node_kind,
};

let node_id = node.mock_identity.default.clone().identity;
let node_id = node.mock_identity.get_default_id().clone().identity;
log::info!("Creating mock node {}: [{}].", name, &node_id);
BCastService::default().register(&node_id, &self.test_name);
MockNet::default().register_node(&node_id, &public_gsb_prefix);
Expand All @@ -155,15 +156,17 @@ impl MarketsNetwork {
}

pub fn break_networking_for(&self, node_name: &str) -> Result<()> {
let id = self.get_default_id(node_name);
MockNet::default().unregister_node(&id.identity)
for (_, id) in self.list_ids(node_name) {
MockNet::default().unregister_node(&id.identity)?
}
Ok(())
}

pub fn enable_networking_for(&self, node_name: &str) -> Result<()> {
let id = self.get_default_id(node_name);
let (public_gsb_prefix, _) = gsb_prefixes(&self.test_name, node_name);

MockNet::default().register_node(&id.identity, &public_gsb_prefix);
for (_, id) in self.list_ids(node_name) {
let (public_gsb_prefix, _) = gsb_prefixes(&self.test_name, node_name);
MockNet::default().register_node(&id.identity, &public_gsb_prefix);
}
Ok(())
}

Expand Down Expand Up @@ -385,10 +388,34 @@ impl MarketsNetwork {
.find(|node| &node.name == node_name)
.map(|node| node.mock_identity.clone())
.unwrap()
.default
.get_default_id()
.clone()
}

pub fn create_identity(&self, node_name: &str, id_name: &str) -> Identity {
let mock_identity = self
.nodes
.iter()
.find(|node| &node.name == node_name)
.map(|node| node.mock_identity.clone())
.unwrap();
let id = mock_identity.new_identity(id_name);

let node_id = id.identity.clone();
let (public_gsb_prefix, _) = gsb_prefixes(&self.test_name, node_name);

MockNet::default().register_node(&node_id, &public_gsb_prefix);
return id;
}

pub fn list_ids(&self, node_name: &str) -> HashMap<String, Identity> {
self.nodes
.iter()
.find(|node| &node.name == node_name)
.map(|node| node.mock_identity.list_ids())
.unwrap()
}

pub async fn get_rest_app(
&self,
node_name: &str,
Expand Down
36 changes: 33 additions & 3 deletions core/market/src/testing/proposal_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::testing::OwnerType;

use ya_client::model::market::{NewDemand, NewOffer};
use ya_client::model::NodeId;
use ya_service_api_web::middleware::Identity;

pub fn generate_proposal(
unifier: i64,
Expand Down Expand Up @@ -64,12 +65,37 @@ pub async fn exchange_draft_proposals(
req_name: &str,
prov_name: &str,
) -> Result<NegotiationHelper, anyhow::Error> {
let req_id = network.get_default_id(req_name);
let prov_id = network.get_default_id(prov_name);

exchange_proposals_impl(
network,
req_name,
prov_name,
&sample_offer(),
&sample_demand(),
&req_id,
&prov_id,
)
.await
}

pub async fn exchange_proposals_exclusive_with_ids(
network: &MarketsNetwork,
req_name: &str,
prov_name: &str,
match_on: &str,
req_id: &Identity,
prov_id: &Identity,
) -> Result<NegotiationHelper, anyhow::Error> {
exchange_proposals_impl(
network,
req_name,
prov_name,
&exclusive_offer(match_on),
&exclusive_demand(match_on),
&req_id,
&prov_id,
)
.await
}
Expand All @@ -80,12 +106,17 @@ pub async fn exchange_proposals_exclusive(
prov_name: &str,
match_on: &str,
) -> Result<NegotiationHelper, anyhow::Error> {
let req_id = network.get_default_id(req_name);
let prov_id = network.get_default_id(prov_name);

exchange_proposals_impl(
network,
req_name,
prov_name,
&exclusive_offer(match_on),
&exclusive_demand(match_on),
&req_id,
&prov_id,
)
.await
}
Expand All @@ -96,13 +127,12 @@ pub async fn exchange_proposals_impl(
prov_name: &str,
offer: &NewOffer,
demand: &NewDemand,
req_id: &Identity,
prov_id: &Identity,
) -> Result<NegotiationHelper, anyhow::Error> {
let req_mkt = network.get_market(req_name);
let prov_mkt = network.get_market(prov_name);

let req_id = network.get_default_id(req_name);
let prov_id = network.get_default_id(prov_name);

let demand_id = req_mkt.subscribe_demand(demand, &req_id).await?;
let offer_id = prov_mkt.subscribe_offer(offer, &prov_id).await?;

Expand Down
Loading