Skip to content

Commit

Permalink
Merge pull request #1036 from golemfactory/release/v0.6-cherrypicked-…
Browse files Browse the repository at this point in the history
…ordering

Release/v0.6 cherrypicked ordering
  • Loading branch information
jiivan authored Feb 10, 2021
2 parents aeb1c9b + e59a01c commit 483c832
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 5 deletions.
29 changes: 24 additions & 5 deletions core/market/src/db/dao/negotiation_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ya_persistence::executor::{do_with_transaction, AsDao, PoolType};
use crate::db::dao::demand::{demand_status, DemandState};
use crate::db::dao::offer::{query_state, OfferState};
use crate::db::dao::sql_functions::datetime;
use crate::db::model::{Agreement, MarketEvent, Owner, Proposal, SubscriptionId};
use crate::db::model::{Agreement, EventType, MarketEvent, Owner, Proposal, SubscriptionId};
use crate::db::schema::market_negotiation_event::dsl;
use crate::db::{DbError, DbResult};
use crate::market::EnvConfig;
Expand Down Expand Up @@ -89,14 +89,33 @@ impl<'c> NegotiationEventsDao<'c> {
// Check subscription wasn't unsubscribed or expired.
validate_subscription(conn, &subscription_id, owner)?;

// TODO: Only ProposalEvents should be in random order.
// Only ProposalEvents should be in random order.
// AgreementEvent and rejections events should be sorted with higher
// priority.
let events = dsl::market_negotiation_event
.filter(dsl::subscription_id.eq(&subscription_id))
.order_by(sql::<sql_types::Bool>("RANDOM()"))
let basic_query =
dsl::market_negotiation_event.filter(dsl::subscription_id.eq(&subscription_id));
let mut events = basic_query
.clone()
.filter(dsl::event_type.ne_all(vec![
EventType::ProviderNewProposal,
EventType::RequestorNewProposal,
]))
.order_by(dsl::timestamp.asc())
.limit(max_events as i64)
.load::<MarketEvent>(conn)?;
if (events.len() as i32) < max_events {
let limit_left: i32 = max_events - (events.len() as i32);
let proposal_events = basic_query
.filter(dsl::event_type.eq_any(vec![
EventType::ProviderNewProposal,
EventType::RequestorNewProposal,
]))
.order_by(sql::<sql_types::Bool>("RANDOM()"))
.limit(limit_left as i64)
.load::<MarketEvent>(conn)?;

events.extend(proposal_events.into_iter());
}

// Remove returned events from queue.
if !events.is_empty() {
Expand Down
76 changes: 76 additions & 0 deletions core/market/tests/test_negotiations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,3 +867,79 @@ async fn test_reject_demand() {
.unwrap();
assert_eq!(proposal0updated.body.state, ProposalState::Rejected);
}

// Events with proposals should come last
#[cfg_attr(not(feature = "test-suite"), ignore)]
#[serial_test::serial]
async fn test_proposal_events_last() {
let network = MarketsNetwork::new(None)
.await
.add_market_instance("Node-1")
.await
.add_market_instance("Node-2")
.await
.add_market_instance("Node-3")
.await;

let market1 = network.get_market("Node-1");
let market2 = network.get_market("Node-2");
let market3 = network.get_market("Node-3");

let identity1 = network.get_default_id("Node-1");
let identity2 = network.get_default_id("Node-2");
let identity3 = network.get_default_id("Node-3");

let demand_id = market1
.subscribe_demand(&sample_demand(), &identity1)
.await
.unwrap();

let offer2_id = market2
.subscribe_offer(&sample_offer(), &identity2)
.await
.unwrap();

// REQUESTOR side.
let proposal0 = requestor::query_proposal(&market1, &demand_id, "Initial #R")
.await
.unwrap();
let proposal0_id = proposal0.get_proposal_id().unwrap();

// Counter proposal
let proposal1 = sample_demand();
market1
.requestor_engine
.counter_proposal(&demand_id, &proposal0_id, &proposal1, &identity1)
.await
.unwrap();

market3
.subscribe_offer(&sample_offer(), &identity3)
.await
.unwrap();

let proposal2 = provider::query_proposal(&market2, &offer2_id, "Initial #P")
.await
.unwrap();
let proposal2_id = proposal2.get_proposal_id().unwrap();
market2
.provider_engine
.reject_proposal(&offer2_id, &proposal2_id, &identity2, None)
.await
.unwrap();

let events = market1
.requestor_engine
.query_events(&demand_id, 3.0, Some(5))
.await
.unwrap();
assert_eq!(events.len(), 2);
match events[0] {
RequestorEvent::ProposalRejectedEvent { .. } => {}
_ => assert!(false, format!("Invalid first event_type: {:#?}", events[0])),
}
match events[events.len() - 1] {
RequestorEvent::ProposalEvent { .. } => {}
_ => assert!(false, format!("Invalid last event_type: {:#?}", events[0])),
}
}

0 comments on commit 483c832

Please sign in to comment.