Skip to content

Commit

Permalink
feat(chain-listener): exit expired deals (#2143)
Browse files Browse the repository at this point in the history
* feat(chain-listener): exit expired deals

* update

* fix

* fix

* fix

* fix

* subscribe to unit events on start

* don't send exit tx for ended deals

* fix unit deactivation

* exit on deal ended

* resubscribe on subscription termination

* log message from newHeads subs

* fix
  • Loading branch information
justprosh authored Mar 9, 2024
1 parent 2a9b599 commit 596f2a6
Show file tree
Hide file tree
Showing 30 changed files with 669 additions and 93 deletions.
16 changes: 10 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ members = [
"crates/chain-types",
"crates/types",
"crates/core-manager",
]
]
exclude = [
"nox/tests/tetraplets",
]
Expand Down
1 change: 1 addition & 0 deletions crates/chain-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ futures = { workspace = true }
ccp-shared = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }

[dev-dependencies]
mockito = { workspace = true }
59 changes: 53 additions & 6 deletions crates/chain-connector/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use ccp_shared::proof::CCProof;
use ccp_shared::types::{Difficulty, GlobalNonce};
use ccp_shared::types::{Difficulty, GlobalNonce, CUID};
use clarity::Transaction;
use ethabi::ethereum_types::U256;
use ethabi::Token;
Expand All @@ -18,19 +18,23 @@ use tokio::sync::Mutex;

use chain_data::ChainDataError::InvalidTokenSize;
use chain_data::{next_opt, parse_chain_data, peer_id_to_bytes, ChainFunction};
use chain_types::{Commitment, CommitmentId, CommitmentStatus, ComputePeer, ComputeUnit};
use chain_types::{
Commitment, CommitmentId, CommitmentStatus, ComputePeer, ComputeUnit, DealStatus,
};
use fluence_libp2p::PeerId;
use particle_args::{Args, JError};
use particle_builtins::{wrap, CustomService};
use particle_execution::{ParticleParams, ServiceFunction};
use server_config::ChainConfig;
use types::DealId;

use crate::error::{process_response, ConnectorError};
use crate::function::{GetCommitmentFunction, GetStatusFunction, SubmitProofFunction};
use crate::function::{GetCommitmentFunction, GetCommitmentStatusFunction, SubmitProofFunction};
use crate::ConnectorError::InvalidBaseFeePerGas;
use crate::{
CurrentEpochFunction, DifficultyFunction, EpochDurationFunction, GetComputePeerFunction,
GetComputeUnitsFunction, GetGlobalNonceFunction, InitTimestampFunction,
GetComputeUnitsFunction, GetDealStatusFunction, GetGlobalNonceFunction, InitTimestampFunction,
ReturnComputeUnitFromDeal,
};

const BASE_FEE_MULTIPLIER: f64 = 0.125;
Expand Down Expand Up @@ -166,6 +170,7 @@ impl ChainConnector {

pub async fn send_tx(&self, data: Vec<u8>, to: &str) -> Result<String, ConnectorError> {
let base_fee_per_gas = self.get_base_fee_per_gas().await?;
tracing::info!(target: "chain-connector", "Estimating gas for tx from {} to {} data {}", self.config.wallet_key.to_address(), to, hex::encode(&data));
let gas_limit = self.estimate_gas_limit(&data, to).await?;
let max_priority_fee_per_gas = self.max_priority_fee_per_gas().await?;

Expand Down Expand Up @@ -236,7 +241,7 @@ impl ChainConnector {
&self,
commitment_id: CommitmentId,
) -> Result<CommitmentStatus, ConnectorError> {
let data = GetStatusFunction::data(&[Token::FixedBytes(commitment_id.0)])?;
let data = GetCommitmentStatusFunction::data(&[Token::FixedBytes(commitment_id.0)])?;
let resp: String = process_response(
self.client
.request(
Expand Down Expand Up @@ -327,7 +332,7 @@ impl ChainConnector {
.await,
)?;
let mut tokens =
parse_chain_data(&resp, &GetComputeUnitsFunction::signature())?.into_iter();
parse_chain_data(&resp, &GetComputeUnitsFunction::result_signature())?.into_iter();
let units = next_opt(&mut tokens, "units", Token::into_array)?.into_iter();
let compute_units = units
.map(ComputeUnit::from_token)
Expand Down Expand Up @@ -391,6 +396,48 @@ impl ChainConnector {
})
}

pub async fn get_deal_statuses<'a, I>(
&self,
deal_ids: I,
) -> Result<Vec<Result<DealStatus, ConnectorError>>, ConnectorError>
where
I: Iterator<Item = &'a DealId>,
{
let mut batch = BatchRequestBuilder::new();
for deal_id in deal_ids {
let data = GetDealStatusFunction::data(&[])?;
batch.insert(
"eth_call",
rpc_params![
json!({
"data": data,
"to": deal_id.to_address(),
}),
"latest"
],
)?;
}

let resp: BatchResponse<String> = self.client.batch_request(batch).await?;
let mut statuses = vec![];

for status in resp.into_iter() {
let status = status
.map(|r| DealStatus::from(&r).map_err(ConnectorError::ParseChainDataFailed))
.map_err(|e| ConnectorError::RpcError(e.to_owned().into()))?;
statuses.push(status);
}

Ok(statuses)
}

pub async fn exit_deal(&self, cu_id: &CUID) -> Result<String, ConnectorError> {
let data =
ReturnComputeUnitFromDeal::data_bytes(&[Token::FixedBytes(cu_id.as_ref().to_vec())])?;
self.send_tx(data, &self.config.market_contract_address)
.await
}

fn difficulty_params(&self) -> eyre::Result<ArrayParams> {
let data = DifficultyFunction::data(&[])?;
Ok(rpc_params![
Expand Down
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/current_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl ChainFunction for CurrentEpochFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::Uint(256)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/difficulty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl ChainFunction for DifficultyFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::FixedBytes(32)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/epoch_duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl ChainFunction for EpochDurationFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::Uint(256)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/get_commitment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl ChainFunction for GetCommitmentFunction {
state_mutability: ethabi::StateMutability::View,
}
}
fn signature() -> Vec<ethabi::ParamType> {
fn result_signature() -> Vec<ethabi::ParamType> {
Commitment::signature()
}
}
6 changes: 3 additions & 3 deletions crates/chain-connector/src/function/get_commitment_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use chain_data::ChainFunction;
/// @param commitmentId Commitment id
/// @return status commitment status
/// function getStatus(bytes32 commitmentId) external view returns (CCStatus);
pub struct GetStatusFunction;
pub struct GetCommitmentStatusFunction;

impl ChainFunction for GetStatusFunction {
impl ChainFunction for GetCommitmentStatusFunction {
fn function() -> ethabi::Function {
#[allow(deprecated)]
let function = ethabi::Function {
Expand All @@ -23,7 +23,7 @@ impl ChainFunction for GetStatusFunction {
function
}

fn signature() -> Vec<ethabi::ParamType> {
fn result_signature() -> Vec<ethabi::ParamType> {
vec![ethabi::ParamType::FixedBytes(32)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/get_compute_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl ChainFunction for GetComputePeerFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
ComputePeer::signature()
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/get_compute_units.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl ChainFunction for GetComputeUnitsFunction {
state_mutability: ethabi::StateMutability::View,
}
}
fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::Array(Box::new(ParamType::Tuple(
ComputeUnit::signature(),
)))]
Expand Down
23 changes: 23 additions & 0 deletions crates/chain-connector/src/function/get_deal_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use chain_data::ChainFunction;
use ethabi::ParamType;

/// function getStatus() public view returns (Status)
pub struct GetDealStatusFunction;

impl ChainFunction for GetDealStatusFunction {
fn function() -> ethabi::Function {
#[allow(deprecated)]
let function = ethabi::Function {
name: "getStatus".to_string(),
inputs: vec![],
outputs: vec![],
constant: None,
state_mutability: ethabi::StateMutability::View,
};
function
}

fn result_signature() -> Vec<ParamType> {
vec![ParamType::FixedBytes(32)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/global_nonce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl ChainFunction for GetGlobalNonceFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::FixedBytes(32)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/init_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl ChainFunction for InitTimestampFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::Uint(256)]
}
}
6 changes: 5 additions & 1 deletion crates/chain-connector/src/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@ mod get_commitment;
mod get_commitment_status;
mod get_compute_peer;
mod get_compute_units;
mod get_deal_status;
mod global_nonce;
mod init_timestamp;
mod remove_cu_from_deal;
mod submit_proof;

pub use current_epoch::CurrentEpochFunction;
pub use difficulty::DifficultyFunction;
pub use epoch_duration::EpochDurationFunction;
pub use get_commitment::GetCommitmentFunction;
pub use get_commitment_status::GetStatusFunction;
pub use get_commitment_status::GetCommitmentStatusFunction;
pub use get_compute_peer::GetComputePeerFunction;
pub use get_compute_units::GetComputeUnitsFunction;
pub use get_deal_status::GetDealStatusFunction;
pub use global_nonce::GetGlobalNonceFunction;
pub use init_timestamp::InitTimestampFunction;
pub use remove_cu_from_deal::ReturnComputeUnitFromDeal;
pub use submit_proof::SubmitProofFunction;
27 changes: 27 additions & 0 deletions crates/chain-connector/src/function/remove_cu_from_deal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use chain_data::ChainFunction;
use ethabi::{Function, Param, ParamType, StateMutability};

/// @dev Return the compute unit from a deal
/// function returnComputeUnitFromDeal(bytes32 unitId) external;
pub struct ReturnComputeUnitFromDeal;

impl ChainFunction for ReturnComputeUnitFromDeal {
fn function() -> Function {
#[allow(deprecated)]
Function {
name: "returnComputeUnitFromDeal".to_string(),
inputs: vec![Param {
name: "unitId".to_string(),
kind: ParamType::FixedBytes(32),
internal_type: None,
}],
outputs: vec![],
constant: None,
state_mutability: StateMutability::NonPayable,
}
}

fn result_signature() -> Vec<ParamType> {
vec![]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/submit_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ChainFunction for SubmitProofFunction {
function
}

fn signature() -> Vec<ethabi::ParamType> {
fn result_signature() -> Vec<ethabi::ParamType> {
vec![
ethabi::ParamType::FixedBytes(32),
ethabi::ParamType::FixedBytes(32),
Expand Down
8 changes: 4 additions & 4 deletions crates/chain-data/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ethabi::Token;

pub trait ChainFunction {
fn function() -> ethabi::Function;
fn signature() -> Vec<ethabi::ParamType>;
fn result_signature() -> Vec<ethabi::ParamType>;

fn data(inputs: &[Token]) -> Result<String, ChainDataError> {
let function = Self::function();
Expand All @@ -18,16 +18,16 @@ pub trait ChainFunction {
}

fn decode_uint(data: &str) -> Result<U256, ChainDataError> {
let mut tokens = crate::parse_chain_data(data, &Self::signature())?.into_iter();
let mut tokens = crate::parse_chain_data(data, &Self::result_signature())?.into_iter();
next_opt(&mut tokens, "uint", Token::into_uint)
}

fn decode_fixed_bytes(data: &str) -> Result<Vec<u8>, ChainDataError> {
let mut tokens = crate::parse_chain_data(data, &Self::signature())?.into_iter();
let mut tokens = crate::parse_chain_data(data, &Self::result_signature())?.into_iter();
next_opt(&mut tokens, "bytes", Token::into_fixed_bytes)
}

fn decode_tuple(data: &str) -> Result<Vec<Token>, ChainDataError> {
crate::parse_chain_data(data, &Self::signature())
crate::parse_chain_data(data, &Self::result_signature())
}
}
Loading

0 comments on commit 596f2a6

Please sign in to comment.