Skip to content

Commit

Permalink
feat: Require Send + Sync for T in Worker<T> to allow near-workspaces…
Browse files Browse the repository at this point in the history
… usage in multithreading async runtimes (#328)
  • Loading branch information
aleksuss authored Oct 30, 2023
1 parent fa1d71b commit ff2bd56
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Run clippy
run: cargo clippy --benches -- -D clippy::all
run: cargo clippy --all-targets -- -D clippy::all -D clippy::nursery

cargo-fmt:
runs-on: ubuntu-20.04
Expand Down
2 changes: 1 addition & 1 deletion examples/src/croncat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const COUNTER_CONTRACT: &[u8] = include_bytes!("../res/counter.wasm");
/// `AgentStatus` struct taken from [croncat repo](github.com/CronCats/contracts/) to
/// deserialize into after we get the result of a transaction and converting over to
/// this particular type.
#[derive(Debug, Deserialize, PartialEq)]
#[derive(Debug, Deserialize, PartialEq, Eq)]
pub enum AgentStatus {
Active,
Pending,
Expand Down
6 changes: 1 addition & 5 deletions examples/src/spooning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,7 @@ async fn main() -> anyhow::Result<()> {

// Patch our testnet STATE into our local sandbox:
worker
.patch_state(
sandbox_contract.id(),
"STATE".as_bytes(),
&status_msg.try_to_vec()?,
)
.patch_state(sandbox_contract.id(), b"STATE", &status_msg.try_to_vec()?)
.await?;

// Now grab the state to see that it has indeed been patched:
Expand Down
6 changes: 3 additions & 3 deletions workspaces/src/error/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl std::error::Error for Error {

impl<T> From<PoisonError<T>> for Error {
fn from(value: PoisonError<T>) -> Self {
Error::custom(ErrorKind::Other, value.to_string())
Self::custom(ErrorKind::Other, value.to_string())
}
}

Expand Down Expand Up @@ -167,7 +167,7 @@ impl SandboxErrorCode {

impl From<SandboxErrorCode> for Error {
fn from(code: SandboxErrorCode) -> Self {
Error::simple(ErrorKind::Sandbox(code))
Self::simple(ErrorKind::Sandbox(code))
}
}

Expand All @@ -189,6 +189,6 @@ impl RpcErrorCode {

impl From<RpcErrorCode> for Error {
fn from(code: RpcErrorCode) -> Self {
Error::simple(ErrorKind::Rpc(code))
Self::simple(ErrorKind::Rpc(code))
}
}
2 changes: 1 addition & 1 deletion workspaces/src/network/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::server::ValidatorKey;

pub(crate) type BoxFuture<'a, T> = std::pin::Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// This trait provides a way to construct Networks out of a single builder. Currently
/// This trait provides a way to construct Networks out of a single builder. Currently,
/// not planned to offer this trait outside, since the custom networks can just construct
/// themselves however they want utilizing `Worker::new` like so:
/// ```ignore
Expand Down
4 changes: 2 additions & 2 deletions workspaces/src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ impl ViewResultDetails {

impl From<CallResult> for ViewResultDetails {
fn from(result: CallResult) -> Self {
ViewResultDetails {
Self {
result: result.result,
logs: result.logs,
}
Expand Down Expand Up @@ -525,7 +525,7 @@ impl Value {

impl From<ExecutionOutcomeWithIdView> for ExecutionOutcome {
fn from(view: ExecutionOutcomeWithIdView) -> Self {
ExecutionOutcome {
Self {
transaction_hash: CryptoHash(view.id.0),
block_hash: CryptoHash(view.block_hash.0),
logs: view.outcome.logs,
Expand Down
19 changes: 8 additions & 11 deletions workspaces/src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ impl Client {
pub(crate) async fn query_broadcast_tx(
&self,
method: &methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest,
) -> MethodCallResult<
FinalExecutionOutcomeView,
near_jsonrpc_primitives::types::transactions::RpcTransactionError,
> {
) -> MethodCallResult<FinalExecutionOutcomeView, RpcTransactionError> {
retry(|| async {
let result = self.rpc_client.call(method).await;
match &result {
Expand Down Expand Up @@ -124,16 +121,16 @@ impl Client {

pub(crate) async fn query_nolog<M>(&self, method: M) -> MethodCallResult<M::Response, M::Error>
where
M: methods::RpcMethod,
M: methods::RpcMethod + Send + Sync,
{
retry(|| async { self.rpc_client.call(&method).await }).await
}

pub(crate) async fn query<M>(&self, method: M) -> MethodCallResult<M::Response, M::Error>
where
M: methods::RpcMethod + Debug,
M::Response: Debug,
M::Error: Debug,
M: methods::RpcMethod + Debug + Send + Sync,
M::Response: Debug + Send,
M::Error: Debug + Send,
{
retry(|| async {
let result = self.rpc_client.call(&method).await;
Expand Down Expand Up @@ -456,7 +453,7 @@ impl Client {

pub(crate) async fn access_key(
client: &Client,
account_id: near_primitives::account::id::AccountId,
account_id: AccountId,
public_key: near_crypto::PublicKey,
) -> Result<(AccessKeyView, CryptoHash)> {
let query_resp = client
Expand Down Expand Up @@ -522,8 +519,8 @@ async fn fetch_tx_nonce(

pub(crate) async fn retry<R, E, T, F>(task: F) -> T::Output
where
F: FnMut() -> T,
T: core::future::Future<Output = core::result::Result<R, E>>,
F: FnMut() -> T + Send,
T: core::future::Future<Output = core::result::Result<R, E>> + Send,
{
// Exponential backoff starting w/ 5ms for maximum retry of 4 times with the following delays:
// 5, 25, 125, 625 ms
Expand Down
10 changes: 5 additions & 5 deletions workspaces/src/rpc/patch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<'a> ImportContractTransaction<'a> {
}

/// Process the transaction, and return the result of the execution.
pub async fn transact(self) -> crate::result::Result<Contract> {
pub async fn transact(self) -> Result<Contract> {
let from_account_id = self.account_id;
let into_account_id = self.into_account_id.as_ref().unwrap_or(from_account_id);

Expand Down Expand Up @@ -153,7 +153,7 @@ impl<'a> ImportContractTransaction<'a> {
/// or to patch an entire account.
enum AccountUpdate {
Update(AccountDetailsPatch),
FromCurrent(Box<dyn Fn(AccountDetails) -> AccountDetailsPatch>),
FromCurrent(Box<dyn Fn(AccountDetails) -> AccountDetailsPatch + Send>),
}

pub struct PatchTransaction {
Expand All @@ -166,7 +166,7 @@ pub struct PatchTransaction {

impl PatchTransaction {
pub(crate) fn new(worker: &Worker<Sandbox>, account_id: AccountId) -> Self {
PatchTransaction {
Self {
account_id,
records: vec![],
worker: worker.clone(),
Expand All @@ -184,9 +184,9 @@ impl PatchTransaction {
/// Patch and overwrite the info contained inside an [`crate::Account`] in sandbox. This
/// will allow us to fetch the current details on the chain and allow us to update
/// the account details w.r.t to them.
pub fn account_from_current<F: 'static>(mut self, f: F) -> Self
pub fn account_from_current<F>(mut self, f: F) -> Self
where
F: Fn(AccountDetails) -> AccountDetailsPatch,
F: Fn(AccountDetails) -> AccountDetailsPatch + Send + 'static,
{
self.account_updates
.push(AccountUpdate::FromCurrent(Box::new(f)));
Expand Down
6 changes: 3 additions & 3 deletions workspaces/src/types/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl fmt::Debug for Account {
impl Account {
/// Create a new account with the given path to the credentials JSON file
pub fn from_file(
path: impl AsRef<std::path::Path>,
path: impl AsRef<Path>,
worker: &Worker<impl Network + 'static>,
) -> Result<Self> {
let signer = InMemorySigner::from_file(path.as_ref())?;
Expand Down Expand Up @@ -179,7 +179,7 @@ impl Account {
}

/// Store the credentials of this account locally in the directory provided.
pub async fn store_credentials(&self, save_dir: impl AsRef<Path>) -> Result<()> {
pub async fn store_credentials(&self, save_dir: impl AsRef<Path> + Send) -> Result<()> {
let savepath = save_dir.as_ref();
std::fs::create_dir_all(&save_dir).map_err(|e| ErrorKind::Io.custom(e))?;
let savepath = savepath.join(format!("{}.json", self.id()));
Expand Down Expand Up @@ -334,7 +334,7 @@ pub struct AccountDetailsPatch {
}

impl AccountDetailsPatch {
pub fn reduce(&mut self, acc: AccountDetailsPatch) {
pub fn reduce(&mut self, acc: Self) {
if let Some(balance) = acc.balance {
self.balance = Some(balance);
}
Expand Down
2 changes: 1 addition & 1 deletion workspaces/src/types/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl From<ChunkView> for Chunk {

impl From<ChunkHeaderView> for ChunkHeader {
fn from(view: ChunkHeaderView) -> Self {
ChunkHeader {
Self {
chunk_hash: view.chunk_hash.into(),
prev_block_hash: view.prev_block_hash.into(),
height_created: view.height_created,
Expand Down
3 changes: 1 addition & 2 deletions workspaces/src/types/gas_meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ impl GasMeter {

/// Reset the gas consumed to 0.
pub fn reset(&self) -> Result<()> {
let mut meter = self.gas.lock()?;
*meter = Gas::from_gas(0);
*self.gas.lock()? = Gas::from_gas(0);
Ok(())
}
}
6 changes: 3 additions & 3 deletions workspaces/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ impl TryFrom<u8> for KeyType {

fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(KeyType::ED25519),
1 => Ok(KeyType::SECP256K1),
0 => Ok(Self::ED25519),
1 => Ok(Self::SECP256K1),
unknown_key_type => Err(ErrorKind::DataConversion
.custom(format!("Unknown key type provided: {unknown_key_type}"))),
}
Expand Down Expand Up @@ -336,7 +336,7 @@ impl TryFrom<&[u8]> for CryptoHash {
}
let mut buf = [0; 32];
buf.copy_from_slice(bytes);
Ok(CryptoHash(buf))
Ok(Self(buf))
}
}

Expand Down
43 changes: 24 additions & 19 deletions workspaces/src/worker/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ where
}
}

impl<T: ?Sized> Worker<T>
impl<T> Worker<T>
where
T: NetworkClient,
T: NetworkClient + ?Sized,
{
pub(crate) fn client(&self) -> &Client {
self.workspace.client()
Expand Down Expand Up @@ -144,8 +144,27 @@ where
)
}

/// View account details of a specific account on the network.
pub fn view_account(&self, account_id: &AccountId) -> Query<'_, ViewAccount> {
Query::new(
self.client(),
ViewAccount {
account_id: account_id.clone(),
},
)
}

pub fn gas_price(&self) -> Query<'_, GasPrice> {
Query::new(self.client(), GasPrice)
}
}

impl<T> Worker<T>
where
T: NetworkClient + Send + Sync + ?Sized,
{
/// Transfer tokens from one account to another. The signer is the account
/// that will be used to to send from.
/// that will be used to send from.
pub async fn transfer_near(
&self,
signer: &InMemorySigner,
Expand Down Expand Up @@ -173,26 +192,12 @@ where
.map(ExecutionFinalResult::from_view)
.map_err(crate::error::Error::from)
}

/// View account details of a specific account on the network.
pub fn view_account(&self, account_id: &AccountId) -> Query<'_, ViewAccount> {
Query::new(
self.client(),
ViewAccount {
account_id: account_id.clone(),
},
)
}

pub fn gas_price(&self) -> Query<'_, GasPrice> {
Query::new(self.client(), GasPrice)
}
}

#[cfg(feature = "experimental")]
impl<T: ?Sized> Worker<T>
impl<T> Worker<T>
where
T: NetworkClient,
T: NetworkClient + Send + Sync + ?Sized,
{
pub async fn changes_in_block(
&self,
Expand Down
30 changes: 15 additions & 15 deletions workspaces/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{Network, Result};

/// The `Worker` type allows us to interact with any NEAR related networks, such
/// as mainnet and testnet. This controls where the environment the worker is
/// running on top of is. Refer to this for all network related actions such as
/// running on top of it. Refer to this for all network related actions such as
/// deploying a contract, or interacting with transactions.
pub struct Worker<T: ?Sized> {
pub(crate) workspace: Arc<T>,
Expand Down Expand Up @@ -97,62 +97,62 @@ pub fn custom<'a>(rpc_url: &str) -> NetworkBuilder<'a, Custom> {
/// Run a locally scoped task where a [`sandbox`] instanced [`Worker`] is supplied.
pub async fn with_sandbox<F, T>(task: F) -> Result<T::Output>
where
F: Fn(Worker<Sandbox>) -> T,
T: core::future::Future,
F: Fn(Worker<Sandbox>) -> T + Send + Sync,
T: core::future::Future + Send,
{
Ok(task(sandbox().await?).await)
}

/// Run a locally scoped task where a [`testnet`] instanced [`Worker`] is supplied.
pub async fn with_testnet<F, T>(task: F) -> Result<T::Output>
where
F: Fn(Worker<Testnet>) -> T,
T: core::future::Future,
F: Fn(Worker<Testnet>) -> T + Send + Sync,
T: core::future::Future + Send,
{
Ok(task(testnet().await?).await)
}

/// Run a locally scoped task where a [`testnet_archival`] instanced [`Worker`] is supplied.
pub async fn with_testnet_archival<F, T>(task: F) -> Result<T::Output>
where
F: Fn(Worker<Testnet>) -> T,
T: core::future::Future,
F: Fn(Worker<Testnet>) -> T + Send + Sync,
T: core::future::Future + Send,
{
Ok(task(testnet_archival().await?).await)
}

/// Run a locally scoped task where a [`mainnet`] instanced [`Worker`] is supplied.
pub async fn with_mainnet<F, T>(task: F) -> Result<T::Output>
where
F: Fn(Worker<Mainnet>) -> T,
T: core::future::Future,
F: Fn(Worker<Mainnet>) -> T + Send + Sync,
T: core::future::Future + Send,
{
Ok(task(mainnet().await?).await)
}

/// Run a locally scoped task where a [`mainnet_archival`] instanced [`Worker`] is supplied.
pub async fn with_mainnet_archival<F, T>(task: F) -> Result<T::Output>
where
F: Fn(Worker<Mainnet>) -> T,
T: core::future::Future,
F: Fn(Worker<Mainnet>) -> T + Send + Sync,
T: core::future::Future + Send,
{
Ok(task(mainnet_archival().await?).await)
}

/// Run a locally scoped task where a [`betanet`] instanced [`Worker`] is supplied.
pub async fn with_betanet<F, T>(task: F) -> Result<T::Output>
where
F: Fn(Worker<Betanet>) -> T,
T: core::future::Future,
F: Fn(Worker<Betanet>) -> T + Send + Sync,
T: core::future::Future + Send,
{
Ok(task(betanet().await?).await)
}

#[allow(dead_code)]
pub async fn with_custom<F, T>(task: F, rpc_url: &str) -> Result<T::Output>
where
F: Fn(Worker<Custom>) -> T,
T: core::future::Future,
F: Fn(Worker<Custom>) -> T + Send + Sync,
T: core::future::Future + Send,
{
Ok(task(custom(rpc_url).await?).await)
}

0 comments on commit ff2bd56

Please sign in to comment.