Skip to content

Commit

Permalink
refactor: MpoolPushMessage calling
Browse files Browse the repository at this point in the history
  • Loading branch information
aatifsyed committed Apr 4, 2024
1 parent cd26918 commit 493c4a1
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 38 deletions.
15 changes: 9 additions & 6 deletions src/cli/subcommands/attach_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use std::{
str::FromStr,
};

use crate::chain::ChainEpochDelta;
use crate::chain_sync::SyncStage;
use crate::rpc_client::*;
use crate::shim::{address::Address, message::Message};
use crate::{
chain::ChainEpochDelta,
rpc::{self, mpool_api::MpoolPushMessage, RpcMethodExt as _},
};
use crate::{cli::humantoken, message::SignedMessage};
use boa_engine::{
object::{builtins::JsArray, FunctionObjectBuilder},
Expand Down Expand Up @@ -255,8 +258,11 @@ async fn send_message(params: SendMessageParams, api: ApiInfo) -> anyhow::Result
Address::from_str(&to)?,
humantoken::parse(&value)?, // Convert forest_shim::TokenAmount to TokenAmount3
);

Ok(api.mpool_push_message(message, None).await?)
Ok(
MpoolPushMessage::call(&rpc::Client::from(api), (message.into(), None))
.await?
.into_inner(),
)
}

type SleepParams = (u64,);
Expand Down Expand Up @@ -344,9 +350,6 @@ impl AttachCommand {
"wallet_has" => ApiInfo::wallet_has_req,
"wallet_set_default" => ApiInfo::wallet_set_default_req,

// Message Pool API
"mpool_push_message" => |(message, specs)| ApiInfo::mpool_push_message_req(message, specs),

// Common API
"version" => |()| ApiInfo::version_req(),
"shutdown" => |()| ApiInfo::shutdown_req(),
Expand Down
6 changes: 5 additions & 1 deletion src/cli/subcommands/send_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use std::str::FromStr as _;

use crate::rpc::mpool_api::MpoolPushMessage;
use crate::rpc::{self, RpcMethodExt as _};
use crate::rpc_client::ApiInfo;
use crate::shim::address::{Address, StrictAddress};
use crate::shim::econ::TokenAmount;
Expand Down Expand Up @@ -53,7 +55,9 @@ impl SendCommand {
..Default::default()
};

let signed_msg = api.mpool_push_message(message, None).await?;
let signed_msg = MpoolPushMessage::call(&rpc::Client::from(api), (message.into(), None))
.await?
.into_inner();

println!("{}", signed_msg.cid().unwrap());

Expand Down
5 changes: 2 additions & 3 deletions src/rpc/reflect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ pub trait RpcMethodExt<const ARITY: usize>: RpcMethod<ARITY> {
}
}
}
/// Returns [`Err`] if any of the parameters fail to serialize.
fn request(
params: Self::Params,
) -> Result<crate::rpc_client::RpcRequest<Self::Ok>, serde_json::Error>
Expand Down Expand Up @@ -216,13 +217,11 @@ pub trait RpcMethodExt<const ARITY: usize>: RpcMethod<ARITY> {
Self::Params: Serialize,
Self::Ok: DeserializeOwned,
{
// stay on current thread so don't require `Self::Params: Send`
let request = Self::request(params);
async {
// TODO(aatifsyed): https://github.com/ChainSafe/forest/issues/4032
// Client::call has an inappropriate HasLotusJson
// bound, work around it for now.
let json = client.call(request?.lower()).await?;
let json = client.call(Self::request(params)?.lower()).await?;
Ok(serde_json::from_value(json)?)
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/rpc_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,15 @@ impl ApiInfo {
// This function should return jsonrpsee::core::ClientError,
// but that change should wait until _after_ all the methods
// have been migrated.
//
// In the limit, only rpc::Client should be making calls,
// and ApiInfo should be removed.
pub async fn call<T: HasLotusJson + std::fmt::Debug>(
&self,
req: RpcRequest<T>,
) -> Result<T, JsonRpcError> {
use jsonrpsee::core::ClientError;
match rpc::Client::new(
multiaddr2url(&self.multiaddr).ok_or(JsonRpcError::internal_error(
"couldn't convert multiaddr to URL",
None,
))?,
self.token.clone(),
)
.call(req)
.await
{
match rpc::Client::from(self.clone()).call(req).await {
Ok(it) => Ok(it),
Err(e) => match e {
ClientError::Call(it) => Err(it.into()),
Expand All @@ -119,6 +113,12 @@ impl ApiInfo {
}
}

impl From<ApiInfo> for rpc::Client {
fn from(value: ApiInfo) -> Self {
rpc::Client::new(value.url, value.token)
}
}

/// An `RpcRequest` is an at-rest description of a remote procedure call. It can
/// be invoked using `ApiInfo::call`.
///
Expand Down
19 changes: 1 addition & 18 deletions src/rpc_client/mpool_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,13 @@

use crate::{
message::SignedMessage,
rpc::{mpool_api::*, types::MessageSendSpec, RpcMethod},
shim::message::Message,
rpc::{mpool_api::*, RpcMethod},
};
use cid::Cid;

use super::{ApiInfo, JsonRpcError, RpcRequest};

impl ApiInfo {
pub async fn mpool_push_message(
&self,
message: Message,
specs: Option<MessageSendSpec>,
) -> Result<SignedMessage, JsonRpcError> {
self.call(Self::mpool_push_message_req(message, specs))
.await
}

pub fn mpool_push_message_req(
message: Message,
specs: Option<MessageSendSpec>,
) -> RpcRequest<SignedMessage> {
RpcRequest::new(MpoolPushMessage::NAME, (message, specs))
}

pub async fn mpool_pending(&self, cids: Vec<Cid>) -> Result<Vec<SignedMessage>, JsonRpcError> {
self.call(Self::mpool_pending_req(cids)).await
}
Expand Down

0 comments on commit 493c4a1

Please sign in to comment.