Skip to content

Commit

Permalink
Merge pull request #1015 from golemfactory/market/approval-ack
Browse files Browse the repository at this point in the history
Market -- approval ack
  • Loading branch information
tworec authored Feb 8, 2021
2 parents 2326917 + 04c6e65 commit 5e472b9
Show file tree
Hide file tree
Showing 26 changed files with 768 additions and 158 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ ya-file-logging = "0.1"
ya-utils-actix = "0.1"
ya-utils-path = "0.1"
ya-utils-process = { version = "0.1", features = ['lock'] }
ya-std-utils = "0.1"

actix = { version = "0.10", default-features = false }
actix-rt = "1.1.1"
Expand Down
6 changes: 3 additions & 3 deletions agent/provider/src/execution/task_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use ya_utils_process::ExeUnitExitStatus;

use super::exeunits_registry::{ExeUnitDesc, ExeUnitsRegistry};
use super::task::Task;
use crate::market::provider_market::AgreementApproved;
use crate::market::provider_market::NewAgreement;
use crate::market::Preset;
use crate::tasks::{AgreementBroken, AgreementClosed};

Expand Down Expand Up @@ -360,7 +360,7 @@ impl TaskRunner {
#[logfn_inputs(Debug, fmt = "{}Got {:?} {:?}")]
pub fn on_agreement_approved(
&mut self,
msg: AgreementApproved,
msg: NewAgreement,
_ctx: &mut Context<Self>,
) -> Result<()> {
// Agreement waits for first create activity event.
Expand Down Expand Up @@ -538,7 +538,7 @@ impl Actor for TaskRunner {
type Context = Context<Self>;
}

forward_actix_handler!(TaskRunner, AgreementApproved, on_agreement_approved);
forward_actix_handler!(TaskRunner, NewAgreement, on_agreement_approved);
forward_actix_handler!(TaskRunner, ExeUnitProcessFinished, on_exeunit_exited);
forward_actix_handler!(TaskRunner, GetExeUnit, get_exeunit);
actix_signal_handler!(TaskRunner, ActivityCreated, activity_created);
Expand Down
108 changes: 55 additions & 53 deletions agent/provider/src/market/provider_market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use ya_client_model::market::{
agreement_event::AgreementTerminator, Agreement, NewOffer, Proposal, ProviderEvent, Reason,
};
use ya_client_model::NodeId;
use ya_std_utils::LogErr;
use ya_utils_actix::{
actix_handler::ResultTypeGetter,
actix_signal::{SignalSlot, Subscribe},
Expand All @@ -30,6 +31,7 @@ use super::negotiator::{AgreementResponse, AgreementResult, NegotiatorAddr, Prop
use super::Preset;
use crate::market::config::MarketConfig;
use crate::market::termination_reason::GolemReason;
use crate::tasks::task_manager::ClosingCause;
use crate::tasks::{AgreementBroken, AgreementClosed, CloseAgreement};

// =========================================== //
Expand Down Expand Up @@ -62,7 +64,7 @@ pub enum OfferKind {
/// and broadcasts same event to external world.
#[derive(Clone, Debug, Message)]
#[rtype(result = "Result<()>")]
pub struct AgreementApproved {
pub struct NewAgreement {
pub agreement: AgreementView,
}

Expand Down Expand Up @@ -108,7 +110,7 @@ pub struct ProviderMarket {
config: Arc<MarketConfig>,

/// External actors can listen on this signal.
pub agreement_signed_signal: SignalSlot<AgreementApproved>,
pub agreement_signed_signal: SignalSlot<NewAgreement>,
pub agreement_terminated_signal: SignalSlot<CloseAgreement>,

/// Infinite tasks requiring to be killed on shutdown.
Expand All @@ -134,7 +136,7 @@ impl ProviderMarket {
negotiator: Arc::new(NegotiatorAddr::default()),
config: Arc::new(config),
subscriptions: HashMap::new(),
agreement_signed_signal: SignalSlot::<AgreementApproved>::new(),
agreement_signed_signal: SignalSlot::<NewAgreement>::new(),
agreement_terminated_signal: SignalSlot::<CloseAgreement>::new(),
handles: HashMap::new(),
};
Expand Down Expand Up @@ -169,11 +171,7 @@ impl ProviderMarket {
// Market internals - proposals and agreements reactions
// =========================================== //

fn on_agreement_approved(
&mut self,
msg: AgreementApproved,
_ctx: &mut Context<Self>,
) -> Result<()> {
fn on_agreement_approved(&mut self, msg: NewAgreement, _ctx: &mut Context<Self>) -> Result<()> {
log::info!("Got approved agreement [{}].", msg.agreement.agreement_id,);
// At this moment we only forward agreement to outside world.
self.agreement_signed_signal.send_signal(msg)
Expand Down Expand Up @@ -359,6 +357,16 @@ async fn process_agreement(

match action {
AgreementResponse::ApproveAgreement => {
// Prepare Provider for Agreement. We aren't sure here, that approval will
// succeed, but we are obligated to reserve all promised resources for Requestor,
// so after `approve_agreement` will return, we are ready to create activities.
ctx.market
.send(NewAgreement {
agreement: agreement.clone(),
})
.await?
.ok();

// TODO: We should retry approval, but only a few times, than we should
// give up since it's better to take another agreement.
let result = ctx
Expand Down Expand Up @@ -386,11 +394,6 @@ async fn process_agreement(

// We negotiated agreement and here responsibility of ProviderMarket ends.
// Notify outside world about agreement for further processing.
let message = AgreementApproved {
agreement: agreement.clone(),
};

let _ = ctx.market.send(message).await?;
}
AgreementResponse::RejectAgreement { reason } => {
ctx.api
Expand Down Expand Up @@ -571,16 +574,13 @@ impl Handler<OnAgreementTerminated> for ProviderMarket {

self.agreement_terminated_signal
.send_signal(CloseAgreement {
is_terminated: true,
cause: ClosingCause::Termination,
agreement_id: id.clone(),
})
.map_err(|e| {
log::error!(
"Failed to propagate termination info for agreement [{}]. {}",
id,
e
)
})
.log_err_msg(&format!(
"Failed to propagate termination info for agreement [{}]",
id
))
.ok();
Ok(())
}
Expand All @@ -604,14 +604,10 @@ impl Handler<CreateOffer> for ProviderMarket {
.negotiator
.create_offer(&msg.offer_definition)
.await
.map_err(|e| {
log::error!(
"Negotiator failed to create offer for preset [{}]. Error: {}",
msg.preset.name,
e
);
e
})?;
.log_err_msg(&format!(
"Negotiator failed to create offer for preset [{}]",
msg.preset.name,
))?;

log::debug!(
"Offer created: {}",
Expand All @@ -623,14 +619,10 @@ impl Handler<CreateOffer> for ProviderMarket {
let preset_name = msg.preset.name.clone();
subscribe(ctx.market, ctx.api, offer, msg.preset)
.await
.map_err(|error| {
log::error!(
"Can't subscribe new offer for preset [{}], error: {}",
preset_name,
error
);
error
})?;
.log_err_msg(&format!(
"Can't subscribe new offer for preset [{}]",
preset_name,
))?;
Ok(())
}
.boxed_local()
Expand Down Expand Up @@ -702,13 +694,13 @@ async fn resubscribe_offers(
let preset = sub.preset;
let preset_name = preset.name.clone();

if let Err(e) = subscribe(market.clone(), api.clone(), offer, preset).await {
log::warn!(
"Unable to create subscription for preset {}: {}",
subscribe(market.clone(), api.clone(), offer, preset)
.await
.log_warn_msg(&format!(
"Unable to create subscription for preset {}",
preset_name,
e
);
}
))
.ok();
}
}

Expand All @@ -720,18 +712,28 @@ impl Handler<AgreementFinalized> for ProviderMarket {
let agreement_id = msg.id.clone();
let result = msg.result.clone();

if let AgreementResult::ApprovalFailed = &msg.result {
self.agreement_terminated_signal
.send_signal(CloseAgreement {
cause: ClosingCause::ApprovalFail,
agreement_id: agreement_id.clone(),
})
.log_err_msg(&format!(
"Failed to propagate ApprovalFailed info for agreement [{}]",
agreement_id
))
.ok();
}

let future = async move {
if let Err(error) = ctx
.negotiator
ctx.negotiator
.agreement_finalized(&agreement_id, result)
.await
{
log::warn!(
"Negotiator failed while handling agreement [{}] finalize. Error: {}",
.log_err_msg(&format!(
"Negotiator failed while handling agreement [{}] finalize",
&agreement_id,
error,
);
}
))
.ok();
}
.into_actor(self)
.map(|_, myself, ctx| {
Expand Down Expand Up @@ -818,9 +820,9 @@ impl Handler<Unsubscribe> for ProviderMarket {
}

forward_actix_handler!(ProviderMarket, Subscription, on_subscription);
forward_actix_handler!(ProviderMarket, AgreementApproved, on_agreement_approved);
forward_actix_handler!(ProviderMarket, NewAgreement, on_agreement_approved);
actix_signal_handler!(ProviderMarket, CloseAgreement, agreement_terminated_signal);
actix_signal_handler!(ProviderMarket, AgreementApproved, agreement_signed_signal);
actix_signal_handler!(ProviderMarket, NewAgreement, agreement_signed_signal);

fn get_backoff() -> backoff::ExponentialBackoff {
// TODO: We could have config for Market actor to be able to set at least initial interval.
Expand Down
6 changes: 3 additions & 3 deletions agent/provider/src/payments/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::agreement::{compute_cost, ActivityPayment, AgreementPayment, CostInfo
use super::model::PaymentModel;
use super::payment_checker::{DeadlineChecker, DeadlineElapsed, StopTracking, TrackDeadline};
use crate::execution::{ActivityCreated, ActivityDestroyed};
use crate::market::provider_market::AgreementApproved;
use crate::market::provider_market::NewAgreement;
use crate::market::termination_reason::BreakReason;
use crate::tasks::{AgreementBroken, AgreementClosed, BreakAgreement};

Expand Down Expand Up @@ -165,7 +165,7 @@ impl Payments {

pub fn on_signed_agreement(
&mut self,
msg: AgreementApproved,
msg: NewAgreement,
_ctx: &mut Context<Self>,
) -> Result<()> {
log::info!(
Expand Down Expand Up @@ -383,7 +383,7 @@ async fn compute_cost_and_send_debit_note(
Ok((debit_note, cost_info))
}

forward_actix_handler!(Payments, AgreementApproved, on_signed_agreement);
forward_actix_handler!(Payments, NewAgreement, on_signed_agreement);

impl Handler<ActivityCreated> for Payments {
type Result = anyhow::Result<()>;
Expand Down
35 changes: 23 additions & 12 deletions agent/provider/src/tasks/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ya_utils_actix::forward_actix_handler;
use super::task_info::TaskInfo;
use super::task_state::{AgreementState, TasksStates};
use crate::execution::{ActivityCreated, ActivityDestroyed, TaskRunner};
use crate::market::provider_market::{AgreementApproved, ProviderMarket};
use crate::market::provider_market::{NewAgreement, ProviderMarket};
use crate::market::termination_reason::BreakReason;
use crate::payments::Payments;

Expand All @@ -36,12 +36,19 @@ pub struct BreakAgreement {
pub reason: BreakReason,
}

#[derive(Clone, PartialEq)]
pub enum ClosingCause {
ApprovalFail,
Termination,
SingleActivity,
}

/// Notifies TaskManager that Requestor close agreement.
#[derive(Message, Clone)]
#[rtype(result = "Result<()>")]
pub struct CloseAgreement {
pub agreement_id: String,
pub is_terminated: bool,
pub cause: ClosingCause,
}

// =========================================== //
Expand Down Expand Up @@ -200,7 +207,7 @@ impl TaskManager {
}
}

fn add_new_agreement(&mut self, msg: &AgreementApproved) -> anyhow::Result<TaskInfo> {
fn add_new_agreement(&mut self, msg: &NewAgreement) -> anyhow::Result<TaskInfo> {
let agreement_id = msg.agreement.agreement_id.clone();
self.tasks.new_agreement(&agreement_id)?;

Expand Down Expand Up @@ -235,7 +242,7 @@ impl Handler<InitializeTaskManager> for TaskManager {

let future = async move {
// Listen to AgreementApproved event.
let msg = Subscribe::<AgreementApproved>(actx.myself.clone().recipient());
let msg = Subscribe::<NewAgreement>(actx.myself.clone().recipient());
actx.market.send(msg).await?;

// Listen to Agreement terminated event from market.
Expand Down Expand Up @@ -263,10 +270,10 @@ impl Handler<InitializeTaskManager> for TaskManager {
// Messages modifying agreement state
// =========================================== //

impl Handler<AgreementApproved> for TaskManager {
impl Handler<NewAgreement> for TaskManager {
type Result = ActorResponse<Self, (), Error>;

fn handle(&mut self, msg: AgreementApproved, ctx: &mut Context<Self>) -> Self::Result {
fn handle(&mut self, msg: NewAgreement, ctx: &mut Context<Self>) -> Self::Result {
// Add new agreement with it's state.
let task_info = match self.add_new_agreement(&msg) {
Err(error) => {
Expand Down Expand Up @@ -398,7 +405,7 @@ impl Handler<ActivityDestroyed> for TaskManager {

actx.myself.do_send(CloseAgreement {
agreement_id: agreement_id.to_string(),
is_terminated: false,
cause: ClosingCause::SingleActivity,
});
}
Ok(())
Expand Down Expand Up @@ -462,14 +469,18 @@ impl Handler<CloseAgreement> for TaskManager {
let future = async move {
start_transition(&actx.myself, &msg.agreement_id, AgreementState::Closed).await?;

let msg = AgreementClosed {
let closed_msg = AgreementClosed {
agreement_id: msg.agreement_id.clone(),
send_terminate: !msg.is_terminated,
send_terminate: msg.cause == ClosingCause::Termination,
};

actx.runner.do_send(msg.clone());
actx.market.do_send(msg.clone());
actx.payments.send(msg.clone()).await??;
// No need to notify market.
if msg.cause != ClosingCause::ApprovalFail {
actx.market.do_send(closed_msg.clone());
}

actx.runner.do_send(closed_msg.clone());
actx.payments.send(closed_msg.clone()).await??;

finish_transition(&actx.myself, &msg.agreement_id, AgreementState::Closed).await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ CREATE TABLE market_agreement(
approved_signature TEXT,
committed_signature TEXT,

CHECK (state in ('Proposal','Pending','Cancelled','Rejected','Approved','Expired','Terminated'))
CHECK (state in ('Proposal','Pending','Cancelled','Rejected','Approved','Expired','Terminated', 'Approving'))
);

-- Change Proposal state from enum to Text value for better database introspection.
Expand Down
Loading

0 comments on commit 5e472b9

Please sign in to comment.