diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index 1370e888ac..0632efa248 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -30,8 +30,10 @@ use eyre::eyre; use jsonrpsee::core::async_trait; use jsonrpsee::core::client::{BatchResponse, ClientT}; use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder}; +use jsonrpsee::core::traits::ToRpcParams; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::rpc_params; +use serde::de::DeserializeOwned; use serde_json::{json, Value}; use tokio::sync::Mutex; @@ -95,7 +97,7 @@ impl HttpChainConnector { config: ChainConfig, host_id: PeerId, ) -> eyre::Result<(Arc, HashMap)> { - tracing::info!(target: "chain-connector","Connecting to chain via {}", config.http_endpoint); + tracing::info!(target: "chain-connector", "Connecting to chain via {}", config.http_endpoint); let connector = Arc::new(Self { client: Arc::new(HttpClientBuilder::default().build(&config.http_endpoint)?), @@ -113,11 +115,9 @@ impl HttpChainConnector { return Ok(Uint::from(fee)); } - let block: Value = process_response( - self.client - .request("eth_getBlockByNumber", rpc_params!["pending", false]) - .await, - )?; + let block: Value = self + .send_request("eth_getBlockByNumber", rpc_params!["pending", false]) + .await?; let fee = block .as_object() @@ -132,7 +132,7 @@ impl HttpChainConnector { Ok(base_fee_per_gas) } - pub async fn get_app_cid(&self, deals: impl Iterator) -> Result> { + pub async fn get_app_cid(&self, deals: impl Iterator) -> Result> { let data = Deal::appCIDCall {}.abi_encode(); let mut batch = BatchRequestBuilder::new(); for deal in deals { @@ -141,6 +141,8 @@ impl HttpChainConnector { rpc_params![EthCall::to(&data, &deal.to_address()), "latest"], )?; } + + tracing::debug!(target: "chain-connector", "Sending batched get_app_cid request: {batch:?}"); let resp: BatchResponse = self.client.batch_request(batch).await?; let mut cids = vec![]; for result in resp.into_iter() { @@ -173,9 +175,9 @@ impl HttpChainConnector { deals.retain(|deal_id, worker| { if worker.keys().len() > 1 { tracing::error!(target: "chain-connector", "Deal {deal_id} has more then one worker for the peer which is forbiden at the moment"); - false + false } else { - true + true } }); @@ -217,7 +219,7 @@ impl HttpChainConnector { deal_ids: I, ) -> Result>> where - I: IntoIterator + ExactSizeIterator, + I: IntoIterator + ExactSizeIterator, { let mut batch = BatchRequestBuilder::new(); let deal_count = deal_ids.len(); @@ -262,13 +264,31 @@ impl HttpChainConnector { Ok(deal_info) } + async fn send_request(&self, method: &str, params: ArrayParams) -> Result + where + T: DeserializeOwned, + { + // i need it for pretty logs and errors + let serialized_params = params + .clone() + .to_rpc_params()? + .map(|p| p.to_string()) + .unwrap_or("[]".to_string()); + + tracing::debug!(target: "chain-connector", "Sending request {method} with params {serialized_params} to {}", self.config.http_endpoint); + let resp: Result = process_response( + self.client.request(method, params).await, + method, + serialized_params, + ); + resp + } + async fn get_tx_nonce(&self) -> Result { let address = self.config.wallet_key.to_address().to_string(); - let resp: String = process_response( - self.client - .request("eth_getTransactionCount", rpc_params![address, "pending"]) - .await, - )?; + let resp: String = self + .send_request("eth_getTransactionCount", rpc_params![address, "pending"]) + .await?; let nonce = U256::from_str(&resp).map_err(|err| InvalidU256(resp, err.to_string()))?; Ok(nonce) @@ -279,29 +299,27 @@ impl HttpChainConnector { return Ok(Uint::from(fee)); } - let resp: String = process_response( - self.client - .request("eth_maxPriorityFeePerGas", rpc_params![]) - .await, - )?; + let resp: String = self + .send_request("eth_maxPriorityFeePerGas", rpc_params![]) + .await?; + let max_priority_fee_per_gas = U256::from_str(&resp).map_err(|err| InvalidU256(resp, err.to_string()))?; Ok(max_priority_fee_per_gas) } async fn estimate_gas_limit(&self, data: &[u8], to: &str) -> Result { - let resp: String = process_response( - self.client - .request( - "eth_estimateGas", - rpc_params![json!({ - "from": self.config.wallet_key.to_address().to_string(), - "to": to, - "data": encode_hex_0x(data), - })], - ) - .await, - )?; + let resp: String = self + .send_request( + "eth_estimateGas", + rpc_params![json!({ + "from": self.config.wallet_key.to_address().to_string(), + "to": to, + "data": encode_hex_0x(data), + })], + ) + .await?; + let limit = U256::from_str(&resp).map_err(|err| InvalidU256(resp, err.to_string()))?; Ok(limit) } @@ -347,11 +365,9 @@ impl HttpChainConnector { .to_bytes(); let signed_tx = encode_hex_0x(signed_tx); - let result: Result = process_response( - self.client - .request("eth_sendRawTransaction", rpc_params![signed_tx]) - .await, - ); + let result: Result = self + .send_request("eth_sendRawTransaction", rpc_params![signed_tx]) + .await; match result { Ok(resp) => { @@ -378,7 +394,7 @@ impl HttpChainConnector { onchainId: FixedBytes::from_slice(&onchain_worker_id), offchainId: peer_id_to_bytes(worker_id.into()).into(), } - .abi_encode(); + .abi_encode(); tracing::debug!(target: "chain-connector", "Registering worker {worker_id} for deal {deal_id} with onchain_id {}", encode_hex_0x(onchain_worker_id)); self.send_tx(data, &deal_id.to_address()).await } @@ -391,14 +407,13 @@ impl HttpChainConnector { pub async fn get_deal_workers(&self, deal_id: &DealId) -> Result> { let data = Deal::getWorkersCall {}.abi_encode(); - let resp: String = process_response( - self.client - .request( - "eth_call", - rpc_params![EthCall::to(&data, &deal_id.to_address()), "latest"], - ) - .await, - )?; + let resp: String = self + .send_request( + "eth_call", + rpc_params![EthCall::to(&data, &deal_id.to_address()), "latest"], + ) + .await?; + let bytes = decode_hex(&resp)?; let workers = as SolType>::abi_decode(&bytes, true)?; @@ -447,12 +462,11 @@ impl ChainConnector for HttpChainConnector { let data = Offer::getComputePeerCall { peerId: peer_id.into(), } - .abi_encode(); - let resp: String = process_response( - self.client - .request("eth_call", self.make_latest_diamond_rpc_params(data)) - .await, - )?; + .abi_encode(); + let resp: String = self + .send_request("eth_call", self.make_latest_diamond_rpc_params(data)) + .await?; + let compute_peer = ::abi_decode(&decode_hex(&resp)?, true)?; Ok(CommitmentId::new(compute_peer.commitmentId.0)) } @@ -469,6 +483,7 @@ impl ChainConnector for HttpChainConnector { batch.insert("eth_call", self.max_proofs_per_epoch_params())?; batch.insert("eth_getBlockByNumber", rpc_params!["latest", false])?; + tracing::debug!(target: "chain-connector", "Sending batched init params request: {batch:?}"); let resp: BatchResponse = self.client.batch_request(batch).await?; tracing::debug!(target: "chain-connector", "Got cc init params response: {resp:?}"); @@ -516,13 +531,12 @@ impl ChainConnector for HttpChainConnector { let data = Offer::getComputeUnitsCall { peerId: peer_id_to_bytes(self.host_id).into(), } - .abi_encode(); + .abi_encode(); + + let resp: String = self + .send_request("eth_call", self.make_latest_diamond_rpc_params(data)) + .await?; - let resp: String = process_response( - self.client - .request("eth_call", self.make_latest_diamond_rpc_params(data)) - .await, - )?; let bytes = decode_hex(&resp)?; let compute_units = as SolType>::abi_decode(&bytes, true)?; @@ -533,13 +547,12 @@ impl ChainConnector for HttpChainConnector { let data = Capacity::getStatusCall { commitmentId: commitment_id.0.into(), } - .abi_encode(); + .abi_encode(); + + let resp: String = self + .send_request("eth_call", self.make_latest_diamond_rpc_params(data)) + .await?; - let resp: String = process_response( - self.client - .request("eth_call", self.make_latest_diamond_rpc_params(data)) - .await, - )?; Ok(::abi_decode( &decode_hex(&resp)?, true, @@ -547,11 +560,9 @@ impl ChainConnector for HttpChainConnector { } async fn get_global_nonce(&self) -> Result { - let resp: String = process_response( - self.client - .request("eth_call", self.global_nonce_params()) - .await, - )?; + let resp: String = self + .send_request("eth_call", self.global_nonce_params()) + .await?; let bytes: FixedBytes<32> = FixedBytes::from_str(&resp)?; Ok(GlobalNonce::new(bytes.0)) @@ -575,7 +586,7 @@ impl ChainConnector for HttpChainConnector { .map(|hash| hash.as_ref().into()) .collect(), } - .abi_encode(); + .abi_encode(); self.send_tx(data, &self.config.diamond_contract_address) .await @@ -591,6 +602,8 @@ impl ChainConnector for HttpChainConnector { )?; } + tracing::debug!(target: "chain-connector", "Sending batched deal statuses request: {batch:?}"); + let resp: BatchResponse = self.client.batch_request(batch).await?; let statuses = resp .into_iter() @@ -612,7 +625,7 @@ impl ChainConnector for HttpChainConnector { let data = Deal::removeWorkerCall { onchainId: onchain_worker_id, } - .abi_encode(); + .abi_encode(); self.send_tx(data, &deal_id.to_address()).await } @@ -637,6 +650,9 @@ impl ChainConnector for HttpChainConnector { for tx_hash in tx_hashes { batch.insert("eth_getTransactionReceipt", rpc_params![tx_hash])?; } + + tracing::debug!(target: "chain-connector", "Sending batched tx transaction receipts request: {batch:?}"); + let resp: BatchResponse = self.client.batch_request(batch).await?; let receipts = resp @@ -664,7 +680,6 @@ fn parse_str_field(value: Option, field: &'static str) -> Result #[cfg(test)] mod tests { - use alloy_primitives::uint; use alloy_primitives::U256; use alloy_sol_types::sol_data::Array; @@ -699,14 +714,14 @@ mod tests { wallet_key: PrivateKey::from_str( "0x97a2456e78c4894c62eef6031972d1ca296ed40bf311ab54c231f13db59fc428", ) - .unwrap(), + .unwrap(), default_base_fee: None, default_priority_fee: None, }, peer_id_from_hex("0x6497db93b32e4cdd979ada46a23249f444da1efb186cd74b9666bd03f710028b") .unwrap(), ) - .unwrap(); + .unwrap(); connector } @@ -717,7 +732,7 @@ mod tests { id: alloy_primitives::hex!( "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5" ) - .into(), + .into(), deal: Default::default(), startEpoch: Default::default(), onchainWorkerId: Default::default(), @@ -727,7 +742,7 @@ mod tests { id: alloy_primitives::hex!( "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1" ) - .into(), + .into(), deal: Default::default(), startEpoch: Default::default(), onchainWorkerId: Default::default(), @@ -954,7 +969,7 @@ mod tests { ::from_hex( "76889c92f61b9c5df216e048df56eb8f4eb02f172ab0d5b04edb9190ab9c9eec" ) - .unwrap() + .unwrap() ); assert_eq!(init_params.init_timestamp, uint!(1707760129_U256)); assert_eq!( @@ -962,7 +977,7 @@ mod tests { ::from_hex( "0000000000000000000000000000000000000000000000000000000000000005" ) - .unwrap() + .unwrap() ); assert_eq!( init_params.current_epoch, @@ -1027,7 +1042,7 @@ mod tests { assert_matches!( result.unwrap_err(), ConnectorError::RpcCallError { - code: _, + method:_, params:_, code: _, message: _, data, } if is_commitment_not_active(&data) @@ -1138,7 +1153,7 @@ mod tests { id: alloy_primitives::hex!( "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5" ) - .into(), + .into(), deal: alloy_primitives::hex!("5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c").into(), startEpoch: Default::default(), onchainWorkerId: Default::default(), @@ -1148,7 +1163,7 @@ mod tests { id: alloy_primitives::hex!( "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1" ) - .into(), + .into(), deal: alloy_primitives::hex!("6e3d0fde6f793b3115a9e7f5ebc195bbeed35d6d").into(), startEpoch: Default::default(), onchainWorkerId: Default::default(), @@ -1255,7 +1270,7 @@ mod tests { id: alloy_primitives::hex!( "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5" ) - .into(), + .into(), deal: Default::default(), startEpoch: Default::default(), onchainWorkerId: Default::default(), @@ -1265,7 +1280,7 @@ mod tests { id: alloy_primitives::hex!( "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1" ) - .into(), + .into(), deal: Default::default(), startEpoch: Default::default(), onchainWorkerId: Default::default(), diff --git a/crates/chain-connector/src/error.rs b/crates/chain-connector/src/error.rs index 44d0d53ec6..54ac86d02f 100644 --- a/crates/chain-connector/src/error.rs +++ b/crates/chain-connector/src/error.rs @@ -17,10 +17,9 @@ * along with this program. If not, see . */ -use std::string::FromUtf8Error; - use jsonrpsee::core::client::{Error as RPCError, Error}; use jsonrpsee::types::ErrorObjectOwned; +use std::string::FromUtf8Error; use thiserror::Error; use chain_data::ChainDataError; @@ -32,8 +31,13 @@ pub enum ConnectorError { IpcInternalNetworkError(#[source] ErrorObjectOwned), #[error("RPC error: {0}")] RpcError(#[from] RPCError), - #[error("RPC call error: code: {code}, message: {message}, data: {data}")] + #[error("RPC call {method} with {params} failed with error: code: {code}, message: {message}, data: {data}" + )] RpcCallError { + /// Method + method: String, + /// Params + params: String, /// Code code: i32, /// Message @@ -69,7 +73,11 @@ pub enum ConnectorError { ParseError(#[from] serde_json::Error), } -pub fn process_response(response: Result) -> Result { +pub fn process_response( + response: Result, + method: &str, + params: String, +) -> Result { match response { Ok(data) => Ok(data), Err(err) => match err { @@ -87,7 +95,18 @@ pub fn process_response(response: Result) -> Result Vec { "connected_client=debug", "listener=debug", "chain-listener=debug", + "chain-connector=debug", ]; namespaces