Skip to content

Commit

Permalink
Built-in Actor events for DDO (#1491)
Browse files Browse the repository at this point in the history
* squashed commit to merge built-in Actor events to master
  • Loading branch information
aarshkshah1992 authored Feb 1, 2024
1 parent 859c731 commit 72e350a
Show file tree
Hide file tree
Showing 49 changed files with 1,966 additions and 447 deletions.
76 changes: 76 additions & 0 deletions actors/market/src/emit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use fil_actors_runtime::runtime::Runtime;
use fil_actors_runtime::{ActorError, EventBuilder};
use fvm_shared::deal::DealID;
use fvm_shared::ActorID;

/// Indicates a deal has been published.
pub fn deal_published(
rt: &impl Runtime,
client: ActorID,
provider: ActorID,
deal_id: DealID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-published")
.with_parties(deal_id, client, provider)
.build()?,
)
}

/// Indicates a deal has been activated.
pub fn deal_activated(
rt: &impl Runtime,
deal_id: DealID,
client: ActorID,
provider: ActorID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-activated")
.with_parties(deal_id, client, provider)
.build()?,
)
}

/// Indicates a deal has been terminated.
pub fn deal_terminated(
rt: &impl Runtime,
deal_id: DealID,
client: ActorID,
provider: ActorID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-terminated")
.with_parties(deal_id, client, provider)
.build()?,
)
}

/// Indicates a deal has been completed successfully.
pub fn deal_completed(
rt: &impl Runtime,
deal_id: DealID,
client: ActorID,
provider: ActorID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-completed")
.with_parties(deal_id, client, provider)
.build()?,
)
}

trait WithParties {
fn with_parties(self, id: DealID, client: ActorID, provider: ActorID) -> EventBuilder;
}

impl WithParties for EventBuilder {
fn with_parties(self, id: DealID, client: ActorID, provider: ActorID) -> EventBuilder {
self.field_indexed("id", &id)
.field_indexed("client", &client)
.field_indexed("provider", &provider)
}
}
109 changes: 81 additions & 28 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub mod policy;
pub mod testing;

mod deal;
mod emit;
mod state;
mod types;

Expand Down Expand Up @@ -470,19 +471,26 @@ impl Actor {
// notify clients, any failures cause the entire publish_storage_deals method to fail
// it's unsafe to ignore errors here, since that could be used to attack storage contract clients
// that might be unaware they're making storage deals
for (i, valid_deal) in valid_deals.iter().enumerate() {
for (valid_deal, &deal_id) in valid_deals.iter().zip(&new_deal_ids) {
_ = extract_send_result(rt.send_simple(
&valid_deal.proposal.client,
MARKET_NOTIFY_DEAL_METHOD,
IpldBlock::serialize_cbor(&MarketNotifyDealParams {
proposal: valid_deal.serialized_proposal.to_vec(),
deal_id: new_deal_ids[i],
deal_id,
})?,
TokenAmount::zero(),
))
.with_context_code(ExitCode::USR_ILLEGAL_ARGUMENT, || {
format!("failed to notify deal with proposal cid {}", valid_deal.cid)
})?;

emit::deal_published(
rt,
valid_deal.proposal.client.id().unwrap(),
valid_deal.proposal.provider.id().unwrap(),
deal_id,
)?;
}

Ok(PublishStorageDealsReturn { ids: new_deal_ids, valid_deals: valid_input_bf })
Expand Down Expand Up @@ -555,7 +563,7 @@ impl Actor {
let mut deal_states: Vec<(DealID, DealState)> = vec![];
let mut batch_gen = BatchReturnGen::new(params.sectors.len());
let mut activations: Vec<SectorDealActivation> = vec![];
let mut activated_deals = BTreeSet::<DealID>::new();
let mut activated_deals: HashSet<DealID> = HashSet::new();
let mut sectors_deals: Vec<(SectorNumber, SectorDealIDs)> = vec![];

'sector: for sector in params.sectors {
Expand Down Expand Up @@ -598,8 +606,7 @@ impl Actor {
validated_proposals.push(proposal);
}

let mut verified_infos = vec![];
let mut nonverified_deal_space = BigInt::zero();
let mut activated = vec![];
// Given that all deals validated, prepare the state updates for them all.
// There's no continue below here to ensure updates are consistent.
// Any error must abort.
Expand All @@ -609,16 +616,12 @@ impl Actor {
let alloc_id =
pending_deal_allocation_ids.delete(deal_id)?.unwrap_or(NO_ALLOCATION_ID);

if alloc_id != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: alloc_id,
data: proposal.piece_cid,
size: proposal.piece_size,
})
} else {
nonverified_deal_space += proposal.piece_size.0;
}
activated.push(ActivatedDeal {
client: proposal.client.id().unwrap(),
allocation_id: alloc_id,
data: proposal.piece_cid,
size: proposal.piece_size,
});

// Prepare initial deal state.
deal_states.push((
Expand All @@ -640,11 +643,17 @@ impl Actor {

sectors_deals
.push((sector.sector_number, SectorDealIDs { deals: sector.deal_ids.clone() }));
activations.push(SectorDealActivation {
nonverified_deal_space,
verified_infos,
unsealed_cid: data_commitment,
});
activations.push(SectorDealActivation { activated, unsealed_cid: data_commitment });

for (deal_id, proposal) in sector.deal_ids.iter().zip(&validated_proposals) {
emit::deal_activated(
rt,
*deal_id,
proposal.client.id().unwrap(),
proposal.provider.id().unwrap(),
)?;
}

batch_gen.add_success();
}

Expand Down Expand Up @@ -843,6 +852,13 @@ impl Actor {
.entry(state.sector_number)
.or_default()
.push(id);

emit::deal_terminated(
rt,
id,
deal.client.id().unwrap(),
deal.provider.id().unwrap(),
)?;
}

st.remove_sector_deal_ids(rt.store(), &provider_deals_to_remove)?;
Expand Down Expand Up @@ -926,13 +942,14 @@ impl Actor {
// https://github.com/filecoin-project/builtin-actors/issues/1389
// handling of legacy deals is still done in cron. we handle such deals here and continue to
// reschedule them. eventually, all legacy deals will expire and the below code can be removed.
let (slash_amount, _payment_amount, remove_deal) = st.process_deal_update(
rt.store(),
&state,
&deal_proposal,
&dcid,
curr_epoch,
)?;
let (slash_amount, _payment_amount, completed, remove_deal) = st
.process_deal_update(
rt.store(),
&state,
&deal_proposal,
&dcid,
curr_epoch,
)?;

if remove_deal {
// TODO: remove handling for terminated-deal slashing when marked-for-termination deals are all processed
Expand All @@ -949,6 +966,15 @@ impl Actor {
.entry(state.sector_number)
.or_default()
.push(deal_id);

if !completed {
emit::deal_terminated(
rt,
deal_id,
deal_proposal.client.id().unwrap(),
deal_proposal.provider.id().unwrap(),
)?;
}
} else {
if !slash_amount.is_zero() {
return Err(actor_error!(
Expand All @@ -972,6 +998,15 @@ impl Actor {
);
new_updates_scheduled.entry(next_epoch).or_default().push(deal_id);
}

if completed {
emit::deal_completed(
rt,
deal_id,
deal_proposal.client.id().unwrap(),
deal_proposal.provider.id().unwrap(),
)?;
}
}
epochs_completed.push(i);
}
Expand Down Expand Up @@ -1247,7 +1282,7 @@ impl Actor {
));
}

let (_, payment_amount, remove_deal) = match st.process_deal_update(
let (_, payment_amount, completed, remove_deal) = match st.process_deal_update(
rt.store(),
&deal_state,
&deal_proposal,
Expand All @@ -1269,6 +1304,15 @@ impl Actor {
.entry(deal_state.sector_number)
.or_default()
.push(deal_id);

if !completed {
emit::deal_terminated(
rt,
deal_id,
deal_proposal.client.id().unwrap(),
deal_proposal.provider.id().unwrap(),
)?;
}
} else {
deal_state.last_updated_epoch = curr_epoch;
new_deal_states.push((deal_id, deal_state));
Expand All @@ -1279,6 +1323,15 @@ impl Actor {
payment: payment_amount,
});
batch_gen.add_success();

if completed {
emit::deal_completed(
rt,
deal_id,
deal_proposal.client.id().unwrap(),
deal_proposal.provider.id().unwrap(),
)?;
}
}

st.put_deal_states(rt.store(), &new_deal_states)?;
Expand Down
9 changes: 5 additions & 4 deletions actors/market/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,7 @@ impl State {
(
/* slash_amount */ TokenAmount,
/* payment_amount */ TokenAmount,
/* is_deal_completed */ bool,
/* remove */ bool,
),
ActorError,
Expand Down Expand Up @@ -859,7 +860,7 @@ impl State {

// this is a safe no-op but can happen if a storage provider calls settle_deal_payments too early
if deal.start_epoch > epoch {
return Ok((TokenAmount::zero(), TokenAmount::zero(), false));
return Ok((TokenAmount::zero(), TokenAmount::zero(), false, false));
}

let payment_end_epoch = if ever_slashed {
Expand Down Expand Up @@ -921,15 +922,15 @@ impl State {
self.slash_balance(store, &deal.provider, &slashed, Reason::ProviderCollateral)
.context("slashing balance")?;

return Ok((slashed, payment_remaining + elapsed_payment, true));
return Ok((slashed, payment_remaining + elapsed_payment, false, true));
}

if epoch >= deal.end_epoch {
self.process_deal_expired(store, deal, state)?;
return Ok((TokenAmount::zero(), elapsed_payment, true));
return Ok((TokenAmount::zero(), elapsed_payment, true, true));
}

Ok((TokenAmount::zero(), elapsed_payment, false))
Ok((TokenAmount::zero(), elapsed_payment, false, false))
}

pub fn process_slashed_deal<BS>(
Expand Down
13 changes: 5 additions & 8 deletions actors/market/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,23 +102,20 @@ pub struct BatchActivateDealsParams {
pub compute_cid: bool,
}

// Information about a verified deal that has been activated.
// Information about a deal that has been activated.
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct VerifiedDealInfo {
pub struct ActivatedDeal {
pub client: ActorID,
pub allocation_id: AllocationID,
pub allocation_id: AllocationID, // NO_ALLOCATION_ID for unverified deals.
pub data: Cid,
pub size: PaddedPieceSize,
}

// Information about a sector-grouping of deals that have been activated.
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct SectorDealActivation {
/// The total size of the non-verified deals activated.
#[serde(with = "bigint_ser")]
pub nonverified_deal_space: BigInt,
/// Information about each verified deal activated.
pub verified_infos: Vec<VerifiedDealInfo>,
/// Information about each deal activated.
pub activated: Vec<ActivatedDeal>,
/// Unsealed CID computed from the deals specified for the sector.
/// A None indicates no deals were specified, or the computation was not requested.
pub unsealed_cid: Option<Cid>,
Expand Down
1 change: 1 addition & 0 deletions actors/market/tests/activate_deal_failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ fn assert_activation_failure(
deal_ids: vec![deal_id],
}],
false,
&[],
)
.unwrap();
let res: BatchActivateDealsResult =
Expand Down
9 changes: 5 additions & 4 deletions actors/market/tests/batch_activate_deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ fn sectors_fail_and_succeed_independently_during_batch_activation() {
},
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, &[id_4]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down Expand Up @@ -227,7 +227,7 @@ fn handles_sectors_empty_of_deals_gracefully() {
SectorDeals { sector_number: 3, deal_ids: vec![], sector_type, sector_expiry: END_EPOCH },
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, &[id_1]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down Expand Up @@ -273,7 +273,7 @@ fn fails_to_activate_single_sector_duplicate_deals() {
sector_expiry: END_EPOCH,
},
];
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, &[]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down Expand Up @@ -328,7 +328,8 @@ fn fails_to_activate_cross_sector_duplicate_deals() {
},
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res =
batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, &[id_1, id_3]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down
Loading

0 comments on commit 72e350a

Please sign in to comment.